diff --git a/Cargo.lock b/Cargo.lock index c9609016a..d2e29aa9d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1887,6 +1887,7 @@ dependencies = [ "async-lock", "async-trait", "axum", + "base64 0.21.7", "bytes", "futures", "http-body 1.0.1", @@ -1974,6 +1975,7 @@ version = "0.5.3" dependencies = [ "async-lock", "async-trait", + "base64 0.21.7", "bitflags", "blake3", "bytes", diff --git a/nativelink-config/examples/basic_cas.json b/nativelink-config/examples/basic_cas.json index 173951deb..092599943 100644 --- a/nativelink-config/examples/basic_cas.json +++ b/nativelink-config/examples/basic_cas.json @@ -1,5 +1,14 @@ { "stores": { + "BEP_STORE": { + "redis_store": { + "addresses": [ + "redis://127.0.0.1:6379/1" + ], + "experimental_pub_sub_channel": "BEP", + "mode": "standard" + } + }, "AC_MAIN_STORE": { "filesystem": { "content_path": "/tmp/nativelink/data-worker-test/content_path-ac", @@ -156,6 +165,9 @@ }, "admin": {}, "health": {}, + "experimental_bep": { + "store": "BEP_STORE" + }, } }], "global": { diff --git a/nativelink-service/Cargo.toml b/nativelink-service/Cargo.toml index 92b9e6b51..004ee4d4e 100644 --- a/nativelink-service/Cargo.toml +++ b/nativelink-service/Cargo.toml @@ -25,6 +25,7 @@ tonic = { version = "0.12.3", features = ["transport", "tls"], default-features tower = { version = "0.5.1", default-features = false } tracing = { version = "0.1.40", default-features = false } uuid = { version = "1.10.0", default-features = false, features = ["v4", "serde"] } +base64 = "0.21" [dev-dependencies] nativelink-macro = { path = "../nativelink-macro" } diff --git a/nativelink-service/src/ac_server.rs b/nativelink-service/src/ac_server.rs index cd6be0f8e..9fabecb05 100644 --- a/nativelink-service/src/ac_server.rs +++ b/nativelink-service/src/ac_server.rs @@ -34,6 +34,8 @@ use nativelink_util::store_trait::{Store, StoreLike}; use prost::Message; use tonic::{Request, Response, Status}; use tracing::{error_span, event, instrument, Level}; +use tokio::sync::mpsc::UnboundedSender; +use nativelink_util::request_metadata_tracer::MetadataEvent; #[derive(Clone)] pub struct AcStoreInfo { @@ -43,6 +45,7 @@ pub struct AcStoreInfo { pub struct AcServer { stores: HashMap, + metadata_tx: Option> } impl Debug for AcServer { @@ -55,6 +58,7 @@ impl AcServer { pub fn new( config: &HashMap, store_manager: &StoreManager, + metadata_tx: UnboundedSender ) -> Result { let mut stores = HashMap::with_capacity(config.len()); for (instance_name, ac_cfg) in config { @@ -71,6 +75,7 @@ impl AcServer { } Ok(AcServer { stores: stores.clone(), + metadata_tx: Some(metadata_tx.clone()) }) } @@ -170,20 +175,32 @@ impl ActionCache for AcServer { &self, grpc_request: Request, ) -> Result, Status> { - let request = grpc_request.into_inner(); - let resp = make_ctx_for_hash_func(request.digest_function) - .err_tip(|| "In AcServer::get_action_result")? - .wrap_async( - error_span!("ac_server_get_action_result"), - self.inner_get_action_result(request), - ) - .await; - - // let resp = self.inner_get_action_result(grpc_request).await; - if resp.is_err() && resp.as_ref().err().unwrap().code != Code::NotFound { - event!(Level::ERROR, return = ?resp); - } - return resp.map_err(Into::into); + let inner_get_action_result = |grpc_request: Request| async { + let request = grpc_request.into_inner(); + let resp = make_ctx_for_hash_func(request.digest_function) + .err_tip(|| "In AcServer::get_action_result")? + .wrap_async( + error_span!("ac_server_get_action_result"), + self.inner_get_action_result(request), + ) + .await; + + // let resp = self.inner_get_action_result(grpc_request).await; + if resp.is_err() && resp.as_ref().err().unwrap().code != Code::NotFound { + event!(Level::ERROR, return = ?resp); + } + return resp.map_err(Into::into); + }; + + // DANGER DANGER WILL ROBINSON + // An option was used to avoid the Default derive macro + let metadata_tx = &self.metadata_tx.as_ref().unwrap().clone(); + wrap_with_metadata_tracing!( + "get_action_result", + inner_get_action_result, + grpc_request, + metadata_tx + ) } #[allow(clippy::blocks_in_conditions)] @@ -198,14 +215,26 @@ impl ActionCache for AcServer { &self, grpc_request: Request, ) -> Result, Status> { - let request = grpc_request.into_inner(); - make_ctx_for_hash_func(request.digest_function) - .err_tip(|| "In AcServer::update_action_result")? - .wrap_async( - error_span!("ac_server_update_action_result"), - self.inner_update_action_result(request), - ) - .await - .map_err(Into::into) + let inner_update_action_result = |grpc_request: Request| async { + let request = grpc_request.into_inner(); + make_ctx_for_hash_func(request.digest_function) + .err_tip(|| "In AcServer::update_action_result")? + .wrap_async( + error_span!("ac_server_update_action_result"), + self.inner_update_action_result(request), + ) + .await + .map_err(Into::into) + }; + + // DANGER DANGER WILL ROBINSON + // An option was used to avoid the Default derive macro + let metadata_tx = &self.metadata_tx.as_ref().unwrap().clone(); + wrap_with_metadata_tracing!( + "update_action_result", + inner_update_action_result, + grpc_request, + metadata_tx + ) } } diff --git a/nativelink-service/src/bep_server.rs b/nativelink-service/src/bep_server.rs index 7cfa9172f..c13979fa1 100644 --- a/nativelink-service/src/bep_server.rs +++ b/nativelink-service/src/bep_server.rs @@ -14,6 +14,7 @@ use std::borrow::Cow; use std::pin::Pin; +use std::sync::Arc; use bytes::BytesMut; use futures::stream::unfold; @@ -30,7 +31,11 @@ use nativelink_store::store_manager::StoreManager; use nativelink_util::store_trait::{Store, StoreDriver, StoreKey, StoreLike}; use prost::Message; use tonic::{Request, Response, Result, Status, Streaming}; -use tracing::{instrument, Level}; +use tracing::{instrument, Level, event}; +use nativelink_util::background_spawn; +use tokio::sync::Mutex; +use tokio::sync::mpsc::UnboundedReceiver; +use nativelink_util::request_metadata_tracer::MetadataEvent; pub struct BepServer { store: Store, @@ -40,10 +45,72 @@ impl BepServer { pub fn new( config: &nativelink_config::cas_server::BepConfig, store_manager: &StoreManager, + metadata_rx: Arc>> ) -> Result { let store = store_manager .get_store(&config.store) .err_tip(|| format!("Expected store {} to exist in store manager", &config.store))?; + // Cloned store for pushing metadata to redis. + let metadata_store = store.clone(); + + background_spawn!("bep_metadata_event_flusher", async move { + let mut metadata_rx = metadata_rx.lock().await; + loop { + event!(Level::TRACE, "bep_metadata_event_flusher loop"); + tokio::select! { + metadata_event = metadata_rx.recv() => { + event!(Level::DEBUG, ?metadata_event, "metadata_event received"); + if let Some(metadata_event) = metadata_event { + let metadata = metadata_event.request_metadata_tracer.decode(); + match metadata { + Ok(metadata) => { + let mut buf = BytesMut::new(); + let name = &metadata_event.name; + let tool_invocation_id = &metadata.tool_invocation_id; + let action_id = &metadata.action_id; + let correlated_invocations_id = &metadata.correlated_invocations_id; + let action_mnemonic = &metadata.action_mnemonic; + let target_id = &metadata.target_id; + let configuration_id = &metadata.configuration_id; + // Ex: RequestMetadata:3d00ea40-ba20-4897-b5e5-8c0d072e17a9:816e5e02-0e01-4167-8748-a84925c0abb9:capabilities:get_capabilities::: + // Note: This key encoding doesn't work because actions can also contain ":" + // Ex: RequestMetadata:1e76602f-167f-4576-935a-89d9979754f1:ac0c5b4c-0fdf-4cff-bc38-9c315c053c5a:c731e519b64e1c2087870819d8d6e6cc31284ffa8ceb6709e505651b94b82d7d:get_action_result://main:hello-world:e397b2d20257449e54bdd8501d86a350a746c7c6384f524486f141651d7993c8:CppLink + let store_key = StoreKey::Str(Cow::Owned(format!( + "RequestMetadata:{}:{}:{}:{}:{}:{}:{}", + tool_invocation_id, + correlated_invocations_id, + action_id, + name, + target_id, + configuration_id, + action_mnemonic + ))); + let encoding_result = metadata.encode(&mut buf); + + match encoding_result { + Ok(_) => { + let store_result = metadata_store.update_oneshot( + store_key.clone(), + buf.freeze() + ).await; + + match store_result { + Ok(_) => event!(Level::DEBUG, ?store_key, "Successfully stored result"), + Err(err) => event!(Level::ERROR, ?err, ?store_key, "Failed to store result") + } + }, + Err(err) => event!(Level::ERROR, ?err, ?metadata, "Failed to encode RequestMetadata buffer") + } + }, + Err(err) => event!(Level::ERROR, ?err, ?metadata_event, "Failed to deserialize metadata") + } + } else { + event!(Level::DEBUG, "metadata_event is empty"); + } + } + } + } + }); Ok(Self { store }) } diff --git a/nativelink-service/src/capabilities_server.rs b/nativelink-service/src/capabilities_server.rs index 99b14931f..08ffa50a2 100644 --- a/nativelink-service/src/capabilities_server.rs +++ b/nativelink-service/src/capabilities_server.rs @@ -32,18 +32,23 @@ use nativelink_util::digest_hasher::default_digest_hasher_func; use nativelink_util::operation_state_manager::ClientStateManager; use tonic::{Request, Response, Status}; use tracing::{event, instrument, Level}; +use tracing::error_span; +use tokio::sync::mpsc::UnboundedSender; +use nativelink_util::request_metadata_tracer::MetadataEvent; const MAX_BATCH_TOTAL_SIZE: i64 = 64 * 1024; #[derive(Debug, Default)] pub struct CapabilitiesServer { supported_node_properties_for_instance: HashMap>, + metadata_tx: Option> } impl CapabilitiesServer { pub async fn new( config: &HashMap, scheduler_map: &HashMap>, + metadata_tx: UnboundedSender ) -> Result { let mut supported_node_properties_for_instance = HashMap::new(); for (instance_name, cfg) in config { @@ -80,6 +85,7 @@ impl CapabilitiesServer { } Ok(CapabilitiesServer { supported_node_properties_for_instance, + metadata_tx: Some(metadata_tx.clone()) }) } @@ -102,57 +108,69 @@ impl Capabilities for CapabilitiesServer { &self, grpc_request: Request, ) -> Result, Status> { - let instance_name = grpc_request.into_inner().instance_name; - let maybe_supported_node_properties = self - .supported_node_properties_for_instance - .get(&instance_name); - let execution_capabilities = - maybe_supported_node_properties.map(|props_for_instance| ExecutionCapabilities { - digest_function: default_digest_hasher_func().proto_digest_func().into(), - exec_enabled: true, // TODO(blaise.bruer) Make this configurable. - execution_priority_capabilities: Some(PriorityCapabilities { - priorities: vec![PriorityRange { - min_priority: 0, - max_priority: i32::MAX, - }], - }), - supported_node_properties: props_for_instance.clone(), - digest_functions: vec![ - DigestFunction::Sha256.into(), - DigestFunction::Blake3.into(), - ], - }); + let inner_get_capabilities = |grpc_request: Request| async { + let instance_name = grpc_request.into_inner().instance_name; + let maybe_supported_node_properties = self + .supported_node_properties_for_instance + .get(&instance_name); + let execution_capabilities = + maybe_supported_node_properties.map(|props_for_instance| ExecutionCapabilities { + digest_function: default_digest_hasher_func().proto_digest_func().into(), + exec_enabled: true, // TODO(blaise.bruer) Make this configurable. + execution_priority_capabilities: Some(PriorityCapabilities { + priorities: vec![PriorityRange { + min_priority: 0, + max_priority: i32::MAX, + }], + }), + supported_node_properties: props_for_instance.clone(), + digest_functions: vec![ + DigestFunction::Sha256.into(), + DigestFunction::Blake3.into(), + ], + }); - let resp = ServerCapabilities { - cache_capabilities: Some(CacheCapabilities { - digest_functions: vec![ - DigestFunction::Sha256.into(), - DigestFunction::Blake3.into(), - ], - action_cache_update_capabilities: Some(ActionCacheUpdateCapabilities { - update_enabled: true, + let resp = ServerCapabilities { + cache_capabilities: Some(CacheCapabilities { + digest_functions: vec![ + DigestFunction::Sha256.into(), + DigestFunction::Blake3.into(), + ], + action_cache_update_capabilities: Some(ActionCacheUpdateCapabilities { + update_enabled: true, + }), + cache_priority_capabilities: None, + max_batch_total_size_bytes: MAX_BATCH_TOTAL_SIZE, + symlink_absolute_path_strategy: SymlinkAbsolutePathStrategy::Disallowed.into(), + supported_compressors: vec![], + supported_batch_update_compressors: vec![], + }), + execution_capabilities, + deprecated_api_version: None, + low_api_version: Some(SemVer { + major: 2, + minor: 0, + patch: 0, + prerelease: String::new(), }), - cache_priority_capabilities: None, - max_batch_total_size_bytes: MAX_BATCH_TOTAL_SIZE, - symlink_absolute_path_strategy: SymlinkAbsolutePathStrategy::Disallowed.into(), - supported_compressors: vec![], - supported_batch_update_compressors: vec![], - }), - execution_capabilities, - deprecated_api_version: None, - low_api_version: Some(SemVer { - major: 2, - minor: 0, - patch: 0, - prerelease: String::new(), - }), - high_api_version: Some(SemVer { - major: 2, - minor: 3, - patch: 0, - prerelease: String::new(), - }), + high_api_version: Some(SemVer { + major: 2, + minor: 3, + patch: 0, + prerelease: String::new(), + }), + }; + Ok(Response::new(resp)) }; - Ok(Response::new(resp)) + + // DANGER DANGER WILL ROBINSON + // An option was used to avoid the Default derive macro + let metadata_tx = &self.metadata_tx.as_ref().unwrap(); + wrap_with_metadata_tracing!( + "get_capabilities", + inner_get_capabilities, + grpc_request, + metadata_tx + ) } } diff --git a/nativelink-service/src/cas_server.rs b/nativelink-service/src/cas_server.rs index 1fb6890aa..ae5b58e00 100644 --- a/nativelink-service/src/cas_server.rs +++ b/nativelink-service/src/cas_server.rs @@ -38,9 +38,12 @@ use nativelink_util::digest_hasher::make_ctx_for_hash_func; use nativelink_util::store_trait::{Store, StoreLike}; use tonic::{Request, Response, Status}; use tracing::{error_span, event, instrument, Level}; +use tokio::sync::mpsc::UnboundedSender; +use nativelink_util::request_metadata_tracer::MetadataEvent; pub struct CasServer { stores: HashMap, + metadata_tx: Option> } type GetTreeStream = Pin> + Send + 'static>>; @@ -49,6 +52,7 @@ impl CasServer { pub fn new( config: &HashMap, store_manager: &StoreManager, + metadata_tx: UnboundedSender ) -> Result { let mut stores = HashMap::with_capacity(config.len()); for (instance_name, cas_cfg) in config { @@ -57,7 +61,7 @@ impl CasServer { })?; stores.insert(instance_name.to_string(), store); } - Ok(CasServer { stores }) + Ok(CasServer { stores , metadata_tx: Some(metadata_tx.clone()) }) } pub fn into_service(self) -> Server { @@ -315,16 +319,27 @@ impl ContentAddressableStorage for CasServer { &self, grpc_request: Request, ) -> Result, Status> { - let request = grpc_request.into_inner(); - make_ctx_for_hash_func(request.digest_function) - .err_tip(|| "In CasServer::find_missing_blobs")? - .wrap_async( - error_span!("cas_server_find_missing_blobs"), - self.inner_find_missing_blobs(request), - ) - .await - .err_tip(|| "Failed on find_missing_blobs() command") - .map_err(Into::into) + let inner_find_missing_blobs = |grpc_request: Request| async { + let request = grpc_request.into_inner(); + make_ctx_for_hash_func(request.digest_function) + .err_tip(|| "In CasServer::find_missing_blobs")? + .wrap_async( + error_span!("cas_server_find_missing_blobs"), + self.inner_find_missing_blobs(request), + ) + .await + .err_tip(|| "Failed on find_missing_blobs() command") + .map_err(Into::into) + }; + // DANGER DANGER WILL ROBINSON + // An option was used to avoid the Default derive macro + let metadata_tx = &self.metadata_tx.as_ref().unwrap().clone(); + wrap_with_metadata_tracing!( + "find_missing_blobs", + inner_find_missing_blobs, + grpc_request, + metadata_tx + ) } #[allow(clippy::blocks_in_conditions)] @@ -339,16 +354,28 @@ impl ContentAddressableStorage for CasServer { &self, grpc_request: Request, ) -> Result, Status> { - let request = grpc_request.into_inner(); - make_ctx_for_hash_func(request.digest_function) - .err_tip(|| "In CasServer::batch_update_blobs")? - .wrap_async( - error_span!("cas_server_batch_update_blobs"), - self.inner_batch_update_blobs(request), - ) - .await - .err_tip(|| "Failed on batch_update_blobs() command") - .map_err(Into::into) + let inner_batch_update_blobs = |grpc_request: Request| async { + let request = grpc_request.into_inner(); + make_ctx_for_hash_func(request.digest_function) + .err_tip(|| "In CasServer::batch_update_blobs")? + .wrap_async( + error_span!("cas_server_batch_update_blobs"), + self.inner_batch_update_blobs(request), + ) + .await + .err_tip(|| "Failed on batch_update_blobs() command") + .map_err(Into::into) + }; + + // DANGER DANGER WILL ROBINSON + // An option was used to avoid the Default derive macro + let metadata_tx = &self.metadata_tx.as_ref().unwrap().clone(); + wrap_with_metadata_tracing!( + "batch_update_blobs", + inner_batch_update_blobs, + grpc_request, + metadata_tx + ) } #[allow(clippy::blocks_in_conditions)] @@ -363,16 +390,27 @@ impl ContentAddressableStorage for CasServer { &self, grpc_request: Request, ) -> Result, Status> { - let request = grpc_request.into_inner(); - make_ctx_for_hash_func(request.digest_function) - .err_tip(|| "In CasServer::batch_read_blobs")? - .wrap_async( - error_span!("cas_server_batch_read_blobs"), - self.inner_batch_read_blobs(request), - ) - .await - .err_tip(|| "Failed on batch_read_blobs() command") - .map_err(Into::into) + let inner_batch_read_blobs = |grpc_request: Request| async { + let request = grpc_request.into_inner(); + make_ctx_for_hash_func(request.digest_function) + .err_tip(|| "In CasServer::batch_read_blobs")? + .wrap_async( + error_span!("cas_server_batch_read_blobs"), + self.inner_batch_read_blobs(request), + ) + .await + .err_tip(|| "Failed on batch_read_blobs() command") + .map_err(Into::into) + }; + // DANGER DANGER WILL ROBINSON + // An option was used to avoid the Default derive macro + let metadata_tx = &self.metadata_tx.as_ref().unwrap().clone(); + wrap_with_metadata_tracing!( + "batch_read_blobs", + inner_batch_read_blobs, + grpc_request, + metadata_tx + ) } #[allow(clippy::blocks_in_conditions)] @@ -386,19 +424,30 @@ impl ContentAddressableStorage for CasServer { &self, grpc_request: Request, ) -> Result, Status> { - let request = grpc_request.into_inner(); - let resp = make_ctx_for_hash_func(request.digest_function) - .err_tip(|| "In CasServer::get_tree")? - .wrap_async( - error_span!("cas_server_get_tree"), - self.inner_get_tree(request), - ) - .await - .err_tip(|| "Failed on get_tree() command") - .map_err(Into::into); - if resp.is_ok() { - event!(Level::DEBUG, return = "Ok()"); - } - resp + let inner_get_tree = |grpc_request: Request| async { + let request = grpc_request.into_inner(); + let resp = make_ctx_for_hash_func(request.digest_function) + .err_tip(|| "In CasServer::get_tree")? + .wrap_async( + error_span!("cas_server_get_tree"), + self.inner_get_tree(request), + ) + .await + .err_tip(|| "Failed on get_tree() command") + .map_err(Into::into); + if resp.is_ok() { + event!(Level::DEBUG, return = "Ok()"); + } + resp + }; + // DANGER DANGER WILL ROBINSON + // An option was used to avoid the Default derive macro + let metadata_tx = &self.metadata_tx.as_ref().unwrap().clone(); + wrap_with_metadata_tracing!( + "get_tree", + inner_get_tree, + grpc_request, + metadata_tx + ) } } diff --git a/nativelink-service/src/execution_server.rs b/nativelink-service/src/execution_server.rs index c4df4e788..e0d7fe2ab 100644 --- a/nativelink-service/src/execution_server.rs +++ b/nativelink-service/src/execution_server.rs @@ -43,6 +43,8 @@ use nativelink_util::operation_state_manager::{ use nativelink_util::store_trait::Store; use tonic::{Request, Response, Status}; use tracing::{error_span, event, instrument, Level}; +use tokio::sync::mpsc::UnboundedSender; +use nativelink_util::request_metadata_tracer::MetadataEvent; type InstanceInfoName = String; @@ -153,6 +155,7 @@ impl InstanceInfo { pub struct ExecutionServer { instance_infos: HashMap, + metadata_tx: Option> } type ExecuteStream = Pin> + Send + 'static>>; @@ -162,6 +165,7 @@ impl ExecutionServer { config: &HashMap, scheduler_map: &HashMap>, store_manager: &StoreManager, + metadata_tx: UnboundedSender ) -> Result { let mut instance_infos = HashMap::with_capacity(config.len()); for (instance_name, exec_cfg) in config { @@ -188,7 +192,7 @@ impl ExecutionServer { }, ); } - Ok(Self { instance_infos }) + Ok(Self { instance_infos , metadata_tx: Some(metadata_tx.clone()) }) } pub fn into_service(self) -> Server { @@ -334,16 +338,27 @@ impl Execution for ExecutionServer { &self, grpc_request: Request, ) -> Result, Status> { - let request = grpc_request.into_inner(); - make_ctx_for_hash_func(request.digest_function) - .err_tip(|| "In ExecutionServer::execute")? - .wrap_async( - error_span!("execution_server_execute"), - self.inner_execute(request), - ) - .await - .err_tip(|| "Failed on execute() command") - .map_err(Into::into) + let inner_execute = |grpc_request: Request| async { + let request = grpc_request.into_inner(); + make_ctx_for_hash_func(request.digest_function) + .err_tip(|| "In ExecutionServer::execute")? + .wrap_async( + error_span!("execution_server_execute"), + self.inner_execute(request), + ) + .await + .err_tip(|| "Failed on execute() command") + .map_err(Into::into) + }; + // DANGER DANGER WILL ROBINSON + // An option was used to avoid the Default derive macro + let metadata_tx = &self.metadata_tx.as_ref().unwrap().clone(); + wrap_with_metadata_tracing!( + "execute", + inner_execute, + grpc_request, + metadata_tx + ) } #[allow(clippy::blocks_in_conditions)] @@ -357,15 +372,26 @@ impl Execution for ExecutionServer { &self, grpc_request: Request, ) -> Result, Status> { - let resp = self - .inner_wait_execution(grpc_request) - .await - .err_tip(|| "Failed on wait_execution() command") - .map_err(Into::into); - - if resp.is_ok() { - event!(Level::DEBUG, return = "Ok()"); - } - resp + let inner_wait_execution = |grpc_request: Request| async { + let resp = self + .inner_wait_execution(grpc_request) + .await + .err_tip(|| "Failed on wait_execution() command") + .map_err(Into::into); + + if resp.is_ok() { + event!(Level::DEBUG, return = "Ok()"); + } + resp + }; + // DANGER DANGER WILL ROBINSON + // An option was used to avoid the Default derive macro + let metadata_tx = &self.metadata_tx.as_ref().unwrap().clone(); + wrap_with_metadata_tracing!( + "wait_execution", + inner_wait_execution, + grpc_request, + metadata_tx + ) } } diff --git a/nativelink-service/src/lib.rs b/nativelink-service/src/lib.rs index 534b55072..e33f72005 100644 --- a/nativelink-service/src/lib.rs +++ b/nativelink-service/src/lib.rs @@ -12,6 +12,33 @@ // See the License for the specific language governing permissions and // limitations under the License. +macro_rules! wrap_with_metadata_tracing { + ($span_name:expr, $inner_fn:expr, $grpc_request:expr, $metadata_tx:expr) => {{ + use futures::future::FutureExt; + use nativelink_util::request_metadata_tracer; + match request_metadata_tracer::extract_request_metadata_bin(&$grpc_request) { + Some(request_metadata_tracer) => { + let context = request_metadata_tracer::make_ctx_request_metadata_tracer( + &request_metadata_tracer.metadata, + $metadata_tx, + ) + .err_tip(|| "Unable to parse request metadata")?; + + context + .wrap_async( + error_span!($span_name), + ($inner_fn)($grpc_request).inspect(|_| { + request_metadata_tracer::emit_metadata_event(String::from($span_name)); + }), + ) + .await + .map_err(Into::into) + } + _ => ($inner_fn)($grpc_request).await, + } + }}; +} + pub mod ac_server; pub mod bep_server; pub mod bytestream_server; diff --git a/nativelink-util/BUILD.bazel b/nativelink-util/BUILD.bazel index ac17063f1..fbe3dc3a4 100644 --- a/nativelink-util/BUILD.bazel +++ b/nativelink-util/BUILD.bazel @@ -28,6 +28,7 @@ rust_library( "src/origin_context.rs", "src/platform_properties.rs", "src/proto_stream_utils.rs", + "src/request_metadata_tracer.rs", "src/resource_info.rs", "src/retry.rs", "src/store_trait.rs", @@ -45,6 +46,7 @@ rust_library( "//nativelink-metric", "//nativelink-proto", "@crates//:async-lock", + "@crates//:base64", "@crates//:bitflags", "@crates//:blake3", "@crates//:bytes", diff --git a/nativelink-util/Cargo.toml b/nativelink-util/Cargo.toml index fb635af60..7fc2c03a0 100644 --- a/nativelink-util/Cargo.toml +++ b/nativelink-util/Cargo.toml @@ -42,6 +42,7 @@ tracing = { version = "0.1.40", default-features = false } tracing-subscriber = { version = "0.3.18", features = ["ansi", "env-filter", "json"], default-features = false } uuid = { version = "1.10.0", default-features = false, features = ["v4", "serde"] } mock_instant = "0.5.1" +base64 = "0.21" [dev-dependencies] nativelink-macro = { path = "../nativelink-macro" } diff --git a/nativelink-util/src/lib.rs b/nativelink-util/src/lib.rs index 17edbf700..7867a21a7 100644 --- a/nativelink-util/src/lib.rs +++ b/nativelink-util/src/lib.rs @@ -30,6 +30,7 @@ pub mod operation_state_manager; pub mod origin_context; pub mod platform_properties; pub mod proto_stream_utils; +pub mod request_metadata_tracer; pub mod resource_info; pub mod retry; pub mod store_trait; diff --git a/nativelink-util/src/request_metadata_tracer.rs b/nativelink-util/src/request_metadata_tracer.rs new file mode 100644 index 000000000..8f3c7794b --- /dev/null +++ b/nativelink-util/src/request_metadata_tracer.rs @@ -0,0 +1,126 @@ +// Copyright 2024 The NativeLink Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use nativelink_error::ResultExt; +use crate::origin_context::{ActiveOriginContext, OriginContext}; +use crate::make_symbol; +use nativelink_error::{make_err, Code, Error}; +use std::sync::Arc; +use tonic::Request; +use base64::Engine; +use base64::engine::general_purpose::STANDARD_NO_PAD; +use nativelink_proto::build::bazel::remote::execution::v2::RequestMetadata; +use prost::Message; +use bytes; +use tracing::{event, Level}; +use tokio::sync::mpsc::UnboundedSender; + +type RequestMetadataTraceFnType = Box; +make_symbol!(REQUEST_METADATA_TRACE, RequestMetadataTraceFnType); + +const BAZEL_METADATA_KEY: &'static str = "build.bazel.remote.execution.v2.requestmetadata-bin"; + +#[derive(Clone, Debug)] +pub struct RequestMetadataTracer { + pub metadata: String, +} + +#[derive(Clone, Debug)] +pub struct MetadataEvent { + pub request_metadata_tracer: RequestMetadataTracer, + pub name: String // Name of who emitted the event +} + +pub fn make_ctx_request_metadata_tracer(metadata_bin: &str, metadata_tx: &UnboundedSender) -> Result, Error> { + + let mut new_ctx = ActiveOriginContext::fork().err_tip(|| "Must be in a ActiveOriginContext")?; + let metadata = RequestMetadataTracer { + metadata: metadata_bin.to_string() + }; + + let metadata_tx = metadata_tx.to_owned(); + + let sender: Box = Box::new(move |name: String| { + let metadata_event = MetadataEvent { + request_metadata_tracer: metadata.clone(), + name: name + }; + let _ = metadata_tx.send(metadata_event); + }); + + new_ctx.set_value(&REQUEST_METADATA_TRACE, Arc::new(sender)); + Ok(Arc::new(new_ctx)) +} + +pub fn emit_metadata_event(name: String) { + let sender_result = ActiveOriginContext::get_value(&REQUEST_METADATA_TRACE); + match sender_result { + Ok(Some(sender)) => { + event!(Level::DEBUG, ?name, "Sending event"); + sender(name) + }, + _ => event!(Level::WARN, ?name, "REQUEST_METADATA_TRACE not in ActiveOriginContext") + } +} + +pub fn extract_request_metadata_bin(request: &Request) -> Option { + let headers = request.metadata().clone().into_headers(); + let metadata_opt: Option<&hyper::header::HeaderValue> = headers.get(BAZEL_METADATA_KEY); + + if let Some(header_value) = metadata_opt { + match header_value.to_str() { + Ok(metadata_str) => { + event!(Level::DEBUG, ?metadata_str, "RequestMetadataTracer in header"); + return Some(RequestMetadataTracer { + metadata: metadata_str.to_string() + }) + }, + Err(err) => { + event!( + Level::ERROR, + ?err, + "Unable to extract metadata from headers", + ); + return None + } + } + } + + event!(Level::INFO, "Header does not contain bazel metadata key"); + return None +} + +impl RequestMetadataTracer { + pub fn decode(&self) -> Result { + let decoded = match STANDARD_NO_PAD.decode(self.metadata.clone()) { + Ok(decoded) => decoded, + Err(err) => { + event!(Level::ERROR, "Could not convert request data from base64: {err}"); + return Err(make_err!(Code::Internal, "Could not convert request data from base64: {err}")); + } + }; + + let buf = bytes::BytesMut::from(decoded.as_slice()); + + let request_metadata = match RequestMetadata::decode(buf) { + Ok(request_metadata) => request_metadata, + Err(err) => { + event!(Level::ERROR, "Could not convert grpc request from binary data: {err}"); + return Err(make_err!(Code::Internal, "Could not convert grpc request from binary data: {err}")); + } + }; + + return Ok(request_metadata) + } +} diff --git a/src/bin/nativelink.rs b/src/bin/nativelink.rs index 7e2276a63..fb7e2985f 100644 --- a/src/bin/nativelink.rs +++ b/src/bin/nativelink.rs @@ -168,6 +168,9 @@ async fn inner_main( server_start_timestamp: u64, shutdown_tx: broadcast::Sender>>, ) -> Result<(), Box> { + // rx/tx, with the rx we spawn / pool for information and flush into redis + // bep service, pack data into bep. + fn into_encoding(from: HttpCompressionAlgorithm) -> Option { match from { HttpCompressionAlgorithm::gzip => Some(CompressionEncoding::Gzip), @@ -243,6 +246,20 @@ async fn inner_main( schedulers: action_schedulers.clone(), })); + let (metadata_tx, metadata_rx) = tokio::sync::mpsc::unbounded_channel::< + nativelink_util::request_metadata_tracer::MetadataEvent, + >(); + + // let metadata_rx: Arc>> = Arc::new(Mutex::new(metadata_rx)); + + let metadata_rx: Arc< + tokio::sync::Mutex< + tokio::sync::mpsc::UnboundedReceiver< + nativelink_util::request_metadata_tracer::MetadataEvent, + >, + >, + > = Arc::new(tokio::sync::Mutex::new(metadata_rx)); + for (server_cfg, connected_clients_mux) in servers_and_clients { let services = server_cfg.services.ok_or("'services' must be configured")?; @@ -254,7 +271,7 @@ async fn inner_main( services .ac .map_or(Ok(None), |cfg| { - AcServer::new(&cfg, &store_manager).map(|v| { + AcServer::new(&cfg, &store_manager, metadata_tx.clone()).map(|v| { let mut service = v.into_service(); let send_algo = &http_config.compression.send_compression_algorithm; if let Some(encoding) = @@ -280,7 +297,7 @@ async fn inner_main( services .cas .map_or(Ok(None), |cfg| { - CasServer::new(&cfg, &store_manager).map(|v| { + CasServer::new(&cfg, &store_manager, metadata_tx.clone()).map(|v| { let mut service = v.into_service(); let send_algo = &http_config.compression.send_compression_algorithm; if let Some(encoding) = @@ -306,7 +323,13 @@ async fn inner_main( services .execution .map_or(Ok(None), |cfg| { - ExecutionServer::new(&cfg, &action_schedulers, &store_manager).map(|v| { + ExecutionServer::new( + &cfg, + &action_schedulers, + &store_manager, + metadata_tx.clone(), + ) + .map(|v| { let mut service = v.into_service(); let send_algo = &http_config.compression.send_compression_algorithm; if let Some(encoding) = @@ -364,6 +387,7 @@ async fn inner_main( CapabilitiesServer::new( services.capabilities.as_ref().unwrap(), &action_schedulers, + metadata_tx.clone(), ) }), ) @@ -422,7 +446,8 @@ async fn inner_main( services .experimental_bep .map_or(Ok(None), |cfg| { - BepServer::new(&cfg, &store_manager).map(|v| { + // Pass the rx into this service. + BepServer::new(&cfg, &store_manager, metadata_rx.clone()).map(|v| { let mut service = v.into_service(); let send_algo = &http_config.compression.send_compression_algorithm; if let Some(encoding) =