diff --git a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers.rb b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers.rb index 31b54678e8..1f54769dca 100644 --- a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers.rb +++ b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers.rb @@ -12,10 +12,29 @@ module OpenTelemetry module Instrumentation module ActiveJob - # Custom subscriber that handles ActiveJob notifications + # Module that contains custom event handlers, which are used to generate spans per event module Handlers module_function + # Subscribes Event Handlers to relevant ActiveJob notifications + # + # The following events are recorded as spans: + # - enqueue + # - enqueue_at + # - enqueue_retry + # - perform + # - retry_stopped + # - discard + # + # Ingress and Egress spans (perform, enqueue, enqueue_at) use Messaging semantic conventions for naming the span, + # while internal spans keep their ActiveSupport event name. + # + # @note this method is not thread safe and should not be used in a multi-threaded context + # @note Why no perform_start? + # This event causes much heartache as it is the first in a series of events that is triggered. + # It should not be the ingress span because it does not measure anything. + # https://github.com/rails/rails/blob/v6.1.7.6/activejob/lib/active_job/instrumentation.rb#L14 + # https://github.com/rails/rails/blob/v7.0.8/activejob/lib/active_job/instrumentation.rb#L19 def install return unless Array(@subscriptions).empty? @@ -26,11 +45,6 @@ def install enqueue_handler = Handlers::Enqueue.new(tracer, mapper) perform_handler = Handlers::Perform.new(tracer, mapper) - # Why no perform_start? - # This event causes much heartache as it is the first in a series of events that is triggered. - # It should not be the ingress span because it does not measure anything. - # https://github.com/rails/rails/blob/v6.1.7.6/activejob/lib/active_job/instrumentation.rb#L14 - # https://github.com/rails/rails/blob/v7.0.8/activejob/lib/active_job/instrumentation.rb#L19 handlers_by_pattern = { 'enqueue' => enqueue_handler, 'enqueue_at' => enqueue_handler, @@ -45,6 +59,8 @@ def install end end + # Removes Event Handler Subscriptions for ActiveJob notifications + # @note this method is not thread safe and sholud not be used in a multi-threaded context def uninstall @subscriptions&.each { |subscriber| ActiveSupport::Notifications.unsubscribe(subscriber) } @subscriptions = nil diff --git a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/default.rb b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/default.rb index ad1c7bb8c6..130456311c 100644 --- a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/default.rb +++ b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/default.rb @@ -9,25 +9,45 @@ module Instrumentation module ActiveJob module Handlers # Default handler to creates internal spans for events + # This class provides default template methods that derived classes may override to generate spans and register contexts. class Default def initialize(tracer, mapper) @tracer = tracer @mapper = mapper end + # Invoked by ActiveSupport::Notifications at the start of the instrumentation block + # It amends the otel context of a Span and Context tokens to the payload + # + # @param name [String] of the Event + # @param id [String] of the event + # @param payload [Hash] containing job run information + # @return [Hash] the payload passed as a method argument def start(name, id, payload) payload.merge!(__otel: on_start(name, id, payload)) rescue StandardError => e OpenTelemetry.handle_error(exception: e) end - def on_start(name, _id, payload) + # Creates a span and registers it with the current context + # + # @param name [String] of the Event + # @param id [String] of the event + # @param payload [Hash] containing job run information + # @return [Hash] with the span and generated context tokens + def start_span(name, _id, payload) span = @tracer.start_span(name, attributes: @mapper.call(payload)) tokens = [OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span))] { span: span, ctx_tokens: tokens } end + # Creates a span and registers it with the current context + # + # @param _name [String] of the Event (unused) + # @param _id [String] of the event (unused) + # @param payload [Hash] containing job run information + # @return [Hash] with the span and generated context tokens def finish(_name, _id, payload) otel = payload.delete(:__otel) span = otel&.fetch(:span) @@ -41,6 +61,9 @@ def finish(_name, _id, payload) finish_span(span, tokens) end + # Finishes the provided spans and also detaches the associated contexts + # @param span [OpenTelemetry::Trace::Span] + # @param tokens [Array] to unregister def finish_span(span, tokens) # closes the span after all attributes have been finalized begin diff --git a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/enqueue.rb b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/enqueue.rb index a4831adc94..8e285db059 100644 --- a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/enqueue.rb +++ b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/enqueue.rb @@ -10,7 +10,7 @@ module ActiveJob module Handlers # Handles enqueue.active_job class Enqueue < Default - def on_start(name, _id, payload) + def start_span(name, _id, payload) otel_config = ActiveJob::Instrumentation.instance.config span_name = "#{otel_config[:span_naming] == :job_class ? payload.fetch(:job).class.name : payload.fetch(:job).queue_name} publish" span = @tracer.start_span(span_name, kind: :producer, attributes: @mapper.call(payload)) diff --git a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/perform.rb b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/perform.rb index 4da52fc41b..b54d75b387 100644 --- a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/perform.rb +++ b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/perform.rb @@ -10,7 +10,7 @@ module ActiveJob module Handlers # Handles perform.active_job class Perform < Default - def on_start(name, _id, payload) + def start_span(name, _id, payload) tokens = [] parent_context = OpenTelemetry.propagation.extract(payload.fetch(:job).__otel_headers)