Skip to content

Commit

Permalink
Adapt code for newer otlp version
Browse files Browse the repository at this point in the history
  • Loading branch information
popzxc committed Jul 26, 2024
1 parent 701bd87 commit badd45d
Show file tree
Hide file tree
Showing 15 changed files with 523 additions and 519 deletions.
341 changes: 180 additions & 161 deletions Cargo.lock

Large diffs are not rendered by default.

9 changes: 5 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,10 @@ num = "0.4.0"
num_cpus = "1.13"
num_enum = "0.7.2"
once_cell = "1"
opentelemetry = "0.20.0"
opentelemetry-otlp = "0.13.0"
opentelemetry-semantic-conventions = "0.12.0"
opentelemetry = "0.24.0"
opentelemetry_sdk = "0.24.0"
opentelemetry-otlp = "0.17.0"
opentelemetry-semantic-conventions = "0.16.0"
pin-project-lite = "0.2.13"
pretty_assertions = "1"
prost = "0.12.1"
Expand Down Expand Up @@ -179,7 +180,7 @@ tower = "0.4.13"
tower-http = "0.5.2"
tracing = "0.1"
tracing-subscriber = "0.3"
tracing-opentelemetry = "0.21.0"
tracing-opentelemetry = "0.25.0"
time = "0.3.36" # Has to be same as used by `tracing-subscriber`
url = "2"
web3 = "0.19.0"
Expand Down
4 changes: 3 additions & 1 deletion core/bin/external_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -824,6 +824,8 @@ async fn main() -> anyhow::Result<()> {
if !opt.enable_consensus {
config.consensus = None;
}
// Note: when old code will be removed, observability must be build within
// tokio context.
let _guard = config.observability.build_observability()?;

// Build L1 and L2 clients.
Expand Down Expand Up @@ -856,7 +858,7 @@ async fn main() -> anyhow::Result<()> {
// We run the node from a different thread, since the current thread is in tokio context.
std::thread::spawn(move || {
let node =
ExternalNodeBuilder::new(config).build(opt.components.0.into_iter().collect())?;
ExternalNodeBuilder::new(config)?.build(opt.components.0.into_iter().collect())?;
node.run()?;
anyhow::Ok(())
})
Expand Down
10 changes: 5 additions & 5 deletions core/bin/external_node/src/node_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,11 @@ pub(crate) struct ExternalNodeBuilder {
}

impl ExternalNodeBuilder {
pub fn new(config: ExternalNodeConfig) -> Self {
Self {
node: ZkStackServiceBuilder::new(),
pub fn new(config: ExternalNodeConfig) -> anyhow::Result<Self> {
Ok(Self {
node: ZkStackServiceBuilder::new().context("Cannot create ZkStackServiceBuilder")?,
config,
}
})
}

fn add_sigint_handler_layer(mut self) -> anyhow::Result<Self> {
Expand Down Expand Up @@ -587,7 +587,7 @@ impl ExternalNodeBuilder {
}
}

Ok(self.node.build()?)
Ok(self.node.build())
}
}

Expand Down
8 changes: 4 additions & 4 deletions core/bin/external_node/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ async fn external_node_basics(components_str: &'static str) {

let node_handle = tokio::task::spawn_blocking(move || {
std::thread::spawn(move || {
let mut node = ExternalNodeBuilder::new(env.config);
let mut node = ExternalNodeBuilder::new(env.config)?;
inject_test_layers(
&mut node,
env.sigint_receiver,
Expand Down Expand Up @@ -97,7 +97,7 @@ async fn node_reacts_to_stop_signal_during_initial_reorg_detection() {

let mut node_handle = tokio::task::spawn_blocking(move || {
std::thread::spawn(move || {
let mut node = ExternalNodeBuilder::new(env.config);
let mut node = ExternalNodeBuilder::new(env.config)?;
inject_test_layers(
&mut node,
env.sigint_receiver,
Expand Down Expand Up @@ -133,7 +133,7 @@ async fn running_tree_without_core_is_not_allowed() {

let node_handle = tokio::task::spawn_blocking(move || {
std::thread::spawn(move || {
let mut node = ExternalNodeBuilder::new(env.config);
let mut node = ExternalNodeBuilder::new(env.config)?;
inject_test_layers(
&mut node,
env.sigint_receiver,
Expand Down Expand Up @@ -170,7 +170,7 @@ async fn running_tree_api_without_tree_is_not_allowed() {

let node_handle = tokio::task::spawn_blocking(move || {
std::thread::spawn(move || {
let mut node = ExternalNodeBuilder::new(env.config);
let mut node = ExternalNodeBuilder::new(env.config)?;
inject_test_layers(
&mut node,
env.sigint_receiver,
Expand Down
18 changes: 11 additions & 7 deletions core/bin/zksync_server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,6 @@ fn main() -> anyhow::Result<()> {
}
};

let observability_config = configs
.observability
.clone()
.context("observability config")?;
let _observability_guard = observability_config.install()?;

let wallets = match opt.wallets_path {
None => tmp_config.wallets(),
Some(path) => {
Expand Down Expand Up @@ -162,8 +156,18 @@ fn main() -> anyhow::Result<()> {
.context("failed decoding genesis YAML config")?
}
};
let observability_config = configs
.observability
.clone()
.context("observability config")?;

let node = MainNodeBuilder::new(configs, wallets, genesis, contracts_config, secrets);
let node = MainNodeBuilder::new(configs, wallets, genesis, contracts_config, secrets)?;

let _observability_guard = {
// Observability initialization should be performed within tokio context.
let _context_guard = node.runtime_handle().enter();
observability_config.install()?
};

if opt.genesis {
// If genesis is requested, we don't need to run the node.
Expand Down
16 changes: 10 additions & 6 deletions core/bin/zksync_server/src/node_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,15 +92,19 @@ impl MainNodeBuilder {
genesis_config: GenesisConfig,
contracts_config: ContractsConfig,
secrets: Secrets,
) -> Self {
Self {
node: ZkStackServiceBuilder::new(),
) -> anyhow::Result<Self> {
Ok(Self {
node: ZkStackServiceBuilder::new().context("Cannot create ZkStackServiceBuilder")?,
configs,
wallets,
genesis_config,
contracts_config,
secrets,
}
})
}

pub fn runtime_handle(&self) -> tokio::runtime::Handle {
self.node.runtime_handle()
}

fn add_sigint_handler_layer(mut self) -> anyhow::Result<Self> {
Expand Down Expand Up @@ -589,7 +593,7 @@ impl MainNodeBuilder {
.add_query_eth_client_layer()?
.add_storage_initialization_layer(LayerKind::Task)?;

Ok(self.node.build()?)
Ok(self.node.build())
}

/// Builds the node with the specified components.
Expand Down Expand Up @@ -701,7 +705,7 @@ impl MainNodeBuilder {
}
}
}
Ok(self.node.build()?)
Ok(self.node.build())
}
}

Expand Down
8 changes: 4 additions & 4 deletions core/bin/zksync_tee_prover/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ fn main() -> anyhow::Result<()> {

let prometheus_config = PrometheusConfig::from_env()?;

let mut builder = ZkStackServiceBuilder::new();
let mut builder_mut = builder
let mut builder = ZkStackServiceBuilder::new()?;
builder
.add_layer(SigintHandlerLayer)
.add_layer(TeeProverLayer::new(
tee_prover_config.api_url,
Expand All @@ -50,9 +50,9 @@ fn main() -> anyhow::Result<()> {
if let Some(gateway) = prometheus_config.gateway_endpoint() {
let exporter_config =
PrometheusExporterConfig::push(gateway, prometheus_config.push_interval());
builder_mut = builder_mut.add_layer(PrometheusExporterLayer(exporter_config));
builder.add_layer(PrometheusExporterLayer(exporter_config));
}

builder_mut.build()?.run()?;
builder.build().run()?;
Ok(())
}
4 changes: 3 additions & 1 deletion core/lib/vlog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ thiserror.workspace = true
sentry.workspace = true
serde.workspace = true
serde_json.workspace = true
opentelemetry = { workspace = true, features = ["rt-tokio-current-thread", "trace"] }
opentelemetry = { workspace = true, features = ["trace"] }
opentelemetry_sdk = { workspace = true, features = [ "rt-tokio" ] }
opentelemetry-otlp = { workspace = true, features = [
"http-proto",
"reqwest-client",
] }
opentelemetry-semantic-conventions.workspace = true
vise.workspace = true
vise-exporter.workspace = true
url.workspace = true
91 changes: 47 additions & 44 deletions core/lib/vlog/src/opentelemetry/mod.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
use std::str::FromStr;

use opentelemetry::{
sdk::{
propagation::TraceContextPropagator,
trace::{self, RandomIdGenerator, Sampler},
Resource,
},
KeyValue,
};
use opentelemetry::{trace::TracerProvider, KeyValue};
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::{
propagation::TraceContextPropagator,
trace::{RandomIdGenerator, Sampler},
Resource,
};
use opentelemetry_semantic_conventions::resource::{
K8S_NAMESPACE_NAME, K8S_POD_NAME, SERVICE_NAME,
};
use tracing_subscriber::{registry::LookupSpan, EnvFilter, Layer};
use url::Url;

/// Information about the service.
#[derive(Debug, Default)]
Expand Down Expand Up @@ -66,17 +65,17 @@ impl ServiceDescriptor {
}

fn into_otlp_resource(self) -> Resource {
let mut resource = vec![];
let mut attributes = vec![];
if let Some(pod_name) = self.k8s_pod_name {
resource.push(KeyValue::new(K8S_POD_NAME, pod_name));
attributes.push(KeyValue::new(K8S_POD_NAME, pod_name));
}
if let Some(pod_namespace) = self.k8s_namespace_name {
resource.push(KeyValue::new(K8S_NAMESPACE_NAME, pod_namespace));
attributes.push(KeyValue::new(K8S_NAMESPACE_NAME, pod_namespace));
}
if let Some(service_name) = self.service_name {
resource.push(KeyValue::new(SERVICE_NAME, service_name));
attributes.push(KeyValue::new(SERVICE_NAME, service_name));
}
Resource::new(resource)
Resource::new(attributes)
}
}

Expand All @@ -85,7 +84,7 @@ pub struct OpenTelemetry {
/// Enables export of span data of specified level (and above) using opentelemetry exporters.
pub opentelemetry_level: OpenTelemetryLevel,
/// Opentelemetry HTTP collector endpoint.
pub otlp_endpoint: String,
pub otlp_endpoint: Url,
/// Information about service
pub service: Option<ServiceDescriptor>,
}
Expand All @@ -94,10 +93,12 @@ impl OpenTelemetry {
pub fn new(
opentelemetry_level: &str,
otlp_endpoint: String,
) -> Result<Self, OpenTelemetryLevelError> {
) -> Result<Self, OpenTelemetryLayerError> {
Ok(Self {
opentelemetry_level: opentelemetry_level.parse()?,
otlp_endpoint,
otlp_endpoint: otlp_endpoint
.parse()
.map_err(|e| OpenTelemetryLayerError::InvalidUrl(otlp_endpoint, e))?,
service: None,
})
}
Expand All @@ -123,31 +124,31 @@ impl OpenTelemetry {
.add_directive("otel::tracing=trace".parse().unwrap())
.add_directive("otel=debug".parse().unwrap());

let resource = self
.service
.unwrap_or_default()
.fill_from_env()
.into_otlp_resource();

// We can't know if we will be running within tokio context, so we will spawn
// a separate thread for the exporter.
let runtime = opentelemetry::runtime::TokioCurrentThread;

let tracer = opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(
opentelemetry_otlp::new_exporter()
.http()
.with_endpoint(self.otlp_endpoint),
)
.with_trace_config(
trace::config()
.with_sampler(Sampler::AlwaysOn)
.with_id_generator(RandomIdGenerator::default())
.with_resource(resource),
)
.install_batch(runtime)
.unwrap();
let service = self.service.unwrap_or_default().fill_from_env();
let service_name = service
.service_name
.clone()
.unwrap_or_else(|| "zksync_vlog".to_string());
let resource = service.into_otlp_resource();

let exporter = opentelemetry_otlp::new_exporter()
.http()
.with_endpoint(self.otlp_endpoint)
.build_span_exporter()
.expect("Failed to create OTLP exporter"); // URL is validated.

let config = opentelemetry_sdk::trace::Config::default()
.with_id_generator(RandomIdGenerator::default())
.with_sampler(Sampler::AlwaysOn)
.with_resource(resource);

let provider = opentelemetry_sdk::trace::TracerProvider::builder()
.with_batch_exporter(exporter, opentelemetry_sdk::runtime::Tokio)
.with_config(config)
.build();

// TODO: Version and other metadata
let tracer = provider.tracer_builder(service_name).build();

opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new());
tracing_opentelemetry::layer()
Expand All @@ -168,21 +169,23 @@ pub enum OpenTelemetryLevel {

#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum OpenTelemetryLevelError {
pub enum OpenTelemetryLayerError {
#[error("Invalid OpenTelemetry level format")]
InvalidFormat,
#[error("Invalid URL: \"{0}\" - {1}")]
InvalidUrl(String, url::ParseError),
}

impl FromStr for OpenTelemetryLevel {
type Err = OpenTelemetryLevelError;
type Err = OpenTelemetryLayerError;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"off" => Ok(OpenTelemetryLevel::OFF),
"info" => Ok(OpenTelemetryLevel::INFO),
"debug" => Ok(OpenTelemetryLevel::DEBUG),
"trace" => Ok(OpenTelemetryLevel::TRACE),
_ => Err(OpenTelemetryLevelError::InvalidFormat),
_ => Err(OpenTelemetryLayerError::InvalidFormat),
}
}
}
10 changes: 5 additions & 5 deletions core/node/node_framework/examples/showcase.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,10 +251,10 @@ impl WiringLayer for TasksLayer {
}

fn main() -> anyhow::Result<()> {
ZkStackServiceBuilder::new()
.add_layer(DatabaseLayer)
.add_layer(TasksLayer)
.build()?
.run()?;
let mut builder = ZkStackServiceBuilder::new()?;

builder.add_layer(DatabaseLayer).add_layer(TasksLayer);

builder.build().run()?;
Ok(())
}
Loading

0 comments on commit badd45d

Please sign in to comment.