diff --git a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/subscriber.rb b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/subscriber.rb index bb516152d..f8d7ab57c 100644 --- a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/subscriber.rb +++ b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/subscriber.rb @@ -50,7 +50,20 @@ module OpenTelemetry module Instrumentation module ActiveJob - class EnqueueSubscriber + class DefaultHandler + def initialize(tracer) + @tracer = tracer + end + + def on_start(name, _id, payload, subscriber) + span = @tracer.start_span(name, attributes: subscriber.as_otel_semconv_attrs(payload.fetch(:job))) + tokens = [OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span))] + OpenTelemetry.propagation.inject(payload.fetch(:job).__otel_headers) # This must be transmitted over the wire + { span: span, ctx_tokens: tokens } + end + end + + class EnqueueHandler def initialize(tracer) @tracer = tracer end @@ -65,7 +78,7 @@ def on_start(name, _id, payload, subscriber) end end - class PerformSubscriber + class PerformHandler def initialize(tracer) @tracer = tracer end @@ -101,19 +114,22 @@ class Subscriber < ::ActiveSupport::Subscriber def initialize(...) super @tracer = OpenTelemetry.tracer_provider.tracer('otel-active_job', '0.0.1') + default_handler = DefaultHandler.new(@tracer) @handlers_by_pattern = { - 'enqueue.active_job' => EnqueueSubscriber.new(@tracer), - 'perform.active_job' => PerformSubscriber.new(@tracer), + 'enqueue.active_job' => EnqueueHandler.new(@tracer), + 'perform.active_job' => PerformHandler.new(@tracer), } + @handlers_by_pattern.default = default_handler end # The methods below are the events the Subscriber is interested in. def enqueue(...); end + def perform_start(...); end def perform(...);end def start(name, id, payload) begin - payload.merge!(__otel: @handlers_by_pattern.fetch(name).on_start(name, id, payload, self)) # The payload is _not_ transmitted over the wire + payload.merge!(__otel: @handlers_by_pattern[name].on_start(name, id, payload, self)) # The payload is _not_ transmitted over the wire rescue StandardError => error OpenTelemetry.handle_error(exception: error) end @@ -147,12 +163,6 @@ def finish(_name, _id, payload) end end - def on_start(name, _id, payload) - span = tracer.start_span(name, attributes: as_otel_semconv_attrs(payload.fetch(:job))) - tokens = [OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span))] - OpenTelemetry.propagation.inject(payload.fetch(:job).__otel_headers) # This must be transmitted over the wire - { span: span, ctx_tokens: tokens } - end def as_otel_semconv_attrs(job) test_adapters = %w[async inline]