Skip to content

Commit

Permalink
Implement unoffloading
Browse files Browse the repository at this point in the history
  • Loading branch information
arpad-m committed Sep 20, 2024
1 parent a755dd5 commit 25461cc
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 23 deletions.
4 changes: 3 additions & 1 deletion pageserver/src/http/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,8 @@ async fn timeline_archival_config_handler(
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;

let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);

let request_data: TimelineArchivalConfigRequest = json_request(&mut request).await?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let state = get_state(&request);
Expand All @@ -709,7 +711,7 @@ async fn timeline_archival_config_handler(
.get_attached_tenant_shard(tenant_shard_id)?;

tenant
.apply_timeline_archival_config(timeline_id, request_data.state)
.apply_timeline_archival_config(timeline_id, request_data.state, ctx)
.await?;
Ok::<_, ApiError>(())
}
Expand Down
106 changes: 84 additions & 22 deletions pageserver/src/tenant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1366,9 +1366,11 @@ impl Tenant {
let mut part_downloads = JoinSet::new();
for timeline_id in timeline_ids {
let cancel_clone = cancel.clone();
part_downloads.spawn(
self.clone().load_timeline_metadata(timeline_id, remote_storage.clone(), cancel_clone)
);
part_downloads.spawn(self.clone().load_timeline_metadata(
timeline_id,
remote_storage.clone(),
cancel_clone,
));
}

let mut timeline_preloads: HashMap<TimelineId, TimelinePreload> = HashMap::new();
Expand Down Expand Up @@ -1397,9 +1399,10 @@ impl Tenant {
}

pub(crate) async fn apply_timeline_archival_config(
&self,
self: &Arc<Self>,
timeline_id: TimelineId,
state: TimelineArchivalState,
ctx: RequestContext,
) -> Result<(), TimelineArchivalError> {
info!("setting timeline archival config");
let timeline_or_unarchive_offloaded = 'outer: {
Expand Down Expand Up @@ -1465,26 +1468,85 @@ impl Tenant {
Some(Arc::clone(timeline))
};

if let Some(timeline) = timeline_or_unarchive_offloaded {
let upload_needed = timeline
.remote_client
.schedule_index_upload_for_timeline_archival_state(state)?;

if upload_needed {
info!("Uploading new state");
const MAX_WAIT: Duration = Duration::from_secs(10);
let Ok(v) =
tokio::time::timeout(MAX_WAIT, timeline.remote_client.wait_completion()).await
else {
tracing::warn!("reached timeout for waiting on upload queue");
return Err(TimelineArchivalError::Timeout);
};
v.map_err(|e| TimelineArchivalError::Other(anyhow::anyhow!(e)))?;
}
Ok(())
let timeline = if let Some(timeline) = timeline_or_unarchive_offloaded {
timeline
} else {
Ok(())
let cancel = self.cancel.clone();
let timeline_preload = Self::load_timeline_metadata(
self.clone(),
timeline_id,
self.remote_storage.clone(),
cancel,
)
.await?;

let index_part = match timeline_preload.index_part {
Ok(index_part) => {
debug!("remote index part exists for timeline {timeline_id}");
index_part
}
Err(DownloadError::NotFound) => {
info!(%timeline_id, "index_part not found on remote");
return Err(TimelineArchivalError::NotFound);
}
Err(e) => {
// Some (possibly ephemeral) error happened during index_part download.
warn!(%timeline_id, "Failed to load index_part from remote storage, failed creation? ({e})");
return Err(TimelineArchivalError::Other(
anyhow::Error::new(e).context("downloading index_part from remote storage"),
));
}
};
let index_part = match index_part {
MaybeDeletedIndexPart::IndexPart(index_part) => index_part,
MaybeDeletedIndexPart::Deleted(_index_part) => {
info!("timeline is deleted according to index_part.json");
return Err(TimelineArchivalError::NotFound);
}
};
let remote_metadata = index_part.metadata.clone();
let timeline_resources = self.build_timeline_resources(timeline_id);
self.load_remote_timeline(
timeline_id,
index_part,
remote_metadata,
timeline_resources,
&ctx,
)
.await
.with_context(|| {
format!(
"failed to load remote timeline {} for tenant {}",
timeline_id, self.tenant_shard_id
)
})?;
let timelines = self.timelines.lock().unwrap();
if let Some(timeline) = timelines.get(&timeline_id) {
Arc::clone(timeline)
} else {
warn!("timeline not available directly after attach");
return Err(TimelineArchivalError::Other(anyhow::anyhow!(
"timeline not available directly after attach"
)));
}
};

let upload_needed = timeline
.remote_client
.schedule_index_upload_for_timeline_archival_state(state)?;

if upload_needed {
info!("Uploading new state");
const MAX_WAIT: Duration = Duration::from_secs(10);
let Ok(v) =
tokio::time::timeout(MAX_WAIT, timeline.remote_client.wait_completion()).await
else {
tracing::warn!("reached timeout for waiting on upload queue");
return Err(TimelineArchivalError::Timeout);
};
v.map_err(|e| TimelineArchivalError::Other(anyhow::anyhow!(e)))?;
}
Ok(())
}

pub(crate) fn tenant_shard_id(&self) -> TenantShardId {
Expand Down

0 comments on commit 25461cc

Please sign in to comment.