From 7c4ad879f74cefc5d53db29f8444ff478684eeb6 Mon Sep 17 00:00:00 2001 From: Arya Date: Tue, 19 Nov 2024 01:16:52 -0500 Subject: [PATCH] respond with mempool verification result after a transaction has been inserted or has failed to be inserted into the mempool --- zebra-node-services/src/mempool.rs | 5 +- zebrad/src/components/mempool.rs | 31 +++++-- zebrad/src/components/mempool/downloads.rs | 90 +++++++++++-------- zebrad/src/components/mempool/tests/vector.rs | 16 ++-- 4 files changed, 82 insertions(+), 60 deletions(-) diff --git a/zebra-node-services/src/mempool.rs b/zebra-node-services/src/mempool.rs index 793e5b1fe3b..7a912f49f15 100644 --- a/zebra-node-services/src/mempool.rs +++ b/zebra-node-services/src/mempool.rs @@ -16,12 +16,9 @@ use zebra_chain::transaction::VerifiedUnminedTx; use crate::BoxError; mod gossip; - mod transaction_dependencies; -pub use transaction_dependencies::TransactionDependencies; - -pub use self::gossip::Gossip; +pub use self::{gossip::Gossip, transaction_dependencies::TransactionDependencies}; /// A mempool service request. /// diff --git a/zebrad/src/components/mempool.rs b/zebrad/src/components/mempool.rs index bc5796df615..960aa2c7b01 100644 --- a/zebrad/src/components/mempool.rs +++ b/zebrad/src/components/mempool.rs @@ -28,7 +28,6 @@ use std::{ use futures::{future::FutureExt, stream::Stream}; use tokio::sync::{broadcast, oneshot}; -use tokio_stream::StreamExt; use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service}; use zebra_chain::{ @@ -43,7 +42,7 @@ use zebra_node_services::mempool::{Gossip, Request, Response}; use zebra_state as zs; use zebra_state::{ChainTipChange, TipAction}; -use crate::components::{mempool::crawler::RATE_LIMIT_DELAY, sync::SyncStatus}; +use crate::components::sync::SyncStatus; pub mod config; mod crawler; @@ -586,10 +585,8 @@ impl Service for Mempool { let best_tip_height = self.latest_chain_tip.best_tip_height(); // Clean up completed download tasks and add to mempool if successful. - while let Poll::Ready(Some(r)) = - pin!(tx_downloads.timeout(RATE_LIMIT_DELAY)).poll_next(cx) - { - match r { + while let Poll::Ready(Some((result, rsp_tx))) = pin!(&mut *tx_downloads).poll_next(cx) { + match result { Ok(Ok((tx, spent_mempool_outpoints, expected_tip_height))) => { // # Correctness: // @@ -609,27 +606,45 @@ impl Service for Mempool { // Save transaction ids that we will send to peers send_to_peers_ids.insert(inserted_id); } + + // Send the result to responder channel if one was provided. + if let Some(rsp_tx) = rsp_tx { + let _ = rsp_tx + .send(insert_result.map(|_| ()).map_err(|err| err.into())); + } } else { tracing::trace!("chain grew during tx verification, retrying ..",); // We don't care if re-queueing the transaction request fails. let _result = tx_downloads - .download_if_needed_and_verify(tx.transaction.into(), None); + .download_if_needed_and_verify(tx.transaction.into(), rsp_tx); } } Ok(Err((tx_id, error))) => { tracing::debug!(?tx_id, ?error, "mempool transaction failed to verify"); metrics::counter!("mempool.failed.verify.tasks.total", "reason" => error.to_string()).increment(1); + storage.reject_if_needed(tx_id, error); + + // Send the result to responder channel if one was provided. + if let Some(rsp_tx) = rsp_tx { + let _ = + rsp_tx.send(Err("timeout waiting for verification result".into())); + } } - Err(_elapsed) => { + Err(elapsed) => { // A timeout happens when the stream hangs waiting for another service, // so there is no specific transaction ID. tracing::info!("mempool transaction failed to verify due to timeout"); metrics::counter!("mempool.failed.verify.tasks.total", "reason" => "timeout").increment(1); + + // Send the result to responder channel if one was provided. + if let Some(rsp_tx) = rsp_tx { + let _ = rsp_tx.send(Err(elapsed.into())); + } } }; } diff --git a/zebrad/src/components/mempool/downloads.rs b/zebrad/src/components/mempool/downloads.rs index 45fd44a7c05..91d5a05c20e 100644 --- a/zebrad/src/components/mempool/downloads.rs +++ b/zebrad/src/components/mempool/downloads.rs @@ -54,7 +54,10 @@ use zebra_network as zn; use zebra_node_services::mempool::Gossip; use zebra_state::{self as zs, CloneError}; -use crate::components::sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT}; +use crate::components::{ + mempool::crawler::RATE_LIMIT_DELAY, + sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT}, +}; use super::MempoolError; @@ -152,16 +155,20 @@ where /// A list of pending transaction download and verify tasks. #[pin] pending: FuturesUnordered< - JoinHandle< + JoinHandle<( Result< - ( - VerifiedUnminedTx, - Vec, - Option, - ), - (TransactionDownloadVerifyError, UnminedTxId), + Result< + ( + VerifiedUnminedTx, + Vec, + Option, + ), + (TransactionDownloadVerifyError, UnminedTxId), + >, + tokio::time::error::Elapsed, >, - >, + Option>>, + )>, >, /// A list of channels that can be used to cancel pending transaction download and @@ -178,14 +185,20 @@ where ZS: Service + Send + Clone + 'static, ZS::Future: Send, { - type Item = Result< - ( - VerifiedUnminedTx, - Vec, - Option, - ), - (UnminedTxId, TransactionDownloadVerifyError), - >; + type Item = ( + Result< + Result< + ( + VerifiedUnminedTx, + Vec, + Option, + ), + (UnminedTxId, TransactionDownloadVerifyError), + >, + tokio::time::error::Elapsed, + >, + Option>>, + ); fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let this = self.project(); @@ -198,20 +211,28 @@ where // task is scheduled for wakeup when the next task becomes ready. // // TODO: this would be cleaner with poll_map (#2693) - if let Some(join_result) = ready!(this.pending.poll_next(cx)) { - match join_result.expect("transaction download and verify tasks must not panic") { - Ok((tx, spent_mempool_outpoints, tip_height)) => { + let item = if let Some(join_result) = ready!(this.pending.poll_next(cx)) { + let (result, rsp_tx) = + join_result.expect("transaction download and verify tasks must not panic"); + + let result = match result { + Ok(Ok((tx, spent_mempool_outpoints, tip_height))) => { this.cancel_handles.remove(&tx.transaction.id); - Poll::Ready(Some(Ok((tx, spent_mempool_outpoints, tip_height)))) + Ok(Ok((tx, spent_mempool_outpoints, tip_height))) } - Err((e, hash)) => { + Ok(Err((e, hash))) => { this.cancel_handles.remove(&hash); - Poll::Ready(Some(Err((hash, e)))) + Ok(Err((hash, e))) } - } + Err(elapsed) => Err(elapsed), + }; + + Some((result, rsp_tx)) } else { - Poll::Ready(None) - } + None + }; + + Poll::Ready(item) } fn size_hint(&self) -> (usize, Option) { @@ -389,29 +410,20 @@ where .in_current_span(); let task = tokio::spawn(async move { + let fut = tokio::time::timeout(RATE_LIMIT_DELAY, fut); + // Prefer the cancel handle if both are ready. let result = tokio::select! { biased; _ = &mut cancel_rx => { trace!("task cancelled prior to completion"); metrics::counter!("mempool.cancelled.verify.tasks.total").increment(1); - Err((TransactionDownloadVerifyError::Cancelled, txid)) + Ok(Err((TransactionDownloadVerifyError::Cancelled, txid))) } verification = fut => verification, }; - // Send the result to responder channel if one was provided. - // TODO: Wait until transactions are added to the verified set before sending an Ok to `rsp_tx`. - if let Some(rsp_tx) = rsp_tx { - let _ = rsp_tx.send( - result - .as_ref() - .map(|_| ()) - .map_err(|(err, _)| err.clone().into()), - ); - } - - result + (result, rsp_tx) }); self.pending.push(task); diff --git a/zebrad/src/components/mempool/tests/vector.rs b/zebrad/src/components/mempool/tests/vector.rs index 1b87097aaf1..9b7af4f6453 100644 --- a/zebrad/src/components/mempool/tests/vector.rs +++ b/zebrad/src/components/mempool/tests/vector.rs @@ -978,15 +978,7 @@ async fn mempool_responds_to_await_output() -> Result<(), Report> { let result_rx = results.remove(0).expect("should pass initial checks"); assert!(results.is_empty(), "should have 1 result for 1 queued tx"); - tokio::time::timeout(Duration::from_secs(10), result_rx) - .await - .expect("should not time out") - .expect("mempool tx verification result channel should not be closed") - .expect("mocked verification should be successful"); - - // Wait for next steps in mempool's Downloads to finish - // TODO: Move this and the `ready().await` below above waiting for the mempool verification result above after - // waiting to respond with a transaction's verification result until after it's been inserted into the mempool. + // Wait for post-verification steps in mempool's Downloads tokio::time::sleep(Duration::from_secs(1)).await; mempool @@ -994,6 +986,12 @@ async fn mempool_responds_to_await_output() -> Result<(), Report> { .await .expect("polling mempool should succeed"); + tokio::time::timeout(Duration::from_secs(10), result_rx) + .await + .expect("should not time out") + .expect("mempool tx verification result channel should not be closed") + .expect("mocked verification should be successful"); + assert_eq!( mempool.storage().transaction_count(), 1,