Commit dedf017
Merge pull request #3218 from autonomys/segment-processing-fallback
Fall back to getting pieces from DSN when getting from node fails
nazar-pc authored Nov 7, 2024
2 parents c163775 + bc50b83 commit dedf017
Showing 1 changed file with 38 additions and 25 deletions.
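
Before the diff itself, the shape of the change is worth spelling out: pieces of a freshly archived segment are requested from the node first, and only when that call errors or returns nothing does the farmer cache fall back to the DSN piece getter. A minimal sketch of that flow is below; the function and type names (`piece_from_node`, `piece_from_dsn`, the `u64`/`Vec<u8>` stand-ins) are illustrative placeholders, not the `subspace-farmer` APIs touched in the diff.

```rust
// A minimal sketch of the fallback this commit introduces. `piece_from_node`
// and `piece_from_dsn` are hypothetical stand-ins for the node RPC client and
// the DSN piece getter; they are not the actual `subspace-farmer` APIs.
use futures::executor::block_on;

type PieceIndex = u64; // stand-in for the real piece index type
type Piece = Vec<u8>; // stand-in for the real piece type

async fn piece_from_node(_piece_index: PieceIndex) -> Result<Option<Piece>, String> {
    // Right after archiving, the node may not be able to serve the piece yet.
    Ok(None)
}

async fn piece_from_dsn(piece_index: PieceIndex) -> Result<Option<Piece>, String> {
    Ok(Some(vec![piece_index as u8; 32]))
}

/// Ask the node first; if that errors or yields nothing, fall back to the DSN.
async fn piece_with_fallback(piece_index: PieceIndex) -> Option<Piece> {
    match piece_from_node(piece_index).await {
        Ok(Some(piece)) => return Some(piece),
        Ok(None) => { /* not available from the node, keep going */ }
        Err(error) => eprintln!("Node lookup failed: {error}"),
    }

    match piece_from_dsn(piece_index).await {
        Ok(maybe_piece) => maybe_piece,
        Err(error) => {
            eprintln!("DSN lookup failed: {error}");
            None
        }
    }
}

fn main() {
    assert!(block_on(piece_with_fallback(1)).is_some());
}
```

The actual change wires this fallback into the per-piece futures that `process_segment_header` builds for each piece index of the newly archived segment, logging node failures at `debug!` and treating missing pieces as recoverable rather than as an implementation bug.
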
crates/subspace-farmer/src/farmer_cache.rs (63 changes: 38 additions & 25 deletions)
@@ -209,7 +209,7 @@ where
                 }
                 maybe_segment_header = segment_headers_notifications.next().fuse() => {
                     if let Some(segment_header) = maybe_segment_header {
-                        self.process_segment_header(segment_header, &mut last_segment_index_internal).await;
+                        self.process_segment_header(&piece_getter, segment_header, &mut last_segment_index_internal).await;
                     } else {
                         // Keep-up sync only ends with subscription, which lasts for duration of an
                         // instance
@@ -644,11 +644,14 @@ where
         info!("Finished piece cache synchronization");
     }
 
-    async fn process_segment_header(
+    async fn process_segment_header<PG>(
         &self,
+        piece_getter: &PG,
         segment_header: SegmentHeader,
         last_segment_index_internal: &mut SegmentIndex,
-    ) {
+    ) where
+        PG: PieceGetter,
+    {
         let segment_index = segment_header.segment_index();
         debug!(%segment_index, "Starting to process newly archived segment");
 
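
The hunk above only changes the signature: `process_segment_header` becomes generic over `PG: PieceGetter`, so the caller decides which piece source backs the fallback. A simplified sketch of that shape follows, with a synchronous `PieceSource` trait standing in for the real async `PieceGetter`; none of these names are the crate's actual definitions.

```rust
// Sketch of the signature change in the hunk above, with a simplified,
// synchronous `PieceSource` trait standing in for the real async `PieceGetter`.
type PieceIndex = u64;
type Piece = Vec<u8>;

trait PieceSource {
    fn get_piece(&self, piece_index: PieceIndex) -> Result<Option<Piece>, String>;
}

struct DsnPieceSource;

impl PieceSource for DsnPieceSource {
    fn get_piece(&self, _piece_index: PieceIndex) -> Result<Option<Piece>, String> {
        Ok(Some(vec![0u8; 32]))
    }
}

// Mirrors `process_segment_header<PG>(&self, piece_getter: &PG, ..) where PG: PieceGetter`:
// the function relies only on the trait, the caller chooses the concrete source.
fn collect_pieces<PS>(piece_source: &PS, piece_indices: &[PieceIndex]) -> Vec<(PieceIndex, Piece)>
where
    PS: PieceSource,
{
    piece_indices
        .iter()
        .filter_map(|&piece_index| {
            piece_source
                .get_piece(piece_index)
                .ok()
                .flatten()
                .map(|piece| (piece_index, piece))
        })
        .collect()
}

fn main() {
    assert_eq!(collect_pieces(&DsnPieceSource, &[0, 1, 2]).len(), 3);
}
```

In the real code the same `piece_getter` that drives cache synchronization is threaded through from the run loop, as the first hunk shows.
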
@@ -678,33 +681,45 @@
                     return None;
                 }
 
-                let maybe_piece = match self.node_client.piece(piece_index).await {
-                    Ok(maybe_piece) => maybe_piece,
+                let maybe_piece_result =
+                    self.node_client
+                        .piece(piece_index)
+                        .await
+                        .inspect_err(|error| {
+                            debug!(
+                                %error,
+                                %segment_index,
+                                %piece_index,
+                                "Failed to retrieve piece from node right after archiving"
+                            );
+                        });
+
+                if let Ok(Some(piece)) = maybe_piece_result {
+                    return Some((piece_index, piece));
+                }
+
+                match piece_getter.get_piece(piece_index).await {
+                    Ok(Some(piece)) => Some((piece_index, piece)),
+                    Ok(None) => {
+                        warn!(
+                            %segment_index,
+                            %piece_index,
+                            "Failed to retrieve piece right after archiving"
+                        );
+
+                        None
+                    }
                     Err(error) => {
-                        error!(
+                        warn!(
                             %error,
                             %segment_index,
                             %piece_index,
-                            "Failed to retrieve piece from node right after archiving, \
-                            this should never happen and is an implementation bug"
+                            "Failed to retrieve piece right after archiving"
                         );
 
-                        return None;
+                        None
                     }
-                };
-
-                let Some(piece) = maybe_piece else {
-                    error!(
-                        %segment_index,
-                        %piece_index,
-                        "Failed to retrieve piece from node right after archiving, this \
-                        should never happen and is an implementation bug"
-                    );
-
-                    return None;
-                };
-
-                Some((piece_index, piece))
+                }
             })
             .collect::<FuturesUnordered<_>>()
             .filter_map(|maybe_piece| async move { maybe_piece })
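
Two details of the hunk above carry the behavior change: std's `Result::inspect_err` logs the node error without consuming the `Result`, so the code can still match on it before deciding to fall back, and the log levels drop from `error!` to `debug!`/`warn!` because a missing piece is now recoverable rather than an implementation bug. A small self-contained illustration of the `inspect_err`-then-fallback shape, using `String` pieces instead of the real types:

```rust
// Illustration only: mirrors the `inspect_err` + fallback shape used above,
// with `String` pieces instead of the real piece types.
fn primary_lookup(_index: u64) -> Result<Option<String>, String> {
    // Pretend the primary source errors out, as the node can right after archiving.
    Err("connection reset".to_string())
}

fn fallback_lookup(index: u64) -> Result<Option<String>, String> {
    Ok(Some(format!("piece-{index}")))
}

fn get_piece(index: u64) -> Option<String> {
    // `inspect_err` logs the error but hands the `Result` back unchanged.
    let primary = primary_lookup(index).inspect_err(|error| {
        eprintln!("Primary lookup failed for {index}: {error}");
    });

    if let Ok(Some(piece)) = primary {
        return Some(piece);
    }

    // Either an error or `Ok(None)`: try the fallback source.
    match fallback_lookup(index) {
        Ok(maybe_piece) => maybe_piece,
        Err(error) => {
            eprintln!("Fallback lookup failed for {index}: {error}");
            None
        }
    }
}

fn main() {
    assert_eq!(get_piece(7), Some("piece-7".to_string()));
}
```

Keeping the node lookup first preserves the fast path right after archiving, while the DSN fallback turns a transient node failure into a warning instead of a hard error.
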
@@ -716,8 +731,6 @@ where
         self.acknowledge_archived_segment_processing(segment_index)
             .await;
 
-        // TODO: Would be nice to have concurrency here, but heap is causing a bit of
-        // difficulties unfortunately
         // Go through potentially matching pieces again now that segment was acknowledged and
         // try to persist them if necessary
         for (piece_index, piece) in pieces_to_maybe_include {
