Merge branch '202405-mempool-sync' into new-index
RCasatta committed Oct 1, 2024
2 parents 6d182d8 + 7a068bf commit 5168871
Showing 10 changed files with 240 additions and 131 deletions.
23 changes: 9 additions & 14 deletions src/bin/electrs.rs
@@ -49,6 +49,7 @@ fn run_server(config: Arc<Config>) -> Result<()> {
         &config.daemon_dir,
         &config.blocks_dir,
         config.daemon_rpc_addr,
+        config.daemon_parallelism,
         config.cookie_getter(),
         config.network_type,
         signal.clone(),
@@ -81,14 +82,11 @@ fn run_server(config: Arc<Config>) -> Result<()> {
         &metrics,
         Arc::clone(&config),
     )));
-    loop {
-        match Mempool::update(&mempool, &daemon) {
-            Ok(_) => break,
-            Err(e) => {
-                warn!("Error performing initial mempool update, trying again in 5 seconds: {}", e.display_chain());
-                signal.wait(Duration::from_secs(5), false)?;
-            },
-        }
-    }
+    while !Mempool::update(&mempool, &daemon, &tip)? {
+        // Mempool syncing was aborted because the chain tip moved;
+        // index the new block(s) and try again.
+        tip = indexer.update(&daemon)?;
+    }
 
     #[cfg(feature = "liquid")]
@@ -117,7 +115,6 @@ fn run_server(config: Arc<Config>) -> Result<()> {
     ));
 
     loop {
-
         main_loop_count.inc();
 
         if let Err(err) = signal.wait(Duration::from_secs(5), true) {
@@ -130,14 +127,12 @@ fn run_server(config: Arc<Config>) -> Result<()> {
         // Index new blocks
         let current_tip = daemon.getbestblockhash()?;
         if current_tip != tip {
-            indexer.update(&daemon)?;
-            tip = current_tip;
+            tip = indexer.update(&daemon)?;
         };
 
         // Update mempool
-        if let Err(e) = Mempool::update(&mempool, &daemon) {
-            // Log the error if the result is an Err
-            warn!("Error updating mempool, skipping mempool update: {}", e.display_chain());
+        if !Mempool::update(&mempool, &daemon, &tip)? {
+            warn!("skipped failed mempool update, trying again in 5 seconds");
         }
 
         // Update subscribed clients
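
Note the changed contracts at these call sites: Indexer::update now returns the new chain tip, and Mempool::update takes the current tip and returns a bool rather than (). A minimal sketch of the assumed Mempool::update signature (its definition lives in a file not shown in this excerpt, so the exact shape is an inference from the calls above):

    impl Mempool {
        /// Sync the mempool view against bitcoind's. Returns Ok(false) when the sync
        /// pass is aborted because `tip` no longer matches bitcoind's best block hash,
        /// telling the caller to index the new block(s) and retry.
        /// (Assumed shape, not the actual definition.)
        pub fn update(
            mempool: &Arc<RwLock<Mempool>>, // assumed wrapper type
            daemon: &Daemon,
            tip: &BlockHash,
        ) -> Result<bool> {
            // ... elided ...
        }
    }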
1 change: 1 addition & 0 deletions src/bin/tx-fingerprint-stats.rs
@@ -33,6 +33,7 @@ fn main() {
         &config.daemon_dir,
         &config.blocks_dir,
         config.daemon_rpc_addr,
+        config.daemon_parallelism,
         config.cookie_getter(),
         config.network_type,
         signal,
8 changes: 8 additions & 0 deletions src/config.rs
@@ -25,6 +25,7 @@ pub struct Config {
     pub daemon_dir: PathBuf,
     pub blocks_dir: PathBuf,
     pub daemon_rpc_addr: SocketAddr,
+    pub daemon_parallelism: usize,
     pub cookie: Option<String>,
     pub electrum_rpc_addr: SocketAddr,
     pub http_addr: SocketAddr,
@@ -138,6 +139,12 @@ impl Config {
                 .help("Bitcoin daemon JSONRPC 'addr:port' to connect (default: 127.0.0.1:8332 for mainnet, 127.0.0.1:18332 for testnet and 127.0.0.1:18443 for regtest)")
                 .takes_value(true),
         )
+        .arg(
+            Arg::with_name("daemon_parallelism")
+                .long("daemon-parallelism")
+                .help("Number of JSONRPC requests to send in parallel")
+                .default_value("4")
+        )
         .arg(
             Arg::with_name("monitoring_addr")
                 .long("monitoring-addr")
@@ -396,6 +403,7 @@ impl Config {
             daemon_dir,
             blocks_dir,
             daemon_rpc_addr,
+            daemon_parallelism: value_t_or_exit!(m, "daemon_parallelism", usize),
             cookie,
             utxos_limit: value_t_or_exit!(m, "utxos_limit", usize),
             electrum_rpc_addr,
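
Taken together, these three hunks thread the new setting end to end: a --daemon-parallelism CLI flag (default 4) is parsed into Config.daemon_parallelism and handed to Daemon::new, where it sizes the RPC thread pool shown in src/daemon.rs below. For example, running `electrs --daemon-parallelism 8` would allow up to eight concurrent JSONRPC connections to bitcoind.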
133 changes: 88 additions & 45 deletions src/daemon.rs
@@ -1,3 +1,4 @@
+use std::cell::OnceCell;
 use std::collections::{HashMap, HashSet};
 use std::env;
 use std::io::{BufRead, BufReader, Lines, Write};
@@ -8,8 +9,9 @@ use std::sync::{Arc, Mutex};
 use std::time::Duration;
 
 use base64::prelude::{Engine, BASE64_STANDARD};
+use error_chain::ChainedError;
 use hex::FromHex;
-use itertools::Itertools;
+use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
 use serde_json::{from_str, from_value, Value};
 
 #[cfg(not(feature = "liquid"))]
@@ -79,13 +81,13 @@ fn parse_error_code(err: &Value) -> Option<i64> {
 
 fn parse_jsonrpc_reply(mut reply: Value, method: &str, expected_id: u64) -> Result<Value> {
     if let Some(reply_obj) = reply.as_object_mut() {
-        if let Some(err) = reply_obj.get("error") {
+        if let Some(err) = reply_obj.get_mut("error") {
             if !err.is_null() {
                 if let Some(code) = parse_error_code(&err) {
                     match code {
                         // RPC_IN_WARMUP -> retry by later reconnection
                         -28 => bail!(ErrorKind::Connection(err.to_string())),
-                        _ => bail!("{} RPC error: {}", method, err),
+                        code => bail!(ErrorKind::RpcError(code, err.take(), method.to_string())),
                     }
                 }
             }
@@ -248,7 +250,7 @@ impl Connection {
         Ok(if status == "HTTP/1.1 200 OK" {
             contents
         } else if status == "HTTP/1.1 500 Internal Server Error" {
-            warn!("HTTP status: {}", status);
+            debug!("RPC HTTP 500 error: {}", contents);
             contents // the contents should have a JSONRPC error field
         } else {
             bail!(
@@ -287,6 +289,8 @@ pub struct Daemon {
     message_id: Counter, // for monotonic JSONRPC 'id'
     signal: Waiter,
 
+    rpc_threads: Arc<rayon::ThreadPool>,
+
     // monitoring
     latency: HistogramVec,
     size: HistogramVec,
@@ -297,6 +301,7 @@ impl Daemon {
         daemon_dir: &PathBuf,
         blocks_dir: &PathBuf,
         daemon_rpc_addr: SocketAddr,
+        daemon_parallelism: usize,
         cookie_getter: Arc<dyn CookieGetter>,
         network: Network,
         signal: Waiter,
@@ -313,6 +318,13 @@ impl Daemon {
             )?),
             message_id: Counter::new(),
             signal: signal.clone(),
+            rpc_threads: Arc::new(
+                rayon::ThreadPoolBuilder::new()
+                    .num_threads(daemon_parallelism)
+                    .thread_name(|i| format!("rpc-requests-{}", i))
+                    .build()
+                    .unwrap(),
+            ),
             latency: metrics.histogram_vec(
                 HistogramOpts::new("daemon_rpc", "Bitcoind RPC latency (in seconds)"),
                 &["method"],
@@ -361,6 +373,7 @@ impl Daemon {
             conn: Mutex::new(self.conn.lock().unwrap().reconnect()?),
             message_id: Counter::new(),
             signal: self.signal.clone(),
+            rpc_threads: self.rpc_threads.clone(),
             latency: self.latency.clone(),
             size: self.size.clone(),
         })
@@ -398,33 +411,18 @@ impl Daemon {
         Ok(result)
     }
 
-    fn handle_request_batch(&self, method: &str, params_list: &[Value]) -> Result<Vec<Value>> {
+    fn handle_request(&self, method: &str, params: &Value) -> Result<Value> {
         let id = self.message_id.next();
-        let chunks = params_list
-            .iter()
-            .map(|params| json!({"method": method, "params": params, "id": id}))
-            .chunks(50_000); // Max Amount of batched requests
-        let mut results = vec![];
-        for chunk in &chunks {
-            let reqs = chunk.collect();
-            let mut replies = self.call_jsonrpc(method, &reqs)?;
-            if let Some(replies_vec) = replies.as_array_mut() {
-                for reply in replies_vec {
-                    results.push(parse_jsonrpc_reply(reply.take(), method, id)?)
-                }
-            } else {
-                bail!("non-array replies: {:?}", replies);
-            }
-        }
-
-        Ok(results)
+        let req = json!({"method": method, "params": params, "id": id});
+        let reply = self.call_jsonrpc(method, &req)?;
+        parse_jsonrpc_reply(reply, method, id)
     }
 
-    fn retry_request_batch(&self, method: &str, params_list: &[Value]) -> Result<Vec<Value>> {
+    fn retry_request(&self, method: &str, params: &Value) -> Result<Value> {
         loop {
-            match self.handle_request_batch(method, params_list) {
-                Err(Error(ErrorKind::Connection(msg), _)) => {
-                    warn!("reconnecting to bitcoind: {}", msg);
+            match self.handle_request(method, &params) {
+                Err(e @ Error(ErrorKind::Connection(_), _)) => {
+                    warn!("reconnecting to bitcoind: {}", e.display_chain());
                     self.signal.wait(Duration::from_secs(3), false)?;
                     let mut conn = self.conn.lock().unwrap();
                     *conn = conn.reconnect()?;
@@ -436,13 +434,47 @@ impl Daemon {
     }
 
     fn request(&self, method: &str, params: Value) -> Result<Value> {
-        let mut values = self.retry_request_batch(method, &[params])?;
-        assert_eq!(values.len(), 1);
-        Ok(values.remove(0))
+        self.retry_request(method, &params)
     }
 
-    fn requests(&self, method: &str, params_list: &[Value]) -> Result<Vec<Value>> {
-        self.retry_request_batch(method, params_list)
+    fn retry_reconnect(&self) -> Daemon {
+        // XXX add a max reconnection attempts limit?
+        loop {
+            match self.reconnect() {
+                Ok(daemon) => break daemon,
+                Err(e) => {
+                    warn!("failed connecting to RPC daemon: {}", e.display_chain());
+                }
+            }
+        }
+    }
+
+    // Send requests in parallel over multiple RPC connections as individual JSON-RPC requests (with no JSON-RPC batching),
+    // buffering the replies into a vector. If any of the requests fail, processing is terminated and an Err is returned.
+    fn requests(&self, method: &str, params_list: Vec<Value>) -> Result<Vec<Value>> {
+        self.requests_iter(method, params_list).collect()
+    }
+
+    // Send requests in parallel over multiple RPC connections, iterating over the results without buffering them.
+    // Errors are included in the iterator and do not terminate other pending requests.
+    fn requests_iter<'a>(
+        &'a self,
+        method: &'a str,
+        params_list: Vec<Value>,
+    ) -> impl ParallelIterator<Item = Result<Value>> + IndexedParallelIterator + 'a {
+        self.rpc_threads.install(move || {
+            params_list.into_par_iter().map(move |params| {
+                // Store a local per-thread Daemon, each with its own TCP connection. These will
+                // get initialized as necessary for the `rpc_threads` pool thread managed by rayon.
+                thread_local!(static DAEMON_INSTANCE: OnceCell<Daemon> = OnceCell::new());
+
+                DAEMON_INSTANCE.with(|daemon| {
+                    daemon
+                        .get_or_init(|| self.retry_reconnect())
+                        .retry_request(&method, &params)
+                })
+            })
+        })
     }
 
     // bitcoind JSONRPC API:
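
The design shift here is from JSON-RPC batching to connection-level parallelism: the removed handle_request_batch queued up to 50,000 requests into a single HTTP round trip and had to buffer the whole reply array before parsing, whereas requests_iter issues each request as its own HTTP exchange on a pooled per-thread connection, so replies stream back as they complete, peak memory should stay bounded, and every request carries its own Result instead of one failure poisoning the entire batch.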
@@ -468,12 +500,12 @@ impl Daemon {
     pub fn getblockheaders(&self, heights: &[usize]) -> Result<Vec<BlockHeader>> {
         let heights: Vec<Value> = heights.iter().map(|height| json!([height])).collect();
         let params_list: Vec<Value> = self
-            .requests("getblockhash", &heights)?
+            .requests("getblockhash", heights)?
             .into_iter()
             .map(|hash| json!([hash, /*verbose=*/ false]))
             .collect();
         let mut result = vec![];
-        for h in self.requests("getblockheader", &params_list)? {
+        for h in self.requests("getblockheader", params_list)? {
             result.push(header_from_value(h)?);
         }
         Ok(result)
@@ -495,27 +527,38 @@
             .iter()
             .map(|hash| json!([hash, /*verbose=*/ false]))
             .collect();
-        let values = self.requests("getblock", &params_list)?;
+        let values = self.requests("getblock", params_list)?;
         let mut blocks = vec![];
         for value in values {
             blocks.push(block_from_value(value)?);
         }
         Ok(blocks)
     }
 
-    pub fn gettransactions(&self, txhashes: &[&Txid]) -> Result<Vec<Transaction>> {
-        let params_list: Vec<Value> = txhashes
+    /// Fetch the given transactions in parallel over multiple threads and RPC connections,
+    /// ignoring any missing ones and returning whatever is available.
+    pub fn gettransactions_available(&self, txids: &[&Txid]) -> Result<Vec<(Txid, Transaction)>> {
+        const RPC_INVALID_ADDRESS_OR_KEY: i64 = -5;
+
+        let params_list: Vec<Value> = txids
             .iter()
             .map(|txhash| json!([txhash, /*verbose=*/ false]))
             .collect();
 
-        let values = self.requests("getrawtransaction", &params_list)?;
-        let mut txs = vec![];
-        for value in values {
-            txs.push(tx_from_value(value)?);
-        }
-        assert_eq!(txhashes.len(), txs.len());
-        Ok(txs)
+        self.requests_iter("getrawtransaction", params_list)
+            .zip(txids)
+            .filter_map(|(res, txid)| match res {
+                Ok(val) => Some(tx_from_value(val).map(|tx| (**txid, tx))),
+                // Ignore 'tx not found' errors
+                Err(Error(ErrorKind::RpcError(code, _, _), _))
+                    if code == RPC_INVALID_ADDRESS_OR_KEY =>
+                {
+                    None
+                }
+                // Terminate iteration if any other errors are encountered
+                Err(e) => Some(Err(e)),
+            })
+            .collect()
     }
 
     pub fn gettransaction_raw(
@@ -556,7 +599,7 @@
         let params_list: Vec<Value> = conf_targets.iter().map(|t| json!([t, "ECONOMICAL"])).collect();
 
         Ok(self
-            .requests("estimatesmartfee", &params_list)?
+            .requests("estimatesmartfee", params_list)?
             .iter()
             .zip(conf_targets)
             .filter_map(|(reply, target)| {
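
The thread-local connection trick above is easy to miss in diff form, so here is a minimal, self-contained sketch of the same pattern with a plain String standing in for the per-thread Daemon and its TCP connection (illustration only; nothing below is electrs API):

    use std::cell::OnceCell;

    use rayon::prelude::*;

    fn main() {
        // A bounded pool, like the `rpc_threads` pool built in Daemon::new.
        let pool = rayon::ThreadPoolBuilder::new()
            .num_threads(4)
            .build()
            .unwrap();

        let results: Vec<String> = pool.install(|| {
            (0..16)
                .into_par_iter()
                .map(|i| {
                    // One lazily-created "connection" per pool thread, reused
                    // across all tasks rayon schedules onto that thread.
                    thread_local!(static CONN: OnceCell<String> = OnceCell::new());
                    CONN.with(|cell| {
                        let conn = cell.get_or_init(|| {
                            // In the real code this is `self.retry_reconnect()`,
                            // opening a dedicated TCP connection to bitcoind.
                            format!("conn-{:?}", std::thread::current().id())
                        });
                        format!("request {} served by {}", i, conn)
                    })
                })
                .collect()
        });

        // At most 4 distinct "connections" appear, one per worker thread.
        for line in results {
            println!("{}", line);
        }
    }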
5 changes: 5 additions & 0 deletions src/errors.rs
@@ -9,6 +9,11 @@ error_chain! {
             display("Connection error: {}", msg)
         }
 
+        RpcError(code: i64, error: serde_json::Value, method: String) {
+            description("RPC error")
+            display("{} RPC error {}: {}", method, code, error)
+        }
+
         Interrupt(sig: i32) {
             description("Interruption by external signal")
             display("Interrupted by signal {}", sig)
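
This structured variant is what lets callers branch on bitcoind's numeric error codes, as gettransactions_available does for RPC_INVALID_ADDRESS_OR_KEY (-5) above. A hypothetical caller-side fragment (not from this commit) showing the shape of such a match:

    // error_chain errors destructure as Error(kind, state); `result` is any
    // Result<Value> returned by one of the Daemon RPC helpers.
    match result {
        Ok(val) => { /* handle the reply */ }
        // Unknown txid: recoverable, skip it.
        Err(Error(ErrorKind::RpcError(-5, _, _), _)) => { /* skip */ }
        // RPC_IN_WARMUP (-28) is mapped to ErrorKind::Connection and retried
        // inside retry_request; anything else should be treated as fatal.
        Err(e) => return Err(e),
    }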
(Diffs for the remaining 5 changed files were not loaded.)
