Set max batch size (#183)
* set max batch size

* fix lint issue

* added test

* fix comments

* fix lints
shunsukew authored May 27, 2024
1 parent aa48c3a commit 7d63591
Showing 7 changed files with 104 additions and 8 deletions.
1 change: 1 addition & 0 deletions benches/bench/main.rs
@@ -222,6 +222,7 @@ fn config() -> Config {
listen_address: SUBWAY_SERVER_ADDR.to_string(),
port: SUBWAY_SERVER_PORT,
max_connections: 1024 * 1024,
max_batch_size: None,
request_timeout_seconds: 120,
http_methods: Vec::new(),
cors: None,
1 change: 1 addition & 0 deletions configs/config.yml
@@ -17,6 +17,7 @@ extensions:
port: 9944
listen_address: '0.0.0.0'
max_connections: 2000
max_batch_size: 10
http_methods:
- path: /health
method: system_health
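The new max_batch_size key is optional: it is deserialized into an Option<u32> on ServerConfig (see the src/extensions/server/mod.rs change below), so omitting it yields None, which the server treats as "no limit". A minimal sketch of that behaviour, assuming serde and serde_yaml are available as in the project's config loading; the ServerSection struct is an illustrative stand-in, not the real config type:

use serde::Deserialize;

// Illustrative stand-in for the relevant part of ServerConfig (not the real struct).
#[derive(Debug, Deserialize, PartialEq)]
struct ServerSection {
    max_batch_size: Option<u32>,
}

fn main() {
    // `max_batch_size: 10` (as in configs/config.yml) caps batches at 10 calls.
    let limited: ServerSection = serde_yaml::from_str("max_batch_size: 10").unwrap();
    assert_eq!(limited.max_batch_size, Some(10));

    // Omitting the key gives `None`, which the server maps to "unlimited".
    let unlimited: ServerSection = serde_yaml::from_str("{}").unwrap();
    assert_eq!(unlimited.max_batch_size, None);
}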
1 change: 1 addition & 0 deletions configs/eth_config.yml
@@ -14,6 +14,7 @@ extensions:
port: 8545
listen_address: '0.0.0.0'
max_connections: 2000
max_batch_size: 10
cors: all

middlewares:
12 changes: 10 additions & 2 deletions src/extensions/server/mod.rs
@@ -4,8 +4,8 @@ use hyper::server::conn::AddrStream;
use hyper::service::Service;
use hyper::service::{make_service_fn, service_fn};
use jsonrpsee::server::{
middleware::rpc::RpcServiceBuilder, stop_channel, ws, RandomStringIdProvider, RpcModule, ServerBuilder,
ServerHandle,
middleware::rpc::RpcServiceBuilder, stop_channel, ws, BatchRequestConfig, RandomStringIdProvider, RpcModule,
ServerBuilder, ServerHandle,
};
use jsonrpsee::Methods;

@@ -61,6 +61,7 @@ pub struct ServerConfig {
pub port: u16,
pub listen_address: String,
pub max_connections: u32,
pub max_batch_size: Option<u32>,
#[serde(default)]
pub http_methods: Vec<HttpMethodsConfig>,
#[serde(default = "default_request_timeout_seconds")]
@@ -176,10 +177,17 @@ impl SubwayServerBuilder {
.map(|(a, b, c)| layer_fn(|s| PrometheusService::new(s, protocol, a, b, c))),
);

let batch_request_config = match config.max_batch_size {
Some(0) => BatchRequestConfig::Disabled,
Some(max_size) => BatchRequestConfig::Limit(max_size),
None => BatchRequestConfig::Unlimited,
};

let service_builder = ServerBuilder::default()
.set_rpc_middleware(rpc_middleware)
.set_http_middleware(http_middleware)
.max_connections(config.max_connections)
.set_batch_request_config(batch_request_config)
.set_id_provider(RandomStringIdProvider::new(16))
.to_service_builder();

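For reference, the new setting maps onto jsonrpsee's BatchRequestConfig exactly as in the match above. A self-contained sketch of that mapping (the helper name batch_config is illustrative, not part of the diff):

use jsonrpsee::server::BatchRequestConfig;

// `max_batch_size: 0`   -> batch requests are rejected outright (Disabled)
// `max_batch_size: n`   -> batches are capped at n calls (Limit(n))
// key omitted (`None`)  -> no limit, the previous behaviour (Unlimited)
fn batch_config(max_batch_size: Option<u32>) -> BatchRequestConfig {
    match max_batch_size {
        Some(0) => BatchRequestConfig::Disabled,
        Some(max_size) => BatchRequestConfig::Limit(max_size),
        None => BatchRequestConfig::Unlimited,
    }
}

With this mapping, the shipped configs (max_batch_size: 10) cap batches at ten calls, the benches and tests that pass None keep the old unlimited behaviour, and the new tests below exercise the Limit(3) and Disabled cases.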
95 changes: 89 additions & 6 deletions src/server.rs
@@ -225,10 +225,9 @@ pub async fn build(config: Config) -> anyhow::Result<SubwayServerHandle> {
#[cfg(test)]
mod tests {
use jsonrpsee::{
core::client::ClientT,
core::{client::ClientT, params::BatchRequestBuilder},
rpc_params,
server::ServerBuilder,
server::ServerHandle,
server::{ServerBuilder, ServerHandle},
ws_client::{WsClient, WsClientBuilder},
RpcModule,
};
@@ -244,7 +243,12 @@ mod tests {
const PHO: &str = "call_pho";
const BAR: &str = "bar";

async fn subway_server(endpoint: String, port: u16, request_timeout_seconds: Option<u64>) -> SubwayServerHandle {
async fn subway_server(
endpoint: String,
port: u16,
request_timeout_seconds: Option<u64>,
max_batch_size: Option<u32>,
) -> SubwayServerHandle {
let config = Config {
extensions: ExtensionsConfig {
client: Some(ClientConfig {
@@ -255,6 +259,7 @@
listen_address: "127.0.0.1".to_string(),
port,
max_connections: 1024,
max_batch_size,
request_timeout_seconds: request_timeout_seconds.unwrap_or(10),
http_methods: Vec::new(),
cors: None,
@@ -337,7 +342,7 @@
#[tokio::test]
async fn null_param_works() {
let (endpoint, upstream_dummy_server_handle) = upstream_dummy_server("127.0.0.1:9955").await;
let subway_server = subway_server(endpoint, 9944, None).await;
let subway_server = subway_server(endpoint, 9944, None, None).await;
let url = format!("ws://{}", subway_server.addr);
let client = ws_client(&url).await;
assert_eq!(BAR, client.request::<String, _>(PHO, rpc_params!()).await.unwrap());
@@ -349,7 +354,7 @@
async fn request_timeout() {
let (endpoint, upstream_dummy_server_handle) = upstream_dummy_server("127.0.0.1:9956").await;
// server with 1 second timeout
let subway_server = subway_server(endpoint, 9945, Some(1)).await;
let subway_server = subway_server(endpoint, 9945, Some(1), None).await;
let url = format!("ws://{}", subway_server.addr);
// client with default 60 second timeout
let client = ws_client(&url).await;
@@ -375,4 +380,82 @@
subway_server.handle.stop().unwrap();
upstream_dummy_server_handle.stop().unwrap();
}

#[tokio::test]
async fn batch_requests_works() {
let (endpoint, upstream_dummy_server_handle) = upstream_dummy_server("127.0.0.1:9957").await;

// Server with max batch size 3
let subway_server = subway_server(endpoint, 9946, None, Some(3)).await;
let url = format!("ws://{}", subway_server.addr);
let client = ws_client(&url).await;

// Sending 3 requests in a batch
let mut batch = BatchRequestBuilder::new();
batch.insert(PHO, rpc_params!()).unwrap();
batch.insert(PHO, rpc_params!()).unwrap();
batch.insert(PHO, rpc_params!()).unwrap();

let res = client.batch_request::<String>(batch).await.unwrap();
assert_eq!(res.num_successful_calls(), 3);

upstream_dummy_server_handle.stop().unwrap();
}

#[tokio::test]
async fn batch_requests_exceeds_max_size_errors() {
let (endpoint, upstream_dummy_server_handle) = upstream_dummy_server("127.0.0.1:9958").await;

// Server with max batch size 3
let subway_server = subway_server(endpoint, 9947, None, Some(3)).await;
let url = format!("ws://{}", subway_server.addr);
let client = ws_client(&url).await;

// Sending 4 requests in a batch
let mut batch = BatchRequestBuilder::new();
batch.insert(PHO, rpc_params!()).unwrap();
batch.insert(PHO, rpc_params!()).unwrap();
batch.insert(PHO, rpc_params!()).unwrap();
batch.insert(PHO, rpc_params!()).unwrap();

// Due to a limitation of the jsonrpsee client implementation,
// we can't check the error message when the response batch id is `null`.
// E.g.
// Raw response - `{"jsonrpc":"2.0","error":{"code":-32010,"message":"The batch request was too large","data":"Exceeded max limit of 3"},"id":null}`
// Jsonrpsee client response - `Err(RestartNeeded(InvalidRequestId(NotPendingRequest("null"))))`
//
// For now, we only check that an error is returned.
let res = client.batch_request::<String>(batch).await;

assert!(res.is_err());

upstream_dummy_server_handle.stop().unwrap();
}

#[tokio::test]
async fn batch_requests_disabled_errors() {
let (endpoint, upstream_dummy_server_handle) = upstream_dummy_server("127.0.0.1:9959").await;

// Server with max batch size 0 (disabled)
let subway_server = subway_server(endpoint, 9948, None, Some(0)).await;
let url = format!("ws://{}", subway_server.addr);
let client = ws_client(&url).await;

// Sending 1 request in a batch
let mut batch = BatchRequestBuilder::new();
batch.insert(PHO, rpc_params!()).unwrap();

// Due to a limitation of the jsonrpsee client implementation,
// we can't check the error message when the response batch id is `null`.
// E.g.
// Raw response - `{"jsonrpc":"2.0","error":{"code":-32005,"message":"Batched requests are not supported by this server"},"id":null}`
// Jsonrpsee client response - `Err(RestartNeeded(InvalidRequestId(NotPendingRequest("null"))))`
//
// For now, we only check that an error is returned.
let res = client.batch_request::<String>(batch).await;

assert!(res.is_err());

upstream_dummy_server_handle.stop().unwrap();
}
}
1 change: 1 addition & 0 deletions src/tests/merge_subscription.rs
@@ -54,6 +54,7 @@ async fn merge_subscription_works() {
listen_address: "0.0.0.0".to_string(),
port: 0,
max_connections: 10,
max_batch_size: None,
request_timeout_seconds: 120,
http_methods: Vec::new(),
cors: None,
1 change: 1 addition & 0 deletions src/tests/upstream.rs
@@ -36,6 +36,7 @@ async fn upstream_error_propagate() {
listen_address: "0.0.0.0".to_string(),
port: 0,
max_connections: 10,
max_batch_size: None,
request_timeout_seconds: 120,
http_methods: Vec::new(),
cors: None,
