Skip to content

Commit

Permalink
Enable time travel for write statements with source queries
Browse files Browse the repository at this point in the history
  • Loading branch information
gruuya committed Aug 14, 2023
1 parent cdb8329 commit 016f7b6
Show file tree
Hide file tree
Showing 5 changed files with 444 additions and 315 deletions.
151 changes: 87 additions & 64 deletions src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use object_store::path::Path;

use sqlparser::ast::{
AlterTableOperation, CreateFunctionBody, Expr as SqlExpr, FunctionDefinition,
ObjectType, Statement, TableFactor, TableWithJoins,
ObjectType, Query, Statement, TableFactor, TableWithJoins,
};

use arrow_schema::{DataType, TimeUnit};
Expand Down Expand Up @@ -516,6 +516,57 @@ impl DefaultSeafowlContext {
.try_for_each(|f| self.register_function(&f.name, &f.details))
}

// Determine if some of the tables reference a non-latest version using table function syntax.
// If so, rename the tables in the query by appending the explicit version to the name, and add
// it to the schema provider's map inside a new session state.
// Should become obsolete once `sqlparser-rs` introduces support for some form of the `AS OF`
// clause: https://en.wikipedia.org/wiki/SQL:2011.
async fn rewrite_time_travel_query(&self, q: &mut Query) -> Result<SessionState> {
let mut version_processor =
TableVersionProcessor::new(self.database.clone(), DEFAULT_SCHEMA.to_string());
version_processor.visit_query(q);

if version_processor.table_versions.is_empty() {
// No time-travel syntax detected, just return the regular session state
return Ok(self.inner.state());
}

debug!("Time travel query rewritten to: {}", q);

// Create a new session context and session state, to avoid potential race
// conditions leading to schema provider map leaking into other queries (and
// thus polluting e.g. the information_schema output), or even worse reloading
// the map and having the versioned query fail during execution.
let session_ctx = SessionContext::with_state(self.inner.state());

for (table, version) in &version_processor.table_versions {
let name_with_version =
TableVersionProcessor::table_with_version(table, version);

let full_table_name = table.to_string();
let mut resolved_ref = TableReference::from(full_table_name.as_str())
.resolve(&self.database, DEFAULT_SCHEMA);

// We only support datetime DeltaTable version specification for start
let table_uuid = self.get_table_uuid(resolved_ref.clone()).await?;
let table_object_store =
self.internal_object_store.for_delta_table(table_uuid);
let datetime = TableVersionProcessor::version_to_datetime(version)?;

let mut delta_table = DeltaTable::new(table_object_store, Default::default());
delta_table.load_with_datetime(datetime).await?;
let table_provider_for_version = Arc::from(delta_table);

resolved_ref.table = Cow::Borrowed(name_with_version.as_str());

if !session_ctx.table_exist(resolved_ref.clone())? {
session_ctx.register_table(resolved_ref, table_provider_for_version)?;
}
}

Ok(session_ctx.state())
}

