Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: partition table query optimize #1594

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/df_engine_extensions/src/dist_sql_query/physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use datafusion::{
};
use futures::{future::BoxFuture, FutureExt, Stream, StreamExt};
use runtime::Priority;
use table_engine::{remote::model::TableIdentifier, table::ReadRequest};
use table_engine::{predicate::Predicate, remote::model::TableIdentifier, table::ReadRequest};
use trace_metric::{collector::FormatCollectorVisitor, MetricsCollector, TraceMetricWhenDrop};

use crate::dist_sql_query::{RemotePhysicalPlanExecutor, RemoteTaskContext, TableScanContext};
Expand All @@ -62,13 +62,15 @@ pub struct UnresolvedPartitionedScan {
pub table_scan_ctx: TableScanContext,
pub metrics_collector: MetricsCollector,
pub priority: Priority,
pub predicates: Option<Vec<Predicate>>,
}

impl UnresolvedPartitionedScan {
pub fn new(
table_name: &str,
sub_tables: Vec<TableIdentifier>,
read_request: ReadRequest,
predicates: Option<Vec<Predicate>>,
) -> Self {
let metrics_collector = MetricsCollector::new(table_name.to_string());
let table_scan_ctx = TableScanContext {
Expand All @@ -83,6 +85,7 @@ impl UnresolvedPartitionedScan {
table_scan_ctx,
metrics_collector,
priority: read_request.priority,
predicates,
}
}
}
Expand Down
11 changes: 9 additions & 2 deletions src/df_engine_extensions/src/dist_sql_query/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,17 @@ impl Resolver {
let sub_tables = unresolved.sub_tables.clone();
let remote_plans = sub_tables
.into_iter()
.map(|table| {
.enumerate()
.map(|(idx, table)| {
let plan = Arc::new(UnresolvedSubTableScan {
table: table.clone(),
table_scan_ctx: unresolved.table_scan_ctx.clone(),
table_scan_ctx: if let Some(ref predicates) = unresolved.predicates {
let mut ctx = unresolved.table_scan_ctx.clone();
ctx.predicate = Arc::new(predicates[idx].clone());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add comments explaining why we need to overwrite the old predicate.

ctx
} else {
unresolved.table_scan_ctx.clone()
},
});
let sub_metrics_collect = metrics_collector.span(table.table.clone());

Expand Down
2 changes: 2 additions & 0 deletions src/df_engine_extensions/src/dist_sql_query/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ impl TestContext {
"test",
sub_tables,
self.request.clone(),
None,
));

let filter: Arc<dyn ExecutionPlan> =
Expand Down Expand Up @@ -364,6 +365,7 @@ impl TestContext {
"test",
self.sub_table_groups[0].clone(),
self.request.clone(),
None,
));

self.build_aggr_plan_with_input(unresolved_scan)
Expand Down
50 changes: 47 additions & 3 deletions src/partition_table_engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,17 @@ use std::sync::Arc;

use analytic_engine::TableOptions;
use async_trait::async_trait;
use datafusion::logical_expr::expr::{Expr, InList};
use generic_error::BoxError;
use snafu::{OptionExt, ResultExt};
use snafu::{ensure, OptionExt, ResultExt};
use table_engine::{
engine::{
CloseShardRequest, CloseTableRequest, CreateTableParams, CreateTableRequest,
DropTableRequest, OpenShardRequest, OpenShardResult, OpenTableRequest, Result, TableEngine,
Unexpected, UnexpectedNoCause,
DropTableRequest, InvalidPartitionContext, OpenShardRequest, OpenShardResult,
OpenTableRequest, Result, TableEngine, Unexpected, UnexpectedNoCause,
},
partition::rule::df_adapter::PartitionedFilterKeyIndex,
predicate::Predicate,
remote::RemoteEngineRef,
table::TableRef,
PARTITION_TABLE_ENGINE_TYPE,
Expand Down Expand Up @@ -110,3 +113,44 @@ impl TableEngine for PartitionTableEngine {
vec![Ok("".to_string())]
}
}

/// Build one predicate per partition from the table-level predicate.
///
/// For every partition in `partitions` the table predicate is cloned, and any
/// non-negated `IN`-list filter is then narrowed to only the list values that
/// were routed to that partition (as recorded in `partitioned_key_indices`).
/// Overwriting the original predicate this way keeps each remote sub-table
/// scan from evaluating the full `IN (...)` list — it only sees the values
/// that can actually match its partition.
///
/// `partitioned_key_indices` maps a partition id to a map of
/// filter-expression index -> set of retained positions inside that filter's
/// `IN` list.
///
/// # Errors
///
/// Returns `InvalidPartitionContext` when `partitions` and
/// `partitioned_key_indices` disagree on the number of partitions.
pub fn partitioned_predicates(
    predicate: Arc<Predicate>,
    partitions: &[usize],
    partitioned_key_indices: &mut PartitionedFilterKeyIndex,
) -> Result<Vec<Predicate>> {
    ensure!(
        partitions.len() == partitioned_key_indices.len(),
        InvalidPartitionContext {
            msg: format!(
                "partitions length:{}, partitioned_key_indices length: {}",
                partitions.len(),
                partitioned_key_indices.len()
            )
        }
    );

    // Start every partition from a full copy of the table predicate, then
    // prune its `IN` lists below.
    let mut predicates = vec![(*predicate).clone(); partitions.len()];
    for (idx, partition_predicate) in predicates.iter_mut().enumerate() {
        let partition = partitions[idx];
        if let Some(filter_indices) = partitioned_key_indices.get(&partition) {
            let exprs = partition_predicate.mut_exprs();
            for (filter_idx, key_indices) in filter_indices {
                // Only non-negated `IN` lists can be pruned per partition;
                // every other expr kind must be evaluated in full everywhere.
                if let Expr::InList(InList {
                    expr: _expr,
                    list,
                    negated: false,
                }) = &mut exprs[*filter_idx]
                {
                    *list = list
                        .iter()
                        .enumerate()
                        .filter(|(value_idx, _)| key_indices.contains(value_idx))
                        .map(|(_, value)| value.clone())
                        .collect();
                }
            }
        }
    }
    Ok(predicates)
}
8 changes: 4 additions & 4 deletions src/partition_table_engine/src/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ use table_engine::{
partition::{
format_sub_partition_table_name,
rule::{
df_adapter::DfPartitionRuleAdapter, PartitionedRow, PartitionedRows,
PartitionedRowsIter,
df_adapter::{DfPartitionRuleAdapter, PartitionedFilterKeyIndex},
PartitionedRow, PartitionedRows, PartitionedRowsIter,
},
PartitionInfo,
},
Expand Down Expand Up @@ -289,14 +289,14 @@ impl Table for PartitionTableImpl {
.context(CreatePartitionRule)?
}
};

let mut partitioned_key_indices = PartitionedFilterKeyIndex::new();
// Evaluate expr and locate partition.
let partitions = {
let _locate_timer = PARTITION_TABLE_PARTITIONED_READ_DURATION_HISTOGRAM
.with_label_values(&["locate"])
.start_timer();
df_partition_rule
.locate_partitions_for_read(request.predicate.exprs())
.locate_partitions_for_read(request.predicate.exprs(), &mut partitioned_key_indices)
.box_err()
.context(LocatePartitions)?
};
Expand Down
185 changes: 178 additions & 7 deletions src/partition_table_engine/src/scan_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,16 @@ use datafusion::{
use df_engine_extensions::dist_sql_query::physical_plan::UnresolvedPartitionedScan;
use table_engine::{
partition::{
format_sub_partition_table_name, rule::df_adapter::DfPartitionRuleAdapter, PartitionInfo,
format_sub_partition_table_name,
rule::df_adapter::{DfPartitionRuleAdapter, PartitionedFilterKeyIndex},
PartitionInfo,
},
provider::TableScanBuilder,
remote::model::TableIdentifier,
table::ReadRequest,
};

use super::partitioned_predicates;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Our codebase should prefer absolute import over super import.

#[derive(Debug)]
pub struct PartitionedTableScanBuilder {
table_name: String,
Expand Down Expand Up @@ -61,13 +64,13 @@ impl PartitionedTableScanBuilder {
&self,
table_name: &str,
partition_info: &PartitionInfo,
partitions: Vec<usize>,
partitions: &[usize],
) -> Vec<TableIdentifier> {
let definitions = partition_info.get_definitions();
partitions
.into_iter()
.iter()
.map(|p| {
let partition_name = &definitions[p].name;
let partition_name = &definitions[*p].name;
TableIdentifier {
catalog: self.catalog_name.clone(),
schema: self.schema_name.clone(),
Expand All @@ -89,18 +92,186 @@ impl TableScanBuilder for PartitionedTableScanBuilder {
DataFusionError::Internal(format!("failed to build partition rule, err:{e}"))
})?;

let mut partitioned_key_indices = PartitionedFilterKeyIndex::new();
// Evaluate expr and locate partition.
let partitions = df_partition_rule
.locate_partitions_for_read(request.predicate.exprs())
.locate_partitions_for_read(request.predicate.exprs(), &mut partitioned_key_indices)
.map_err(|e| {
DataFusionError::Internal(format!("failed to locate partition for read, err:{e}"))
})?;

let sub_tables =
self.get_sub_table_idents(&self.table_name, &self.partition_info, partitions);
self.get_sub_table_idents(&self.table_name, &self.partition_info, &partitions);

let predicates = if partitioned_key_indices.len() == partitions.len() {
Some(
partitioned_predicates(
request.predicate.clone(),
&partitions,
&mut partitioned_key_indices,
)
.map_err(|e| DataFusionError::Internal(format!("err:{e}")))?,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
.map_err(|e| DataFusionError::Internal(format!("err:{e}")))?,
.map_err(|e| DataFusionError::Internal(format!("partition predicates failed, err:{e}")))?,

)
} else {
// since FilterExtractor.extract only covers some specific expr
// cases, partitioned_key_indices.len() could be 0.
// All partition requests will have the same predicate.
None
};

// Build plan.
let plan = UnresolvedPartitionedScan::new(&self.table_name, sub_tables, request);
let plan =
UnresolvedPartitionedScan::new(&self.table_name, sub_tables, request, predicates);

Ok(Arc::new(plan))
}
}

#[cfg(test)]
mod tests {
use common_types::{column_schema::Builder as ColBuilder, datum::DatumKind, schema::Builder};
use datafusion::logical_expr::{
expr::{BinaryExpr, InList},
Expr, Operator,
};
use table_engine::{
partition::{
rule::df_adapter::{DfPartitionRuleAdapter, PartitionedFilterKeyIndex},
KeyPartitionInfo, PartitionDefinition, PartitionInfo,
},
predicate::PredicateBuilder,
};

use super::partitioned_predicates;

#[test]
fn test_partitioned_predicate() {
// conditions:
// 1) table schema: col_ts, col1, col2, in which col1 and col2 are both keys,
// and with two partitions
// 2) sql: select * from table where col1 = '33' and col2 in ("aa", "bb",
// "cc", "dd")
// partition expectations:
// 1) query fit in two partitions
// 2) yield two predicates, p0: col1 = '33' and col2 in ("aa", "bb", "cc");
// p1: col1 = '33' and col2 in ("dd")
let definitions = vec![
PartitionDefinition {
name: "p1".to_string(),
origin_name: None,
},
PartitionDefinition {
name: "p2".to_string(),
origin_name: None,
},
];

let partition_info = PartitionInfo::Key(KeyPartitionInfo {
version: 0,
definitions,
partition_key: vec!["col1".to_string(), "col2".to_string()],
linear: false,
});

let schema = {
let builder = Builder::new();
let col_ts = ColBuilder::new("col_ts".to_string(), DatumKind::Timestamp)
.build()
.expect("ts");
let col1 = ColBuilder::new("col1".to_string(), DatumKind::String)
.build()
.expect("should succeed to build column schema");
let col2 = ColBuilder::new("col2".to_string(), DatumKind::String)
.build()
.expect("should succeed to build column schema");
builder
.auto_increment_column_id(true)
.add_key_column(col_ts)
.unwrap()
.add_key_column(col1)
.unwrap()
.add_key_column(col2)
.unwrap()
.primary_key_indexes(vec![1, 2])
.build()
.unwrap()
};

let df_partition_rule = DfPartitionRuleAdapter::new(partition_info, &schema).unwrap();

let exprs = vec![
Expr::BinaryExpr(BinaryExpr {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DataFusion provider some helper functions to define expr:

left: Box::new(Expr::Column("col1".into())),
op: Operator::Eq,
right: Box::new(Expr::Literal("33".into())),
}),
Expr::InList(InList {
expr: Box::new(Expr::Column("col2".into())),
list: vec![
Expr::Literal("aa".into()),
Expr::Literal("bb".into()),
Expr::Literal("cc".into()),
Expr::Literal("dd".into()),
],
negated: false,
}),
];
let mut partitioned_key_indices = PartitionedFilterKeyIndex::new();
let partitions = df_partition_rule
.locate_partitions_for_read(&exprs, &mut partitioned_key_indices)
.unwrap();
assert!(partitions.len() == 2);
assert!(partitioned_key_indices.len() == 2);

let predicate = PredicateBuilder::default()
.add_pushdown_exprs(exprs.as_slice())
.build();

let predicates = partitioned_predicates(
predicate,
partitions.as_slice(),
&mut partitioned_key_indices,
);
assert!(predicates.is_ok());
let predicates = predicates.unwrap();
assert!(predicates.len() == 2);

assert!(predicates[0].exprs().len() == 2);
assert!(
predicates[0].exprs()[0]
== Expr::BinaryExpr(BinaryExpr {
left: Box::new(Expr::Column("col1".into())),
op: Operator::Eq,
right: Box::new(Expr::Literal("33".into())),
})
);
assert!(
predicates[0].exprs()[1]
== Expr::InList(InList {
expr: Box::new(Expr::Column("col2".into())),
list: vec![
Expr::Literal("aa".into()),
Expr::Literal("bb".into()),
Expr::Literal("cc".into()),
],
negated: false,
})
);
assert!(
predicates[1].exprs()[0]
== Expr::BinaryExpr(BinaryExpr {
left: Box::new(Expr::Column("col1".into())),
op: Operator::Eq,
right: Box::new(Expr::Literal("33".into())),
})
);
assert!(
predicates[1].exprs()[1]
== Expr::InList(InList {
expr: Box::new(Expr::Column("col2".into())),
list: vec![Expr::Literal("dd".into()),],
negated: false,
})
);
}
}
3 changes: 3 additions & 0 deletions src/table_engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ pub enum Error {
msg: Option<String>,
source: GenericError,
},

#[snafu(display("Invalid partition context, err:{}", msg))]
InvalidPartitionContext { msg: String },
}

define_result!(Error);
Expand Down
Loading
Loading