-
Notifications
You must be signed in to change notification settings - Fork 206
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: partition table query optimize #1594
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -27,13 +27,16 @@ use datafusion::{ | |||||
use df_engine_extensions::dist_sql_query::physical_plan::UnresolvedPartitionedScan; | ||||||
use table_engine::{ | ||||||
partition::{ | ||||||
format_sub_partition_table_name, rule::df_adapter::DfPartitionRuleAdapter, PartitionInfo, | ||||||
format_sub_partition_table_name, | ||||||
rule::df_adapter::{DfPartitionRuleAdapter, PartitionedFilterKeyIndex}, | ||||||
PartitionInfo, | ||||||
}, | ||||||
provider::TableScanBuilder, | ||||||
remote::model::TableIdentifier, | ||||||
table::ReadRequest, | ||||||
}; | ||||||
|
||||||
use super::partitioned_predicates; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Our codebase should prefer absolute import over super import. |
||||||
#[derive(Debug)] | ||||||
pub struct PartitionedTableScanBuilder { | ||||||
table_name: String, | ||||||
|
@@ -61,13 +64,13 @@ impl PartitionedTableScanBuilder { | |||||
&self, | ||||||
table_name: &str, | ||||||
partition_info: &PartitionInfo, | ||||||
partitions: Vec<usize>, | ||||||
partitions: &[usize], | ||||||
) -> Vec<TableIdentifier> { | ||||||
let definitions = partition_info.get_definitions(); | ||||||
partitions | ||||||
.into_iter() | ||||||
.iter() | ||||||
.map(|p| { | ||||||
let partition_name = &definitions[p].name; | ||||||
let partition_name = &definitions[*p].name; | ||||||
TableIdentifier { | ||||||
catalog: self.catalog_name.clone(), | ||||||
schema: self.schema_name.clone(), | ||||||
|
@@ -89,18 +92,186 @@ impl TableScanBuilder for PartitionedTableScanBuilder { | |||||
DataFusionError::Internal(format!("failed to build partition rule, err:{e}")) | ||||||
})?; | ||||||
|
||||||
let mut partitioned_key_indices = PartitionedFilterKeyIndex::new(); | ||||||
// Evaluate expr and locate partition. | ||||||
let partitions = df_partition_rule | ||||||
.locate_partitions_for_read(request.predicate.exprs()) | ||||||
.locate_partitions_for_read(request.predicate.exprs(), &mut partitioned_key_indices) | ||||||
.map_err(|e| { | ||||||
DataFusionError::Internal(format!("failed to locate partition for read, err:{e}")) | ||||||
})?; | ||||||
|
||||||
let sub_tables = | ||||||
self.get_sub_table_idents(&self.table_name, &self.partition_info, partitions); | ||||||
self.get_sub_table_idents(&self.table_name, &self.partition_info, &partitions); | ||||||
|
||||||
let predicates = if partitioned_key_indices.len() == partitions.len() { | ||||||
Some( | ||||||
partitioned_predicates( | ||||||
request.predicate.clone(), | ||||||
&partitions, | ||||||
&mut partitioned_key_indices, | ||||||
) | ||||||
.map_err(|e| DataFusionError::Internal(format!("err:{e}")))?, | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
) | ||||||
} else { | ||||||
// since FilterExtractor.extract only cover some specific expr | ||||||
// cases, partitioned_key_indices.len() could be 0. | ||||||
// All partition requests will have the same predicate. | ||||||
None | ||||||
}; | ||||||
|
||||||
// Build plan. | ||||||
let plan = UnresolvedPartitionedScan::new(&self.table_name, sub_tables, request); | ||||||
let plan = | ||||||
UnresolvedPartitionedScan::new(&self.table_name, sub_tables, request, predicates); | ||||||
|
||||||
Ok(Arc::new(plan)) | ||||||
} | ||||||
} | ||||||
|
||||||
#[cfg(test)] | ||||||
mod tests { | ||||||
use common_types::{column_schema::Builder as ColBuilder, datum::DatumKind, schema::Builder}; | ||||||
use datafusion::logical_expr::{ | ||||||
expr::{BinaryExpr, InList}, | ||||||
Expr, Operator, | ||||||
}; | ||||||
use table_engine::{ | ||||||
partition::{ | ||||||
rule::df_adapter::{DfPartitionRuleAdapter, PartitionedFilterKeyIndex}, | ||||||
KeyPartitionInfo, PartitionDefinition, PartitionInfo, | ||||||
}, | ||||||
predicate::PredicateBuilder, | ||||||
}; | ||||||
|
||||||
use super::partitioned_predicates; | ||||||
|
||||||
#[test] | ||||||
fn test_partitioned_predicate() { | ||||||
// conditions: | ||||||
// 1) table schema: col_ts, col1, col2, in which col1 and col2 are both keys, | ||||||
// and with two partitions | ||||||
// 2) sql: select * from table where col1 = '33' and col2 in ("aa", "bb", | ||||||
// "cc", "dd") | ||||||
// partition expectations: | ||||||
// 1) query fit in two partitions | ||||||
// 2) yield two predicates, p0: col1 = '33' and col2 in ("aa", "bb", "cc"); | ||||||
// p1: col1 = '33' and col2 in ("dd") | ||||||
let definitions = vec![ | ||||||
PartitionDefinition { | ||||||
name: "p1".to_string(), | ||||||
origin_name: None, | ||||||
}, | ||||||
PartitionDefinition { | ||||||
name: "p2".to_string(), | ||||||
origin_name: None, | ||||||
}, | ||||||
]; | ||||||
|
||||||
let partition_info = PartitionInfo::Key(KeyPartitionInfo { | ||||||
version: 0, | ||||||
definitions, | ||||||
partition_key: vec!["col1".to_string(), "col2".to_string()], | ||||||
linear: false, | ||||||
}); | ||||||
|
||||||
let schema = { | ||||||
let builder = Builder::new(); | ||||||
let col_ts = ColBuilder::new("col_ts".to_string(), DatumKind::Timestamp) | ||||||
.build() | ||||||
.expect("ts"); | ||||||
let col1 = ColBuilder::new("col1".to_string(), DatumKind::String) | ||||||
.build() | ||||||
.expect("should succeed to build column schema"); | ||||||
let col2 = ColBuilder::new("col2".to_string(), DatumKind::String) | ||||||
.build() | ||||||
.expect("should succeed to build column schema"); | ||||||
builder | ||||||
.auto_increment_column_id(true) | ||||||
.add_key_column(col_ts) | ||||||
.unwrap() | ||||||
.add_key_column(col1) | ||||||
.unwrap() | ||||||
.add_key_column(col2) | ||||||
.unwrap() | ||||||
.primary_key_indexes(vec![1, 2]) | ||||||
.build() | ||||||
.unwrap() | ||||||
}; | ||||||
|
||||||
let df_partition_rule = DfPartitionRuleAdapter::new(partition_info, &schema).unwrap(); | ||||||
|
||||||
let exprs = vec![ | ||||||
Expr::BinaryExpr(BinaryExpr { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. DataFusion provider some helper functions to define expr: |
||||||
left: Box::new(Expr::Column("col1".into())), | ||||||
op: Operator::Eq, | ||||||
right: Box::new(Expr::Literal("33".into())), | ||||||
}), | ||||||
Expr::InList(InList { | ||||||
expr: Box::new(Expr::Column("col2".into())), | ||||||
list: vec![ | ||||||
Expr::Literal("aa".into()), | ||||||
Expr::Literal("bb".into()), | ||||||
Expr::Literal("cc".into()), | ||||||
Expr::Literal("dd".into()), | ||||||
], | ||||||
negated: false, | ||||||
}), | ||||||
]; | ||||||
let mut partitioned_key_indices = PartitionedFilterKeyIndex::new(); | ||||||
let partitions = df_partition_rule | ||||||
.locate_partitions_for_read(&exprs, &mut partitioned_key_indices) | ||||||
.unwrap(); | ||||||
assert!(partitions.len() == 2); | ||||||
assert!(partitioned_key_indices.len() == 2); | ||||||
|
||||||
let predicate = PredicateBuilder::default() | ||||||
.add_pushdown_exprs(exprs.as_slice()) | ||||||
.build(); | ||||||
|
||||||
let predicates = partitioned_predicates( | ||||||
predicate, | ||||||
partitions.as_slice(), | ||||||
&mut partitioned_key_indices, | ||||||
); | ||||||
assert!(predicates.is_ok()); | ||||||
let predicates = predicates.unwrap(); | ||||||
assert!(predicates.len() == 2); | ||||||
|
||||||
assert!(predicates[0].exprs().len() == 2); | ||||||
assert!( | ||||||
predicates[0].exprs()[0] | ||||||
== Expr::BinaryExpr(BinaryExpr { | ||||||
left: Box::new(Expr::Column("col1".into())), | ||||||
op: Operator::Eq, | ||||||
right: Box::new(Expr::Literal("33".into())), | ||||||
}) | ||||||
); | ||||||
assert!( | ||||||
predicates[0].exprs()[1] | ||||||
== Expr::InList(InList { | ||||||
expr: Box::new(Expr::Column("col2".into())), | ||||||
list: vec![ | ||||||
Expr::Literal("aa".into()), | ||||||
Expr::Literal("bb".into()), | ||||||
Expr::Literal("cc".into()), | ||||||
], | ||||||
negated: false, | ||||||
}) | ||||||
); | ||||||
assert!( | ||||||
predicates[1].exprs()[0] | ||||||
== Expr::BinaryExpr(BinaryExpr { | ||||||
left: Box::new(Expr::Column("col1".into())), | ||||||
op: Operator::Eq, | ||||||
right: Box::new(Expr::Literal("33".into())), | ||||||
}) | ||||||
); | ||||||
assert!( | ||||||
predicates[1].exprs()[1] | ||||||
== Expr::InList(InList { | ||||||
expr: Box::new(Expr::Column("col2".into())), | ||||||
list: vec![Expr::Literal("dd".into()),], | ||||||
negated: false, | ||||||
}) | ||||||
); | ||||||
} | ||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add comments why we need to overwrite old predicate.