Merge pull request #459 from splitgraph/delta-rs-write-polishing
Delta rs write polishing
gruuya authored Aug 11, 2023
2 parents 8726977 + 8d51ad7 commit cdb8329
Showing 3 changed files with 13 additions and 443 deletions.
96 changes: 13 additions & 83 deletions src/context.rs
@@ -9,7 +9,6 @@ use std::ops::Not;
use datafusion::datasource::TableProvider;
use datafusion::parquet::basic::{Compression, ZstdLevel};
use itertools::Itertools;
-use object_store::local::LocalFileSystem;
use std::collections::HashMap;
use tokio::fs::File as AsyncFile;
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
@@ -28,7 +27,7 @@ use futures::{StreamExt, TryStreamExt};

#[cfg(test)]
use mockall::automock;
-use object_store::{path::Path, ObjectStore};
+use object_store::path::Path;

use sqlparser::ast::{
AlterTableOperation, CreateFunctionBody, Expr as SqlExpr, FunctionDefinition,
@@ -37,7 +36,6 @@ use sqlparser::ast::{

use arrow_schema::{DataType, TimeUnit};
use chrono::Duration;
-use std::iter::zip;
use std::ops::Deref;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
@@ -59,7 +57,6 @@ use datafusion::physical_expr::{create_physical_expr, PhysicalExpr};
use datafusion::physical_optimizer::pruning::PruningPredicate;
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::projection::ProjectionExec;
-use datafusion::prelude::SessionConfig;
use datafusion::{
arrow::{
datatypes::{Schema, SchemaRef},
@@ -71,7 +68,7 @@ use datafusion::{
parquet::{arrow::ArrowWriter, file::properties::WriterProperties},
physical_plan::{
coalesce_partitions::CoalescePartitionsExec, empty::EmptyExec,
-EmptyRecordBatchStream, ExecutionPlan, SendableRecordBatchStream, Statistics,
+EmptyRecordBatchStream, ExecutionPlan, SendableRecordBatchStream,
},
prelude::SessionContext,
sql::TableReference,
@@ -83,10 +80,11 @@ use datafusion_expr::logical_plan::{
DropTable, Extension, LogicalPlan, Projection,
};
use datafusion_expr::{DdlStatement, DmlStatement, Filter, WriteOp};
-use deltalake::action::{Action, Add, ColumnCountStat, DeltaOperation, Remove, SaveMode};
+use deltalake::action::{Action, Add, DeltaOperation, Remove, SaveMode};
use deltalake::operations::create::CreateBuilder;
use deltalake::operations::transaction::commit;
use deltalake::operations::vacuum::VacuumBuilder;
+use deltalake::writer::create_add;
use deltalake::{DeltaTable, Schema as DeltaSchema};
use log::{debug, info, warn};
use parking_lot::RwLock;
@@ -100,7 +98,6 @@ use crate::config::context::{build_object_store, build_state_with_table_factorie
use crate::config::schema;
use crate::config::schema::{GCS, S3};
use crate::datafusion::visit::VisitorMut;
-use crate::delta_rs::backport_create_add::{create_add, NullCounts};
use crate::delta_rs::backports::parquet_scan_from_actions;
#[cfg(test)]
use crate::frontend::http::tests::deterministic_uuid;
@@ -145,66 +142,6 @@ fn get_uuid() -> Uuid {
Uuid::new_v4()
}

-/// Load the Statistics for a Parquet file in memory
-async fn get_parquet_file_statistics_bytes(
-path: &std::path::Path,
-schema: SchemaRef,
-) -> Result<(i64, Statistics)> {
-// DataFusion's methods for this are all private (see fetch_statistics / summarize_min_max)
-// and require the ObjectStore abstraction since they are normally used in the context
-// of a TableProvider sending a Range request to object storage to get min/max values
-// for a Parquet file. We are currently interested in getting statistics for a temporary
-// file we just wrote out, before uploading it to object storage.
-
-// A more fancy way to get this working would be making an ObjectStore
-// that serves as a write-through cache so that we can use it both when downloading and uploading
-// Parquet files.
-
-let tmp_dir = path
-.parent()
-.expect("Temporary Parquet file in the FS root");
-let file_name = path
-.file_name()
-.expect("Temporary Parquet file pointing to a directory")
-.to_string_lossy();
-
-// Create a dummy object store pointing to our temporary directory
-let dummy_object_store: Arc<dyn ObjectStore> =
-Arc::from(LocalFileSystem::new_with_prefix(tmp_dir)?);
-let dummy_path = Path::from(file_name.to_string());
-
-let parquet = ParquetFormat::new();
-let session_state = SessionContext::with_config(SessionConfig::new()).state();
-let meta = dummy_object_store
-.head(&dummy_path)
-.await
-.expect("Temporary object not found");
-let stats = parquet
-.infer_stats(&session_state, &dummy_object_store, schema, &meta)
-.await?;
-Ok((meta.size.try_into().unwrap(), stats))
-}
-
-// TODO: maybe we should do something along the lines of `apply_null_counts` from delta-rs
-/// Generate delta-rs `NullCounts` from Parquet `Statistics`.
-fn build_null_counts(partition_stats: &Statistics, schema: SchemaRef) -> NullCounts {
-match &partition_stats.column_statistics {
-// NB: Here we may end up with `null_count` being None, but DF pruning algorithm demands that
-// the null count field be not nullable itself. Consequently for any such cases the
-// pruning will fail, and we will default to using all partitions.
-Some(column_statistics) => zip(column_statistics, schema.fields())
-.filter(|(stats, _)| stats.null_count.is_some())
-.map(|(stats, column)| {
-(
-column.name().to_string(),
-ColumnCountStat::Value(stats.null_count.unwrap().try_into().unwrap()),
-)
-})
-.collect::<NullCounts>(),
-None => NullCounts::default(),
-}
-}
-
pub struct DefaultSeafowlContext {
pub inner: SessionContext,
pub table_catalog: Arc<dyn TableCatalog>,
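The two helpers deleted in this hunk existed only to recover statistics for a freshly written temp file, which meant standing up a throwaway `LocalFileSystem` store so that DataFusion's `ParquetFormat::infer_stats` could be called against it. The upstream `deltalake::writer::create_add` imported above makes that detour unnecessary: it derives the column-level statistics from the Parquet footer metadata it is handed. For illustration only, here is a sketch of pulling null counts straight out of a Parquet footer with the `parquet` crate (re-exported by DataFusion); the helper name is hypothetical:

```rust
use std::fs::File;

use datafusion::parquet::file::reader::{FileReader, SerializedFileReader};

// Read per-column null counts directly from the Parquet footer, with no
// ObjectStore round trip; one statistics entry per column chunk per row group.
fn footer_null_counts(
    path: &std::path::Path,
) -> Result<Vec<(String, u64)>, Box<dyn std::error::Error>> {
    let reader = SerializedFileReader::new(File::open(path)?)?;
    let mut counts = Vec::new();
    for row_group in reader.metadata().row_groups() {
        for column in row_group.columns() {
            if let Some(stats) = column.statistics() {
                counts.push((column.column_path().string(), stats.null_count()));
            }
        }
    }
    Ok(counts)
}
```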
@@ -324,22 +261,13 @@ pub async fn plan_to_object_store(
{
let permit = Arc::clone(&sem).acquire_owned().await.ok();

-let physical = plan.clone();
let store = store.clone();
let prefix = prefix.clone();
let handle: tokio::task::JoinHandle<Result<Add>> =
tokio::task::spawn(async move {
// Move the ownership of the semaphore permit into the task
let _permit = permit;

-// Index the Parquet file: get its min-max values and size
-let (size, partition_stats) = get_parquet_file_statistics_bytes(
-&partition_file_path,
-physical.schema(),
-)
-.await?;
-let null_counts = build_null_counts(&partition_stats, physical.schema());
-
// This is taken from delta-rs `PartitionWriter::next_data_path`
let file_name =
format!("part-{part:0>5}-{partitions_uuid}-c000.snappy.parquet");
@@ -350,6 +278,14 @@
// path (just the file name).
let location = Path::from(prefix).child(file_name.clone());

+let size = tokio::fs::metadata(
+partition_file_path
+.to_str()
+.expect("Temporary Parquet file in the FS root"),
+)
+.await?
+.len() as i64;
+
// For local FS stores, we can just move the file to the target location
if let Some(result) =
store.fast_upload(&partition_file_path, &location).await
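A side note on the size lookup added above: `tokio::fs::metadata` is generic over `impl AsRef<Path>`, so the `to_str()` round trip is not required by the API (and the `expect` message is a holdover from the deleted `get_parquet_file_statistics_bytes` helper, where it guarded `path.parent()`). A minimal equivalent sketch, with a hypothetical function name:

```rust
use std::path::Path;

// tokio::fs::metadata accepts anything implementing AsRef<Path>, so the
// temp-file path can be passed directly; Metadata::len() returns u64,
// hence the cast to the i64 that the Add action expects.
async fn partition_file_size(path: &Path) -> std::io::Result<i64> {
    Ok(tokio::fs::metadata(path).await?.len() as i64)
}
```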
@@ -422,13 +358,7 @@ pub async fn plan_to_object_store(

// Create the corresponding Add action; currently we don't support partition columns
// which simplifies things.
-let add = create_add(
-&HashMap::default(),
-null_counts,
-file_name,
-size,
-&metadata,
-)?;
+let add = create_add(&HashMap::default(), file_name, size, &metadata)?;

Ok(add)
});
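With both hunks applied, the per-partition task needs only the file's byte size and the Parquet `FileMetaData` to build its `Add` action. A condensed sketch of the resulting flow, assuming, per the surrounding code that this diff does not show, that `metadata` is the value returned by `ArrowWriter::close` for the partition file; the function name and error type here are illustrative:

```rust
use std::collections::HashMap;
use std::path::Path;

use datafusion::parquet::format::FileMetaData;
use deltalake::action::Add;
use deltalake::writer::create_add;

// Condensed recap of the per-partition indexing step after this change.
async fn index_partition(
    partition_file_path: &Path,
    file_name: String,
    metadata: &FileMetaData, // returned by ArrowWriter::close() when writing the file
) -> Result<Add, Box<dyn std::error::Error>> {
    // The byte size is the only value computed by hand; min/max and null-count
    // statistics are derived from the footer metadata inside create_add.
    let size = tokio::fs::metadata(partition_file_path).await?.len() as i64;

    // No partition columns are supported yet, hence the empty map.
    Ok(create_add(&HashMap::default(), file_name, size, metadata)?)
}
```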
