
Commit

Merge pull request #3238 from autonomys/piece-cache-fixes
Piece cache fixes and improvements
nazar-pc authored Nov 18, 2024
2 parents 4d0fa55 + d303125 commit 8fe5849
Showing 9 changed files with 315 additions and 53 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

3 changes: 1 addition & 2 deletions crates/subspace-farmer/Cargo.toml
@@ -46,7 +46,6 @@
 pin-project = "1.1.5"
 prometheus-client = "0.22.3"
 rand = "0.8.5"
 rayon = "1.10.0"
-schnellru = "0.2.3"
 schnorrkel = "0.11.4"
 serde = { version = "1.0.110", features = ["derive"] }
 serde_json = "1.0.128"
@@ -67,7 +66,7 @@
 supports-color = { version = "3.0.1", optional = true }
 tempfile = "3.13.0"
 thiserror = "2.0.0"
 thread-priority = "1.1.0"
-tokio = { version = "1.40.0", features = ["macros", "parking_lot", "rt-multi-thread", "signal", "time"] }
+tokio = { version = "1.40.0", features = ["macros", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] }
 tokio-stream = { version = "0.1.16", features = ["sync"] }
 tracing = "0.1.40"
 tracing-subscriber = { version = "0.3.18", features = ["env-filter"], optional = true }
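
The added `sync` feature is what exposes `tokio::sync`, notably `Semaphore`, which `farmer_cache.rs` starts importing further down. A minimal standalone sketch (not farmer code) of what the feature gates:

```rust
use tokio::sync::Semaphore;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    // Compiles only when tokio's `sync` feature is enabled in Cargo.toml.
    let semaphore = Semaphore::new(4);
    // Holding a permit bounds how much work runs at once; dropping it frees the slot.
    let permit = semaphore.acquire().await.expect("semaphore is never closed");
    drop(permit);
}
```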
@@ -6,7 +6,7 @@
 use futures::StreamExt;
 use prometheus_client::registry::Registry;
 use std::fs;
 use std::future::Future;
-use std::num::NonZeroUsize;
+use std::num::{NonZeroU32, NonZeroUsize};
 use std::path::PathBuf;
 use std::pin::Pin;
 use std::str::FromStr;
@@ -160,19 +160,30 @@ pub(super) async fn cache(
     let caches = disk_caches
         .iter()
         .map(|disk_cache| {
-            DiskPieceCache::open(
-                &disk_cache.directory,
-                u32::try_from(disk_cache.allocated_space / DiskPieceCache::element_size() as u64)
-                    .unwrap_or(u32::MAX),
-                None,
-                Some(registry),
-            )
-            .map_err(|error| {
-                anyhow!(
-                    "Failed to open piece cache at {}: {error}",
-                    disk_cache.directory.display()
-                )
-            })
+            let capacity =
+                u32::try_from(disk_cache.allocated_space / DiskPieceCache::element_size() as u64)
+                    .map_err(|error| {
+                        anyhow!(
+                            "Unsupported cache #1 size {} at {}: {error}",
+                            disk_cache.allocated_space,
+                            disk_cache.directory.display()
+                        )
+                    })?;
+            let capacity = NonZeroU32::try_from(capacity).map_err(|error| {
+                anyhow!(
+                    "Unsupported cache #2 size {} at {}: {error}",
+                    disk_cache.allocated_space,
+                    disk_cache.directory.display()
+                )
+            })?;
+            DiskPieceCache::open(&disk_cache.directory, capacity, None, Some(registry)).map_err(
+                |error| {
+                    anyhow!(
+                        "Failed to open piece cache at {}: {error}",
+                        disk_cache.directory.display()
+                    )
+                },
+            )
         })
         .collect::<Result<Vec<_>, _>>()?;
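
For context, a standalone sketch of the capacity derivation this hunk introduces (the helper name and error strings are illustrative, not the farmer's): the allocated byte budget becomes a number of cache elements, and sizes that overflow `u32` or round down to zero are rejected up front instead of being silently clamped to `u32::MAX` as before.

```rust
use std::num::NonZeroU32;

/// Derive the number of cache elements from an allocated byte budget.
/// `element_size` stands in for `DiskPieceCache::element_size()`.
fn capacity_from_allocated_space(
    allocated_space: u64,
    element_size: u32,
) -> Result<NonZeroU32, String> {
    // Whole elements that fit into the byte budget; error out instead of
    // saturating at `u32::MAX` as the old code did.
    let capacity = u32::try_from(allocated_space / u64::from(element_size))
        .map_err(|error| format!("Unsupported cache size {allocated_space}: {error}"))?;
    // A cache too small to hold even one element is rejected as well.
    NonZeroU32::new(capacity)
        .ok_or_else(|| format!("Unsupported cache size {allocated_space}: zero capacity"))
}
```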

11 changes: 3 additions & 8 deletions crates/subspace-farmer/src/disk_piece_cache.rs
@@ -15,6 +15,7 @@
 use futures::channel::mpsc;
 use futures::{stream, SinkExt, Stream, StreamExt};
 use parking_lot::Mutex;
 use prometheus_client::registry::Registry;
+use std::num::NonZeroU32;
 use std::path::Path;
 use std::sync::atomic::{AtomicU8, Ordering};
 use std::sync::Arc;
@@ -51,9 +52,6 @@ pub enum DiskPieceCacheError {
         /// Max offset
         max: u32,
     },
-    /// Cache size has zero capacity, this is not supported, cache size needs to be larger
-    #[error("Cache size has zero capacity, this is not supported, cache size needs to be larger")]
-    ZeroCapacity,
     /// Checksum mismatch
     #[error("Checksum mismatch")]
     ChecksumMismatch,
@@ -235,14 +233,11 @@ impl DiskPieceCache {
     /// Open cache, capacity is measured in elements of [`DiskPieceCache::element_size()`] size
     pub fn open(
         directory: &Path,
-        capacity: u32,
+        capacity: NonZeroU32,
         id: Option<PieceCacheId>,
         registry: Option<&mut Registry>,
     ) -> Result<Self, DiskPieceCacheError> {
-        if capacity == 0 {
-            return Err(DiskPieceCacheError::ZeroCapacity);
-        }
-
+        let capacity = capacity.get();
         let files = FilePool::open(&directory.join(Self::FILE_NAME))?;
 
         let expected_size = u64::from(Self::element_size()) * u64::from(capacity);
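
Since `capacity` is now a `NonZeroU32`, a zero-sized cache can no longer reach `open`, which is why the `ZeroCapacity` variant and its runtime check could be dropped. A small hypothetical caller-side sketch of how the invalid case is rejected before the call:

```rust
use std::num::NonZeroU32;

// Hypothetical helper: validate an element count before handing it to `DiskPieceCache::open`.
fn checked_capacity(elements: u32) -> Option<NonZeroU32> {
    // `None` for zero, so the old `DiskPieceCacheError::ZeroCapacity` path is
    // now unrepresentable in the API.
    NonZeroU32::new(elements)
}

fn main() {
    assert!(checked_capacity(0).is_none());
    assert_eq!(checked_capacity(2).map(NonZeroU32::get), Some(2));
}
```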
10 changes: 7 additions & 3 deletions crates/subspace-farmer/src/disk_piece_cache/tests.rs
@@ -1,14 +1,16 @@
 use crate::disk_piece_cache::{DiskPieceCache, DiskPieceCacheError, PieceCacheOffset};
 use rand::prelude::*;
 use std::assert_matches::assert_matches;
+use std::num::NonZeroU32;
 use subspace_core_primitives::pieces::{Piece, PieceIndex};
 use tempfile::tempdir;
 
 #[test]
 fn basic() {
     let path = tempdir().unwrap();
     {
-        let disk_piece_cache = DiskPieceCache::open(path.as_ref(), 2, None, None).unwrap();
+        let disk_piece_cache =
+            DiskPieceCache::open(path.as_ref(), NonZeroU32::new(2).unwrap(), None, None).unwrap();
 
         // Initially empty
         assert_eq!(
@@ -115,7 +117,8 @@ fn basic()
 
     // Reopening works
     {
-        let disk_piece_cache = DiskPieceCache::open(path.as_ref(), 2, None, None).unwrap();
+        let disk_piece_cache =
+            DiskPieceCache::open(path.as_ref(), NonZeroU32::new(2).unwrap(), None, None).unwrap();
         // Two pieces stored
         assert_eq!(
             disk_piece_cache
@@ -130,7 +133,8 @@
     {
         DiskPieceCache::wipe(path.as_ref()).unwrap();
 
-        let disk_piece_cache = DiskPieceCache::open(path.as_ref(), 2, None, None).unwrap();
+        let disk_piece_cache =
+            DiskPieceCache::open(path.as_ref(), NonZeroU32::new(2).unwrap(), None, None).unwrap();
         // Wiped successfully
         assert_eq!(
             disk_piece_cache
32 changes: 27 additions & 5 deletions crates/subspace-farmer/src/farmer_cache.rs
@@ -38,6 +38,7 @@
 use subspace_networking::libp2p::PeerId;
 use subspace_networking::utils::multihash::ToMultihash;
 use subspace_networking::{KeyWithDistance, LocalRecordProvider};
 use tokio::runtime::Handle;
+use tokio::sync::Semaphore;
 use tokio::task::{block_in_place, yield_now};
 use tracing::{debug, error, info, info_span, trace, warn, Instrument};

@@ -410,7 +411,11 @@
             match maybe_cache {
                 Ok(cache) => {
                     let backend = cache.backend;
-                    stored_pieces.extend(cache.cache_stored_pieces.into_iter());
+                    for (key, cache_offset) in cache.cache_stored_pieces {
+                        if let Some(old_cache_offset) = stored_pieces.insert(key, cache_offset) {
+                            dangling_free_offsets.push_front(old_cache_offset);
+                        }
+                    }
                     dangling_free_offsets.extend(
                         cache.cache_free_offsets.into_iter().filter(|free_offset| {
                             free_offset.piece_offset.0 < backend.used_capacity
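
The loop above replaces a plain `extend`: when two cache backends report the same piece key, only one offset can stay in `stored_pieces`, and the displaced offset is recycled into `dangling_free_offsets` instead of being leaked. A minimal sketch of that bookkeeping with standard collections (the key and offset types are simplified stand-ins for the farmer's own):

```rust
use std::collections::{HashMap, VecDeque};

// Simplified stand-ins for the farmer's key and cache offset types.
type Key = u64;
type Offset = (usize, u32); // (cache index, piece offset)

fn merge_cache(
    stored_pieces: &mut HashMap<Key, Offset>,
    dangling_free_offsets: &mut VecDeque<Offset>,
    cache_stored_pieces: Vec<(Key, Offset)>,
) {
    for (key, cache_offset) in cache_stored_pieces {
        // A duplicate key means another cache already holds this piece; keep the
        // new offset and recycle the displaced one as free space.
        if let Some(old_cache_offset) = stored_pieces.insert(key, cache_offset) {
            dangling_free_offsets.push_front(old_cache_offset);
        }
    }
}

fn main() {
    let mut stored_pieces = HashMap::new();
    let mut dangling_free_offsets = VecDeque::new();
    merge_cache(&mut stored_pieces, &mut dangling_free_offsets, vec![(1, (0, 7))]);
    merge_cache(&mut stored_pieces, &mut dangling_free_offsets, vec![(1, (1, 3))]);
    // The duplicate key displaced offset (0, 7) into the dangling free list.
    assert_eq!(dangling_free_offsets.front(), Some(&(0, 7)));
}
```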
@@ -533,14 +538,23 @@
         let downloaded_pieces_count = AtomicUsize::new(0);
         let caches = Mutex::new(caches);
         self.handlers.progress.call_simple(&0.0);
+        let piece_indices_to_store = piece_indices_to_store.into_iter().enumerate();
+
+        let downloading_semaphore = &Semaphore::new(SYNC_BATCH_SIZE * SYNC_CONCURRENT_BATCHES);
 
         let downloading_pieces_stream =
-            stream::iter(piece_indices_to_store.into_iter().map(|piece_indices| {
+            stream::iter(piece_indices_to_store.map(|(batch, piece_indices)| {
                 let downloaded_pieces_count = &downloaded_pieces_count;
                 let caches = &caches;
 
                 async move {
-                    let mut pieces_stream = match piece_getter.get_pieces(piece_indices).await {
+                    let mut permit = downloading_semaphore
+                        .acquire_many(SYNC_BATCH_SIZE as u32)
+                        .await
+                        .expect("Semaphore is never closed; qed");
+                    debug!(%batch, num_pieces = %piece_indices.len(), "Downloading pieces");
+
+                    let pieces_stream = match piece_getter.get_pieces(piece_indices).await {
                         Ok(pieces_stream) => pieces_stream,
                         Err(error) => {
                             error!(
@@ -550,8 +564,11 @@
                             return;
                         }
                     };
+                    let mut pieces_stream = pieces_stream.enumerate();
 
-                    while let Some((piece_index, result)) = pieces_stream.next().await {
+                    while let Some((index, (piece_index, result))) = pieces_stream.next().await {
+                        debug!(%batch, %index, %piece_index, "Downloaded piece");
+
                         let piece = match result {
                             Ok(Some(piece)) => {
                                 trace!(%piece_index, "Downloaded piece successfully");
@@ -570,6 +587,8 @@
                                 continue;
                             }
                         };
+                        // Release slot for future batches
+                        permit.split(1);
 
                         let (offset, maybe_backend) = {
                             let mut caches = caches.lock();
@@ -629,10 +648,13 @@
                     }
                 }
             }
         }));
 
+        // Download several batches concurrently to make sure slow tail of one is compensated by
+        // another
         downloading_pieces_stream
-            .buffer_unordered(SYNC_CONCURRENT_BATCHES)
+            // This allows to schedule new batch while previous batches partially completed, but
+            // avoids excessive memory usage like when all futures are created upfront
+            .buffer_unordered(SYNC_CONCURRENT_BATCHES * 2)
             // Simply drain everything
             .for_each(|()| async {})
             .await;
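
A self-contained sketch of the pattern these hunks introduce (constants and the per-piece work are placeholders, not the farmer's values): a semaphore sized to one full round of batches admits `SYNC_BATCH_SIZE` permits per download task, each stored piece hands one permit back via `permit.split(1)`, and `buffer_unordered(SYNC_CONCURRENT_BATCHES * 2)` keeps spare batch futures scheduled so the slow tail of one batch does not stall overall progress.

```rust
use futures::stream::{self, StreamExt};
use tokio::sync::Semaphore;

// Placeholder sizes; the real constants live elsewhere in farmer_cache.rs.
const SYNC_BATCH_SIZE: usize = 8;
const SYNC_CONCURRENT_BATCHES: usize = 4;

#[tokio::main]
async fn main() {
    // Total number of pieces allowed in flight across all batches.
    let downloading_semaphore = &Semaphore::new(SYNC_BATCH_SIZE * SYNC_CONCURRENT_BATCHES);

    // Stand-in for `piece_indices_to_store`: pre-chunked batches of piece indices.
    let piece_indices = (0u32..32).collect::<Vec<_>>();
    let batches = piece_indices
        .chunks(SYNC_BATCH_SIZE)
        .map(|chunk| chunk.to_vec())
        .collect::<Vec<_>>();

    stream::iter(batches.into_iter().enumerate().map(|(batch, piece_indices)| async move {
        // Reserve a full batch worth of slots before requesting any pieces.
        let mut permit = downloading_semaphore
            .acquire_many(SYNC_BATCH_SIZE as u32)
            .await
            .expect("Semaphore is never closed; qed");

        for piece_index in piece_indices {
            // Stand-in for downloading and storing one piece.
            println!("batch {batch}: stored piece {piece_index}");
            // Hand one slot back immediately so the next batch can start while
            // this one finishes its slow tail.
            drop(permit.split(1));
        }
    }))
    // Keep more batch futures scheduled than can hold permits at once, so a new
    // batch is ready as soon as slots free up, without creating every future upfront.
    .buffer_unordered(SYNC_CONCURRENT_BATCHES * 2)
    .for_each(|()| async {})
    .await;
}
```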
