Skip to content

Commit

Permalink
respond with mempool verification result after a transaction has been…
Browse files Browse the repository at this point in the history
… inserted or has failed to be inserted into the mempool
  • Loading branch information
arya2 committed Nov 19, 2024
1 parent 1d63b0e commit 7c4ad87
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 60 deletions.
5 changes: 1 addition & 4 deletions zebra-node-services/src/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,9 @@ use zebra_chain::transaction::VerifiedUnminedTx;
use crate::BoxError;

mod gossip;

mod transaction_dependencies;

pub use transaction_dependencies::TransactionDependencies;

pub use self::gossip::Gossip;
pub use self::{gossip::Gossip, transaction_dependencies::TransactionDependencies};

/// A mempool service request.
///
Expand Down
31 changes: 23 additions & 8 deletions zebrad/src/components/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use std::{

use futures::{future::FutureExt, stream::Stream};
use tokio::sync::{broadcast, oneshot};
use tokio_stream::StreamExt;
use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service};

use zebra_chain::{
Expand All @@ -43,7 +42,7 @@ use zebra_node_services::mempool::{Gossip, Request, Response};
use zebra_state as zs;
use zebra_state::{ChainTipChange, TipAction};

use crate::components::{mempool::crawler::RATE_LIMIT_DELAY, sync::SyncStatus};
use crate::components::sync::SyncStatus;

pub mod config;
mod crawler;
Expand Down Expand Up @@ -586,10 +585,8 @@ impl Service<Request> for Mempool {
let best_tip_height = self.latest_chain_tip.best_tip_height();

// Clean up completed download tasks and add to mempool if successful.
while let Poll::Ready(Some(r)) =
pin!(tx_downloads.timeout(RATE_LIMIT_DELAY)).poll_next(cx)
{
match r {
while let Poll::Ready(Some((result, rsp_tx))) = pin!(&mut *tx_downloads).poll_next(cx) {
match result {
Ok(Ok((tx, spent_mempool_outpoints, expected_tip_height))) => {
// # Correctness:
//
Expand All @@ -609,27 +606,45 @@ impl Service<Request> for Mempool {
// Save transaction ids that we will send to peers
send_to_peers_ids.insert(inserted_id);
}

// Send the result to responder channel if one was provided.
if let Some(rsp_tx) = rsp_tx {
let _ = rsp_tx
.send(insert_result.map(|_| ()).map_err(|err| err.into()));
}
} else {
tracing::trace!("chain grew during tx verification, retrying ..",);

// We don't care if re-queueing the transaction request fails.
let _result = tx_downloads
.download_if_needed_and_verify(tx.transaction.into(), None);
.download_if_needed_and_verify(tx.transaction.into(), rsp_tx);
}
}
Ok(Err((tx_id, error))) => {
tracing::debug!(?tx_id, ?error, "mempool transaction failed to verify");

metrics::counter!("mempool.failed.verify.tasks.total", "reason" => error.to_string()).increment(1);

storage.reject_if_needed(tx_id, error);

// Send the result to responder channel if one was provided.
if let Some(rsp_tx) = rsp_tx {
let _ =
rsp_tx.send(Err("timeout waiting for verification result".into()));
}
}
Err(_elapsed) => {
Err(elapsed) => {
// A timeout happens when the stream hangs waiting for another service,
// so there is no specific transaction ID.

tracing::info!("mempool transaction failed to verify due to timeout");

metrics::counter!("mempool.failed.verify.tasks.total", "reason" => "timeout").increment(1);

// Send the result to responder channel if one was provided.
if let Some(rsp_tx) = rsp_tx {
let _ = rsp_tx.send(Err(elapsed.into()));
}
}
};
}
Expand Down
90 changes: 51 additions & 39 deletions zebrad/src/components/mempool/downloads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ use zebra_network as zn;
use zebra_node_services::mempool::Gossip;
use zebra_state::{self as zs, CloneError};

use crate::components::sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT};
use crate::components::{
mempool::crawler::RATE_LIMIT_DELAY,
sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT},
};

use super::MempoolError;

Expand Down Expand Up @@ -152,16 +155,20 @@ where
/// A list of pending transaction download and verify tasks.
#[pin]
pending: FuturesUnordered<
JoinHandle<
JoinHandle<(
Result<
(
VerifiedUnminedTx,
Vec<transparent::OutPoint>,
Option<Height>,
),
(TransactionDownloadVerifyError, UnminedTxId),
Result<
(
VerifiedUnminedTx,
Vec<transparent::OutPoint>,
Option<Height>,
),
(TransactionDownloadVerifyError, UnminedTxId),
>,
tokio::time::error::Elapsed,
>,
>,
Option<oneshot::Sender<Result<(), BoxError>>>,
)>,
>,

/// A list of channels that can be used to cancel pending transaction download and
Expand All @@ -178,14 +185,20 @@ where
ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
ZS::Future: Send,
{
type Item = Result<
(
VerifiedUnminedTx,
Vec<transparent::OutPoint>,
Option<Height>,
),
(UnminedTxId, TransactionDownloadVerifyError),
>;
type Item = (
Result<
Result<
(
VerifiedUnminedTx,
Vec<transparent::OutPoint>,
Option<Height>,
),
(UnminedTxId, TransactionDownloadVerifyError),
>,
tokio::time::error::Elapsed,
>,
Option<oneshot::Sender<Result<(), BoxError>>>,
);

fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let this = self.project();
Expand All @@ -198,20 +211,28 @@ where
// task is scheduled for wakeup when the next task becomes ready.
//
// TODO: this would be cleaner with poll_map (#2693)
if let Some(join_result) = ready!(this.pending.poll_next(cx)) {
match join_result.expect("transaction download and verify tasks must not panic") {
Ok((tx, spent_mempool_outpoints, tip_height)) => {
let item = if let Some(join_result) = ready!(this.pending.poll_next(cx)) {
let (result, rsp_tx) =
join_result.expect("transaction download and verify tasks must not panic");

let result = match result {
Ok(Ok((tx, spent_mempool_outpoints, tip_height))) => {
this.cancel_handles.remove(&tx.transaction.id);
Poll::Ready(Some(Ok((tx, spent_mempool_outpoints, tip_height))))
Ok(Ok((tx, spent_mempool_outpoints, tip_height)))
}
Err((e, hash)) => {
Ok(Err((e, hash))) => {
this.cancel_handles.remove(&hash);
Poll::Ready(Some(Err((hash, e))))
Ok(Err((hash, e)))
}
}
Err(elapsed) => Err(elapsed),
};

Some((result, rsp_tx))
} else {
Poll::Ready(None)
}
None
};

Poll::Ready(item)
}

fn size_hint(&self) -> (usize, Option<usize>) {
Expand Down Expand Up @@ -389,29 +410,20 @@ where
.in_current_span();

let task = tokio::spawn(async move {
let fut = tokio::time::timeout(RATE_LIMIT_DELAY, fut);

// Prefer the cancel handle if both are ready.
let result = tokio::select! {
biased;
_ = &mut cancel_rx => {
trace!("task cancelled prior to completion");
metrics::counter!("mempool.cancelled.verify.tasks.total").increment(1);
Err((TransactionDownloadVerifyError::Cancelled, txid))
Ok(Err((TransactionDownloadVerifyError::Cancelled, txid)))
}
verification = fut => verification,
};

// Send the result to responder channel if one was provided.
// TODO: Wait until transactions are added to the verified set before sending an Ok to `rsp_tx`.
if let Some(rsp_tx) = rsp_tx {
let _ = rsp_tx.send(
result
.as_ref()
.map(|_| ())
.map_err(|(err, _)| err.clone().into()),
);
}

result
(result, rsp_tx)
});

self.pending.push(task);
Expand Down
16 changes: 7 additions & 9 deletions zebrad/src/components/mempool/tests/vector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -978,22 +978,20 @@ async fn mempool_responds_to_await_output() -> Result<(), Report> {
let result_rx = results.remove(0).expect("should pass initial checks");
assert!(results.is_empty(), "should have 1 result for 1 queued tx");

tokio::time::timeout(Duration::from_secs(10), result_rx)
.await
.expect("should not time out")
.expect("mempool tx verification result channel should not be closed")
.expect("mocked verification should be successful");

// Wait for next steps in mempool's Downloads to finish
// TODO: Move this and the `ready().await` below above waiting for the mempool verification result above after
// waiting to respond with a transaction's verification result until after it's been inserted into the mempool.
// Wait for post-verification steps in mempool's Downloads
tokio::time::sleep(Duration::from_secs(1)).await;

mempool
.ready()
.await
.expect("polling mempool should succeed");

tokio::time::timeout(Duration::from_secs(10), result_rx)
.await
.expect("should not time out")
.expect("mempool tx verification result channel should not be closed")
.expect("mocked verification should be successful");

assert_eq!(
mempool.storage().transaction_count(),
1,
Expand Down

0 comments on commit 7c4ad87

Please sign in to comment.