diff --git a/.github/workflows/_build-and-test-locally.yml b/.github/workflows/_build-and-test-locally.yml index 5ea911eb952a..5fc6aa247a3e 100644 --- a/.github/workflows/_build-and-test-locally.yml +++ b/.github/workflows/_build-and-test-locally.yml @@ -236,9 +236,7 @@ jobs: # run pageserver tests with different settings for io_engine in std-fs tokio-epoll-uring ; do - for io_buffer_alignment in 0 1 512 ; do - NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE=$io_engine NEON_PAGESERVER_UNIT_TEST_IO_BUFFER_ALIGNMENT=$io_buffer_alignment ${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_FEATURES -E 'package(pageserver)' - done + NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE=$io_engine ${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_FEATURES -E 'package(pageserver)' done # Run separate tests for real S3 diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 2b212cfed5d7..9241ff569cc3 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -40,7 +40,7 @@ use crate::tenant::storage_layer::layer::S3_UPLOAD_LIMIT; use crate::tenant::timeline::GetVectoredError; use crate::tenant::vectored_blob_io::{ BlobFlag, BufView, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead, - VectoredReadCoalesceMode, VectoredReadPlanner, + VectoredReadPlanner, }; use crate::tenant::PageReconstructError; use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt}; @@ -1133,7 +1133,7 @@ impl DeltaLayerInner { ctx: &RequestContext, ) -> anyhow::Result { use crate::tenant::vectored_blob_io::{ - BlobMeta, VectoredReadBuilder, VectoredReadExtended, + BlobMeta, ChunkedVectoredReadBuilder, VectoredReadExtended, }; use futures::stream::TryStreamExt; @@ -1183,8 +1183,8 @@ impl DeltaLayerInner { let mut prev: Option<(Key, Lsn, BlobRef)> = None; - let mut read_builder: Option = None; - let read_mode = VectoredReadCoalesceMode::get(); + 
let mut read_builder: Option<ChunkedVectoredReadBuilder> = None; + let align = virtual_file::get_io_buffer_alignment(); let max_read_size = self .max_vectored_read_bytes @@ -1228,12 +1228,12 @@ impl DeltaLayerInner { { None } else { - read_builder.replace(VectoredReadBuilder::new( + read_builder.replace(ChunkedVectoredReadBuilder::new( offsets.start.pos(), offsets.end.pos(), meta, max_read_size, - read_mode, + align, )) } } else { diff --git a/pageserver/src/tenant/vectored_blob_io.rs b/pageserver/src/tenant/vectored_blob_io.rs index aa37a45898bd..1faa6bab99e0 100644 --- a/pageserver/src/tenant/vectored_blob_io.rs +++ b/pageserver/src/tenant/vectored_blob_io.rs @@ -185,171 +185,7 @@ pub(crate) enum VectoredReadExtended { No, } -#[derive(Copy, Clone, Debug, PartialEq, Eq)] -pub enum VectoredReadCoalesceMode { - /// Only coalesce exactly adjacent reads. - AdjacentOnly, - /// In addition to adjacent reads, also consider reads whose corresponding - /// `end` and `start` offsets reside at the same chunk. - Chunked(usize), -} - -impl VectoredReadCoalesceMode { - /// [`AdjacentVectoredReadBuilder`] is used if alignment requirement is 0, - /// whereas [`ChunkedVectoredReadBuilder`] is used for alignment requirement 1 and higher. 
- pub(crate) fn get() -> Self { - let align = virtual_file::get_io_buffer_alignment_raw(); - if align == 0 { - VectoredReadCoalesceMode::AdjacentOnly - } else { - VectoredReadCoalesceMode::Chunked(align) - } - } -} - -pub(crate) enum VectoredReadBuilder { - Adjacent(AdjacentVectoredReadBuilder), - Chunked(ChunkedVectoredReadBuilder), -} - -impl VectoredReadBuilder { - fn new_impl( - start_offset: u64, - end_offset: u64, - meta: BlobMeta, - max_read_size: Option, - mode: VectoredReadCoalesceMode, - ) -> Self { - match mode { - VectoredReadCoalesceMode::AdjacentOnly => Self::Adjacent( - AdjacentVectoredReadBuilder::new(start_offset, end_offset, meta, max_read_size), - ), - VectoredReadCoalesceMode::Chunked(chunk_size) => { - Self::Chunked(ChunkedVectoredReadBuilder::new( - start_offset, - end_offset, - meta, - max_read_size, - chunk_size, - )) - } - } - } - - pub(crate) fn new( - start_offset: u64, - end_offset: u64, - meta: BlobMeta, - max_read_size: usize, - mode: VectoredReadCoalesceMode, - ) -> Self { - Self::new_impl(start_offset, end_offset, meta, Some(max_read_size), mode) - } - - pub(crate) fn new_streaming( - start_offset: u64, - end_offset: u64, - meta: BlobMeta, - mode: VectoredReadCoalesceMode, - ) -> Self { - Self::new_impl(start_offset, end_offset, meta, None, mode) - } - - pub(crate) fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended { - match self { - VectoredReadBuilder::Adjacent(builder) => builder.extend(start, end, meta), - VectoredReadBuilder::Chunked(builder) => builder.extend(start, end, meta), - } - } - - pub(crate) fn build(self) -> VectoredRead { - match self { - VectoredReadBuilder::Adjacent(builder) => builder.build(), - VectoredReadBuilder::Chunked(builder) => builder.build(), - } - } - - pub(crate) fn size(&self) -> usize { - match self { - VectoredReadBuilder::Adjacent(builder) => builder.size(), - VectoredReadBuilder::Chunked(builder) => builder.size(), - } - } -} - -pub(crate) struct 
AdjacentVectoredReadBuilder { - /// Start offset of the read. - start: u64, - // End offset of the read. - end: u64, - /// Start offset and metadata for each blob in this read - blobs_at: VecMap, - max_read_size: Option, -} - -impl AdjacentVectoredReadBuilder { - /// Start building a new vectored read. - /// - /// Note that by design, this does not check against reading more than `max_read_size` to - /// support reading larger blobs than the configuration value. The builder will be single use - /// however after that. - pub(crate) fn new( - start_offset: u64, - end_offset: u64, - meta: BlobMeta, - max_read_size: Option, - ) -> Self { - let mut blobs_at = VecMap::default(); - blobs_at - .append(start_offset, meta) - .expect("First insertion always succeeds"); - - Self { - start: start_offset, - end: end_offset, - blobs_at, - max_read_size, - } - } - /// Attempt to extend the current read with a new blob if the start - /// offset matches with the current end of the vectored read - /// and the resuting size is below the max read size - pub(crate) fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended { - tracing::trace!(start, end, "trying to extend"); - let size = (end - start) as usize; - let not_limited_by_max_read_size = { - if let Some(max_read_size) = self.max_read_size { - self.size() + size <= max_read_size - } else { - true - } - }; - - if self.end == start && not_limited_by_max_read_size { - self.end = end; - self.blobs_at - .append(start, meta) - .expect("LSNs are ordered within vectored reads"); - - return VectoredReadExtended::Yes; - } - - VectoredReadExtended::No - } - - pub(crate) fn size(&self) -> usize { - (self.end - self.start) as usize - } - - pub(crate) fn build(self) -> VectoredRead { - VectoredRead { - start: self.start, - end: self.end, - blobs_at: self.blobs_at, - } - } -} - +/// A vectored read builder that tries to coalesce all reads that fit in a chunk. 
pub(crate) struct ChunkedVectoredReadBuilder { /// Start block number start_blk_no: usize, @@ -373,7 +209,7 @@ impl ChunkedVectoredReadBuilder { /// Note that by design, this does not check against reading more than `max_read_size` to /// support reading larger blobs than the configuration value. The builder will be single use /// however after that. - pub(crate) fn new( + fn new_impl( start_offset: u64, end_offset: u64, meta: BlobMeta, @@ -396,6 +232,25 @@ impl ChunkedVectoredReadBuilder { } } + pub(crate) fn new( + start_offset: u64, + end_offset: u64, + meta: BlobMeta, + max_read_size: usize, + align: usize, + ) -> Self { + Self::new_impl(start_offset, end_offset, meta, Some(max_read_size), align) + } + + pub(crate) fn new_streaming( + start_offset: u64, + end_offset: u64, + meta: BlobMeta, + align: usize, + ) -> Self { + Self::new_impl(start_offset, end_offset, meta, None, align) + } + /// Attempts to extend the current read with a new blob if the new blob resides in the same or the immediate next chunk. /// /// The resulting size also must be below the max read size. 
@@ -474,17 +329,17 @@ pub struct VectoredReadPlanner { max_read_size: usize, - mode: VectoredReadCoalesceMode, + align: usize, } impl VectoredReadPlanner { pub fn new(max_read_size: usize) -> Self { - let mode = VectoredReadCoalesceMode::get(); + let align = virtual_file::get_io_buffer_alignment(); Self { blobs: BTreeMap::new(), prev: None, max_read_size, - mode, + align, } } @@ -545,7 +400,7 @@ impl VectoredReadPlanner { } pub fn finish(self) -> Vec { - let mut current_read_builder: Option = None; + let mut current_read_builder: Option<ChunkedVectoredReadBuilder> = None; let mut reads = Vec::new(); for (key, blobs_for_key) in self.blobs { @@ -558,12 +413,12 @@ impl VectoredReadPlanner { }; if extended == VectoredReadExtended::No { - let next_read_builder = VectoredReadBuilder::new( + let next_read_builder = ChunkedVectoredReadBuilder::new( start_offset, end_offset, BlobMeta { key, lsn }, self.max_read_size, - self.mode, + self.align, ); let prev_read_builder = current_read_builder.replace(next_read_builder); @@ -688,7 +543,7 @@ impl<'a> VectoredBlobReader<'a> { /// `handle` gets called and when the current key would just exceed the read_size and /// max_cnt constraints. pub struct StreamingVectoredReadPlanner { - read_builder: Option, + read_builder: Option<ChunkedVectoredReadBuilder>, // Arguments for previous blob passed into [`StreamingVectoredReadPlanner::handle`] prev: Option<(Key, Lsn, u64)>, /// Max read size per batch. This is not a strict limit. 
If there are [0, 100) and [100, 200), while the `max_read_size` is 150, @@ -699,21 +554,21 @@ pub struct StreamingVectoredReadPlanner { /// Size of the current batch cnt: usize, - mode: VectoredReadCoalesceMode, + align: usize, } impl StreamingVectoredReadPlanner { pub fn new(max_read_size: u64, max_cnt: usize) -> Self { assert!(max_cnt > 0); assert!(max_read_size > 0); - let mode = VectoredReadCoalesceMode::get(); + let align = virtual_file::get_io_buffer_alignment(); Self { read_builder: None, prev: None, max_cnt, max_read_size, cnt: 0, - mode, + align, } } @@ -762,11 +617,11 @@ impl StreamingVectoredReadPlanner { } None => { self.read_builder = { - Some(VectoredReadBuilder::new_streaming( + Some(ChunkedVectoredReadBuilder::new_streaming( start_offset, end_offset, BlobMeta { key, lsn }, - self.mode, + self.align, )) }; } @@ -1092,7 +947,7 @@ mod tests { let reserved_bytes = blobs.iter().map(|bl| bl.len()).max().unwrap() * 2 + 16; let mut buf = BytesMut::with_capacity(reserved_bytes); - let mode = VectoredReadCoalesceMode::get(); + let align = virtual_file::get_io_buffer_alignment(); let vectored_blob_reader = VectoredBlobReader::new(&file); let meta = BlobMeta { key: Key::MIN, @@ -1104,7 +959,8 @@ mod tests { if idx + 1 == offsets.len() { continue; } - let read_builder = VectoredReadBuilder::new(*offset, *end, meta, 16 * 4096, mode); + let read_builder = + ChunkedVectoredReadBuilder::new(*offset, *end, meta, 16 * 4096, align); let read = read_builder.build(); let result = vectored_blob_reader.read_blobs(&read, buf, &ctx).await?; assert_eq!(result.blobs.len(), 1); diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 57856eea8079..17d9f94c9507 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -1147,7 +1147,9 @@ pub fn init(num_slots: usize, engine: IoEngineKind, io_buffer_alignment: usize) panic!("virtual_file::init called twice"); } if set_io_buffer_alignment(io_buffer_alignment).is_err() { - 
panic!("IO buffer alignment ({io_buffer_alignment}) is not a power of two"); + panic!( + "IO buffer alignment needs to be a power of two and at least 512, got {io_buffer_alignment}" + ); } io_engine::init(engine); crate::metrics::virtual_file_descriptor_cache::SIZE_MAX.set(num_slots as u64); @@ -1174,14 +1176,16 @@ fn get_open_files() -> &'static OpenFiles { static IO_BUFFER_ALIGNMENT: AtomicUsize = AtomicUsize::new(DEFAULT_IO_BUFFER_ALIGNMENT); -/// Returns true if `x` is zero or a power of two. -fn is_zero_or_power_of_two(x: usize) -> bool { - (x == 0) || ((x & (x - 1)) == 0) +/// Returns true if the alignment is a power of two and is greater than or equal to 512. +fn is_valid_io_buffer_alignment(align: usize) -> bool { + align.is_power_of_two() && align >= 512 } +/// Sets the IO buffer alignment requirement. Returns an error if the alignment is +/// not a power of two or is less than 512 bytes. #[allow(unused)] pub(crate) fn set_io_buffer_alignment(align: usize) -> Result<(), usize> { - if is_zero_or_power_of_two(align) { + if is_valid_io_buffer_alignment(align) { IO_BUFFER_ALIGNMENT.store(align, std::sync::atomic::Ordering::Relaxed); Ok(()) } else { @@ -1189,19 +1193,19 @@ pub(crate) fn set_io_buffer_alignment(align: usize) -> Result<(), usize> { } } -/// Gets the io buffer alignment requirement. Returns 0 if there is no requirement specified. +/// Gets the io buffer alignment. /// -/// This function should be used to check the raw config value. -pub(crate) fn get_io_buffer_alignment_raw() -> usize { +/// This function should be used for getting the actual alignment value to use. 
+pub(crate) fn get_io_buffer_alignment() -> usize { let align = IO_BUFFER_ALIGNMENT.load(std::sync::atomic::Ordering::Relaxed); if cfg!(test) { let env_var_name = "NEON_PAGESERVER_UNIT_TEST_IO_BUFFER_ALIGNMENT"; if let Some(test_align) = utils::env::var(env_var_name) { - if is_zero_or_power_of_two(test_align) { + if is_valid_io_buffer_alignment(test_align) { test_align } else { - panic!("IO buffer alignment ({test_align}) is not a power of two"); + panic!("IO buffer alignment needs to be a power of two and at least 512, got {test_align}"); } } else { align @@ -1211,14 +1215,6 @@ pub(crate) fn get_io_buffer_alignment_raw() -> usize { } } -/// Gets the io buffer alignment requirement. Returns 1 if the alignment config is set to zero. -/// -/// This function should be used for getting the actual alignment value to use. -pub(crate) fn get_io_buffer_alignment() -> usize { - let align = get_io_buffer_alignment_raw(); - align.max(1) -} - #[cfg(test)] mod tests { use crate::context::DownloadBehavior;