Skip to content
This repository has been archived by the owner on Oct 18, 2023. It is now read-only.

Commit

Permalink
turn off sqlite auto_checkpoint and introduce period-based checkpoint…
Browse files Browse the repository at this point in the history
… calls
  • Loading branch information
Horusiath committed Jul 24, 2023
1 parent f50803f commit ce6b596
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 12 deletions.
56 changes: 56 additions & 0 deletions sqld/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use futures::never::Never;
use libsql::wal_hook::TRANSPARENT_METHODS;
use once_cell::sync::Lazy;
use rpc::run_rpc_server;
use rusqlite::ffi::SQLITE_CHECKPOINT_TRUNCATE;
use tokio::sync::{mpsc, Notify};
use tokio::task::JoinSet;
use tonic::transport::Channel;
Expand All @@ -30,6 +31,7 @@ use crate::replication::replica::Replicator;
use crate::stats::Stats;

use sha256::try_digest;
use tokio::time::{interval, sleep, Instant};

pub use sqld_libsql_bindings as libsql;

Expand Down Expand Up @@ -105,6 +107,7 @@ pub struct Config {
pub max_response_size: u64,
pub snapshot_exec: Option<String>,
pub http_replication_addr: Option<SocketAddr>,
pub checkpoint_interval: Option<Duration>,
}

impl Default for Config {
Expand Down Expand Up @@ -145,6 +148,7 @@ impl Default for Config {
max_response_size: 10 * 1024 * 1024, // 10MiB
snapshot_exec: None,
http_replication_addr: None,
checkpoint_interval: None,
}
}
}
Expand Down Expand Up @@ -587,6 +591,54 @@ async fn run_storage_monitor(db_path: PathBuf, stats: Stats) -> anyhow::Result<(
Ok(())
}

async fn run_checkpoint_cron(db_path: PathBuf, period: Duration) -> anyhow::Result<()> {
const RETRY_INTERVAL: Duration = Duration::from_secs(60);
let data_path = db_path.join("data");
tracing::info!("setting checkpoint interval to {:?}", period);
let mut interval = interval(period);
let mut retry = None;
loop {
if let Some(retry) = retry.take() {
sleep(retry).await;
} else {
interval.tick().await;
}
retry = match rusqlite::Connection::open(&data_path) {
Ok(conn) => unsafe {
let start = Instant::now();
let mut num_checkpointed: std::ffi::c_int = 0;
let rc = rusqlite::ffi::sqlite3_wal_checkpoint_v2(
conn.handle(),
std::ptr::null(),
SQLITE_CHECKPOINT_TRUNCATE,
&mut num_checkpointed as *mut _,
std::ptr::null_mut(),
);
if rc == 0 {
if num_checkpointed == -1 {
tracing::warn!(
"database {:?} not in WAL mode. Stopping checkpoint calls.",
data_path
);
return Ok(());
} else {
let elapsed = Instant::now() - start;
tracing::info!("database checkpoint (took: {:?})", elapsed);
}
None
} else {
tracing::warn!("failed to execute checkpoint - error code: {}", rc);
Some(RETRY_INTERVAL)
}
},
Err(err) => {
tracing::warn!("couldn't connect to '{:?}': {}", data_path, err);
Some(RETRY_INTERVAL)
}
};
}
}

/// Builds the location of the `.sentinel` marker file inside the directory `path`.
fn sentinel_file_path(path: &Path) -> PathBuf {
    let mut sentinel = path.to_path_buf();
    sentinel.push(".sentinel");
    sentinel
}
Expand Down Expand Up @@ -708,6 +760,10 @@ pub async fn run_server(config: Config) -> anyhow::Result<()> {
join_set.spawn(run_storage_monitor(config.db_path.clone(), stats));
}

if let Some(interval) = config.checkpoint_interval {
join_set.spawn(run_checkpoint_cron(config.db_path.clone(), interval));
}

let reset = HARD_RESET.clone();
loop {
tokio::select! {
Expand Down
5 changes: 5 additions & 0 deletions sqld/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,10 @@ struct Cli {
/// The address and port for the replication HTTP API.
#[clap(long, env = "SQLD_HTTP_REPLICATION_LISTEN_ADDR")]
http_replication_listen_addr: Option<SocketAddr>,
/// Interval in seconds, in which WAL checkpoint is being called.
/// By default, the interval is 1 hour.
#[clap(long, env = "SQLD_CHECKPOINT_INTERVAL_S", default_value = "3600")]
checkpoint_interval_s: Option<u64>,
}

#[derive(clap::Subcommand, Debug)]
Expand Down Expand Up @@ -297,6 +301,7 @@ fn config_from_args(args: Cli) -> Result<Config> {
max_response_size: args.max_response_size.0,
snapshot_exec: args.snapshot_exec,
http_replication_addr: args.http_replication_listen_addr,
checkpoint_interval: args.checkpoint_interval_s.map(Duration::from_secs),
})
}

Expand Down
37 changes: 25 additions & 12 deletions sqld/src/replication/primary/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::os::unix::prelude::FileExt;
use std::path::{Path, PathBuf};
use std::sync::Arc;

use anyhow::{bail, ensure};
use anyhow::{anyhow, bail, ensure};
use bytemuck::{bytes_of, pod_read_unaligned, Pod, Zeroable};
use bytes::{Bytes, BytesMut};
use parking_lot::RwLock;
Expand Down Expand Up @@ -265,7 +265,7 @@ impl ReplicationLoggerHookCtx {
logger: Arc<ReplicationLogger>,
bottomless_replicator: Option<Arc<std::sync::Mutex<bottomless::replicator::Replicator>>>,
) -> Self {
tracing::trace!("bottomless replication enabled: {bottomless_replicator:?}");
tracing::trace!("bottomless replication enabled");
Self {
buffer: Default::default(),
logger,
Expand Down Expand Up @@ -897,6 +897,14 @@ fn checkpoint_db(data_path: &Path) -> anyhow::Result<()> {
);
Ok(())
})?;
// turn off auto_checkpointing - we'll use a fiber to checkpoint in time steps instead
let rc = rusqlite::ffi::sqlite3_wal_autocheckpoint(conn.handle(), 0);
if rc != 0 {
return Err(anyhow!(
"Failed to disable WAL autocheckpoint - error code: {}",
rc
));
}
let mut num_checkpointed: c_int = 0;
let rc = rusqlite::ffi::sqlite3_wal_checkpoint_v2(
conn.handle(),
Expand All @@ -905,17 +913,22 @@ fn checkpoint_db(data_path: &Path) -> anyhow::Result<()> {
&mut num_checkpointed as *mut _,
std::ptr::null_mut(),
);

// TODO: ensure correct page size
ensure!(
rc == 0 && num_checkpointed >= 0,
"failed to checkpoint database while recovering replication log"
);

conn.execute("VACUUM", ())?;
if rc == 0 {
if num_checkpointed == -1 {
Err(anyhow!(
"Checkpoint failed: database journal_mode is not WAL"
))
} else {
conn.execute("VACUUM", ())?;
Ok(())
}
} else {
Err(anyhow!(
"Checkpoint failed: wal_checkpoint_v2 error code {}",
rc
))
}
}

Ok(())
}

#[cfg(test)]
Expand Down

0 comments on commit ce6b596

Please sign in to comment.