Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create directory for action #752

Merged
merged 1 commit into from
Apr 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions nativelink-config/src/cas_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,21 @@ pub enum EnvironmentSource {
/// All fields are optional, file does not need to be created and may be
/// empty.
side_channel_file,

/// A "root" directory for the action. This directory can be used to
/// store temporary files that are not needed after the action has
/// completed. This directory will be purged after the action has
/// completed.
///
/// For example:
/// If an action writes temporary data to a path but nativelink should
/// clean up this path after the job has executed, you may create any
/// directory under the path provided in this variable. A common pattern
/// would be to use `entrypoint` to set a shell script that reads this
/// variable, `mkdir $ENV_VAR_NAME/tmp` and `export TMPDIR=$ENV_VAR_NAME/tmp`.
/// Another example might be to bind-mount the `/tmp` path in a container to
/// this path in `entrypoint`.
action_directory,
}

#[derive(Deserialize, Debug, Default)]
Expand Down
3 changes: 1 addition & 2 deletions nativelink-worker/src/local_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ pub async fn new_local_worker(
};
let running_actions_manager =
Arc::new(RunningActionsManagerImpl::new(RunningActionsManagerArgs {
root_work_directory: config.work_directory.clone(),
root_action_directory: config.work_directory.clone(),
execution_configuration: ExecutionConfiguration {
entrypoint,
additional_environment: config.additional_environment.clone(),
Expand All @@ -391,7 +391,6 @@ pub async fn new_local_worker(
.timeout
.unwrap_or(DEFAULT_ENDPOINT_TIMEOUT_S);
let timeout_duration = Duration::from_secs_f32(timeout);

let tls_config =
tls_utils::load_client_config(&config.worker_api_endpoint.tls_config)
.err_tip(|| "Parsing local worker TLS configuration")?;
Expand Down
65 changes: 39 additions & 26 deletions nativelink-worker/src/running_actions_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,7 @@ struct RunningActionImplState {

pub struct RunningActionImpl {
action_id: ActionId,
action_directory: String,
work_directory: String,
action_info: ActionInfo,
timeout: Duration,
Expand All @@ -601,14 +602,16 @@ impl RunningActionImpl {
fn new(
execution_metadata: ExecutionMetadata,
action_id: ActionId,
work_directory: String,
action_directory: String,
action_info: ActionInfo,
timeout: Duration,
running_actions_manager: Arc<RunningActionsManagerImpl>,
) -> Self {
let work_directory = format!("{}/{}", action_directory, "work");
let (kill_channel_tx, kill_channel_rx) = oneshot::channel();
Self {
action_id,
action_directory,
work_directory,
action_info,
timeout,
Expand Down Expand Up @@ -654,17 +657,22 @@ impl RunningActionImpl {
});
let filesystem_store_pin =
Pin::new(self.running_actions_manager.filesystem_store.as_ref());
// Download the input files/folder and place them into the temp directory.
let download_to_directory_fut =
let (command, _) = try_join(command_fut, async {
fs::create_dir(&self.work_directory)
.await
.err_tip(|| format!("Error creating work directory {}", self.work_directory))?;
// Download the input files/folder and place them into the temp directory.
self.metrics()
.download_to_directory
.wrap(download_to_directory(
cas_store_pin,
filesystem_store_pin,
&self.action_info.input_root_digest,
&self.work_directory,
));
let (command, _) = try_join(command_fut, download_to_directory_fut).await?;
))
.await
})
.await?;
command
};
{
Expand Down Expand Up @@ -779,15 +787,14 @@ impl RunningActionImpl {
Cow::Owned(self.timeout.as_millis().to_string())
}
EnvironmentSource::side_channel_file => {
let file_cow = format!(
"{}/{}/{}",
self.work_directory,
command_proto.working_directory,
Uuid::new_v4().simple(),
);
let file_cow =
format!("{}/{}", self.action_directory, Uuid::new_v4().simple());
maybe_side_channel_file = Some(Cow::Owned(file_cow.clone().into()));
Cow::Owned(file_cow)
}
EnvironmentSource::action_directory => {
Cow::Borrowed(self.action_directory.as_str())
}
};
command_builder.env(name, value.as_ref());
}
Expand Down Expand Up @@ -1211,9 +1218,14 @@ impl RunningActionImpl {
async fn inner_cleanup(self: Arc<Self>) -> Result<Arc<Self>, Error> {
info!("\x1b[0;31mWorker Cleanup\x1b[0m");
// Note: We need to be careful to keep trying to cleanup even if one of the steps fails.
let remove_dir_result = fs::remove_dir_all(&self.work_directory)
let remove_dir_result = fs::remove_dir_all(&self.action_directory)
.await
.err_tip(|| format!("Could not remove working directory {}", self.work_directory));
.err_tip(|| {
format!(
"Could not remove working directory {}",
self.action_directory
)
});
self.did_cleanup.store(true, Ordering::Relaxed);
if let Err(e) = self.running_actions_manager.cleanup_action(&self.action_id) {
error!("Error cleaning up action: {e:?}");
Expand Down Expand Up @@ -1583,7 +1595,7 @@ impl UploadActionResults {
}

pub struct RunningActionsManagerArgs<'a> {
pub root_work_directory: String,
pub root_action_directory: String,
pub execution_configuration: ExecutionConfiguration,
pub cas_store: Arc<FastSlowStore>,
pub ac_store: Option<Arc<dyn Store>>,
Expand All @@ -1596,7 +1608,7 @@ pub struct RunningActionsManagerArgs<'a> {
/// Holds state info about what is being executed and the interface for interacting
/// with actions while they are running.
pub struct RunningActionsManagerImpl {
root_work_directory: String,
root_action_directory: String,
execution_configuration: ExecutionConfiguration,
cas_store: Arc<FastSlowStore>,
filesystem_store: Arc<FilesystemStore>,
Expand Down Expand Up @@ -1630,7 +1642,7 @@ impl RunningActionsManagerImpl {
})?;
let (action_done_tx, _) = watch::channel(());
Ok(Self {
root_work_directory: args.root_work_directory,
root_action_directory: args.root_action_directory,
execution_configuration: args.execution_configuration,
cas_store: args.cas_store,
filesystem_store,
Expand Down Expand Up @@ -1659,16 +1671,17 @@ impl RunningActionsManagerImpl {
)
}

fn make_work_directory<'a>(
fn make_action_directory<'a>(
&'a self,
action_id: &'a ActionId,
) -> impl Future<Output = Result<String, Error>> + 'a {
self.metrics.make_work_directory.wrap(async move {
let work_directory = format!("{}/{}", self.root_work_directory, hex::encode(action_id));
fs::create_dir(&work_directory)
self.metrics.make_action_directory.wrap(async move {
let action_directory =
format!("{}/{}", self.root_action_directory, hex::encode(action_id));
fs::create_dir(&action_directory)
.await
.err_tip(|| format!("Error creating work directory {work_directory}"))?;
Ok(work_directory)
.err_tip(|| format!("Error creating action directory {action_directory}"))?;
Ok(action_directory)
})
}

Expand Down Expand Up @@ -1751,7 +1764,7 @@ impl RunningActionsManager for RunningActionsManagerImpl {
let action_info = self.create_action_info(start_execute, queued_timestamp).await?;
info!("\x1b[0;31mWorker Received Action\x1b[0m: {:?}", action_info);
let action_id = action_info.unique_qualifier.get_hash();
let work_directory = self.make_work_directory(&action_id).await?;
let action_directory = self.make_action_directory(&action_id).await?;
let execution_metadata = ExecutionMetadata {
worker: worker_id,
queued_timestamp: action_info.insert_timestamp,
Expand Down Expand Up @@ -1780,7 +1793,7 @@ impl RunningActionsManager for RunningActionsManagerImpl {
let running_action = Arc::new(RunningActionImpl::new(
execution_metadata,
action_id,
work_directory,
action_directory,
action_info,
timeout,
self.clone(),
Expand Down Expand Up @@ -1849,7 +1862,7 @@ pub struct Metrics {
cache_action_result: AsyncCounterWrapper,
kill_all: AsyncCounterWrapper,
create_action_info: AsyncCounterWrapper,
make_work_directory: AsyncCounterWrapper,
make_action_directory: AsyncCounterWrapper,
prepare_action: AsyncCounterWrapper,
execute: AsyncCounterWrapper,
upload_results: AsyncCounterWrapper,
Expand Down Expand Up @@ -1891,7 +1904,7 @@ impl MetricsComponent for Metrics {
);
c.publish(
"make_work_directory",
&self.make_work_directory,
&self.make_action_directory,
"Stats about the make_work_directory command.",
);
c.publish(
Expand Down
Loading
Loading