Skip to content

Commit

Permalink
Trace grpc metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
adam-singer committed Nov 15, 2024
1 parent b6cf659 commit 16f9b27
Show file tree
Hide file tree
Showing 14 changed files with 529 additions and 143 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions nativelink-config/examples/basic_cas.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
{
"stores": {
"BEP_STORE": {
"redis_store": {
"addresses": [
"redis://127.0.0.1:6379/1"
],
"experimental_pub_sub_channel": "BEP",
"mode": "standard"
}
},
"AC_MAIN_STORE": {
"filesystem": {
"content_path": "/tmp/nativelink/data-worker-test/content_path-ac",
Expand Down Expand Up @@ -156,6 +165,9 @@
},
"admin": {},
"health": {},
"experimental_bep": {
"store": "BEP_STORE"
},
}
}],
"global": {
Expand Down
1 change: 1 addition & 0 deletions nativelink-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ tonic = { version = "0.12.3", features = ["transport", "tls"], default-features
tower = { version = "0.5.1", default-features = false }
tracing = { version = "0.1.40", default-features = false }
uuid = { version = "1.10.0", default-features = false, features = ["v4", "serde"] }
base64 = "0.21"

[dev-dependencies]
nativelink-macro = { path = "../nativelink-macro" }
Expand Down
75 changes: 52 additions & 23 deletions nativelink-service/src/ac_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ use nativelink_util::store_trait::{Store, StoreLike};
use prost::Message;
use tonic::{Request, Response, Status};
use tracing::{error_span, event, instrument, Level};
use tokio::sync::mpsc::UnboundedSender;
use nativelink_util::request_metadata_tracer::MetadataEvent;

#[derive(Clone)]
pub struct AcStoreInfo {
Expand All @@ -43,6 +45,7 @@ pub struct AcStoreInfo {

/// State behind the `ActionCache` gRPC service implementation.
pub struct AcServer {
/// Store info looked up by a `String` key (presumably the instance name
/// iterated in `AcServer::new` -- confirm against the elided part of `new`).
stores: HashMap<String, AcStoreInfo>,
/// Sender used to publish `MetadataEvent`s for traced requests.
/// `AcServer::new` always stores `Some(..)`; the request handlers `unwrap()`
/// this field, so it must never be left as `None`.
metadata_tx: Option<UnboundedSender<MetadataEvent>>
}

impl Debug for AcServer {
Expand All @@ -55,6 +58,7 @@ impl AcServer {
pub fn new(
config: &HashMap<InstanceName, AcStoreConfig>,
store_manager: &StoreManager,
metadata_tx: UnboundedSender<MetadataEvent>
) -> Result<Self, Error> {
let mut stores = HashMap::with_capacity(config.len());
for (instance_name, ac_cfg) in config {
Expand All @@ -71,6 +75,7 @@ impl AcServer {
}
Ok(AcServer {
stores: stores.clone(),
metadata_tx: Some(metadata_tx.clone())
})
}

Expand Down Expand Up @@ -170,20 +175,32 @@ impl ActionCache for AcServer {
&self,
grpc_request: Request<GetActionResultRequest>,
) -> Result<Response<ActionResult>, Status> {
let request = grpc_request.into_inner();
let resp = make_ctx_for_hash_func(request.digest_function)
.err_tip(|| "In AcServer::get_action_result")?
.wrap_async(
error_span!("ac_server_get_action_result"),
self.inner_get_action_result(request),
)
.await;

// let resp = self.inner_get_action_result(grpc_request).await;
if resp.is_err() && resp.as_ref().err().unwrap().code != Code::NotFound {
event!(Level::ERROR, return = ?resp);
}
return resp.map_err(Into::into);
let inner_get_action_result = |grpc_request: Request<GetActionResultRequest>| async {
let request = grpc_request.into_inner();
let resp = make_ctx_for_hash_func(request.digest_function)
.err_tip(|| "In AcServer::get_action_result")?
.wrap_async(
error_span!("ac_server_get_action_result"),
self.inner_get_action_result(request),
)
.await;

// let resp = self.inner_get_action_result(grpc_request).await;
if resp.is_err() && resp.as_ref().err().unwrap().code != Code::NotFound {
event!(Level::ERROR, return = ?resp);
}
return resp.map_err(Into::into);
};

// WARNING: this `unwrap` panics if `metadata_tx` is `None`.
// The field is an `Option` only to avoid the `Default` derive macro;
// `AcServer::new` always sets it to `Some`.
let metadata_tx = &self.metadata_tx.as_ref().unwrap().clone();
wrap_with_metadata_tracing!(
"get_action_result",
inner_get_action_result,
grpc_request,
metadata_tx
)
}

#[allow(clippy::blocks_in_conditions)]
Expand All @@ -198,14 +215,26 @@ impl ActionCache for AcServer {
&self,
grpc_request: Request<UpdateActionResultRequest>,
) -> Result<Response<ActionResult>, Status> {
let request = grpc_request.into_inner();
make_ctx_for_hash_func(request.digest_function)
.err_tip(|| "In AcServer::update_action_result")?
.wrap_async(
error_span!("ac_server_update_action_result"),
self.inner_update_action_result(request),
)
.await
.map_err(Into::into)
let inner_update_action_result = |grpc_request: Request<UpdateActionResultRequest>| async {
let request = grpc_request.into_inner();
make_ctx_for_hash_func(request.digest_function)
.err_tip(|| "In AcServer::update_action_result")?
.wrap_async(
error_span!("ac_server_update_action_result"),
self.inner_update_action_result(request),
)
.await
.map_err(Into::into)
};

// WARNING: this `unwrap` panics if `metadata_tx` is `None`.
// The field is an `Option` only to avoid the `Default` derive macro;
// `AcServer::new` always sets it to `Some`.
let metadata_tx = &self.metadata_tx.as_ref().unwrap().clone();
wrap_with_metadata_tracing!(
"update_action_result",
inner_update_action_result,
grpc_request,
metadata_tx
)
}
}
69 changes: 68 additions & 1 deletion nativelink-service/src/bep_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::borrow::Cow;
use std::pin::Pin;
use std::sync::Arc;

use bytes::BytesMut;
use futures::stream::unfold;
Expand All @@ -30,7 +31,11 @@ use nativelink_store::store_manager::StoreManager;
use nativelink_util::store_trait::{Store, StoreDriver, StoreKey, StoreLike};
use prost::Message;
use tonic::{Request, Response, Result, Status, Streaming};
use tracing::{instrument, Level};
use tracing::{instrument, Level, event};
use nativelink_util::background_spawn;
use tokio::sync::Mutex;
use tokio::sync::mpsc::UnboundedReceiver;
use nativelink_util::request_metadata_tracer::MetadataEvent;

pub struct BepServer {
store: Store,
Expand All @@ -40,10 +45,72 @@ impl BepServer {
/// Creates a `BepServer` backed by the store named by `config.store`.
///
/// Also spawns the `bep_metadata_event_flusher` background task. That task
/// drains `metadata_rx`, decodes the request metadata carried by each event
/// and writes the re-encoded metadata into a clone of the same store under a
/// `RequestMetadata:...` key. Decode, encode and store failures are logged
/// and the offending event is dropped; they never stop the task.
///
/// The task exits once the channel is closed (every sender dropped and the
/// buffer drained).
///
/// # Errors
///
/// Returns the error from `store_manager.get_store(&config.store)`, tipped
/// with "Expected store {name} to exist in store manager".
pub fn new(
    config: &nativelink_config::cas_server::BepConfig,
    store_manager: &StoreManager,
    metadata_rx: Arc<Mutex<UnboundedReceiver<MetadataEvent>>>,
) -> Result<Self, Error> {
    let store = store_manager
        .get_store(&config.store)
        .err_tip(|| format!("Expected store {} to exist in store manager", &config.store))?;
    // Separate handle to the same store, moved into the flusher task below.
    let metadata_store = store.clone();

    background_spawn!("bep_metadata_event_flusher", async move {
        // The guard is held for as long as the task runs, so this task is the
        // only consumer of the receiver during that time.
        let mut metadata_rx = metadata_rx.lock().await;
        loop {
            event!(Level::TRACE, "bep_metadata_event_flusher loop");
            let metadata_event = metadata_rx.recv().await;
            event!(Level::DEBUG, ?metadata_event, "metadata_event received");

            // `recv()` yields `None` only once the channel is closed and empty.
            // From then on it returns `None` immediately on every call, so we
            // must stop here instead of spinning on a dead channel.
            let Some(metadata_event) = metadata_event else {
                event!(Level::DEBUG, "metadata_event is empty");
                break;
            };

            let metadata = match metadata_event.request_metadata_tracer.decode() {
                Ok(metadata) => metadata,
                Err(err) => {
                    event!(Level::ERROR, ?err, ?metadata_event, "Failed to deserialize metadata");
                    continue;
                }
            };

            // Ex: RequestMetadata:3d00ea40-ba20-4897-b5e5-8c0d072e17a9:816e5e02-0e01-4167-8748-a84925c0abb9:capabilities:get_capabilities:::
            // Note: This key encoding is ambiguous because fields can also contain ":"
            // Ex: RequestMetadata:1e76602f-167f-4576-935a-89d9979754f1:ac0c5b4c-0fdf-4cff-bc38-9c315c053c5a:c731e519b64e1c2087870819d8d6e6cc31284ffa8ceb6709e505651b94b82d7d:get_action_result://main:hello-world:e397b2d20257449e54bdd8501d86a350a746c7c6384f524486f141651d7993c8:CppLink
            let store_key = StoreKey::Str(Cow::Owned(format!(
                "RequestMetadata:{}:{}:{}:{}:{}:{}:{}",
                &metadata.tool_invocation_id,
                &metadata.correlated_invocations_id,
                &metadata.action_id,
                &metadata_event.name,
                &metadata.target_id,
                &metadata.configuration_id,
                &metadata.action_mnemonic
            )));

            let mut buf = BytesMut::new();
            if let Err(err) = metadata.encode(&mut buf) {
                event!(Level::ERROR, ?err, ?metadata, "Failed to encode RequestMetadata buffer");
                continue;
            }

            // The key is cloned because it is still needed for logging afterwards.
            match metadata_store
                .update_oneshot(store_key.clone(), buf.freeze())
                .await
            {
                Ok(_) => event!(Level::DEBUG, ?store_key, "Successfully stored result"),
                Err(err) => event!(Level::ERROR, ?err, ?store_key, "Failed to store result"),
            }
        }
    });

    Ok(Self { store })
}
Expand Down
Loading

0 comments on commit 16f9b27

Please sign in to comment.