diff --git a/coordinator/src/handlers/data_layer.rs b/coordinator/src/handlers/data_layer.rs index 3e2df54d..5b2a5a4c 100644 --- a/coordinator/src/handlers/data_layer.rs +++ b/coordinator/src/handlers/data_layer.rs @@ -14,6 +14,8 @@ use crate::indexer_config::IndexerConfig; type TaskId = String; +const TASK_TIMEOUT_SECONDS: u64 = 300; // 5 minutes + #[derive(Clone)] pub struct DataLayerHandler { client: DataLayerClient, @@ -95,8 +97,6 @@ impl DataLayerHandler { } pub async fn ensure_provisioned(&self, indexer_config: &IndexerConfig) -> anyhow::Result<()> { - tracing::info!(account_id = ?indexer_config.account_id, function_name = ?indexer_config.function_name, "Provisioning data layer"); - let start_task_result = self.start_provisioning_task(indexer_config).await; if let Err(error) = start_task_result { @@ -110,24 +110,36 @@ impl DataLayerHandler { let task_id = start_task_result.unwrap(); + tracing::info!(?task_id, "Started provisioning task"); + let mut iterations = 0; let delay_seconds = 1; loop { - if self.get_task_status(task_id.clone()).await? == TaskStatus::Complete { - break; + match self.get_task_status(task_id.clone()).await? { + TaskStatus::Pending => {} + TaskStatus::Complete => break, + TaskStatus::Failed | TaskStatus::Unspecified => { + tracing::warn!("Provisioning task failed"); + anyhow::bail!("Provisioning task failed") + } } - tokio::time::sleep(std::time::Duration::from_secs(1)).await; + tokio::time::sleep(std::time::Duration::from_secs(delay_seconds)).await; iterations += 1; - if iterations * delay_seconds % 60 == 0 { + if iterations * delay_seconds % 10 == 0 { + let delay = iterations * delay_seconds; + + if delay > TASK_TIMEOUT_SECONDS { + tracing::warn!("Provisioning task timed out"); + anyhow::bail!("Provisioning task timed out"); + } + tracing::warn!( - ?indexer_config.account_id, - ?indexer_config.function_name, "Still waiting for provisioning to complete after {} seconds", - iterations * delay_seconds + delay ); } } @@ -140,30 +152,40 @@ impl DataLayerHandler { account_id: AccountId, function_name: String, ) -> anyhow::Result<()> { - tracing::info!(?account_id, ?function_name, "Deprovisioning data layer"); - let task_id = self .start_deprovisioning_task(account_id.clone(), function_name.clone()) .await?; + tracing::info!(?task_id, "Started deprovisioning task"); + let mut iterations = 0; let delay_seconds = 1; loop { - if self.get_task_status(task_id.clone()).await? == TaskStatus::Complete { - break; + match self.get_task_status(task_id.clone()).await? { + TaskStatus::Pending => {} + TaskStatus::Complete => break, + TaskStatus::Failed | TaskStatus::Unspecified => { + tracing::warn!("Deprovisioning task failed"); + anyhow::bail!("Deprovisioning task failed") + } } tokio::time::sleep(std::time::Duration::from_secs(delay_seconds)).await; iterations += 1; - if iterations * delay_seconds % 60 == 0 { + if iterations * delay_seconds % 10 == 0 { + let delay = iterations * delay_seconds; + + if delay > TASK_TIMEOUT_SECONDS { + tracing::warn!("Deprovisioning task timed out"); + anyhow::bail!("Deprovisioning task timed out"); + } + tracing::warn!( - ?account_id, - ?function_name, - "Still waiting for deprovisioning to complete after {} seconds", - iterations * delay_seconds + "Still waiting for Deprovisioning to complete after {} seconds", + delay ); } }