Skip to content
This repository has been archived by the owner on Aug 4, 2024. It is now read-only.

Commit

Permalink
fix: Compaction cannot be performed when writing continuous transac…
Browse files Browse the repository at this point in the history
…tions
  • Loading branch information
KKould committed Jan 8, 2024
1 parent 3f5e551 commit 76acf12
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 16 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "kip_db"
version = "0.1.2-alpha.20"
version = "0.1.2-alpha.21"
edition = "2021"
authors = ["Kould <[email protected]>"]
description = "轻量级、异步 基于LSM Leveled Compaction K-V数据库"
Expand Down
4 changes: 1 addition & 3 deletions src/kernel/lsm/compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,7 @@ impl Compactor {
&mut self,
option_tx: Option<oneshot::Sender<()>>,
) -> KernelResult<()> {
let is_force = option_tx.is_some();

if let Some((gen, values)) = self.mem_table().try_swap(is_force)? {
if let Some((gen, values)) = self.mem_table().swap()? {
if !values.is_empty() {
let start = Instant::now();
// 目前minor触发major时是同步进行的,所以此处对live_tag是在此方法体保持存活
Expand Down
18 changes: 7 additions & 11 deletions src/kernel/lsm/mem_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,14 +160,10 @@ pub(crate) struct TableInner {
}

macro_rules! check_count {
($count:ident, $is_force:ident) => {
($count:ident) => {
if 0 != $count.load(Acquire) {
if $is_force {
std::hint::spin_loop();
continue;
} else {
return Ok(None);
}
std::hint::spin_loop();
continue;
}
};
}
Expand Down Expand Up @@ -270,19 +266,19 @@ impl MemTable {
}

/// MemTable将数据弹出并转移到immut table中 (弹出数据为转移至immut table中数据的迭代器)
pub(crate) fn try_swap(&self, is_force: bool) -> KernelResult<Option<(i64, Vec<KeyValue>)>> {
pub(crate) fn swap(&self) -> KernelResult<Option<(i64, Vec<KeyValue>)>> {
let count = &self.tx_count;

loop {
check_count!(count, is_force);
check_count!(count);

let mut inner = self.inner.lock();
// 二重检测防止lock时(前)突然出现事务
// 当lock后,即使出现事务,会因为lock已被Compactor获取而无法读写,
// 因此不会对读写进行干扰
// 并且事务即使在lock后出现,所持有的seq为该压缩之前,
// 也不会丢失该seq的_mem,因为转移到了_immut,可以从_immut得到对应seq的数据
check_count!(count, is_force);
check_count!(count);

return if !inner._mem.is_empty() {
inner.trigger.reset();
Expand Down Expand Up @@ -510,7 +506,7 @@ mod tests {
let _ = mem_table
.insert_data((Bytes::from(vec![b'k', b'2']), Some(Bytes::from(vec![b'2']))))?;

let (_, mut vec) = mem_table.try_swap(false)?.unwrap();
let (_, mut vec) = mem_table.swap()?.unwrap();

assert_eq!(
vec.pop(),
Expand Down
2 changes: 1 addition & 1 deletion src/kernel/lsm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ mod mem_table;
pub mod mvcc;
pub mod storage;
mod table;
mod trigger;
pub mod trigger;
pub mod version;

const MAX_LEVEL: usize = 4;
Expand Down

0 comments on commit 76acf12

Please sign in to comment.