Merge pull request #584 from splitgraph/datafusion-40-upgrade
DataFusion 40 upgrade
gruuya authored Jul 31, 2024
Parents 9e4d13c + 0983255, commit 82a9ca4
Showing 9 changed files with 230 additions and 206 deletions.
354 changes: 194 additions & 160 deletions Cargo.lock

Large diffs are not rendered by default.

46 changes: 23 additions & 23 deletions Cargo.toml
@@ -2,21 +2,21 @@
 members = ["clade", "object_store_factory"]
 
 [workspace.dependencies]
-arrow = { version = "52.0.0", features = ["test_utils"] }
-arrow-buffer = "52.0.0"
-arrow-csv = "52.0.0"
-arrow-flight = "52.0.0"
+arrow = { version = "52.1.0", features = ["test_utils"] }
+arrow-buffer = "52.1.0"
+arrow-csv = "52.1.0"
+arrow-flight = "52.1.0"
 # For the JSON format support
 # https://github.com/apache/arrow-rs/pull/2868
 # https://github.com/apache/arrow-rs/pull/2724
-arrow-integration-test = "52.0.0"
-arrow-row = "52.0.0"
-arrow-schema = "52.0.0"
+arrow-integration-test = "52.1.0"
+arrow-row = "52.1.0"
+arrow-schema = "52.1.0"
 async-trait = "0.1.64"
 
-datafusion = "39.0.0"
-datafusion-common = "39.0.0"
-datafusion-expr = "39.0.0"
+datafusion = "40.0.0"
+datafusion-common = "40.0.0"
+datafusion-expr = "40.0.0"
 
 futures = "0.3"
 
@@ -34,16 +34,16 @@ url = "2.5"
 
 
 [patch.crates-io]
-# Pick up backport of https://github.com/apache/arrow-datafusion/pull/11386
-datafusion = { git = "https://github.com/splitgraph/arrow-datafusion", branch = "backport-pr11386" }
-datafusion-common = { git = "https://github.com/splitgraph/arrow-datafusion", branch = "backport-pr11386" }
-datafusion-execution = { git = "https://github.com/splitgraph/arrow-datafusion", branch = "backport-pr11386" }
-datafusion-expr = { git = "https://github.com/splitgraph/arrow-datafusion", branch = "backport-pr11386" }
-datafusion-optimizer = { git = "https://github.com/splitgraph/arrow-datafusion", branch = "backport-pr11386" }
-datafusion-physical-expr = { git = "https://github.com/splitgraph/arrow-datafusion", branch = "backport-pr11386" }
-datafusion-physical-plan = { git = "https://github.com/splitgraph/arrow-datafusion", branch = "backport-pr11386" }
-datafusion-proto = { git = "https://github.com/splitgraph/arrow-datafusion", branch = "backport-pr11386" }
-datafusion-sql = { git = "https://github.com/splitgraph/arrow-datafusion", branch = "backport-pr11386" }
+# Pick up fix for https://github.com/apache/arrow-datafusion/pull/11386
+datafusion = { git = "https://github.com/apache/datafusion", rev = "d314ced8090cb599fd7808d7df41699e46ac956e" }
+datafusion-common = { git = "https://github.com/apache/datafusion", rev = "d314ced8090cb599fd7808d7df41699e46ac956e" }
+datafusion-execution = { git = "https://github.com/apache/datafusion", rev = "d314ced8090cb599fd7808d7df41699e46ac956e" }
+datafusion-expr = { git = "https://github.com/apache/datafusion", rev = "d314ced8090cb599fd7808d7df41699e46ac956e" }
+datafusion-optimizer = { git = "https://github.com/apache/datafusion", rev = "d314ced8090cb599fd7808d7df41699e46ac956e" }
+datafusion-physical-expr = { git = "https://github.com/apache/datafusion", rev = "d314ced8090cb599fd7808d7df41699e46ac956e" }
+datafusion-physical-plan = { git = "https://github.com/apache/datafusion", rev = "d314ced8090cb599fd7808d7df41699e46ac956e" }
+datafusion-proto = { git = "https://github.com/apache/datafusion", rev = "d314ced8090cb599fd7808d7df41699e46ac956e" }
+datafusion-sql = { git = "https://github.com/apache/datafusion", rev = "d314ced8090cb599fd7808d7df41699e46ac956e" }
 
 [package]
 name = "seafowl"
@@ -93,8 +93,8 @@ clap = { version = "3.2.19", features = [ "derive" ] }
 config = "0.13.3"
 
 # PG wire protocol support
-convergence = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-39-upgrade", optional = true }
-convergence-arrow = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-39-upgrade", optional = true }
+convergence = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-40-upgrade", optional = true }
+convergence-arrow = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-40-upgrade", optional = true }
 
 dashmap = "5.4.0"
 
@@ -104,7 +104,7 @@ datafusion-expr = { workspace = true }
 
 datafusion-remote-tables = { path = "./datafusion_remote_tables", optional = true }
 
-deltalake = { git = "https://github.com/delta-io/delta-rs", rev = "d9605eafdf03d346277f167ce5cf3886db0e1bc5", features = ["datafusion"] }
+deltalake = { git = "https://github.com/delta-io/delta-rs", rev = "e75a0b49b40f35ed361444bbea0e5720f359d732", features = ["datafusion"] }
 
 futures = "0.3"
 hex = ">=0.4.0"
2 changes: 1 addition & 1 deletion datafusion_remote_tables/Cargo.toml
@@ -19,7 +19,7 @@ arrow-schema = { workspace = true }
 async-trait = { workspace = true }
 
 # Remote query execution for a variety of DBs
-connectorx = { git = "https://github.com/splitgraph/connector-x", branch = "datafusion-39-upgrade", features = [ "dst_arrow", "src_postgres", "src_mysql", "src_sqlite" ] }
+connectorx = { git = "https://github.com/splitgraph/connector-x", branch = "datafusion-40-upgrade", features = [ "dst_arrow", "src_postgres", "src_mysql", "src_sqlite" ] }
 
 datafusion = { workspace = true }
 datafusion-common = { workspace = true }
8 changes: 4 additions & 4 deletions src/context/physical.rs
@@ -13,7 +13,7 @@ use crate::utils::gc_databases;
 
 use arrow_schema::{DataType, Schema, TimeUnit};
 use chrono::TimeDelta;
-use datafusion::common::{DFSchema, FileType};
+use datafusion::common::DFSchema;
 use datafusion::datasource::file_format::csv::CsvFormat;
 use datafusion::datasource::listing::{
     ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
@@ -845,7 +845,7 @@ impl SeafowlContext {
     pub async fn file_to_table(
         &self,
         file_path: String,
-        file_type: FileType,
+        file_type: &str,
         file_schema: Option<SchemaRef>,
         has_header: bool,
         schema_name: String,
@@ -891,8 +891,8 @@ impl SeafowlContext {
         // Create a `ListingTable` that points to the specified file
         let table_path = ListingTableUrl::parse(file_path)?;
         let file_format: Arc<dyn FileFormat> = match file_type {
-            FileType::CSV => Arc::new(CsvFormat::default().with_has_header(has_header)),
-            FileType::PARQUET => Arc::new(ParquetFormat::default()),
+            "csv" => Arc::new(CsvFormat::default().with_has_header(has_header)),
+            "parquet" => Arc::new(ParquetFormat::default()),
             _ => {
                 return Err(Error::Plan(format!(
                     "File type {file_type:?} not supported!"
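With this change the `datafusion::common::FileType` import goes away and `file_to_table` receives the file type as a plain string. A minimal sketch of that string-based dispatch, reusing the DataFusion format types shown in the diff; `format_for_extension` is an illustrative helper, not part of the Seafowl API:

```rust
use std::sync::Arc;

use datafusion::datasource::file_format::csv::CsvFormat;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::file_format::FileFormat;

/// Illustrative helper (not Seafowl code): map a lowercase file extension
/// to the `FileFormat` implementation used to build the `ListingTable`.
fn format_for_extension(ext: &str, has_header: bool) -> Option<Arc<dyn FileFormat>> {
    match ext {
        "csv" => Some(Arc::new(CsvFormat::default().with_has_header(has_header))),
        "parquet" => Some(Arc::new(ParquetFormat::default())),
        // Any other extension is rejected by the caller with a planning error.
        _ => None,
    }
}
```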
2 changes: 1 addition & 1 deletion src/frontend/flight/sync/schema.rs
@@ -13,7 +13,7 @@ impl SyncSchema {
         column_descriptors: Vec<ColumnDescriptor>,
         schema: SchemaRef,
     ) -> Result<Self, SyncError> {
-        if column_descriptors.len() != schema.all_fields().len() {
+        if column_descriptors.len() != schema.flattened_fields().len() {
             return Err(SyncError::SchemaError {
                 reason: "Column descriptors do not match the schema".to_string(),
             });
```
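Both `all_fields` and its replacement `flattened_fields` return every field in the schema, including the children of nested struct columns, so the descriptor count is compared against the fully flattened field list rather than the top-level column count. A standalone sketch of that behaviour (not Seafowl code), assuming the arrow-schema version pinned above:

```rust
use arrow_schema::{DataType, Field, Fields, Schema};

// Standalone sketch: `flattened_fields` returns nested struct children
// in addition to their parent field, unlike `fields`.
fn main() {
    let address = Field::new(
        "address",
        DataType::Struct(Fields::from(vec![
            Field::new("city", DataType::Utf8, true),
            Field::new("zip", DataType::Utf8, true),
        ])),
        true,
    );
    let schema = Schema::new(vec![Field::new("id", DataType::Int64, false), address]);

    assert_eq!(schema.fields().len(), 2); // top-level columns only
    assert_eq!(schema.flattened_fields().len(), 4); // id, address, address.city, address.zip
}
```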
2 changes: 1 addition & 1 deletion src/frontend/flight/sync/writer.rs
@@ -520,7 +520,7 @@ impl SeafowlDataSyncWriter {
         // Normalize the schema, by ordering columns according to the full table schema and
         // projecting the sync data accordingly.
         let projection = full_schema
-            .all_fields()
+            .flattened_fields()
             .iter()
             .map(|f| {
                 let name = f.name();
12 changes: 5 additions & 7 deletions src/frontend/http.rs
@@ -19,7 +19,6 @@ use datafusion::datasource::DefaultTableSource;
 
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
-use datafusion_common::FileType;
 use datafusion_expr::logical_plan::{LogicalPlan, TableScan};
 use deltalake::parquet::data_type::AsBytes;
 use deltalake::DeltaTable;
@@ -441,14 +440,13 @@ pub async fn upload(
         return Err(ApiError::UploadMissingFile);
     }
 
-    let file_type = match filename
+    let file_type = filename
         .split('.')
         .last()
-        .ok_or_else(|| ApiError::UploadMissingFilenameExtension(filename.clone()))?
-    {
-        "csv" => FileType::CSV,
-        "parquet" => FileType::PARQUET,
-        _ => return Err(ApiError::UploadUnsupportedFileFormat(filename)),
+        .ok_or_else(|| ApiError::UploadMissingFilenameExtension(filename.clone()))?;
+
+    if file_type != "csv" && file_type != "parquet" {
+        return Err(ApiError::UploadUnsupportedFileFormat(filename));
     };
 
     // Execute the plan and persist objects as well as table/partition metadata
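The upload handler now keeps the raw extension returned by `split('.').last()` and validates it directly instead of mapping it onto an enum. A self-contained sketch of the same flow; `UploadError` and `file_type_from_name` are hypothetical stand-ins for the `ApiError` variants and the inline handler code:

```rust
// Minimal sketch of the new validation flow; `UploadError` is a hypothetical
// stand-in for Seafowl's `ApiError` variants.
#[derive(Debug, PartialEq)]
enum UploadError {
    MissingFilenameExtension(String),
    UnsupportedFileFormat(String),
}

fn file_type_from_name(filename: &str) -> Result<&str, UploadError> {
    // `split('.').last()` yields the text after the final dot, e.g. "parquet".
    let file_type = filename
        .split('.')
        .last()
        .ok_or_else(|| UploadError::MissingFilenameExtension(filename.to_string()))?;

    if file_type != "csv" && file_type != "parquet" {
        return Err(UploadError::UnsupportedFileFormat(filename.to_string()));
    }
    Ok(file_type)
}

fn main() {
    assert_eq!(file_type_from_name("trades.parquet"), Ok("parquet"));
    assert!(file_type_from_name("trades.xlsx").is_err());
}
```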
2 changes: 1 addition & 1 deletion tests/statements/query.rs
@@ -110,7 +110,7 @@ async fn test_create_table_and_insert() {
     let results = context.collect(plan).await.unwrap();
 
     let expected = ["+--------------------------------+--------------------------------------------+----------------------------+",
-        "| MAX(test_table.some_int_value) | COUNT(DISTINCT test_table.some_bool_value) | MAX(test_table.some_value) |",
+        "| MAX(test_table.some_int_value) | count(DISTINCT test_table.some_bool_value) | MAX(test_table.some_value) |",
         "+--------------------------------+--------------------------------------------+----------------------------+",
        "| 3333                           | 0                                          | 44.0                       |",
         "+--------------------------------+--------------------------------------------+----------------------------+"];
8 changes: 0 additions & 8 deletions tests/statements/vacuum.rs
@@ -112,14 +112,6 @@ async fn test_vacuum_table() -> Result<()> {
     )
     .await;
 
-    // Likewise, trying to time-travel to table_1 v1 will fail
-    table_1.load_version(1).await?;
-    let err = table_1
-        .scan(&context.inner.state(), Some(&vec![4_usize]), &[], None)
-        .await
-        .unwrap_err();
-    assert!(err.to_string().contains(".parquet not found"));
-
     // Run vacuum on table_2 as well
     context
         .collect(context.plan_query("VACUUM TABLE table_2").await.unwrap())

0 comments on commit 82a9ca4
