diff --git a/sqld/src/lib.rs b/sqld/src/lib.rs
index a144e7446..a6ccfb982 100644
--- a/sqld/src/lib.rs
+++ b/sqld/src/lib.rs
@@ -11,6 +11,7 @@ use futures::never::Never;
 use libsql::wal_hook::TRANSPARENT_METHODS;
 use once_cell::sync::Lazy;
 use rpc::run_rpc_server;
+use rusqlite::ffi::SQLITE_CHECKPOINT_TRUNCATE;
 use tokio::sync::{mpsc, Notify};
 use tokio::task::JoinSet;
 use tonic::transport::Channel;
@@ -30,6 +31,7 @@
 use crate::replication::replica::Replicator;
 use crate::stats::Stats;
 use sha256::try_digest;
+use tokio::time::{interval, sleep, Instant};
 
 pub use sqld_libsql_bindings as libsql;
 
@@ -105,6 +107,7 @@ pub struct Config {
     pub max_response_size: u64,
     pub snapshot_exec: Option<String>,
     pub http_replication_addr: Option<SocketAddr>,
+    pub checkpoint_interval: Option<Duration>,
 }
 
 impl Default for Config {
@@ -145,6 +148,7 @@ impl Default for Config {
             max_response_size: 10 * 1024 * 1024, // 10MiB
            snapshot_exec: None,
            http_replication_addr: None,
+            checkpoint_interval: None,
         }
     }
 }
@@ -587,6 +591,54 @@ async fn run_storage_monitor(db_path: PathBuf, stats: Stats) -> anyhow::Result<()> {
     Ok(())
 }
 
+async fn run_checkpoint_cron(db_path: PathBuf, period: Duration) -> anyhow::Result<()> {
+    const RETRY_INTERVAL: Duration = Duration::from_secs(60);
+    let data_path = db_path.join("data");
+    tracing::info!("setting checkpoint interval to {:?}", period);
+    let mut interval = interval(period);
+    let mut retry = None;
+    loop {
+        if let Some(retry) = retry.take() {
+            sleep(retry).await;
+        } else {
+            interval.tick().await;
+        }
+        retry = match rusqlite::Connection::open(&data_path) {
+            Ok(conn) => unsafe {
+                let start = Instant::now();
+                let mut num_checkpointed: std::ffi::c_int = 0;
+                let rc = rusqlite::ffi::sqlite3_wal_checkpoint_v2(
+                    conn.handle(),
+                    std::ptr::null(),
+                    SQLITE_CHECKPOINT_TRUNCATE,
+                    &mut num_checkpointed as *mut _,
+                    std::ptr::null_mut(),
+                );
+                if rc == 0 {
+                    if num_checkpointed == -1 {
+                        tracing::warn!(
+                            "database {:?} not in WAL mode. Stopping checkpoint calls.",
+                            data_path
+                        );
+                        return Ok(());
+                    } else {
+                        let elapsed = Instant::now() - start;
+                        tracing::info!("database checkpoint (took: {:?})", elapsed);
+                    }
+                    None
+                } else {
+                    tracing::warn!("failed to execute checkpoint - error code: {}", rc);
+                    Some(RETRY_INTERVAL)
+                }
+            },
+            Err(err) => {
+                tracing::warn!("couldn't connect to '{:?}': {}", data_path, err);
+                Some(RETRY_INTERVAL)
+            }
+        };
+    }
+}
+
 fn sentinel_file_path(path: &Path) -> PathBuf {
     path.join(".sentinel")
 }
@@ -708,6 +760,10 @@ pub async fn run_server(config: Config) -> anyhow::Result<()> {
         join_set.spawn(run_storage_monitor(config.db_path.clone(), stats));
     }
 
+    if let Some(interval) = config.checkpoint_interval {
+        join_set.spawn(run_checkpoint_cron(config.db_path.clone(), interval));
+    }
+
     let reset = HARD_RESET.clone();
     loop {
         tokio::select! {
diff --git a/sqld/src/main.rs b/sqld/src/main.rs
index f05d9b9e5..0bc5ef20c 100644
--- a/sqld/src/main.rs
+++ b/sqld/src/main.rs
@@ -187,6 +187,10 @@ struct Cli {
     /// The address and port for the replication HTTP API.
     #[clap(long, env = "SQLD_HTTP_REPLICATION_LISTEN_ADDR")]
     http_replication_listen_addr: Option<SocketAddr>,
+    /// The interval, in seconds, at which a WAL checkpoint is performed.
+    /// By default, the interval is 1 hour.
+    #[clap(long, env = "SQLD_CHECKPOINT_INTERVAL_S", default_value = "3600")]
+    checkpoint_interval_s: Option<u64>,
 }
 
 #[derive(clap::Subcommand, Debug)]
@@ -297,6 +301,7 @@ fn config_from_args(args: Cli) -> Result<Config> {
         max_response_size: args.max_response_size.0,
         snapshot_exec: args.snapshot_exec,
         http_replication_addr: args.http_replication_listen_addr,
+        checkpoint_interval: args.checkpoint_interval_s.map(Duration::from_secs),
     })
 }
 
diff --git a/sqld/src/replication/primary/logger.rs b/sqld/src/replication/primary/logger.rs
index 6ec012dc1..275a549b7 100644
--- a/sqld/src/replication/primary/logger.rs
+++ b/sqld/src/replication/primary/logger.rs
@@ -6,7 +6,7 @@ use std::os::unix::prelude::FileExt;
 use std::path::{Path, PathBuf};
 use std::sync::Arc;
 
-use anyhow::{bail, ensure};
+use anyhow::{anyhow, bail, ensure};
 use bytemuck::{bytes_of, pod_read_unaligned, Pod, Zeroable};
 use bytes::{Bytes, BytesMut};
 use parking_lot::RwLock;
@@ -265,7 +265,7 @@ impl ReplicationLoggerHookCtx {
         logger: Arc<ReplicationLogger>,
         bottomless_replicator: Option<Arc<std::sync::Mutex<bottomless::replicator::Replicator>>>,
     ) -> Self {
-        tracing::trace!("bottomless replication enabled: {bottomless_replicator:?}");
+        tracing::trace!("bottomless replication enabled");
         Self {
             buffer: Default::default(),
             logger,
@@ -897,6 +897,14 @@ fn checkpoint_db(data_path: &Path) -> anyhow::Result<()> {
             );
             Ok(())
         })?;
+        // turn off auto_checkpointing - checkpoints are driven by the periodic task instead
+        let rc = rusqlite::ffi::sqlite3_wal_autocheckpoint(conn.handle(), 0);
+        if rc != 0 {
+            return Err(anyhow!(
+                "Failed to disable WAL autocheckpoint - error code: {}",
+                rc
+            ));
+        }
         let mut num_checkpointed: c_int = 0;
         let rc = rusqlite::ffi::sqlite3_wal_checkpoint_v2(
             conn.handle(),
@@ -905,17 +913,22 @@ fn checkpoint_db(data_path: &Path) -> anyhow::Result<()> {
             &mut num_checkpointed as *mut _,
             std::ptr::null_mut(),
         );
-
-        // TODO: ensure correct page size
-        ensure!(
-            rc == 0 && num_checkpointed >= 0,
-            "failed to checkpoint database while recovering replication log"
-        );
-
-        conn.execute("VACUUM", ())?;
+        if rc == 0 {
+            if num_checkpointed == -1 {
+                Err(anyhow!(
+                    "Checkpoint failed: database journal_mode is not WAL"
+                ))
+            } else {
+                conn.execute("VACUUM", ())?;
+                Ok(())
+            }
+        } else {
+            Err(anyhow!(
+                "Checkpoint failed: wal_checkpoint_v2 error code {}",
+                rc
+            ))
+        }
     }
-
-    Ok(())
 }
 
 #[cfg(test)]
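
For reviewers who want to exercise the checkpoint path by hand, below is a minimal standalone sketch (not part of the patch) that issues the same `SQLITE_CHECKPOINT_TRUNCATE` call the new `run_checkpoint_cron` task performs once per tick. It assumes `rusqlite` and `anyhow` as dependencies; `checkpoint_once` is a hypothetical helper name and the `"data"` path is a placeholder for sqld's database file.

```rust
use rusqlite::ffi::{sqlite3_wal_checkpoint_v2, SQLITE_CHECKPOINT_TRUNCATE};

// Hypothetical helper mirroring one iteration of run_checkpoint_cron.
fn checkpoint_once(path: &str) -> anyhow::Result<()> {
    let conn = rusqlite::Connection::open(path)?;
    let mut wal_frames: std::ffi::c_int = 0;
    // Safety: `conn.handle()` returns the raw sqlite3 pointer owned by `conn`,
    // which stays alive for the duration of the call.
    let rc = unsafe {
        sqlite3_wal_checkpoint_v2(
            conn.handle(),
            std::ptr::null(),           // NULL: checkpoint all attached databases
            SQLITE_CHECKPOINT_TRUNCATE, // truncate the WAL once checkpointed
            &mut wal_frames as *mut _,  // pnLog: WAL size in frames, -1 if not in WAL mode
            std::ptr::null_mut(),       // pnCkpt: frames checkpointed (ignored here)
        )
    };
    if rc != 0 {
        anyhow::bail!("wal_checkpoint_v2 failed with error code {rc}");
    }
    if wal_frames == -1 {
        anyhow::bail!("database is not in WAL mode");
    }
    println!("checkpoint ok, WAL contains {wal_frames} frames");
    Ok(())
}

fn main() -> anyhow::Result<()> {
    checkpoint_once("data") // placeholder path to sqld's data file
}
```

Against a WAL-mode database this prints the WAL frame count; against a non-WAL database it reports the same -1 sentinel that makes the cron task stop issuing checkpoints.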