Skip to content

Commit

Permalink
Revert "Add API so stores can get Arc<Store> or &Store (TraceMachina#679
Browse files Browse the repository at this point in the history
)"

This reverts commit 5df8a78.

This appears to increase build size unnaturally.
  • Loading branch information
aaronmondal committed Mar 2, 2024
1 parent 6491fc7 commit 35afce5
Show file tree
Hide file tree
Showing 23 changed files with 108 additions and 230 deletions.
13 changes: 6 additions & 7 deletions nativelink-scheduler/src/cache_lookup_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,14 @@ pub struct CacheLookupScheduler {
}

async fn get_action_from_store(
ac_store: Pin<&dyn Store>,
ac_store: &Arc<dyn Store>,
action_digest: DigestInfo,
instance_name: String,
) -> Option<ProtoActionResult> {
// If we are a GrpcStore we shortcut here, as this is a special store.
let any_store = ac_store.inner_store(Some(action_digest)).as_any();
if let Some(grpc_store) = any_store.downcast_ref::<GrpcStore>() {
let any_store = ac_store.clone().inner_store(Some(action_digest)).as_any();
let maybe_grpc_store = any_store.downcast_ref::<Arc<GrpcStore>>();
if let Some(grpc_store) = maybe_grpc_store {
let action_result_request = GetActionResultRequest {
instance_name,
action_digest: Some(action_digest.into()),
Expand All @@ -77,7 +78,7 @@ async fn get_action_from_store(
.map(|response| response.into_inner())
.ok()
} else {
get_and_decode_digest::<ProtoActionResult>(ac_store, &action_digest)
get_and_decode_digest::<ProtoActionResult>(Pin::new(ac_store.as_ref()), &action_digest)
.await
.ok()
}
Expand Down Expand Up @@ -184,9 +185,7 @@ impl ActionScheduler for CacheLookupScheduler {
// Perform cache check.
let action_digest = current_state.action_digest();
let instance_name = action_info.instance_name().clone();
if let Some(action_result) =
get_action_from_store(Pin::new(ac_store.as_ref()), *action_digest, instance_name).await
{
if let Some(action_result) = get_action_from_store(&ac_store, *action_digest, instance_name).await {
if validate_outputs_exist(&cas_store, &action_result).await {
// Found in the cache, return the result immediately.
Arc::make_mut(&mut current_state).stage = ActionStage::CompletedFromCache(action_result);
Expand Down
11 changes: 7 additions & 4 deletions nativelink-service/src/ac_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,9 @@ impl AcServer {
.try_into()?;

// If we are a GrpcStore we shortcut here, as this is a special store.
let any_store = store_info.store.inner_store(Some(digest)).as_any();
if let Some(grpc_store) = any_store.downcast_ref::<GrpcStore>() {
let any_store = store_info.store.clone().inner_store(Some(digest)).as_any();
let maybe_grpc_store = any_store.downcast_ref::<Arc<GrpcStore>>();
if let Some(grpc_store) = maybe_grpc_store {
return grpc_store.get_action_result(Request::new(get_action_request)).await;
}

Expand Down Expand Up @@ -124,8 +125,10 @@ impl AcServer {
.try_into()?;

// If we are a GrpcStore we shortcut here, as this is a special store.
let any_store = store_info.store.inner_store(Some(digest)).as_any();
if let Some(grpc_store) = any_store.downcast_ref::<GrpcStore>() {
let any_store = store_info.store.clone().inner_store(Some(digest)).as_any();

let maybe_grpc_store = any_store.downcast_ref::<Arc<GrpcStore>>();
if let Some(grpc_store) = maybe_grpc_store {
return grpc_store
.update_action_result(Request::new(update_action_request))
.await;
Expand Down
15 changes: 9 additions & 6 deletions nativelink-service/src/bytestream_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,9 @@ impl ByteStreamServer {
let digest = DigestInfo::try_new(resource_info.hash, resource_info.expected_size)?;

// If we are a GrpcStore we shortcut here, as this is a special store.
let any_store = store.inner_store(Some(digest)).as_any();
if let Some(grpc_store) = any_store.downcast_ref::<GrpcStore>() {
let any_store = store.clone().inner_store(Some(digest)).as_any();
let maybe_grpc_store = any_store.downcast_ref::<Arc<GrpcStore>>();
if let Some(grpc_store) = maybe_grpc_store {
let stream = grpc_store.read(Request::new(read_request)).await?;
return Ok(Response::new(Box::pin(stream)));
}
Expand Down Expand Up @@ -368,8 +369,9 @@ impl ByteStreamServer {
.err_tip(|| "Invalid digest input in ByteStream::write")?;

// If we are a GrpcStore we shortcut here, as this is a special store.
let any_store = store.inner_store(Some(digest)).as_any();
if let Some(grpc_store) = any_store.downcast_ref::<GrpcStore>() {
let any_store = store.clone().inner_store(Some(digest)).as_any();
let maybe_grpc_store = any_store.downcast_ref::<Arc<GrpcStore>>();
if let Some(grpc_store) = maybe_grpc_store {
return grpc_store.write(stream).await;
}

Expand Down Expand Up @@ -495,8 +497,9 @@ impl ByteStreamServer {
let digest = DigestInfo::try_new(resource_info.hash, resource_info.expected_size)?;

// If we are a GrpcStore we shortcut here, as this is a special store.
let any_store = store_clone.inner_store(Some(digest)).as_any();
if let Some(grpc_store) = any_store.downcast_ref::<GrpcStore>() {
let any_store = store_clone.clone().inner_store(Some(digest)).as_any();
let maybe_grpc_store = any_store.downcast_ref::<Arc<GrpcStore>>();
if let Some(grpc_store) = maybe_grpc_store {
return grpc_store.query_write_status(Request::new(query_request.clone())).await;
}

Expand Down
16 changes: 10 additions & 6 deletions nativelink-service/src/cas_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,9 @@ impl CasServer {
// If we are a GrpcStore we shortcut here, as this is a special store.
// Note: We don't know the digests here, so we try perform a very shallow
// check to see if it's a grpc store.
let any_store = store.inner_store(None).as_any();
if let Some(grpc_store) = any_store.downcast_ref::<GrpcStore>() {
let any_store = store.clone().inner_store(None).as_any();
let maybe_grpc_store = any_store.downcast_ref::<Arc<GrpcStore>>();
if let Some(grpc_store) = maybe_grpc_store {
return grpc_store.batch_update_blobs(Request::new(inner_request)).await;
}

Expand Down Expand Up @@ -161,8 +162,9 @@ impl CasServer {
// If we are a GrpcStore we shortcut here, as this is a special store.
// Note: We don't know the digests here, so we try perform a very shallow
// check to see if it's a grpc store.
let any_store = store.inner_store(None).as_any();
if let Some(grpc_store) = any_store.downcast_ref::<GrpcStore>() {
let any_store = store.clone().inner_store(None).as_any();
let maybe_grpc_store = any_store.downcast_ref::<Arc<GrpcStore>>();
if let Some(grpc_store) = maybe_grpc_store {
return grpc_store.batch_read_blobs(Request::new(inner_request)).await;
}

Expand Down Expand Up @@ -219,8 +221,10 @@ impl CasServer {
// If we are a GrpcStore we shortcut here, as this is a special store.
// Note: We don't know the digests here, so we try perform a very shallow
// check to see if it's a grpc store.
let any_store = store.inner_store(None).as_any();
if let Some(grpc_store) = any_store.downcast_ref::<GrpcStore>() {
let any_store = store.clone().inner_store(None).as_any();
let maybe_grpc_store = any_store.downcast_ref::<Arc<GrpcStore>>();

if let Some(grpc_store) = maybe_grpc_store {
let stream = grpc_store.get_tree(Request::new(inner_request)).await?.into_inner();
return Ok(Response::new(Box::pin(stream)));
}
Expand Down
14 changes: 3 additions & 11 deletions nativelink-store/src/completeness_checking_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,20 +354,12 @@ impl Store for CompletenessCheckingStore {
ac_store.get_part_ref(digest, writer, offset, length).await
}

fn inner_store(&self, _digest: Option<DigestInfo>) -> &'_ dyn Store {
fn inner_store(self: Arc<Self>, _digest: Option<DigestInfo>) -> Arc<dyn Store> {
self
}

fn inner_store_arc(self: Arc<Self>, _digest: Option<DigestInfo>) -> Arc<dyn Store> {
self
}

fn as_any(&self) -> &(dyn std::any::Any + Sync + Send + 'static) {
self
}

fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> {
self
fn as_any(self: Arc<Self>) -> Box<dyn std::any::Any + Send> {
Box::new(self)
}
}

Expand Down
14 changes: 3 additions & 11 deletions nativelink-store/src/compression_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -578,20 +578,12 @@ impl Store for CompressionStore {
Ok(())
}

fn inner_store(&self, _digest: Option<DigestInfo>) -> &'_ dyn Store {
fn inner_store(self: Arc<Self>, _digest: Option<DigestInfo>) -> Arc<dyn Store> {
self
}

fn inner_store_arc(self: Arc<Self>, _digest: Option<DigestInfo>) -> Arc<dyn Store> {
self
}

fn as_any(&self) -> &(dyn std::any::Any + Sync + Send + 'static) {
self
}

fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> {
self
fn as_any(self: Arc<Self>) -> Box<dyn std::any::Any + Send> {
Box::new(self)
}

fn register_metrics(self: Arc<Self>, registry: &mut Registry) {
Expand Down
14 changes: 3 additions & 11 deletions nativelink-store/src/dedup_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,20 +342,12 @@ impl Store for DedupStore {
Ok(())
}

fn inner_store(&self, _digest: Option<DigestInfo>) -> &'_ dyn Store {
fn inner_store(self: Arc<Self>, _digest: Option<DigestInfo>) -> Arc<dyn Store> {
self
}

fn inner_store_arc(self: Arc<Self>, _digest: Option<DigestInfo>) -> Arc<dyn Store> {
self
}

fn as_any<'a>(&'a self) -> &'a (dyn std::any::Any + Sync + Send + 'static) {
self
}

fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> {
self
fn as_any(self: Arc<Self>) -> Box<dyn std::any::Any + Send> {
Box::new(self)
}
}

Expand Down
14 changes: 3 additions & 11 deletions nativelink-store/src/existence_cache_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,20 +180,12 @@ impl Store for ExistenceCacheStore {
result
}

fn inner_store(&self, _digest: Option<DigestInfo>) -> &'_ dyn Store {
fn inner_store(self: Arc<Self>, _digest: Option<DigestInfo>) -> Arc<dyn Store> {
self
}

fn inner_store_arc(self: Arc<Self>, _digest: Option<DigestInfo>) -> Arc<dyn Store> {
self
}

fn as_any<'a>(&'a self) -> &'a (dyn std::any::Any + Sync + Send + 'static) {
self
}

fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> {
self
fn as_any(self: Arc<Self>) -> Box<dyn std::any::Any + Send> {
Box::new(self)
}

fn register_metrics(self: Arc<Self>, registry: &mut Registry) {
Expand Down
14 changes: 3 additions & 11 deletions nativelink-store/src/fast_slow_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,20 +265,12 @@ impl Store for FastSlowStore {
}
}

fn inner_store(&self, _digest: Option<DigestInfo>) -> &'_ dyn Store {
fn inner_store(self: Arc<Self>, _digest: Option<DigestInfo>) -> Arc<dyn Store> {
self
}

fn inner_store_arc(self: Arc<Self>, _digest: Option<DigestInfo>) -> Arc<dyn Store> {
self
}

fn as_any<'a>(&'a self) -> &'a (dyn std::any::Any + Sync + Send + 'static) {
self
}

fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> {
self
fn as_any(self: Arc<Self>) -> Box<dyn std::any::Any + Send> {
Box::new(self)
}

fn register_metrics(self: Arc<Self>, registry: &mut Registry) {
Expand Down
14 changes: 3 additions & 11 deletions nativelink-store/src/filesystem_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -778,20 +778,12 @@ impl<Fe: FileEntry> Store for FilesystemStore<Fe> {
Ok(())
}

fn inner_store(&self, _digest: Option<DigestInfo>) -> &'_ dyn Store {
fn inner_store(self: Arc<Self>, _digest: Option<DigestInfo>) -> Arc<dyn Store> {
self
}

fn inner_store_arc(self: Arc<Self>, _digest: Option<DigestInfo>) -> Arc<dyn Store> {
self
}

fn as_any<'a>(&'a self) -> &'a (dyn std::any::Any + Sync + Send + 'static) {
self
}

fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> {
self
fn as_any(self: Arc<Self>) -> Box<dyn std::any::Any + Send> {
Box::new(self)
}

fn register_metrics(self: Arc<Self>, registry: &mut Registry) {
Expand Down
14 changes: 3 additions & 11 deletions nativelink-store/src/grpc_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -837,20 +837,12 @@ impl Store for GrpcStore {
.await
}

fn inner_store(&self, _digest: Option<DigestInfo>) -> &'_ dyn Store {
fn inner_store(self: Arc<Self>, _digest: Option<DigestInfo>) -> Arc<dyn Store> {
self
}

fn inner_store_arc(self: Arc<Self>, _digest: Option<DigestInfo>) -> Arc<dyn Store> {
self
}

fn as_any<'a>(&'a self) -> &'a (dyn std::any::Any + Sync + Send + 'static) {
self
}

fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> {
self
fn as_any(self: Arc<Self>) -> Box<dyn std::any::Any + Send> {
Box::new(self)
}
}

Expand Down
14 changes: 3 additions & 11 deletions nativelink-store/src/memory_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,20 +153,12 @@ impl Store for MemoryStore {
Ok(())
}

fn inner_store(&self, _digest: Option<DigestInfo>) -> &'_ dyn Store {
fn inner_store(self: Arc<Self>, _digest: Option<DigestInfo>) -> Arc<dyn Store> {
self
}

fn inner_store_arc(self: Arc<Self>, _digest: Option<DigestInfo>) -> Arc<dyn Store> {
self
}

fn as_any<'a>(&'a self) -> &'a (dyn std::any::Any + Sync + Send + 'static) {
self
}

fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> {
self
fn as_any(self: Arc<Self>) -> Box<dyn std::any::Any + Send> {
Box::new(self)
}

fn register_metrics(self: Arc<Self>, registry: &mut Registry) {
Expand Down
14 changes: 3 additions & 11 deletions nativelink-store/src/noop_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,20 +64,12 @@ impl Store for NoopStore {
Err(make_err!(Code::NotFound, "Not found in noop store"))
}

fn inner_store(&self, _digest: Option<DigestInfo>) -> &'_ dyn Store {
fn inner_store(self: Arc<Self>, _digest: Option<DigestInfo>) -> Arc<dyn Store> {
self
}

fn inner_store_arc(self: Arc<Self>, _digest: Option<DigestInfo>) -> Arc<dyn Store> {
self
}

fn as_any<'a>(&'a self) -> &'a (dyn std::any::Any + Sync + Send + 'static) {
self
}

fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> {
self
fn as_any(self: Arc<Self>) -> Box<dyn std::any::Any + Send> {
Box::new(self)
}
}

Expand Down
22 changes: 4 additions & 18 deletions nativelink-store/src/ref_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,32 +124,18 @@ impl Store for RefStore {
.await
}

fn inner_store(&self, digest: Option<DigestInfo>) -> &'_ dyn Store {
fn inner_store(self: Arc<Self>, digest: Option<DigestInfo>) -> Arc<dyn Store> {
match self.get_store() {
Ok(store) => store.inner_store(digest),
Ok(store) => store.clone().inner_store(digest),
Err(e) => {
error!("Failed to get store for digest: {e:?}");
self
}
}
}

fn inner_store_arc(self: Arc<Self>, digest: Option<DigestInfo>) -> Arc<dyn Store> {
match self.get_store() {
Ok(store) => store.clone().inner_store_arc(digest),
Err(e) => {
error!("Failed to get store for digest: {e:?}");
self
}
}
}

fn as_any<'a>(&'a self) -> &'a (dyn std::any::Any + Sync + Send + 'static) {
self
}

fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> {
self
fn as_any(self: Arc<Self>) -> Box<dyn std::any::Any + Send> {
Box::new(self)
}
}

Expand Down
Loading

0 comments on commit 35afce5

Please sign in to comment.