From 2e75d772ff6dec078a19aa190f32c9b0b96f9fff Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Wed, 29 Nov 2023 19:48:09 +0100 Subject: [PATCH] use min-max-heap for merge iterator --- Cargo.toml | 1 + benches/lsmt.rs | 10 +- src/merge.rs | 342 +++++++++++++++++++++++++++--------------------- src/value.rs | 12 ++ 4 files changed, 208 insertions(+), 157 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3e16caa9..1c8394bf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ chrono = "0.4.31" crc32fast = "1.3.2" log = "0.4.20" lz4_flex = "0.11.1" +min-max-heap = "1.3.0" quick_cache = { version = "0.4.0", default-features = false } rand = "0.8.5" serde = { version = "1.0.192", features = ["derive"] } diff --git a/benches/lsmt.rs b/benches/lsmt.rs index 95d30d7a..23bb1ea6 100644 --- a/benches/lsmt.rs +++ b/benches/lsmt.rs @@ -192,7 +192,10 @@ fn full_scan(c: &mut Criterion) { let item_count = 500_000; group.bench_function("full scan uncached", |b| { - let tree = Config::new(tempdir().unwrap()).open().unwrap(); + let tree = Config::new(tempdir().unwrap()) + .block_cache_size(0) + .open() + .unwrap(); for x in 0_u32..item_count { let key = x.to_be_bytes(); @@ -212,10 +215,7 @@ fn full_scan(c: &mut Criterion) { }); group.bench_function("full scan cached", |b| { - let tree = Config::new(tempdir().unwrap()) - .block_cache_size(100_000) - .open() - .unwrap(); + let tree = Config::new(tempdir().unwrap()).open().unwrap(); for x in 0_u32..item_count { let key = x.to_be_bytes(); diff --git a/src/merge.rs b/src/merge.rs index 53a5cbe9..6e5aa80c 100644 --- a/src/merge.rs +++ b/src/merge.rs @@ -1,34 +1,25 @@ +use crate::{segment::Segment, Value}; +use min_max_heap::MinMaxHeap; use std::sync::Arc; -use crate::{segment::Segment, Value}; +type BoxedIterator<'a> = Box> + 'a>; -/// This iterator can iterate through N segments simultaneously in order +/// This iterator can iterate through N iterators simultaneously in order /// This is achieved by advancing the iterators that yield the lowest/highest item /// and merging using a simple k-way merge algorithm /// -/// If multiple iterators yield the same key value, the freshest one (by timestamp) will be picked +/// If multiple iterators yield the same key value, the freshest one (by seqno) will be picked pub struct MergeIterator<'a> { - iterators: Vec> + 'a>>, - lo_state: Vec>, - hi_state: Vec>, - lo_initialized: bool, - hi_initialized: bool, + iterators: Vec>, + heap: MinMaxHeap, } impl<'a> MergeIterator<'a> { - /// Initializes a new interleaved iterator - pub fn new( - iterators: Vec> + 'a>>, - ) -> Self { - let initial_state: Vec> = iterators.iter().map(|_| None).collect(); - let hi_state = initial_state.clone(); - + /// Initializes a new merge iterator + pub fn new(iterators: Vec>) -> Self { Self { iterators, - lo_state: initial_state, - hi_state, - lo_initialized: false, - hi_initialized: false, + heap: MinMaxHeap::new(), } } @@ -44,169 +35,216 @@ impl<'a> MergeIterator<'a> { Ok(Box::new(MergeIterator::new(iter_vec))) } - /// Finds the lowest key entry from all iterators - fn get_min_entry(&self) -> Option { - let min_entry = self - .lo_state - .iter() - .filter_map(std::option::Option::as_ref) - // TODO: use min_by - .reduce(|min_entry, x| if min_entry.key <= x.key { min_entry } else { x }); + fn push_next(&mut self) -> crate::Result<()> { + for iterator in &mut self.iterators { + if let Some(result) = iterator.next() { + let value = result?; + self.heap.push(value); + } + } - min_entry.cloned() + Ok(()) } - /// Finds the highest key entry from all iterators - fn get_max_entry(&self) -> Option { - let max_entry = self - .hi_state - .iter() - .filter_map(std::option::Option::as_ref) - // TODO: use min_by - .reduce(|max_entry, x| if max_entry.key >= x.key { max_entry } else { x }); + fn push_next_back(&mut self) -> crate::Result<()> { + for iterator in &mut self.iterators { + if let Some(result) = iterator.next_back() { + let value = result?; + self.heap.push(value); + } + } - max_entry.cloned() + Ok(()) } +} - /// Returns the indices of all iterators that should be advanced forward - fn get_indices_to_advance_forwards(&self, min_entry: &Value) -> Vec { - self.lo_state - .iter() - .enumerate() - .filter(|(_, x)| { - if let Some(x) = x { - x.key == min_entry.key +impl<'a> Iterator for MergeIterator<'a> { + type Item = crate::Result; + + fn next(&mut self) -> Option { + if self.heap.is_empty() { + if let Err(e) = self.push_next() { + return Some(Err(e)); + }; + } + + if let Some(mut head) = self.heap.pop_min() { + while let Some(next) = self.heap.pop_min() { + if head.key == next.key { + head = if head.seqno > next.seqno { head } else { next }; } else { - false + // Push back the non-conflicting item. + self.heap.push(next); + break; } - }) - .map(|(i, _)| i) - .collect() - } + } - /// Returns the indices of all iterators that should be advanced backward - fn get_indices_to_advance_backwards(&self, max_entry: &Value) -> Vec { - self.hi_state - .iter() - .enumerate() - .filter(|(_, x)| x.is_some()) - .filter(|(_, x)| x.as_ref().unwrap().key == max_entry.key) - .map(|(i, _)| i) - .collect() - } + if let Err(e) = self.push_next() { + return Some(Err(e)); + }; - /// Advances selected iterators forwards - fn advance_iterators_forwards(&mut self, indices: Vec) -> crate::Result<()> { - for index in indices { - let iterator = &mut self.iterators[index]; - self.lo_state[index] = iterator.next().transpose()?; + Some(Ok(head)) + } else { + None } - Ok(()) } +} - /// Advances selected iterators backwards - fn advance_iterators_backwards(&mut self, indices: Vec) -> crate::Result<()> { - for index in indices { - let iterator = &mut self.iterators[index]; - self.hi_state[index] = iterator.next_back().transpose()?; +impl<'a> DoubleEndedIterator for MergeIterator<'a> { + fn next_back(&mut self) -> Option { + if self.heap.is_empty() { + if let Err(e) = self.push_next_back() { + return Some(Err(e)); + }; } - Ok(()) - } - // TODO: refactor with hi_entry - fn get_newest_lo_entry(&self, indices: &[usize]) -> Value { - indices - .iter() - .filter_map(|index| self.lo_state.get(*index)) - .flatten() - // TODO: use max_by - .reduce(|newer, x| if newer.seqno >= x.seqno { newer } else { x }) - .unwrap() - .clone() - } + if let Some(mut head) = self.heap.pop_max() { + while let Some(next) = self.heap.pop_max() { + if head.key == next.key { + head = if head.seqno > next.seqno { head } else { next }; + } else { + // Push back the non-conflicting item. + self.heap.push(next); + break; + } + } - fn get_newest_hi_entry(&self, indices: &[usize]) -> Value { - indices - .iter() - .filter_map(|index| self.hi_state.get(*index)) - .flatten() - // TODO: use max_by - .reduce(|newer, x| if newer.seqno >= x.seqno { newer } else { x }) - .unwrap() - .clone() + if let Err(e) = self.push_next_back() { + return Some(Err(e)); + }; + + Some(Ok(head)) + } else { + None + } } } -impl<'a> Iterator for MergeIterator<'a> { - type Item = crate::Result; +#[cfg(test)] +mod tests { + use super::*; + use test_log::test; - fn next(&mut self) -> Option { - if !self.lo_initialized { - let next_lo_state: Result, _> = self - .iterators - .iter_mut() - .map(Iterator::next) - .map(Option::transpose) - .collect(); - match next_lo_state { - Ok(next_state) => self.lo_state = next_state, - Err(error) => return Some(Err(error)), - } - self.lo_initialized = true; - } + #[test] + fn test_big() -> crate::Result<()> { + let iter0 = (000u64..100).map(|x| crate::Value::new(x.to_be_bytes(), "old", false, 0)); + let iter1 = (100u64..200).map(|x| crate::Value::new(x.to_be_bytes(), "new", false, 3)); + let iter2 = (200u64..300).map(|x| crate::Value::new(x.to_be_bytes(), "asd", true, 1)); + let iter3 = (300u64..400).map(|x| crate::Value::new(x.to_be_bytes(), "qwe", true, 2)); - let min_entry = self.get_min_entry(); + let iter0 = Box::new(iter0.map(Ok)); + let iter1 = Box::new(iter1.map(Ok)); + let iter2 = Box::new(iter2.map(Ok)); + let iter3 = Box::new(iter3.map(Ok)); - match min_entry { - Some(min_entry) => { - let to_advance = self.get_indices_to_advance_forwards(&min_entry); - let newest_entry: Value = self.get_newest_lo_entry(&to_advance); + let start = std::time::Instant::now(); - let advance_result = self.advance_iterators_forwards(to_advance); - match advance_result { - Ok(_) => {} - Err(error) => return Some(Err(error)), - }; + let merge_iter = MergeIterator::new(vec![iter0, iter1, iter2, iter3]); - Some(Ok(newest_entry)) - } - None => None, + for (idx, item) in merge_iter.enumerate() { + let item = item?; + assert_eq!(item.key, (idx as u64).to_be_bytes()); + //assert_eq!(b"new", &*item.value); } + + eprintln!("took {}ms", start.elapsed().as_millis()); + + Ok(()) } -} -impl<'a> DoubleEndedIterator for MergeIterator<'a> { - fn next_back(&mut self) -> Option { - if !self.hi_initialized { - let next_hi_state: Result, _> = self - .iterators - .iter_mut() - .map(DoubleEndedIterator::next_back) - .map(Option::transpose) - .collect(); - match next_hi_state { - Ok(next_state) => self.hi_state = next_state, - Err(error) => return Some(Err(error)), - } - self.hi_initialized = true; - } + #[test] + fn test_mixed() -> crate::Result<()> { + let vec0 = vec![ + crate::Value::new(1u64.to_be_bytes(), "old", false, 0), + crate::Value::new(2u64.to_be_bytes(), "new", false, 2), + crate::Value::new(3u64.to_be_bytes(), "old", false, 0), + ]; + + let vec1 = vec![ + crate::Value::new(1u64.to_be_bytes(), "new", false, 1), + crate::Value::new(2u64.to_be_bytes(), "old", false, 0), + crate::Value::new(3u64.to_be_bytes(), "new", false, 1), + ]; + + let iter0 = Box::new(vec0.iter().cloned().map(Ok)); + let iter1 = Box::new(vec1.iter().cloned().map(Ok)); + + let merge_iter = MergeIterator::new(vec![iter0, iter1]); + let items = merge_iter.collect::>>()?; + + assert_eq!( + items, + vec![ + crate::Value::new(1u64.to_be_bytes(), "new", false, 1), + crate::Value::new(2u64.to_be_bytes(), "new", false, 2), + crate::Value::new(3u64.to_be_bytes(), "new", false, 1), + ] + ); - let max_entry = self.get_max_entry(); + Ok(()) + } - match max_entry { - Some(max_entry) => { - let to_advance = self.get_indices_to_advance_backwards(&max_entry); - let newest_entry: Value = self.get_newest_hi_entry(&to_advance); + #[test] + fn test_forward_merge() -> crate::Result<()> { + let vec0 = vec![ + crate::Value::new(1u64.to_be_bytes(), "old", false, 0), + crate::Value::new(2u64.to_be_bytes(), "old", false, 0), + crate::Value::new(3u64.to_be_bytes(), "old", false, 0), + ]; + + let vec1 = vec![ + crate::Value::new(1u64.to_be_bytes(), "new", false, 1), + crate::Value::new(2u64.to_be_bytes(), "new", false, 1), + crate::Value::new(3u64.to_be_bytes(), "new", false, 1), + ]; + + let iter0 = Box::new(vec0.iter().cloned().map(Ok)); + let iter1 = Box::new(vec1.iter().cloned().map(Ok)); + + let merge_iter = MergeIterator::new(vec![iter0, iter1]); + let items = merge_iter.collect::>>()?; + + assert_eq!( + items, + vec![ + crate::Value::new(1u64.to_be_bytes(), "new", false, 1), + crate::Value::new(2u64.to_be_bytes(), "new", false, 1), + crate::Value::new(3u64.to_be_bytes(), "new", false, 1), + ] + ); - let advance_result = self.advance_iterators_backwards(to_advance); - match advance_result { - Ok(_) => {} - Err(error) => return Some(Err(error)), - }; + Ok(()) + } - Some(Ok(newest_entry)) - } - None => None, - } + #[test] + fn test_rev_merge() -> crate::Result<()> { + let vec0 = vec![ + crate::Value::new(1u64.to_be_bytes(), "old", false, 0), + crate::Value::new(2u64.to_be_bytes(), "old", false, 0), + crate::Value::new(3u64.to_be_bytes(), "old", false, 0), + ]; + + let vec1 = vec![ + crate::Value::new(1u64.to_be_bytes(), "new", false, 1), + crate::Value::new(2u64.to_be_bytes(), "new", false, 1), + crate::Value::new(3u64.to_be_bytes(), "new", false, 1), + ]; + + let iter0 = Box::new(vec0.iter().cloned().map(Ok)); + let iter1 = Box::new(vec1.iter().cloned().map(Ok)); + + let merge_iter = MergeIterator::new(vec![iter0, iter1]); + let items = merge_iter.rev().collect::>>()?; + + assert_eq!( + items, + vec![ + crate::Value::new(3u64.to_be_bytes(), "new", false, 1), + crate::Value::new(2u64.to_be_bytes(), "new", false, 1), + crate::Value::new(1u64.to_be_bytes(), "new", false, 1), + ] + ); + + Ok(()) } } diff --git a/src/value.rs b/src/value.rs index 4d39d32c..da220fc9 100644 --- a/src/value.rs +++ b/src/value.rs @@ -31,6 +31,18 @@ pub struct Value { pub is_tombstone: bool, } +impl PartialOrd for Value { + fn partial_cmp(&self, other: &Self) -> Option { + self.key.partial_cmp(&other.key) + } +} + +impl Ord for Value { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.key.cmp(&other.key) + } +} + impl Value { /// Creates a new [`Value`]. ///