Skip to content
This repository has been archived by the owner on Oct 18, 2023. It is now read-only.

Commit

Permalink
Same-thread frame injection (#695)
Browse files Browse the repository at this point in the history
* add same thread injector

* refactor Frame API; primary return commit frame for snapshots

* hook new injector

* add test assets

* handle fatal replication error

* use persistent replication table

* add doc commen to snapshot frames_iter_from

* review fixes

* fmt

* handle potential injector execute error
  • Loading branch information
MarinPostma authored Oct 13, 2023
1 parent 340e278 commit ebb7ae9
Show file tree
Hide file tree
Showing 21 changed files with 978 additions and 526 deletions.
251 changes: 140 additions & 111 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion sqld/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ hyper = { version = "0.14.23", features = ["http2"] }
hyper-tungstenite = "0.10"
itertools = "0.10.5"
jsonwebtoken = "8.2.0"
memmap = "0.7.0"
mimalloc = { version = "0.1.36", default-features = false }
nix = { version = "0.26.2", features = ["fs"] }
once_cell = "1.17.0"
Expand Down
Binary file added sqld/assets/test/simple_wallog
Binary file not shown.
2 changes: 1 addition & 1 deletion sqld/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
std::env::set_var("PROTOC", protobuf_src::protoc());

let mut config = Config::new();
config.bytes([".wal_log", ".proxy.ProgramReq.namespace"]);
config.bytes([".wal_log"]);
tonic_build::configure()
.protoc_arg("--experimental_allow_proto3_optional")
.type_attribute(".proxy", "#[cfg_attr(test, derive(arbitrary::Arbitrary))]")
Expand Down
7 changes: 1 addition & 6 deletions sqld/proto/replication_log.proto
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,7 @@ message LogOffset {
message HelloRequest {}

message HelloResponse {
/// Uuid of the current generation
string generation_id = 1;
/// First frame_no in the current generation
uint64 generation_start_index = 2;
/// Uuid of the database being replicated
string database_id = 3;
string log_id = 3;
}

message Frame {
Expand Down
3 changes: 3 additions & 0 deletions sqld/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ pub enum Error {
ConflictingRestoreParameters,
#[error("failed to fork database: {0}")]
Fork(#[from] ForkError),
#[error("fatal replication error")]
FatalReplicationError,
}

trait ResponseError: std::error::Error {
Expand Down Expand Up @@ -129,6 +131,7 @@ impl IntoResponse for Error {
LoadDumpExistingDb => self.format_err(StatusCode::BAD_REQUEST),
ConflictingRestoreParameters => self.format_err(StatusCode::BAD_REQUEST),
Fork(e) => e.into_response(),
FatalReplicationError => self.format_err(StatusCode::INTERNAL_SERVER_ERROR),
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions sqld/src/namespace/fork.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use tokio::time::Duration;
use tokio_stream::StreamExt;

use crate::database::PrimaryDatabase;
use crate::replication::frame::Frame;
use crate::replication::frame::FrameBorrowed;
use crate::replication::primary::frame_stream::FrameStream;
use crate::replication::{LogReadError, ReplicationLogger};
use crate::{BLOCKING_RT, LIBSQL_PAGE_SIZE};
Expand Down Expand Up @@ -41,7 +41,7 @@ impl From<tokio::task::JoinError> for ForkError {
}
}

async fn write_frame(frame: Frame, temp_file: &mut tokio::fs::File) -> Result<()> {
async fn write_frame(frame: &FrameBorrowed, temp_file: &mut tokio::fs::File) -> Result<()> {
let page_no = frame.header().page_no;
let page_pos = (page_no - 1) as usize * LIBSQL_PAGE_SIZE as usize;
temp_file.seek(SeekFrom::Start(page_pos as u64)).await?;
Expand Down Expand Up @@ -128,7 +128,7 @@ impl ForkTask<'_> {
match res {
Ok(frame) => {
next_frame_no = next_frame_no.max(frame.header().frame_no + 1);
write_frame(frame, &mut data_file).await?;
write_frame(&frame, &mut data_file).await?;
}
Err(LogReadError::SnapshotRequired) => {
let snapshot = loop {
Expand All @@ -147,7 +147,7 @@ impl ForkTask<'_> {
for frame in iter {
let frame = frame.map_err(ForkError::LogRead)?;
next_frame_no = next_frame_no.max(frame.header().frame_no + 1);
write_frame(frame, &mut data_file).await?;
write_frame(&frame, &mut data_file).await?;
}
}
Err(LogReadError::Ahead) => {
Expand Down
11 changes: 4 additions & 7 deletions sqld/src/namespace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -561,30 +561,27 @@ impl Namespace<ReplicaDatabase> {
DatabaseConfigStore::load(&db_path).context("Could not load database config")?,
);

let mut join_set = JoinSet::new();
let replicator = Replicator::new(
db_path.clone(),
config.channel.clone(),
config.uri.clone(),
name.clone(),
&mut join_set,
reset,
)
.await?;

let applied_frame_no_receiver = replicator.current_frame_no_notifier.clone();
let applied_frame_no_receiver = replicator.current_frame_no_notifier.subscribe();
let mut join_set = JoinSet::new();
join_set.spawn(replicator.run());

let stats = make_stats(
&db_path,
&mut join_set,
config.stats_sender.clone(),
name.clone(),
replicator.current_frame_no_notifier.clone(),
applied_frame_no_receiver.clone(),
)
.await?;

join_set.spawn(replicator.run());

let connection_maker = MakeWriteProxyConn::new(
db_path.clone(),
config.extensions.clone(),
Expand Down
149 changes: 111 additions & 38 deletions sqld/src/replication/frame.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::borrow::Cow;
use std::alloc::Layout;
use std::fmt;
use std::mem::{size_of, transmute};
use std::ops::Deref;
use std::mem::size_of;
use std::ops::{Deref, DerefMut};

use bytemuck::{bytes_of, pod_read_unaligned, try_from_bytes, Pod, Zeroable};
use bytes::{Bytes, BytesMut};
use bytemuck::{bytes_of, from_bytes, Pod, Zeroable};
use bytes::Bytes;

use crate::LIBSQL_PAGE_SIZE;

Expand All @@ -28,10 +28,18 @@ pub struct FrameHeader {
}

#[derive(Clone, serde::Serialize, serde::Deserialize)]
/// The owned version of a replication frame.
/// The shared version of a replication frame.
/// Cloning this is cheap.
pub struct Frame {
data: Bytes,
inner: Bytes,
}

impl TryFrom<&[u8]> for Frame {
type Error = anyhow::Error;

fn try_from(data: &[u8]) -> Result<Self, Self::Error> {
Ok(FrameMut::try_from(data)?.into())
}
}

impl fmt::Debug for Frame {
Expand All @@ -43,64 +51,129 @@ impl fmt::Debug for Frame {
}
}

impl Frame {
/// size of a single frame
pub const SIZE: usize = size_of::<FrameHeader>() + LIBSQL_PAGE_SIZE as usize;
/// Owned version of a frame, on the heap
pub struct FrameMut {
inner: Box<FrameBorrowed>,
}

pub fn from_parts(header: &FrameHeader, data: &[u8]) -> Self {
assert_eq!(data.len(), LIBSQL_PAGE_SIZE as usize);
let mut buf = BytesMut::with_capacity(Self::SIZE);
buf.extend_from_slice(bytes_of(header));
buf.extend_from_slice(data);
impl TryFrom<&[u8]> for FrameMut {
type Error = anyhow::Error;

fn try_from(data: &[u8]) -> Result<Self, Self::Error> {
anyhow::ensure!(
data.len() == size_of::<FrameBorrowed>(),
"invalid frame size"
);
// frames are relatively large (~4ko), we want to avoid allocating them on the stack and
// then copying them to the heap, and instead copy them to the heap directly.
let inner = unsafe {
let layout = Layout::new::<FrameBorrowed>();
let ptr = std::alloc::alloc(layout);
ptr.copy_from(data.as_ptr(), data.len());
Box::from_raw(ptr as *mut FrameBorrowed)
};

Ok(Self { inner })
}
}

impl From<FrameMut> for Frame {
fn from(value: FrameMut) -> Self {
// transmute the FrameBorrowed into a Box<[u8; _]>. This is safe because the alignment of
// [u8] divides the alignement of FrameBorrowed
let data = unsafe {
Vec::from_raw_parts(
Box::into_raw(value.inner) as *mut u8,
size_of::<FrameBorrowed>(),
size_of::<FrameBorrowed>(),
)
};

Self {
inner: Bytes::from(data),
}
}
}

Self { data: buf.freeze() }
impl From<FrameBorrowed> for FrameMut {
fn from(inner: FrameBorrowed) -> Self {
Self {
inner: Box::new(inner),
}
}
}

pub fn try_from_bytes(data: Bytes) -> anyhow::Result<Self> {
anyhow::ensure!(data.len() == Self::SIZE, "invalid frame size");
Ok(Self { data })
impl Frame {
pub fn from_parts(header: &FrameHeader, data: &[u8]) -> Self {
FrameBorrowed::from_parts(header, data).into()
}

pub fn bytes(&self) -> Bytes {
self.data.clone()
self.inner.clone()
}
}

impl From<FrameBorrowed> for Frame {
fn from(value: FrameBorrowed) -> Self {
FrameMut::from(value).into()
}
}

/// The borrowed version of Frame
#[repr(transparent)]
#[repr(C)]
#[derive(Pod, Zeroable, Copy, Clone)]
pub struct FrameBorrowed {
data: [u8],
header: FrameHeader,
page: [u8; LIBSQL_PAGE_SIZE as usize],
}

impl FrameBorrowed {
pub fn header(&self) -> Cow<FrameHeader> {
let data = &self.data[..size_of::<FrameHeader>()];
try_from_bytes(data)
.map(Cow::Borrowed)
.unwrap_or_else(|_| Cow::Owned(pod_read_unaligned(data)))
}

/// Returns the bytes for this frame. Includes the header bytes.
pub fn as_slice(&self) -> &[u8] {
&self.data
}

pub fn from_bytes(data: &[u8]) -> &Self {
assert_eq!(data.len(), Frame::SIZE);
// SAFETY: &FrameBorrowed is equivalent to &[u8]
unsafe { transmute(data) }
bytes_of(self)
}

/// returns this frame's page data.
pub fn page(&self) -> &[u8] {
&self.data[size_of::<FrameHeader>()..]
&self.page
}

pub fn header(&self) -> &FrameHeader {
&self.header
}

pub fn header_mut(&mut self) -> &mut FrameHeader {
&mut self.header
}

pub fn from_parts(header: &FrameHeader, page: &[u8]) -> Self {
assert_eq!(page.len(), LIBSQL_PAGE_SIZE as usize);

FrameBorrowed {
header: *header,
page: page.try_into().unwrap(),
}
}
}

impl Deref for Frame {
type Target = FrameBorrowed;

fn deref(&self) -> &Self::Target {
FrameBorrowed::from_bytes(&self.data)
from_bytes(&self.inner)
}
}

impl Deref for FrameMut {
type Target = FrameBorrowed;

fn deref(&self) -> &Self::Target {
self.inner.as_ref()
}
}

impl DerefMut for FrameMut {
fn deref_mut(&mut self) -> &mut Self::Target {
self.inner.as_mut()
}
}
Loading

0 comments on commit ebb7ae9

Please sign in to comment.