Skip to content
This repository has been archived by the owner on Oct 18, 2023. It is now read-only.

Commit

Permalink
made auto_checkpoint configurable at Connection::open level
Browse files Browse the repository at this point in the history
  • Loading branch information
Horusiath committed Aug 2, 2023
1 parent 075c370 commit 37d3ce2
Show file tree
Hide file tree
Showing 8 changed files with 84 additions and 36 deletions.
7 changes: 6 additions & 1 deletion sqld-libsql-bindings/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ impl<'a> Connection<'a> {
// it has been instanciated and lives for long enough
_wal_hook: &'static WalMethodsHook<W>,
hook_ctx: &'a mut W::Context,
auto_checkpoint: u32,
) -> Result<Self, rusqlite::Error> {
let path = path.as_ref().join("data");
tracing::trace!(
Expand All @@ -87,7 +88,7 @@ impl<'a> Connection<'a> {
// We pass a pointer to the WAL methods data to the database connection. This means
// that the reference must outlive the connection. This is guaranteed by the marker in
// the returned connection.
let rc = rusqlite::ffi::libsql_open_v2(
let mut rc = rusqlite::ffi::libsql_open_v2(
filename.as_ptr(),
&mut db as *mut _,
flags.bits(),
Expand All @@ -96,6 +97,10 @@ impl<'a> Connection<'a> {
hook_ctx as *mut _ as *mut _,
);

if rc == 0 {
rc = rusqlite::ffi::sqlite3_wal_autocheckpoint(db, auto_checkpoint as _);
}

if rc != 0 {
rusqlite::ffi::sqlite3_close(db);
return Err(rusqlite::Error::SqliteFailure(
Expand Down
3 changes: 2 additions & 1 deletion sqld/src/database/dump/loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@ impl DumpLoader {

let (ok_snd, ok_rcv) = oneshot::channel::<anyhow::Result<()>>();
tokio::task::spawn_blocking(move || {
let auto_checkpoint = logger.auto_checkpoint;
let mut ctx = ReplicationLoggerHookCtx::new(logger, bottomless_replicator);
let mut retries = 0;
let db = loop {
match open_db(&path, &REPLICATION_METHODS, &mut ctx, None) {
match open_db(&path, &REPLICATION_METHODS, &mut ctx, None, auto_checkpoint) {
Ok(db) => {
if ok_snd.send(Ok(())).is_ok() {
break db;
Expand Down
14 changes: 11 additions & 3 deletions sqld/src/database/libsql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::query::Query;
use crate::query_analysis::{State, StmtKind};
use crate::query_result_builder::{QueryBuilderConfig, QueryResultBuilder};
use crate::stats::Stats;
use crate::Result;
use crate::{Result, DEFAULT_AUTO_CHECKPOINT};

use super::config::DatabaseConfigStore;
use super::factory::DbFactory;
Expand Down Expand Up @@ -113,6 +113,7 @@ where
self.config_store.clone(),
QueryBuilderConfig {
max_size: Some(self.max_response_size),
auto_checkpoint: DEFAULT_AUTO_CHECKPOINT,
},
)
.await
Expand Down Expand Up @@ -142,6 +143,7 @@ pub fn open_db<'a, W>(
wal_methods: &'static WalMethodsHook<W>,
hook_ctx: &'a mut W::Context,
flags: Option<OpenFlags>,
auto_checkpoint: u32,
) -> Result<sqld_libsql_bindings::Connection<'a>, rusqlite::Error>
where
W: WalHook,
Expand All @@ -152,7 +154,7 @@ where
| OpenFlags::SQLITE_OPEN_URI
| OpenFlags::SQLITE_OPEN_NO_MUTEX,
);
sqld_libsql_bindings::Connection::open(path, flags, wal_methods, hook_ctx)
sqld_libsql_bindings::Connection::open(path, flags, wal_methods, hook_ctx, auto_checkpoint)
}

impl LibSqlDb {
Expand Down Expand Up @@ -251,7 +253,13 @@ impl<'a> Connection<'a> {
builder_config: QueryBuilderConfig,
) -> Result<Self> {
let this = Self {
conn: open_db(path, wal_methods, hook_ctx, None)?,
conn: open_db(
path,
wal_methods,
hook_ctx,
None,
builder_config.auto_checkpoint,
)?,
timeout_deadline: None,
timed_out: false,
stats,
Expand Down
3 changes: 2 additions & 1 deletion sqld/src/database/write_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::rpc::proxy::rpc::proxy_client::ProxyClient;
use crate::rpc::proxy::rpc::query_result::RowResult;
use crate::rpc::proxy::rpc::{DisconnectMessage, ExecuteResults};
use crate::stats::Stats;
use crate::Result;
use crate::{Result, DEFAULT_AUTO_CHECKPOINT};

use super::config::DatabaseConfigStore;
use super::Program;
Expand Down Expand Up @@ -75,6 +75,7 @@ impl DbFactory for WriteProxyDbFactory {
self.applied_frame_no_receiver.clone(),
QueryBuilderConfig {
max_size: Some(self.max_response_size),
auto_checkpoint: DEFAULT_AUTO_CHECKPOINT,
},
)
.await?;
Expand Down
13 changes: 9 additions & 4 deletions sqld/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ pub mod version;

const MAX_CONCCURENT_DBS: usize = 128;
const DB_CREATE_TIMEOUT: Duration = Duration::from_secs(1);
const DEFAULT_AUTO_CHECKPOINT: u32 = 1000;

#[derive(clap::ValueEnum, Clone, Debug, PartialEq)]
pub enum Backend {
Expand Down Expand Up @@ -443,14 +444,18 @@ async fn start_primary(
) -> anyhow::Result<()> {
let is_fresh_db = check_fresh_db(&config.db_path);
// switch frame-count checkpoint to time-based one
let disable_auto_checkpoint =
config.checkpoint_interval.is_some() && config.bottomless_replication.is_some();
let auto_checkpoint =
if config.checkpoint_interval.is_some() && config.bottomless_replication.is_some() {
0
} else {
DEFAULT_AUTO_CHECKPOINT
};
let logger = Arc::new(ReplicationLogger::open(
&config.db_path,
config.max_log_size,
config.max_log_duration.map(Duration::from_secs_f32),
db_is_dirty,
disable_auto_checkpoint,
auto_checkpoint,
snapshot_callback,
)?);

Expand Down Expand Up @@ -559,7 +564,7 @@ async fn run_storage_monitor(db_path: PathBuf, stats: Stats) -> anyhow::Result<(
// initialize a connection here, and keep it alive for the entirety of the program. If we
// fail to open it, we wait for `duration` and try again later.
let ctx = &mut ();
let maybe_conn = match open_db(&db_path, &TRANSPARENT_METHODS, ctx, Some(rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY)) {
let maybe_conn = match open_db(&db_path, &TRANSPARENT_METHODS, ctx, Some(rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY), DEFAULT_AUTO_CHECKPOINT) {
Ok(conn) => Some(conn),
Err(e) => {
tracing::warn!("failed to open connection for storager monitor: {e}, trying again in {duration:?}");
Expand Down
1 change: 1 addition & 0 deletions sqld/src/query_result_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ impl<'a> From<&'a rusqlite::Column<'a>> for Column<'a> {
#[derive(Debug, Clone, Copy, Default)]
pub struct QueryBuilderConfig {
pub max_size: Option<u64>,
pub auto_checkpoint: u32,
}

pub trait QueryResultBuilder: Send + 'static {
Expand Down
77 changes: 51 additions & 26 deletions sqld/src/replication/primary/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,7 @@ pub struct ReplicationLogger {
/// a notifier channel other tasks can subscribe to, and get notified when new frames become
/// available.
pub new_frame_notifier: watch::Sender<FrameNo>,
pub auto_checkpoint: u32,
}

impl ReplicationLogger {
Expand All @@ -719,7 +720,7 @@ impl ReplicationLogger {
max_log_size: u64,
max_log_duration: Option<Duration>,
dirty: bool,
disable_auto_checkpoint: bool,
auto_checkpoint: u32,
callback: SnapshotCallback,
) -> anyhow::Result<Self> {
let log_path = db_path.join("wallog");
Expand Down Expand Up @@ -751,40 +752,54 @@ impl ReplicationLogger {
};

if should_recover {
Self::recover(log_file, data_path, callback, disable_auto_checkpoint)
Self::recover(log_file, data_path, callback, auto_checkpoint)
} else {
Self::from_log_file(db_path.to_path_buf(), log_file, callback)
Self::from_log_file(db_path.to_path_buf(), log_file, callback, auto_checkpoint)
}
}

fn from_log_file(
db_path: PathBuf,
log_file: LogFile,
callback: SnapshotCallback,
auto_checkpoint: u32,
) -> anyhow::Result<Self> {
let header = log_file.header();
let generation_start_frame_no = header.start_frame_no + header.frame_count;

let (new_frame_notifier, _) = watch::channel(generation_start_frame_no);

unsafe {
let conn = rusqlite::Connection::open(&db_path)?;
let rc = rusqlite::ffi::sqlite3_wal_autocheckpoint(conn.handle(), auto_checkpoint as _);
if rc != 0 {
bail!(
"Failed to set WAL autocheckpoint to {} - error code: {}",
auto_checkpoint,
rc
)
} else {
tracing::info!("SQLite autocheckpoint: {}", auto_checkpoint);
}
}
Ok(Self {
generation: Generation::new(generation_start_frame_no),
compactor: LogCompactor::new(&db_path, log_file.header.db_id, callback)?,
log_file: RwLock::new(log_file),
db_path,
new_frame_notifier,
auto_checkpoint,
})
}

fn recover(
log_file: LogFile,
mut data_path: PathBuf,
callback: SnapshotCallback,
disable_auto_checkpoint: bool,
auto_checkpoint: u32,
) -> anyhow::Result<Self> {
// It is necessary to checkpoint before we restore the replication log, since the WAL may
// contain pages that are not in the database file.
checkpoint_db(&data_path, disable_auto_checkpoint)?;
checkpoint_db(&data_path)?;
let mut log_file = log_file.reset()?;
let snapshot_path = data_path.parent().unwrap().join("snapshots");
// best effort, there may be no snapshots
Expand Down Expand Up @@ -813,7 +828,7 @@ impl ReplicationLogger {

assert!(data_path.pop());

Self::from_log_file(data_path, log_file, callback)
Self::from_log_file(data_path, log_file, callback, auto_checkpoint)
}

pub fn database_id(&self) -> anyhow::Result<Uuid> {
Expand Down Expand Up @@ -887,7 +902,7 @@ impl ReplicationLogger {
}
}

fn checkpoint_db(data_path: &Path, disable_auto_checkpoint: bool) -> anyhow::Result<()> {
fn checkpoint_db(data_path: &Path) -> anyhow::Result<()> {
unsafe {
let conn = rusqlite::Connection::open(data_path)?;
conn.query_row("PRAGMA journal_mode=WAL", (), |_| Ok(()))?;
Expand All @@ -901,15 +916,6 @@ fn checkpoint_db(data_path: &Path, disable_auto_checkpoint: bool) -> anyhow::Res
);
Ok(())
})?;
// turn off auto_checkpointing - we'll use a fiber to checkpoint in time steps instead
if disable_auto_checkpoint {
let rc = rusqlite::ffi::sqlite3_wal_autocheckpoint(conn.handle(), 0);
if rc != 0 {
bail!("Failed to disable WAL autocheckpoint - error code: {}", rc)
} else {
tracing::info!("SQLite autocheckpoint disabled");
}
}
let mut num_checkpointed: c_int = 0;
let rc = rusqlite::ffi::sqlite3_wal_checkpoint_v2(
conn.handle(),
Expand All @@ -933,13 +939,20 @@ fn checkpoint_db(data_path: &Path, disable_auto_checkpoint: bool) -> anyhow::Res
#[cfg(test)]
mod test {
use super::*;
use crate::DEFAULT_AUTO_CHECKPOINT;

#[test]
fn write_and_read_from_frame_log() {
let dir = tempfile::tempdir().unwrap();
let logger =
ReplicationLogger::open(dir.path(), 0, None, false, false, Box::new(|_| Ok(())))
.unwrap();
let logger = ReplicationLogger::open(
dir.path(),
0,
None,
false,
DEFAULT_AUTO_CHECKPOINT,
Box::new(|_| Ok(())),
)
.unwrap();

let frames = (0..10)
.map(|i| WalPage {
Expand Down Expand Up @@ -967,9 +980,15 @@ mod test {
#[test]
fn index_out_of_bounds() {
let dir = tempfile::tempdir().unwrap();
let logger =
ReplicationLogger::open(dir.path(), 0, None, false, false, Box::new(|_| Ok(())))
.unwrap();
let logger = ReplicationLogger::open(
dir.path(),
0,
None,
false,
DEFAULT_AUTO_CHECKPOINT,
Box::new(|_| Ok(())),
)
.unwrap();
let log_file = logger.log_file.write();
assert!(matches!(log_file.frame(1), Err(LogReadError::Ahead)));
}
Expand All @@ -978,9 +997,15 @@ mod test {
#[should_panic]
fn incorrect_frame_size() {
let dir = tempfile::tempdir().unwrap();
let logger =
ReplicationLogger::open(dir.path(), 0, None, false, false, Box::new(|_| Ok(())))
.unwrap();
let logger = ReplicationLogger::open(
dir.path(),
0,
None,
false,
DEFAULT_AUTO_CHECKPOINT,
Box::new(|_| Ok(())),
)
.unwrap();
let entry = WalPage {
page_no: 0,
size_after: 0,
Expand Down
2 changes: 2 additions & 0 deletions sqld/src/replication/replica/injector.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::path::Path;

use crate::DEFAULT_AUTO_CHECKPOINT;
use rusqlite::OpenFlags;

use crate::replication::replica::hook::{SQLITE_CONTINUE_REPLICATION, SQLITE_EXIT_REPLICATION};
Expand All @@ -20,6 +21,7 @@ impl<'a> FrameInjector<'a> {
| OpenFlags::SQLITE_OPEN_NO_MUTEX,
&INJECTOR_METHODS,
hook_ctx,
DEFAULT_AUTO_CHECKPOINT,
)?;

Ok(Self { conn })
Expand Down

0 comments on commit 37d3ce2

Please sign in to comment.