Skip to content

Commit

Permalink
Merge pull request #899 from near/main
Browse files Browse the repository at this point in the history
Prod Release 19/07/24 - 2
  • Loading branch information
morgsmccauley authored Jul 19, 2024
2 parents 35fef33 + 6eccf87 commit af40c14
Showing 1 changed file with 40 additions and 18 deletions.
58 changes: 40 additions & 18 deletions coordinator/src/handlers/data_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ use crate::indexer_config::IndexerConfig;

type TaskId = String;

const TASK_TIMEOUT_SECONDS: u64 = 300; // 5 minutes

#[derive(Clone)]
pub struct DataLayerHandler {
client: DataLayerClient<Channel>,
Expand Down Expand Up @@ -95,8 +97,6 @@ impl DataLayerHandler {
}

pub async fn ensure_provisioned(&self, indexer_config: &IndexerConfig) -> anyhow::Result<()> {
tracing::info!(account_id = ?indexer_config.account_id, function_name = ?indexer_config.function_name, "Provisioning data layer");

let start_task_result = self.start_provisioning_task(indexer_config).await;

if let Err(error) = start_task_result {
Expand All @@ -110,24 +110,36 @@ impl DataLayerHandler {

let task_id = start_task_result.unwrap();

tracing::info!(?task_id, "Started provisioning task");

let mut iterations = 0;
let delay_seconds = 1;

loop {
if self.get_task_status(task_id.clone()).await? == TaskStatus::Complete {
break;
match self.get_task_status(task_id.clone()).await? {
TaskStatus::Pending => {}
TaskStatus::Complete => break,
TaskStatus::Failed | TaskStatus::Unspecified => {
tracing::warn!("Provisioning task failed");
anyhow::bail!("Provisioning task failed")
}
}

tokio::time::sleep(std::time::Duration::from_secs(1)).await;
tokio::time::sleep(std::time::Duration::from_secs(delay_seconds)).await;

iterations += 1;

if iterations * delay_seconds % 60 == 0 {
if iterations * delay_seconds % 10 == 0 {
let delay = iterations * delay_seconds;

if delay > TASK_TIMEOUT_SECONDS {
tracing::warn!("Provisioning task timed out");
anyhow::bail!("Provisioning task timed out");
}

tracing::warn!(
?indexer_config.account_id,
?indexer_config.function_name,
"Still waiting for provisioning to complete after {} seconds",
iterations * delay_seconds
delay
);
}
}
Expand All @@ -140,30 +152,40 @@ impl DataLayerHandler {
account_id: AccountId,
function_name: String,
) -> anyhow::Result<()> {
tracing::info!(?account_id, ?function_name, "Deprovisioning data layer");

let task_id = self
.start_deprovisioning_task(account_id.clone(), function_name.clone())
.await?;

tracing::info!(?task_id, "Started deprovisioning task");

let mut iterations = 0;
let delay_seconds = 1;

loop {
if self.get_task_status(task_id.clone()).await? == TaskStatus::Complete {
break;
match self.get_task_status(task_id.clone()).await? {
TaskStatus::Pending => {}
TaskStatus::Complete => break,
TaskStatus::Failed | TaskStatus::Unspecified => {
tracing::warn!("Deprovisioning task failed");
anyhow::bail!("Deprovisioning task failed")
}
}

tokio::time::sleep(std::time::Duration::from_secs(delay_seconds)).await;

iterations += 1;

if iterations * delay_seconds % 60 == 0 {
if iterations * delay_seconds % 10 == 0 {
let delay = iterations * delay_seconds;

if delay > TASK_TIMEOUT_SECONDS {
tracing::warn!("Deprovisioning task timed out");
anyhow::bail!("Deprovisioning task timed out");
}

tracing::warn!(
?account_id,
?function_name,
"Still waiting for deprovisioning to complete after {} seconds",
iterations * delay_seconds
"Still waiting for Deprovisioning to complete after {} seconds",
delay
);
}
}
Expand Down

0 comments on commit af40c14

Please sign in to comment.