Upgrade to DataFusion 43

gruuya committed Nov 12, 2024
1 parent 8a577e6 commit 89f8473

Showing 31 changed files with 432 additions and 290 deletions.
498 changes: 311 additions & 187 deletions Cargo.lock

Large diffs are not rendered by default.

41 changes: 21 additions & 20 deletions Cargo.toml
@@ -2,34 +2,35 @@
 members = ["clade", "object_store_factory"]

 [workspace.dependencies]
-arrow = { version = "52.2.0", features = ["test_utils"] }
-arrow-buffer = "52.2.0"
-arrow-csv = "52.2.0"
-arrow-flight = "52.2.0"
+arrow = { version = "53.2.0", features = ["test_utils"] }
+arrow-buffer = "53.2.0"
+arrow-csv = "53.2.0"
+arrow-flight = "53.2.0"
 # For the JSON format support
 # https://github.com/apache/arrow-rs/pull/2868
 # https://github.com/apache/arrow-rs/pull/2724
-arrow-integration-test = "52.2.0"
-arrow-row = "52.2.0"
-arrow-schema = "52.2.0"
+arrow-integration-test = "53.2.0"
+arrow-row = "53.2.0"
+arrow-schema = "53.2.0"
 async-trait = "0.1.83"

-datafusion = { version = "41.0.0", features = ["backtrace"] }
-datafusion-common = "41.0.0"
-datafusion-expr = "41.0.0"
-datafusion-functions-nested = "41.0.0"
+datafusion = { version = "43.0.0", features = ["backtrace"] }
+datafusion-common = "43.0.0"
+datafusion-expr = "43.0.0"
+datafusion-functions-nested = "43.0.0"

 futures = "0.3"

 itertools = ">=0.10.0"
-object_store = { version = "0.10.2", features = ["aws", "azure", "gcp"] }
-prost = "0.12.6"
+object_store = { version = "0.11", features = ["aws", "azure", "gcp"] }
+prost = "0.13"

 serde = "1.0.213"
 serde_json = "1.0.132"

 tempfile = "3"
 tokio = { version = "1.40", features = ["macros", "rt", "rt-multi-thread", "signal", "process"] }
+tonic = "0.12"
 tracing = { version = "0.1", features = ["log"] }
 tracing-log = "0.2"
 tracing-subscriber = { version = "0.3.18", features = ["json", "env-filter"] }
@@ -83,8 +84,8 @@ clap = { version = "4.5.20", features = [ "derive" ] }
 config = "0.14.0"

 # PG wire protocol support
-convergence = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-41-upgrade", optional = true }
-convergence-arrow = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-41-upgrade", optional = true }
+convergence = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-43-upgrade", optional = true }
+convergence-arrow = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-43-upgrade", optional = true }

 dashmap = "6.1.0"

@@ -95,7 +96,7 @@ datafusion-functions-nested = { workspace = true }

 datafusion-remote-tables = { path = "./datafusion_remote_tables", optional = true }

-deltalake = { version = "0.21", features = ["datafusion"] }
+deltalake = { git = "https://github.com/ion-elgreco/delta-rs", branch = "chore/bump_kernel", features = ["datafusion"] }

 futures = "0.3"
 hex = ">=0.4.0"
@@ -105,7 +106,7 @@ lazy_static = ">=1.4.0"
 metrics = { version = "0.23.0" }
 metrics-exporter-prometheus = { version = "0.15.3" }
 moka = { version = "0.12.5", default-features = false, features = ["future", "atomic64", "quanta"] }
-object_store = { version = "0.10.2", features = ["aws", "azure", "gcp"] }
+object_store = { workspace = true }
 object_store_factory = { path = "object_store_factory" }
 percent-encoding = "2.2.0"
 prost = { workspace = true }
@@ -122,15 +123,15 @@ rustyline = "14.0"
 serde = { workspace = true }
 serde_json = { workspace = true }
 sha2 = ">=0.10.1"
-sqlparser = { version = "0.49", features = ["visitor"] }
+sqlparser = { version = "0.51", features = ["visitor"] }
 sqlx = { version = "0.7.1", features = [ "runtime-tokio-rustls", "sqlite", "any", "uuid" ] }
 strum = ">=0.24"
 strum_macros = ">=0.24"
 tempfile = "3"
 thiserror = "1"
 tokio = { workspace = true }
 tokio-graceful-shutdown = { version = "0.15" }
-tonic = { version = "0.11.0", optional = true }
+tonic = { workspace = true, optional = true }
 tower = "0.5"
 tracing = { workspace = true }
 tracing-log = "0.2"
@@ -152,7 +153,7 @@ aws-credential-types = { version = "1.2.1", features = ["hardcoded-credentials"]
 aws-sdk-sts = { version = "1.46.0", features = ["behavior-version-latest"] }
 rstest = "*"
 serial_test = "3"
-tonic-reflection = "0.11"
+tonic-reflection = "0.12"
 wiremock = "0.6"

 [build-dependencies]
4 changes: 2 additions & 2 deletions clade/Cargo.toml
@@ -7,7 +7,7 @@ license = "Apache-2.0"
 [dependencies]
 arrow-flight = { workspace = true }
 prost = { workspace = true }
-tonic = "0.11"
+tonic = { workspace = true }

 [build-dependencies]
-tonic-build = "0.11"
+tonic-build = "0.12"
2 changes: 1 addition & 1 deletion clade/build.rs
@@ -8,7 +8,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
         .build_server(true)
         .build_client(true)
         .type_attribute("clade.sync.ColumnDescriptor", "#[derive(Eq, Hash)]")
-        .compile(&["proto/schema.proto", "proto/sync.proto"], &["proto"])?;
+        .compile_protos(&["proto/schema.proto", "proto/sync.proto"], &["proto"])?;

     Ok(())
 }
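
Note on the change above: tonic-build 0.12 renamed the `compile` entry point to `compile_protos`, which is the only adjustment this build script needs. A minimal standalone sketch of the new call (the proto path here is a placeholder, not one of Seafowl's):

    // build.rs -- assumes a `proto/example.proto` exists in the crate root
    fn main() -> Result<(), Box<dyn std::error::Error>> {
        tonic_build::configure()
            .build_server(true)
            .build_client(true)
            // `compile_protos` replaces the pre-0.12 `compile` method
            .compile_protos(&["proto/example.proto"], &["proto"])?;
        Ok(())
    }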
2 changes: 1 addition & 1 deletion datafusion_remote_tables/Cargo.toml
@@ -19,7 +19,7 @@ arrow-schema = { workspace = true }
 async-trait = { workspace = true }

 # Remote query execution for a variety of DBs
-connectorx = { git = "https://github.com/splitgraph/connector-x", branch = "datafusion-41-upgrade", features = [ "dst_arrow", "src_postgres", "src_mysql", "src_sqlite" ] }
+connectorx = { git = "https://github.com/splitgraph/connector-x", branch = "datafusion-43-upgrade", features = [ "dst_arrow", "src_postgres", "src_mysql", "src_sqlite" ] }

 datafusion = { workspace = true }
 datafusion-common = { workspace = true }
1 change: 1 addition & 0 deletions datafusion_remote_tables/src/factory.rs
@@ -9,6 +9,7 @@ use std::ops::Deref;
 use std::sync::Arc;

 /// Factory for creating remote tables
+#[derive(Debug)]
 pub struct RemoteTableFactory {}

 #[async_trait]
1 change: 1 addition & 0 deletions datafusion_remote_tables/src/provider.rs
@@ -23,6 +23,7 @@ use tokio::task;
 use tracing::debug;

 // Implementation of a remote table, capable of querying Postgres, MySQL, SQLite, etc...
+#[derive(Debug)]
 pub struct RemoteTable {
     // We manually escape the field names during scans, but expect the user to escape the table name
     // appropriately in the remote table definition
2 changes: 1 addition & 1 deletion src/catalog/empty.rs
@@ -9,7 +9,7 @@ use clade::schema::ListSchemaResponse;

 use super::CatalogError;

-#[derive(Clone)]
+#[derive(Debug, Clone)]
 pub struct EmptyStore {}

 #[async_trait]
2 changes: 1 addition & 1 deletion src/catalog/external.rs
@@ -8,7 +8,7 @@ use tonic::transport::{channel::Channel, Endpoint, Error};
 use tonic::Request;

 // An external store, facilitated via a remote clade server implementation
-#[derive(Clone)]
+#[derive(Debug, Clone)]
 pub struct ExternalStore {
     client: SchemaStoreServiceClient<Channel>,
 }
2 changes: 1 addition & 1 deletion src/catalog/memory.rs
@@ -4,7 +4,7 @@ use crate::catalog::{
 use crate::repository::interface::AllDatabaseFunctionsResult;
 use clade::schema::ListSchemaResponse;

-#[derive(Clone)]
+#[derive(Debug, Clone)]
 pub struct MemoryStore {
     pub schemas: ListSchemaResponse,
 }
2 changes: 1 addition & 1 deletion src/catalog/metastore.rs
@@ -34,7 +34,7 @@ type LocationAndOptions = (String, HashMap<String, String>);
 // This is the main entrypoint to all individual catalogs for various objects types.
 // The intention is to make it extensible and de-coupled from the underlying metastore
 // persistence mechanism (such as the presently used `Repository`).
-#[derive(Clone)]
+#[derive(Debug, Clone)]
 pub struct Metastore {
     pub catalogs: Arc<dyn CatalogStore>,
     pub schemas: Arc<dyn SchemaStore>,
9 changes: 5 additions & 4 deletions src/catalog/mod.rs
@@ -8,6 +8,7 @@ use arrow_schema::Schema;
 use async_trait::async_trait;
 use clade::schema::ListSchemaResponse;
 use datafusion_common::DataFusionError;
+use std::fmt::Debug;
 use tonic::Status;
 use uuid::Uuid;

@@ -149,7 +150,7 @@ impl From<serde_json::Error> for CreateFunctionError {
 pub type CatalogResult<T> = Result<T, CatalogError>;

 #[async_trait]
-pub trait CatalogStore: Sync + Send {
+pub trait CatalogStore: Debug + Sync + Send {
     async fn create(&self, _name: &str) -> CatalogResult<()> {
         not_impl()
     }
@@ -164,7 +165,7 @@ pub trait CatalogStore: Sync + Send {
 }

 #[async_trait]
-pub trait SchemaStore: Sync + Send {
+pub trait SchemaStore: Debug + Sync + Send {
     async fn create(&self, _catalog_name: &str, _schema_name: &str) -> CatalogResult<()> {
         not_impl()
     }
@@ -187,7 +188,7 @@ pub trait SchemaStore: Sync + Send {
 }

 #[async_trait]
-pub trait TableStore: Sync + Send {
+pub trait TableStore: Debug + Sync + Send {
     async fn create(
         &self,
         _catalog_name: &str,
@@ -275,7 +276,7 @@ pub trait TableStore: Sync + Send {
 }

 #[async_trait]
-pub trait FunctionStore: Sync + Send {
+pub trait FunctionStore: Debug + Sync + Send {
     async fn create(
         &self,
         _catalog_name: &str,
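
The new `Debug` supertraits line up with the `#[derive(Debug)]` additions on the store implementations elsewhere in this commit: a struct like `Metastore` that derives `Debug` while holding `Arc<dyn CatalogStore>` fields only compiles if the trait objects themselves are debuggable. A minimal sketch of the mechanism, using simplified names rather than the actual Seafowl definitions:

    use std::{fmt::Debug, sync::Arc};

    // The supertrait makes every `dyn Store` implement `Debug`...
    trait Store: Debug + Sync + Send {}

    #[derive(Debug)]
    struct MemoryStore;
    impl Store for MemoryStore {}

    // ...which is what lets a containing struct derive `Debug` itself.
    #[derive(Debug)]
    struct Metastore {
        catalogs: Arc<dyn Store>,
    }

    fn main() {
        let m = Metastore { catalogs: Arc::new(MemoryStore) };
        println!("{m:?}"); // Metastore { catalogs: MemoryStore }
    }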
1 change: 1 addition & 0 deletions src/catalog/repository.rs
@@ -21,6 +21,7 @@ use crate::repository::interface::{
 use crate::wasm_udf::data_types::CreateFunctionDetails;

 // The native catalog implementation for Seafowl.
+#[derive(Debug)]
 pub struct RepositoryStore {
     pub repository: Arc<dyn Repository>,
 }
2 changes: 1 addition & 1 deletion src/config/context.rs
@@ -151,7 +151,7 @@ pub async fn build_context(cfg: schema::SeafowlConfig) -> Result<SeafowlContext>
         .with_information_schema(true)
         .with_default_catalog_and_schema(DEFAULT_DB, DEFAULT_SCHEMA);

-    let runtime_env = RuntimeEnv::new(runtime_config)?;
+    let runtime_env = RuntimeEnv::try_new(runtime_config)?;
     let state = build_state_with_table_factories(session_config, Arc::new(runtime_env));
     let context = SessionContext::new_with_state(state);

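
DataFusion 43 deprecates `RuntimeEnv::new` in favor of `RuntimeEnv::try_new` with the same fallible signature, hence the one-word change above. A standalone sketch of the pattern, with import paths as of DataFusion 43 and the builder usage being illustrative rather than Seafowl's `build_state_with_table_factories`:

    use std::sync::Arc;
    use datafusion::error::Result;
    use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
    use datafusion::execution::session_state::SessionStateBuilder;
    use datafusion::prelude::{SessionConfig, SessionContext};

    fn build_minimal_context() -> Result<SessionContext> {
        // `try_new` replaces the now-deprecated `RuntimeEnv::new`.
        let runtime_env = Arc::new(RuntimeEnv::try_new(RuntimeConfig::new())?);
        let state = SessionStateBuilder::new()
            .with_config(SessionConfig::new())
            .with_runtime_env(runtime_env)
            .with_default_features()
            .build();
        Ok(SessionContext::new_with_state(state))
    }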
4 changes: 2 additions & 2 deletions src/context/delta.rs
@@ -582,7 +582,7 @@ mod tests {
         vec![
             (
                 PART_0_FILE_NAME.to_string(),
-                1298,
+                1164,
                 true,
                 true,
                 json!({
@@ -606,7 +606,7 @@
             ),
             (
                 PART_1_FILE_NAME.to_string(),
-                1313,
+                1176,
                 true,
                 true,
                 json!({
8 changes: 4 additions & 4 deletions src/context/logical.rs
@@ -246,9 +246,9 @@ impl SeafowlContext {
                 })),
             }))
         },
-        Statement::Truncate { table: false, table_name, partitions, .. } => {
+        Statement::Truncate { table: false, table_names, partitions, .. } => {
             let table_name = if partitions.is_none() {
-                Some(table_name.to_string())
+                Some(table_names[0].to_string())
             } else {
                 None
             };
@@ -268,10 +268,10 @@
                 })),
             }))
         }
-        Statement::Truncate { table: true, table_name, .. } => {
+        Statement::Truncate { table: true, table_names, .. } => {
             Ok(LogicalPlan::Extension(Extension {
                 node: Arc::new(SeafowlExtensionNode::Truncate(Truncate {
-                    table_name: table_name.to_string(),
+                    table_name: table_names[0].to_string(),
                     output_schema: Arc::new(DFSchema::empty())
                 })),
             }))
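
Background for the change above: sqlparser 0.51 reworked `Statement::Truncate` to carry `table_names: Vec<TruncateTableTarget>` (TRUNCATE can name multiple tables) instead of a single `table_name`, so the code now picks the first target. A hedged sketch of reading the name under the new shape:

    use sqlparser::ast::Statement;

    // Returns the first truncated table's name, if the statement is a TRUNCATE.
    fn truncated_table(stmt: &Statement) -> Option<String> {
        match stmt {
            // Each element is a `TruncateTableTarget` wrapping an `ObjectName`.
            Statement::Truncate { table_names, .. } => {
                table_names.first().map(|target| target.name.to_string())
            }
            _ => None,
        }
    }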
2 changes: 1 addition & 1 deletion src/context/physical.rs
@@ -190,7 +190,7 @@ impl SeafowlContext {
             }
             LogicalPlan::Dml(DmlStatement {
                 table_name,
-                op: WriteOp::InsertInto,
+                op: WriteOp::Insert(_),
                 input,
                 ..
             }) => {
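
DataFusion 43 folded the insert variants into `WriteOp::Insert(InsertOp)`, where `InsertOp` distinguishes appends from overwrites; the wildcard pattern above accepts any of them. A small sketch of matching on the new enum (import path assumed per datafusion-expr 43):

    use datafusion_expr::logical_plan::dml::{InsertOp, WriteOp};

    // Maps a DML write operation to a rough SQL spelling.
    fn describe(op: &WriteOp) -> &'static str {
        match op {
            WriteOp::Insert(InsertOp::Append) => "INSERT INTO",
            WriteOp::Insert(InsertOp::Overwrite) => "INSERT OVERWRITE",
            WriteOp::Insert(InsertOp::Replace) => "REPLACE INTO",
            _ => "other DML",
        }
    }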
24 changes: 14 additions & 10 deletions src/datafusion/parser.rs
@@ -28,7 +28,9 @@
 pub use datafusion::sql::parser::Statement;
 use datafusion::sql::parser::{CopyToSource, CopyToStatement, CreateExternalTable};
 use lazy_static::lazy_static;
-use sqlparser::ast::{CreateFunctionBody, Expr, ObjectName, OrderByExpr, Value};
+use sqlparser::ast::{
+    CreateFunctionBody, Expr, ObjectName, OrderByExpr, TruncateTableTarget, Value,
+};
 use sqlparser::tokenizer::{TokenWithLocation, Word};
 use sqlparser::{
     ast::{ColumnDef, ColumnOptionDef, Statement as SQLStatement, TableConstraint},
@@ -217,9 +219,12 @@ impl<'a> DFParser<'a> {
         }

         Ok(Statement::Statement(Box::new(SQLStatement::Truncate {
-            table_name,
+            table_names: vec![TruncateTableTarget { name: table_name }],
             partitions,
             table: false,
+            only: false,
+            identity: None,
+            cascade: None,
         })))
     }

@@ -231,9 +236,12 @@
         let table_name = self.parser.parse_object_name(true)?;

         Ok(Statement::Statement(Box::new(SQLStatement::Truncate {
-            table_name,
+            table_names: vec![TruncateTableTarget { name: table_name }],
             partitions: None,
             table: true,
+            only: false,
+            identity: None,
+            cascade: None,
         })))
     }

@@ -315,12 +323,7 @@ impl<'a> DFParser<'a> {
             Token::SingleQuotedString(s) => Ok(Value::SingleQuotedString(s)),
             Token::DoubleQuotedString(s) => Ok(Value::DoubleQuotedString(s)),
             Token::EscapedStringLiteral(s) => Ok(Value::EscapedStringLiteral(s)),
-            Token::Number(ref n, l) => {
-                let n = n
-                    .parse()
-                    .expect("Token::Number should always contain a valid number");
-                Ok(Value::Number(n, l))
-            }
+            Token::Number(n, l) => Ok(Value::Number(n, l)),
             _ => self.parser.expected("string or numeric value", next_token),
         }
     }

@@ -655,13 +658,14 @@ impl<'a> DFParser<'a> {
     }

     let create = CreateExternalTable {
-        name: table_name.to_string(),
+        name: table_name,
         columns,
         file_type: builder.file_type.unwrap(),
         location: builder.location.unwrap(),
         table_partition_cols: builder.table_partition_cols.unwrap_or(vec![]),
         order_exprs: builder.order_exprs,
         if_not_exists,
+        temporary: false,
         unbounded,
         options: builder.options.unwrap_or(Vec::new()),
         constraints,
4 changes: 3 additions & 1 deletion src/datafusion/utils.rs
@@ -137,7 +137,7 @@ pub(crate) fn convert_simple_data_type(sql_type: &SQLDataType) -> Result<DataTyp
         | SQLDataType::Bytes(_)
         | SQLDataType::Int64
         | SQLDataType::Float64
-        | SQLDataType::Struct(_)
+        | SQLDataType::Struct(_, _)
         | SQLDataType::JSONB
         | SQLDataType::Unspecified
         // Clickhouse datatypes
@@ -157,10 +157,12 @@
         | SQLDataType::FixedString(_)
         | SQLDataType::Map(_, _)
         | SQLDataType::Tuple(_)
+
         | SQLDataType::Nested(_)
         | SQLDataType::Union(_)
         | SQLDataType::Nullable(_)
         | SQLDataType::LowCardinality(_)
+        | SQLDataType::Trigger
         => not_impl_err!(
             "Unsupported SQL type {sql_type:?}"
         ),
(Diffs for the remaining changed files are not rendered here.)