From e1af2e1b83d0e3f6d5bb413e24e80830fd06a946 Mon Sep 17 00:00:00 2001 From: Zach Birenbaum Date: Mon, 11 Mar 2024 15:58:57 -0700 Subject: [PATCH] Create directory for action Modifies running actions manager and workers to create an action directory with a dedicated work directory inside instead of just creating a work directory for the task. Other directories can be mounted to this space and will be automatically cleaned on completion. --- nativelink-config/src/cas_server.rs | 15 ++ nativelink-worker/src/local_worker.rs | 3 +- .../src/running_actions_manager.rs | 65 +++--- .../tests/running_actions_manager_test.rs | 192 +++++++++++++----- 4 files changed, 195 insertions(+), 80 deletions(-) diff --git a/nativelink-config/src/cas_server.rs b/nativelink-config/src/cas_server.rs index 6631dce8d..a22068da3 100644 --- a/nativelink-config/src/cas_server.rs +++ b/nativelink-config/src/cas_server.rs @@ -448,6 +448,21 @@ pub enum EnvironmentSource { /// All fields are optional, file does not need to be created and may be /// empty. side_channel_file, + + /// A "root" directory for the action. This directory can be used to + /// store temporary files that are not needed after the action has + /// completed. This directory will be purged after the action has + /// completed. + /// + /// For example: + /// If an action writes temporary data to a path but nativelink should + /// clean up this path after the job has executed, you may create any + /// directory under the path provided in this variable. A common pattern + /// would be to use `entrypoint` to set a shell script that reads this + /// variable, `mkdir $ENV_VAR_NAME/tmp` and `export TMPDIR=$ENV_VAR_NAME/tmp`. + /// Another example might be to bind-mount the `/tmp` path in a container to + /// this path in `entrypoint`. + action_directory, } #[derive(Deserialize, Debug, Default)] diff --git a/nativelink-worker/src/local_worker.rs b/nativelink-worker/src/local_worker.rs index 9a8b94ac6..6048f5703 100644 --- a/nativelink-worker/src/local_worker.rs +++ b/nativelink-worker/src/local_worker.rs @@ -367,7 +367,7 @@ pub async fn new_local_worker( }; let running_actions_manager = Arc::new(RunningActionsManagerImpl::new(RunningActionsManagerArgs { - root_work_directory: config.work_directory.clone(), + root_action_directory: config.work_directory.clone(), execution_configuration: ExecutionConfiguration { entrypoint, additional_environment: config.additional_environment.clone(), @@ -391,7 +391,6 @@ pub async fn new_local_worker( .timeout .unwrap_or(DEFAULT_ENDPOINT_TIMEOUT_S); let timeout_duration = Duration::from_secs_f32(timeout); - let tls_config = tls_utils::load_client_config(&config.worker_api_endpoint.tls_config) .err_tip(|| "Parsing local worker TLS configuration")?; diff --git a/nativelink-worker/src/running_actions_manager.rs b/nativelink-worker/src/running_actions_manager.rs index 686be8629..dcdfea4ea 100644 --- a/nativelink-worker/src/running_actions_manager.rs +++ b/nativelink-worker/src/running_actions_manager.rs @@ -589,6 +589,7 @@ struct RunningActionImplState { pub struct RunningActionImpl { action_id: ActionId, + action_directory: String, work_directory: String, action_info: ActionInfo, timeout: Duration, @@ -601,14 +602,16 @@ impl RunningActionImpl { fn new( execution_metadata: ExecutionMetadata, action_id: ActionId, - work_directory: String, + action_directory: String, action_info: ActionInfo, timeout: Duration, running_actions_manager: Arc, ) -> Self { + let work_directory = format!("{}/{}", action_directory, "work"); let (kill_channel_tx, kill_channel_rx) = oneshot::channel(); Self { action_id, + action_directory, work_directory, action_info, timeout, @@ -654,8 +657,11 @@ impl RunningActionImpl { }); let filesystem_store_pin = Pin::new(self.running_actions_manager.filesystem_store.as_ref()); - // Download the input files/folder and place them into the temp directory. - let download_to_directory_fut = + let (command, _) = try_join(command_fut, async { + fs::create_dir(&self.work_directory) + .await + .err_tip(|| format!("Error creating work directory {}", self.work_directory))?; + // Download the input files/folder and place them into the temp directory. self.metrics() .download_to_directory .wrap(download_to_directory( @@ -663,8 +669,10 @@ impl RunningActionImpl { filesystem_store_pin, &self.action_info.input_root_digest, &self.work_directory, - )); - let (command, _) = try_join(command_fut, download_to_directory_fut).await?; + )) + .await + }) + .await?; command }; { @@ -779,15 +787,14 @@ impl RunningActionImpl { Cow::Owned(self.timeout.as_millis().to_string()) } EnvironmentSource::side_channel_file => { - let file_cow = format!( - "{}/{}/{}", - self.work_directory, - command_proto.working_directory, - Uuid::new_v4().simple(), - ); + let file_cow = + format!("{}/{}", self.action_directory, Uuid::new_v4().simple()); maybe_side_channel_file = Some(Cow::Owned(file_cow.clone().into())); Cow::Owned(file_cow) } + EnvironmentSource::action_directory => { + Cow::Borrowed(self.action_directory.as_str()) + } }; command_builder.env(name, value.as_ref()); } @@ -1211,9 +1218,14 @@ impl RunningActionImpl { async fn inner_cleanup(self: Arc) -> Result, Error> { info!("\x1b[0;31mWorker Cleanup\x1b[0m"); // Note: We need to be careful to keep trying to cleanup even if one of the steps fails. - let remove_dir_result = fs::remove_dir_all(&self.work_directory) + let remove_dir_result = fs::remove_dir_all(&self.action_directory) .await - .err_tip(|| format!("Could not remove working directory {}", self.work_directory)); + .err_tip(|| { + format!( + "Could not remove working directory {}", + self.action_directory + ) + }); self.did_cleanup.store(true, Ordering::Relaxed); if let Err(e) = self.running_actions_manager.cleanup_action(&self.action_id) { error!("Error cleaning up action: {e:?}"); @@ -1583,7 +1595,7 @@ impl UploadActionResults { } pub struct RunningActionsManagerArgs<'a> { - pub root_work_directory: String, + pub root_action_directory: String, pub execution_configuration: ExecutionConfiguration, pub cas_store: Arc, pub ac_store: Option>, @@ -1596,7 +1608,7 @@ pub struct RunningActionsManagerArgs<'a> { /// Holds state info about what is being executed and the interface for interacting /// with actions while they are running. pub struct RunningActionsManagerImpl { - root_work_directory: String, + root_action_directory: String, execution_configuration: ExecutionConfiguration, cas_store: Arc, filesystem_store: Arc, @@ -1630,7 +1642,7 @@ impl RunningActionsManagerImpl { })?; let (action_done_tx, _) = watch::channel(()); Ok(Self { - root_work_directory: args.root_work_directory, + root_action_directory: args.root_action_directory, execution_configuration: args.execution_configuration, cas_store: args.cas_store, filesystem_store, @@ -1659,16 +1671,17 @@ impl RunningActionsManagerImpl { ) } - fn make_work_directory<'a>( + fn make_action_directory<'a>( &'a self, action_id: &'a ActionId, ) -> impl Future> + 'a { - self.metrics.make_work_directory.wrap(async move { - let work_directory = format!("{}/{}", self.root_work_directory, hex::encode(action_id)); - fs::create_dir(&work_directory) + self.metrics.make_action_directory.wrap(async move { + let action_directory = + format!("{}/{}", self.root_action_directory, hex::encode(action_id)); + fs::create_dir(&action_directory) .await - .err_tip(|| format!("Error creating work directory {work_directory}"))?; - Ok(work_directory) + .err_tip(|| format!("Error creating action directory {action_directory}"))?; + Ok(action_directory) }) } @@ -1751,7 +1764,7 @@ impl RunningActionsManager for RunningActionsManagerImpl { let action_info = self.create_action_info(start_execute, queued_timestamp).await?; info!("\x1b[0;31mWorker Received Action\x1b[0m: {:?}", action_info); let action_id = action_info.unique_qualifier.get_hash(); - let work_directory = self.make_work_directory(&action_id).await?; + let action_directory = self.make_action_directory(&action_id).await?; let execution_metadata = ExecutionMetadata { worker: worker_id, queued_timestamp: action_info.insert_timestamp, @@ -1780,7 +1793,7 @@ impl RunningActionsManager for RunningActionsManagerImpl { let running_action = Arc::new(RunningActionImpl::new( execution_metadata, action_id, - work_directory, + action_directory, action_info, timeout, self.clone(), @@ -1849,7 +1862,7 @@ pub struct Metrics { cache_action_result: AsyncCounterWrapper, kill_all: AsyncCounterWrapper, create_action_info: AsyncCounterWrapper, - make_work_directory: AsyncCounterWrapper, + make_action_directory: AsyncCounterWrapper, prepare_action: AsyncCounterWrapper, execute: AsyncCounterWrapper, upload_results: AsyncCounterWrapper, @@ -1891,7 +1904,7 @@ impl MetricsComponent for Metrics { ); c.publish( "make_work_directory", - &self.make_work_directory, + &self.make_action_directory, "Stats about the make_work_directory command.", ); c.publish( diff --git a/nativelink-worker/tests/running_actions_manager_test.rs b/nativelink-worker/tests/running_actions_manager_test.rs index ac9c22c31..ebeb9dec8 100644 --- a/nativelink-worker/tests/running_actions_manager_test.rs +++ b/nativelink-worker/tests/running_actions_manager_test.rs @@ -426,12 +426,12 @@ mod running_actions_manager_tests { } let (_, _, cas_store, ac_store) = setup_stores().await?; - let root_work_directory = make_temp_path("root_work_directory"); - fs::create_dir_all(&root_work_directory).await?; + let root_action_directory = make_temp_path("root_action_directory"); + fs::create_dir_all(&root_action_directory).await?; let running_actions_manager = Arc::new(RunningActionsManagerImpl::new_with_callbacks( RunningActionsManagerArgs { - root_work_directory, + root_action_directory, execution_configuration: ExecutionConfiguration::default(), cas_store: Pin::into_inner(cas_store.clone()), ac_store: Some(Pin::into_inner(ac_store.clone())), @@ -540,12 +540,12 @@ mod running_actions_manager_tests { } let (_, _, cas_store, ac_store) = setup_stores().await?; - let root_work_directory = make_temp_path("root_work_directory"); - fs::create_dir_all(&root_work_directory).await?; + let root_action_directory = make_temp_path("root_action_directory"); + fs::create_dir_all(&root_action_directory).await?; let running_actions_manager = Arc::new(RunningActionsManagerImpl::new_with_callbacks( RunningActionsManagerArgs { - root_work_directory, + root_action_directory, execution_configuration: ExecutionConfiguration::default(), cas_store: Pin::into_inner(cas_store.clone()), ac_store: Some(Pin::into_inner(ac_store.clone())), @@ -656,12 +656,12 @@ mod running_actions_manager_tests { } let (_, slow_store, cas_store, ac_store) = setup_stores().await?; - let root_work_directory = make_temp_path("root_work_directory"); - fs::create_dir_all(&root_work_directory).await?; + let root_action_directory = make_temp_path("root_action_directory"); + fs::create_dir_all(&root_action_directory).await?; let running_actions_manager = Arc::new(RunningActionsManagerImpl::new_with_callbacks( RunningActionsManagerArgs { - root_work_directory, + root_action_directory, execution_configuration: ExecutionConfiguration::default(), cas_store: Pin::into_inner(cas_store.clone()), ac_store: Some(Pin::into_inner(ac_store.clone())), @@ -828,12 +828,12 @@ mod running_actions_manager_tests { } let (_, slow_store, cas_store, ac_store) = setup_stores().await?; - let root_work_directory = make_temp_path("root_work_directory"); - fs::create_dir_all(&root_work_directory).await?; + let root_action_directory = make_temp_path("root_action_directory"); + fs::create_dir_all(&root_action_directory).await?; let running_actions_manager = Arc::new(RunningActionsManagerImpl::new_with_callbacks( RunningActionsManagerArgs { - root_work_directory, + root_action_directory, execution_configuration: ExecutionConfiguration::default(), cas_store: Pin::into_inner(cas_store.clone()), ac_store: Some(Pin::into_inner(ac_store.clone())), @@ -1001,12 +1001,12 @@ mod running_actions_manager_tests { } let (_, slow_store, cas_store, ac_store) = setup_stores().await?; - let root_work_directory = make_temp_path("root_work_directory"); - fs::create_dir_all(&root_work_directory).await?; + let root_action_directory = make_temp_path("root_action_directory"); + fs::create_dir_all(&root_action_directory).await?; let running_actions_manager = Arc::new(RunningActionsManagerImpl::new_with_callbacks( RunningActionsManagerArgs { - root_work_directory, + root_action_directory, execution_configuration: ExecutionConfiguration::default(), cas_store: Pin::into_inner(cas_store.clone()), ac_store: Some(Pin::into_inner(ac_store.clone())), @@ -1200,12 +1200,12 @@ mod running_actions_manager_tests { } let (_, _, cas_store, ac_store) = setup_stores().await?; - let root_work_directory = make_temp_path("root_work_directory"); - fs::create_dir_all(&root_work_directory).await?; + let root_action_directory = make_temp_path("root_action_directory"); + fs::create_dir_all(&root_action_directory).await?; let running_actions_manager = Arc::new(RunningActionsManagerImpl::new_with_callbacks( RunningActionsManagerArgs { - root_work_directory: root_work_directory.clone(), + root_action_directory: root_action_directory.clone(), execution_configuration: ExecutionConfiguration::default(), cas_store: Pin::into_inner(cas_store.clone()), ac_store: Some(Pin::into_inner(ac_store.clone())), @@ -1313,10 +1313,10 @@ mod running_actions_manager_tests { message: String::new(), } ); - let mut dir_stream = fs::read_dir(&root_work_directory).await?; + let mut dir_stream = fs::read_dir(&root_action_directory).await?; assert!( dir_stream.as_mut().next_entry().await?.is_none(), - "Expected empty directory at {root_work_directory}" + "Expected empty directory at {root_action_directory}" ); Ok(()) } @@ -1327,12 +1327,12 @@ mod running_actions_manager_tests { const SALT: u64 = 55; let (_, _, cas_store, ac_store) = setup_stores().await?; - let root_work_directory = make_temp_path("root_work_directory"); - fs::create_dir_all(&root_work_directory).await?; + let root_action_directory = make_temp_path("root_action_directory"); + fs::create_dir_all(&root_action_directory).await?; let running_actions_manager = Arc::new(RunningActionsManagerImpl::new(RunningActionsManagerArgs { - root_work_directory: root_work_directory.clone(), + root_action_directory: root_action_directory.clone(), execution_configuration: ExecutionConfiguration::default(), cas_store: Pin::into_inner(cas_store.clone()), ac_store: Some(Pin::into_inner(ac_store.clone())), @@ -1457,8 +1457,8 @@ exit 0 const EXPECTED_STDOUT: &str = "Action did run"; let (_, _, cas_store, ac_store) = setup_stores().await?; - let root_work_directory = make_temp_path("root_work_directory"); - fs::create_dir_all(&root_work_directory).await?; + let root_action_directory = make_temp_path("root_action_directory"); + fs::create_dir_all(&root_action_directory).await?; let test_wrapper_script = { let test_wrapper_dir = make_temp_path("wrapper_dir"); @@ -1488,7 +1488,7 @@ exit 0 let running_actions_manager = Arc::new(RunningActionsManagerImpl::new(RunningActionsManagerArgs { - root_work_directory: root_work_directory.clone(), + root_action_directory: root_action_directory.clone(), execution_configuration: ExecutionConfiguration { entrypoint: Some(test_wrapper_script.into_string().unwrap()), additional_environment: None, @@ -1600,8 +1600,8 @@ exit 0 const EXPECTED_STDOUT: &str = "Action did run"; let (_, _, cas_store, ac_store) = setup_stores().await?; - let root_work_directory = make_temp_path("root_work_directory"); - fs::create_dir_all(&root_work_directory).await?; + let root_action_directory = make_temp_path("root_action_directory"); + fs::create_dir_all(&root_action_directory).await?; let test_wrapper_script = { let test_wrapper_dir = make_temp_path("wrapper_dir"); @@ -1631,7 +1631,7 @@ exit 0 let running_actions_manager = Arc::new(RunningActionsManagerImpl::new(RunningActionsManagerArgs { - root_work_directory: root_work_directory.clone(), + root_action_directory: root_action_directory.clone(), execution_configuration: ExecutionConfiguration { entrypoint: Some(test_wrapper_script.into_string().unwrap()), additional_environment: Some(HashMap::from([ @@ -1765,8 +1765,8 @@ exit 1 const SALT: u64 = 66; let (_, _, cas_store, ac_store) = setup_stores().await?; - let root_work_directory = make_temp_path("root_work_directory"); - fs::create_dir_all(&root_work_directory).await?; + let root_action_directory = make_temp_path("root_action_directory"); + fs::create_dir_all(&root_action_directory).await?; let test_wrapper_script = { let test_wrapper_dir = make_temp_path("wrapper_dir"); @@ -1796,7 +1796,7 @@ exit 1 let running_actions_manager = Arc::new(RunningActionsManagerImpl::new(RunningActionsManagerArgs { - root_work_directory: root_work_directory.clone(), + root_action_directory: root_action_directory.clone(), execution_configuration: ExecutionConfiguration { entrypoint: Some(test_wrapper_script.into_string().unwrap()), additional_environment: Some(HashMap::from([( @@ -1876,7 +1876,7 @@ exit 1 let running_actions_manager = Arc::new(RunningActionsManagerImpl::new(RunningActionsManagerArgs { - root_work_directory: String::new(), + root_action_directory: String::new(), execution_configuration: ExecutionConfiguration::default(), cas_store: Pin::into_inner(cas_store.clone()), ac_store: Some(Pin::into_inner(ac_store.clone())), @@ -1949,7 +1949,7 @@ exit 1 let running_actions_manager = Arc::new(RunningActionsManagerImpl::new(RunningActionsManagerArgs { - root_work_directory: String::new(), + root_action_directory: String::new(), execution_configuration: ExecutionConfiguration::default(), cas_store: Pin::into_inner(cas_store.clone()), ac_store: Some(Pin::into_inner(ac_store.clone())), @@ -2021,7 +2021,7 @@ exit 1 let running_actions_manager = Arc::new(RunningActionsManagerImpl::new(RunningActionsManagerArgs { - root_work_directory: String::new(), + root_action_directory: String::new(), execution_configuration: ExecutionConfiguration::default(), cas_store: Pin::into_inner(cas_store.clone()), ac_store: Some(Pin::into_inner(ac_store.clone())), @@ -2122,7 +2122,7 @@ exit 1 let running_actions_manager = Arc::new(RunningActionsManagerImpl::new(RunningActionsManagerArgs { - root_work_directory: String::new(), + root_action_directory: String::new(), execution_configuration: ExecutionConfiguration::default(), cas_store: Pin::into_inner(cas_store.clone()), ac_store: Some(Pin::into_inner(ac_store.clone())), @@ -2162,7 +2162,7 @@ exit 1 let (_, _, cas_store, ac_store) = setup_stores().await?; let running_actions_manager = Arc::new(RunningActionsManagerImpl::new(RunningActionsManagerArgs { - root_work_directory: String::new(), + root_action_directory: String::new(), execution_configuration: ExecutionConfiguration::default(), cas_store: Pin::into_inner(cas_store.clone()), ac_store: Some(Pin::into_inner(ac_store.clone())), @@ -2228,7 +2228,7 @@ exit 1 let running_actions_manager = Arc::new(RunningActionsManagerImpl::new(RunningActionsManagerArgs { - root_work_directory: String::new(), + root_action_directory: String::new(), execution_configuration: ExecutionConfiguration::default(), cas_store: Pin::into_inner(cas_store.clone()), ac_store: Some(Pin::into_inner(ac_store.clone())), @@ -2284,8 +2284,8 @@ exit 1 monotonic_clock(&CLOCK) } - let root_work_directory = make_temp_path("root_work_directory"); - fs::create_dir_all(&root_work_directory).await?; + let root_action_directory = make_temp_path("root_action_directory"); + fs::create_dir_all(&root_action_directory).await?; let (_, _, cas_store, ac_store) = setup_stores().await?; @@ -2342,7 +2342,7 @@ exit 1 let running_actions_manager = Arc::new(RunningActionsManagerImpl::new_with_callbacks( RunningActionsManagerArgs { - root_work_directory: root_work_directory.clone(), + root_action_directory: root_action_directory.clone(), execution_configuration: ExecutionConfiguration::default(), cas_store: Pin::into_inner(cas_store.clone()), ac_store: Some(Pin::into_inner(ac_store.clone())), @@ -2419,7 +2419,7 @@ exit 1 let running_actions_manager = Arc::new(RunningActionsManagerImpl::new_with_callbacks( RunningActionsManagerArgs { - root_work_directory: root_work_directory.clone(), + root_action_directory: root_action_directory.clone(), execution_configuration: ExecutionConfiguration::default(), cas_store: Pin::into_inner(cas_store.clone()), ac_store: Some(Pin::into_inner(ac_store.clone())), @@ -2496,7 +2496,7 @@ exit 1 let running_actions_manager = Arc::new(RunningActionsManagerImpl::new_with_callbacks( RunningActionsManagerArgs { - root_work_directory: root_work_directory.clone(), + root_action_directory: root_action_directory.clone(), execution_configuration: ExecutionConfiguration::default(), cas_store: Pin::into_inner(cas_store.clone()), ac_store: Some(Pin::into_inner(ac_store.clone())), @@ -2565,13 +2565,13 @@ exit 1 let (tx, rx) = oneshot::channel(); Mutex::new((Some(tx), Some(rx))) }); - let root_work_directory = make_temp_path("root_work_directory"); - fs::create_dir_all(&root_work_directory).await?; + let root_action_directory = make_temp_path("root_action_directory"); + fs::create_dir_all(&root_action_directory).await?; let (_, _, cas_store, ac_store) = setup_stores().await?; let running_actions_manager = Arc::new(RunningActionsManagerImpl::new_with_callbacks( RunningActionsManagerArgs { - root_work_directory: root_work_directory.clone(), + root_action_directory: root_action_directory.clone(), execution_configuration: ExecutionConfiguration::default(), cas_store: Pin::into_inner(cas_store.clone()), ac_store: Some(Pin::into_inner(ac_store.clone())), @@ -2687,13 +2687,13 @@ exit 1 monotonic_clock(&CLOCK) } - let root_work_directory = make_temp_path("root_work_directory"); - fs::create_dir_all(&root_work_directory).await?; + let root_action_directory = make_temp_path("root_action_directory"); + fs::create_dir_all(&root_action_directory).await?; let (_, _, cas_store, ac_store) = setup_stores().await?; let running_actions_manager = Arc::new(RunningActionsManagerImpl::new_with_callbacks( RunningActionsManagerArgs { - root_work_directory: root_work_directory.clone(), + root_action_directory: root_action_directory.clone(), execution_configuration: ExecutionConfiguration::default(), cas_store: Pin::into_inner(cas_store.clone()), ac_store: Some(Pin::into_inner(ac_store.clone())), @@ -2847,12 +2847,12 @@ exit 1 } let (_, _, cas_store, ac_store) = setup_stores().await?; - let root_work_directory = make_temp_path("root_work_directory"); - fs::create_dir_all(&root_work_directory).await?; + let root_action_directory = make_temp_path("root_action_directory"); + fs::create_dir_all(&root_action_directory).await?; let running_actions_manager = Arc::new(RunningActionsManagerImpl::new_with_callbacks( RunningActionsManagerArgs { - root_work_directory, + root_action_directory, cas_store: Pin::into_inner(cas_store.clone()), ac_store: Some(Pin::into_inner(ac_store.clone())), execution_configuration: ExecutionConfiguration::default(), @@ -2930,4 +2930,92 @@ exit 1 ); Ok(()) } + + #[tokio::test] + async fn action_directory_contents_are_cleaned() -> Result<(), Box> { + const WORKER_ID: &str = "foo_worker_id"; + + let (_, _, cas_store, ac_store) = setup_stores().await?; + let root_action_directory = make_temp_path("root_action_directory"); + fs::create_dir_all(&root_action_directory).await?; + let temp_action_directory = make_temp_path("root_action_directory/temp"); + fs::create_dir_all(&temp_action_directory).await?; + + let running_actions_manager = + Arc::new(RunningActionsManagerImpl::new(RunningActionsManagerArgs { + root_action_directory: root_action_directory.clone(), + execution_configuration: ExecutionConfiguration::default(), + cas_store: Pin::into_inner(cas_store.clone()), + ac_store: Some(Pin::into_inner(ac_store.clone())), + historical_store: Pin::into_inner(cas_store.clone()), + upload_action_result_config: + &nativelink_config::cas_server::UploadActionResultConfig { + upload_ac_results_strategy: + nativelink_config::cas_server::UploadCacheResultsStrategy::never, + ..Default::default() + }, + max_action_timeout: Duration::MAX, + timeout_handled_externally: false, + })?); + let queued_timestamp = make_system_time(1000); + + #[cfg(target_family = "unix")] + let arguments = vec!["sh".to_string(), "-c".to_string(), "exit 0".to_string()]; + #[cfg(target_family = "windows")] + let arguments = vec!["cmd".to_string(), "/C".to_string(), "exit 0".to_string()]; + + const SALT: u64 = 55; + let command = Command { + arguments, + output_paths: vec![], + working_directory: ".".to_string(), + ..Default::default() + }; + let command_digest = serialize_and_upload_message( + &command, + cas_store.as_ref(), + &mut DigestHasherFunc::Sha256.hasher(), + ) + .await?; + let input_root_digest = serialize_and_upload_message( + &Directory::default(), + cas_store.as_ref(), + &mut DigestHasherFunc::Sha256.hasher(), + ) + .await?; + let action = Action { + command_digest: Some(command_digest.into()), + input_root_digest: Some(input_root_digest.into()), + ..Default::default() + }; + let action_digest = serialize_and_upload_message( + &action, + cas_store.as_ref(), + &mut DigestHasherFunc::Sha256.hasher(), + ) + .await?; + + let running_action_impl = running_actions_manager + .create_and_add_action( + WORKER_ID.to_string(), + StartExecute { + execute_request: Some(ExecuteRequest { + action_digest: Some(action_digest.into()), + ..Default::default() + }), + salt: SALT, + queued_timestamp: Some(queued_timestamp.into()), + }, + ) + .await?; + + run_action(running_action_impl.clone()).await?; + + let mut dir_stream = fs::read_dir(&root_action_directory).await?; + assert!( + dir_stream.as_mut().next_entry().await?.is_none(), + "Expected empty directory at {root_action_directory}" + ); + Ok(()) + } }