
Enable tracing for development #832

Draft · wants to merge 1 commit into base: tracing-base
core/Cargo.toml (1 change: 1 addition, 0 deletions)

@@ -67,6 +67,7 @@
 tokio-util = { version = "0.7", features = ["io", "io-util"] }
 tokio-stream = "0.1"
 tonic = { workspace = true, features = ["tls", "tls-roots"] }
 tracing = "0.1"
+tracing-opentelemetry = "0.25"
 tracing-subscriber = { version = "0.3", features = ["parking_lot", "env-filter", "registry"] }
 url = "2.2"
 uuid = { version = "1.1", features = ["v4"] }
core/src/telemetry/mod.rs (56 changes: 55 additions, 1 deletion)

@@ -11,6 +11,16 @@
 #[cfg(feature = "otel")]
 pub use metrics::{default_buckets_for, MetricsCallBuffer};
 #[cfg(feature = "otel")]
+use opentelemetry::{
+    self,
+    trace::{SpanKind, Tracer, TracerProvider},
+    KeyValue,
+};
+use opentelemetry_otlp::WithExportConfig;
+#[cfg(feature = "otel")]
+use opentelemetry_sdk;
+use otel::default_resource_instance;
+#[cfg(feature = "otel")]
 pub use otel::{build_otlp_metric_exporter, start_prometheus_metric_exporter};

 pub use log_export::{CoreLogBuffer, CoreLogBufferedConsumer, CoreLogStreamConsumer};

@@ -166,6 +176,7 @@ pub fn telemetry_init(opts: TelemetryOptions) -> Result<TelemetryInstance, anyho
     let mut console_pretty_layer = None;
     let mut console_compact_layer = None;
     let mut forward_layer = None;
+    let mut export_layer = None;
     // ===================================

     let tracing_sub = opts.logging.map(|logger| {

@@ -207,10 +218,53 @@ pub fn telemetry_init(opts: TelemetryOptions) -> Result<TelemetryInstance, anyho
                 Some(CoreLogConsumerLayer::new(consumer).with_filter(EnvFilter::new(filter)));
             }
         };
+
+        let runtime = tokio::runtime::Builder::new_multi_thread()
+            .thread_name("telemetry")
+            .worker_threads(2)
+            .enable_all()
+            .build()
+            .unwrap();
+
+        // create otel export layer
+        runtime.block_on(async {
+            let tracer_cfg = opentelemetry_sdk::trace::Config::default()
+                .with_resource(default_resource_instance().clone());
+            let provider = opentelemetry_otlp::new_pipeline()
+                .tracing()
+                .with_exporter(
+                    opentelemetry_otlp::new_exporter()
+                        .tonic()
+                        .with_endpoint("grpc://localhost:4317".to_string()),
+                    // .with_metadata(MetadataMap::from_headers(headers.try_into()?)),
+                )
+                .with_trace_config(tracer_cfg)
+                // Using install_simple instead for now because install_batch is not
+                // producing spans and is emitting this error message:
+                // OpenTelemetry trace error occurred. cannot send message to batch
+                // processor as the channel is closed
+                // .install_batch(opentelemetry_sdk::runtime::Tokio)
+                .install_simple()
+                .unwrap();
+            opentelemetry::global::set_tracer_provider(provider.clone());
+
+            let tracer = provider.tracer_builder("sdk-core").build();
+            let opentelemetry = tracing_opentelemetry::layer().with_tracer(tracer);
+            // .with_filter(EnvFilter::new(&tracing.filter))
+            export_layer = Some(opentelemetry);
+
+            let tracer = provider.tracer("sdk-core");
+
+            let _span = tracer
+                .span_builder("telemetry_init")
+                .with_kind(SpanKind::Server)
+                .with_attributes([KeyValue::new("temporal.worker", true)])
+                .start(&tracer);
+        });
Comment (dandavison, Contributor Author, Oct 17, 2024):
This span is being emitted, but the ones in workflow_machines.rs, managed_run.rs, and workflow_stream.rs aren't; not sure why yet.
Reply (Member):
I am guessing this has to do with separate runtimes.

I can't remember exactly how I had this working with a separate one previously. Might need to review those diffs some more.
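If separate runtimes are the culprit, one possible fix is to install the batch exporter from inside the dedicated telemetry runtime and then keep that runtime alive for the life of the telemetry instance. The batch span processor spawns its export task on the Tokio runtime that is current at install time, so dropping that runtime would close the batch channel and produce exactly the "cannot send message to batch processor as the channel is closed" error quoted in the diff. A sketch only, not verified against this branch, assuming the same opentelemetry_otlp pipeline API the diff already uses:

```rust
// Sketch; assumes the crate versions pinned in this PR.
let runtime = tokio::runtime::Builder::new_multi_thread()
    .thread_name("telemetry")
    .worker_threads(2)
    .enable_all()
    .build()?;

// Install from inside the runtime so the batch processor's export task is
// spawned on a runtime that outlives telemetry_init.
let provider = runtime.block_on(async {
    opentelemetry_otlp::new_pipeline()
        .tracing()
        .with_exporter(
            opentelemetry_otlp::new_exporter()
                .tonic()
                .with_endpoint("grpc://localhost:4317"),
        )
        .install_batch(opentelemetry_sdk::runtime::Tokio)
})?;

// Store `runtime` somewhere long-lived (e.g. on TelemetryInstance) rather
// than letting it drop at the end of this function.
```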

Comment on lines +222 to +261 (Sushisource, Member, Oct 23, 2024):
This all definitely needs to be protected behind a flag. (Which I guess you mentioned hehe)
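The guard could be as simple as an environment-variable check consulted before the export layer and its runtime are built. A minimal sketch; the variable name `TEMPORAL_ENABLE_TRACE_EXPORT` is hypothetical, not an existing flag in sdk-core:

```rust
/// Returns true when trace export is explicitly enabled.
/// NOTE: TEMPORAL_ENABLE_TRACE_EXPORT is a hypothetical flag name used for
/// illustration; it is not defined anywhere in sdk-core.
fn trace_export_enabled() -> bool {
    std::env::var("TEMPORAL_ENABLE_TRACE_EXPORT")
        .map(|v| v == "1" || v.eq_ignore_ascii_case("true"))
        // Default off: no exporter and no extra runtime are created.
        .unwrap_or(false)
}
```

Since `export_layer` is already an `Option`, the disabled case composes cleanly: `tracing_subscriber::registry().with(None::<L>)` is a no-op layer, so gating only the construction is enough.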

         let reg = tracing_subscriber::registry()
             .with(console_pretty_layer)
             .with(console_compact_layer)
-            .with(forward_layer);
+            .with(forward_layer)
+            .with(export_layer);

#[cfg(feature = "tokio-console")]
let reg = reg.with(console_subscriber::spawn());
core/src/telemetry/otel.rs (2 changes: 1 addition, 1 deletion)

@@ -334,7 +334,7 @@ impl GaugeF64 for MemoryGauge<f64> {
     }
 }

-fn default_resource_instance() -> &'static Resource {
+pub(crate) fn default_resource_instance() -> &'static Resource {
     use once_cell::sync::OnceCell;

     static INSTANCE: OnceCell<Resource> = OnceCell::new();
core/src/worker/workflow/machines/workflow_machines.rs (13 changes: 13 additions, 0 deletions)

@@ -1,5 +1,11 @@
 mod local_acts;

+use opentelemetry::{
+    self,
+    trace::{SpanKind, Tracer},
+    KeyValue,
+};
+
 use super::{
     cancel_external_state_machine::new_external_cancel,
     cancel_workflow_state_machine::cancel_workflow,

@@ -526,6 +532,13 @@ impl WorkflowMachines {
     /// Apply the next (unapplied) entire workflow task from history to these machines. Will replay
     /// any events that need to be replayed until caught up to the newest WFT.
     pub(crate) fn apply_next_wft_from_history(&mut self) -> Result<usize> {
+        let tracer = opentelemetry::global::tracer("apply_next_wft_from_history-tracer");
+        let _span = tracer
+            .span_builder("apply_next_wft_from_history")
+            .with_kind(SpanKind::Server)
+            .with_attributes([KeyValue::new("temporal.worker", true)])
+            .start(&tracer);
Comment (dandavison, Contributor Author):
This span isn't being emitted; I'm not sure why currently.

         // If we have already seen the terminal event for the entire workflow in a previous WFT,
         // then we don't need to do anything here, and in fact we need to avoid re-applying the
         // final WFT.
core/src/worker/workflow/managed_run.rs (8 changes: 8 additions, 0 deletions)

@@ -20,6 +20,7 @@ use crate::{
     MetricsContext,
 };
 use futures_util::future::AbortHandle;
+use opentelemetry::{trace::SpanKind, trace::Tracer, KeyValue};
 use std::{
     collections::HashSet,
     mem,

@@ -370,6 +371,13 @@ impl ManagedRun {
         used_flags: Vec<u32>,
         resp_chan: Option<oneshot::Sender<ActivationCompleteResult>>,
     ) -> Result<RunUpdateAct, NextPageReq> {
+        let tracer = opentelemetry::global::tracer("successful_completion-tracer");
+        let _span = tracer
+            .span_builder("successful_completion")
+            .with_kind(SpanKind::Server)
+            .with_attributes([KeyValue::new("temporal.worker", true)])
+            .start(&tracer);
Comment (dandavison, Contributor Author):
This span isn't being emitted; I'm not sure why currently.

         let activation_was_only_eviction = self.activation_is_eviction();
         let (task_token, has_pending_query, start_time) = if let Some(entry) = self.wft.as_ref() {
             (
core/src/worker/workflow/workflow_stream.rs (8 changes: 8 additions, 0 deletions)

@@ -9,6 +9,7 @@ use crate::{
     MetricsContext,
 };
 use futures::{stream, stream::PollNext, Stream, StreamExt};
+use opentelemetry::{trace::SpanKind, trace::Tracer, KeyValue};
 use std::{collections::VecDeque, fmt::Debug, future, sync::Arc};
 use temporal_sdk_core_api::errors::PollWfError;
 use temporal_sdk_core_protos::coresdk::workflow_activation::remove_from_cache::EvictionReason;

@@ -252,6 +253,13 @@ impl WFStream {
     }

     fn process_completion(&mut self, complete: NewOrFetchedComplete) -> Vec<ActivationOrAuto> {
+        let tracer = opentelemetry::global::tracer("process_completion-tracer");
+        let _span = tracer
+            .span_builder("process_completion")
+            .with_kind(SpanKind::Server)
+            .with_attributes([KeyValue::new("temporal.worker", true)])
+            .start(&tracer);
Comment (dandavison, Contributor Author):
This span isn't being emitted; I'm not sure why currently.

         let rh = if let Some(rh) = self.runs.get_mut(complete.run_id()) {
             rh
         } else {