diff --git a/nativelink-worker/src/running_actions_manager.rs b/nativelink-worker/src/running_actions_manager.rs index 3cf245d23..6ac9427e8 100644 --- a/nativelink-worker/src/running_actions_manager.rs +++ b/nativelink-worker/src/running_actions_manager.rs @@ -768,6 +768,12 @@ impl RunningActionImpl { )) .env_clear(); + let requested_timeout = if self.action_info.timeout.is_zero() { + self.running_actions_manager.max_action_timeout + } else { + self.action_info.timeout + }; + let mut maybe_side_channel_file: Option> = None; if let Some(additional_environment) = &self .running_actions_manager @@ -784,7 +790,7 @@ impl RunningActionImpl { .map_or_else(|| Cow::Borrowed(""), |v| v.as_str()), EnvironmentSource::value(value) => Cow::Borrowed(value.as_str()), EnvironmentSource::timeout_millis => { - Cow::Owned(self.timeout.as_millis().to_string()) + Cow::Owned(requested_timeout.as_millis().to_string()) } EnvironmentSource::side_channel_file => { let file_cow = @@ -929,7 +935,7 @@ impl RunningActionImpl { }; let maybe_error_override = if let Some(side_channel_file) = maybe_side_channel_file { - process_side_channel_file(side_channel_file.clone(), &args, self.timeout).await + process_side_channel_file(side_channel_file.clone(), &args, requested_timeout).await .err_tip(|| format!("Error processing side channel file: {side_channel_file:?}"))? } else { None @@ -1777,7 +1783,7 @@ impl RunningActionsManager for RunningActionsManagerImpl { output_upload_start_timestamp: SystemTime::UNIX_EPOCH, output_upload_completed_timestamp: SystemTime::UNIX_EPOCH, }; - let timeout = if action_info.timeout == Duration::ZERO || self.timeout_handled_externally { + let timeout = if action_info.timeout.is_zero() || self.timeout_handled_externally { self.max_action_timeout } else { action_info.timeout diff --git a/nativelink-worker/tests/running_actions_manager_test.rs b/nativelink-worker/tests/running_actions_manager_test.rs index f24c8f5d6..58d054c9d 100644 --- a/nativelink-worker/tests/running_actions_manager_test.rs +++ b/nativelink-worker/tests/running_actions_manager_test.rs @@ -3018,4 +3018,111 @@ exit 1 ); Ok(()) } + + #[tokio::test] + async fn running_actions_manager_respects_action_timeout( + ) -> Result<(), Box> { + const WORKER_ID: &str = "foo_worker_id"; + const SALT: u64 = 66; + + let (_, _, cas_store, ac_store) = setup_stores().await?; + let root_action_directory = make_temp_path("root_work_directory"); + fs::create_dir_all(&root_action_directory).await?; + + // Ignore the sleep and immediately timeout. + static ACTION_TIMEOUT: i64 = 1; + fn test_monotonic_clock() -> SystemTime { + static CLOCK: AtomicU64 = AtomicU64::new(0); + monotonic_clock(&CLOCK) + } + + let running_actions_manager = Arc::new(RunningActionsManagerImpl::new_with_callbacks( + RunningActionsManagerArgs { + root_action_directory, + execution_configuration: Default::default(), + cas_store: Pin::into_inner(cas_store.clone()), + ac_store: Some(Pin::into_inner(ac_store.clone())), + historical_store: Pin::into_inner(cas_store.clone()), + upload_action_result_config: + &nativelink_config::cas_server::UploadActionResultConfig { + upload_ac_results_strategy: + nativelink_config::cas_server::UploadCacheResultsStrategy::never, + ..Default::default() + }, + max_action_timeout: Duration::MAX, + timeout_handled_externally: false, + }, + Callbacks { + now_fn: test_monotonic_clock, + // If action_timeout is the passed duration then return immeidately, + // which will cause the action to be killed and pass the test, + // otherwise return pending and fail the test. + sleep_fn: |duration| { + assert_eq!(duration.as_secs(), ACTION_TIMEOUT as u64); + Box::pin(futures::future::ready(())) + }, + }, + )?); + #[cfg(target_family = "unix")] + let arguments = vec!["sleep".to_string(), "2".to_string()]; + #[cfg(target_family = "windows")] + let arguments = vec!["timeout".to_string(), "/t".to_string(), "2".to_string()]; + let command = Command { + arguments, + working_directory: ".".to_string(), + ..Default::default() + }; + let command_digest = serialize_and_upload_message( + &command, + cas_store.as_ref(), + &mut DigestHasherFunc::Sha256.hasher(), + ) + .await?; + let input_root_digest = serialize_and_upload_message( + &Directory::default(), + cas_store.as_ref(), + &mut DigestHasherFunc::Sha256.hasher(), + ) + .await?; + let action = Action { + command_digest: Some(command_digest.into()), + input_root_digest: Some(input_root_digest.into()), + platform: Some(Platform { + properties: vec![Property { + name: "property_name".into(), + value: "property_value".into(), + }], + }), + timeout: Some(prost_types::Duration { + seconds: ACTION_TIMEOUT, + nanos: 0, + }), + ..Default::default() + }; + let action_digest = serialize_and_upload_message( + &action, + cas_store.as_ref(), + &mut DigestHasherFunc::Sha256.hasher(), + ) + .await?; + + let running_action_impl = running_actions_manager + .clone() + .create_and_add_action( + WORKER_ID.to_string(), + StartExecute { + execute_request: Some(ExecuteRequest { + action_digest: Some(action_digest.into()), + ..Default::default() + }), + salt: SALT, + queued_timestamp: Some(make_system_time(1000).into()), + }, + ) + .await?; + + let result = run_action(running_action_impl).await?; + assert_eq!(result.exit_code, 9, "Action process should be been killed"); + Ok(()) + } }