Skip to content
This repository has been archived by the owner on Oct 18, 2023. It is now read-only.

Commit

Permalink
fixed review remarks
Browse files: browse the repository at this point in the history
  • Loading branch information
Horusiath committed Jul 26, 2023
1 parent 3b1aa49 commit 73a9f8f
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 41 deletions.
21 changes: 10 additions & 11 deletions bottomless/src/replicator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,17 +112,16 @@ impl Options {
if let Some(region) = self.region.as_deref() {
loader = loader.region(Region::new(region.to_string()));
}
match (&self.access_key_id, &self.secret_access_key) {
(Some(access_key_id), Some(secret_access_key)) => {
loader = loader.credentials_provider(Credentials::new(
access_key_id,
secret_access_key,
None,
None,
"Static",
));
}
_ => {}
if let (Some(access_key_id), Some(secret_access_key)) =
(&self.access_key_id, &self.secret_access_key)
{
loader = loader.credentials_provider(Credentials::new(
access_key_id,
secret_access_key,
None,
None,
"Static",
));
}
if let Some(endpoint) = self.aws_endpoint.as_deref() {
loader = loader.endpoint_url(endpoint);
Expand Down
1 change: 0 additions & 1 deletion sqld/src/database/libsql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,6 @@ where
| OpenFlags::SQLITE_OPEN_URI
| OpenFlags::SQLITE_OPEN_NO_MUTEX,
);

sqld_libsql_bindings::Connection::open(path, flags, wal_methods, hook_ctx)
}

Expand Down
24 changes: 15 additions & 9 deletions sqld/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::replication::replica::Replicator;
use crate::stats::Stats;

use sha256::try_digest;
use tokio::time::{interval, sleep, Instant};
use tokio::time::{interval, sleep, Instant, MissedTickBehavior};

pub use sqld_libsql_bindings as libsql;

Expand Down Expand Up @@ -442,11 +442,14 @@ async fn start_primary(
snapshot_callback: SnapshotCallback,
) -> anyhow::Result<()> {
let is_fresh_db = check_fresh_db(&config.db_path);
// switch frame-count checkpoint to time-based one
let disable_auto_checkpoint = config.checkpoint_interval.is_some();
let logger = Arc::new(ReplicationLogger::open(
&config.db_path,
config.max_log_size,
config.max_log_duration.map(Duration::from_secs_f32),
db_is_dirty,
disable_auto_checkpoint,
snapshot_callback,
)?);

Expand Down Expand Up @@ -596,14 +599,20 @@ async fn run_checkpoint_cron(db_path: PathBuf, period: Duration) -> anyhow::Resu
let data_path = db_path.join("data");
tracing::info!("setting checkpoint interval to {:?}", period);
let mut interval = interval(period);
let mut retry = None;
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
let mut retry: Option<Duration> = None;
loop {
if let Some(retry) = retry.take() {
if retry.is_zero() {
tracing::warn!("database was not set in WAL journal mode");
return Ok(());
}
sleep(retry).await;
} else {
interval.tick().await;
}
retry = match rusqlite::Connection::open(&data_path) {
let data_path = data_path.clone();
retry = tokio::task::spawn_blocking(move || match rusqlite::Connection::open(&data_path) {
Ok(conn) => unsafe {
let start = Instant::now();
let mut num_checkpointed: std::ffi::c_int = 0;
Expand All @@ -616,11 +625,7 @@ async fn run_checkpoint_cron(db_path: PathBuf, period: Duration) -> anyhow::Resu
);
if rc == 0 {
if num_checkpointed == -1 {
tracing::warn!(
"database {:?} not in WAL mode. Stopping checkpoint calls.",
data_path
);
return Ok(());
return Some(Duration::default());
} else {
let elapsed = Instant::now() - start;
tracing::info!("database checkpoint (took: {:?})", elapsed);
Expand All @@ -635,7 +640,8 @@ async fn run_checkpoint_cron(db_path: PathBuf, period: Duration) -> anyhow::Resu
tracing::warn!("couldn't connect to '{:?}': {}", data_path, err);
Some(RETRY_INTERVAL)
}
};
})
.await?;
}
}

Expand Down
41 changes: 21 additions & 20 deletions sqld/src/replication/primary/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::os::unix::prelude::FileExt;
use std::path::{Path, PathBuf};
use std::sync::Arc;

use anyhow::{anyhow, bail, ensure};
use anyhow::{bail, ensure};
use bytemuck::{bytes_of, pod_read_unaligned, Pod, Zeroable};
use bytes::{Bytes, BytesMut};
use parking_lot::RwLock;
Expand Down Expand Up @@ -719,6 +719,7 @@ impl ReplicationLogger {
max_log_size: u64,
max_log_duration: Option<Duration>,
dirty: bool,
disable_auto_checkpoint: bool,
callback: SnapshotCallback,
) -> anyhow::Result<Self> {
let log_path = db_path.join("wallog");
Expand Down Expand Up @@ -750,7 +751,7 @@ impl ReplicationLogger {
};

if should_recover {
Self::recover(log_file, data_path, callback)
Self::recover(log_file, data_path, callback, disable_auto_checkpoint)
} else {
Self::from_log_file(db_path.to_path_buf(), log_file, callback)
}
Expand Down Expand Up @@ -779,10 +780,11 @@ impl ReplicationLogger {
log_file: LogFile,
mut data_path: PathBuf,
callback: SnapshotCallback,
disable_auto_checkpoint: bool,
) -> anyhow::Result<Self> {
// It is necessary to checkpoint before we restore the replication log, since the WAL may
// contain pages that are not in the database file.
checkpoint_db(&data_path)?;
checkpoint_db(&data_path, disable_auto_checkpoint)?;
let mut log_file = log_file.reset()?;
let snapshot_path = data_path.parent().unwrap().join("snapshots");
// best effort, there may be no snapshots
Expand Down Expand Up @@ -885,7 +887,7 @@ impl ReplicationLogger {
}
}

fn checkpoint_db(data_path: &Path) -> anyhow::Result<()> {
fn checkpoint_db(data_path: &Path, disable_auto_checkpoint: bool) -> anyhow::Result<()> {
unsafe {
let conn = rusqlite::Connection::open(data_path)?;
conn.query_row("PRAGMA journal_mode=WAL", (), |_| Ok(()))?;
Expand All @@ -900,12 +902,13 @@ fn checkpoint_db(data_path: &Path) -> anyhow::Result<()> {
Ok(())
})?;
// turn off auto_checkpointing - we'll use a fiber to checkpoint in time steps instead
let rc = rusqlite::ffi::sqlite3_wal_autocheckpoint(conn.handle(), 0);
if rc != 0 {
return Err(anyhow!(
"Failed to disable WAL autocheckpoint - error code: {}",
rc
));
if disable_auto_checkpoint {
let rc = rusqlite::ffi::sqlite3_wal_autocheckpoint(conn.handle(), 0);
if rc != 0 {
bail!("Failed to disable WAL autocheckpoint - error code: {}", rc)
} else {
tracing::info!("SQLite autocheckpoint disabled");
}
}
let mut num_checkpointed: c_int = 0;
let rc = rusqlite::ffi::sqlite3_wal_checkpoint_v2(
Expand All @@ -917,18 +920,13 @@ fn checkpoint_db(data_path: &Path) -> anyhow::Result<()> {
);
if rc == 0 {
if num_checkpointed == -1 {
Err(anyhow!(
"Checkpoint failed: database journal_mode is not WAL"
))
bail!("Checkpoint failed: database journal_mode is not WAL")
} else {
conn.execute("VACUUM", ())?;
Ok(())
}
} else {
Err(anyhow!(
"Checkpoint failed: wal_checkpoint_v2 error code {}",
rc
))
bail!("Checkpoint failed: wal_checkpoint_v2 error code {}", rc)
}
}
}
Expand All @@ -941,7 +939,8 @@ mod test {
fn write_and_read_from_frame_log() {
let dir = tempfile::tempdir().unwrap();
let logger =
ReplicationLogger::open(dir.path(), 0, None, false, Box::new(|_| Ok(()))).unwrap();
ReplicationLogger::open(dir.path(), 0, None, false, false, Box::new(|_| Ok(())))
.unwrap();

let frames = (0..10)
.map(|i| WalPage {
Expand Down Expand Up @@ -970,7 +969,8 @@ mod test {
fn index_out_of_bounds() {
let dir = tempfile::tempdir().unwrap();
let logger =
ReplicationLogger::open(dir.path(), 0, None, false, Box::new(|_| Ok(()))).unwrap();
ReplicationLogger::open(dir.path(), 0, None, false, false, Box::new(|_| Ok(())))
.unwrap();
let log_file = logger.log_file.write();
assert!(matches!(log_file.frame(1), Err(LogReadError::Ahead)));
}
Expand All @@ -980,7 +980,8 @@ mod test {
fn incorrect_frame_size() {
let dir = tempfile::tempdir().unwrap();
let logger =
ReplicationLogger::open(dir.path(), 0, None, false, Box::new(|_| Ok(()))).unwrap();
ReplicationLogger::open(dir.path(), 0, None, false, false, Box::new(|_| Ok(())))
.unwrap();
let entry = WalPage {
page_no: 0,
size_after: 0,
Expand Down

0 comments on commit 73a9f8f

Please sign in to comment.