diff --git a/Cargo.lock b/Cargo.lock
index 3be6778b7d..96499371b5 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -12609,7 +12609,6 @@ dependencies = [
  "prometheus-client 0.22.3",
  "rand",
  "rayon",
- "schnellru",
  "schnorrkel",
  "serde",
  "serde_json",
diff --git a/crates/subspace-farmer/Cargo.toml b/crates/subspace-farmer/Cargo.toml
index 4d83aa4f26..766d38b23b 100644
--- a/crates/subspace-farmer/Cargo.toml
+++ b/crates/subspace-farmer/Cargo.toml
@@ -46,7 +46,6 @@ pin-project = "1.1.5"
 prometheus-client = "0.22.3"
 rand = "0.8.5"
 rayon = "1.10.0"
-schnellru = "0.2.3"
 schnorrkel = "0.11.4"
 serde = { version = "1.0.110", features = ["derive"] }
 serde_json = "1.0.128"
@@ -67,7 +66,7 @@ supports-color = { version = "3.0.1", optional = true }
 tempfile = "3.13.0"
 thiserror = "2.0.0"
 thread-priority = "1.1.0"
-tokio = { version = "1.40.0", features = ["macros", "parking_lot", "rt-multi-thread", "signal", "time"] }
+tokio = { version = "1.40.0", features = ["macros", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] }
 tokio-stream = { version = "0.1.16", features = ["sync"] }
 tracing = "0.1.40"
 tracing-subscriber = { version = "0.3.18", features = ["env-filter"], optional = true }
diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/cache.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/cache.rs
index 8efac9d909..41f372d7a8 100644
--- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/cache.rs
+++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/cache.rs
@@ -6,7 +6,7 @@ use futures::StreamExt;
 use prometheus_client::registry::Registry;
 use std::fs;
 use std::future::Future;
-use std::num::NonZeroUsize;
+use std::num::{NonZeroU32, NonZeroUsize};
 use std::path::PathBuf;
 use std::pin::Pin;
 use std::str::FromStr;
@@ -160,19 +160,30 @@ pub(super) async fn cache(
     let caches = disk_caches
         .iter()
         .map(|disk_cache| {
-            DiskPieceCache::open(
-                &disk_cache.directory,
+            let capacity =
                 u32::try_from(disk_cache.allocated_space / DiskPieceCache::element_size() as u64)
-                    .unwrap_or(u32::MAX),
-                None,
-                Some(registry),
-            )
-            .map_err(|error| {
+                    .map_err(|error| {
+                        anyhow!(
+                            "Unsupported cache #1 size {} at {}: {error}",
+                            disk_cache.allocated_space,
+                            disk_cache.directory.display()
+                        )
+                    })?;
+            let capacity = NonZeroU32::try_from(capacity).map_err(|error| {
                 anyhow!(
-                    "Failed to open piece cache at {}: {error}",
+                    "Unsupported cache #2 size {} at {}: {error}",
+                    disk_cache.allocated_space,
                     disk_cache.directory.display()
                 )
-            })
+            })?;
+            DiskPieceCache::open(&disk_cache.directory, capacity, None, Some(registry)).map_err(
+                |error| {
+                    anyhow!(
+                        "Failed to open piece cache at {}: {error}",
+                        disk_cache.directory.display()
+                    )
+                },
+            )
         })
         .collect::<Result<Vec<_>, _>>()?;
diff --git a/crates/subspace-farmer/src/disk_piece_cache.rs b/crates/subspace-farmer/src/disk_piece_cache.rs
index 811a170d4e..3456de2b40 100644
--- a/crates/subspace-farmer/src/disk_piece_cache.rs
+++ b/crates/subspace-farmer/src/disk_piece_cache.rs
@@ -15,6 +15,7 @@ use futures::channel::mpsc;
 use futures::{stream, SinkExt, Stream, StreamExt};
 use parking_lot::Mutex;
 use prometheus_client::registry::Registry;
+use std::num::NonZeroU32;
 use std::path::Path;
 use std::sync::atomic::{AtomicU8, Ordering};
 use std::sync::Arc;
@@ -51,9 +52,6 @@ pub enum DiskPieceCacheError {
         /// Max offset
         max: u32,
     },
-    /// Cache size has zero capacity, this is not supported, cache size needs to be larger
-    #[error("Cache size has zero capacity, this is not supported, cache size needs to be larger")]
-    ZeroCapacity,
     /// Checksum mismatch
     #[error("Checksum mismatch")]
     ChecksumMismatch,
@@ -235,14 +233,11 @@ impl DiskPieceCache {
     /// Open cache, capacity is measured in elements of [`DiskPieceCache::element_size()`] size
     pub fn open(
         directory: &Path,
-        capacity: u32,
+        capacity: NonZeroU32,
         id: Option<PieceCacheId>,
         registry: Option<&mut Registry>,
     ) -> Result<Self, DiskPieceCacheError> {
-        if capacity == 0 {
-            return Err(DiskPieceCacheError::ZeroCapacity);
-        }
-
+        let capacity = capacity.get();
         let files = FilePool::open(&directory.join(Self::FILE_NAME))?;

         let expected_size = u64::from(Self::element_size()) * u64::from(capacity);
diff --git a/crates/subspace-farmer/src/disk_piece_cache/tests.rs b/crates/subspace-farmer/src/disk_piece_cache/tests.rs
index 4f89861876..a6a60bea53 100644
--- a/crates/subspace-farmer/src/disk_piece_cache/tests.rs
+++ b/crates/subspace-farmer/src/disk_piece_cache/tests.rs
@@ -1,6 +1,7 @@
 use crate::disk_piece_cache::{DiskPieceCache, DiskPieceCacheError, PieceCacheOffset};
 use rand::prelude::*;
 use std::assert_matches::assert_matches;
+use std::num::NonZeroU32;
 use subspace_core_primitives::pieces::{Piece, PieceIndex};
 use tempfile::tempdir;

@@ -8,7 +9,8 @@
 fn basic() {
     let path = tempdir().unwrap();
     {
-        let disk_piece_cache = DiskPieceCache::open(path.as_ref(), 2, None, None).unwrap();
+        let disk_piece_cache =
+            DiskPieceCache::open(path.as_ref(), NonZeroU32::new(2).unwrap(), None, None).unwrap();

         // Initially empty
         assert_eq!(
@@ -115,7 +117,8 @@ fn basic() {

     // Reopening works
     {
-        let disk_piece_cache = DiskPieceCache::open(path.as_ref(), 2, None, None).unwrap();
+        let disk_piece_cache =
+            DiskPieceCache::open(path.as_ref(), NonZeroU32::new(2).unwrap(), None, None).unwrap();
         // Two pieces stored
         assert_eq!(
             disk_piece_cache
@@ -130,7 +133,8 @@ fn basic() {
     {
         DiskPieceCache::wipe(path.as_ref()).unwrap();

-        let disk_piece_cache = DiskPieceCache::open(path.as_ref(), 2, None, None).unwrap();
+        let disk_piece_cache =
+            DiskPieceCache::open(path.as_ref(), NonZeroU32::new(2).unwrap(), None, None).unwrap();
         // Wiped successfully
         assert_eq!(
             disk_piece_cache
diff --git a/crates/subspace-farmer/src/farmer_cache.rs b/crates/subspace-farmer/src/farmer_cache.rs
index 19474b0cc8..07d1fd686d 100644
--- a/crates/subspace-farmer/src/farmer_cache.rs
+++ b/crates/subspace-farmer/src/farmer_cache.rs
@@ -38,6 +38,7 @@ use subspace_networking::libp2p::PeerId;
 use subspace_networking::utils::multihash::ToMultihash;
 use subspace_networking::{KeyWithDistance, LocalRecordProvider};
 use tokio::runtime::Handle;
+use tokio::sync::Semaphore;
 use tokio::task::{block_in_place, yield_now};
 use tracing::{debug, error, info, info_span, trace, warn, Instrument};

@@ -410,7 +411,11 @@ where
         match maybe_cache {
             Ok(cache) => {
                 let backend = cache.backend;
-                stored_pieces.extend(cache.cache_stored_pieces.into_iter());
+                for (key, cache_offset) in cache.cache_stored_pieces {
+                    if let Some(old_cache_offset) = stored_pieces.insert(key, cache_offset) {
+                        dangling_free_offsets.push_front(old_cache_offset);
+                    }
+                }
                 dangling_free_offsets.extend(
                     cache.cache_free_offsets.into_iter().filter(|free_offset| {
                         free_offset.piece_offset.0 < backend.used_capacity
@@ -533,14 +538,23 @@ where
         let downloaded_pieces_count = AtomicUsize::new(0);
         let caches = Mutex::new(caches);
         self.handlers.progress.call_simple(&0.0);
+        let piece_indices_to_store = piece_indices_to_store.into_iter().enumerate();
+
+        let downloading_semaphore = &Semaphore::new(SYNC_BATCH_SIZE * SYNC_CONCURRENT_BATCHES);

         let downloading_pieces_stream =
-            stream::iter(piece_indices_to_store.into_iter().map(|piece_indices| {
+            stream::iter(piece_indices_to_store.map(|(batch, piece_indices)| {
                 let downloaded_pieces_count = &downloaded_pieces_count;
                 let caches = &caches;

                 async move {
-                    let mut pieces_stream = match piece_getter.get_pieces(piece_indices).await {
+                    let mut permit = downloading_semaphore
+                        .acquire_many(SYNC_BATCH_SIZE as u32)
+                        .await
+                        .expect("Semaphore is never closed; qed");
+                    debug!(%batch, num_pieces = %piece_indices.len(), "Downloading pieces");
+
+                    let pieces_stream = match piece_getter.get_pieces(piece_indices).await {
                         Ok(pieces_stream) => pieces_stream,
                         Err(error) => {
                             error!(
@@ -550,8 +564,11 @@ where
                             return;
                         }
                     };
+                    let mut pieces_stream = pieces_stream.enumerate();
+
+                    while let Some((index, (piece_index, result))) = pieces_stream.next().await {
+                        debug!(%batch, %index, %piece_index, "Downloaded piece");

-                    while let Some((piece_index, result)) = pieces_stream.next().await {
                         let piece = match result {
                             Ok(Some(piece)) => {
                                 trace!(%piece_index, "Downloaded piece successfully");
@@ -570,6 +587,8 @@ where
                                 continue;
                             }
                         };
+                        // Release slot for future batches
+                        permit.split(1);

                         let (offset, maybe_backend) = {
                             let mut caches = caches.lock();
@@ -629,10 +648,13 @@ where
                     }
                 }
             }));
+
         // Download several batches concurrently to make sure slow tail of one is compensated by
         // another
         downloading_pieces_stream
-            .buffer_unordered(SYNC_CONCURRENT_BATCHES)
+            // This allows to schedule new batch while previous batches partially completed, but
+            // avoids excessive memory usage like when all futures are created upfront
+            .buffer_unordered(SYNC_CONCURRENT_BATCHES * 2)
             // Simply drain everything
             .for_each(|()| async {})
             .await;
diff --git a/crates/subspace-farmer/src/farmer_cache/tests.rs b/crates/subspace-farmer/src/farmer_cache/tests.rs
index 9ca4463d85..2c55c5c32f 100644
--- a/crates/subspace-farmer/src/farmer_cache/tests.rs
+++ b/crates/subspace-farmer/src/farmer_cache/tests.rs
@@ -8,7 +8,7 @@ use futures::{SinkExt, Stream, StreamExt};
 use parking_lot::Mutex;
 use rand::prelude::*;
 use std::collections::HashMap;
-use std::num::NonZeroU64;
+use std::num::{NonZeroU32, NonZeroU64};
 use std::pin::Pin;
 use std::sync::atomic::{AtomicU64, Ordering};
 use std::sync::Arc;
@@ -226,8 +226,24 @@ async fn basic() {
     farmer_cache
         .replace_backing_caches(
             vec![
-                Arc::new(DiskPieceCache::open(path1.as_ref(), 1, None, None).unwrap()),
-                Arc::new(DiskPieceCache::open(path2.as_ref(), 1, None, None).unwrap()),
+                Arc::new(
+                    DiskPieceCache::open(
+                        path1.as_ref(),
+                        NonZeroU32::new(1).unwrap(),
+                        None,
+                        None,
+                    )
+                    .unwrap(),
+                ),
+                Arc::new(
+                    DiskPieceCache::open(
+                        path2.as_ref(),
+                        NonZeroU32::new(1).unwrap(),
+                        None,
+                        None,
+                    )
+                    .unwrap(),
+                ),
             ],
             vec![],
         )
@@ -260,7 +276,7 @@ async fn basic() {
     // Update current segment header such that we keep-up after initial sync is triggered
     current_segment_index.store(1, Ordering::Release);

-    // Send segment headers receiver such that keep-up sync can start not
+    // Send segment headers receiver such that keep-up sync can start now
     let (mut archived_segment_headers_sender, archived_segment_headers_receiver) =
         mpsc::channel(0);
     archived_segment_headers_stream_request_receiver
@@ -271,7 +287,8 @@ async fn basic() {
         .unwrap();

     // Send segment header with the same segment index as "current", so it will have no
-    // side-effects, but acknowledgement will indicate that keep-up after initial sync has finished
+    // side effects, but acknowledgement will indicate that keep-up after initial sync has
+    // finished
     {
         let segment_header = SegmentHeader::V0 {
             segment_index: SegmentIndex::ONE,
@@ -328,8 +345,8 @@ async fn basic() {
         }
     }

-    // Send two more segment headers (one is not enough because for above peer ID there are no pieces for it to
-    // store)
+    // Send two more segment headers (one is not enough because for above peer ID there are no
+    // pieces for it to store)
     for segment_index in [2, 3] {
         let segment_header = SegmentHeader::V0 {
             segment_index: SegmentIndex::from(segment_index),
@@ -426,8 +443,24 @@ async fn basic() {
     farmer_cache
         .replace_backing_caches(
             vec![
-                Arc::new(DiskPieceCache::open(path1.as_ref(), 1, None, None).unwrap()),
-                Arc::new(DiskPieceCache::open(path2.as_ref(), 1, None, None).unwrap()),
+                Arc::new(
+                    DiskPieceCache::open(
+                        path1.as_ref(),
+                        NonZeroU32::new(1).unwrap(),
+                        None,
+                        None,
+                    )
+                    .unwrap(),
+                ),
+                Arc::new(
+                    DiskPieceCache::open(
+                        path2.as_ref(),
+                        NonZeroU32::new(1).unwrap(),
+                        None,
+                        None,
+                    )
+                    .unwrap(),
+                ),
             ],
             vec![],
         )
@@ -455,6 +488,164 @@ async fn basic() {
     }
 }

+#[tokio::test(flavor = "multi_thread")]
+async fn duplicate_indices() {
+    let current_segment_index = Arc::new(AtomicU64::new(0));
+    let pieces = Arc::default();
+    let (
+        archived_segment_headers_stream_request_sender,
+        mut archived_segment_headers_stream_request_receiver,
+    ) = mpsc::channel(0);
+    let (acknowledge_archived_segment_header_sender, _acknowledge_archived_segment_header_receiver) =
+        mpsc::channel(0);
+
+    let node_client = MockNodeClient {
+        current_segment_index: Arc::clone(&current_segment_index),
+        pieces: Arc::clone(&pieces),
+        archived_segment_headers_stream_request_sender,
+        acknowledge_archived_segment_header_sender,
+    };
+    let piece_getter = MockPieceGetter {
+        pieces: Arc::clone(&pieces),
+    };
+    let public_key =
+        identity::PublicKey::from(identity::ed25519::PublicKey::try_from_bytes(&[42; 32]).unwrap());
+    let path1 = tempdir().unwrap();
+    let path2 = tempdir().unwrap();
+
+    // Initialize both disk caches with the same exact contents
+    for path in [path1.as_ref(), path2.as_ref()] {
+        let (farmer_cache, farmer_cache_worker) =
+            FarmerCache::new(node_client.clone(), public_key.to_peer_id(), None);
+
+        let farmer_cache_worker_exited =
+            tokio::spawn(farmer_cache_worker.run(piece_getter.clone()));
+
+        let (sender, receiver) = oneshot::channel();
+        farmer_cache
+            .on_sync_progress(Arc::new({
+                let sender = Mutex::new(Some(sender));
+
+                move |progress| {
+                    if *progress == 100.0 {
+                        if let Some(sender) = sender.lock().take() {
+                            sender.send(()).unwrap();
+                        }
+                    }
+                }
+            }))
+            .detach();
+        farmer_cache
+            .replace_backing_caches(
+                vec![Arc::new(
+                    DiskPieceCache::open(path, NonZeroU32::new(1).unwrap(), None, None).unwrap(),
+                )],
+                vec![],
+            )
+            .await;
+
+        // Wait for piece cache to be initialized
+        receiver.await.unwrap();
+
+        drop(farmer_cache);
+
+        // Make worker exit
+        let (mut archived_segment_headers_sender, archived_segment_headers_receiver) =
+            mpsc::channel(0);
+        archived_segment_headers_stream_request_receiver
+            .next()
+            .await
+            .unwrap()
+            .send(archived_segment_headers_receiver)
+            .unwrap();
+        // Make worker exit
+        archived_segment_headers_sender.close().await.unwrap();
+
+        farmer_cache_worker_exited.await.unwrap();
+    }
+
+    {
+        // Clear requested pieces
+        pieces.lock().clear();
+
+        let (farmer_cache, farmer_cache_worker) =
+            FarmerCache::new(node_client.clone(), public_key.to_peer_id(), None);
+
+        let farmer_cache_worker_exited = tokio::spawn(farmer_cache_worker.run(piece_getter));
+
+        let (sender, receiver) = oneshot::channel();
+        farmer_cache
+            .on_sync_progress(Arc::new({
+                let sender = Mutex::new(Some(sender));
+
+                move |progress| {
+                    if *progress == 100.0 {
+                        if let Some(sender) = sender.lock().take() {
+                            sender.send(()).unwrap();
+                        }
+                    }
+                }
+            }))
+            .detach();
+
+        // Reopen with the same backing caches
+        farmer_cache
+            .replace_backing_caches(
+                vec![
+                    Arc::new(
+                        DiskPieceCache::open(
+                            path1.as_ref(),
+                            NonZeroU32::new(1).unwrap(),
+                            None,
+                            None,
+                        )
+                        .unwrap(),
+                    ),
+                    Arc::new(
+                        DiskPieceCache::open(
+                            path2.as_ref(),
+                            NonZeroU32::new(1).unwrap(),
+                            None,
+                            None,
+                        )
+                        .unwrap(),
+                    ),
+                ],
+                vec![],
+            )
+            .await;
+
+        // Wait for piece cache to be initialized
+        receiver.await.unwrap();
+
+        // One piece must be requested
+        let requested_pieces = pieces.lock().keys().copied().collect::<Vec<_>>();
+        assert_eq!(requested_pieces.len(), 1);
+
+        // Must have stored requested piece
+        farmer_cache
+            .get_piece(requested_pieces[0].to_multihash())
+            .await
+            .unwrap();
+
+        drop(farmer_cache);
+
+        // Make worker exit
+        let (mut archived_segment_headers_sender, archived_segment_headers_receiver) =
+            mpsc::channel(0);
+        archived_segment_headers_stream_request_receiver
+            .next()
+            .await
+            .unwrap()
+            .send(archived_segment_headers_receiver)
+            .unwrap();
+        // Make worker exit
+        archived_segment_headers_sender.close().await.unwrap();
+
+        farmer_cache_worker_exited.await.unwrap();
+    }
+}
+
 #[test]
 fn decode_piece_index_from_record_key_test() {
     let piece_index_0 = PieceIndex::from(0);
diff --git a/crates/subspace-farmer/src/single_disk_farm.rs b/crates/subspace-farmer/src/single_disk_farm.rs
index db9c9c1b66..ebfda43729 100644
--- a/crates/subspace-farmer/src/single_disk_farm.rs
+++ b/crates/subspace-farmer/src/single_disk_farm.rs
@@ -58,7 +58,7 @@ use std::collections::HashSet;
 use std::fs::{File, OpenOptions};
 use std::future::Future;
 use std::io::Write;
-use std::num::NonZeroUsize;
+use std::num::{NonZeroU32, NonZeroUsize};
 use std::path::{Path, PathBuf};
 use std::pin::Pin;
 use std::str::FromStr;
@@ -899,9 +899,7 @@ impl SingleDiskFarm {
             SingleDiskPieceCache::new(
                 id,
-                if piece_cache_capacity == 0 {
-                    None
-                } else {
+                if let Some(piece_cache_capacity) = NonZeroU32::new(piece_cache_capacity) {
                     Some(task::block_in_place(|| {
                         if let Some(registry) = registry {
                             DiskPieceCache::open(
@@ -914,6 +912,8 @@ impl SingleDiskFarm {
                             DiskPieceCache::open(&directory, piece_cache_capacity, Some(id), None)
                         }
                     })?)
+                } else {
+                    None
                 },
             )
         };
@@ -2298,7 +2298,7 @@ impl SingleDiskFarm {
         let _ = cache_file.advise_sequential_access();

         let cache_size = match cache_file.size() {
-            Ok(metadata_size) => metadata_size,
+            Ok(cache_size) => cache_size,
             Err(error) => {
                 return Err(SingleDiskFarmScrubError::FailedToDetermineFileSize { file, error });
             }
diff --git a/crates/subspace-networking/src/utils/piece_provider.rs b/crates/subspace-networking/src/utils/piece_provider.rs
index cab3a11728..5c5c877935 100644
--- a/crates/subspace-networking/src/utils/piece_provider.rs
+++ b/crates/subspace-networking/src/utils/piece_provider.rs
@@ -29,7 +29,7 @@ use std::task::{Context, Poll};
 use std::{fmt, iter, mem};
 use subspace_core_primitives::pieces::{Piece, PieceIndex};
 use tokio_stream::StreamMap;
-use tracing::{debug, trace, warn};
+use tracing::{debug, trace, warn, Instrument};

 /// Validates piece against using its commitment.
 #[async_trait]
@@ -493,18 +493,38 @@ async fn get_from_cache_inner<PV, PieceIndices>(
     PV: PieceValidator,
     PieceIndices: Iterator<Item = PieceIndex>,
 {
-    // Download from connected peers first
-    let pieces_to_download =
-        download_cached_pieces_from_connected_peers(piece_indices, node, piece_validator, &results)
-            .await;
+    let download_id = random::<u64>();
+
+    // TODO: It'd be nice to combine downloading from connected peers with downloading from closest
+    // peers concurrently
+    let fut = async move {
+        // Download from connected peers first
+        let pieces_to_download = download_cached_pieces_from_connected_peers(
+            piece_indices,
+            node,
+            piece_validator,
+            &results,
+        )
+        .await;

-    if pieces_to_download.is_empty() {
-        return;
-    }
+        if pieces_to_download.is_empty() {
+            debug!("Done");
+            return;
+        }

-    // Download from iteratively closer peers according to Kademlia rules
-    download_cached_pieces_from_closest_peers(pieces_to_download, node, piece_validator, &results)
+        // Download from iteratively closer peers according to Kademlia rules
+        download_cached_pieces_from_closest_peers(
+            pieces_to_download,
+            node,
+            piece_validator,
+            &results,
+        )
         .await;
+
+        debug!("Done #2");
+    };
+
+    fut.instrument(tracing::info_span!("", %download_id)).await;
 }

 /// Takes pieces to download as an input, sends results with pieces that were downloaded
@@ -528,15 +548,24 @@ where
     let mut pieces_to_download = piece_indices
         .map(|piece_index| (piece_index, HashMap::new()))
         .collect::<HashMap<PieceIndex, HashMap<PeerId, Vec<Multihash>>>>();
+
+    debug!(num_pieces = %pieces_to_download.len(), "Starting");
+
     let mut checked_connected_peers = HashSet::new();

     // The loop is in order to check peers that might be connected after the initial loop has
     // started.
     loop {
         let Ok(connected_peers) = node.connected_peers().await else {
+            trace!("Connected peers error");
             break;
         };

+        debug!(
+            connected_peers = %connected_peers.len(),
+            pieces_to_download = %pieces_to_download.len(),
+            "Loop"
+        );
         if connected_peers.is_empty() || pieces_to_download.is_empty() {
             break;
         }
@@ -589,6 +618,7 @@ where
             mut cached_pieces,
             not_cached_pieces,
         } = result;
+        trace!(%piece_index, %peer_id, result = %result.is_some(), "Piece response");

         let Some(result) = result else {
             // Downloading failed, ignore peer
@@ -597,6 +627,8 @@ where

         match result {
             PieceResult::Piece(piece) => {
+                trace!(%piece_index, %peer_id, "Got piece");
+
                 // Downloaded successfully
                 pieces_to_download.remove(&piece_index);

@@ -609,10 +641,16 @@ where
                 }
             }
             PieceResult::ClosestPeers(closest_peers) => {
+                trace!(%piece_index, %peer_id, "Got closest peers");
+
                 // Store closer peers in case piece index was not downloaded yet
                 if let Some(peers) = pieces_to_download.get_mut(&piece_index) {
                     peers.extend(Vec::from(closest_peers));
                 }
+
+                // No need to ask this peer again if they didn't have the piece we expected, or
+                // they claimed to have earlier
+                continue;
             }
         }

@@ -639,8 +677,10 @@ where

         let piece_index_to_download_next =
             if let Some(piece_index) = maybe_piece_index_to_download_next {
+                trace!(%piece_index, %peer_id, "Next piece to download from peer");
                 piece_index
             } else {
+                trace!(%peer_id, "Peer doesn't have anything else");
                 // Nothing left to do with this peer
                 continue;
             };
@@ -680,6 +720,7 @@ where
         }

         if pieces_to_download.len() == num_pieces {
+            debug!(%num_pieces, "Finished downloading from connected peers");
             // Nothing was downloaded, we're done here
             break;
         }