From b88835aedadd1f83bb82a7681075fc5cd77204b7 Mon Sep 17 00:00:00 2001 From: atanmarko Date: Wed, 28 May 2025 15:51:26 +0200 Subject: [PATCH 01/10] feat: otlp tracing --- Cargo.lock | 192 ++++++++++++++++-- crates/agglayer-config/src/lib.rs | 6 +- .../src/{log.rs => tracing.rs} | 67 +++--- crates/agglayer-node/src/lib.rs | 9 +- crates/agglayer-telemetry/Cargo.toml | 13 +- crates/agglayer-telemetry/src/lib.rs | 1 + crates/agglayer-telemetry/src/traces.rs | 167 +++++++++++++++ tests/integrations/src/agglayer_setup.rs | 4 +- 8 files changed, 399 insertions(+), 60 deletions(-) rename crates/agglayer-config/src/{log.rs => tracing.rs} (55%) create mode 100644 crates/agglayer-telemetry/src/traces.rs diff --git a/Cargo.lock b/Cargo.lock index d3d833e1..22ba70a2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -666,18 +666,23 @@ dependencies = [ name = "agglayer-telemetry" version = "0.1.0" dependencies = [ + "agglayer-config", + "anyhow", "axum 0.8.3", "buildstructor", "futures", "lazy_static", - "opentelemetry", - "opentelemetry-prometheus", - "opentelemetry_sdk", - "prometheus", + "opentelemetry 0.29.1", + "opentelemetry-otlp", + "opentelemetry-prometheus 0.29.1", + "opentelemetry_sdk 0.29.0", + "prometheus 0.14.0", "thiserror 2.0.12", "tokio", "tokio-util", "tracing", + "tracing-opentelemetry", + "tracing-subscriber", ] [[package]] @@ -689,10 +694,10 @@ dependencies = [ "buildstructor", "futures", "lazy_static", - "opentelemetry", - "opentelemetry-prometheus", - "opentelemetry_sdk", - "prometheus", + "opentelemetry 0.27.1", + "opentelemetry-prometheus 0.27.0", + "opentelemetry_sdk 0.27.1", + "prometheus 0.13.4", "thiserror 2.0.12", "tokio", "tokio-util", @@ -2209,7 +2214,7 @@ dependencies = [ "bitflags 2.9.0", "cexpr", "clang-sys", - "itertools 0.11.0", + "itertools 0.12.1", "lazy_static", "lazycell", "proc-macro2", @@ -2229,7 +2234,7 @@ dependencies = [ "bitflags 2.9.0", "cexpr", "clang-sys", - "itertools 0.11.0", + "itertools 0.13.0", "log", "prettyplease", "proc-macro2", @@ -2249,7 +2254,7 @@ dependencies = [ "bitflags 2.9.0", "cexpr", "clang-sys", - "itertools 0.11.0", + "itertools 0.13.0", "proc-macro2", "quote", "regex", @@ -6228,7 +6233,7 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af1844ef2428cc3e1cb900be36181049ef3d3193c63e43026cfe202983b27a56" dependencies = [ - "proc-macro-crate 2.0.0", + "proc-macro-crate 3.3.0", "proc-macro2", "quote", "syn 2.0.100", @@ -6366,6 +6371,54 @@ dependencies = [ "tracing", ] +[[package]] +name = "opentelemetry" +version = "0.29.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e87237e2775f74896f9ad219d26a2081751187eb7c9f5c58dde20a23b95d16c" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "pin-project-lite", + "thiserror 2.0.12", + "tracing", +] + +[[package]] +name = "opentelemetry-http" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46d7ab32b827b5b495bd90fa95a6cb65ccc293555dcc3199ae2937d2d237c8ed" +dependencies = [ + "async-trait", + "bytes", + "http 1.3.1", + "opentelemetry 0.29.1", + "reqwest 0.12.15", + "tracing", +] + +[[package]] +name = "opentelemetry-otlp" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d899720fe06916ccba71c01d04ecd77312734e2de3467fd30d9d580c8ce85656" +dependencies = [ + "futures-core", + "http 1.3.1", + "opentelemetry 0.29.1", + "opentelemetry-http", + "opentelemetry-proto", + "opentelemetry_sdk 0.29.0", + "prost 0.13.5", + "reqwest 0.12.15", + "thiserror 2.0.12", + "tokio", + "tonic 0.12.3", + "tracing", +] + [[package]] name = "opentelemetry-prometheus" version = "0.27.0" @@ -6373,13 +6426,38 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b834e966ea5e2d03dfe5f2253f03d22cce21403ee940265070eeee96cee0bcc" dependencies = [ "once_cell", - "opentelemetry", - "opentelemetry_sdk", - "prometheus", - "protobuf", + "opentelemetry 0.27.1", + "opentelemetry_sdk 0.27.1", + "prometheus 0.13.4", + "protobuf 2.28.0", "tracing", ] +[[package]] +name = "opentelemetry-prometheus" +version = "0.29.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "098a71a4430bb712be6130ed777335d2e5b19bc8566de5f2edddfce906def6ab" +dependencies = [ + "once_cell", + "opentelemetry 0.29.1", + "opentelemetry_sdk 0.29.0", + "prometheus 0.14.0", + "tracing", +] + +[[package]] +name = "opentelemetry-proto" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c40da242381435e18570d5b9d50aca2a4f4f4d8e146231adb4e7768023309b3" +dependencies = [ + "opentelemetry 0.29.1", + "opentelemetry_sdk 0.29.0", + "prost 0.13.5", + "tonic 0.12.3", +] + [[package]] name = "opentelemetry_sdk" version = "0.27.1" @@ -6391,7 +6469,7 @@ dependencies = [ "futures-executor", "futures-util", "glob", - "opentelemetry", + "opentelemetry 0.27.1", "percent-encoding", "rand 0.8.5", "serde_json", @@ -6399,6 +6477,26 @@ dependencies = [ "tracing", ] +[[package]] +name = "opentelemetry_sdk" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "afdefb21d1d47394abc1ba6c57363ab141be19e27cc70d0e422b7f303e4d290b" +dependencies = [ + "futures-channel", + "futures-executor", + "futures-util", + "glob", + "opentelemetry 0.29.1", + "percent-encoding", + "rand 0.9.0", + "serde_json", + "thiserror 2.0.12", + "tokio", + "tokio-stream", + "tracing", +] + [[package]] name = "option-ext" version = "0.2.0" @@ -7313,10 +7411,25 @@ dependencies = [ "lazy_static", "memchr", "parking_lot", - "protobuf", + "protobuf 2.28.0", "thiserror 1.0.69", ] +[[package]] +name = "prometheus" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ca5326d8d0b950a9acd87e6a3f94745394f62e4dae1b1ee22b2bc0c394af43a" +dependencies = [ + "cfg-if", + "fnv", + "lazy_static", + "memchr", + "parking_lot", + "protobuf 3.7.2", + "thiserror 2.0.12", +] + [[package]] name = "proptest" version = "1.6.0" @@ -7375,7 +7488,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf" dependencies = [ "heck 0.5.0", - "itertools 0.11.0", + "itertools 0.14.0", "log", "multimap", "once_cell", @@ -7408,7 +7521,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" dependencies = [ "anyhow", - "itertools 0.11.0", + "itertools 0.14.0", "proc-macro2", "quote", "syn 2.0.100", @@ -7438,6 +7551,26 @@ version = "2.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" +[[package]] +name = "protobuf" +version = "3.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d65a1d4ddae7d8b5de68153b48f6aa3bba8cb002b243dbdbc55a5afbc98f99f4" +dependencies = [ + "once_cell", + "protobuf-support", + "thiserror 1.0.69", +] + +[[package]] +name = "protobuf-support" +version = "3.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3e36c2f31e0a47f9280fb347ef5e461ffcd2c52dd520d8e216b52f93b0b0d7d6" +dependencies = [ + "thiserror 1.0.69", +] + [[package]] name = "prover-config" version = "0.1.0" @@ -7850,6 +7983,7 @@ dependencies = [ "base64 0.22.1", "bytes", "encoding_rs", + "futures-channel", "futures-core", "futures-util", "h2 0.4.8", @@ -10274,6 +10408,24 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-opentelemetry" +version = "0.30.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd8e764bd6f5813fd8bebc3117875190c5b0415be8f7f8059bffb6ecd979c444" +dependencies = [ + "js-sys", + "once_cell", + "opentelemetry 0.29.1", + "opentelemetry_sdk 0.29.0", + "smallvec", + "tracing", + "tracing-core", + "tracing-log", + "tracing-subscriber", + "web-time", +] + [[package]] name = "tracing-serde" version = "0.2.0" diff --git a/crates/agglayer-config/src/lib.rs b/crates/agglayer-config/src/lib.rs index c8cd6f2f..8f0e35ad 100644 --- a/crates/agglayer-config/src/lib.rs +++ b/crates/agglayer-config/src/lib.rs @@ -24,23 +24,23 @@ pub mod certificate_orchestrator; pub mod epoch; pub(crate) mod l1; pub(crate) mod l2; -pub mod log; pub mod outbound; pub mod rate_limiting; pub(crate) mod rpc; pub mod shutdown; pub mod storage; pub(crate) mod telemetry; +pub mod tracing; mod with; pub use auth::{AuthConfig, GcpKmsConfig, LocalConfig, PrivateKey}; pub use epoch::Epoch; pub use l1::L1; pub use l2::L2; -pub use log::Log; use prover::default_prover_entrypoint; pub use rate_limiting::RateLimitingConfig; pub use rpc::RpcConfig; +pub use tracing::Tracing; /// The Agglayer configuration. #[serde_with::serde_as] @@ -65,7 +65,7 @@ pub struct Config { /// The log configuration. #[serde(default)] - pub log: Log, + pub log: Tracing, /// The local RPC server configuration. #[serde(default)] diff --git a/crates/agglayer-config/src/log.rs b/crates/agglayer-config/src/tracing.rs similarity index 55% rename from crates/agglayer-config/src/log.rs rename to crates/agglayer-config/src/tracing.rs index a1b6935d..f0bd41d8 100644 --- a/crates/agglayer-config/src/log.rs +++ b/crates/agglayer-config/src/tracing.rs @@ -3,24 +3,32 @@ use std::{fmt::Display, path::PathBuf}; use serde::{Deserialize, Deserializer, Serialize}; use tracing_subscriber::{fmt::writer::BoxMakeWriter, EnvFilter}; -/// The log configuration. +/// The tracing configuration. #[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq, Eq)] #[serde(rename_all = "kebab-case")] -pub struct Log { +pub struct Tracing { /// The `RUST_LOG` environment variable will take precedence over the - /// configuration log level. + /// configuration tracing level. #[serde(default)] - pub level: LogLevel, + pub level: TracingLevel, #[serde(default)] - pub outputs: Vec, + pub outputs: Vec, #[serde(default)] - pub format: LogFormat, + pub format: TracingFormat, + /// Socket of the open telemetry agent endpoint. + /// If not provided open telemetry will not be used. + #[serde(skip_serializing_if = "Option::is_none")] + pub otlp_agent: Option, + /// Otlp service name. + /// If not provided open telemetry will not be used. + #[serde(skip_serializing_if = "Option::is_none")] + pub otlp_service_name: Option, } /// The log format. #[derive(Serialize, Deserialize, Debug, Default, Clone, Copy, PartialEq, Eq)] #[serde(rename_all = "lowercase")] -pub enum LogFormat { +pub enum TracingFormat { #[default] Pretty, Json, @@ -29,7 +37,7 @@ pub enum LogFormat { /// The log level. #[derive(Serialize, Deserialize, Debug, Default, Clone, Copy, PartialEq, Eq)] #[serde(rename_all = "lowercase")] -pub enum LogLevel { +pub enum TracingLevel { Trace, Debug, #[default] @@ -39,23 +47,23 @@ pub enum LogLevel { Fatal, } -impl Display for LogLevel { +impl Display for TracingLevel { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let level = match self { - LogLevel::Trace => "trace", - LogLevel::Debug => "debug", - LogLevel::Info => "info", - LogLevel::Warn => "warn", - LogLevel::Error => "error", - LogLevel::Fatal => "fatal", + TracingLevel::Trace => "trace", + TracingLevel::Debug => "debug", + TracingLevel::Info => "info", + TracingLevel::Warn => "warn", + TracingLevel::Error => "error", + TracingLevel::Fatal => "fatal", }; write!(f, "{level}") } } -impl From for EnvFilter { - fn from(value: LogLevel) -> Self { +impl From for EnvFilter { + fn from(value: TracingLevel) -> Self { EnvFilter::new(format!("warn,agglayer={value},pessimistic_proof={value}")) } } @@ -69,14 +77,15 @@ impl From for EnvFilter { /// appropriate enum variant. If the string is not recognized to be either /// `stdout` or `stderr`, it is assumed to be a file path. #[derive(Serialize, Debug, Clone, Default, PartialEq, Eq)] -pub enum LogOutput { +pub enum TracingOutput { #[default] Stdout, Stderr, File(PathBuf), + Otlp, } -impl<'de> Deserialize<'de> for LogOutput { +impl<'de> Deserialize<'de> for TracingOutput { fn deserialize(deserializer: D) -> Result where D: Deserializer<'de>, @@ -85,22 +94,24 @@ impl<'de> Deserialize<'de> for LogOutput { // If the string is not recognized to be either `stdout` or `stderr`, // it is assumed to be a file path. match s.as_str() { - "stdout" => Ok(LogOutput::Stdout), - "stderr" => Ok(LogOutput::Stderr), - _ => Ok(LogOutput::File(PathBuf::from(s))), + "stdout" => Ok(TracingOutput::Stdout), + "stderr" => Ok(TracingOutput::Stderr), + "otlp" => Ok(TracingOutput::Otlp), + _ => Ok(TracingOutput::File(PathBuf::from(s))), } } } -impl LogOutput { - /// Get a [`BoxMakeWriter`] for the log output. +impl TracingOutput { + /// Get a [`BoxMakeWriter`] for the tracing output. /// - /// This can be used to plug the log output into the tracing subscriber. + /// This can be used to plug the tracing output into the tracing subscriber. pub fn as_make_writer(&self) -> BoxMakeWriter { match self { - LogOutput::Stdout => BoxMakeWriter::new(std::io::stdout), - LogOutput::Stderr => BoxMakeWriter::new(std::io::stderr), - LogOutput::File(path) => { + TracingOutput::Stdout => BoxMakeWriter::new(std::io::stdout), + TracingOutput::Stderr => BoxMakeWriter::new(std::io::stderr), + TracingOutput::Otlp => BoxMakeWriter::new(std::io::stdout), + TracingOutput::File(path) => { let appender = tracing_appender::rolling::never(".", path); BoxMakeWriter::new(appender) } diff --git a/crates/agglayer-node/src/lib.rs b/crates/agglayer-node/src/lib.rs index 42d3841a..910285d0 100644 --- a/crates/agglayer-node/src/lib.rs +++ b/crates/agglayer-node/src/lib.rs @@ -5,7 +5,6 @@ use anyhow::{bail, Result}; use node::Node; use tokio_util::sync::CancellationToken; use tracing::{debug, info}; -mod logging; mod epoch_synchronizer; mod node; @@ -48,8 +47,8 @@ pub fn main( bail!("Received cancellation signal before starting the node."); } - // Initialize the logger - logging::tracing(&config.log); + // // Initialize the logger + // logging::tracing(&config.log); info!("Starting agglayer node version info: {}", version); @@ -64,6 +63,10 @@ pub fn main( .enable_all() .build()?; + // Initialize the tracing + metrics_runtime + .block_on(async { agglayer_telemetry::traces::setup_tracing(&config.log, version) })?; + // Create the metrics server. let metric_server = metrics_runtime.block_on( MetricsBuilder::builder() diff --git a/crates/agglayer-telemetry/Cargo.toml b/crates/agglayer-telemetry/Cargo.toml index 3de78d2d..f4899ef0 100644 --- a/crates/agglayer-telemetry/Cargo.toml +++ b/crates/agglayer-telemetry/Cargo.toml @@ -7,15 +7,20 @@ edition.workspace = true workspace = true [dependencies] +anyhow.workspace = true +agglayer-config.workspace = true axum.workspace = true buildstructor.workspace = true futures.workspace = true lazy_static.workspace = true -opentelemetry = { version = "0.27.1", features = ["metrics"] } -opentelemetry-prometheus = "0.27.0" -opentelemetry_sdk = { version = "0.27.1", features = ["metrics"] } -prometheus = "0.13.3" +opentelemetry = { version = "0.29.1", features = ["metrics"] } +opentelemetry-otlp = { version = "0.29.0", features = ["trace", "grpc-tonic"]} +opentelemetry-prometheus = "0.29.1" +opentelemetry_sdk = { version = "0.29.0", features = ["metrics", "rt-tokio", "trace"] } +prometheus = "0.14.0" thiserror.workspace = true tokio = { workspace = true, features = ["full"] } tokio-util = { workspace = true } tracing.workspace = true +tracing-opentelemetry = "0.30.0" +tracing-subscriber = { workspace = true, features = ["env-filter", "json"] } diff --git a/crates/agglayer-telemetry/src/lib.rs b/crates/agglayer-telemetry/src/lib.rs index 343a2683..1e402c5c 100644 --- a/crates/agglayer-telemetry/src/lib.rs +++ b/crates/agglayer-telemetry/src/lib.rs @@ -21,6 +21,7 @@ use crate::{ mod constant; mod error; +pub mod traces; pub use error::Error; pub use opentelemetry::KeyValue; diff --git a/crates/agglayer-telemetry/src/traces.rs b/crates/agglayer-telemetry/src/traces.rs new file mode 100644 index 00000000..36c6ab05 --- /dev/null +++ b/crates/agglayer-telemetry/src/traces.rs @@ -0,0 +1,167 @@ +use std::time::Duration; + +use agglayer_config::tracing::{TracingFormat, TracingOutput}; +use opentelemetry::{global, trace::TracerProvider, KeyValue}; +use opentelemetry_otlp::WithExportConfig; +use opentelemetry_sdk::{ + propagation::TraceContextPropagator, + trace::{BatchConfigBuilder, BatchSpanProcessor, Sampler, SpanLimits}, + Resource, +}; +use tracing_subscriber::{prelude::*, util::SubscriberInitExt, EnvFilter}; + +pub fn setup_tracing(config: &agglayer_config::Tracing, version: &str) -> anyhow::Result<()> { + let writer = config.outputs.first().cloned().unwrap_or_default(); + + let mut layers = Vec::new(); + + // Setup instrumentation if both otlp agent url and + // otlp service name are provided as arguments + if config + .outputs + .iter() + .any(|output| *output == TracingOutput::Otlp) + { + if let (Some(otlp_agent), Some(otlp_service_name)) = + (&config.otlp_agent, &config.otlp_service_name) + { + let resources = build_resources(otlp_service_name, version); + let otlp_exporter = opentelemetry_otlp::SpanExporter::builder() + .with_tonic() + .with_endpoint(otlp_agent) + .build()?; + + let batch_processor_config = BatchConfigBuilder::default() + .with_scheduled_delay(match std::env::var("OTLP_BATCH_SCHEDULED_DELAY") { + Ok(v) => Duration::from_millis(v.parse::().unwrap_or(5_000)), + _ => Duration::from_millis(5_000), + }) + .with_max_queue_size(match std::env::var("OTLP_BATCH_MAX_QUEUE_SIZE") { + Ok(v) => v.parse::().unwrap_or(2048), + _ => 2048, + }) + .with_max_export_batch_size( + match std::env::var("OTLP_BATCH_MAX_EXPORTER_BATCH_SIZE") { + Ok(v) => v.parse::().unwrap_or(512), + _ => 512, + }, + ); + + let span_limits_default = SpanLimits::default(); + + let trace_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder() + .with_sampler(Sampler::AlwaysOn) + .with_max_events_per_span(match std::env::var("OTLP_MAX_EVENTS_PER_SPAN") { + Ok(v) => v + .parse::() + .unwrap_or(span_limits_default.max_events_per_span), + _ => span_limits_default.max_events_per_span, + }) + .with_max_attributes_per_span(match std::env::var("OTLP_MAX_ATTRIBUTES_PER_SPAN") { + Ok(v) => v + .parse::() + .unwrap_or(span_limits_default.max_attributes_per_span), + _ => span_limits_default.max_attributes_per_span, + }) + .with_max_links_per_span(match std::env::var("OTLP_MAX_LINK_PER_SPAN") { + Ok(v) => v + .parse::() + .unwrap_or(span_limits_default.max_links_per_span), + _ => span_limits_default.max_links_per_span, + }) + .with_max_attributes_per_event( + match std::env::var("OTLP_MAX_ATTRIBUTES_PER_EVENT") { + Ok(v) => v + .parse::() + .unwrap_or(span_limits_default.max_attributes_per_event), + _ => span_limits_default.max_attributes_per_event, + }, + ) + .with_max_attributes_per_link(match std::env::var("OTLP_MAX_ATTRIBUTES_PER_LINK") { + Ok(v) => v + .parse::() + .unwrap_or(span_limits_default.max_attributes_per_link), + _ => span_limits_default.max_attributes_per_link, + }) + .with_resource(Resource::builder().with_attributes(resources).build()) + .with_span_processor( + BatchSpanProcessor::builder(otlp_exporter) + .with_batch_config(batch_processor_config.build()) + .build(), + ) + .build(); + + let tracer = trace_provider + .tracer("agglayer-otlp"); + + let _ = global::set_tracer_provider(trace_provider); + + layers.push( + tracing_opentelemetry::layer() + .with_tracer(tracer) + .with_filter( + EnvFilter::try_from_default_env().unwrap_or_else(|_| config.level.into()), + ) + .boxed(), + ); + + global::set_text_map_propagator(TraceContextPropagator::new()); + } else { + anyhow::bail!("Otlp tracing requires both otlp agent url and otlp service provided"); + } + } + + layers.push(match config.format { + TracingFormat::Pretty => tracing_subscriber::fmt::layer() + .pretty() + .with_writer(writer.as_make_writer()) + .with_filter(EnvFilter::try_from_default_env().unwrap_or_else(|_| config.level.into())) + .boxed(), + + TracingFormat::Json => tracing_subscriber::fmt::layer() + .json() + .with_writer(writer.as_make_writer()) + .with_filter(EnvFilter::try_from_default_env().unwrap_or_else(|_| config.level.into())) + .boxed(), + }); + + // We are using try_init because integration test may try + // to initialize this multiple times. + _ = tracing_subscriber::Registry::default() + .with(layers) + .try_init(); + + tracing::info!("Tracing initialized with config: {config:?}"); + + Ok(()) +} + +fn build_resources(otlp_service_name: &str, version: &str) -> Vec { + let mut resources = Vec::new(); + + resources.push(KeyValue::new("service.name", otlp_service_name.to_string())); + resources.push(KeyValue::new("service.version", version.to_string())); + + let custom_resources: Vec<_> = std::env::var("AGGLAYER_OTLP_TAGS") + .unwrap_or_default() + .split(',') + // NOTE: limit to 10 tags to avoid exploit + .take(10) + .filter_map(|tag_raw| { + let mut v = tag_raw.splitn(2, '='); + match (v.next(), v.next()) { + (Some(key), Some(value)) if !key.trim().is_empty() && !value.trim().is_empty() => { + Some(KeyValue::new( + key.trim().to_string(), + value.trim().to_string(), + )) + } + _ => None, + } + }) + .collect(); + + resources.extend(custom_resources); + + resources +} diff --git a/tests/integrations/src/agglayer_setup.rs b/tests/integrations/src/agglayer_setup.rs index 7a20c3f2..aee64cfd 100644 --- a/tests/integrations/src/agglayer_setup.rs +++ b/tests/integrations/src/agglayer_setup.rs @@ -1,6 +1,6 @@ use std::{path::Path, time::Duration}; -use agglayer_config::{log::LogLevel, Config}; +use agglayer_config::{tracing::TracingLevel, Config}; use agglayer_prover::fake::FakeProver; use ethers::{ core::k256::ecdsa::SigningKey, @@ -110,7 +110,7 @@ pub async fn start_agglayer( config.rpc.admin_port = admin_addr.port(); config.telemetry.addr = next_available_addr(); - config.log.level = LogLevel::Debug; + config.log.level = TracingLevel::Debug; config.l1.node_url = l1.rpc.parse().unwrap(); config.l1.ws_node_url = l1.ws.parse().unwrap(); config.l1.rollup_manager_contract = "0x0B306BF915C4d645ff596e518fAf3F9669b97016" From bb6085dac65b0e86a22294fa57bce12e962cd7af Mon Sep 17 00:00:00 2001 From: atanmarko Date: Thu, 29 May 2025 13:37:51 +0200 Subject: [PATCH 02/10] fix: format --- crates/agglayer-telemetry/src/traces.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/crates/agglayer-telemetry/src/traces.rs b/crates/agglayer-telemetry/src/traces.rs index 36c6ab05..ac50f76b 100644 --- a/crates/agglayer-telemetry/src/traces.rs +++ b/crates/agglayer-telemetry/src/traces.rs @@ -91,8 +91,7 @@ pub fn setup_tracing(config: &agglayer_config::Tracing, version: &str) -> anyhow ) .build(); - let tracer = trace_provider - .tracer("agglayer-otlp"); + let tracer = trace_provider.tracer("agglayer-otlp"); let _ = global::set_tracer_provider(trace_provider); From 7f61e3abc07610cee395b44741019f1283e96be5 Mon Sep 17 00:00:00 2001 From: atanmarko Date: Thu, 29 May 2025 16:26:51 +0200 Subject: [PATCH 03/10] fix: clippy --- crates/agglayer-telemetry/src/traces.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/crates/agglayer-telemetry/src/traces.rs b/crates/agglayer-telemetry/src/traces.rs index ac50f76b..52a9b5af 100644 --- a/crates/agglayer-telemetry/src/traces.rs +++ b/crates/agglayer-telemetry/src/traces.rs @@ -18,9 +18,7 @@ pub fn setup_tracing(config: &agglayer_config::Tracing, version: &str) -> anyhow // Setup instrumentation if both otlp agent url and // otlp service name are provided as arguments if config - .outputs - .iter() - .any(|output| *output == TracingOutput::Otlp) + .outputs.contains(&TracingOutput::Otlp) { if let (Some(otlp_agent), Some(otlp_service_name)) = (&config.otlp_agent, &config.otlp_service_name) From cb8c91fe200f2724e2920ac885408246af315cd1 Mon Sep 17 00:00:00 2001 From: atanmarko Date: Fri, 30 May 2025 10:33:20 +0200 Subject: [PATCH 04/10] fix: format --- crates/agglayer-telemetry/src/traces.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/crates/agglayer-telemetry/src/traces.rs b/crates/agglayer-telemetry/src/traces.rs index 52a9b5af..3bbdad06 100644 --- a/crates/agglayer-telemetry/src/traces.rs +++ b/crates/agglayer-telemetry/src/traces.rs @@ -17,9 +17,7 @@ pub fn setup_tracing(config: &agglayer_config::Tracing, version: &str) -> anyhow // Setup instrumentation if both otlp agent url and // otlp service name are provided as arguments - if config - .outputs.contains(&TracingOutput::Otlp) - { + if config.outputs.contains(&TracingOutput::Otlp) { if let (Some(otlp_agent), Some(otlp_service_name)) = (&config.otlp_agent, &config.otlp_service_name) { From d29bda239a133380b56cf635ea396cf429ea68c7 Mon Sep 17 00:00:00 2001 From: atanmarko Date: Mon, 2 Jun 2025 12:08:27 +0200 Subject: [PATCH 05/10] fix: comment --- crates/agglayer-config/src/tracing.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/agglayer-config/src/tracing.rs b/crates/agglayer-config/src/tracing.rs index f0bd41d8..ad090151 100644 --- a/crates/agglayer-config/src/tracing.rs +++ b/crates/agglayer-config/src/tracing.rs @@ -110,6 +110,7 @@ impl TracingOutput { match self { TracingOutput::Stdout => BoxMakeWriter::new(std::io::stdout), TracingOutput::Stderr => BoxMakeWriter::new(std::io::stderr), + // For OTLP, output traces also to stdout TracingOutput::Otlp => BoxMakeWriter::new(std::io::stdout), TracingOutput::File(path) => { let appender = tracing_appender::rolling::never(".", path); From 6cc1b46c182d0dbfd54f0cfbecd67fd00a5db4e1 Mon Sep 17 00:00:00 2001 From: atanmarko Date: Mon, 2 Jun 2025 12:53:29 +0200 Subject: [PATCH 06/10] chore: rename struct --- crates/agglayer-config/src/lib.rs | 4 ++-- crates/agglayer-config/src/tracing.rs | 2 +- crates/agglayer-telemetry/src/traces.rs | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/agglayer-config/src/lib.rs b/crates/agglayer-config/src/lib.rs index 8f0e35ad..72c53d3b 100644 --- a/crates/agglayer-config/src/lib.rs +++ b/crates/agglayer-config/src/lib.rs @@ -40,7 +40,7 @@ pub use l2::L2; use prover::default_prover_entrypoint; pub use rate_limiting::RateLimitingConfig; pub use rpc::RpcConfig; -pub use tracing::Tracing; +pub use tracing::TracingConfig; /// The Agglayer configuration. #[serde_with::serde_as] @@ -65,7 +65,7 @@ pub struct Config { /// The log configuration. #[serde(default)] - pub log: Tracing, + pub log: TracingConfig, /// The local RPC server configuration. #[serde(default)] diff --git a/crates/agglayer-config/src/tracing.rs b/crates/agglayer-config/src/tracing.rs index ad090151..b7e12144 100644 --- a/crates/agglayer-config/src/tracing.rs +++ b/crates/agglayer-config/src/tracing.rs @@ -6,7 +6,7 @@ use tracing_subscriber::{fmt::writer::BoxMakeWriter, EnvFilter}; /// The tracing configuration. #[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq, Eq)] #[serde(rename_all = "kebab-case")] -pub struct Tracing { +pub struct TracingConfig { /// The `RUST_LOG` environment variable will take precedence over the /// configuration tracing level. #[serde(default)] diff --git a/crates/agglayer-telemetry/src/traces.rs b/crates/agglayer-telemetry/src/traces.rs index 3bbdad06..4ed29054 100644 --- a/crates/agglayer-telemetry/src/traces.rs +++ b/crates/agglayer-telemetry/src/traces.rs @@ -10,7 +10,7 @@ use opentelemetry_sdk::{ }; use tracing_subscriber::{prelude::*, util::SubscriberInitExt, EnvFilter}; -pub fn setup_tracing(config: &agglayer_config::Tracing, version: &str) -> anyhow::Result<()> { +pub fn setup_tracing(config: &agglayer_config::TracingConfig, version: &str) -> anyhow::Result<()> { let writer = config.outputs.first().cloned().unwrap_or_default(); let mut layers = Vec::new(); From 26bf94abf5b5c9fdb89c3fd232f32bb72ac35780 Mon Sep 17 00:00:00 2001 From: atanmarko Date: Wed, 4 Jun 2025 13:57:35 +0200 Subject: [PATCH 07/10] fix: review --- Cargo.toml | 5 + crates/agglayer-config/src/tracing.rs | 4 +- crates/agglayer-node/src/lib.rs | 3 - crates/agglayer-node/src/logging.rs | 31 ----- crates/agglayer-telemetry/Cargo.toml | 10 +- crates/agglayer-telemetry/src/traces.rs | 158 ++++++++++++++---------- 6 files changed, 105 insertions(+), 106 deletions(-) delete mode 100644 crates/agglayer-node/src/logging.rs diff --git a/Cargo.toml b/Cargo.toml index 725f8a71..25fbba49 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -96,6 +96,11 @@ insta = { git = "https://github.com/freyskeyd/insta", branch = "chore/updating-d jsonrpsee = { version = "0.24.7", features = ["full"] } lazy_static = "1.5" mockall = "0.13.1" +opentelemetry = { version = "0.29.1", features = ["metrics"] } +opentelemetry-otlp = { version = "0.29.0", features = ["trace", "grpc-tonic"]} +opentelemetry-prometheus = "0.29.1" +opentelemetry_sdk = { version = "0.29.0", features = ["metrics", "rt-tokio", "trace"] } +prometheus = "0.14.0" parking_lot = "0.12.3" pin-project = "1.1" prost = "0.13.4" diff --git a/crates/agglayer-config/src/tracing.rs b/crates/agglayer-config/src/tracing.rs index b7e12144..387dfb42 100644 --- a/crates/agglayer-config/src/tracing.rs +++ b/crates/agglayer-config/src/tracing.rs @@ -110,8 +110,8 @@ impl TracingOutput { match self { TracingOutput::Stdout => BoxMakeWriter::new(std::io::stdout), TracingOutput::Stderr => BoxMakeWriter::new(std::io::stderr), - // For OTLP, output traces also to stdout - TracingOutput::Otlp => BoxMakeWriter::new(std::io::stdout), + // OTLP uses its own export mechanism, not a writer + TracingOutput::Otlp => BoxMakeWriter::new(std::io::sink), TracingOutput::File(path) => { let appender = tracing_appender::rolling::never(".", path); BoxMakeWriter::new(appender) diff --git a/crates/agglayer-node/src/lib.rs b/crates/agglayer-node/src/lib.rs index 910285d0..e15c9a1d 100644 --- a/crates/agglayer-node/src/lib.rs +++ b/crates/agglayer-node/src/lib.rs @@ -47,9 +47,6 @@ pub fn main( bail!("Received cancellation signal before starting the node."); } - // // Initialize the logger - // logging::tracing(&config.log); - info!("Starting agglayer node version info: {}", version); let node_runtime = tokio::runtime::Builder::new_multi_thread() diff --git a/crates/agglayer-node/src/logging.rs b/crates/agglayer-node/src/logging.rs deleted file mode 100644 index 95ee6b86..00000000 --- a/crates/agglayer-node/src/logging.rs +++ /dev/null @@ -1,31 +0,0 @@ -use agglayer_config::log::LogFormat; -use tracing_subscriber::{prelude::*, util::SubscriberInitExt, EnvFilter}; - -pub(crate) fn tracing(config: &agglayer_config::Log) { - // TODO: Support multiple outputs. - let writer = config.outputs.first().cloned().unwrap_or_default(); - - let layer = match config.format { - LogFormat::Pretty => { - let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| config.level.into()); - - tracing_subscriber::fmt::layer() - .pretty() - .with_writer(writer.as_make_writer()) - .with_filter(filter) - .boxed() - } - - LogFormat::Json => tracing_subscriber::fmt::layer() - .json() - .with_writer(writer.as_make_writer()) - .with_filter(EnvFilter::try_from_default_env().unwrap_or_else(|_| config.level.into())) - .boxed(), - }; - - // We are using try_init because integration test may try to initialize this - // multiple times. - _ = tracing_subscriber::Registry::default() - .with(layer) - .try_init(); -} diff --git a/crates/agglayer-telemetry/Cargo.toml b/crates/agglayer-telemetry/Cargo.toml index f4899ef0..a6484b48 100644 --- a/crates/agglayer-telemetry/Cargo.toml +++ b/crates/agglayer-telemetry/Cargo.toml @@ -13,11 +13,11 @@ axum.workspace = true buildstructor.workspace = true futures.workspace = true lazy_static.workspace = true -opentelemetry = { version = "0.29.1", features = ["metrics"] } -opentelemetry-otlp = { version = "0.29.0", features = ["trace", "grpc-tonic"]} -opentelemetry-prometheus = "0.29.1" -opentelemetry_sdk = { version = "0.29.0", features = ["metrics", "rt-tokio", "trace"] } -prometheus = "0.14.0" +opentelemetry.workspace = true +opentelemetry-otlp.workspace = true +opentelemetry-prometheus.workspace = true +opentelemetry_sdk.workspace = true +prometheus.workspace = true thiserror.workspace = true tokio = { workspace = true, features = ["full"] } tokio-util = { workspace = true } diff --git a/crates/agglayer-telemetry/src/traces.rs b/crates/agglayer-telemetry/src/traces.rs index 4ed29054..eca6296c 100644 --- a/crates/agglayer-telemetry/src/traces.rs +++ b/crates/agglayer-telemetry/src/traces.rs @@ -1,6 +1,7 @@ use std::time::Duration; use agglayer_config::tracing::{TracingFormat, TracingOutput}; +use anyhow::anyhow; use opentelemetry::{global, trace::TracerProvider, KeyValue}; use opentelemetry_otlp::WithExportConfig; use opentelemetry_sdk::{ @@ -10,17 +11,25 @@ use opentelemetry_sdk::{ }; use tracing_subscriber::{prelude::*, util::SubscriberInitExt, EnvFilter}; -pub fn setup_tracing(config: &agglayer_config::TracingConfig, version: &str) -> anyhow::Result<()> { - let writer = config.outputs.first().cloned().unwrap_or_default(); +pub const OTLP_BATCH_SCHEDULED_DELAY: Duration = Duration::from_millis(5_000); +pub const OTLP_BATCH_MAX_QUEUE_SIZE: usize = 2048; +pub const OTLP_BATCH_MAX_EXPORTER_BATCH_SIZE: usize = 512; +pub fn setup_tracing(config: &agglayer_config::TracingConfig, version: &str) -> anyhow::Result<()> { let mut layers = Vec::new(); - // Setup instrumentation if both otlp agent url and - // otlp service name are provided as arguments - if config.outputs.contains(&TracingOutput::Otlp) { - if let (Some(otlp_agent), Some(otlp_service_name)) = - (&config.otlp_agent, &config.otlp_service_name) - { + for writer in &config.outputs { + // Setup instrumentation if both otlp agent url and + // otlp service name are provided as arguments + if writer == &TracingOutput::Otlp { + let (Some(otlp_agent), Some(otlp_service_name)) = + (&config.otlp_agent, &config.otlp_service_name) + else { + anyhow::bail!( + "Otlp tracing requires both otlp agent url and otlp service provided" + ); + }; + let resources = build_resources(otlp_service_name, version); let otlp_exporter = opentelemetry_otlp::SpanExporter::builder() .with_tonic() @@ -29,56 +38,67 @@ pub fn setup_tracing(config: &agglayer_config::TracingConfig, version: &str) -> let batch_processor_config = BatchConfigBuilder::default() .with_scheduled_delay(match std::env::var("OTLP_BATCH_SCHEDULED_DELAY") { - Ok(v) => Duration::from_millis(v.parse::().unwrap_or(5_000)), - _ => Duration::from_millis(5_000), + Ok(v) => { + if let Ok(millis) = v.parse::() { + Duration::from_millis(millis) + } else { + OTLP_BATCH_SCHEDULED_DELAY + } + } + _ => OTLP_BATCH_SCHEDULED_DELAY, }) .with_max_queue_size(match std::env::var("OTLP_BATCH_MAX_QUEUE_SIZE") { - Ok(v) => v.parse::().unwrap_or(2048), - _ => 2048, + Ok(v) => v.parse::().unwrap_or(OTLP_BATCH_MAX_QUEUE_SIZE), + _ => OTLP_BATCH_MAX_QUEUE_SIZE, }) .with_max_export_batch_size( match std::env::var("OTLP_BATCH_MAX_EXPORTER_BATCH_SIZE") { - Ok(v) => v.parse::().unwrap_or(512), - _ => 512, + Ok(v) => v + .parse::() + .unwrap_or(OTLP_BATCH_MAX_EXPORTER_BATCH_SIZE), + _ => OTLP_BATCH_MAX_EXPORTER_BATCH_SIZE, }, ); - let span_limits_default = SpanLimits::default(); + let span_limits = { + let mut span_limits = SpanLimits::default(); + if let Ok(max_events) = std::env::var("OTLP_MAX_EVENTS_PER_SPAN") { + if let Ok(value) = max_events.parse::() { + span_limits.max_events_per_span = value; + } + } + + if let Ok(max_attributes) = std::env::var("OTLP_MAX_ATTRIBUTES_PER_SPAN") { + if let Ok(value) = max_attributes.parse::() { + span_limits.max_attributes_per_span = value; + } + } + if let Ok(max_links_per_span) = std::env::var("OTLP_MAX_LINKS_PER_SPAN") { + if let Ok(value) = max_links_per_span.parse::() { + span_limits.max_links_per_span = value; + } + } + + if let Ok(max_attributes_per_event) = std::env::var("OTLP_MAX_ATTRIBUTES_PER_EVENT") + { + if let Ok(value) = max_attributes_per_event.parse::() { + span_limits.max_attributes_per_event = value; + } + } + + if let Ok(max_attributes_per_link) = std::env::var("OTLP_MAX_ATTRIBUTES_PER_LINK") { + if let Ok(value) = max_attributes_per_link.parse::() { + span_limits.max_attributes_per_link = value; + } + } + span_limits + }; + + // Ensure that the span limits are not too low let trace_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder() .with_sampler(Sampler::AlwaysOn) - .with_max_events_per_span(match std::env::var("OTLP_MAX_EVENTS_PER_SPAN") { - Ok(v) => v - .parse::() - .unwrap_or(span_limits_default.max_events_per_span), - _ => span_limits_default.max_events_per_span, - }) - .with_max_attributes_per_span(match std::env::var("OTLP_MAX_ATTRIBUTES_PER_SPAN") { - Ok(v) => v - .parse::() - .unwrap_or(span_limits_default.max_attributes_per_span), - _ => span_limits_default.max_attributes_per_span, - }) - .with_max_links_per_span(match std::env::var("OTLP_MAX_LINK_PER_SPAN") { - Ok(v) => v - .parse::() - .unwrap_or(span_limits_default.max_links_per_span), - _ => span_limits_default.max_links_per_span, - }) - .with_max_attributes_per_event( - match std::env::var("OTLP_MAX_ATTRIBUTES_PER_EVENT") { - Ok(v) => v - .parse::() - .unwrap_or(span_limits_default.max_attributes_per_event), - _ => span_limits_default.max_attributes_per_event, - }, - ) - .with_max_attributes_per_link(match std::env::var("OTLP_MAX_ATTRIBUTES_PER_LINK") { - Ok(v) => v - .parse::() - .unwrap_or(span_limits_default.max_attributes_per_link), - _ => span_limits_default.max_attributes_per_link, - }) + .with_span_limits(span_limits) .with_resource(Resource::builder().with_attributes(resources).build()) .with_span_processor( BatchSpanProcessor::builder(otlp_exporter) @@ -102,29 +122,32 @@ pub fn setup_tracing(config: &agglayer_config::TracingConfig, version: &str) -> global::set_text_map_propagator(TraceContextPropagator::new()); } else { - anyhow::bail!("Otlp tracing requires both otlp agent url and otlp service provided"); + layers.push(match config.format { + TracingFormat::Pretty => tracing_subscriber::fmt::layer() + .pretty() + .with_writer(writer.as_make_writer()) + .with_filter( + EnvFilter::try_from_default_env().unwrap_or_else(|_| config.level.into()), + ) + .boxed(), + + TracingFormat::Json => tracing_subscriber::fmt::layer() + .json() + .with_writer(writer.as_make_writer()) + .with_filter( + EnvFilter::try_from_default_env().unwrap_or_else(|_| config.level.into()), + ) + .boxed(), + }); } } - layers.push(match config.format { - TracingFormat::Pretty => tracing_subscriber::fmt::layer() - .pretty() - .with_writer(writer.as_make_writer()) - .with_filter(EnvFilter::try_from_default_env().unwrap_or_else(|_| config.level.into())) - .boxed(), - - TracingFormat::Json => tracing_subscriber::fmt::layer() - .json() - .with_writer(writer.as_make_writer()) - .with_filter(EnvFilter::try_from_default_env().unwrap_or_else(|_| config.level.into())) - .boxed(), - }); - // We are using try_init because integration test may try // to initialize this multiple times. - _ = tracing_subscriber::Registry::default() + tracing_subscriber::Registry::default() .with(layers) - .try_init(); + .try_init() + .map_err(|e| anyhow!("Unable to initialize tracing subscriber: {e:?}"))?; tracing::info!("Tracing initialized with config: {config:?}"); @@ -151,7 +174,12 @@ fn build_resources(otlp_service_name: &str, version: &str) -> Vec { value.trim().to_string(), )) } - _ => None, + _ => { + eprint!( + "Invalid AGGLAYER_OTLP_TAGS entry: {tag_raw}. Expected format: key=value" + ); + None + } } }) .collect(); From c7a6641cb95205e14517a3d246d7b82e9ca602c2 Mon Sep 17 00:00:00 2001 From: atanmarko Date: Wed, 4 Jun 2025 14:01:45 +0200 Subject: [PATCH 08/10] fix: review 2 --- crates/agglayer-telemetry/src/traces.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/crates/agglayer-telemetry/src/traces.rs b/crates/agglayer-telemetry/src/traces.rs index eca6296c..86a78b43 100644 --- a/crates/agglayer-telemetry/src/traces.rs +++ b/crates/agglayer-telemetry/src/traces.rs @@ -163,8 +163,6 @@ fn build_resources(otlp_service_name: &str, version: &str) -> Vec { let custom_resources: Vec<_> = std::env::var("AGGLAYER_OTLP_TAGS") .unwrap_or_default() .split(',') - // NOTE: limit to 10 tags to avoid exploit - .take(10) .filter_map(|tag_raw| { let mut v = tag_raw.splitn(2, '='); match (v.next(), v.next()) { From 627f929fd5b2cb308e927e6309f55c1e4674de80 Mon Sep 17 00:00:00 2001 From: atanmarko Date: Wed, 4 Jun 2025 18:10:56 +0200 Subject: [PATCH 09/10] fix: review 2 --- Cargo.toml | 2 +- crates/agglayer-telemetry/src/traces.rs | 70 +++++++++++++++---------- 2 files changed, 43 insertions(+), 29 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 25fbba49..d9d8e579 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -100,9 +100,9 @@ opentelemetry = { version = "0.29.1", features = ["metrics"] } opentelemetry-otlp = { version = "0.29.0", features = ["trace", "grpc-tonic"]} opentelemetry-prometheus = "0.29.1" opentelemetry_sdk = { version = "0.29.0", features = ["metrics", "rt-tokio", "trace"] } -prometheus = "0.14.0" parking_lot = "0.12.3" pin-project = "1.1" +prometheus = "0.14.0" prost = "0.13.4" rand = "0.9.0" rstest = "0.22.0" diff --git a/crates/agglayer-telemetry/src/traces.rs b/crates/agglayer-telemetry/src/traces.rs index 86a78b43..c6973365 100644 --- a/crates/agglayer-telemetry/src/traces.rs +++ b/crates/agglayer-telemetry/src/traces.rs @@ -11,9 +11,6 @@ use opentelemetry_sdk::{ }; use tracing_subscriber::{prelude::*, util::SubscriberInitExt, EnvFilter}; -pub const OTLP_BATCH_SCHEDULED_DELAY: Duration = Duration::from_millis(5_000); -pub const OTLP_BATCH_MAX_QUEUE_SIZE: usize = 2048; -pub const OTLP_BATCH_MAX_EXPORTER_BATCH_SIZE: usize = 512; pub fn setup_tracing(config: &agglayer_config::TracingConfig, version: &str) -> anyhow::Result<()> { let mut layers = Vec::new(); @@ -36,47 +33,61 @@ pub fn setup_tracing(config: &agglayer_config::TracingConfig, version: &str) -> .with_endpoint(otlp_agent) .build()?; - let batch_processor_config = BatchConfigBuilder::default() - .with_scheduled_delay(match std::env::var("OTLP_BATCH_SCHEDULED_DELAY") { - Ok(v) => { - if let Ok(millis) = v.parse::() { - Duration::from_millis(millis) - } else { - OTLP_BATCH_SCHEDULED_DELAY - } - } - _ => OTLP_BATCH_SCHEDULED_DELAY, - }) - .with_max_queue_size(match std::env::var("OTLP_BATCH_MAX_QUEUE_SIZE") { - Ok(v) => v.parse::().unwrap_or(OTLP_BATCH_MAX_QUEUE_SIZE), - _ => OTLP_BATCH_MAX_QUEUE_SIZE, - }) - .with_max_export_batch_size( - match std::env::var("OTLP_BATCH_MAX_EXPORTER_BATCH_SIZE") { - Ok(v) => v - .parse::() - .unwrap_or(OTLP_BATCH_MAX_EXPORTER_BATCH_SIZE), - _ => OTLP_BATCH_MAX_EXPORTER_BATCH_SIZE, - }, - ); + let mut batch_processor_config = BatchConfigBuilder::default(); + + if let Ok(otlp_batch_scheduled_delay) = std::env::var("OTLP_BATCH_SCHEDULED_DELAY") { + if let Ok(millis) = otlp_batch_scheduled_delay.parse::() { + batch_processor_config = + batch_processor_config.with_scheduled_delay(Duration::from_millis(millis)); + } else { + eprintln!("Failed to parse OTLP_BATCH_SCHEDULED_DELAY, using default value"); + } + } + + if let Ok(otlp_batch_max_queue_size) = std::env::var("OTLP_BATCH_MAX_QUEUE_SIZE") { + if let Ok(size) = otlp_batch_max_queue_size.parse::() { + batch_processor_config = batch_processor_config.with_max_queue_size(size); + } else { + eprintln!("Failed to parse OTLP_BATCH_MAX_QUEUE_SIZE, using default value"); + } + } + + if let Ok(otlp_batch_max_exporter_batch_size) = + std::env::var("OTLP_BATCH_MAX_EXPORTER_BATCH_SIZE") + { + if let Ok(size) = otlp_batch_max_exporter_batch_size.parse::() { + batch_processor_config = + batch_processor_config.with_max_export_batch_size(size); + } else { + eprintln!( + "Failed to parse OTLP_BATCH_MAX_EXPORTER_BATCH_SIZE, using default value" + ); + } + } let span_limits = { let mut span_limits = SpanLimits::default(); if let Ok(max_events) = std::env::var("OTLP_MAX_EVENTS_PER_SPAN") { if let Ok(value) = max_events.parse::() { span_limits.max_events_per_span = value; + } else { + eprintln!("Failed to parse OTLP_MAX_EVENTS_PER_SPAN"); } } if let Ok(max_attributes) = std::env::var("OTLP_MAX_ATTRIBUTES_PER_SPAN") { if let Ok(value) = max_attributes.parse::() { span_limits.max_attributes_per_span = value; + } else { + eprintln!("Failed to parse OTLP_MAX_ATTRIBUTES_PER_SPAN"); } } if let Ok(max_links_per_span) = std::env::var("OTLP_MAX_LINKS_PER_SPAN") { if let Ok(value) = max_links_per_span.parse::() { span_limits.max_links_per_span = value; + } else { + eprintln!("Failed to parse OTLP_MAX_LINKS_PER_SPAN"); } } @@ -84,18 +95,21 @@ pub fn setup_tracing(config: &agglayer_config::TracingConfig, version: &str) -> { if let Ok(value) = max_attributes_per_event.parse::() { span_limits.max_attributes_per_event = value; + } else { + eprintln!("Failed to parse OTLP_MAX_ATTRIBUTES_PER_EVENT"); } } if let Ok(max_attributes_per_link) = std::env::var("OTLP_MAX_ATTRIBUTES_PER_LINK") { if let Ok(value) = max_attributes_per_link.parse::() { span_limits.max_attributes_per_link = value; + } else { + eprintln!("Failed to parse OTLP_MAX_ATTRIBUTES_PER_LINK"); } } span_limits }; - // Ensure that the span limits are not too low let trace_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder() .with_sampler(Sampler::AlwaysOn) .with_span_limits(span_limits) @@ -173,7 +187,7 @@ fn build_resources(otlp_service_name: &str, version: &str) -> Vec { )) } _ => { - eprint!( + eprintln!( "Invalid AGGLAYER_OTLP_TAGS entry: {tag_raw}. Expected format: key=value" ); None From 5975e1ce04d32044c87c7a98184d2487e8dd7fd5 Mon Sep 17 00:00:00 2001 From: atanmarko Date: Wed, 4 Jun 2025 18:23:29 +0200 Subject: [PATCH 10/10] fix: format --- crates/agglayer-telemetry/src/traces.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/agglayer-telemetry/src/traces.rs b/crates/agglayer-telemetry/src/traces.rs index c6973365..eaccb619 100644 --- a/crates/agglayer-telemetry/src/traces.rs +++ b/crates/agglayer-telemetry/src/traces.rs @@ -11,7 +11,6 @@ use opentelemetry_sdk::{ }; use tracing_subscriber::{prelude::*, util::SubscriberInitExt, EnvFilter}; - pub fn setup_tracing(config: &agglayer_config::TracingConfig, version: &str) -> anyhow::Result<()> { let mut layers = Vec::new();