Fetch multiple pieces during object reconstruction #3158

Open

wants to merge 4 commits into base: main
Changes from 1 commit
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

1 change: 0 additions & 1 deletion shared/subspace-data-retrieval/Cargo.toml
@@ -13,7 +13,6 @@ include = [

[dependencies]
anyhow = "1.0.89"
async-lock = "3.4.0"
async-trait = "0.1.83"
futures = "0.3.31"
parity-scale-codec = { version = "3.6.12", features = ["derive"] }
134 changes: 75 additions & 59 deletions shared/subspace-data-retrieval/src/segment_fetcher.rs
@@ -16,9 +16,7 @@
//! Fetching segments of the archived history of Subspace Network.

use crate::piece_getter::ObjectPieceGetter;
use async_lock::Semaphore;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use futures::{stream, StreamExt};
use subspace_archiving::archiver::Segment;
use subspace_archiving::reconstructor::{Reconstructor, ReconstructorError};
use subspace_core_primitives::pieces::Piece;
@@ -72,9 +70,6 @@ where
}

/// Concurrently downloads the pieces for `segment_index`.
// This code was copied and modified from subspace_service::sync_from_dsn::download_and_reconstruct_blocks():
// <https://github.com/autonomys/subspace/blob/d71ca47e45e1b53cd2e472413caa23472a91cd74/crates/subspace-service/src/sync_from_dsn/import_blocks.rs#L236-L322>
//
// TODO: pass a lower concurrency limit into this function, to avoid overwhelming residential routers or slow connections
pub async fn download_segment_pieces<PG>(
segment_index: SegmentIndex,
@@ -85,66 +80,87 @@
{
debug!(%segment_index, "Retrieving pieces of the segment");

let semaphore = &Semaphore::new(RecordedHistorySegment::NUM_RAW_RECORDS);

let mut received_segment_pieces = segment_index
.segment_piece_indexes_source_first()
.into_iter()
.map(|piece_index| {
// Source pieces will acquire permit here right away
let maybe_permit = semaphore.try_acquire();

async move {
let permit = match maybe_permit {
Some(permit) => permit,
None => {
// Other pieces will acquire permit here instead
semaphore.acquire().await
}
};
let piece = match piece_getter.get_piece(piece_index).await {
Ok(Some(piece)) => piece,
Ok(None) => {
trace!(?piece_index, "Piece not found");
return None;
}
Err(error) => {
trace!(
%error,
?piece_index,
"Piece request failed",
);
return None;
}
};

trace!(?piece_index, "Piece request succeeded");

// Piece was received successfully, "remove" this slot from semaphore
permit.forget();
Some((piece_index, piece))
}
})
.collect::<FuturesUnordered<_>>();

// We want NUM_RAW_RECORDS pieces to reconstruct the segment, but it doesn't matter exactly which ones.
let piece_indexes = segment_index.segment_piece_indexes_source_first();
let mut piece_indexes = piece_indexes.as_slice();
let mut segment_pieces = vec![None::<Piece>; ArchivedHistorySegment::NUM_PIECES];

let mut pieces_pending = 0;
let mut pieces_received = 0;
let mut piece_streams = Vec::new();

// Loop Invariant:
// - the number of remaining piece indexes gets smaller, eventually finishing the fetcher, or
// - the number of pending pieces gets smaller, eventually triggering another batch.
// We also exit early if we have enough pieces to reconstruct a segment.
'fetcher: while !piece_indexes.is_empty()
Member:

I would split this into two stages:

  • first try to download all 128 source pieces; if successful, reconstruction will be very cheap and fast
  • as a fallback, when actual reconstruction is needed, schedule more pieces to download, including parity pieces

Right now it is implemented in a way that is a bit wasteful in terms of bandwidth (it triggers more downloads than needed) and in terms of CPU usage (it has an overwhelmingly high chance of not getting the 128 source pieces needed for cheap segment reconstruction).
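
For illustration, a rough self-contained sketch of that two-stage idea (dummy types, a synchronous loop, and a fake `fetch` helper; none of these names come from the PR or the Subspace codebase):

```rust
// Illustrative only: a toy two-stage fetch. Every name here is made up.
fn fetch(piece_index: usize) -> Option<usize> {
    // Pretend every fourth piece is missing.
    if piece_index % 4 == 3 {
        None
    } else {
        Some(piece_index)
    }
}

fn main() {
    const NUM_SOURCE_PIECES: usize = 128;
    const NUM_PIECES: usize = 256;

    // Stage 1: request only the 128 source pieces.
    let mut pieces: Vec<Option<usize>> = (0..NUM_SOURCE_PIECES).map(fetch).collect();
    let mut received = pieces.iter().filter(|p| p.is_some()).count();

    // Stage 2 (fallback): only if stage 1 came up short, request parity pieces
    // until enough pieces are available for reconstruction.
    let mut parity_index = NUM_SOURCE_PIECES;
    while received < NUM_SOURCE_PIECES && parity_index < NUM_PIECES {
        if let Some(piece) = fetch(parity_index) {
            pieces.push(Some(piece));
            received += 1;
        }
        parity_index += 1;
    }

    println!("received {received} of {NUM_SOURCE_PIECES} pieces needed");
}
```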

Contributor Author:

This is already what the code does?

The first batch contains 128 source piece indexes. If all pieces in that batch succeed, then there aren’t any parity piece requests.

But as soon as any pieces fail, a batch of parity piece indexes is created, which contains exactly the number of pieces needed to compensate for those failures. Then all batches are polled concurrently.

The code assumes that any pieces that are still pending will succeed, so there are no wasted downloads.
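
For concreteness, a tiny sketch of that batch-size rule with hypothetical names (`next_batch_size` is not part of the PR): each batch requests exactly the shortfall, on the assumption that every pending piece will succeed.

```rust
/// Hypothetical helper, for illustration only: how many pieces to request next,
/// assuming every currently pending request will succeed.
fn next_batch_size(pieces_needed: usize, pieces_received: usize, pieces_pending: usize) -> usize {
    pieces_needed.saturating_sub(pieces_received + pieces_pending)
}

fn main() {
    let pieces_needed = 128; // e.g. RecordedHistorySegment::NUM_RAW_RECORDS

    // First pass: nothing received or pending, so request all 128 source pieces.
    assert_eq!(next_batch_size(pieces_needed, 0, 0), 128);

    // Later: 120 received, 5 still pending, so 3 must have failed; request
    // exactly 3 parity pieces to compensate.
    assert_eq!(next_batch_size(pieces_needed, 120, 5), 3);
}
```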

Member:

I see, the way it is written wasn't as clear, but now I understand what it does. If you exhaust the stream of pieces you've generated every time, why do you keep already finished streams in piece_streams (the question above)? I don't see how multiple streams can be polled concurrently here.

Contributor Author:

> I see, the way it is written wasn't as clear, but now I understand what it does.

Good feedback, I might split it into multiple methods so it's clearer.

flatten_unordered() polls all the streams in the vector concurrently, and can return the pieces from any stream.

ready_chunks() waits until at least one piece result is ready, then returns all the ready pieces as a vector. But if any pieces are still pending, they are left in the stream.

Then if any of the piece results in that vector are None, we add a batch of parity pieces to replace them.

We can definitely drop streams once they're done, I'll make some notes about how to do that.
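
A minimal, runnable sketch of that polling pattern, using plain in-memory streams in place of the piece-getter streams (the stream contents and the chunk size here are made up, not taken from the PR):

```rust
use futures::{executor::block_on, stream, StreamExt};

fn main() {
    block_on(async {
        // Two "batches", modelled as already-ready streams of piece results.
        let mut piece_streams = vec![
            stream::iter(vec![Some(1u32), None, Some(3)]),
            stream::iter(vec![Some(4), Some(5)]),
        ];

        // Poll every stream in the vector concurrently: `flatten_unordered(None)`
        // yields items from whichever stream produces them first, and
        // `ready_chunks(8)` returns everything that is ready right now as one Vec,
        // leaving still-pending items in their streams.
        let ready: Option<Vec<Option<u32>>> = stream::iter(&mut piece_streams)
            .flatten_unordered(None)
            .ready_chunks(8)
            .next()
            .await;

        // With purely ready streams, the first chunk holds all five results;
        // `None` entries are the "failed piece" slots a parity batch would replace.
        println!("{ready:?}");
    });
}
```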

Member:

I actually already wrote an efficient piece retrieval for reconstruction purposes in https://github.com/autonomys/subspace/blob/362c1f5dce076b6c13452977a533f9091d515bb1/crates/subspace-farmer-components/src/segment_reconstruction.rs

It might be a little simpler, and it does try to download pieces in batches all the time, avoiding the batches of 1 that you will most likely get the majority of the time after the initial request, due to the way ready_chunks works. It is overall less eager and tries not to do a lot of heavy lifting.

It can be extracted into a utility somewhere that returns a segment's worth of pieces, and then reused in the farmer code, here, and in DSN sync on the node, where exactly the same piece-retrieval logic will be necessary; only what we do with those pieces differs slightly.

&& pieces_received < RecordedHistorySegment::NUM_RAW_RECORDS
{
// Request the number of pieces needed to reconstruct the segment, assuming all pending
// pieces are successful.
let batch_size = RecordedHistorySegment::NUM_RAW_RECORDS - pieces_received - pieces_pending;
if batch_size > 0 {
let (batch_indexes, remaining_piece_indexes) = piece_indexes
.split_at_checked(batch_size)
.unwrap_or((piece_indexes, &[]));
// Invariant: the number of remaining piece indexes gets smaller.
piece_indexes = remaining_piece_indexes;

let stream = piece_getter.get_pieces(batch_indexes.iter().cloned()).await;
match stream {
Ok(stream) => {
piece_streams.push(stream);
pieces_pending += batch_size;
}
Err(error) => {
// A single batch failure isn't fatal.
debug!(
?error,
%segment_index,
batch_size,
pieces_pending,
pieces_received,
pieces_needed = RecordedHistorySegment::NUM_RAW_RECORDS,
"Segment piece getter batch failed"
);
}
}
}

while let Some(maybe_piece) = received_segment_pieces.next().await {
let Some((piece_index, piece)) = maybe_piece else {
continue;
// Poll all the batches concurrently, getting all finished pieces.
let piece_responses = stream::iter(&mut piece_streams)
.flatten_unordered(None)
.ready_chunks(RecordedHistorySegment::NUM_RAW_RECORDS)
.next()
.await;

let Some(piece_responses) = piece_responses else {
// All streams have finished, perhaps abnormally, so reset the number of pending pieces.
// Invariant: the number of pending pieces gets smaller.
pieces_pending = 0;
continue 'fetcher;
};

segment_pieces
.get_mut(piece_index.position() as usize)
.expect("Piece position is by definition within segment; qed")
.replace(piece);
// Process the piece responses.
'processor: for maybe_piece in piece_responses {
// Invariant: the number of pending pieces gets smaller.
pieces_pending -= 1;

let (piece_index, Ok(Some(piece))) = maybe_piece else {
continue 'processor;
};

pieces_received += 1;
segment_pieces
.get_mut(piece_index.position() as usize)
.expect("Piece position is by definition within segment; qed")
.replace(piece);

if pieces_received >= RecordedHistorySegment::NUM_RAW_RECORDS {
trace!(%segment_index, "Received half of the segment.");
break;
pieces_received += 1;

if pieces_received >= RecordedHistorySegment::NUM_RAW_RECORDS {
trace!(%segment_index, "Received half of the segment.");
break 'fetcher;
}
Member:

Do we consider a case with insufficient pieces present?

Contributor Author:

When there aren't enough pieces, we go to the top of the 'fetcher: loop again, and try to get more pieces from the segment.

If we've tried all the pieces in the segment and still can't get enough to reconstruct it, we end the 'fetcher: loop and return an error.

That error isn't visible in this diff; it's on line 159/175, about 10 lines below this line.

}
}
