Skip to content

Commit

Permalink
Create directory for action (#752)
Browse files Browse the repository at this point in the history
Modifies running actions manager and workers to create an action
directory with a dedicated work directory inside instead of just
creating a work directory for the task. Other directories can be
mounted to this space and will be automatically cleaned on completion.

Co-Authored-By: Zach Birenbaum <[email protected]>
  • Loading branch information
zbirenbaum and Zach Birenbaum authored Apr 1, 2024
1 parent c60fb55 commit 414fff3
Show file tree
Hide file tree
Showing 4 changed files with 195 additions and 80 deletions.
15 changes: 15 additions & 0 deletions nativelink-config/src/cas_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,21 @@ pub enum EnvironmentSource {
/// All fields are optional, file does not need to be created and may be
/// empty.
side_channel_file,

/// A "root" directory for the action. This directory can be used to
/// store temporary files that are not needed after the action has
/// completed. This directory will be purged after the action has
/// completed.
///
/// For example:
/// If an action writes temporary data to a path but nativelink should
/// clean up this path after the job has executed, you may create any
/// directory under the path provided in this variable. A common pattern
/// would be to use `entrypoint` to set a shell script that reads this
/// variable, `mkdir $ENV_VAR_NAME/tmp` and `export TMPDIR=$ENV_VAR_NAME/tmp`.
/// Another example might be to bind-mount the `/tmp` path in a container to
/// this path in `entrypoint`.
action_directory,
}

#[derive(Deserialize, Debug, Default)]
Expand Down
3 changes: 1 addition & 2 deletions nativelink-worker/src/local_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ pub async fn new_local_worker(
};
let running_actions_manager =
Arc::new(RunningActionsManagerImpl::new(RunningActionsManagerArgs {
root_work_directory: config.work_directory.clone(),
root_action_directory: config.work_directory.clone(),
execution_configuration: ExecutionConfiguration {
entrypoint,
additional_environment: config.additional_environment.clone(),
Expand All @@ -386,7 +386,6 @@ pub async fn new_local_worker(
.timeout
.unwrap_or(DEFAULT_ENDPOINT_TIMEOUT_S);
let timeout_duration = Duration::from_secs_f32(timeout);

let tls_config =
tls_utils::load_client_config(&config.worker_api_endpoint.tls_config)
.err_tip(|| "Parsing local worker TLS configuration")?;
Expand Down
65 changes: 39 additions & 26 deletions nativelink-worker/src/running_actions_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,7 @@ struct RunningActionImplState {

pub struct RunningActionImpl {
action_id: ActionId,
action_directory: String,
work_directory: String,
action_info: ActionInfo,
timeout: Duration,
Expand All @@ -601,14 +602,16 @@ impl RunningActionImpl {
fn new(
execution_metadata: ExecutionMetadata,
action_id: ActionId,
work_directory: String,
action_directory: String,
action_info: ActionInfo,
timeout: Duration,
running_actions_manager: Arc<RunningActionsManagerImpl>,
) -> Self {
let work_directory = format!("{}/{}", action_directory, "work");
let (kill_channel_tx, kill_channel_rx) = oneshot::channel();
Self {
action_id,
action_directory,
work_directory,
action_info,
timeout,
Expand Down Expand Up @@ -654,17 +657,22 @@ impl RunningActionImpl {
});
let filesystem_store_pin =
Pin::new(self.running_actions_manager.filesystem_store.as_ref());
// Download the input files/folder and place them into the temp directory.
let download_to_directory_fut =
let (command, _) = try_join(command_fut, async {
fs::create_dir(&self.work_directory)
.await
.err_tip(|| format!("Error creating work directory {}", self.work_directory))?;
// Download the input files/folder and place them into the temp directory.
self.metrics()
.download_to_directory
.wrap(download_to_directory(
cas_store_pin,
filesystem_store_pin,
&self.action_info.input_root_digest,
&self.work_directory,
));
let (command, _) = try_join(command_fut, download_to_directory_fut).await?;
))
.await
})
.await?;
command
};
{
Expand Down Expand Up @@ -779,15 +787,14 @@ impl RunningActionImpl {
Cow::Owned(self.timeout.as_millis().to_string())
}
EnvironmentSource::side_channel_file => {
let file_cow = format!(
"{}/{}/{}",
self.work_directory,
command_proto.working_directory,
Uuid::new_v4().simple(),
);
let file_cow =
format!("{}/{}", self.action_directory, Uuid::new_v4().simple());
maybe_side_channel_file = Some(Cow::Owned(file_cow.clone().into()));
Cow::Owned(file_cow)
}
EnvironmentSource::action_directory => {
Cow::Borrowed(self.action_directory.as_str())
}
};
command_builder.env(name, value.as_ref());
}
Expand Down Expand Up @@ -1211,9 +1218,14 @@ impl RunningActionImpl {
async fn inner_cleanup(self: Arc<Self>) -> Result<Arc<Self>, Error> {
info!("\x1b[0;31mWorker Cleanup\x1b[0m");
// Note: We need to be careful to keep trying to cleanup even if one of the steps fails.
let remove_dir_result = fs::remove_dir_all(&self.work_directory)
let remove_dir_result = fs::remove_dir_all(&self.action_directory)
.await
.err_tip(|| format!("Could not remove working directory {}", self.work_directory));
.err_tip(|| {
format!(
"Could not remove working directory {}",
self.action_directory
)
});
self.did_cleanup.store(true, Ordering::Relaxed);
if let Err(e) = self.running_actions_manager.cleanup_action(&self.action_id) {
error!("Error cleaning up action: {e:?}");
Expand Down Expand Up @@ -1583,7 +1595,7 @@ impl UploadActionResults {
}

pub struct RunningActionsManagerArgs<'a> {
pub root_work_directory: String,
pub root_action_directory: String,
pub execution_configuration: ExecutionConfiguration,
pub cas_store: Arc<FastSlowStore>,
pub ac_store: Option<Arc<dyn Store>>,
Expand All @@ -1596,7 +1608,7 @@ pub struct RunningActionsManagerArgs<'a> {
/// Holds state info about what is being executed and the interface for interacting
/// with actions while they are running.
pub struct RunningActionsManagerImpl {
root_work_directory: String,
root_action_directory: String,
execution_configuration: ExecutionConfiguration,
cas_store: Arc<FastSlowStore>,
filesystem_store: Arc<FilesystemStore>,
Expand Down Expand Up @@ -1630,7 +1642,7 @@ impl RunningActionsManagerImpl {
})?;
let (action_done_tx, _) = watch::channel(());
Ok(Self {
root_work_directory: args.root_work_directory,
root_action_directory: args.root_action_directory,
execution_configuration: args.execution_configuration,
cas_store: args.cas_store,
filesystem_store,
Expand Down Expand Up @@ -1659,16 +1671,17 @@ impl RunningActionsManagerImpl {
)
}

fn make_work_directory<'a>(
fn make_action_directory<'a>(
&'a self,
action_id: &'a ActionId,
) -> impl Future<Output = Result<String, Error>> + 'a {
self.metrics.make_work_directory.wrap(async move {
let work_directory = format!("{}/{}", self.root_work_directory, hex::encode(action_id));
fs::create_dir(&work_directory)
self.metrics.make_action_directory.wrap(async move {
let action_directory =
format!("{}/{}", self.root_action_directory, hex::encode(action_id));
fs::create_dir(&action_directory)
.await
.err_tip(|| format!("Error creating work directory {work_directory}"))?;
Ok(work_directory)
.err_tip(|| format!("Error creating action directory {action_directory}"))?;
Ok(action_directory)
})
}

Expand Down Expand Up @@ -1751,7 +1764,7 @@ impl RunningActionsManager for RunningActionsManagerImpl {
let action_info = self.create_action_info(start_execute, queued_timestamp).await?;
info!("\x1b[0;31mWorker Received Action\x1b[0m: {:?}", action_info);
let action_id = action_info.unique_qualifier.get_hash();
let work_directory = self.make_work_directory(&action_id).await?;
let action_directory = self.make_action_directory(&action_id).await?;
let execution_metadata = ExecutionMetadata {
worker: worker_id,
queued_timestamp: action_info.insert_timestamp,
Expand Down Expand Up @@ -1780,7 +1793,7 @@ impl RunningActionsManager for RunningActionsManagerImpl {
let running_action = Arc::new(RunningActionImpl::new(
execution_metadata,
action_id,
work_directory,
action_directory,
action_info,
timeout,
self.clone(),
Expand Down Expand Up @@ -1849,7 +1862,7 @@ pub struct Metrics {
cache_action_result: AsyncCounterWrapper,
kill_all: AsyncCounterWrapper,
create_action_info: AsyncCounterWrapper,
make_work_directory: AsyncCounterWrapper,
make_action_directory: AsyncCounterWrapper,
prepare_action: AsyncCounterWrapper,
execute: AsyncCounterWrapper,
upload_results: AsyncCounterWrapper,
Expand Down Expand Up @@ -1891,7 +1904,7 @@ impl MetricsComponent for Metrics {
);
c.publish(
"make_work_directory",
&self.make_work_directory,
&self.make_action_directory,
"Stats about the make_work_directory command.",
);
c.publish(
Expand Down
Loading

0 comments on commit 414fff3

Please sign in to comment.