-
Notifications
You must be signed in to change notification settings - Fork 110
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GrpcStore Write Retry #638
GrpcStore Write Retry #638
Conversation
The latest updates on your projects. Learn more about Vercel for Git ↗︎
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+@blakehatch Could you take a first pass?
Reviewed 2 of 4 files at r1, all commit messages.
Reviewable status: 0 of 1 LGTMs obtained (waiting on @blakehatch)
Sure! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed all commit messages.
Reviewable status: 0 of 1 LGTMs obtained
nativelink-util/src/write_request_stream_wrapper.rs
line 127 at r1 (raw file):
maybe_message.err_tip(|| format!("Stream error at byte {}", self.bytes_received)) } Poll::Ready(None) => Err(make_input_err!("Expected WriteRequest struct in stream")),
nit: `match Pin::new(&mut self.stream).poll_next(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(maybe_message) => maybe_message?
.err_tip(|| format!("Stream error at byte {}", self.bytes_received)),
}`
Could be made more concise with ? to propogate the error up instead of handling error case separately in the match
Previously, blakehatch (Blake Hatch) wrote…
It can't be because we would need to |
This could be changed |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 2 of 4 files at r1.
Reviewable status: 0 of 1 LGTMs obtained
nativelink-util/src/write_request_stream_wrapper.rs
line 116 at r1 (raw file):
// subsequent message from the wrapped Stream. let maybe_message = if self.message_count == 0 { if let Some(first_msg) = self.first_msg.clone() {
Nit: let maybe_message = if self.message_count == 0 {
self.first_msg.take().ok_or_else(|| make_input_err!("First message was lost in write stream wrapper"))
}
Could possibly be made more concise and a clone of the WriteRequest could be avoided by using take
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 0 of 1 LGTMs obtained (waiting on @blakehatch)
nativelink-util/src/write_request_stream_wrapper.rs
line 116 at r1 (raw file):
Previously, blakehatch (Blake Hatch) wrote…
Nit: let maybe_message = if self.message_count == 0 {
self.first_msg.take().ok_or_else(|| make_input_err!("First message was lost in write stream wrapper"))
}
Could possibly be made more concise and a clone of the WriteRequest could be avoided by using take
The whole point of this PR is that this is a clone() not a take() to allow a retry. This is not very inefficient since it contains a Bytes which is only a pointer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed all commit messages.
Reviewable status: 0 of 1 LGTMs obtained (waiting on @blakehatch)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 0 of 1 LGTMs obtained (waiting on @blakehatch)
nativelink-store/src/grpc_store.rs
line 296 at r1 (raw file):
// A wrapper to allow the LocalState to be given to an attempt but // retrieved if the attempt fails. struct StateWrapper<T, E>
nit: Lets hoist this to the top of file, it's too complex (imo) to inline with the function.
nativelink-store/src/grpc_store.rs
line 307 at r1 (raw file):
where T: Stream<Item = Result<WriteRequest, E>> + Unpin + Send + 'static, E: Into<Error> + 'static,
super nit: I'm 50% sure you don't need 'static
here.
nativelink-store/src/grpc_store.rs
line 320 at r1 (raw file):
Some(Ok(mut message)) => { // `resource_name` pattern is: "{instance_name}/uploads/{uuid}/blobs/{hash}/{size}". if let Some(first_slash_pos) = message.resource_name.find('/') {
nit: Can we instead use ResourceInfo
to do this parsing?
I'm sure the reason you didn't want to use it before was because it cannot re-combine them (which I think we should implement anyway), secondly, You can't mutate easily, which you can probably change all those &'a str
into Cow<'a, str>
instead to get the same effect, but make it easier to mutate, then recombine.
A nice side-effect is that it will preserve things like compressed-blobs
and digest_function
.
nativelink-store/src/grpc_store.rs
line 349 at r1 (raw file):
unfold(local_state, move |local_state| async move { let mut client = local_state.lock().client.clone(); // The client write may occur on a separate thread and
Actually, there's a trick you can use here...
If you want to be super optimized, we could make shared_state
a raw value, and add a mutable reference to the output &mut Option<SharedState>
, then make a custom destructor on StateWrapper
that will move the SharedState to the parent's ref on destruction.
The outer function then just needs to assert it exists.
Technically faster, since it removes both the Arc and the Mutex, but a lot more complicated, so probably not worth the effort.
Plus IIRC, parking_lot::Mutex
is a spinlock then a real lock or something, so in this use case it's only ever going to be atomic operations.
nativelink-store/src/grpc_store.rs
line 366 at r1 (raw file):
// If the stream has been consumed, don't retry, but // otherwise it's ok to try again. let result = if result.is_err() && local_state_locked.read_stream.reset_for_retry() {
nit: Kinda complicated to read, more direct code, like:
match result {
Err(err) => {
if (local_state_locked.read_stream.is_retryable()) {
local_state_locked.read_stream.reset();
log::log!("GRPC message here stating it retried");
RetryResult::Retry(())
} else {
RetryResult::Err(err.err_tip(|| "Retry was not possible in GrpcStore::write"))
}
}
Ok(()) => RetryResult::Ok(()),
}
nativelink-store/src/grpc_store.rs
line 369 at r1 (raw file):
result .err_tip(|| "Retry is possible in GrpcStore::write") .map_or_else(RetryResult::Retry, RetryResult::Ok)
(related to above: This should never be RetryResult::Ok
here.
nativelink-util/src/write_request_stream_wrapper.rs
line 1 at r1 (raw file):
// Copyright 2024 The Native Link Authors. All rights reserved.
nit: 2023-2024
nativelink-util/src/write_request_stream_wrapper.rs
line 81 at r1 (raw file):
} pub fn reset_for_retry(&mut self) -> bool {
nit: Lets break this into a the mutable an immutable parts. I know it's silly, but I will just have better piece of mind someone wont screw this up in the future by having to split up.
ie:
is_retryable()
reset()
nativelink-util/src/write_request_stream_wrapper.rs
line 115 at r1 (raw file):
// Gets the next message, this is either the cached first or a // subsequent message from the wrapped Stream. let maybe_message = if self.message_count == 0 {
nit: I try to use maybe_*
when referencing that it's an optional and *_result
/*_res
when referencing when it's a Result
.
Here is a bit backwards.
nativelink-util/src/write_request_stream_wrapper.rs
line 137 at r1 (raw file):
if self.message_count == 1 { // Upon a successful second message, we discard the first. self.first_msg.take();
nit: for more clarity:
self.first_msg = None;
(I like to use .take()
for ownership transfers and assigns when not taking ownership).
nativelink-util/src/write_request_stream_wrapper.rs
line 140 at r1 (raw file):
} self.write_finished = message.finish_write; self.bytes_received += message.data.len();
I'll trust your judgement on this, but is it going to be an issue with book-keeping this value if a stream is resumed? Specifically around how first_msg
works (since first_msg` stays around until second message is requested).
6d24cf0
to
4c7de24
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 0 of 1 LGTMs obtained (waiting on @blakehatch)
nativelink-store/src/grpc_store.rs
line 320 at r1 (raw file):
Previously, allada (Nathan (Blaise) Bruer) wrote…
nit: Can we instead use
ResourceInfo
to do this parsing?I'm sure the reason you didn't want to use it before was because it cannot re-combine them (which I think we should implement anyway), secondly, You can't mutate easily, which you can probably change all those
&'a str
intoCow<'a, str>
instead to get the same effect, but make it easier to mutate, then recombine.A nice side-effect is that it will preserve things like
compressed-blobs
anddigest_function
.
Yeah, I just copied the original code here, but there's no reason we can't improve it at the same time I suppose.
nativelink-store/src/grpc_store.rs
line 349 at r1 (raw file):
Previously, allada (Nathan (Blaise) Bruer) wrote…
Actually, there's a trick you can use here...
If you want to be super optimized, we could make
shared_state
a raw value, and add a mutable reference to the output&mut Option<SharedState>
, then make a custom destructor onStateWrapper
that will move the SharedState to the parent's ref on destruction.The outer function then just needs to assert it exists.
Technically faster, since it removes both the Arc and the Mutex, but a lot more complicated, so probably not worth the effort.
Plus IIRC,
parking_lot::Mutex
is a spinlock then a real lock or something, so in this use case it's only ever going to be atomic operations.
I played with this for ages and it doesn't appear to be possible since the client.write
requires that the structure passed to it is owned for 'static
and therefore we can't pass any instance reference to it. This makes sense since there's no lifetime constraints on the client.write
function call, so it's impossible to determine that the parameter hasn't been moved to another thread somewhere, hence the need for the Mutex
and the lifetime could possibly live longer than the current scope, hence the need for the Arc
. I don't see any other way around this, even with tricks without unsafe
code.
nativelink-store/src/grpc_store.rs
line 369 at r1 (raw file):
Previously, allada (Nathan (Blaise) Bruer) wrote…
(related to above: This should never be
RetryResult::Ok
here.
Done.
nativelink-util/src/write_request_stream_wrapper.rs
line 115 at r1 (raw file):
Previously, allada (Nathan (Blaise) Bruer) wrote…
nit: I try to use
maybe_*
when referencing that it's an optional and*_result
/*_res
when referencing when it's aResult
.Here is a bit backwards.
I copied the original naming for the types here, but I'm not at all fixed to them.
nativelink-util/src/write_request_stream_wrapper.rs
line 140 at r1 (raw file):
Previously, allada (Nathan (Blaise) Bruer) wrote…
I'll trust your judgement on this, but is it going to be an issue with book-keeping this value if a stream is resumed? Specifically around how
first_msg
works (since first_msg` stays around until second message is requested).
It really shouldn't be an issue so long as the invariant between message_count
and first_msg
is maintained within reset
. This is currently an unspoken invariant, so perhaps I'll document it in the struct.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 6 of 6 files at r2, all commit messages.
Reviewable status: 0 of 1 LGTMs obtained (waiting on @blakehatch)
nativelink-util/src/resource_info.rs
line 88 at r2 (raw file):
pub digest_function: Option<&'a str>, pub hash: &'a str, size: &'a str,
nit: Any reason not to turn expected_size
back into a string when needed?
nativelink-util/src/resource_info.rs
line 144 at r2 (raw file):
[ Some(self.instance_name), self.is_upload.then_some("uploads"),
nit: Instead of using the ToString
trait, I suggest instead making this an impl with .to_string(is_upload)
. This way it will allow the same object to be converted from a is_upload and then pass it to a download request or something if needed.
nativelink-util/src/resource_info.rs
line 146 at r2 (raw file):
self.is_upload.then_some("uploads"), self.uuid, self.compressed.then_some("compressed-blobs").or(Some("blobs")),
nit: Any reason we can't use self.compressor.map_or_else(|| "blobs", |_| "compressed-blobs")
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 0 of 1 LGTMs obtained (waiting on @blakehatch)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 0 of 1 LGTMs obtained (waiting on @blakehatch)
nativelink-util/src/resource_info.rs
line 88 at r2 (raw file):
Previously, allada (Nathan (Blaise) Bruer) wrote…
nit: Any reason not to turn
expected_size
back into a string when needed?
It was really just to avoid the extra effort, but it doesn't matter either way really, it's probably not all that expensive.
nativelink-util/src/resource_info.rs
line 146 at r2 (raw file):
Previously, allada (Nathan (Blaise) Bruer) wrote…
nit: Any reason we can't use
self.compressor.map_or_else(|| "blobs", |_| "compressed-blobs")
?
I originally did that, but we have a test that doesn't specify the compression format and I didn't check to see if that was standard or not, I should really. I wondered if there was a default algorithm if compressed-blobs was used without an algorithm?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 0 of 1 LGTMs obtained (waiting on @blakehatch)
nativelink-util/src/resource_info.rs
line 146 at r2 (raw file):
Previously, chrisstaite-menlo (Chris Staite) wrote…
I originally did that, but we have a test that doesn't specify the compression format and I didn't check to see if that was standard or not, I should really. I wondered if there was a default algorithm if compressed-blobs was used without an algorithm?
Oh, that test is probably just wrong then. If you specify compressed-blobs
you must specify the compressor
according to spec.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 0 of 1 LGTMs obtained (waiting on @blakehatch)
4c7de24
to
6f696dc
Compare
6f696dc
to
a4259c5
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 4 of 4 files at r3, all commit messages.
Reviewable status: 0 of 1 LGTMs obtained (waiting on @blakehatch)
nativelink-store/src/grpc_store.rs
line 135 at r3 (raw file):
Ok(mut resource_name) => { resource_name.instance_name = &local_state.instance_name; message.resource_name = resource_name.to_string(true);
super nit: .to_string(true /* is_upload */);
a4259c5
to
b57d59c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@blakehatch just waiting on your final sign-off here please
Reviewable status: 0 of 1 LGTMs obtained (waiting on @blakehatch)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 1 of 1 files at r5, all commit messages.
Reviewable status: 0 of 1 LGTMs obtained, and pending CI: macos-13 (waiting on @blakehatch)
a4ad2c8
to
6c64a74
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have had another thought and completely changed this again. I've basically reverted most changes in WriteRequestStreamWrapper
and instead always buffer the last message in GrpcStore::write
since ByteStreamServer::write
can resume.
Reviewable status: 0 of 1 LGTMs obtained, and pending CI: Analyze (javascript-typescript), Bazel Dev / ubuntu-22.04, Vercel, asan / ubuntu-22.04, docker-compose-compiles-nativelink (20.04), pre-commit-checks, publish-image, ubuntu-20.04 / stable, ubuntu-22.04 (waiting on @blakehatch)
6c64a74
to
8d2bd64
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 1 of 2 files at r6, all commit messages.
Reviewable status: 0 of 1 LGTMs obtained (waiting on @blakehatch)
nativelink-store/src/grpc_store.rs
line 105 at r6 (raw file):
read_stream_error: Option<Error>, read_stream: WriteRequestStreamWrapper<T, E>, // Tonic doesn't appear to report an error until it has taken two messages,
curiously, is this some tonic buffer size config?
This does make sense though, since it'd fill one buffer up to completion, ship it off, but the previous message is still being processed by the receiver, then tonic gets an error on the 3rd message, so from the down stream component it look this.
nativelink-store/src/grpc_store.rs
line 112 at r6 (raw file):
// message_before_last. Will decrement and play back last_message next and // will then decrement to 0 and start streaming again. resume_count: u32,
Instead, can we do something like:
previous_messages: [Option<WriteRequest>, Option<WriteRequest>],
is_replaying: bool,
Then below something like:
if self.is_replaying {
if self.previous_messages[0].is_some() {
std::mem::swap(&mut self.previous_messages[0], &mut self.previous_messages[1]);
return Poll::Ready(self.previous_messages[1].take());
} else {
self.is_replaying = false;
}
}
It still is isn't pretty (the elegant way is to use a linked list, but that requires a heal allocation), but at least the book-keeping is much simpler and less "magic".
nativelink-store/src/grpc_store.rs
line 189 at r6 (raw file):
// Cache the last request in case there is an error to allow // the upload to be resumed. local_state.message_before_last = local_state.last_message.take();
nit: Very petty, but does save a couple machine instructions, since it doesn't need to zero out last_message
:
std::mem::swap(&mut local_state.message_before_last, &mut local_state.last_message);
nativelink-util/src/write_request_stream_wrapper.rs
line 114 at r6 (raw file):
maybe_message.err_tip(|| format!("Stream error at byte {}", self.bytes_received)) } Poll::Ready(None) => Err(make_input_err!("Expected WriteRequest struct in stream")),
more just general question... I wounder if this is actually a problem?
Like, if we don't read until the None
in the stream I wounder if we can end up with it holding on to data?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 0 of 1 LGTMs obtained (waiting on @blakehatch)
nativelink-store/src/grpc_store.rs
line 105 at r6 (raw file):
Previously, allada (Nathan (Blaise) Bruer) wrote…
curiously, is this some tonic buffer size config?
This does make sense though, since it'd fill one buffer up to completion, ship it off, but the previous message is still being processed by the receiver, then tonic gets an error on the 3rd message, so from the down stream component it look this.
Not sure, haven't really looked much into how Tonic works.
nativelink-store/src/grpc_store.rs
line 112 at r6 (raw file):
Previously, allada (Nathan (Blaise) Bruer) wrote…
Instead, can we do something like:
previous_messages: [Option<WriteRequest>, Option<WriteRequest>], is_replaying: bool,Then below something like:
if self.is_replaying { if self.previous_messages[0].is_some() { std::mem::swap(&mut self.previous_messages[0], &mut self.previous_messages[1]); return Poll::Ready(self.previous_messages[1].take()); } else { self.is_replaying = false; } }It still is isn't pretty (the elegant way is to use a linked list, but that requires a heal allocation), but at least the book-keeping is much simpler and less "magic".
That doesn't allow it to retry more than once, which if the connection is down it might take a couple of attempts to reconnect.
nativelink-util/src/write_request_stream_wrapper.rs
line 114 at r6 (raw file):
Previously, allada (Nathan (Blaise) Bruer) wrote…
more just general question... I wounder if this is actually a problem?
Like, if we don't read until the
None
in the stream I wounder if we can end up with it holding on to data?
It should be dropped though, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 0 of 1 LGTMs obtained (waiting on @blakehatch)
nativelink-store/src/grpc_store.rs
line 189 at r6 (raw file):
Previously, allada (Nathan (Blaise) Bruer) wrote…
nit: Very petty, but does save a couple machine instructions, since it doesn't need to zero out
last_message
:std::mem::swap(&mut local_state.message_before_last, &mut local_state.last_message);
Cannot borrow local_state
as mutable twice... I'm sure you must be able to do this though?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 0 of 1 LGTMs obtained (waiting on @blakehatch)
nativelink-store/src/grpc_store.rs
line 189 at r6 (raw file):
Previously, chrisstaite-menlo (Chris Staite) wrote…
Cannot borrow
local_state
as mutable twice... I'm sure you must be able to do this though?
Helper method in the struct impl... that's a bit ugly, but I suppose it's actually better encapsulation.
8d2bd64
to
4ff5743
Compare
4ff5743
to
f3dea0b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 0 of 1 LGTMs obtained, and pending CI: Vercel, asan / ubuntu-22.04, integration-tests (20.04), pre-commit-checks (waiting on @blakehatch)
nativelink-store/src/grpc_store.rs
line 112 at r6 (raw file):
Previously, chrisstaite-menlo (Chris Staite) wrote…
That doesn't allow it to retry more than once, which if the connection is down it might take a couple of attempts to reconnect.
I've done the clone()
upfront when we call resume()
and that's sorted it.
f3dea0b
to
85d9891
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 1 of 1 files at r7, all commit messages.
Reviewable status: 0 of 1 LGTMs obtained, and pending CI: macos-13 (waiting on @blakehatch)
nativelink-util/src/write_request_stream_wrapper.rs
line 114 at r6 (raw file):
Previously, chrisstaite-menlo (Chris Staite) wrote…
It should be dropped though, right?
It's probably not a big deal, more of just a question of if we should assume streams are read until end on the happy path.
No action.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry about the delay, some stuff came up that I had to deal with.
Reviewable status: 0 of 1 LGTMs obtained, and pending CI: macos-13 (waiting on @blakehatch)
The current implementation of retry in GrpcStore is awkward and only allows retry up until the first call to the WriteRequestStreamWrapper. Since a ByteStreamServer::write is resumable, we should always cache the last WriteRequest and then use that if we retry. This allows there to be a resumable failure at any point in the Stream. This refactors the WriteRequestStreamWrapper into a Stream and then uses that to have a buffering stream in the WriteState for GrpcStore.
85d9891
to
2a3b406
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 1 of 1 files at r8, all commit messages.
Reviewable status: 0 of 1 LGTMs obtained (waiting on @blakehatch)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@blakehatch , waiting on your stamp.
Reviewable status: 0 of 1 LGTMs obtained (waiting on @blakehatch)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed all commit messages.
Reviewable status: 0 of 1 LGTMs obtained (waiting on @blakehatch)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed all commit messages.
Reviewable status: complete! 1 of 1 LGTMs obtained
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: complete! 1 of 1 LGTMs obtained
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: complete! 1 of 1 LGTMs obtained
Description
The current implementation of retry in GrpcStore is awkward and only allows retry up until the first call to the WriteRequestStreamWrapper, however this has a buffer of the first message in it. Therefore, with a bit of refactoring we are able to retry up until the second message is requested by Tonic without any degredation in performance. This has the added benefit of being able to refactor the interface to be a Stream.
Type of change
Please delete options that are not relevant.
How Has This Been Tested?
Using proxy in my cluster.
Checklist
bazel test //...
passes locallygit amend
see some docsThis change is