fix: Decreasing Processed Block Error #988
Changes from 2 commits
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -120,18 +120,22 @@ impl BlockStream { | |
} | ||
} | ||
|
||
fn start_health_monitoring_task(&self, redis: Arc<RedisClient>) -> JoinHandle<()> { | ||
fn start_health_monitoring_task( | ||
&self, | ||
redis: Arc<RedisClient>, | ||
start_block_height: near_indexer_primitives::types::BlockHeight, | ||
) -> JoinHandle<()> { | ||
tokio::spawn({ | ||
let config = self.indexer_config.clone(); | ||
let health = self.health.clone(); | ||
let redis_stream = self.redis_stream.clone(); | ||
let stalled_timeout_seconds = 120; | ||
|
||
async move { | ||
let mut last_processed_block = | ||
redis.get_last_processed_block(&config).await.unwrap(); | ||
|
||
let mut last_processed_block = Some(start_block_height - 1); | ||
loop { | ||
tokio::time::sleep(std::time::Duration::from_secs(120)).await; | ||
tokio::time::sleep(std::time::Duration::from_secs(stalled_timeout_seconds)) | ||
.await; | ||
|
||
let new_last_processed_block = | ||
if let Ok(block) = redis.get_last_processed_block(&config).await { | ||
|
@@ -183,6 +187,11 @@ impl BlockStream { | |
health_lock.processing_state = ProcessingState::Waiting; | ||
} | ||
Ordering::Equal => { | ||
tracing::error!( | ||
account_id = config.account_id.as_str(), | ||
function_name = config.function_name, | ||
"No block has been processed for {stalled_timeout_seconds} seconds" | ||
); | ||
health_lock.processing_state = ProcessingState::Stalled; | ||
} | ||
Ordering::Greater => { | ||
|
@@ -266,7 +275,8 @@ impl BlockStream { | |
|
||
let cancellation_token = tokio_util::sync::CancellationToken::new(); | ||
|
||
let monitor_handle = self.start_health_monitoring_task(redis.clone()); | ||
let monitor_handle = | ||
self.start_health_monitoring_task(redis.clone(), start_block_height.clone()); | ||
|
||
let stream_handle = self.start_block_stream_task( | ||
start_block_height, | ||
|
@@ -342,7 +352,7 @@ pub(crate) async fn start_block_stream( | |
.context("Failed while fetching and streaming bitmap indexer blocks")?; | ||
|
||
let last_indexed_near_lake_block = process_near_lake_blocks( | ||
last_bitmap_indexer_block, | ||
last_bitmap_indexer_block + 1, | ||
I did a +1 here too, as I suspect that the handoff results in that particular block being processed twice. The input is passed directly as the start block height argument for the stream. I believe the stream can skip forward if the specific block does not exist, since it uses list operations first before getting the files.
Have you confirmed this, or is it just a suspicion?
I believe we use
Gotcha. I did see a repeat in queryapi_indexer, but I wasn't 100% sure. I'll keep it as is and separately investigate that. |
||
lake_s3_client, | ||
lake_prefetch_size, | ||
redis, | ||
|
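The suspected repeat at the handoff, discussed in the comment thread above, can be illustrated with a small standalone sketch. It assumes the first stage processes heights up to and including its last block, which is exactly the point the reviewers left unconfirmed. The helper names `processed_heights` and `has_repeat` are hypothetical and do not exist in the repository.

```rust
/// Hypothetical helper: list every height handled by two consecutive stages.
/// The first stage covers `first_start..=last_of_first` (inclusive, by assumption);
/// the second stage covers `second_start..=second_end`.
pub fn processed_heights(
    first_start: u64,
    last_of_first: u64,
    second_start: u64,
    second_end: u64,
) -> Vec<u64> {
    let mut heights: Vec<u64> = (first_start..=last_of_first).collect();
    heights.extend(second_start..=second_end);
    heights
}

/// Returns true if any height appears twice in a row, which is how a
/// repeated block at the handoff boundary would show up.
pub fn has_repeat(heights: &[u64]) -> bool {
    heights.windows(2).any(|pair| pair[0] == pair[1])
}
```

Under that assumption, resuming at `last_of_first` yields a duplicate at the boundary, while resuming at `last_of_first + 1` does not. Whether the real stream behaves this way still needs the separate investigation mentioned in the thread.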
Let's use `warn`. `.error` should be for unrecoverable situations.
Gotcha
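The comparison at the heart of the health monitor in the diff can be sketched in isolation as follows. This is a minimal illustration and not the repository's code: `classify_progress` and `initial_last_processed` are hypothetical names, the `Running` variant is assumed, and mapping a decreasing value to `Waiting` is inferred from the visible diff context, where the branch header is cut off. Block heights are modeled as plain `u64`.

```rust
use std::cmp::Ordering;

/// Simplified stand-in for the stream's health state.
/// `Running` is an assumed variant name; `Waiting` and `Stalled` appear in the diff.
#[derive(Debug, PartialEq, Eq)]
pub enum ProcessingState {
    Running,
    Waiting,
    Stalled,
}

/// Seed value for the monitor, mirroring `Some(start_block_height - 1)` in the diff.
/// `saturating_sub` is an added guard against underflow at height 0 (not in the PR).
pub fn initial_last_processed(start_block_height: u64) -> Option<u64> {
    Some(start_block_height.saturating_sub(1))
}

/// Compare the newly polled last processed block with the previous one.
/// `Option` orders `None` below any `Some`, so a missing value counts as a decrease.
pub fn classify_progress(previous: Option<u64>, current: Option<u64>) -> ProcessingState {
    match current.cmp(&previous) {
        // Progress was made since the last poll.
        Ordering::Greater => ProcessingState::Running,
        // No new block within the polling interval: report a stall.
        Ordering::Equal => ProcessingState::Stalled,
        // The value went backwards or disappeared (assumed mapping).
        Ordering::Less => ProcessingState::Waiting,
    }
}
```

Seeding with `start_block_height - 1` instead of the value stored in Redis means the first comparison is made against the stream's own starting point, so a stale Redis value from a previous run cannot look like a decrease. Following the review feedback, the `Equal` branch would log at `warn` level, since a stall may be recoverable.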