Skip to content

Commit

Permalink
add TruncateTableSuite test
Browse files Browse the repository at this point in the history
  • Loading branch information
binglihub committed Jul 6, 2018
1 parent e0674b9 commit b07551c
Show file tree
Hide file tree
Showing 2 changed files with 225 additions and 2 deletions.
223 changes: 223 additions & 0 deletions src/it/scala/net/snowflake/spark/snowflake/TruncateTableSuite.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
package net.snowflake.spark.snowflake

import net.snowflake.spark.snowflake.Utils.SNOWFLAKE_SOURCE_NAME
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SaveMode}

import scala.util.Random

class TruncateTableSuite extends IntegrationSuiteBase{
val table = s"test_table_$randomSuffix"

lazy val st1 = new StructType(
Array(
StructField("num1", LongType, nullable = false),
StructField("num2", FloatType, nullable = false)
)
)

lazy val df1 = sqlContext.createDataFrame(
sc.parallelize(1 to 100).map[Row](
_ => {
val rand = new Random(System.nanoTime())
Row(rand.nextLong(), rand.nextFloat())
}
),
st1
)

lazy val st2 = new StructType(
Array(
StructField("num1", IntegerType, nullable = false),
StructField("num2", IntegerType, nullable = false)
)
)

lazy val df2 = sqlContext.createDataFrame(
sc.parallelize(1 to 100).map[Row](
_ => {
val rand = new Random(System.nanoTime())
Row(rand.nextInt(), rand.nextInt())
}
),
st2
)

override def beforeAll(): Unit = {
super.beforeAll()
}


test("use truncate table with staging table") {

jdbcUpdate(s"drop table if exists $table")

//create one table
df2.write.format(SNOWFLAKE_SOURCE_NAME)
.options(connectorOptionsNoTable)
.option("dbtable", table)
.option("truncate_table", "off")
.option("usestagingtable", "on")
.mode(SaveMode.Overwrite)
.save()

//replace previous table and overwrite schema
df1.write.format(SNOWFLAKE_SOURCE_NAME)
.options(connectorOptionsNoTable)
.option("dbtable", table)
.option("truncate_table", "off")
.option("usestagingtable", "on")
.mode(SaveMode.Overwrite)
.save()

//truncate previous table and keep schema
df2.write.format(SNOWFLAKE_SOURCE_NAME)
.options(connectorOptionsNoTable)
.option("dbtable", table)
.option("truncate_table", "on")
.option("usestagingtable", "on")
.mode(SaveMode.Overwrite)
.save()

//check schema
assert(checkSchema1())



}

test("use truncate table without staging table") {

jdbcUpdate(s"drop table if exists $table")

//create table
df2.write.format(SNOWFLAKE_SOURCE_NAME)
.options(connectorOptionsNoTable)
.option("dbtable", table)
.option("truncate_table", "off")
.option("usestagingtable", "off")
.mode(SaveMode.Overwrite)
.save()

//replace previous table and overwrite schema
df1.write.format(SNOWFLAKE_SOURCE_NAME)
.options(connectorOptionsNoTable)
.option("dbtable", table)
.option("truncate_table", "off")
.option("usestagingtable", "off")
.mode(SaveMode.Overwrite)
.save()

//truncate table and keep schema
df2.write.format(SNOWFLAKE_SOURCE_NAME)
.options(connectorOptionsNoTable)
.option("dbtable", table)
.option("truncate_table", "on")
.option("usestagingtable", "off")
.mode(SaveMode.Overwrite)
.save()

//checker schema
assert(checkSchema1())

}

test("don't truncate table with staging table") {

jdbcUpdate(s"drop table if exists $table")

//create one table
df2.write.format(SNOWFLAKE_SOURCE_NAME)
.options(connectorOptionsNoTable)
.option("dbtable", table)
.option("truncate_table", "off")
.option("usestagingtable", "on")
.mode(SaveMode.Overwrite)
.save()

//replace previous table and overwrite schema
df1.write.format(SNOWFLAKE_SOURCE_NAME)
.options(connectorOptionsNoTable)
.option("dbtable", table)
.option("truncate_table", "off")
.option("usestagingtable", "on")
.mode(SaveMode.Overwrite)
.save()

//truncate previous table and overwrite schema
df2.write.format(SNOWFLAKE_SOURCE_NAME)
.options(connectorOptionsNoTable)
.option("dbtable", table)
.option("truncate_table", "off")
.option("usestagingtable", "on")
.mode(SaveMode.Overwrite)
.save()

//check schema
assert(checkSchema2())
}
test("don't truncate table without staging table") {

jdbcUpdate(s"drop table if exists $table")

//create one table
df2.write.format(SNOWFLAKE_SOURCE_NAME)
.options(connectorOptionsNoTable)
.option("dbtable", table)
.option("truncate_table", "off")
.option("usestagingtable", "off")
.mode(SaveMode.Overwrite)
.save()

//replace previous table and overwrite schema
df1.write.format(SNOWFLAKE_SOURCE_NAME)
.options(connectorOptionsNoTable)
.option("dbtable", table)
.option("truncate_table", "off")
.option("usestagingtable", "off")
.mode(SaveMode.Overwrite)
.save()

//truncate previous table and overwrite schema
df2.write.format(SNOWFLAKE_SOURCE_NAME)
.options(connectorOptionsNoTable)
.option("dbtable", table)
.option("truncate_table", "off")
.option("usestagingtable", "off")
.mode(SaveMode.Overwrite)
.save()

//check schema
assert(checkSchema2())
}

def checkSchema2(): Boolean = {
val st = DefaultJDBCWrapper.resolveTable(conn, table)
val st1 = new StructType(
Array(
StructField("NUM1", DecimalType(38, 0), nullable = false),
StructField("NUM2", DecimalType(38, 0), nullable = false)
)
)
st.equals(st1)
}

def checkSchema1(): Boolean = {
val st = DefaultJDBCWrapper.resolveTable(conn, table)
val st1 = new StructType(
Array(
StructField("NUM1", DecimalType(38, 0), nullable = false),
StructField("NUM2", DoubleType, nullable = false)
)
)
st.equals(st1)
}




override def afterAll(): Unit = {
jdbcUpdate(s"drop table if exists $table")
super.afterAll()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,10 @@ private[io] object StageWriter {
jdbcWrapper.tableExists(conn, table.toString)){
if(params.useStagingTable){
if(params.truncateTable)
execute(s"create or replace $tempTable like $table")
execute(s"create or replace table $tempTable like $table")
}
else if(params.truncateTable) execute(s"truncate $table")
else execute(s"drop table is exists $table")
else execute(s"drop table if exists $table")
}

//create table
Expand Down

0 comments on commit b07551c

Please sign in to comment.