Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: partition table query optimize #1594

Open
wants to merge 2 commits into
base: main
Choose a base branch
from

Conversation

zealchen
Copy link
Contributor

@zealchen zealchen commented Nov 12, 2024

Rationale

#1441

Detailed Changes

TLDR

The performance issue with inlist queries is due to the extra overhead from bloom-filter-like directory lookups when scanning each SST file for rows. The solution is to create a separate predicate for each partition, containing only the keys relevant to that partition. Since the current partition filter only supports BinaryExpr(Column, operator, Literal) and non-negated InList expressions, this solution will address only those specific cases.

Changes

  1. During the scan building process, when identifying the partitions for a query, we create a PartitionedFilterKeyIndex variable to store the predicate key indices for each expression.
  2. In the compute_partition_for_keys_group function, we use a HashMap<partition_id, HashMap<filter_index, BTreeSet<key_index>>> to record the indices of keys involved in partition computation for each group. Since DatumView only implements PartialEq or PartialOrd, it cannot be directly stored in a HashSet or BTreeSet. To handle this, I use a key_index in a set to ensure deduplication of all inlist keys when each key group is built in multi_cartesian_product.
  3. In the partitioned_predicates function, we construct the final predicates for each partition.
  4. In resolve_partitioned_scan_internal, we generate separate requests for each partition.

e.g.
conditions:

  1. table schema: col_ts, col1, col2, in which col1 and col2 are both keys,
    and with two partitions
  2. sql: select * from table where col1 = '33' and col2 in ("aa", "bb",
    "cc", "dd")

partition expectations:
yield two predicates
p0: col1 = '33' and col2 in ("aa", "bb", "cc");
p1: col1 = '33' and col2 in ("dd")

Other issues discovered

When the inlist key args length is less than three, Expr will be refactored to nested BinaryExpr which bypasses the FilterExtractor.

e.g.
SQL: select * from table where col1 in ("aa", "bb") and col2 in (1,2,3,4,5...1000)
Since ("aa", "bb") has fewer than three elements, the col1 key filter is not included in partition computation, which interrupts the partitioning process in the get_candidate_partition_keys_groups function, as contains_empty_filter is set to true.

Test Plan

  1. UT: test_partitioned_predicate
  2. Manual test.

@github-actions github-actions bot added the feature New feature or request label Nov 12, 2024
@jiacai2050 jiacai2050 self-requested a review November 13, 2024 03:03
table_scan_ctx: unresolved.table_scan_ctx.clone(),
table_scan_ctx: if let Some(ref predicates) = unresolved.predicates {
let mut ctx = unresolved.table_scan_ctx.clone();
ctx.predicate = Arc::new(predicates[idx].clone());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add comments why we need to overwrite old predicate.

},
provider::TableScanBuilder,
remote::model::TableIdentifier,
table::ReadRequest,
};

use super::partitioned_predicates;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Our codebase should prefer absolute import over super import.

&partitions,
&mut partitioned_key_indices,
)
.map_err(|e| DataFusionError::Internal(format!("err:{e}")))?,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
.map_err(|e| DataFusionError::Internal(format!("err:{e}")))?,
.map_err(|e| DataFusionError::Internal(format!("partition predicates failed, err:{e}")))?,

let df_partition_rule = DfPartitionRuleAdapter::new(partition_info, &schema).unwrap();

let exprs = vec![
Expr::BinaryExpr(BinaryExpr {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DataFusion provider some helper functions to define expr:

},
BuildPartitionRule, PartitionInfo, Result,
};

mod extractor;

pub type PartitionedFilterKeyIndex = HashMap<usize, HashMap<usize, BTreeSet<usize>>>;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add comments explaining the type meaning of those usize.

let mut partitions = BTreeSet::new();
// Retrieve all the key DatumView instances along with their corresponding
// indices related to their positions in the predicate inlist. Since DatumView
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

impl<'a> std::hash::Hash for DatumView<'a> {

DatumView already impl Hash

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
feature New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants