Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Export cache traces #389

Merged
merged 3 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 94 additions & 71 deletions Cargo.lock

Large diffs are not rendered by default.

14 changes: 8 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,20 @@ ctrlc = { version = "3.4.5", features = ["termination"] }
dirs = "5.0.1"
error-stack = "0.5.0"
etcd-client = { version = "0.14.0", features = ["tls", "tls-roots"] }
foyer = { git = "https://github.com/foyer-rs/foyer.git", rev = "d49c480" } # 0.12-dev
fastrace = "0.7"
fastrace-opentelemetry = "0.7"
foyer = { version = "0.12.1", features = ["tracing"] }
futures = "0.3.30"
hex = { version = "0.4.3", features = ["serde"] }
memmap2 = "0.9.4"
mimalloc = "0.1.43"
opentelemetry = { version = "0.24.0", features = ["trace", "metrics"] }
opentelemetry_sdk = { version = "0.24.1", features = [
opentelemetry = { version = "0.25", features = ["trace", "metrics"] }
opentelemetry_sdk = { version = "0.25", features = [
"trace",
"metrics",
"rt-tokio",
] }
opentelemetry-otlp = { version = "0.17.0", features = [
opentelemetry-otlp = { version = "0.25", features = [
"trace",
"metrics",
"grpc-tonic",
Expand All @@ -75,7 +77,7 @@ tempfile = "3.13.0"
tempdir = "0.3.7"
testcontainers = "0.22.0"
time = { version = "0.3.36", features = ["formatting", "local-offset"] }
tokio = { version = "1.39.2", features = ["full"] }
tokio = { version = "1.38", features = ["full"] }
tokio-stream = { version = "0.1.15", features = ["sync", "net"] }
tokio-util = "0.7.11"
tonic = { version = "0.12.1", features = ["tls", "tls-roots", "prost"] }
Expand All @@ -87,7 +89,7 @@ tracing = { version = "0.1.40", features = [
"release_max_level_debug",
"valuable",
] }
tracing-opentelemetry = "0.25.0"
tracing-opentelemetry = "0.26"
tracing-subscriber = { version = "0.3.18", features = [
"std",
"env-filter",
Expand Down
2 changes: 1 addition & 1 deletion beaconchain/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "apibara-dna-beaconchain"
version = "2.0.0-beta.1"
version = "2.0.0-beta.2"
edition.workspace = true
authors.workspace = true
repository.workspace = true
Expand Down
8 changes: 4 additions & 4 deletions common/src/file_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use error_stack::{Result, ResultExt};
use foyer::{
AdmissionPicker, AdmitAllPicker, CacheEntry, Compression, DirectFsDeviceOptions, Engine,
HybridCache, HybridCacheBuilder, HybridFetch, LargeEngineOptions, RateLimitPicker, RecoverMode,
RuntimeConfig, TokioRuntimeConfig,
RuntimeOptions, TokioRuntimeOptions,
};

#[derive(Debug)]
Expand Down Expand Up @@ -168,12 +168,12 @@ impl FileCacheArgs {
.storage(Engine::Large)
.with_compression(compression)
.with_admission_picker(admission_picker)
.with_runtime_config(RuntimeConfig::Separated {
read_runtime_config: TokioRuntimeConfig {
.with_runtime_options(RuntimeOptions::Separated {
read_runtime_options: TokioRuntimeOptions {
worker_threads: self.cache_runtime_read_threads,
max_blocking_threads: self.cache_runtime_read_threads * 2,
},
write_runtime_config: TokioRuntimeConfig {
write_runtime_options: TokioRuntimeOptions {
worker_threads: self.cache_runtime_write_threads,
max_blocking_threads: self.cache_runtime_write_threads * 2,
},
Expand Down
21 changes: 21 additions & 0 deletions common/src/server/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ where
chain_view: tokio::sync::watch::Receiver<Option<ChainView>>,
fragment_id_to_name: HashMap<FragmentId, String>,
block_store: BlockStoreReader,
options: StreamServiceOptions,
ct: CancellationToken,
}

Expand All @@ -60,13 +61,22 @@ where
chain_view,
fragment_id_to_name,
block_store,
options,
ct,
}
}

pub fn into_service(self) -> dna_stream_server::DnaStreamServer<Self> {
dna_stream_server::DnaStreamServer::new(self)
}

pub fn current_stream_count(&self) -> usize {
self.options.max_concurrent_streams - self.stream_semaphore.available_permits()
}

pub fn current_stream_available(&self) -> usize {
self.stream_semaphore.available_permits()
}
}

#[tonic::async_trait]
Expand All @@ -78,6 +88,7 @@ where
Box<dyn Stream<Item = tonic::Result<StreamDataResponse, tonic::Status>> + Send + 'static>,
>;

#[tracing::instrument(name = "stream::status", skip_all)]
async fn status(
&self,
_request: tonic::Request<StatusRequest>,
Expand All @@ -94,10 +105,17 @@ where
Ok(tonic::Response::new(response))
}

#[tracing::instrument(
name = "stream::stream_data",
skip_all,
fields(stream_count, stream_available)
)]
async fn stream_data(
&self,
request: tonic::Request<StreamDataRequest>,
) -> tonic::Result<tonic::Response<Self::StreamDataStream>, tonic::Status> {
let current_span = tracing::Span::current();

let request = request.into_inner();
info!(request = ?request, "stream data request");

Expand All @@ -118,6 +136,9 @@ where
Ok(Ok(permit)) => permit,
};

current_span.record("stream_count", self.current_stream_count());
current_span.record("stream_available", self.current_stream_available());

// Validate starting cursor by checking it's in range.
// The block could be reorged but that's handled by the `DataStream`.
let starting_cursor = if let Some(cursor) = request.starting_cursor {
Expand Down
2 changes: 1 addition & 1 deletion evm/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "apibara-dna-evm"
version = "2.0.0-beta.1"
version = "2.0.0-beta.2"
edition.workspace = true
authors.workspace = true
repository.workspace = true
Expand Down
2 changes: 2 additions & 0 deletions observability/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ license.workspace = true

[dependencies]
error-stack.workspace = true
fastrace.workspace = true
fastrace-opentelemetry.workspace = true
nu-ansi-term = "0.50.1"
opentelemetry.workspace = true
opentelemetry-otlp.workspace = true
Expand Down
28 changes: 26 additions & 2 deletions observability/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@
mod dna_fmt;

use std::borrow::Cow;
use std::time::Duration;

use error_stack::{Result, ResultExt};
use opentelemetry::global;
use opentelemetry::trace::TracerProvider;
use fastrace::collector::Config;
use fastrace_opentelemetry::OpenTelemetryReporter;
use opentelemetry::trace::{SpanKind, TracerProvider};
use opentelemetry::{global, InstrumentationLibrary};
use opentelemetry_sdk::resource::{ResourceDetector, SdkProvidedResourceDetector};
use tracing::Subscriber;

pub use opentelemetry::metrics::{ObservableCounter, ObservableGauge};
Expand Down Expand Up @@ -83,10 +87,30 @@ where
S: Subscriber + Send + Sync,
for<'a> S: LookupSpan<'a>,
{
let package_name = package_name.into();
let version = version.into();

// filter traces by crate/level
let otel_env_filter =
EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("INFO"));

let exporter = opentelemetry_otlp::new_exporter()
.tonic()
.build_span_exporter()
.change_context(OpenTelemetryInitError)
.attach_printable("failed to create span exporter")?;
let resource = SdkProvidedResourceDetector.detect(Duration::from_secs(1));
let instrumentation_lib = InstrumentationLibrary::builder(package_name.clone())
.with_version(version.clone())
.build();
let reporter = OpenTelemetryReporter::new(
exporter,
SpanKind::Server,
Cow::Owned(resource),
instrumentation_lib,
);
fastrace::set_reporter(reporter, Config::default());

// Both tracer and meter are configured with environment variables.
let meter = opentelemetry_otlp::new_pipeline()
.metrics(opentelemetry_sdk::runtime::Tokio)
Expand Down
2 changes: 1 addition & 1 deletion starknet/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "apibara-dna-starknet"
version = "2.0.0-beta.1"
version = "2.0.0-beta.2"
edition.workspace = true
authors.workspace = true
repository.workspace = true
Expand Down
Loading