diff --git a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/subscriber.rb b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/subscriber.rb index 686a06387..69bef5e11 100644 --- a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/subscriber.rb +++ b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/subscriber.rb @@ -49,6 +49,42 @@ def deserialize(job_data) module OpenTelemetry module Instrumentation module ActiveJob + class EnqueueSubscriber + def on_start(name, _id, payload, subscriber) + span = subscriber.tracer.start_span("#{payload.fetch(:job).queue_name} publish", + kind: :producer, + attributes: subscriber.job_attributes(payload.fetch(:job))) + tokens = [OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span))] + OpenTelemetry.propagation.inject(payload.fetch(:job).__otel_headers) # This must be transmitted over the wire + { span: span, ctx_tokens: tokens } + end + end + + class PerformSubscriber + def on_start(name, _id, payload, subscriber) + tokens = [] + parent_context = OpenTelemetry.propagation.extract(payload.fetch(:job).__otel_headers) + span_context = OpenTelemetry::Trace.current_span(parent_context).context + + if span_context.valid? + tokens << OpenTelemetry::Context.attach(parent_context) + links = [OpenTelemetry::Trace::Link.new(span_context)] + end + + span = subscriber.tracer.start_span( + "#{payload.fetch(:job).queue_name} process", + kind: :consumer, + attributes: subscriber.job_attributes(payload.fetch(:job)), + links: links + ) + + tokens << OpenTelemetry::Context.attach( + OpenTelemetry::Trace.context_with_span(span) + ) + + { span: span, ctx_tokens: tokens } + end + end class GenericSubscriber < ::ActiveSupport::Subscriber TEST_ADAPTERS = %w[async inline] @@ -65,7 +101,7 @@ def perform(...);end def start(name, id, payload) begin - payload.merge!(__otel: EVENT_HANDLERS.fetch(name).on_start(name, id, payload)) # The payload is _not_ transmitted over the wire + payload.merge!(__otel: EVENT_HANDLERS.fetch(name).on_start(name, id, payload, self)) # The payload is _not_ transmitted over the wire rescue StandardError => error OpenTelemetry.handle_error(exception: error) end @@ -106,8 +142,6 @@ def on_start(name, _id, payload) { span: span, ctx_tokens: tokens } end - private - def job_attributes(job) otel_attributes = { 'code.namespace' => job.class.name, @@ -129,43 +163,6 @@ def tracer OpenTelemetry.tracer_provider.tracer('otel-active_job', '0.0.1') end end - - class EnqueueSubscriber - def on_start(name, _id, payload) - span = tracer.start_span("#{payload.fetch(:job).queue_name} publish", - kind: :producer, - attributes: job_attributes(payload.fetch(:job))) - tokens = [OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span))] - OpenTelemetry.propagation.inject(payload.fetch(:job).__otel_headers) # This must be transmitted over the wire - { span: span, ctx_tokens: tokens } - end - end - - class PerformSubscriber - def on_start(name, _id, payload) - tokens = [] - parent_context = OpenTelemetry.propagation.extract(payload.fetch(:job).__otel_headers) - span_context = OpenTelemetry::Trace.current_span(parent_context).context - - if span_context.valid? - tokens << OpenTelemetry::Context.attach(parent_context) - links = [OpenTelemetry::Trace::Link.new(span_context)] - end - - span = tracer.start_span( - "#{payload.fetch(:job).queue_name} process", - kind: :consumer, - attributes: job_attributes(payload.fetch(:job)), - links: links - ) - - tokens << OpenTelemetry::Context.attach( - OpenTelemetry::Trace.context_with_span(span) - ) - - { span: span, ctx_tokens: tokens } - end - end end end end