From 73a9f8fbbcf64aee49a79cec666f0ce507434080 Mon Sep 17 00:00:00 2001 From: Bartosz Sypytkowski Date: Tue, 25 Jul 2023 13:49:26 +0200 Subject: [PATCH] fixed review remarks --- bottomless/src/replicator.rs | 21 +++++++------ sqld/src/database/libsql.rs | 1 - sqld/src/lib.rs | 24 +++++++++------ sqld/src/replication/primary/logger.rs | 41 +++++++++++++------------- 4 files changed, 46 insertions(+), 41 deletions(-) diff --git a/bottomless/src/replicator.rs b/bottomless/src/replicator.rs index 982833bb0..648ecc916 100644 --- a/bottomless/src/replicator.rs +++ b/bottomless/src/replicator.rs @@ -112,17 +112,16 @@ impl Options { if let Some(region) = self.region.as_deref() { loader = loader.region(Region::new(region.to_string())); } - match (&self.access_key_id, &self.secret_access_key) { - (Some(access_key_id), Some(secret_access_key)) => { - loader = loader.credentials_provider(Credentials::new( - access_key_id, - secret_access_key, - None, - None, - "Static", - )); - } - _ => {} + if let (Some(access_key_id), Some(secret_access_key)) = + (&self.access_key_id, &self.secret_access_key) + { + loader = loader.credentials_provider(Credentials::new( + access_key_id, + secret_access_key, + None, + None, + "Static", + )); } if let Some(endpoint) = self.aws_endpoint.as_deref() { loader = loader.endpoint_url(endpoint); diff --git a/sqld/src/database/libsql.rs b/sqld/src/database/libsql.rs index 02a9af53a..1874b5272 100644 --- a/sqld/src/database/libsql.rs +++ b/sqld/src/database/libsql.rs @@ -152,7 +152,6 @@ where | OpenFlags::SQLITE_OPEN_URI | OpenFlags::SQLITE_OPEN_NO_MUTEX, ); - sqld_libsql_bindings::Connection::open(path, flags, wal_methods, hook_ctx) } diff --git a/sqld/src/lib.rs b/sqld/src/lib.rs index a6ccfb982..45ced204b 100644 --- a/sqld/src/lib.rs +++ b/sqld/src/lib.rs @@ -31,7 +31,7 @@ use crate::replication::replica::Replicator; use crate::stats::Stats; use sha256::try_digest; -use tokio::time::{interval, sleep, Instant}; +use tokio::time::{interval, sleep, 
Instant, MissedTickBehavior}; pub use sqld_libsql_bindings as libsql; @@ -442,11 +442,14 @@ async fn start_primary( snapshot_callback: SnapshotCallback, ) -> anyhow::Result<()> { let is_fresh_db = check_fresh_db(&config.db_path); + // switch frame-count checkpoint to time-based one + let disable_auto_checkpoint = config.checkpoint_interval.is_some(); let logger = Arc::new(ReplicationLogger::open( &config.db_path, config.max_log_size, config.max_log_duration.map(Duration::from_secs_f32), db_is_dirty, + disable_auto_checkpoint, snapshot_callback, )?); @@ -596,14 +599,20 @@ async fn run_checkpoint_cron(db_path: PathBuf, period: Duration) -> anyhow::Resu let data_path = db_path.join("data"); tracing::info!("setting checkpoint interval to {:?}", period); let mut interval = interval(period); - let mut retry = None; + interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + let mut retry: Option<Duration> = None; loop { if let Some(retry) = retry.take() { + if retry.is_zero() { + tracing::warn!("database was not set in WAL journal mode"); + return Ok(()); + } sleep(retry).await; } else { interval.tick().await; } - retry = match rusqlite::Connection::open(&data_path) { + let data_path = data_path.clone(); + retry = tokio::task::spawn_blocking(move || match rusqlite::Connection::open(&data_path) { Ok(conn) => unsafe { let start = Instant::now(); let mut num_checkpointed: std::ffi::c_int = 0; @@ -616,11 +625,7 @@ async fn run_checkpoint_cron(db_path: PathBuf, period: Duration) -> anyhow::Resu ); if rc == 0 { if num_checkpointed == -1 { - tracing::warn!( - "database {:?} not in WAL mode.
Stopping checkpoint calls.", - data_path - ); - return Ok(()); + return Some(Duration::default()); } else { let elapsed = Instant::now() - start; tracing::info!("database checkpoint (took: {:?})", elapsed); @@ -635,7 +640,8 @@ async fn run_checkpoint_cron(db_path: PathBuf, period: Duration) -> anyhow::Resu tracing::warn!("couldn't connect to '{:?}': {}", data_path, err); Some(RETRY_INTERVAL) } - }; + }) + .await?; } } diff --git a/sqld/src/replication/primary/logger.rs b/sqld/src/replication/primary/logger.rs index 5b8cc9f17..161fb4550 100644 --- a/sqld/src/replication/primary/logger.rs +++ b/sqld/src/replication/primary/logger.rs @@ -6,7 +6,7 @@ use std::os::unix::prelude::FileExt; use std::path::{Path, PathBuf}; use std::sync::Arc; -use anyhow::{anyhow, bail, ensure}; +use anyhow::{bail, ensure}; use bytemuck::{bytes_of, pod_read_unaligned, Pod, Zeroable}; use bytes::{Bytes, BytesMut}; use parking_lot::RwLock; @@ -719,6 +719,7 @@ impl ReplicationLogger { max_log_size: u64, max_log_duration: Option<Duration>, dirty: bool, + disable_auto_checkpoint: bool, callback: SnapshotCallback, ) -> anyhow::Result<Self> { let log_path = db_path.join("wallog"); @@ -750,7 +751,7 @@ impl ReplicationLogger { }; if should_recover { - Self::recover(log_file, data_path, callback) + Self::recover(log_file, data_path, callback, disable_auto_checkpoint) } else { Self::from_log_file(db_path.to_path_buf(), log_file, callback) } @@ -779,10 +780,11 @@ impl ReplicationLogger { log_file: LogFile, mut data_path: PathBuf, callback: SnapshotCallback, + disable_auto_checkpoint: bool, ) -> anyhow::Result<Self> { // It is necessary to checkpoint before we restore the replication log, since the WAL may // contain pages that are not in the database file.
- checkpoint_db(&data_path)?; + checkpoint_db(&data_path, disable_auto_checkpoint)?; let mut log_file = log_file.reset()?; let snapshot_path = data_path.parent().unwrap().join("snapshots"); // best effort, there may be no snapshots @@ -885,7 +887,7 @@ impl ReplicationLogger { } } -fn checkpoint_db(data_path: &Path) -> anyhow::Result<()> { +fn checkpoint_db(data_path: &Path, disable_auto_checkpoint: bool) -> anyhow::Result<()> { unsafe { let conn = rusqlite::Connection::open(data_path)?; conn.query_row("PRAGMA journal_mode=WAL", (), |_| Ok(()))?; @@ -900,12 +902,13 @@ fn checkpoint_db(data_path: &Path) -> anyhow::Result<()> { Ok(()) })?; // turn off auto_checkpointing - we'll use a fiber to checkpoint in time steps instead - let rc = rusqlite::ffi::sqlite3_wal_autocheckpoint(conn.handle(), 0); - if rc != 0 { - return Err(anyhow!( - "Failed to disable WAL autocheckpoint - error code: {}", - rc - )); + if disable_auto_checkpoint { + let rc = rusqlite::ffi::sqlite3_wal_autocheckpoint(conn.handle(), 0); + if rc != 0 { + bail!("Failed to disable WAL autocheckpoint - error code: {}", rc) + } else { + tracing::info!("SQLite autocheckpoint disabled"); + } } let mut num_checkpointed: c_int = 0; let rc = rusqlite::ffi::sqlite3_wal_checkpoint_v2( @@ -917,18 +920,13 @@ fn checkpoint_db(data_path: &Path) -> anyhow::Result<()> { ); if rc == 0 { if num_checkpointed == -1 { - Err(anyhow!( - "Checkpoint failed: database journal_mode is not WAL" - )) + bail!("Checkpoint failed: database journal_mode is not WAL") } else { conn.execute("VACUUM", ())?; Ok(()) } } else { - Err(anyhow!( - "Checkpoint failed: wal_checkpoint_v2 error code {}", - rc - )) + bail!("Checkpoint failed: wal_checkpoint_v2 error code {}", rc) } } } @@ -941,7 +939,8 @@ mod test { fn write_and_read_from_frame_log() { let dir = tempfile::tempdir().unwrap(); let logger = - ReplicationLogger::open(dir.path(), 0, None, false, Box::new(|_| Ok(()))).unwrap(); + ReplicationLogger::open(dir.path(), 0, None, false, false, 
Box::new(|_| Ok(()))) + .unwrap(); let frames = (0..10) .map(|i| WalPage { @@ -970,7 +969,8 @@ mod test { fn index_out_of_bounds() { let dir = tempfile::tempdir().unwrap(); let logger = - ReplicationLogger::open(dir.path(), 0, None, false, Box::new(|_| Ok(()))).unwrap(); + ReplicationLogger::open(dir.path(), 0, None, false, false, Box::new(|_| Ok(()))) + .unwrap(); let log_file = logger.log_file.write(); assert!(matches!(log_file.frame(1), Err(LogReadError::Ahead))); } @@ -980,7 +980,8 @@ mod test { fn incorrect_frame_size() { let dir = tempfile::tempdir().unwrap(); let logger = - ReplicationLogger::open(dir.path(), 0, None, false, Box::new(|_| Ok(()))).unwrap(); + ReplicationLogger::open(dir.path(), 0, None, false, false, Box::new(|_| Ok(()))) + .unwrap(); let entry = WalPage { page_no: 0, size_after: 0,