// Check that the TableReference doesn't have a database/schema in it.
// We create all external tables in the staging schema (backed by DataFusion's
// in-memory schema provider) instead.
Expand Down Expand Up @@ -933,57 +984,24 @@ impl SeafowlContext for DefaultSeafowlContext {
// planning phase. We could use a lighter structure for that, and implement `ContextProvider` for
// it rather than for DefaultSeafowlContext.
self.reload_schema().await?;
let state = self.inner.state();

match statement.clone() {
DFStatement::Statement(s) => match *s {
Statement::Query(mut q) => {
// Determine if some of the tables reference a non-latest version using table
// function syntax. If so, rename the tables in the query by appending the
// explicit version to the name, and add it to the schema provider's map.

let mut version_processor = TableVersionProcessor::new(self.database.clone(), DEFAULT_SCHEMA.to_string());
version_processor.visit_query(&mut q);

if !version_processor.table_versions.is_empty() {
debug!("Time travel query rewritten to: {}", q);

// Create a new session context and session state, to avoid potential race
// conditions leading to schema provider map leaking into other queries (and
// thus polluting e.g. the information_schema output), or even worse reloading
// the map and having the versioned query fail during execution.
let session_ctx = SessionContext::with_state(self.inner.state());

for (table, version) in &version_processor.table_versions {
let name_with_version =
TableVersionProcessor::table_with_version(table, version);

let full_table_name = table.to_string();
let mut resolved_ref = TableReference::from(full_table_name.as_str()).resolve(&self.database, DEFAULT_SCHEMA);

// We only support datetime DeltaTable version specification for start
let table_uuid = self.get_table_uuid(resolved_ref.clone()).await?;
let table_object_store =
self.internal_object_store.for_delta_table(table_uuid);
let datetime = TableVersionProcessor::version_to_datetime(version)?;

let mut delta_table = DeltaTable::new(table_object_store, Default::default());
delta_table.load_with_datetime(datetime).await?;
let table_provider_for_version = Arc::from(delta_table);

resolved_ref.table = Cow::Borrowed(name_with_version.as_str());

if !session_ctx.table_exist(resolved_ref.clone())? {
session_ctx.register_table(resolved_ref, table_provider_for_version)?;
}
}

let state = session_ctx.state();
return state.statement_to_plan(DFStatement::Statement(Box::from(Statement::Query(q)))).await;
}

state.statement_to_plan(DFStatement::Statement(Box::from(Statement::Query(q)))).await
},
// Create a mutable clone of the statement so that we can rewrite table names if we encounter
// time travel syntax.
// Alternatively, this could be done without the `mut`, except then we'd need to construct
// and return a new `DFStatement` after the rewrite. In case of `Statement::Query` this is
// straight-forward as that enum variant contains a struct, however `Statement::Insert` and
// `Statement::CreateTable` wrap dozens of fields in curly braces, and we'd need to capture
// and re-populate each of those, so it would be very verbose.
// TODO: If `sqlparser-rs` encapsulates these fields inside a struct at some point remove
// the mut and go with @ capture and de-structuring to pass along other fields unchanged.
let mut stmt = statement.clone();

match &mut stmt {
DFStatement::Statement(ref mut s) => match &mut **s {
Statement::Query(ref mut query) => {
let state = self.rewrite_time_travel_query(query).await?;
state.statement_to_plan(stmt).await
}
// Delegate generic queries to the basic DataFusion logical planner
// (though note EXPLAIN [our custom query] will mean we have to implement EXPLAIN ourselves)
Statement::Explain { .. }
Expand All @@ -992,9 +1010,10 @@ impl SeafowlContext for DefaultSeafowlContext {
| Statement::ShowColumns { .. }
| Statement::CreateSchema { .. }
| Statement::CreateView { .. }
| Statement::CreateDatabase { .. } => state.statement_to_plan(statement).await,
Statement::Insert{ .. } => {
let plan = state.statement_to_plan(statement).await?;
| Statement::CreateDatabase { .. } => self.inner.state().statement_to_plan(stmt).await,
Statement::Insert{ ref mut source, .. } => {
let state = self.rewrite_time_travel_query(source).await?;
let plan = state.statement_to_plan(stmt).await?;
state.optimize(&plan)
}
Statement::Update {
Expand All @@ -1003,7 +1022,7 @@ impl SeafowlContext for DefaultSeafowlContext {
}
// We only support the most basic form of UPDATE (no aliases or FROM or joins)
if with_hints.is_empty() && joins.is_empty() => {
let plan = state.statement_to_plan(statement).await?;
let plan = self.inner.state().statement_to_plan(stmt).await?;

// Create a custom optimizer as a workaround for "Optimizer rule 'push_down_projection' failed",
// which seems to be a regression for the UPDATE statement planning.
Expand Down Expand Up @@ -1031,10 +1050,11 @@ impl SeafowlContext for DefaultSeafowlContext {
)
},
Statement::Delete{ .. } => {
let plan = state.statement_to_plan(statement).await?;
let state = self.inner.state();
let plan = state.statement_to_plan(stmt).await?;
state.optimize(&plan)
}
Statement::Drop { object_type: ObjectType::Table, .. } => state.statement_to_plan(statement).await,
Statement::Drop { object_type: ObjectType::Table, .. } => self.inner.state().statement_to_plan(stmt).await,
Statement::Drop { object_type: ObjectType::Schema,
if_exists: _,
names,
Expand Down Expand Up @@ -1062,12 +1082,12 @@ impl SeafowlContext for DefaultSeafowlContext {
&& table_properties.is_empty()
&& with_options.is_empty() =>
{
let schema = build_schema(columns)?;
let schema = build_schema(columns.to_vec())?;
Ok(LogicalPlan::Extension(Extension {
node: Arc::new(SeafowlExtensionNode::CreateTable(CreateTable {
schema,
name: name.to_string(),
if_not_exists,
if_not_exists: *if_not_exists,
output_schema: Arc::new(DFSchema::empty())
})),
}))
Expand Down Expand Up @@ -1099,7 +1119,10 @@ impl SeafowlContext for DefaultSeafowlContext {

// Other CREATE TABLE: SqlToRel only allows CreateTableAs statements and makes
// a CreateMemoryTable node. We're fine with that, but we'll execute it differently.
Statement::CreateTable { .. } => state.statement_to_plan(statement).await,
Statement::CreateTable { query: Some(ref mut input), .. } => {
let state = self.rewrite_time_travel_query(input).await?;
state.statement_to_plan(stmt).await
},

Statement::CreateFunction {
or_replace,
Expand All @@ -1110,14 +1133,14 @@ impl SeafowlContext for DefaultSeafowlContext {
} => {
// We abuse the fact that in CREATE FUNCTION AS [class_name], class_name can be an arbitrary string
// and so we can get the user to put some JSON in there
let function_details: CreateFunctionDetails = serde_json::from_str(&details)
let function_details: CreateFunctionDetails = serde_json::from_str(details)
.map_err(|e| {
Error::Execution(format!("Error parsing UDF details: {e:?}"))
})?;

Ok(LogicalPlan::Extension(Extension {
node: Arc::new(SeafowlExtensionNode::CreateFunction(CreateFunction {
or_replace,
or_replace: *or_replace,
name: name.to_string(),
details: function_details,
output_schema: Arc::new(DFSchema::empty())
Expand Down Expand Up @@ -1155,7 +1178,7 @@ impl SeafowlContext for DefaultSeafowlContext {
func_desc.iter().map(|desc| desc.name.to_string()).collect();
Ok(LogicalPlan::Extension(Extension {
node: Arc::new(SeafowlExtensionNode::DropFunction(DropFunction {
if_exists,
if_exists: *if_exists,
func_names,
output_schema: Arc::new(DFSchema::empty()),
}))
Expand All @@ -1166,7 +1189,7 @@ impl SeafowlContext for DefaultSeafowlContext {
))),
},
DFStatement::DescribeTableStmt(_) | DFStatement::CreateExternalTable(_) => {
state.statement_to_plan(statement).await
self.inner.state().statement_to_plan(stmt).await
}
DFStatement::CopyTo(_) => Err(Error::NotImplemented(format!(
"Unsupported SQL statement: {statement:?}"
Expand Down
36 changes: 19 additions & 17 deletions tests/statements/dml.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,18 @@ async fn test_insert_two_different_schemas(
.unwrap();
let results = context.collect(plan).await.unwrap();

let expected = ["+---------------------+------------+------------------+-----------------+----------------+",
let expected = [
"+---------------------+------------+------------------+-----------------+----------------+",
"| some_time | some_value | some_other_value | some_bool_value | some_int_value |",
"+---------------------+------------+------------------+-----------------+----------------+",
"| 2022-01-01T20:01:01 | 42.0 | | | 1111 |",
"| 2022-01-01T20:02:02 | 43.0 | | | 2222 |",
"| 2022-01-01T20:03:03 | 44.0 | | | 3333 |",
"| 2022-01-01T20:01:01 | 42.0 | 1.0000000000 | | 1111 |",
"| 2022-01-01T20:02:02 | 43.0 | 1.0000000000 | | 2222 |",
"| 2022-01-01T20:03:03 | 44.0 | 1.0000000000 | | 3333 |",
"| | 41.0 | 2.1500000000 | false | |",
"| | 45.0 | 9.1200000000 | true | |",
"| | | 44.3400000000 | false | |",
"+---------------------+------------+------------------+-----------------+----------------+"];
"+---------------------+------------+------------------+-----------------+----------------+",
];
assert_batches_eq!(expected, &results);
}

Expand Down Expand Up @@ -376,18 +378,18 @@ async fn test_update_statement(
"+---------------------+------------+------------------+-----------------+----------------+",
"| some_time | some_value | some_other_value | some_bool_value | some_int_value |",
"+---------------------+------------+------------------+-----------------+----------------+",
"| 2022-01-01T21:21:21 | 31.0 | | | 5555 |",
"| 2022-01-01T21:21:21 | 32.0 | | | 5555 |",
"| 2022-01-01T21:21:21 | 32.0 | | | 5555 |",
"| 2022-01-01T21:21:21 | 33.0 | | | 5555 |",
"| | 40.0 | | | |",
"| 2022-01-01T20:03:03 | 44.0 | | | 3333 |",
"| | 45.0 | | | |",
"| | 46.0 | | | |",
"| | 46.0 | | | |",
"| | 47.0 | | | |",
"| | 47.0 | | | |",
"| | 48.0 | | | |",
"| 2022-01-01T21:21:21 | 31.0 | 4.0000000000 | | 5555 |",
"| 2022-01-01T21:21:21 | 32.0 | 1.0000000000 | | 5555 |",
"| 2022-01-01T21:21:21 | 32.0 | 4.0000000000 | | 5555 |",
"| 2022-01-01T21:21:21 | 33.0 | 1.0000000000 | | 5555 |",
"| | 40.0 | 4.0000000000 | | |",
"| 2022-01-01T20:03:03 | 44.0 | 1.0000000000 | | 3333 |",
"| | 45.0 | 2.0000000000 | | |",
"| | 46.0 | 2.0000000000 | | |",
"| | 46.0 | 3.0000000000 | | |",
"| | 47.0 | 2.0000000000 | | |",
"| | 47.0 | 3.0000000000 | | |",
"| | 48.0 | 3.0000000000 | | |",
"+---------------------+------------+------------------+-----------------+----------------+",
];
assert_batches_eq!(expected, &results);
Expand Down
Loading

0 comments on commit 016f7b6

Please sign in to comment.