diff --git a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/subscriber.rb b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/subscriber.rb index 979a564e0..036d5f760 100644 --- a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/subscriber.rb +++ b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/subscriber.rb @@ -5,7 +5,9 @@ module OpenTelemetry module Instrumentation module ActiveJob - # Provides helper methods + # Maps ActiveJob Attributes to Semantic Conventions + # + # Some of the more promenant attributes will come from # class AttributeMapper TEST_ADAPTERS = %w[async inline].freeze @@ -39,12 +41,16 @@ def initialize(tracer, mapper) @mapper = mapper end - def start(name, _id, payload) + def start(name, id, payload) + payload.merge!(__otel: on_start(name, id, payload)) + end + + def on_start(name, _id, payload) span = @tracer.start_span(name, attributes: @mapper.call(payload)) tokens = [OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span))] OpenTelemetry.propagation.inject(payload.fetch(:job).__otel_headers) # This must be transmitted over the wire - payload.merge!(__otel: { span: span, ctx_tokens: tokens }) + { span: span, ctx_tokens: tokens } end def finish(_name, _id, payload) @@ -83,19 +89,19 @@ def finish(_name, _id, payload) # Handles enqueue.active_job class EnqueueHandler < DefaultHandler - def start(name, _id, payload) + def on_start(name, _id, payload) otel_config = ActiveJob::Instrumentation.instance.config span_name = "#{otel_config[:span_naming] == :job_class ? payload.fetch(:job).class.name : payload.fetch(:job).queue_name} publish" span = @tracer.start_span(span_name, kind: :producer, attributes: @mapper.call(payload)) tokens = [OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span))] OpenTelemetry.propagation.inject(payload.fetch(:job).__otel_headers) # This must be transmitted over the wire - payload.merge!(__otel: { span: span, ctx_tokens: tokens }) + { span: span, ctx_tokens: tokens } end end # Handles perform.active_job class PerformHandler < DefaultHandler - def start(name, _id, payload) + def on_start(name, _id, payload) tokens = [] parent_context = OpenTelemetry.propagation.extract(payload.fetch(:job).__otel_headers) span_context = OpenTelemetry::Trace.current_span(parent_context).context @@ -117,68 +123,44 @@ def start(name, _id, payload) tokens.concat([consumer_context, aj_context].map { |context| OpenTelemetry::Context.attach(context) }) - payload.merge!(__otel: { span: span, ctx_tokens: tokens }) + { span: span, ctx_tokens: tokens } end end # Custom subscriber that handles ActiveJob notifications - class Subscriber < ::ActiveSupport::Subscriber - attr_reader :tracer + class Subscriber + def self.install + return unless Array(@subscriptions).empty? - def initialize(...) - super tracer = Instrumentation.instance.tracer mapper = AttributeMapper.new + default_handler = DefaultHandler.new(tracer, mapper) enqueue_handler = EnqueueHandler.new(tracer, mapper) - - @handlers_by_pattern = { - 'enqueue.active_job' => enqueue_handler, - 'enqueue_at.active_job' => enqueue_handler, - 'perform.active_job' => PerformHandler.new(tracer, mapper) + perform_handler = PerformHandler.new(tracer, mapper) + + # Why no perform_start? + # This event causes much heartache as it is the first in a series of events that is triggered. + # It should not be the ingress span because it does not measure anything. + # https://github.com/rails/rails/blob/v6.1.7.6/activejob/lib/active_job/instrumentation.rb#L14 + # https://github.com/rails/rails/blob/v7.0.8/activejob/lib/active_job/instrumentation.rb#L19 + handlers_by_pattern = { + 'enqueue' => enqueue_handler, + 'enqueue_at' => enqueue_handler, + 'enqueue_retry' => default_handler, + 'perform' => perform_handler, + 'retry_stopped' => default_handler, + 'discard' => default_handler } - @handlers_by_pattern.default = default_handler - @call_super if ::ActiveJob.version < Gem::Version.new('7.1') - end - - # The methods below are the events the Subscriber is interested in. - def enqueue_at(...); end - def enqueue(...); end - def enqueue_retry(...); end - # This event causes much heartache as it is the first in a series of events that is triggered. - # It should not be the ingress span because it does not measure anything. - # https://github.com/rails/rails/blob/v6.1.7.6/activejob/lib/active_job/instrumentation.rb#L14 - # https://github.com/rails/rails/blob/v7.0.8/activejob/lib/active_job/instrumentation.rb#L19 - # def perform_start(...); end - def perform(...); end - def retry_stopped(...); end - # def discard(...); end - def start(name, id, payload) - @handlers_by_pattern[name].start(name, id, payload) - # This is nuts - super if @call_super - end - - def finish(name, id, payload) - @handlers_by_pattern[name].finish(name, id, payload) - # This is equally nuts - super if @call_super - end - - def self.install - attach_to :active_job - tracer = Instrumentation.instance.tracer - mapper = AttributeMapper.new - default_handler = DefaultHandler.new(tracer, mapper) - @subscriptions = %w[discard.active_job].map do |key| - ActiveSupport::Notifications.subscribe(key, default_handler) + @subscriptions = handlers_by_pattern.map do |key, handler| + ActiveSupport::Notifications.subscribe("#{key}.active_job", handler) end end def self.uninstall - detach_from :active_job @subscriptions&.each { |subscriber| ActiveSupport::Notifications.unsubscribe(subscriber) } + @subscriptions = nil end end end