Skip to content

Commit

Permalink
feat: bls transaction batcher
Browse files Browse the repository at this point in the history
  • Loading branch information
rkrasiuk committed Nov 16, 2024
1 parent 820c180 commit 3ae3ea9
Show file tree
Hide file tree
Showing 7 changed files with 296 additions and 8 deletions.
6 changes: 6 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -143,14 +143,17 @@ alloy = { version = "0.5.3", features = [
"providers",
"provider-http",
"signers",
"consensus",
] }
alloy-network = { version = "0.5.3" }
alloy-primitives = { version = "0.8.7" }
alloy-network = { version = "0.5.3" }
alloy-rpc-types = { version = "0.5.3" }
alloy-rpc-types-beacon = { version = "0.5.3" }
alloy-signer-local = { version = "0.5.3", features = ["mnemonic"] }

# tokio
tokio = { version = "1.21", default-features = false }
tokio-stream = "0.1"

# reth
reth-chainspec = { git = "https://github.com/paradigmxyz/reth.git", rev = "e98a050" }
Expand Down
2 changes: 2 additions & 0 deletions bin/odyssey/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ reth-node-builder.workspace = true
reth-optimism-node.workspace = true
reth-optimism-cli.workspace = true
reth-provider.workspace = true
tokio = { workspace = true, features = ["sync"] }
tokio-stream.workspace = true
clap = { workspace = true, features = ["derive"] }

[features]
Expand Down
24 changes: 22 additions & 2 deletions bin/odyssey/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,22 @@
//! - `min-debug-logs`: Disables all logs below `debug` level.
//! - `min-trace-logs`: Disables all logs below `trace` level.

use std::time::Duration;

use alloy_network::EthereumWallet;
use alloy_primitives::Address;
use alloy_signer_local::PrivateKeySigner;
use clap::Parser;
use eyre::Context;
use odyssey_node::{chainspec::OdysseyChainSpecParser, node::OdysseyNode};
use odyssey_wallet::{OdysseyWallet, OdysseyWalletApiServer};
use odyssey_wallet::{BlsTransactionBatcher, OdysseyWallet, OdysseyWalletApiServer};
use odyssey_walltime::{OdysseyWallTime, OdysseyWallTimeRpcApiServer};
use reth_node_builder::{engine_tree_config::TreeConfig, EngineNodeLauncher};
use reth_optimism_cli::Cli;
use reth_optimism_node::{args::RollupArgs, node::OptimismAddOns};
use reth_provider::{providers::BlockchainProvider2, CanonStateSubscriptions};
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{info, warn};

