diff --git a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/default_handler.rb b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/default_handler.rb new file mode 100644 index 000000000..ae6dbfaa8 --- /dev/null +++ b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/default_handler.rb @@ -0,0 +1,73 @@ +# frozen_string_literal: true + +module OpenTelemetry + module Instrumentation + module ActiveJob + # Default handler to creates internal spans for events + class DefaultHandler + def initialize(tracer, mapper) + @tracer = tracer + @mapper = mapper + end + + def start(name, id, payload) + payload.merge!(__otel: on_start(name, id, payload)) + rescue StandardError => e + OpenTelemetry.handle_error(exception: e) + end + + def on_start(name, _id, payload) + span = @tracer.start_span(name, attributes: @mapper.call(payload)) + tokens = [OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span))] + + { span: span, ctx_tokens: tokens } + end + + def finish(_name, _id, payload) + otel = payload.delete(:__otel) + span = otel&.fetch(:span) + tokens = otel&.fetch(:ctx_tokens) + + exception = payload[:error] || payload[:exception_object] + on_exception(exception, span) if exception + rescue StandardError => e + OpenTelemetry.handle_error(exception: e) + ensure + finish_span(span, tokens) + end + + def finish_span(span, tokens) + # closes the span after all attributes have been finalized + begin + span&.status = OpenTelemetry::Trace::Status.ok if span&.status&.code == OpenTelemetry::Trace::Status::UNSET + span&.finish + rescue StandardError => e + OpenTelemetry.handle_error(exception: e) + end + + # pops the context stack + tokens&.reverse&.each do |token| + OpenTelemetry::Context.detach(token) + rescue StandardError => e + OpenTelemetry.handle_error(exception: e) + end + end + + # Records exceptions on spans and sets Span statuses to `Error` + # + # Handled exceptions are recorded on internal spans related to the event. E.g. `discard` events are recorded on the `discard.active_job` span + # Handled exceptions _are not_ copied to the ingress span, but it does set the status to `Error` making it easier to know that a job has failed + # Unhandled exceptions bubble up to the ingress span and are recorded there. + # + # @param [Exception] exception to report as a Span Event + # @param [OpenTelemetry::Trace::Span] the currently active span used to record the exception and set the status + def on_exception(exception, span) + status = OpenTelemetry::Trace::Status.error(exception.message) + OpenTelemetry::Instrumentation::ActiveJob.current_span.status = status + span&.record_exception(exception) + span&.status = status + end + end + end + end +end diff --git a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/enqueue_handler.rb b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/enqueue_handler.rb new file mode 100644 index 000000000..c1057a489 --- /dev/null +++ b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/enqueue_handler.rb @@ -0,0 +1,19 @@ +# frozen_string_literal: true + +module OpenTelemetry + module Instrumentation + module ActiveJob + # Handles enqueue.active_job + class EnqueueHandler < DefaultHandler + def on_start(name, _id, payload) + otel_config = ActiveJob::Instrumentation.instance.config + span_name = "#{otel_config[:span_naming] == :job_class ? payload.fetch(:job).class.name : payload.fetch(:job).queue_name} publish" + span = @tracer.start_span(span_name, kind: :producer, attributes: @mapper.call(payload)) + tokens = [OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span))] + OpenTelemetry.propagation.inject(payload.fetch(:job).__otel_headers) # This must be transmitted over the wire + { span: span, ctx_tokens: tokens } + end + end + end + end +end diff --git a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/perform_handler.rb b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/perform_handler.rb new file mode 100644 index 000000000..85d7a30fd --- /dev/null +++ b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/perform_handler.rb @@ -0,0 +1,54 @@ +# frozen_string_literal: true + +module OpenTelemetry + module Instrumentation + module ActiveJob + # Handles perform.active_job + class PerformHandler < DefaultHandler + def on_start(name, _id, payload) + tokens = [] + parent_context = OpenTelemetry.propagation.extract(payload.fetch(:job).__otel_headers) + + span_name = span_name_from(payload) + + # TODO: Refactor into a propagation strategy + propagation_style = otel_config[:propagation_style] + if propagation_style == :child + tokens << OpenTelemetry::Context.attach(parent_context) + span = @tracer.start_span(span_name, kind: :consumer, attributes: @mapper.call(payload)) + else + span_context = OpenTelemetry::Trace.current_span(parent_context).context + links = [OpenTelemetry::Trace::Link.new(span_context)] if span_context.valid? && propagation_style == :link + span = @tracer.start_root_span(span_name, kind: :consumer, attributes: @mapper.call(payload), links: links) + end + + tokens.concat(attach_consumer_context(span)) + + { span: span, ctx_tokens: tokens } + end + + # This method attaches a span to multiple contexts: + # 1. Registers the ingress span as the top level ActiveJob span. + # This is used later to enrich the ingress span in children, e.g. setting span status to error when a child event like `discard` terminates due to an error + # 2. Registers the ingress span as the "active" span, which is the default behavior of the SDK. + # @param span [OpenTelemetry::Trace::Span] the currently active span used to record the exception and set the status + # @return [Array] Context tokens that must be detached when finished + def attach_consumer_context(span) + consumer_context = OpenTelemetry::Trace.context_with_span(span) + internal_context = OpenTelemetry::Instrumentation::ActiveJob.context_with_span(span, parent_context: consumer_context) + + [consumer_context, internal_context].map { |context| OpenTelemetry::Context.attach(context) } + end + + # TODO: refactor into a strategy + def span_name_from(payload) + "#{otel_config[:span_naming] == :job_class ? payload.fetch(:job).class.name : payload.fetch(:job).queue_name} process" + end + + def otel_config + ActiveJob::Instrumentation.instance.config + end + end + end + end +end diff --git a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/subscriber.rb b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/subscriber.rb index b0d2ba42f..e42426a3e 100644 --- a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/subscriber.rb +++ b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/subscriber.rb @@ -1,5 +1,9 @@ # frozen_string_literal: true +require_relative 'default_handler' +require_relative 'enqueue_handler' +require_relative 'perform_handler' + module OpenTelemetry module Instrumentation module ActiveJob @@ -28,131 +32,6 @@ def call(payload) end end - # Default handler to creates internal spans for events - class DefaultHandler - def initialize(tracer, mapper) - @tracer = tracer - @mapper = mapper - end - - def start(name, id, payload) - payload.merge!(__otel: on_start(name, id, payload)) - rescue StandardError => e - OpenTelemetry.handle_error(exception: e) - end - - def on_start(name, _id, payload) - span = @tracer.start_span(name, attributes: @mapper.call(payload)) - tokens = [OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span))] - - { span: span, ctx_tokens: tokens } - end - - def finish(_name, _id, payload) - otel = payload.delete(:__otel) - span = otel&.fetch(:span) - tokens = otel&.fetch(:ctx_tokens) - - exception = payload[:error] || payload[:exception_object] - on_exception(exception, span) if exception - rescue StandardError => e - OpenTelemetry.handle_error(exception: e) - ensure - finish_span(span, tokens) - end - - def finish_span(span, tokens) - # closes the span after all attributes have been finalized - begin - span&.status = OpenTelemetry::Trace::Status.ok if span&.status&.code == OpenTelemetry::Trace::Status::UNSET - span&.finish - rescue StandardError => e - OpenTelemetry.handle_error(exception: e) - end - - # pops the context stack - tokens&.reverse&.each do |token| - OpenTelemetry::Context.detach(token) - rescue StandardError => e - OpenTelemetry.handle_error(exception: e) - end - end - - # Records exceptions on spans and sets Span statuses to `Error` - # - # Handled exceptions are recorded on internal spans related to the event. E.g. `discard` events are recorded on the `discard.active_job` span - # Handled exceptions _are not_ copied to the ingress span, but it does set the status to `Error` making it easier to know that a job has failed - # Unhandled exceptions bubble up to the ingress span and are recorded there. - # - # @param [Exception] exception to report as a Span Event - # @param [OpenTelemetry::Trace::Span] the currently active span used to record the exception and set the status - def on_exception(exception, span) - status = OpenTelemetry::Trace::Status.error(exception.message) - OpenTelemetry::Instrumentation::ActiveJob.current_span.status = status - span&.record_exception(exception) - span&.status = status - end - end - - # Handles enqueue.active_job - class EnqueueHandler < DefaultHandler - def on_start(name, _id, payload) - otel_config = ActiveJob::Instrumentation.instance.config - span_name = "#{otel_config[:span_naming] == :job_class ? payload.fetch(:job).class.name : payload.fetch(:job).queue_name} publish" - span = @tracer.start_span(span_name, kind: :producer, attributes: @mapper.call(payload)) - tokens = [OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span))] - OpenTelemetry.propagation.inject(payload.fetch(:job).__otel_headers) # This must be transmitted over the wire - { span: span, ctx_tokens: tokens } - end - end - - # Handles perform.active_job - class PerformHandler < DefaultHandler - def on_start(name, _id, payload) - tokens = [] - parent_context = OpenTelemetry.propagation.extract(payload.fetch(:job).__otel_headers) - - span_name = span_name_from(payload) - - # TODO: Refactor into a propagation strategy - propagation_style = otel_config[:propagation_style] - if propagation_style == :child - tokens << OpenTelemetry::Context.attach(parent_context) - span = @tracer.start_span(span_name, kind: :consumer, attributes: @mapper.call(payload)) - else - span_context = OpenTelemetry::Trace.current_span(parent_context).context - links = [OpenTelemetry::Trace::Link.new(span_context)] if span_context.valid? && propagation_style == :link - span = @tracer.start_root_span(span_name, kind: :consumer, attributes: @mapper.call(payload), links: links) - end - - tokens.concat(attach_consumer_context(span)) - - { span: span, ctx_tokens: tokens } - end - - # This method attaches a span to multiple contexts: - # 1. Registers the ingress span as the top level ActiveJob span. - # This is used later to enrich the ingress span in children, e.g. setting span status to error when a child event like `discard` terminates due to an error - # 2. Registers the ingress span as the "active" span, which is the default behavior of the SDK. - # @param span [OpenTelemetry::Trace::Span] the currently active span used to record the exception and set the status - # @return [Array] Context tokens that must be detached when finished - def attach_consumer_context(span) - consumer_context = OpenTelemetry::Trace.context_with_span(span) - internal_context = OpenTelemetry::Instrumentation::ActiveJob.context_with_span(span, parent_context: consumer_context) - - [consumer_context, internal_context].map { |context| OpenTelemetry::Context.attach(context) } - end - - # TODO: refactor into a strategy - def span_name_from(payload) - "#{otel_config[:span_naming] == :job_class ? payload.fetch(:job).class.name : payload.fetch(:job).queue_name} process" - end - - def otel_config - ActiveJob::Instrumentation.instance.config - end - end - # Custom subscriber that handles ActiveJob notifications class Subscriber def self.install