From 016f7b61365a52bd06b5d4f263286b90e79e7214 Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Mon, 14 Aug 2023 13:32:59 +0200 Subject: [PATCH] Enable time travel for write statements with source queries --- src/context.rs | 151 ++++++++------- tests/statements/dml.rs | 36 ++-- tests/statements/mod.rs | 19 +- tests/statements/query.rs | 234 +---------------------- tests/statements/time_travel.rs | 319 ++++++++++++++++++++++++++++++++ 5 files changed, 444 insertions(+), 315 deletions(-) create mode 100644 tests/statements/time_travel.rs diff --git a/src/context.rs b/src/context.rs index 1ecc1cf2..b5f3d962 100644 --- a/src/context.rs +++ b/src/context.rs @@ -31,7 +31,7 @@ use object_store::path::Path; use sqlparser::ast::{ AlterTableOperation, CreateFunctionBody, Expr as SqlExpr, FunctionDefinition, - ObjectType, Statement, TableFactor, TableWithJoins, + ObjectType, Query, Statement, TableFactor, TableWithJoins, }; use arrow_schema::{DataType, TimeUnit}; @@ -516,6 +516,57 @@ impl DefaultSeafowlContext { .try_for_each(|f| self.register_function(&f.name, &f.details)) } + // Determine if some of the tables reference a non-latest version using table function syntax. + // If so, rename the tables in the query by appending the explicit version to the name, and add + // it to the schema provider's map inside a new session state. + // Should become obsolete once `sqlparser-rs` introduces support for some form of the `AS OF` + // clause: https://en.wikipedia.org/wiki/SQL:2011. + async fn rewrite_time_travel_query(&self, q: &mut Query) -> Result { + let mut version_processor = + TableVersionProcessor::new(self.database.clone(), DEFAULT_SCHEMA.to_string()); + version_processor.visit_query(q); + + if version_processor.table_versions.is_empty() { + // No time-travel syntax detected, just return the regular session state + return Ok(self.inner.state()); + } + + debug!("Time travel query rewritten to: {}", q); + + // Create a new session context and session state, to avoid potential race + // conditions leading to schema provider map leaking into other queries (and + // thus polluting e.g. the information_schema output), or even worse reloading + // the map and having the versioned query fail during execution. + let session_ctx = SessionContext::with_state(self.inner.state()); + + for (table, version) in &version_processor.table_versions { + let name_with_version = + TableVersionProcessor::table_with_version(table, version); + + let full_table_name = table.to_string(); + let mut resolved_ref = TableReference::from(full_table_name.as_str()) + .resolve(&self.database, DEFAULT_SCHEMA); + + // We only support datetime DeltaTable version specification for start + let table_uuid = self.get_table_uuid(resolved_ref.clone()).await?; + let table_object_store = + self.internal_object_store.for_delta_table(table_uuid); + let datetime = TableVersionProcessor::version_to_datetime(version)?; + + let mut delta_table = DeltaTable::new(table_object_store, Default::default()); + delta_table.load_with_datetime(datetime).await?; + let table_provider_for_version = Arc::from(delta_table); + + resolved_ref.table = Cow::Borrowed(name_with_version.as_str()); + + if !session_ctx.table_exist(resolved_ref.clone())? { + session_ctx.register_table(resolved_ref, table_provider_for_version)?; + } + } + + Ok(session_ctx.state()) + } + // Check that the TableReference doesn't have a database/schema in it. // We create all external tables in the staging schema (backed by DataFusion's // in-memory schema provider) instead. @@ -933,57 +984,24 @@ impl SeafowlContext for DefaultSeafowlContext { // planning phase. We could use a lighter structure for that, and implement `ContextProvider` for // it rather than for DefaultSeafowlContext. self.reload_schema().await?; - let state = self.inner.state(); - - match statement.clone() { - DFStatement::Statement(s) => match *s { - Statement::Query(mut q) => { - // Determine if some of the tables reference a non-latest version using table - // function syntax. If so, rename the tables in the query by appending the - // explicit version to the name, and add it to the schema provider's map. - - let mut version_processor = TableVersionProcessor::new(self.database.clone(), DEFAULT_SCHEMA.to_string()); - version_processor.visit_query(&mut q); - if !version_processor.table_versions.is_empty() { - debug!("Time travel query rewritten to: {}", q); - - // Create a new session context and session state, to avoid potential race - // conditions leading to schema provider map leaking into other queries (and - // thus polluting e.g. the information_schema output), or even worse reloading - // the map and having the versioned query fail during execution. - let session_ctx = SessionContext::with_state(self.inner.state()); - - for (table, version) in &version_processor.table_versions { - let name_with_version = - TableVersionProcessor::table_with_version(table, version); - - let full_table_name = table.to_string(); - let mut resolved_ref = TableReference::from(full_table_name.as_str()).resolve(&self.database, DEFAULT_SCHEMA); - - // We only support datetime DeltaTable version specification for start - let table_uuid = self.get_table_uuid(resolved_ref.clone()).await?; - let table_object_store = - self.internal_object_store.for_delta_table(table_uuid); - let datetime = TableVersionProcessor::version_to_datetime(version)?; - - let mut delta_table = DeltaTable::new(table_object_store, Default::default()); - delta_table.load_with_datetime(datetime).await?; - let table_provider_for_version = Arc::from(delta_table); - - resolved_ref.table = Cow::Borrowed(name_with_version.as_str()); - - if !session_ctx.table_exist(resolved_ref.clone())? { - session_ctx.register_table(resolved_ref, table_provider_for_version)?; - } - } - - let state = session_ctx.state(); - return state.statement_to_plan(DFStatement::Statement(Box::from(Statement::Query(q)))).await; - } - - state.statement_to_plan(DFStatement::Statement(Box::from(Statement::Query(q)))).await - }, + // Create a mutable clone of the statement so that we can rewrite table names if we encounter + // time travel syntax. + // Alternatively, this could be done without the `mut`, except then we'd need to construct + // and return a new `DFStatement` after the rewrite. In case of `Statement::Query` this is + // straight-forward as that enum variant contains a struct, however `Statement::Insert` and + // `Statement::CreateTable` wrap dozens of fields in curly braces, and we'd need to capture + // and re-populate each of those, so it would be very verbose. + // TODO: If `sqlparser-rs` encapsulates these fields inside a struct at some point remove + // the mut and go with @ capture and de-structuring to pass along other fields unchanged. + let mut stmt = statement.clone(); + + match &mut stmt { + DFStatement::Statement(ref mut s) => match &mut **s { + Statement::Query(ref mut query) => { + let state = self.rewrite_time_travel_query(query).await?; + state.statement_to_plan(stmt).await + } // Delegate generic queries to the basic DataFusion logical planner // (though note EXPLAIN [our custom query] will mean we have to implement EXPLAIN ourselves) Statement::Explain { .. } @@ -992,9 +1010,10 @@ impl SeafowlContext for DefaultSeafowlContext { | Statement::ShowColumns { .. } | Statement::CreateSchema { .. } | Statement::CreateView { .. } - | Statement::CreateDatabase { .. } => state.statement_to_plan(statement).await, - Statement::Insert{ .. } => { - let plan = state.statement_to_plan(statement).await?; + | Statement::CreateDatabase { .. } => self.inner.state().statement_to_plan(stmt).await, + Statement::Insert{ ref mut source, .. } => { + let state = self.rewrite_time_travel_query(source).await?; + let plan = state.statement_to_plan(stmt).await?; state.optimize(&plan) } Statement::Update { @@ -1003,7 +1022,7 @@ impl SeafowlContext for DefaultSeafowlContext { } // We only support the most basic form of UPDATE (no aliases or FROM or joins) if with_hints.is_empty() && joins.is_empty() => { - let plan = state.statement_to_plan(statement).await?; + let plan = self.inner.state().statement_to_plan(stmt).await?; // Create a custom optimizer as a workaround for "Optimizer rule 'push_down_projection' failed", // which seems to be a regression for the UPDATE statement planning. @@ -1031,10 +1050,11 @@ impl SeafowlContext for DefaultSeafowlContext { ) }, Statement::Delete{ .. } => { - let plan = state.statement_to_plan(statement).await?; + let state = self.inner.state(); + let plan = state.statement_to_plan(stmt).await?; state.optimize(&plan) } - Statement::Drop { object_type: ObjectType::Table, .. } => state.statement_to_plan(statement).await, + Statement::Drop { object_type: ObjectType::Table, .. } => self.inner.state().statement_to_plan(stmt).await, Statement::Drop { object_type: ObjectType::Schema, if_exists: _, names, @@ -1062,12 +1082,12 @@ impl SeafowlContext for DefaultSeafowlContext { && table_properties.is_empty() && with_options.is_empty() => { - let schema = build_schema(columns)?; + let schema = build_schema(columns.to_vec())?; Ok(LogicalPlan::Extension(Extension { node: Arc::new(SeafowlExtensionNode::CreateTable(CreateTable { schema, name: name.to_string(), - if_not_exists, + if_not_exists: *if_not_exists, output_schema: Arc::new(DFSchema::empty()) })), })) @@ -1099,7 +1119,10 @@ impl SeafowlContext for DefaultSeafowlContext { // Other CREATE TABLE: SqlToRel only allows CreateTableAs statements and makes // a CreateMemoryTable node. We're fine with that, but we'll execute it differently. - Statement::CreateTable { .. } => state.statement_to_plan(statement).await, + Statement::CreateTable { query: Some(ref mut input), .. } => { + let state = self.rewrite_time_travel_query(input).await?; + state.statement_to_plan(stmt).await + }, Statement::CreateFunction { or_replace, @@ -1110,14 +1133,14 @@ impl SeafowlContext for DefaultSeafowlContext { } => { // We abuse the fact that in CREATE FUNCTION AS [class_name], class_name can be an arbitrary string // and so we can get the user to put some JSON in there - let function_details: CreateFunctionDetails = serde_json::from_str(&details) + let function_details: CreateFunctionDetails = serde_json::from_str(details) .map_err(|e| { Error::Execution(format!("Error parsing UDF details: {e:?}")) })?; Ok(LogicalPlan::Extension(Extension { node: Arc::new(SeafowlExtensionNode::CreateFunction(CreateFunction { - or_replace, + or_replace: *or_replace, name: name.to_string(), details: function_details, output_schema: Arc::new(DFSchema::empty()) @@ -1155,7 +1178,7 @@ impl SeafowlContext for DefaultSeafowlContext { func_desc.iter().map(|desc| desc.name.to_string()).collect(); Ok(LogicalPlan::Extension(Extension { node: Arc::new(SeafowlExtensionNode::DropFunction(DropFunction { - if_exists, + if_exists: *if_exists, func_names, output_schema: Arc::new(DFSchema::empty()), })) @@ -1166,7 +1189,7 @@ impl SeafowlContext for DefaultSeafowlContext { ))), }, DFStatement::DescribeTableStmt(_) | DFStatement::CreateExternalTable(_) => { - state.statement_to_plan(statement).await + self.inner.state().statement_to_plan(stmt).await } DFStatement::CopyTo(_) => Err(Error::NotImplemented(format!( "Unsupported SQL statement: {statement:?}" diff --git a/tests/statements/dml.rs b/tests/statements/dml.rs index 99c6886d..b5956f8d 100644 --- a/tests/statements/dml.rs +++ b/tests/statements/dml.rs @@ -25,16 +25,18 @@ async fn test_insert_two_different_schemas( .unwrap(); let results = context.collect(plan).await.unwrap(); - let expected = ["+---------------------+------------+------------------+-----------------+----------------+", + let expected = [ + "+---------------------+------------+------------------+-----------------+----------------+", "| some_time | some_value | some_other_value | some_bool_value | some_int_value |", "+---------------------+------------+------------------+-----------------+----------------+", - "| 2022-01-01T20:01:01 | 42.0 | | | 1111 |", - "| 2022-01-01T20:02:02 | 43.0 | | | 2222 |", - "| 2022-01-01T20:03:03 | 44.0 | | | 3333 |", + "| 2022-01-01T20:01:01 | 42.0 | 1.0000000000 | | 1111 |", + "| 2022-01-01T20:02:02 | 43.0 | 1.0000000000 | | 2222 |", + "| 2022-01-01T20:03:03 | 44.0 | 1.0000000000 | | 3333 |", "| | 41.0 | 2.1500000000 | false | |", "| | 45.0 | 9.1200000000 | true | |", "| | | 44.3400000000 | false | |", - "+---------------------+------------+------------------+-----------------+----------------+"]; + "+---------------------+------------+------------------+-----------------+----------------+", + ]; assert_batches_eq!(expected, &results); } @@ -376,18 +378,18 @@ async fn test_update_statement( "+---------------------+------------+------------------+-----------------+----------------+", "| some_time | some_value | some_other_value | some_bool_value | some_int_value |", "+---------------------+------------+------------------+-----------------+----------------+", - "| 2022-01-01T21:21:21 | 31.0 | | | 5555 |", - "| 2022-01-01T21:21:21 | 32.0 | | | 5555 |", - "| 2022-01-01T21:21:21 | 32.0 | | | 5555 |", - "| 2022-01-01T21:21:21 | 33.0 | | | 5555 |", - "| | 40.0 | | | |", - "| 2022-01-01T20:03:03 | 44.0 | | | 3333 |", - "| | 45.0 | | | |", - "| | 46.0 | | | |", - "| | 46.0 | | | |", - "| | 47.0 | | | |", - "| | 47.0 | | | |", - "| | 48.0 | | | |", + "| 2022-01-01T21:21:21 | 31.0 | 4.0000000000 | | 5555 |", + "| 2022-01-01T21:21:21 | 32.0 | 1.0000000000 | | 5555 |", + "| 2022-01-01T21:21:21 | 32.0 | 4.0000000000 | | 5555 |", + "| 2022-01-01T21:21:21 | 33.0 | 1.0000000000 | | 5555 |", + "| | 40.0 | 4.0000000000 | | |", + "| 2022-01-01T20:03:03 | 44.0 | 1.0000000000 | | 3333 |", + "| | 45.0 | 2.0000000000 | | |", + "| | 46.0 | 2.0000000000 | | |", + "| | 46.0 | 3.0000000000 | | |", + "| | 47.0 | 2.0000000000 | | |", + "| | 47.0 | 3.0000000000 | | |", + "| | 48.0 | 3.0000000000 | | |", "+---------------------+------------+------------------+-----------------+----------------+", ]; assert_batches_eq!(expected, &results); diff --git a/tests/statements/mod.rs b/tests/statements/mod.rs index 9b6c064a..19009e5b 100644 --- a/tests/statements/mod.rs +++ b/tests/statements/mod.rs @@ -36,6 +36,7 @@ mod query; #[allow(dead_code)] #[path = "../../src/testutils.rs"] mod testutils; +mod time_travel; mod vacuum; enum ObjectStoreType { @@ -182,10 +183,10 @@ async fn create_table_and_insert(context: &DefaultSeafowlContext, table_name: &s context .plan_query( format!( - "INSERT INTO {table_name:} (some_int_value, some_time, some_value) VALUES - (1111, '2022-01-01T20:01:01Z', 42), - (2222, '2022-01-01T20:02:02Z', 43), - (3333, '2022-01-01T20:03:03Z', 44)" + "INSERT INTO {table_name:} (some_int_value, some_other_value, some_time, some_value) VALUES + (1111, 1.0, '2022-01-01T20:01:01Z', 42), + (2222, 1.0, '2022-01-01T20:02:02Z', 43), + (3333, 1.0, '2022-01-01T20:03:03Z', 44)" ) .as_str(), ) @@ -240,7 +241,7 @@ async fn create_table_and_some_partitions( // Add another partition for table version 2 context .plan_query( - format!("INSERT INTO {table_name} (some_value) VALUES (45), (46), (47)") + format!("INSERT INTO {table_name} (some_value, some_other_value) VALUES (45, 2.0), (46, 2.0), (47, 2.0)") .as_str(), ) .await @@ -258,7 +259,7 @@ async fn create_table_and_some_partitions( // Add another partition for table_version 3 context .plan_query( - format!("INSERT INTO {table_name} (some_value) VALUES (46), (47), (48)") + format!("INSERT INTO {table_name} (some_value, some_other_value) VALUES (46, 3.0), (47, 3.0), (48, 3.0)") .as_str(), ) .await @@ -276,7 +277,7 @@ async fn create_table_and_some_partitions( // Add another partition for table_version 4 context .plan_query( - format!("INSERT INTO {table_name} (some_value) VALUES (42), (41), (40)") + format!("INSERT INTO {table_name} (some_value, some_other_value) VALUES (42, 4.0), (41, 4.0), (40, 4.0)") .as_str(), ) .await @@ -293,3 +294,7 @@ async fn create_table_and_some_partitions( (version_results, version_timestamps) } + +fn timestamp_to_rfc3339(timestamp: Timestamp) -> String { + Utc.timestamp_opt(timestamp, 0).unwrap().to_rfc3339() +} diff --git a/tests/statements/query.rs b/tests/statements/query.rs index b044a026..f07d6ab3 100644 --- a/tests/statements/query.rs +++ b/tests/statements/query.rs @@ -73,13 +73,15 @@ async fn test_create_table_and_insert() { .unwrap(); let results = context.collect(plan).await.unwrap(); - let expected = ["+---------------------+------------+------------------+-----------------+----------------+", + let expected = [ + "+---------------------+------------+------------------+-----------------+----------------+", "| some_time | some_value | some_other_value | some_bool_value | some_int_value |", "+---------------------+------------+------------------+-----------------+----------------+", - "| 2022-01-01T20:01:01 | 42.0 | | | 1111 |", - "| 2022-01-01T20:02:02 | 43.0 | | | 2222 |", - "| 2022-01-01T20:03:03 | 44.0 | | | 3333 |", - "+---------------------+------------+------------------+-----------------+----------------+"]; + "| 2022-01-01T20:01:01 | 42.0 | 1.0000000000 | | 1111 |", + "| 2022-01-01T20:02:02 | 43.0 | 1.0000000000 | | 2222 |", + "| 2022-01-01T20:03:03 | 44.0 | 1.0000000000 | | 3333 |", + "+---------------------+------------+------------------+-----------------+----------------+", + ]; assert_batches_eq!(expected, &results); @@ -115,228 +117,6 @@ async fn test_create_table_and_insert() { assert_batches_eq!(expected, &results); } -#[tokio::test] -async fn test_table_time_travel() { - let (context, _temp_dir) = make_context_with_pg(ObjectStoreType::Local).await; - let (version_results, version_timestamps) = create_table_and_some_partitions( - &context, - "test_table", - Some(Duration::from_secs(1)), - ) - .await; - - let timestamp_to_rfc3339 = |timestamp: Timestamp| -> String { - Utc.timestamp_opt(timestamp, 0).unwrap().to_rfc3339() - }; - - // - // Verify that the new table versions are shown in the corresponding system table - // - - let plan = context - .plan_query("SELECT table_schema, table_name, version FROM system.table_versions") - .await - .unwrap(); - let results = context.collect(plan).await.unwrap(); - - let expected = [ - "+--------------+------------+---------+", - "| table_schema | table_name | version |", - "+--------------+------------+---------+", - "| public | test_table | 0 |", - "| public | test_table | 1 |", - "| public | test_table | 2 |", - "| public | test_table | 3 |", - "| public | test_table | 4 |", - "+--------------+------------+---------+", - ]; - assert_batches_eq!(expected, &results); - - // - // Test that filtering the system table works, given that we provide all rows to DF and expect - // it to do it. - // - let plan = context - .plan_query( - format!( - " - SELECT version FROM system.table_versions \ - WHERE version < 4 AND creation_time > to_timestamp('{}') - ", - timestamp_to_rfc3339(version_timestamps[&1]) - ) - .as_str(), - ) - .await - .unwrap(); - let results = context.collect(plan).await.unwrap(); - - let expected = [ - "+---------+", - "| version |", - "+---------+", - "| 2 |", - "| 3 |", - "+---------+", - ]; - assert_batches_eq!(expected, &results); - - // - // Now use the recorded timestamps to query specific earlier table versions and compare them to - // the recorded results for that version. - // - - async fn query_table_version( - context: &DefaultSeafowlContext, - version_id: i64, - version_results: &HashMap>, - version_timestamps: &HashMap, - timestamp_converter: fn(Timestamp) -> String, - ) { - let plan = context - .plan_query( - format!( - "SELECT * FROM test_table('{}')", - timestamp_converter(version_timestamps[&version_id]) - ) - .as_str(), - ) - .await - .unwrap(); - let results = context.collect(plan).await.unwrap(); - - assert_eq!(version_results[&version_id], results); - } - - for version_id in [1, 2, 3, 4] { - query_table_version( - &context, - version_id, - &version_results, - &version_timestamps, - timestamp_to_rfc3339, - ) - .await; - } - - // - // Use multiple different version specifiers in the same complex query (including the latest - // version both explicitly and in the default notation). - // Ensures row differences between different versions are consistent: - // 4 - ((4 - 3) + (3 - 2) + (2 - 1)) = 1 - // - - let plan = context - .plan_query( - format!( - r#" - WITH diff_2_1 AS ( - SELECT * FROM test_table('{}') - EXCEPT - SELECT * FROM test_table('{}') - ), diff_3_2 AS ( - SELECT * FROM test_table('{}') - EXCEPT - SELECT * FROM test_table('{}') - ), diff_4_3 AS ( - SELECT * FROM test_table('{}') - EXCEPT - SELECT * FROM test_table('{}') - ) - SELECT * FROM test_table - EXCEPT ( - SELECT * FROM diff_4_3 - UNION - SELECT * FROM diff_3_2 - UNION - SELECT * FROM diff_2_1 - ) - ORDER BY some_int_value - "#, - timestamp_to_rfc3339(version_timestamps[&2]), - timestamp_to_rfc3339(version_timestamps[&1]), - timestamp_to_rfc3339(version_timestamps[&3]), - timestamp_to_rfc3339(version_timestamps[&2]), - timestamp_to_rfc3339(version_timestamps[&4]), - timestamp_to_rfc3339(version_timestamps[&3]), - ) - .as_str(), - ) - .await - .unwrap(); - let results = context.collect(plan).await.unwrap(); - assert_eq!(version_results[&1], results); - - // Ensure the context table map contains the versioned + the latest table entries - assert_eq!( - sorted( - context - .inner() - .state() - .catalog_list() - .catalog(DEFAULT_DB) - .unwrap() - .schema(DEFAULT_SCHEMA) - .unwrap() - .table_names() - ) - .collect::>(), - vec![ - "test_table".to_string(), - format!( - "test_table:{}", - timestamp_to_rfc3339(version_timestamps[&1]).to_ascii_lowercase() - ), - format!( - "test_table:{}", - timestamp_to_rfc3339(version_timestamps[&2]).to_ascii_lowercase() - ), - format!( - "test_table:{}", - timestamp_to_rfc3339(version_timestamps[&3]).to_ascii_lowercase() - ), - format!( - "test_table:{}", - timestamp_to_rfc3339(version_timestamps[&4]).to_ascii_lowercase() - ), - ], - ); - - // - // Verify that information schema is not polluted with versioned tables/columns - // - - let results = list_tables_query(&context).await; - - let expected = [ - "+--------------------+-------------+", - "| table_schema | table_name |", - "+--------------------+-------------+", - "| information_schema | columns |", - "| information_schema | df_settings |", - "| information_schema | tables |", - "| information_schema | views |", - "| public | test_table |", - "+--------------------+-------------+", - ]; - assert_batches_eq!(expected, &results); - - let results = list_columns_query(&context).await; - - let expected = [ - "+--------------+------------+------------------+------------------------------+", - "| table_schema | table_name | column_name | data_type |", - "+--------------+------------+------------------+------------------------------+", - "| public | test_table | some_time | Timestamp(Microsecond, None) |", - "| public | test_table | some_value | Float32 |", - "| public | test_table | some_other_value | Decimal128(38, 10) |", - "| public | test_table | some_bool_value | Boolean |", - "| public | test_table | some_int_value | Int64 |", - "+--------------+------------+------------------+------------------------------+", - ]; - assert_batches_eq!(expected, &results); -} - // There's a regression in DF 22, where the two introspection tests fail with // "Cannot infer common argument type for comparison operation Date64 < Timestamp(Nanosecond, None)" // Disabling them for now. diff --git a/tests/statements/time_travel.rs b/tests/statements/time_travel.rs new file mode 100644 index 00000000..603c1e26 --- /dev/null +++ b/tests/statements/time_travel.rs @@ -0,0 +1,319 @@ +use crate::statements::*; + +#[tokio::test] +async fn test_read_time_travel() { + let (context, _temp_dir) = make_context_with_pg(ObjectStoreType::Local).await; + let (version_results, version_timestamps) = create_table_and_some_partitions( + &context, + "test_table", + Some(Duration::from_secs(1)), + ) + .await; + + // + // Verify that the new table versions are shown in the corresponding system table + // + + let plan = context + .plan_query("SELECT table_schema, table_name, version FROM system.table_versions") + .await + .unwrap(); + let results = context.collect(plan).await.unwrap(); + + let expected = [ + "+--------------+------------+---------+", + "| table_schema | table_name | version |", + "+--------------+------------+---------+", + "| public | test_table | 0 |", + "| public | test_table | 1 |", + "| public | test_table | 2 |", + "| public | test_table | 3 |", + "| public | test_table | 4 |", + "+--------------+------------+---------+", + ]; + assert_batches_eq!(expected, &results); + + // + // Test that filtering the system table works, given that we provide all rows to DF and expect + // it to do it. + // + let plan = context + .plan_query( + format!( + " + SELECT version FROM system.table_versions \ + WHERE version < 4 AND creation_time > to_timestamp('{}') + ", + timestamp_to_rfc3339(version_timestamps[&1]) + ) + .as_str(), + ) + .await + .unwrap(); + let results = context.collect(plan).await.unwrap(); + + let expected = [ + "+---------+", + "| version |", + "+---------+", + "| 2 |", + "| 3 |", + "+---------+", + ]; + assert_batches_eq!(expected, &results); + + // + // Now use the recorded timestamps to query specific earlier table versions and compare them to + // the recorded results for that version. + // + + async fn query_table_version( + context: &DefaultSeafowlContext, + version_id: i64, + version_results: &HashMap>, + version_timestamps: &HashMap, + timestamp_converter: fn(Timestamp) -> String, + ) { + let plan = context + .plan_query( + format!( + "SELECT * FROM test_table('{}')", + timestamp_converter(version_timestamps[&version_id]) + ) + .as_str(), + ) + .await + .unwrap(); + let results = context.collect(plan).await.unwrap(); + + assert_eq!(version_results[&version_id], results); + } + + for version_id in [1, 2, 3, 4] { + query_table_version( + &context, + version_id, + &version_results, + &version_timestamps, + timestamp_to_rfc3339, + ) + .await; + } + + // + // Use multiple different version specifiers in the same complex query (including the latest + // version both explicitly and in the default notation). + // Ensures row differences between different versions are consistent: + // 4 - ((4 - 3) + (3 - 2) + (2 - 1)) = 1 + // + + let plan = context + .plan_query( + format!( + r#" + WITH diff_2_1 AS ( + SELECT * FROM test_table('{}') + EXCEPT + SELECT * FROM test_table('{}') + ), diff_3_2 AS ( + SELECT * FROM test_table('{}') + EXCEPT + SELECT * FROM test_table('{}') + ), diff_4_3 AS ( + SELECT * FROM test_table('{}') + EXCEPT + SELECT * FROM test_table('{}') + ) + SELECT * FROM test_table + EXCEPT ( + SELECT * FROM diff_4_3 + UNION + SELECT * FROM diff_3_2 + UNION + SELECT * FROM diff_2_1 + ) + ORDER BY some_int_value + "#, + timestamp_to_rfc3339(version_timestamps[&2]), + timestamp_to_rfc3339(version_timestamps[&1]), + timestamp_to_rfc3339(version_timestamps[&3]), + timestamp_to_rfc3339(version_timestamps[&2]), + timestamp_to_rfc3339(version_timestamps[&4]), + timestamp_to_rfc3339(version_timestamps[&3]), + ) + .as_str(), + ) + .await + .unwrap(); + let results = context.collect(plan).await.unwrap(); + assert_eq!(version_results[&1], results); + + // Ensure the context table map contains the versioned + the latest table entries + assert_eq!( + sorted( + context + .inner() + .state() + .catalog_list() + .catalog(DEFAULT_DB) + .unwrap() + .schema(DEFAULT_SCHEMA) + .unwrap() + .table_names() + ) + .collect::>(), + vec![ + "test_table".to_string(), + format!( + "test_table:{}", + timestamp_to_rfc3339(version_timestamps[&1]).to_ascii_lowercase() + ), + format!( + "test_table:{}", + timestamp_to_rfc3339(version_timestamps[&2]).to_ascii_lowercase() + ), + format!( + "test_table:{}", + timestamp_to_rfc3339(version_timestamps[&3]).to_ascii_lowercase() + ), + format!( + "test_table:{}", + timestamp_to_rfc3339(version_timestamps[&4]).to_ascii_lowercase() + ), + ], + ); + + // + // Verify that information schema is not polluted with versioned tables/columns + // + + let results = list_tables_query(&context).await; + + let expected = [ + "+--------------------+-------------+", + "| table_schema | table_name |", + "+--------------------+-------------+", + "| information_schema | columns |", + "| information_schema | df_settings |", + "| information_schema | tables |", + "| information_schema | views |", + "| public | test_table |", + "+--------------------+-------------+", + ]; + assert_batches_eq!(expected, &results); + + let results = list_columns_query(&context).await; + + let expected = [ + "+--------------+------------+------------------+------------------------------+", + "| table_schema | table_name | column_name | data_type |", + "+--------------+------------+------------------+------------------------------+", + "| public | test_table | some_time | Timestamp(Microsecond, None) |", + "| public | test_table | some_value | Float32 |", + "| public | test_table | some_other_value | Decimal128(38, 10) |", + "| public | test_table | some_bool_value | Boolean |", + "| public | test_table | some_int_value | Int64 |", + "+--------------+------------+------------------+------------------------------+", + ]; + assert_batches_eq!(expected, &results); +} + +#[tokio::test] +async fn test_write_time_travel() { + let (context, _temp_dir) = make_context_with_pg(ObjectStoreType::Local).await; + let (_version_results, version_timestamps) = create_table_and_some_partitions( + &context, + "test_table", + Some(Duration::from_secs(1)), + ) + .await; + + let timestamp_to_rfc3339 = |timestamp: Timestamp| -> String { + Utc.timestamp_opt(timestamp, 0).unwrap().to_rfc3339() + }; + + // + // Now create a new table from an earlier version of the input table, using the recorded + // timestamps and the time travel syntax + // + + context + .plan_query( + format!( + r#" + CREATE TABLE diff_table AS ( + SELECT * FROM test_table('{}') + EXCEPT + SELECT * FROM test_table('{}') + ORDER BY some_other_value, some_value + ) + "#, + timestamp_to_rfc3339(version_timestamps[&4]), + timestamp_to_rfc3339(version_timestamps[&2]), + ) + .as_str(), + ) + .await + .unwrap(); + + // Test that results are as epxected + let plan = context + .plan_query("SELECT some_value, some_other_value FROM diff_table") + .await + .unwrap(); + let results = context.collect(plan).await.unwrap(); + + let expected = [ + "+------------+------------------+", + "| some_value | some_other_value |", + "+------------+------------------+", + "| 46.0 | 3.0000000000 |", + "| 47.0 | 3.0000000000 |", + "| 48.0 | 3.0000000000 |", + "| 40.0 | 4.0000000000 |", + "| 41.0 | 4.0000000000 |", + "| 42.0 | 4.0000000000 |", + "+------------+------------------+", + ]; + assert_batches_eq!(expected, &results); + + // + // Finally try to INSERT data from the first version of the table + // + + context + .plan_query( + format!( + r#" + INSERT INTO diff_table SELECT * FROM test_table('{}') + "#, + timestamp_to_rfc3339(version_timestamps[&1]), + ) + .as_str(), + ) + .await + .unwrap(); + + let plan = context + .plan_query("SELECT some_value, some_other_value FROM diff_table") + .await + .unwrap(); + let results = context.collect(plan).await.unwrap(); + + let expected = [ + "+------------+------------------+", + "| some_value | some_other_value |", + "+------------+------------------+", + "| 46.0 | 3.0000000000 |", + "| 47.0 | 3.0000000000 |", + "| 48.0 | 3.0000000000 |", + "| 40.0 | 4.0000000000 |", + "| 41.0 | 4.0000000000 |", + "| 42.0 | 4.0000000000 |", + "| 42.0 | 1.0000000000 |", + "| 43.0 | 1.0000000000 |", + "| 44.0 | 1.0000000000 |", + "+------------+------------------+", + ]; + assert_batches_eq!(expected, &results); +}