#[global_allocator]
Expand Down Expand Up @@ -70,13 +74,29 @@ fn main() {
.collect::<Result<_, _>>()
.wrap_err("No valid EXP0001 delegations specified")?;

let chain_id = ctx.config().chain.chain().id();
let (bls_batcher_tx, bls_batcher_rx) = mpsc::unbounded_channel();

let batch_gas_limit = 3_500_000;
let wallet_ = wallet.clone();
let eth_api = ctx.registry.eth_api().clone();
tokio::task::spawn(BlsTransactionBatcher::new(
chain_id,
wallet_,
Duration::from_millis(100),
batch_gas_limit,
eth_api,
UnboundedReceiverStream::from(bls_batcher_rx),
));

ctx.modules.merge_configured(
OdysseyWallet::new(
ctx.provider().clone(),
wallet,
ctx.registry.eth_api().clone(),
ctx.config().chain.chain().id(),
chain_id,
valid_delegations,
bls_batcher_tx,
)
.into_rpc(),
)?;
Expand Down
6 changes: 5 additions & 1 deletion crates/wallet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ keywords.workspace = true
categories.workspace = true

[dependencies]
alloy.workspace = true
alloy-network.workspace = true
alloy-primitives.workspace = true
alloy-rpc-types.workspace = true
alloy-rpc-types-beacon.workspace = true

reth-primitives.workspace = true
reth-storage-api.workspace = true
Expand All @@ -23,7 +25,9 @@ jsonrpsee = { workspace = true, features = ["server", "macros"] }
serde = { workspace = true, features = ["derive"] }
thiserror.workspace = true
tracing.workspace = true
tokio = { workspace = true, features = ["sync"] }
tokio = { workspace = true, features = ["sync", "time"] }
tokio-stream.workspace = true
futures.workspace = true

metrics.workspace = true
metrics-derive.workspace = true
Expand Down
231 changes: 231 additions & 0 deletions crates/wallet/src/bls_batcher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
use crate::OdysseyWalletError;
use alloy::{sol, sol_types::SolCall};
use alloy_network::{
eip2718::Encodable2718, Ethereum, EthereumWallet, NetworkWallet, TransactionBuilder,
};
use alloy_primitives::{bytes::BytesMut, Address, Bytes, ChainId, TxHash, TxKind};
use alloy_rpc_types::{BlockId, TransactionInput, TransactionRequest};
use alloy_rpc_types_beacon::BlsPublicKey;
use futures::{stream::FuturesUnordered, StreamExt};
use jsonrpsee::core::RpcResult;
use reth_rpc_eth_api::helpers::{EthCall, EthTransactions, FullEthApi, LoadFee, LoadState};
use std::{
collections::{HashMap, VecDeque},
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Duration,
};
use tokio::{
sync::Mutex,
time::{interval, Interval},
};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::*;

const BATCH_DELEGATION_CONTRACT: Address = Address::new([0; 20]);

sol! {
contract BlsAggregator {
struct CallByUser {
address user;
bytes pubkey;
bytes calls;
}

function executeAggregated(
bytes calldata signature,
CallByUser[] memory callsByUser
) external;
}
}

#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct BlsData {
signature: Bytes,
pubkey: BlsPublicKey,
}

#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct BlsTransactionBatcher<Eth> {
chain_id: ChainId,
/// Ethereum wallet. Idea is that a wallet used for BLS signature aggregation would be
/// different than the one in the regular flow.
wallet: EthereumWallet,
interval: Interval,
batch_gas_limit: u64,
eth_api: Eth,
transaction_stream: UnboundedReceiverStream<(TransactionRequest, BlsData)>,
batch: VecDeque<(TransactionRequest, BlsData)>, // (sig, validated tx, request)
/// Used to guard tx signing.
permit: Arc<Mutex<()>>,
/// The pending requests that were sent to the sequencer.
pending_requests: FuturesUnordered<Pin<Box<dyn Future<Output = RpcResult<TxHash>> + Send>>>,
}

impl<Eth> BlsTransactionBatcher<Eth> {
/// Create new BLS transaction batcher.
pub fn new(
chain_id: ChainId,
wallet: EthereumWallet,
period: Duration,
batch_gas_limit: u64,
eth_api: Eth,
transaction_stream: UnboundedReceiverStream<(TransactionRequest, BlsData)>,
) -> Self {
Self {
chain_id,
wallet,
interval: interval(period),
batch_gas_limit,
eth_api,
transaction_stream,
batch: VecDeque::new(),
permit: Arc::<Mutex<()>>::default(),
pending_requests: FuturesUnordered::new(),
}
}
}

