From 84ae31bad73b5179d69faa93d743a43eab8ffeb2 Mon Sep 17 00:00:00 2001 From: Yuchen Liang Date: Wed, 25 Sep 2024 16:23:20 -0400 Subject: [PATCH] use IoBufferMut for delta and image layers Signed-off-by: Yuchen Liang --- .../src/tenant/storage_layer/delta_layer.rs | 20 ++++++++++------- .../src/tenant/storage_layer/image_layer.rs | 22 ++++++++++--------- pageserver/src/tenant/vectored_blob_io.rs | 10 +++++---- 3 files changed, 30 insertions(+), 22 deletions(-) diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 2b212cfed5d7..808fd5bcd222 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -43,12 +43,12 @@ use crate::tenant::vectored_blob_io::{ VectoredReadCoalesceMode, VectoredReadPlanner, }; use crate::tenant::PageReconstructError; +use crate::virtual_file::dio::IoBufferMut; use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt}; use crate::virtual_file::{self, VirtualFile}; use crate::{walrecord, TEMP_FILE_SUFFIX}; use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION}; use anyhow::{anyhow, bail, ensure, Context, Result}; -use bytes::BytesMut; use camino::{Utf8Path, Utf8PathBuf}; use futures::StreamExt; use itertools::Itertools; @@ -989,7 +989,8 @@ impl DeltaLayerInner { .0 .into(); let buf_size = Self::get_min_read_buffer_size(&reads, max_vectored_read_bytes); - let mut buf = Some(BytesMut::with_capacity(buf_size)); + let align = virtual_file::get_io_buffer_alignment(); + let mut buf = Some(IoBufferMut::with_capacity_aligned(buf_size, align)); // Note that reads are processed in reverse order (from highest key+lsn). // This is the order that `ReconstructState` requires such that it can @@ -1016,7 +1017,7 @@ impl DeltaLayerInner { // We have "lost" the buffer since the lower level IO api // doesn't return the buffer on error. Allocate a new one. - buf = Some(BytesMut::with_capacity(buf_size)); + buf = Some(IoBufferMut::with_capacity_aligned(buf_size, align)); continue; } @@ -1191,7 +1192,8 @@ impl DeltaLayerInner { .map(|x| x.0.get()) .unwrap_or(8192); - let mut buffer = Some(BytesMut::with_capacity(max_read_size)); + let align = virtual_file::get_io_buffer_alignment(); + let mut buffer = Some(IoBufferMut::with_capacity_aligned(max_read_size, align)); // FIXME: buffering of DeltaLayerWriter let mut per_blob_copy = Vec::new(); @@ -1550,12 +1552,12 @@ impl<'a> DeltaLayerIterator<'a> { let vectored_blob_reader = VectoredBlobReader::new(&self.delta_layer.file); let mut next_batch = std::collections::VecDeque::new(); let buf_size = plan.size(); - let buf = BytesMut::with_capacity(buf_size); + let align = virtual_file::get_io_buffer_alignment(); + let buf = IoBufferMut::with_capacity_aligned(buf_size, align); let blobs_buf = vectored_blob_reader .read_blobs(&plan, buf, self.ctx) .await?; - let frozen_buf = blobs_buf.buf.freeze(); - let view = BufView::new_bytes(frozen_buf); + let view = BufView::new_slice(&blobs_buf.buf); for meta in blobs_buf.blobs.iter() { let blob_read = meta.read(&view).await?; let value = Value::des(&blob_read)?; @@ -1930,7 +1932,9 @@ pub(crate) mod test { &vectored_reads, constants::MAX_VECTORED_READ_BYTES, ); - let mut buf = Some(BytesMut::with_capacity(buf_size)); + + let align = virtual_file::get_io_buffer_alignment(); + let mut buf = Some(IoBufferMut::with_capacity_aligned(buf_size, align)); for read in vectored_reads { let blobs_buf = vectored_blob_reader diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 940d169db096..c81002dd8a97 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -40,11 +40,12 @@ use crate::tenant::vectored_blob_io::{ VectoredReadPlanner, }; use crate::tenant::PageReconstructError; +use crate::virtual_file::dio::IoBufferMut; use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt; use crate::virtual_file::{self, VirtualFile}; use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX}; use anyhow::{anyhow, bail, ensure, Context, Result}; -use bytes::{Bytes, BytesMut}; +use bytes::Bytes; use camino::{Utf8Path, Utf8PathBuf}; use hex; use itertools::Itertools; @@ -542,14 +543,15 @@ impl ImageLayerInner { .await?; let vectored_blob_reader = VectoredBlobReader::new(&self.file); + let align = virtual_file::get_io_buffer_alignment(); let mut key_count = 0; for read in plan.into_iter() { let buf_size = read.size(); - let buf = BytesMut::with_capacity(buf_size); + let buf = IoBufferMut::with_capacity_aligned(buf_size, align); let blobs_buf = vectored_blob_reader.read_blobs(&read, buf, ctx).await?; - let frozen_buf = blobs_buf.buf.freeze(); - let view = BufView::new_bytes(frozen_buf); + + let view = BufView::new_slice(&blobs_buf.buf); for meta in blobs_buf.blobs.iter() { let img_buf = meta.read(&view).await?; @@ -597,13 +599,13 @@ impl ImageLayerInner { ); } - let buf = BytesMut::with_capacity(buf_size); + let align = virtual_file::get_io_buffer_alignment(); + let buf = IoBufferMut::with_capacity_aligned(buf_size, align); let res = vectored_blob_reader.read_blobs(&read, buf, ctx).await; match res { Ok(blobs_buf) => { - let frozen_buf = blobs_buf.buf.freeze(); - let view = BufView::new_bytes(frozen_buf); + let view = BufView::new_slice(&blobs_buf.buf); for meta in blobs_buf.blobs.iter() { let img_buf = meta.read(&view).await; @@ -1037,12 +1039,12 @@ impl<'a> ImageLayerIterator<'a> { let vectored_blob_reader = VectoredBlobReader::new(&self.image_layer.file); let mut next_batch = std::collections::VecDeque::new(); let buf_size = plan.size(); - let buf = BytesMut::with_capacity(buf_size); + let align = virtual_file::get_io_buffer_alignment(); + let buf = IoBufferMut::with_capacity_aligned(buf_size, align); let blobs_buf = vectored_blob_reader .read_blobs(&plan, buf, self.ctx) .await?; - let frozen_buf = blobs_buf.buf.freeze(); - let view = BufView::new_bytes(frozen_buf); + let view = BufView::new_slice(&blobs_buf.buf); for meta in blobs_buf.blobs.iter() { let img_buf = meta.read(&view).await?; next_batch.push_back(( diff --git a/pageserver/src/tenant/vectored_blob_io.rs b/pageserver/src/tenant/vectored_blob_io.rs index aa37a45898bd..7b375ac79b7b 100644 --- a/pageserver/src/tenant/vectored_blob_io.rs +++ b/pageserver/src/tenant/vectored_blob_io.rs @@ -18,7 +18,7 @@ use std::collections::BTreeMap; use std::ops::Deref; -use bytes::{Bytes, BytesMut}; +use bytes::Bytes; use pageserver_api::key::Key; use tokio::io::AsyncWriteExt; use tokio_epoll_uring::BoundedBuf; @@ -27,6 +27,7 @@ use utils::vec_map::VecMap; use crate::context::RequestContext; use crate::tenant::blob_io::{BYTE_UNCOMPRESSED, BYTE_ZSTD, LEN_COMPRESSION_BIT_MASK}; +use crate::virtual_file::dio::IoBufferMut; use crate::virtual_file::{self, VirtualFile}; /// Metadata bundled with the start and end offset of a blob. @@ -158,7 +159,7 @@ impl std::fmt::Display for VectoredBlob { /// Return type of [`VectoredBlobReader::read_blobs`] pub struct VectoredBlobsBuf { /// Buffer for all blobs in this read - pub buf: BytesMut, + pub buf: IoBufferMut, /// Offsets into the buffer and metadata for all blobs in this read pub blobs: Vec, } @@ -605,7 +606,7 @@ impl<'a> VectoredBlobReader<'a> { pub async fn read_blobs( &self, read: &VectoredRead, - buf: BytesMut, + buf: IoBufferMut, ctx: &RequestContext, ) -> Result { assert!(read.size() > 0); @@ -1090,7 +1091,8 @@ mod tests { // Multiply by two (compressed data might need more space), and add a few bytes for the header let reserved_bytes = blobs.iter().map(|bl| bl.len()).max().unwrap() * 2 + 16; - let mut buf = BytesMut::with_capacity(reserved_bytes); + let align = virtual_file::get_io_buffer_alignment(); + let mut buf = IoBufferMut::with_capacity_aligned(reserved_bytes, align); let mode = VectoredReadCoalesceMode::get(); let vectored_blob_reader = VectoredBlobReader::new(&file);