Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add optional structured logs of RPC related events #66

Merged
merged 12 commits into from
Feb 24, 2024
36 changes: 36 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub struct Config {
pub utxos_limit: usize,
pub electrum_txs_limit: usize,
pub electrum_banner: String,
pub electrum_rpc_logging: Option<RpcLogging>,

#[cfg(feature = "liquid")]
pub parent_network: BNetwork,
Expand All @@ -65,6 +66,10 @@ fn str_to_socketaddr(address: &str, what: &str) -> SocketAddr {
impl Config {
pub fn from_args() -> Config {
let network_help = format!("Select network type ({})", Network::names().join(", "));
let rpc_logging_help = format!(
"Select RPC logging option ({})",
RpcLogging::options().join(", ")
);

let args = App::new("Electrum Rust Server")
.version(crate_version!())
Expand Down Expand Up @@ -181,6 +186,11 @@ impl Config {
.long("electrum-banner")
.help("Welcome banner for the Electrum server, shown in the console to clients.")
.takes_value(true)
).arg(
Arg::with_name("electrum_rpc_logging")
.long("electrum-rpc-logging")
.help(&rpc_logging_help)
.takes_value(true),
);

#[cfg(unix)]
Expand Down Expand Up @@ -381,6 +391,9 @@ impl Config {
electrum_rpc_addr,
electrum_txs_limit: value_t_or_exit!(m, "electrum_txs_limit", usize),
electrum_banner,
electrum_rpc_logging: m
.value_of("electrum_rpc_logging")
.map(|option| RpcLogging::from(option)),
http_addr,
http_socket_file,
monitoring_addr,
Expand Down Expand Up @@ -420,6 +433,29 @@ impl Config {
}
}

#[derive(Debug, Clone)]
pub enum RpcLogging {
Full,
NoParams,
}

impl RpcLogging {
pub fn options() -> Vec<String> {
return vec!["full".to_string(), "no-params".to_string()];
}
}

impl From<&str> for RpcLogging {
fn from(option: &str) -> Self {
match option {
"full" => RpcLogging::Full,
"no-params" => RpcLogging::NoParams,

_ => panic!("unsupported RPC logging option: {:?}", option),
}
}
}

pub fn get_network_subdir(network: Network) -> Option<&'static str> {
match network {
#[cfg(not(feature = "liquid"))]
Expand Down
72 changes: 66 additions & 6 deletions src/electrum/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream};
use std::sync::mpsc::{Sender, SyncSender, TrySendError};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Instant;

use bitcoin::hashes::sha256d::Hash as Sha256dHash;
use crypto::digest::Digest;
Expand All @@ -18,7 +19,7 @@ use bitcoin::consensus::encode::serialize_hex;
use elements::encode::serialize_hex;

use crate::chain::Txid;
use crate::config::Config;
use crate::config::{Config, RpcLogging};
use crate::electrum::{get_electrum_height, ProtocolVersion};
use crate::errors::*;
use crate::metrics::{Gauge, HistogramOpts, HistogramVec, MetricOpts, Metrics};
Expand Down Expand Up @@ -90,6 +91,14 @@ fn get_status_hash(txs: Vec<(Txid, Option<BlockId>)>, query: &Query) -> Option<F
}
}

macro_rules! conditionally_log_rpc_event {
($self:ident, $event:expr) => {
if $self.rpc_logging.is_some() {
$self.log_rpc_event($event);
}
};
}

struct Connection {
query: Arc<Query>,
last_header_entry: Option<HeaderEntry>,
Expand All @@ -101,6 +110,7 @@ struct Connection {
txs_limit: usize,
#[cfg(feature = "electrum-discovery")]
discovery: Option<Arc<DiscoveryManager>>,
rpc_logging: Option<RpcLogging>,
}

impl Connection {
Expand All @@ -111,6 +121,7 @@ impl Connection {
stats: Arc<Stats>,
txs_limit: usize,
#[cfg(feature = "electrum-discovery")] discovery: Option<Arc<DiscoveryManager>>,
rpc_logging: Option<RpcLogging>,
) -> Connection {
Connection {
query,
Expand All @@ -123,6 +134,7 @@ impl Connection {
txs_limit,
#[cfg(feature = "electrum-discovery")]
discovery,
rpc_logging,
}
}

Expand Down Expand Up @@ -490,6 +502,17 @@ impl Connection {
Ok(result)
}

fn log_rpc_event(&self, mut log: Value) {
log.as_object_mut().unwrap().insert(
"source".into(),
json!({
"ip": self.addr.ip().to_string(),
"port": self.addr.port(),
}),
);
println!("ELECTRUM-RPC-LOGGER: {}", log);
}

fn send_values(&mut self, values: &[Value]) -> Result<()> {
for value in values {
let line = value.to_string() + "\n";
Expand All @@ -508,7 +531,7 @@ impl Connection {
match msg {
Message::Request(line) => {
let cmd: Value = from_str(&line).chain_err(|| "invalid JSON format")?;
let reply = match (
match (
cmd.get("method"),
cmd.get("params").unwrap_or_else(|| &empty_params),
cmd.get("id"),
Expand All @@ -517,10 +540,41 @@ impl Connection {
Some(&Value::String(ref method)),
&Value::Array(ref params),
Some(ref id),
) => self.handle_command(method, params, id)?,
_ => bail!("invalid command: {}", cmd),
};
self.send_values(&[reply])?
) => {
conditionally_log_rpc_event!(
self,
json!({
"event": "rpc request",
"id": id,
"method": method,
"params": if let Some(RpcLogging::Full) = self.rpc_logging {
json!(params)
} else {
Value::Null
}
})
);

let start_time = Instant::now();
let reply = self.handle_command(method, params, id)?;

conditionally_log_rpc_event!(
self,
json!({
"event": "rpc response",
"method": method,
"payload_size": reply.to_string().as_bytes().len(),
"duration_µs": start_time.elapsed().as_micros(),
"id": id,
})
);

self.send_values(&[reply])?
}
_ => {
bail!("invalid command: {}", cmd)
}
}
}
Message::PeriodicUpdate => {
let values = self
Expand Down Expand Up @@ -563,6 +617,8 @@ impl Connection {

pub fn run(mut self) {
self.stats.clients.inc();
conditionally_log_rpc_event!(self, json!({ "event": "connection established" }));

let reader = BufReader::new(self.stream.try_clone().expect("failed to clone TcpStream"));
let tx = self.chan.sender();
let child = spawn_thread("reader", || Connection::handle_requests(reader, tx));
Expand All @@ -579,6 +635,8 @@ impl Connection {
.sub(self.status_hashes.len() as i64);

debug!("[{}] shutting down connection", self.addr);
conditionally_log_rpc_event!(self, json!({ "event": "connection closed" }));

let _ = self.stream.shutdown(Shutdown::Both);
if let Err(err) = child.join().expect("receiver panicked") {
error!("[{}] receiver failed: {}", self.addr, err);
Expand Down Expand Up @@ -741,6 +799,7 @@ impl RPC {
let garbage_sender = garbage_sender.clone();
#[cfg(feature = "electrum-discovery")]
let discovery = discovery.clone();
let rpc_logging = config.electrum_rpc_logging.clone();

let spawned = spawn_thread("peer", move || {
info!("[{}] connected peer", addr);
Expand All @@ -752,6 +811,7 @@ impl RPC {
txs_limit,
#[cfg(feature = "electrum-discovery")]
discovery,
rpc_logging,
);
senders.lock().unwrap().push(conn.chan.sender());
conn.run();
Expand Down
1 change: 1 addition & 0 deletions tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ impl TestRunner {
utxos_limit: 100,
electrum_txs_limit: 100,
electrum_banner: "".into(),
electrum_rpc_logging: None,

#[cfg(feature = "liquid")]
asset_db_path: None, // XXX
Expand Down
Loading