diff --git a/core/Cargo.toml b/core/Cargo.toml index b85a16b17..a88797f9d 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -67,6 +67,7 @@ tokio-util = { version = "0.7", features = ["io", "io-util"] } tokio-stream = "0.1" tonic = { workspace = true, features = ["tls", "tls-roots"] } tracing = "0.1" +tracing-opentelemetry = "0.25" tracing-subscriber = { version = "0.3", features = ["parking_lot", "env-filter", "registry"] } url = "2.2" uuid = { version = "1.1", features = ["v4"] } diff --git a/core/src/telemetry/mod.rs b/core/src/telemetry/mod.rs index e6886618c..ef69fb4cd 100644 --- a/core/src/telemetry/mod.rs +++ b/core/src/telemetry/mod.rs @@ -11,6 +11,16 @@ mod prometheus_server; #[cfg(feature = "otel")] pub use metrics::{default_buckets_for, MetricsCallBuffer}; #[cfg(feature = "otel")] +use opentelemetry::{ + self, + trace::{SpanKind, Tracer, TracerProvider}, + KeyValue, +}; +use opentelemetry_otlp::WithExportConfig; +#[cfg(feature = "otel")] +use opentelemetry_sdk; +use otel::default_resource_instance; +#[cfg(feature = "otel")] pub use otel::{build_otlp_metric_exporter, start_prometheus_metric_exporter}; pub use log_export::{CoreLogBuffer, CoreLogBufferedConsumer, CoreLogStreamConsumer}; @@ -166,6 +176,7 @@ pub fn telemetry_init(opts: TelemetryOptions) -> Result Result { } } -fn default_resource_instance() -> &'static Resource { +pub(crate) fn default_resource_instance() -> &'static Resource { use once_cell::sync::OnceCell; static INSTANCE: OnceCell = OnceCell::new(); diff --git a/core/src/worker/workflow/machines/workflow_machines.rs b/core/src/worker/workflow/machines/workflow_machines.rs index 0e7907e1c..9cef88a0f 100644 --- a/core/src/worker/workflow/machines/workflow_machines.rs +++ b/core/src/worker/workflow/machines/workflow_machines.rs @@ -1,5 +1,11 @@ mod local_acts; +use opentelemetry::{ + self, + trace::{SpanKind, Tracer}, + KeyValue, +}; + use super::{ cancel_external_state_machine::new_external_cancel, cancel_workflow_state_machine::cancel_workflow, @@ -526,6 +532,13 @@ impl WorkflowMachines { /// Apply the next (unapplied) entire workflow task from history to these machines. Will replay /// any events that need to be replayed until caught up to the newest WFT. pub(crate) fn apply_next_wft_from_history(&mut self) -> Result { + let tracer = opentelemetry::global::tracer("apply_next_wft_from_history-tracer"); + let _span = tracer + .span_builder("apply_next_wft_from_history") + .with_kind(SpanKind::Server) + .with_attributes([KeyValue::new("temporal.worker", true)]) + .start(&tracer); + // If we have already seen the terminal event for the entire workflow in a previous WFT, // then we don't need to do anything here, and in fact we need to avoid re-applying the // final WFT. diff --git a/core/src/worker/workflow/managed_run.rs b/core/src/worker/workflow/managed_run.rs index 914025ff7..f46a98471 100644 --- a/core/src/worker/workflow/managed_run.rs +++ b/core/src/worker/workflow/managed_run.rs @@ -20,6 +20,7 @@ use crate::{ MetricsContext, }; use futures_util::future::AbortHandle; +use opentelemetry::{trace::SpanKind, trace::Tracer, KeyValue}; use std::{ collections::HashSet, mem, @@ -370,6 +371,13 @@ impl ManagedRun { used_flags: Vec, resp_chan: Option>, ) -> Result { + let tracer = opentelemetry::global::tracer("successful_completion-tracer"); + let _span = tracer + .span_builder("successful_completion") + .with_kind(SpanKind::Server) + .with_attributes([KeyValue::new("temporal.worker", true)]) + .start(&tracer); + let activation_was_only_eviction = self.activation_is_eviction(); let (task_token, has_pending_query, start_time) = if let Some(entry) = self.wft.as_ref() { ( diff --git a/core/src/worker/workflow/workflow_stream.rs b/core/src/worker/workflow/workflow_stream.rs index e9f371d1b..afc794fb1 100644 --- a/core/src/worker/workflow/workflow_stream.rs +++ b/core/src/worker/workflow/workflow_stream.rs @@ -9,6 +9,7 @@ use crate::{ MetricsContext, }; use futures::{stream, stream::PollNext, Stream, StreamExt}; +use opentelemetry::{trace::SpanKind, trace::Tracer, KeyValue}; use std::{collections::VecDeque, fmt::Debug, future, sync::Arc}; use temporal_sdk_core_api::errors::PollWfError; use temporal_sdk_core_protos::coresdk::workflow_activation::remove_from_cache::EvictionReason; @@ -252,6 +253,13 @@ impl WFStream { } fn process_completion(&mut self, complete: NewOrFetchedComplete) -> Vec { + let tracer = opentelemetry::global::tracer("process_completion-tracer"); + let _span = tracer + .span_builder("process_completion") + .with_kind(SpanKind::Server) + .with_attributes([KeyValue::new("temporal.worker", true)]) + .start(&tracer); + let rh = if let Some(rh) = self.runs.get_mut(complete.run_id()) { rh } else {