From 1fbcf47970fc28913ee44103331ebcd3e044a832 Mon Sep 17 00:00:00 2001
From: Zach Birenbaum
Date: Sat, 9 Mar 2024 16:54:53 -0800
Subject: [PATCH] Add safe request timeout for running actions manager

Changes running actions manager to use the action timeout if set to a
nonzero value for deciding when to kill a process, otherwise defaults
to its configured max_action_timeout
---
NOTE(review): this copy of the patch was rebuilt from a whitespace-collapsed
paste. Indentation and blank context lines are reconstructed, and the generic
arguments the paste had dropped (`Option<Cow<'_, OsStr>>`,
`Box<dyn std::error::Error>`) are restored on a best-effort basis; confirm
against the tree, or apply with `git apply --ignore-whitespace`.

Changes relative to the pasted patch, both in the new test:
 * `WORKER_ID` is no longer gated on `#[cfg(target_family = "unix")]`; the
   constant is used unconditionally, so the gate broke non-unix builds.
 * Assertion message typo fixed ("should be been" -> "should have been").

 .../src/running_actions_manager.rs            | 12 ++-
 .../tests/running_actions_manager_test.rs     | 90 +++++++++++++++++++
 2 files changed, 99 insertions(+), 3 deletions(-)

diff --git a/nativelink-worker/src/running_actions_manager.rs b/nativelink-worker/src/running_actions_manager.rs
index 1c573070b..df7ffef40 100644
--- a/nativelink-worker/src/running_actions_manager.rs
+++ b/nativelink-worker/src/running_actions_manager.rs
@@ -760,6 +760,12 @@ impl RunningActionImpl {
             ))
             .env_clear();
 
+        let requested_timeout = if self.action_info.timeout.is_zero() {
+            self.running_actions_manager.max_action_timeout
+        } else {
+            self.action_info.timeout
+        };
+
         let mut maybe_side_channel_file: Option<Cow<'_, OsStr>> = None;
         if let Some(additional_environment) = &self
             .running_actions_manager
@@ -776,7 +782,7 @@ impl RunningActionImpl {
                         .map_or_else(|| Cow::Borrowed(""), |v| v.as_str()),
                     EnvironmentSource::value(value) => Cow::Borrowed(value.as_str()),
                     EnvironmentSource::timeout_millis => {
-                        Cow::Owned(self.timeout.as_millis().to_string())
+                        Cow::Owned(requested_timeout.as_millis().to_string())
                     }
                     EnvironmentSource::side_channel_file => {
                         let file_cow = format!(
@@ -922,7 +928,7 @@ impl RunningActionImpl {
                     };
 
                     let maybe_error_override = if let Some(side_channel_file) = maybe_side_channel_file {
-                        process_side_channel_file(side_channel_file.clone(), &args, self.timeout).await
+                        process_side_channel_file(side_channel_file.clone(), &args, requested_timeout).await
                         .err_tip(|| format!("Error processing side channel file: {side_channel_file:?}"))?
                     } else {
                         None
@@ -1764,7 +1770,7 @@ impl RunningActionsManager for RunningActionsManagerImpl {
                 output_upload_start_timestamp: SystemTime::UNIX_EPOCH,
                 output_upload_completed_timestamp: SystemTime::UNIX_EPOCH,
             };
-            let timeout = if action_info.timeout == Duration::ZERO || self.timeout_handled_externally {
+            let timeout = if action_info.timeout.is_zero() || self.timeout_handled_externally {
                 self.max_action_timeout
             } else {
                 action_info.timeout
diff --git a/nativelink-worker/tests/running_actions_manager_test.rs b/nativelink-worker/tests/running_actions_manager_test.rs
index f98da961e..251e783b7 100644
--- a/nativelink-worker/tests/running_actions_manager_test.rs
+++ b/nativelink-worker/tests/running_actions_manager_test.rs
@@ -2930,4 +2930,94 @@ exit 1
         );
         Ok(())
     }
+
+    #[tokio::test]
+    async fn running_actions_manager_respects_action_timeout(
+    ) -> Result<(), Box<dyn std::error::Error>> {
+        // Action timeout and max_action_timeout are both zero; expect the process to be killed.
+        const WORKER_ID: &str = "foo_worker_id";
+        const SALT: u64 = 66;
+
+        let (_, _, cas_store, ac_store) = setup_stores().await?;
+        let root_work_directory = make_temp_path("root_work_directory");
+        fs::create_dir_all(&root_work_directory).await?;
+
+        let running_actions_manager =
+            Arc::new(RunningActionsManagerImpl::new(RunningActionsManagerArgs {
+                root_work_directory: root_work_directory.clone(),
+                execution_configuration: Default::default(),
+                cas_store: Pin::into_inner(cas_store.clone()),
+                ac_store: Some(Pin::into_inner(ac_store.clone())),
+                historical_store: Pin::into_inner(cas_store.clone()),
+                upload_action_result_config:
+                    &nativelink_config::cas_server::UploadActionResultConfig {
+                        upload_ac_results_strategy:
+                            nativelink_config::cas_server::UploadCacheResultsStrategy::never,
+                        ..Default::default()
+                    },
+                max_action_timeout: Duration::from_secs(0),
+                timeout_handled_externally: false,
+            })?);
+        #[cfg(target_family = "unix")]
+        let arguments = vec!["sleep".to_string(), "2".to_string()];
+        #[cfg(target_family = "windows")]
+        let arguments = vec!["timeout".to_string(), "2".to_string()];
+        let command = Command {
+            arguments,
+            working_directory: ".".to_string(),
+            ..Default::default()
+        };
+        let command_digest = serialize_and_upload_message(
+            &command,
+            cas_store.as_ref(),
+            &mut DigestHasherFunc::Sha256.hasher(),
+        )
+        .await?;
+        let input_root_digest = serialize_and_upload_message(
+            &Directory::default(),
+            cas_store.as_ref(),
+            &mut DigestHasherFunc::Sha256.hasher(),
+        )
+        .await?;
+        let action = Action {
+            command_digest: Some(command_digest.into()),
+            input_root_digest: Some(input_root_digest.into()),
+            platform: Some(Platform {
+                properties: vec![Property {
+                    name: "property_name".into(),
+                    value: "property_value".into(),
+                }],
+            }),
+            timeout: Some(prost_types::Duration {
+                seconds: 0,
+                nanos: 0,
+            }),
+            ..Default::default()
+        };
+        let action_digest = serialize_and_upload_message(
+            &action,
+            cas_store.as_ref(),
+            &mut DigestHasherFunc::Sha256.hasher(),
+        )
+        .await?;
+
+        let running_action_impl = running_actions_manager
+            .clone()
+            .create_and_add_action(
+                WORKER_ID.to_string(),
+                StartExecute {
+                    execute_request: Some(ExecuteRequest {
+                        action_digest: Some(action_digest.into()),
+                        ..Default::default()
+                    }),
+                    salt: SALT,
+                    queued_timestamp: Some(make_system_time(1000).into()),
+                },
+            )
+            .await?;
+
+        let result = run_action(running_action_impl).await?;
+        assert_eq!(result.exit_code, 9, "Action process should have been killed");
+        Ok(())
+    }
 }