
Max concurrent GrpcStore streams (#656)
Add a configuration option to limit the number of concurrent streams that are
active with the upstream GrpcStore.

With this, we also introduce reconnection on error since Tonic doesn't do
that.
chrisstaite-menlo authored Feb 5, 2024
1 parent a9526b1 commit 7548d4b
Showing 11 changed files with 220 additions and 59 deletions.
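Both features land in a new ConnectionManager in nativelink-util, which the GrpcScheduler and GrpcStore changes below build their per-request clients from. As a rough, hypothetical sketch of the idea only (the names and signatures here are illustrative, not the nativelink-util API; the real get_connection seen in the diff returns the pair directly and on_error takes the error), such a manager can be assembled from the async-lock semaphore this commit adds as a dependency plus tonic's Endpoint:

use std::sync::Arc;

use async_lock::{Mutex, Semaphore, SemaphoreGuardArc};
use tonic::transport::{Channel, Endpoint};

// Hypothetical sketch of a connection manager: callers get a channel plus a
// handle, the handle's semaphore permit caps the number of in-flight requests,
// and reporting an error drops the cached channel so the next caller
// reconnects (tonic does not reconnect on its own).
pub struct SketchConnectionManager {
    endpoint: Endpoint,
    limit: Arc<Semaphore>,
    channel: Arc<Mutex<Option<Channel>>>,
}

pub struct SketchConnection {
    _permit: SemaphoreGuardArc,
    channel: Arc<Mutex<Option<Channel>>>,
}

impl SketchConnectionManager {
    pub fn new(endpoint: Endpoint, max_concurrent_requests: usize) -> Self {
        // A value of zero means unlimited; approximate that with a huge permit count.
        let permits = if max_concurrent_requests == 0 {
            usize::MAX >> 4
        } else {
            max_concurrent_requests
        };
        Self {
            endpoint,
            limit: Arc::new(Semaphore::new(permits)),
            channel: Arc::new(Mutex::new(None)),
        }
    }

    pub async fn get_connection(
        &self,
    ) -> Result<(SketchConnection, Channel), tonic::transport::Error> {
        // Requests beyond the limit queue here until a permit is released.
        let permit = self.limit.acquire_arc().await;
        let mut cached = self.channel.lock().await;
        if cached.is_none() {
            // Tonic does not redial by itself, so (re)establish the channel lazily.
            *cached = Some(self.endpoint.connect().await?);
        }
        let channel = cached.clone().unwrap();
        Ok((
            SketchConnection { _permit: permit, channel: Arc::clone(&self.channel) },
            channel,
        ))
    }
}

impl SketchConnection {
    // Called when an upstream RPC fails: forget the cached channel so the next
    // get_connection() call dials a fresh one.
    pub async fn on_error(&self) {
        *self.channel.lock().await = None;
    }
}

Mapping a zero limit to a very large permit count just keeps the sketch small; the properties that matter are that the permit lives as long as the request and that a reported error forces the next caller to redial.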
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

8 changes: 7 additions & 1 deletion nativelink-config/src/schedulers.rs
@@ -1,4 +1,4 @@
- // Copyright 2023 The Native Link Authors. All rights reserved.
+ // Copyright 2023-2024 The Native Link Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -134,6 +134,12 @@ pub struct GrpcScheduler {
/// Retry configuration to use when a network request fails.
#[serde(default)]
pub retry: Retry,

+ /// Limit the number of simultaneous upstream requests to this many. A
+ /// value of zero is treated as unlimited. If the limit is reached the
+ /// request is queued.
+ #[serde(default)]
+ pub max_concurrent_requests: usize,
}

#[derive(Deserialize, Debug)]
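Since the new field is marked #[serde(default)], existing scheduler configurations keep deserializing unchanged: omitting max_concurrent_requests yields 0, which the comment above defines as unlimited. A minimal, self-contained illustration of that default (the struct is a trimmed stand-in, not the real GrpcScheduler config, and JSON is used only to exercise serde):

use serde::Deserialize;

// Trimmed stand-in for the config struct; only the new field is shown.
#[derive(Deserialize, Debug)]
struct GrpcSection {
    #[serde(default)]
    max_concurrent_requests: usize,
}

fn main() {
    let with_limit: GrpcSection =
        serde_json::from_str(r#"{ "max_concurrent_requests": 16 }"#).unwrap();
    let without_limit: GrpcSection = serde_json::from_str("{}").unwrap();

    assert_eq!(with_limit.max_concurrent_requests, 16);
    // Omitting the field falls back to the usize default of 0 (unlimited).
    assert_eq!(without_limit.max_concurrent_requests, 0);
}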
8 changes: 7 additions & 1 deletion nativelink-config/src/stores.rs
@@ -1,4 +1,4 @@
- // Copyright 2023 The Native Link Authors. All rights reserved.
+ // Copyright 2023-2024 The Native Link Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -546,6 +546,12 @@ pub struct GrpcStore {
/// Retry configuration to use when a network request fails.
#[serde(default)]
pub retry: Retry,

+ /// Limit the number of simultaneous upstream requests to this many. A
+ /// value of zero is treated as unlimited. If the limit is reached the
+ /// request is queued.
+ #[serde(default)]
+ pub max_concurrent_requests: usize,
}

/// The possible error codes that might occur on an upstream request.
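The GrpcStore config gains the same field with the same semantics. "Queued" here means a request that arrives while the configured number of upstream calls is already in flight simply waits for a slot rather than failing. A small stand-alone illustration of that behaviour using the async-lock semaphore (the sleeps are stand-ins for real upstream RPCs, and the limit of 2 is arbitrary):

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;

use async_lock::Semaphore;

// With a limit of 2, the remaining tasks queue on the semaphore; the assert
// checks that no more than two "requests" ever run at the same time.
#[tokio::main]
async fn main() {
    let limit = Arc::new(Semaphore::new(2));
    let in_flight = Arc::new(AtomicUsize::new(0));

    let tasks: Vec<_> = (0..8)
        .map(|_| {
            let limit = Arc::clone(&limit);
            let in_flight = Arc::clone(&in_flight);
            tokio::spawn(async move {
                let _permit = limit.acquire_arc().await;
                let now = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
                assert!(now <= 2, "more than two requests in flight");
                tokio::time::sleep(Duration::from_millis(10)).await; // stand-in for an RPC
                in_flight.fetch_sub(1, Ordering::SeqCst);
            })
        })
        .collect();

    for task in tasks {
        task.await.unwrap();
    }
}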
1 change: 1 addition & 0 deletions nativelink-scheduler/BUILD.bazel
@@ -30,6 +30,7 @@ rust_library(
"//nativelink-proto",
"//nativelink-store",
"//nativelink-util",
"@crates//:async-lock",
"@crates//:blake3",
"@crates//:futures",
"@crates//:hashbrown",
1 change: 1 addition & 0 deletions nativelink-scheduler/Cargo.toml
@@ -13,6 +13,7 @@ nativelink-proto = { path = "../nativelink-proto" }
# files somewhere else.
nativelink-store = { path = "../nativelink-store" }

+ async-lock = "3.2.0"
async-trait = "0.1.74"
blake3 = "1.5.0"
prost = "0.12.3"
46 changes: 28 additions & 18 deletions nativelink-scheduler/src/grpc_scheduler.rs
@@ -1,4 +1,4 @@
- // Copyright 2023 The Native Link Authors. All rights reserved.
+ // Copyright 2023-2024 The Native Link Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -28,6 +28,7 @@ use nativelink_proto::build::bazel::remote::execution::v2::{
};
use nativelink_proto::google::longrunning::Operation;
use nativelink_util::action_messages::{ActionInfo, ActionInfoHashKey, ActionState, DEFAULT_EXECUTION_PRIORITY};
+ use nativelink_util::grpc_utils::ConnectionManager;
use nativelink_util::retry::{Retrier, RetryResult};
use nativelink_util::tls_utils;
use parking_lot::Mutex;
@@ -36,17 +37,16 @@ use rand::Rng;
use tokio::select;
use tokio::sync::watch;
use tokio::time::sleep;
- use tonic::{transport, Request, Streaming};
+ use tonic::{Request, Streaming};
use tracing::{error, info, warn};

use crate::action_scheduler::ActionScheduler;
use crate::platform_property_manager::PlatformPropertyManager;

pub struct GrpcScheduler {
- capabilities_client: CapabilitiesClient<transport::Channel>,
- execution_client: ExecutionClient<transport::Channel>,
platform_property_managers: Mutex<HashMap<String, Arc<PlatformPropertyManager>>>,
retrier: Retrier,
+ connection_manager: ConnectionManager,
}

impl GrpcScheduler {
@@ -69,16 +69,15 @@ impl GrpcScheduler {
config: &nativelink_config::schedulers::GrpcScheduler,
jitter_fn: Box<dyn Fn(Duration) -> Duration + Send + Sync>,
) -> Result<Self, Error> {
- let channel = transport::Channel::balance_list(std::iter::once(tls_utils::endpoint(&config.endpoint)?));
+ let endpoint = tls_utils::endpoint(&config.endpoint)?;
Ok(Self {
- capabilities_client: CapabilitiesClient::new(channel.clone()),
- execution_client: ExecutionClient::new(channel),
platform_property_managers: Mutex::new(HashMap::new()),
retrier: Retrier::new(
Arc::new(|duration| Box::pin(sleep(duration))),
Arc::new(jitter_fn),
config.retry.to_owned(),
),
+ connection_manager: ConnectionManager::new(std::iter::once(endpoint), config.max_concurrent_requests),
})
}

@@ -150,14 +149,17 @@ impl ActionScheduler for GrpcScheduler {

self.perform_request(instance_name, |instance_name| async move {
// Not in the cache, lookup the capabilities with the upstream.
- let capabilities = self
- .capabilities_client
- .clone()
+ let (connection, channel) = self.connection_manager.get_connection().await;
+ let capabilities_result = CapabilitiesClient::new(channel)
.get_capabilities(GetCapabilitiesRequest {
instance_name: instance_name.to_string(),
})
- .await?
- .into_inner();
+ .await
+ .err_tip(|| "Retrieving upstream GrpcScheduler capabilities");
+ if let Err(err) = &capabilities_result {
+ connection.on_error(err);
+ }
+ let capabilities = capabilities_result?.into_inner();
let platform_property_manager = Arc::new(PlatformPropertyManager::new(
capabilities
.execution_capabilities
@@ -195,11 +197,15 @@ impl ActionScheduler for GrpcScheduler {
};
let result_stream = self
.perform_request(request, |request| async move {
- self.execution_client
- .clone()
+ let (connection, channel) = self.connection_manager.get_connection().await;
+ let result = ExecutionClient::new(channel)
.execute(Request::new(request))
.await
.err_tip(|| "Sending action to upstream scheduler")
.err_tip(|| "Sending action to upstream scheduler");
if let Err(err) = &result {
connection.on_error(err);
}
result
})
.await?
.into_inner();
@@ -215,11 +221,15 @@ impl ActionScheduler for GrpcScheduler {
};
let result_stream = self
.perform_request(request, |request| async move {
- self.execution_client
- .clone()
+ let (connection, channel) = self.connection_manager.get_connection().await;
+ let result = ExecutionClient::new(channel)
.wait_execution(Request::new(request))
.await
.err_tip(|| "While getting wait_execution stream")
.err_tip(|| "While getting wait_execution stream");
if let Err(err) = &result {
connection.on_error(err);
}
result
})
.and_then(|result_stream| Self::stream_state(result_stream.into_inner()))
.await;
