Skip to content

Commit

Permalink
Add API so stores can get Arc<Store> or &Store
Browse files Browse the repository at this point in the history
A little bit of cleanup to remove the need have an Arc<Store> when
a user wants to up/down-cast to another store. Users can now do this
by reference if they don't have an Arc<Store>.
  • Loading branch information
allada committed Feb 23, 2024
1 parent f4d9db3 commit 2fd26ed
Show file tree
Hide file tree
Showing 24 changed files with 231 additions and 106 deletions.
13 changes: 7 additions & 6 deletions nativelink-scheduler/src/cache_lookup_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,13 @@ pub struct CacheLookupScheduler {
}

async fn get_action_from_store(
ac_store: &Arc<dyn Store>,
ac_store: Pin<&dyn Store>,
action_digest: DigestInfo,
instance_name: String,
) -> Option<ProtoActionResult> {
// If we are a GrpcStore we shortcut here, as this is a special store.
let any_store = ac_store.clone().inner_store(Some(action_digest)).as_any();
let maybe_grpc_store = any_store.downcast_ref::<Arc<GrpcStore>>();
if let Some(grpc_store) = maybe_grpc_store {
let any_store = ac_store.inner_store(Some(action_digest)).as_any();
if let Some(grpc_store) = any_store.downcast_ref::<GrpcStore>() {
let action_result_request = GetActionResultRequest {
instance_name,
action_digest: Some(action_digest.into()),
Expand All @@ -78,7 +77,7 @@ async fn get_action_from_store(
.map(|response| response.into_inner())
.ok()
} else {
get_and_decode_digest::<ProtoActionResult>(Pin::new(ac_store.as_ref()), &action_digest)
get_and_decode_digest::<ProtoActionResult>(ac_store, &action_digest)
.await
.ok()
}
Expand Down Expand Up @@ -185,7 +184,9 @@ impl ActionScheduler for CacheLookupScheduler {
// Perform cache check.
let action_digest = current_state.action_digest();
let instance_name = action_info.instance_name().clone();
if let Some(action_result) = get_action_from_store(&ac_store, *action_digest, instance_name).await {
if let Some(action_result) =
get_action_from_store(Pin::new(ac_store.as_ref()), *action_digest, instance_name).await
{
if validate_outputs_exist(&cas_store, &action_result).await {
// Found in the cache, return the result immediately.
Arc::make_mut(&mut current_state).stage = ActionStage::CompletedFromCache(action_result);
Expand Down
11 changes: 4 additions & 7 deletions nativelink-service/src/ac_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,8 @@ impl AcServer {
.try_into()?;

// If we are a GrpcStore we shortcut here, as this is a special store.
let any_store = store_info.store.clone().inner_store(Some(digest)).as_any();
let maybe_grpc_store = any_store.downcast_ref::<Arc<GrpcStore>>();
if let Some(grpc_store) = maybe_grpc_store {
let any_store = store_info.store.inner_store(Some(digest)).as_any();
if let Some(grpc_store) = any_store.downcast_ref::<GrpcStore>() {
return grpc_store.get_action_result(Request::new(get_action_request)).await;
}

Expand Down Expand Up @@ -125,10 +124,8 @@ impl AcServer {
.try_into()?;

// If we are a GrpcStore we shortcut here, as this is a special store.
let any_store = store_info.store.clone().inner_store(Some(digest)).as_any();

let maybe_grpc_store = any_store.downcast_ref::<Arc<GrpcStore>>();
if let Some(grpc_store) = maybe_grpc_store {
let any_store = store_info.store.inner_store(Some(digest)).as_any();
if let Some(grpc_store) = any_store.downcast_ref::<GrpcStore>() {
return grpc_store
.update_action_result(Request::new(update_action_request))
.await;
Expand Down
15 changes: 6 additions & 9 deletions nativelink-service/src/bytestream_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,9 +260,8 @@ impl ByteStreamServer {
let digest = DigestInfo::try_new(resource_info.hash, resource_info.expected_size)?;

// If we are a GrpcStore we shortcut here, as this is a special store.
let any_store = store.clone().inner_store(Some(digest)).as_any();
let maybe_grpc_store = any_store.downcast_ref::<Arc<GrpcStore>>();
if let Some(grpc_store) = maybe_grpc_store {
let any_store = store.inner_store(Some(digest)).as_any();
if let Some(grpc_store) = any_store.downcast_ref::<GrpcStore>() {
let stream = grpc_store.read(Request::new(read_request)).await?;
return Ok(Response::new(Box::pin(stream)));
}
Expand Down Expand Up @@ -369,9 +368,8 @@ impl ByteStreamServer {
.err_tip(|| "Invalid digest input in ByteStream::write")?;

// If we are a GrpcStore we shortcut here, as this is a special store.
let any_store = store.clone().inner_store(Some(digest)).as_any();
let maybe_grpc_store = any_store.downcast_ref::<Arc<GrpcStore>>();
if let Some(grpc_store) = maybe_grpc_store {
let any_store = store.inner_store(Some(digest)).as_any();
if let Some(grpc_store) = any_store.downcast_ref::<GrpcStore>() {
return grpc_store.write(stream).await;
}

Expand Down Expand Up @@ -497,9 +495,8 @@ impl ByteStreamServer {
let digest = DigestInfo::try_new(resource_info.hash, resource_info.expected_size)?;

// If we are a GrpcStore we shortcut here, as this is a special store.
let any_store = store_clone.clone().inner_store(Some(digest)).as_any();
let maybe_grpc_store = any_store.downcast_ref::<Arc<GrpcStore>>();
if let Some(grpc_store) = maybe_grpc_store {
let any_store = store_clone.inner_store(Some(digest)).as_any();
if let Some(grpc_store) = any_store.downcast_ref::<GrpcStore>() {
return grpc_store.query_write_status(Request::new(query_request.clone())).await;
}

Expand Down
16 changes: 6 additions & 10 deletions nativelink-service/src/cas_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,8 @@ impl CasServer {
// If we are a GrpcStore we shortcut here, as this is a special store.
// Note: We don't know the digests here, so we try perform a very shallow
// check to see if it's a grpc store.
let any_store = store.clone().inner_store(None).as_any();
let maybe_grpc_store = any_store.downcast_ref::<Arc<GrpcStore>>();
if let Some(grpc_store) = maybe_grpc_store {
let any_store = store.inner_store(None).as_any();
if let Some(grpc_store) = any_store.downcast_ref::<GrpcStore>() {
return grpc_store.batch_update_blobs(Request::new(inner_request)).await;
}

Expand Down Expand Up @@ -162,9 +161,8 @@ impl CasServer {
// If we are a GrpcStore we shortcut here, as this is a special store.
// Note: We don't know the digests here, so we try perform a very shallow
// check to see if it's a grpc store.
let any_store = store.clone().inner_store(None).as_any();
let maybe_grpc_store = any_store.downcast_ref::<Arc<GrpcStore>>();
if let Some(grpc_store) = maybe_grpc_store {
let any_store = store.inner_store(None).as_any();
if let Some(grpc_store) = any_store.downcast_ref::<GrpcStore>() {
return grpc_store.batch_read_blobs(Request::new(inner_request)).await;
}

Expand Down Expand Up @@ -221,10 +219,8 @@ impl CasServer {
// If we are a GrpcStore we shortcut here, as this is a special store.
// Note: We don't know the digests here, so we try perform a very shallow
// check to see if it's a grpc store.
let any_store = store.clone().inner_store(None).as_any();
let maybe_grpc_store = any_store.downcast_ref::<Arc<GrpcStore>>();

if let Some(grpc_store) = maybe_grpc_store {
let any_store = store.inner_store(None).as_any();
if let Some(grpc_store) = any_store.downcast_ref::<GrpcStore>() {
let stream = grpc_store.get_tree(Request::new(inner_request)).await?.into_inner();
return Ok(Response::new(Box::pin(stream)));
}
Expand Down
14 changes: 11 additions & 3 deletions nativelink-store/src/completeness_checking_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,12 +354,20 @@ impl Store for CompletenessCheckingStore {
ac_store.get_part_ref(digest, writer, offset, length).await
}

fn inner_store(self: Arc<Self>, _digest: Option<DigestInfo>) -> Arc<dyn Store> {
fn inner_store(&self, _digest: Option<DigestInfo>) -> &'_ dyn Store {
self
}

fn as_any(self: Arc<Self>) -> Box<dyn std::any::Any + Send> {
Box::new(self)
fn inner_store_arc(self: Arc<Self>, _digest: Option<DigestInfo>) -> Arc<dyn Store> {
self
}

fn as_any(&self) -> &(dyn std::any::Any + Sync + Send + 'static) {
self
}

fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> {
self
}
}

Expand Down
14 changes: 11 additions & 3 deletions nativelink-store/src/compression_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -578,12 +578,20 @@ impl Store for CompressionStore {
Ok(())
}

fn inner_store(self: Arc<Self>, _digest: Option<DigestInfo>) -> Arc<dyn Store> {
fn inner_store(&self, _digest: Option<DigestInfo>) -> &'_ dyn Store {
self
}

fn as_any(self: Arc<Self>) -> Box<dyn std::any::Any + Send> {
Box::new(self)
fn inner_store_arc(self: Arc<Self>, _digest: Option<DigestInfo>) -> Arc<dyn Store> {
self
}

fn as_any(&self) -> &(dyn std::any::Any + Sync + Send + 'static) {
self
}

fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> {
self
}

fn register_metrics(self: Arc<Self>, registry: &mut Registry) {
Expand Down
14 changes: 11 additions & 3 deletions nativelink-store/src/dedup_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,12 +342,20 @@ impl Store for DedupStore {
Ok(())
}

fn inner_store(self: Arc<Self>, _digest: Option<DigestInfo>) -> Arc<dyn Store> {
fn inner_store(&self, _digest: Option<DigestInfo>) -> &'_ dyn Store {
self
}

fn as_any(self: Arc<Self>) -> Box<dyn std::any::Any + Send> {
Box::new(self)
fn inner_store_arc(self: Arc<Self>, _digest: Option<DigestInfo>) -> Arc<dyn Store> {
self
}

fn as_any<'a>(&'a self) -> &'a (dyn std::any::Any + Sync + Send + 'static) {
self
}

fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> {
self
}
}

Expand Down
14 changes: 11 additions & 3 deletions nativelink-store/src/existence_cache_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,12 +180,20 @@ impl Store for ExistenceCacheStore {
result
}

fn inner_store(self: Arc<Self>, _digest: Option<DigestInfo>) -> Arc<dyn Store> {
fn inner_store(&self, _digest: Option<DigestInfo>) -> &'_ dyn Store {
self
}

fn as_any(self: Arc<Self>) -> Box<dyn std::any::Any + Send> {
Box::new(self)
fn inner_store_arc(self: Arc<Self>, _digest: Option<DigestInfo>) -> Arc<dyn Store> {
self
}

fn as_any<'a>(&'a self) -> &'a (dyn std::any::Any + Sync + Send + 'static) {
self
}

fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> {
self
}

fn register_metrics(self: Arc<Self>, registry: &mut Registry) {
Expand Down
14 changes: 11 additions & 3 deletions nativelink-store/src/fast_slow_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,12 +254,20 @@ impl Store for FastSlowStore {
}
}

fn inner_store(self: Arc<Self>, _digest: Option<DigestInfo>) -> Arc<dyn Store> {
fn inner_store(&self, _digest: Option<DigestInfo>) -> &'_ dyn Store {
self
}

fn as_any(self: Arc<Self>) -> Box<dyn std::any::Any + Send> {
Box::new(self)
fn inner_store_arc(self: Arc<Self>, _digest: Option<DigestInfo>) -> Arc<dyn Store> {
self
}

fn as_any<'a>(&'a self) -> &'a (dyn std::any::Any + Sync + Send + 'static) {
self
}

fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> {
self
}

fn register_metrics(self: Arc<Self>, registry: &mut Registry) {
Expand Down
14 changes: 11 additions & 3 deletions nativelink-store/src/filesystem_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -778,12 +778,20 @@ impl<Fe: FileEntry> Store for FilesystemStore<Fe> {
Ok(())
}

fn inner_store(self: Arc<Self>, _digest: Option<DigestInfo>) -> Arc<dyn Store> {
fn inner_store(&self, _digest: Option<DigestInfo>) -> &'_ dyn Store {
self
}

fn as_any(self: Arc<Self>) -> Box<dyn std::any::Any + Send> {
Box::new(self)
fn inner_store_arc(self: Arc<Self>, _digest: Option<DigestInfo>) -> Arc<dyn Store> {
self
}

fn as_any<'a>(&'a self) -> &'a (dyn std::any::Any + Sync + Send + 'static) {
self
}

fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> {
self
}

fn register_metrics(self: Arc<Self>, registry: &mut Registry) {
Expand Down
14 changes: 11 additions & 3 deletions nativelink-store/src/grpc_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -837,12 +837,20 @@ impl Store for GrpcStore {
.await
}

fn inner_store(self: Arc<Self>, _digest: Option<DigestInfo>) -> Arc<dyn Store> {
fn inner_store(&self, _digest: Option<DigestInfo>) -> &'_ dyn Store {
self
}

fn as_any(self: Arc<Self>) -> Box<dyn std::any::Any + Send> {
Box::new(self)
fn inner_store_arc(self: Arc<Self>, _digest: Option<DigestInfo>) -> Arc<dyn Store> {
self
}

fn as_any<'a>(&'a self) -> &'a (dyn std::any::Any + Sync + Send + 'static) {
self
}

fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> {
self
}
}

Expand Down
14 changes: 11 additions & 3 deletions nativelink-store/src/memory_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,12 +153,20 @@ impl Store for MemoryStore {
Ok(())
}

fn inner_store(self: Arc<Self>, _digest: Option<DigestInfo>) -> Arc<dyn Store> {
fn inner_store(&self, _digest: Option<DigestInfo>) -> &'_ dyn Store {
self
}

fn as_any(self: Arc<Self>) -> Box<dyn std::any::Any + Send> {
Box::new(self)
fn inner_store_arc(self: Arc<Self>, _digest: Option<DigestInfo>) -> Arc<dyn Store> {
self
}

fn as_any<'a>(&'a self) -> &'a (dyn std::any::Any + Sync + Send + 'static) {
self
}

fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> {
self
}

fn register_metrics(self: Arc<Self>, registry: &mut Registry) {
Expand Down
14 changes: 11 additions & 3 deletions nativelink-store/src/noop_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,20 @@ impl Store for NoopStore {
Err(make_err!(Code::NotFound, "Not found in noop store"))
}

fn inner_store(self: Arc<Self>, _digest: Option<DigestInfo>) -> Arc<dyn Store> {
fn inner_store(&self, _digest: Option<DigestInfo>) -> &'_ dyn Store {
self
}

fn as_any(self: Arc<Self>) -> Box<dyn std::any::Any + Send> {
Box::new(self)
fn inner_store_arc(self: Arc<Self>, _digest: Option<DigestInfo>) -> Arc<dyn Store> {
self
}

fn as_any<'a>(&'a self) -> &'a (dyn std::any::Any + Sync + Send + 'static) {
self
}

fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> {
self
}
}

Expand Down
22 changes: 18 additions & 4 deletions nativelink-store/src/ref_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,18 +124,32 @@ impl Store for RefStore {
.await
}

fn inner_store(self: Arc<Self>, digest: Option<DigestInfo>) -> Arc<dyn Store> {
fn inner_store(&self, digest: Option<DigestInfo>) -> &'_ dyn Store {
match self.get_store() {
Ok(store) => store.clone().inner_store(digest),
Ok(store) => store.inner_store(digest),
Err(e) => {
error!("Failed to get store for digest: {e:?}");
self
}
}
}

fn as_any(self: Arc<Self>) -> Box<dyn std::any::Any + Send> {
Box::new(self)
fn inner_store_arc(self: Arc<Self>, digest: Option<DigestInfo>) -> Arc<dyn Store> {
match self.get_store() {
Ok(store) => store.clone().inner_store_arc(digest),
Err(e) => {
error!("Failed to get store for digest: {e:?}");
self
}
}
}

fn as_any<'a>(&'a self) -> &'a (dyn std::any::Any + Sync + Send + 'static) {
self
}

fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> {
self
}
}

Expand Down
Loading

0 comments on commit 2fd26ed

Please sign in to comment.