Skip to content

Commit

Permalink
GrpcStore Write Retry
Browse files Browse the repository at this point in the history
The current implementation of retry in GrpcStore is awkward and only allows
retry up until the first call to the WriteRequestStreamWrapper, however this
has a buffer of the first message in it.  Therefore, with a bit of
refactoring we are able to retry up until the second message is requested by
Tonic without any degredation in performance.  This has the added benefit
of being able to refactor the interface to be a Stream.
  • Loading branch information
chrisstaite-menlo committed Jan 29, 2024
1 parent 3037a66 commit 6f696dc
Show file tree
Hide file tree
Showing 7 changed files with 286 additions and 128 deletions.
62 changes: 31 additions & 31 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions nativelink-service/src/bytestream_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,11 +384,11 @@ impl ByteStreamServer {
// by counting the number of bytes sent from the client. If they send
// less than the amount they said they were going to send and then
// close the stream, we know there's a problem.
Ok(None) => return Err(make_input_err!("Client closed stream before sending all data")),
None => return Err(make_input_err!("Client closed stream before sending all data")),
// Code path for client stream error. Probably client disconnect.
Err(err) => return Err(err),
Some(Err(err)) => return Err(err),
// Code path for received chunk of data.
Ok(Some(write_request)) => write_request,
Some(Ok(write_request)) => write_request,
};
if write_request.write_offset as u64 != tx.get_bytes_written() {
return Err(make_input_err!(
Expand Down
2 changes: 1 addition & 1 deletion nativelink-service/tests/bytestream_server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ pub mod write_tests {
// Now disconnect our stream.
drop(tx);
let (result, _bs_server) = join_handle.await?;
assert!(result.is_ok(), "Expected success to be returned");
result?;
}
{
// Check to make sure our store recorded the data properly.
Expand Down
Loading

0 comments on commit 6f696dc

Please sign in to comment.