diff --git a/src/it/scala/net/snowflake/spark/snowflake/TruncateTableSuite.scala b/src/it/scala/net/snowflake/spark/snowflake/TruncateTableSuite.scala new file mode 100644 index 00000000..541c28c9 --- /dev/null +++ b/src/it/scala/net/snowflake/spark/snowflake/TruncateTableSuite.scala @@ -0,0 +1,223 @@ +package net.snowflake.spark.snowflake + +import net.snowflake.spark.snowflake.Utils.SNOWFLAKE_SOURCE_NAME +import org.apache.spark.sql.types._ +import org.apache.spark.sql.{Row, SaveMode} + +import scala.util.Random + +class TruncateTableSuite extends IntegrationSuiteBase{ + val table = s"test_table_$randomSuffix" + + lazy val st1 = new StructType( + Array( + StructField("num1", LongType, nullable = false), + StructField("num2", FloatType, nullable = false) + ) + ) + + lazy val df1 = sqlContext.createDataFrame( + sc.parallelize(1 to 100).map[Row]( + _ => { + val rand = new Random(System.nanoTime()) + Row(rand.nextLong(), rand.nextFloat()) + } + ), + st1 + ) + + lazy val st2 = new StructType( + Array( + StructField("num1", IntegerType, nullable = false), + StructField("num2", IntegerType, nullable = false) + ) + ) + + lazy val df2 = sqlContext.createDataFrame( + sc.parallelize(1 to 100).map[Row]( + _ => { + val rand = new Random(System.nanoTime()) + Row(rand.nextInt(), rand.nextInt()) + } + ), + st2 + ) + + override def beforeAll(): Unit = { + super.beforeAll() + } + + + test("use truncate table with staging table") { + + jdbcUpdate(s"drop table if exists $table") + + //create one table + df2.write.format(SNOWFLAKE_SOURCE_NAME) + .options(connectorOptionsNoTable) + .option("dbtable", table) + .option("truncate_table", "off") + .option("usestagingtable", "on") + .mode(SaveMode.Overwrite) + .save() + + //replace previous table and overwrite schema + df1.write.format(SNOWFLAKE_SOURCE_NAME) + .options(connectorOptionsNoTable) + .option("dbtable", table) + .option("truncate_table", "off") + .option("usestagingtable", "on") + .mode(SaveMode.Overwrite) + .save() + + //truncate previous table and keep schema + df2.write.format(SNOWFLAKE_SOURCE_NAME) + .options(connectorOptionsNoTable) + .option("dbtable", table) + .option("truncate_table", "on") + .option("usestagingtable", "on") + .mode(SaveMode.Overwrite) + .save() + + //check schema + assert(checkSchema1()) + + + + } + + test("use truncate table without staging table") { + + jdbcUpdate(s"drop table if exists $table") + + //create table + df2.write.format(SNOWFLAKE_SOURCE_NAME) + .options(connectorOptionsNoTable) + .option("dbtable", table) + .option("truncate_table", "off") + .option("usestagingtable", "off") + .mode(SaveMode.Overwrite) + .save() + + //replace previous table and overwrite schema + df1.write.format(SNOWFLAKE_SOURCE_NAME) + .options(connectorOptionsNoTable) + .option("dbtable", table) + .option("truncate_table", "off") + .option("usestagingtable", "off") + .mode(SaveMode.Overwrite) + .save() + + //truncate table and keep schema + df2.write.format(SNOWFLAKE_SOURCE_NAME) + .options(connectorOptionsNoTable) + .option("dbtable", table) + .option("truncate_table", "on") + .option("usestagingtable", "off") + .mode(SaveMode.Overwrite) + .save() + + //checker schema + assert(checkSchema1()) + + } + + test("don't truncate table with staging table") { + + jdbcUpdate(s"drop table if exists $table") + + //create one table + df2.write.format(SNOWFLAKE_SOURCE_NAME) + .options(connectorOptionsNoTable) + .option("dbtable", table) + .option("truncate_table", "off") + .option("usestagingtable", "on") + .mode(SaveMode.Overwrite) + .save() + + //replace previous table and overwrite schema + df1.write.format(SNOWFLAKE_SOURCE_NAME) + .options(connectorOptionsNoTable) + .option("dbtable", table) + .option("truncate_table", "off") + .option("usestagingtable", "on") + .mode(SaveMode.Overwrite) + .save() + + //truncate previous table and overwrite schema + df2.write.format(SNOWFLAKE_SOURCE_NAME) + .options(connectorOptionsNoTable) + .option("dbtable", table) + .option("truncate_table", "off") + .option("usestagingtable", "on") + .mode(SaveMode.Overwrite) + .save() + + //check schema + assert(checkSchema2()) + } + test("don't truncate table without staging table") { + + jdbcUpdate(s"drop table if exists $table") + + //create one table + df2.write.format(SNOWFLAKE_SOURCE_NAME) + .options(connectorOptionsNoTable) + .option("dbtable", table) + .option("truncate_table", "off") + .option("usestagingtable", "off") + .mode(SaveMode.Overwrite) + .save() + + //replace previous table and overwrite schema + df1.write.format(SNOWFLAKE_SOURCE_NAME) + .options(connectorOptionsNoTable) + .option("dbtable", table) + .option("truncate_table", "off") + .option("usestagingtable", "off") + .mode(SaveMode.Overwrite) + .save() + + //truncate previous table and overwrite schema + df2.write.format(SNOWFLAKE_SOURCE_NAME) + .options(connectorOptionsNoTable) + .option("dbtable", table) + .option("truncate_table", "off") + .option("usestagingtable", "off") + .mode(SaveMode.Overwrite) + .save() + + //check schema + assert(checkSchema2()) + } + + def checkSchema2(): Boolean = { + val st = DefaultJDBCWrapper.resolveTable(conn, table) + val st1 = new StructType( + Array( + StructField("NUM1", DecimalType(38, 0), nullable = false), + StructField("NUM2", DecimalType(38, 0), nullable = false) + ) + ) + st.equals(st1) + } + + def checkSchema1(): Boolean = { + val st = DefaultJDBCWrapper.resolveTable(conn, table) + val st1 = new StructType( + Array( + StructField("NUM1", DecimalType(38, 0), nullable = false), + StructField("NUM2", DoubleType, nullable = false) + ) + ) + st.equals(st1) + } + + + + + override def afterAll(): Unit = { + jdbcUpdate(s"drop table if exists $table") + super.afterAll() + } +} diff --git a/src/main/scala/net/snowflake/spark/snowflake/io/StageWriter.scala b/src/main/scala/net/snowflake/spark/snowflake/io/StageWriter.scala index 8e91be19..bc4145f8 100644 --- a/src/main/scala/net/snowflake/spark/snowflake/io/StageWriter.scala +++ b/src/main/scala/net/snowflake/spark/snowflake/io/StageWriter.scala @@ -169,10 +169,10 @@ private[io] object StageWriter { jdbcWrapper.tableExists(conn, table.toString)){ if(params.useStagingTable){ if(params.truncateTable) - execute(s"create or replace $tempTable like $table") + execute(s"create or replace table $tempTable like $table") } else if(params.truncateTable) execute(s"truncate $table") - else execute(s"drop table is exists $table") + else execute(s"drop table if exists $table") } //create table