Skip to content

Commit

Permalink
use IoBufferMut for delta and image layers
Browse files Browse the repository at this point in the history
Signed-off-by: Yuchen Liang <[email protected]>
  • Loading branch information
yliang412 committed Sep 25, 2024
1 parent d942b91 commit 84ae31b
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 22 deletions.
20 changes: 12 additions & 8 deletions pageserver/src/tenant/storage_layer/delta_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ use crate::tenant::vectored_blob_io::{
VectoredReadCoalesceMode, VectoredReadPlanner,
};
use crate::tenant::PageReconstructError;
use crate::virtual_file::dio::IoBufferMut;
use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
use crate::virtual_file::{self, VirtualFile};
use crate::{walrecord, TEMP_FILE_SUFFIX};
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::BytesMut;
use camino::{Utf8Path, Utf8PathBuf};
use futures::StreamExt;
use itertools::Itertools;
Expand Down Expand Up @@ -989,7 +989,8 @@ impl DeltaLayerInner {
.0
.into();
let buf_size = Self::get_min_read_buffer_size(&reads, max_vectored_read_bytes);
let mut buf = Some(BytesMut::with_capacity(buf_size));
let align = virtual_file::get_io_buffer_alignment();
let mut buf = Some(IoBufferMut::with_capacity_aligned(buf_size, align));

// Note that reads are processed in reverse order (from highest key+lsn).
// This is the order that `ReconstructState` requires such that it can
Expand All @@ -1016,7 +1017,7 @@ impl DeltaLayerInner {

// We have "lost" the buffer since the lower level IO api
// doesn't return the buffer on error. Allocate a new one.
buf = Some(BytesMut::with_capacity(buf_size));
buf = Some(IoBufferMut::with_capacity_aligned(buf_size, align));

continue;
}
Expand Down Expand Up @@ -1191,7 +1192,8 @@ impl DeltaLayerInner {
.map(|x| x.0.get())
.unwrap_or(8192);

let mut buffer = Some(BytesMut::with_capacity(max_read_size));
let align = virtual_file::get_io_buffer_alignment();
let mut buffer = Some(IoBufferMut::with_capacity_aligned(max_read_size, align));

// FIXME: buffering of DeltaLayerWriter
let mut per_blob_copy = Vec::new();
Expand Down Expand Up @@ -1550,12 +1552,12 @@ impl<'a> DeltaLayerIterator<'a> {
let vectored_blob_reader = VectoredBlobReader::new(&self.delta_layer.file);
let mut next_batch = std::collections::VecDeque::new();
let buf_size = plan.size();
let buf = BytesMut::with_capacity(buf_size);
let align = virtual_file::get_io_buffer_alignment();
let buf = IoBufferMut::with_capacity_aligned(buf_size, align);
let blobs_buf = vectored_blob_reader
.read_blobs(&plan, buf, self.ctx)
.await?;
let frozen_buf = blobs_buf.buf.freeze();
let view = BufView::new_bytes(frozen_buf);
let view = BufView::new_slice(&blobs_buf.buf);
for meta in blobs_buf.blobs.iter() {
let blob_read = meta.read(&view).await?;
let value = Value::des(&blob_read)?;
Expand Down Expand Up @@ -1930,7 +1932,9 @@ pub(crate) mod test {
&vectored_reads,
constants::MAX_VECTORED_READ_BYTES,
);
let mut buf = Some(BytesMut::with_capacity(buf_size));

let align = virtual_file::get_io_buffer_alignment();
let mut buf = Some(IoBufferMut::with_capacity_aligned(buf_size, align));

for read in vectored_reads {
let blobs_buf = vectored_blob_reader
Expand Down
22 changes: 12 additions & 10 deletions pageserver/src/tenant/storage_layer/image_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,12 @@ use crate::tenant::vectored_blob_io::{
VectoredReadPlanner,
};
use crate::tenant::PageReconstructError;
use crate::virtual_file::dio::IoBufferMut;
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
use crate::virtual_file::{self, VirtualFile};
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::{Bytes, BytesMut};
use bytes::Bytes;
use camino::{Utf8Path, Utf8PathBuf};
use hex;
use itertools::Itertools;
Expand Down Expand Up @@ -542,14 +543,15 @@ impl ImageLayerInner {
.await?;

let vectored_blob_reader = VectoredBlobReader::new(&self.file);
let align = virtual_file::get_io_buffer_alignment();
let mut key_count = 0;
for read in plan.into_iter() {
let buf_size = read.size();

let buf = BytesMut::with_capacity(buf_size);
let buf = IoBufferMut::with_capacity_aligned(buf_size, align);
let blobs_buf = vectored_blob_reader.read_blobs(&read, buf, ctx).await?;
let frozen_buf = blobs_buf.buf.freeze();
let view = BufView::new_bytes(frozen_buf);

let view = BufView::new_slice(&blobs_buf.buf);

for meta in blobs_buf.blobs.iter() {
let img_buf = meta.read(&view).await?;
Expand Down Expand Up @@ -597,13 +599,13 @@ impl ImageLayerInner {
);
}

let buf = BytesMut::with_capacity(buf_size);
let align = virtual_file::get_io_buffer_alignment();
let buf = IoBufferMut::with_capacity_aligned(buf_size, align);
let res = vectored_blob_reader.read_blobs(&read, buf, ctx).await;

match res {
Ok(blobs_buf) => {
let frozen_buf = blobs_buf.buf.freeze();
let view = BufView::new_bytes(frozen_buf);
let view = BufView::new_slice(&blobs_buf.buf);
for meta in blobs_buf.blobs.iter() {
let img_buf = meta.read(&view).await;

Expand Down Expand Up @@ -1037,12 +1039,12 @@ impl<'a> ImageLayerIterator<'a> {
let vectored_blob_reader = VectoredBlobReader::new(&self.image_layer.file);
let mut next_batch = std::collections::VecDeque::new();
let buf_size = plan.size();
let buf = BytesMut::with_capacity(buf_size);
let align = virtual_file::get_io_buffer_alignment();
let buf = IoBufferMut::with_capacity_aligned(buf_size, align);
let blobs_buf = vectored_blob_reader
.read_blobs(&plan, buf, self.ctx)
.await?;
let frozen_buf = blobs_buf.buf.freeze();
let view = BufView::new_bytes(frozen_buf);
let view = BufView::new_slice(&blobs_buf.buf);
for meta in blobs_buf.blobs.iter() {
let img_buf = meta.read(&view).await?;
next_batch.push_back((
Expand Down
10 changes: 6 additions & 4 deletions pageserver/src/tenant/vectored_blob_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use std::collections::BTreeMap;
use std::ops::Deref;

use bytes::{Bytes, BytesMut};
use bytes::Bytes;
use pageserver_api::key::Key;
use tokio::io::AsyncWriteExt;
use tokio_epoll_uring::BoundedBuf;
Expand All @@ -27,6 +27,7 @@ use utils::vec_map::VecMap;

use crate::context::RequestContext;
use crate::tenant::blob_io::{BYTE_UNCOMPRESSED, BYTE_ZSTD, LEN_COMPRESSION_BIT_MASK};
use crate::virtual_file::dio::IoBufferMut;
use crate::virtual_file::{self, VirtualFile};

/// Metadata bundled with the start and end offset of a blob.
Expand Down Expand Up @@ -158,7 +159,7 @@ impl std::fmt::Display for VectoredBlob {
/// Return type of [`VectoredBlobReader::read_blobs`]
pub struct VectoredBlobsBuf {
/// Buffer for all blobs in this read
pub buf: BytesMut,
pub buf: IoBufferMut,
/// Offsets into the buffer and metadata for all blobs in this read
pub blobs: Vec<VectoredBlob>,
}
Expand Down Expand Up @@ -605,7 +606,7 @@ impl<'a> VectoredBlobReader<'a> {
pub async fn read_blobs(
&self,
read: &VectoredRead,
buf: BytesMut,
buf: IoBufferMut,
ctx: &RequestContext,
) -> Result<VectoredBlobsBuf, std::io::Error> {
assert!(read.size() > 0);
Expand Down Expand Up @@ -1090,7 +1091,8 @@ mod tests {

// Multiply by two (compressed data might need more space), and add a few bytes for the header
let reserved_bytes = blobs.iter().map(|bl| bl.len()).max().unwrap() * 2 + 16;
let mut buf = BytesMut::with_capacity(reserved_bytes);
let align = virtual_file::get_io_buffer_alignment();
let mut buf = IoBufferMut::with_capacity_aligned(reserved_bytes, align);

let mode = VectoredReadCoalesceMode::get();
let vectored_blob_reader = VectoredBlobReader::new(&file);
Expand Down

0 comments on commit 84ae31b

Please sign in to comment.