Skip to content

Commit

Permalink
Create directory for action
Browse files Browse the repository at this point in the history
Modifies running actions manager and workers to create an action
directory with a dedicated work directory inside instead of just
creating a work directory for the task. Other directories can be
mounted to this space and will be automatically cleaned on completion.
  • Loading branch information
Zach Birenbaum committed Mar 27, 2024
1 parent 646253d commit 9f17d81
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 80 deletions.
6 changes: 6 additions & 0 deletions nativelink-config/src/cas_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,12 @@ pub enum EnvironmentSource {
/// All fields are optional, file does not need to be created and may be
/// empty.
side_channel_file,

/// A "root" directory for the action. This directory can be used to
/// store temporary files that are not needed after the action has
/// completed. This directory will be purged after the action has
/// completed.
action_directory,
}

#[derive(Deserialize, Debug, Default)]
Expand Down
3 changes: 1 addition & 2 deletions nativelink-worker/src/local_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ pub async fn new_local_worker(
};
let running_actions_manager =
Arc::new(RunningActionsManagerImpl::new(RunningActionsManagerArgs {
root_work_directory: config.work_directory.clone(),
root_action_directory: config.work_directory.clone(),
execution_configuration: ExecutionConfiguration {
entrypoint,
additional_environment: config.additional_environment.clone(),
Expand All @@ -391,7 +391,6 @@ pub async fn new_local_worker(
.timeout
.unwrap_or(DEFAULT_ENDPOINT_TIMEOUT_S);
let timeout_duration = Duration::from_secs_f32(timeout);

let tls_config =
tls_utils::load_client_config(&config.worker_api_endpoint.tls_config)
.err_tip(|| "Parsing local worker TLS configuration")?;
Expand Down
65 changes: 39 additions & 26 deletions nativelink-worker/src/running_actions_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,7 @@ struct RunningActionImplState {

pub struct RunningActionImpl {
action_id: ActionId,
action_directory: String,
work_directory: String,
action_info: ActionInfo,
timeout: Duration,
Expand All @@ -601,14 +602,16 @@ impl RunningActionImpl {
fn new(
execution_metadata: ExecutionMetadata,
action_id: ActionId,
work_directory: String,
action_directory: String,
action_info: ActionInfo,
timeout: Duration,
running_actions_manager: Arc<RunningActionsManagerImpl>,
) -> Self {
let work_directory = format!("{}/{}", action_directory, "work");
let (kill_channel_tx, kill_channel_rx) = oneshot::channel();
Self {
action_id,
action_directory,
work_directory,
action_info,
timeout,
Expand Down Expand Up @@ -654,17 +657,22 @@ impl RunningActionImpl {
});
let filesystem_store_pin =
Pin::new(self.running_actions_manager.filesystem_store.as_ref());
// Download the input files/folder and place them into the temp directory.
let download_to_directory_fut =
let (command, _) = try_join(command_fut, async {
fs::create_dir(&self.work_directory)
.await
.err_tip(|| format!("Error creating work directory {}", self.work_directory))?;
// Download the input files/folder and place them into the temp directory.
self.metrics()
.download_to_directory
.wrap(download_to_directory(
cas_store_pin,
filesystem_store_pin,
&self.action_info.input_root_digest,
&self.work_directory,
));
let (command, _) = try_join(command_fut, download_to_directory_fut).await?;
))
.await
})
.await?;
command
};
{
Expand Down Expand Up @@ -779,15 +787,14 @@ impl RunningActionImpl {
Cow::Owned(self.timeout.as_millis().to_string())
}
EnvironmentSource::side_channel_file => {
let file_cow = format!(
"{}/{}/{}",
self.work_directory,
command_proto.working_directory,
Uuid::new_v4().simple(),
);
let file_cow =
format!("{}/{}", self.action_directory, Uuid::new_v4().simple());
maybe_side_channel_file = Some(Cow::Owned(file_cow.clone().into()));
Cow::Owned(file_cow)
}
EnvironmentSource::action_directory => {
Cow::Borrowed(self.action_directory.as_str())
}
};
command_builder.env(name, value.as_ref());
}
Expand Down Expand Up @@ -1211,9 +1218,14 @@ impl RunningActionImpl {
async fn inner_cleanup(self: Arc<Self>) -> Result<Arc<Self>, Error> {
info!("\x1b[0;31mWorker Cleanup\x1b[0m");
// Note: We need to be careful to keep trying to cleanup even if one of the steps fails.
let remove_dir_result = fs::remove_dir_all(&self.work_directory)
let remove_dir_result = fs::remove_dir_all(&self.action_directory)
.await
.err_tip(|| format!("Could not remove working directory {}", self.work_directory));
.err_tip(|| {
format!(
"Could not remove working directory {}",
self.action_directory
)
});
self.did_cleanup.store(true, Ordering::Relaxed);
if let Err(e) = self.running_actions_manager.cleanup_action(&self.action_id) {
error!("Error cleaning up action: {e:?}");
Expand Down Expand Up @@ -1583,7 +1595,7 @@ impl UploadActionResults {
}

pub struct RunningActionsManagerArgs<'a> {
pub root_work_directory: String,
pub root_action_directory: String,
pub execution_configuration: ExecutionConfiguration,
pub cas_store: Arc<FastSlowStore>,
pub ac_store: Option<Arc<dyn Store>>,
Expand All @@ -1596,7 +1608,7 @@ pub struct RunningActionsManagerArgs<'a> {
/// Holds state info about what is being executed and the interface for interacting
/// with actions while they are running.
pub struct RunningActionsManagerImpl {
root_work_directory: String,
root_action_directory: String,
execution_configuration: ExecutionConfiguration,
cas_store: Arc<FastSlowStore>,
filesystem_store: Arc<FilesystemStore>,
Expand Down Expand Up @@ -1630,7 +1642,7 @@ impl RunningActionsManagerImpl {
})?;
let (action_done_tx, _) = watch::channel(());
Ok(Self {
root_work_directory: args.root_work_directory,
root_action_directory: args.root_action_directory,
execution_configuration: args.execution_configuration,
cas_store: args.cas_store,
filesystem_store,
Expand Down Expand Up @@ -1659,16 +1671,17 @@ impl RunningActionsManagerImpl {
)
}

fn make_work_directory<'a>(
fn make_action_directory<'a>(
&'a self,
action_id: &'a ActionId,
) -> impl Future<Output = Result<String, Error>> + 'a {
self.metrics.make_work_directory.wrap(async move {
let work_directory = format!("{}/{}", self.root_work_directory, hex::encode(action_id));
fs::create_dir(&work_directory)
self.metrics.make_action_directory.wrap(async move {
let action_directory =
format!("{}/{}", self.root_action_directory, hex::encode(action_id));
fs::create_dir(&action_directory)
.await
.err_tip(|| format!("Error creating work directory {work_directory}"))?;
Ok(work_directory)
.err_tip(|| format!("Error creating action directory {action_directory}"))?;
Ok(action_directory)
})
}

Expand Down Expand Up @@ -1751,7 +1764,7 @@ impl RunningActionsManager for RunningActionsManagerImpl {
let action_info = self.create_action_info(start_execute, queued_timestamp).await?;
info!("\x1b[0;31mWorker Received Action\x1b[0m: {:?}", action_info);
let action_id = action_info.unique_qualifier.get_hash();
let work_directory = self.make_work_directory(&action_id).await?;
let action_directory = self.make_action_directory(&action_id).await?;
let execution_metadata = ExecutionMetadata {
worker: worker_id,
queued_timestamp: action_info.insert_timestamp,
Expand Down Expand Up @@ -1780,7 +1793,7 @@ impl RunningActionsManager for RunningActionsManagerImpl {
let running_action = Arc::new(RunningActionImpl::new(
execution_metadata,
action_id,
work_directory,
action_directory,
action_info,
timeout,
self.clone(),
Expand Down Expand Up @@ -1849,7 +1862,7 @@ pub struct Metrics {
cache_action_result: AsyncCounterWrapper,
kill_all: AsyncCounterWrapper,
create_action_info: AsyncCounterWrapper,
make_work_directory: AsyncCounterWrapper,
make_action_directory: AsyncCounterWrapper,
prepare_action: AsyncCounterWrapper,
execute: AsyncCounterWrapper,
upload_results: AsyncCounterWrapper,
Expand Down Expand Up @@ -1891,7 +1904,7 @@ impl MetricsComponent for Metrics {
);
c.publish(
"make_work_directory",
&self.make_work_directory,
&self.make_action_directory,
"Stats about the make_work_directory command.",
);
c.publish(
Expand Down
Loading

0 comments on commit 9f17d81

Please sign in to comment.