impl<Eth> Future for BlsTransactionBatcher<Eth>
where
Eth: FullEthApi + Clone + Send + Sync + Unpin + 'static,
{
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();

loop {
if let Poll::Ready(Some(item)) = this.transaction_stream.poll_next_unpin(cx) {
this.batch.push_back(item);
continue;
}

if let Poll::Ready(Some(pending)) = this.pending_requests.poll_next_unpin(cx) {
match pending {
Ok(tx_hash) => {
debug!(target: "rpc::wallet::bls_aggregator", %tx_hash, "Batch transaction successfully submitted to the pool");
}
Err(err) => {
error!(target: "rpc::wallet::bls_aggregator", ?err, "Error sending batch transaction");
}
}
continue;
}

if this.interval.poll_tick(cx).is_ready() {
let mut cumulative_gas = 0;
let mut current_batch = Vec::new();
while let Some((tx, _sig)) = this.batch.front() {
let tx_gas = tx.gas.unwrap();
if cumulative_gas + tx_gas > this.batch_gas_limit {
break;
}
cumulative_gas += tx_gas;
current_batch.push(this.batch.pop_front().unwrap());
}

if !current_batch.is_empty() {
let chain_id = this.chain_id;
let wallet = this.wallet.clone();
let eth_api = this.eth_api.clone();
let permit = this.permit.clone();
this.pending_requests.push(Box::pin(async move {
let permit = permit.lock().await;
construct_and_send_batch_transaction(
chain_id,
wallet,
eth_api,
current_batch,
cumulative_gas,
)
.await
}));
}

continue;
}

return Poll::Pending;
}
}
}

async fn construct_and_send_batch_transaction<Eth: FullEthApi>(
chain_id: ChainId,
wallet: EthereumWallet,
eth_api: Eth,
batch: Vec<(TransactionRequest, BlsData)>,
cumulative_gas: u64,
) -> RpcResult<TxHash> {
let signature = Bytes::new();
let mut aggregated = HashMap::<Address, HashMap<BlsPublicKey, Vec<Bytes>>>::default();
for (tx, bls_data) in batch {
// TODO: aggregate signature
let to_address = *tx.to.unwrap().to().unwrap();
let input = tx.input.input().take().unwrap().clone();
aggregated.entry(to_address).or_default().entry(bls_data.pubkey).or_default().push(input);
}

let mut calls_by_user = Vec::new();
for (user, by_key) in aggregated {
for (pubkey, inputs) in by_key {
let mut calls = BytesMut::default();
for input in inputs {
calls.extend(input);
}
calls_by_user.push(BlsAggregator::CallByUser {
user,
pubkey: Bytes::copy_from_slice(pubkey.as_slice()),
calls: calls.freeze().into(),
});
}
}
let input = Bytes::from(
BlsAggregator::executeAggregatedCall { signature, callsByUser: calls_by_user }.abi_encode(),
);

let next_nonce = LoadState::next_available_nonce(
&eth_api,
NetworkWallet::<Ethereum>::default_signer_address(&wallet),
)
.await
.map_err(Into::into)?;
let mut request = TransactionRequest {
chain_id: Some(chain_id),
nonce: Some(next_nonce),
to: Some(TxKind::Call(BATCH_DELEGATION_CONTRACT)),
input: TransactionInput::from(input),
gas: Some(cumulative_gas),
..Default::default()
};

let (estimate, base_fee) = tokio::join!(
EthCall::estimate_gas_at(&eth_api, request.clone(), BlockId::latest(), None),
LoadFee::eip1559_fees(&eth_api, None, None)
);
let estimate = estimate.map_err(Into::into)?;
let (base_fee, _) = base_fee.map_err(Into::into)?;

// Finish the request
let max_priority_fee_per_gas = 1_000_000_000; // 1 gwei
request.max_fee_per_gas = Some(base_fee.to::<u128>() + max_priority_fee_per_gas);
request.max_priority_fee_per_gas = Some(max_priority_fee_per_gas);
request.gas = Some(estimate.to());

// build and sign
let envelope = <TransactionRequest as TransactionBuilder<Ethereum>>::build::<EthereumWallet>(
request, &wallet,
)
.await
.map_err(|_| OdysseyWalletError::InvalidTransactionRequest)?;

EthTransactions::send_raw_transaction(&eth_api, envelope.encoded_2718().into())
.await
.inspect_err(
|err| warn!(target: "rpc::wallet::bls_aggregator", ?err, "Error adding batch tx to pool"),
)
.map_err(Into::into)
}
Loading

0 comments on commit 3ae3ea9

Please sign in to comment.