From 209aaf961e67597795900c4131c6c54a0aad6329 Mon Sep 17 00:00:00 2001 From: Ariel Valentin Date: Sat, 30 Sep 2023 10:54:10 -0500 Subject: [PATCH] squash: Add old callbacks in --- .../active_job/instrumentation.rb | 27 ++++-- .../active_job/{ => latest}/patches/base.rb | 0 .../active_job/{ => latest}/subscriber.rb | 32 +++++-- .../v6/patches/active_job_callbacks.rb | 96 +++++++++++++++++++ .../active_job/patches/base_test.rb | 2 +- .../active_job/subscriber_test.rb | 4 +- 6 files changed, 143 insertions(+), 18 deletions(-) rename instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/{ => latest}/patches/base.rb (100%) rename instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/{ => latest}/subscriber.rb (74%) create mode 100644 instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/v6/patches/active_job_callbacks.rb diff --git a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/instrumentation.rb b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/instrumentation.rb index 8b721ec01f..d687c1fb83 100644 --- a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/instrumentation.rb +++ b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/instrumentation.rb @@ -12,7 +12,6 @@ class Instrumentation < OpenTelemetry::Instrumentation::Base MINIMUM_VERSION = Gem::Version.new('6.0.0') install do |_config| - require_dependencies patch_activejob end @@ -62,15 +61,29 @@ def gem_version ::ActiveJob.version end - def require_dependencies - require_relative 'patches/base' - require_relative 'subscriber' - end - def patch_activejob + require_relative 'latest/patches/base' ::ActiveJob::Base.prepend(Patches::Base) unless ::ActiveJob::Base.ancestors.include?(Patches::Base) - Subscriber.install + if Gem::Requirement.create('> 7').satisfied_by?(::ActiveJob.version) + require_relative 'latest/subscriber' + Subscriber.install + else + # Rails 6.x 
instruments code quite differently than Rails 7 + # + # `perform_start` is the first in a series of events that is triggered, but it does not measure the underlying `perform` block, + # therefore is not suitable for being an ingress span. + # https://github.com/rails/rails/blob/v6.1.7.6/activejob/lib/active_job/instrumentation.rb#L14 + # + # Although in Rails 6.1 the `perform` block does indeed measure the job's perform method, + # it _does not_ instrument any of the before or after callbacks, which I think is something of interest to our end users. + # + # I will follow up with a specialized Rails 6.1 subscriber that measures all other events but continues to create an ingress span that patches `perform_now` + # + # https://github.com/rails/rails/blob/v7.0.8/activejob/lib/active_job/instrumentation.rb#L19 + require_relative 'v6/patches/active_job_callbacks' + ::ActiveJob::Base.prepend(Patches::ActiveJobCallbacks) + end end end end diff --git a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/patches/base.rb b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/latest/patches/base.rb similarity index 100% rename from instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/patches/base.rb rename to instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/latest/patches/base.rb diff --git a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/subscriber.rb b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/latest/subscriber.rb similarity index 74% rename from instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/subscriber.rb rename to instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/latest/subscriber.rb index b672f2b21c..1e3b916f51 100644 --- a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/subscriber.rb +++
b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/latest/subscriber.rb @@ -5,7 +5,7 @@ module OpenTelemetry module Instrumentation module ActiveJob - # Provides helper methods + # Generates a set of semantic conventions attributes based on the contents of the ActiveSupport Notifications Event Payload # class AttributeMapper TEST_ADAPTERS = %w[async inline].freeze @@ -32,13 +32,19 @@ def call(payload) end end - # Default handler to creates internal spans for events + # Default handler that creates internal spans for events that would not be used for ingress or egress class DefaultHandler def initialize(tracer, mapper) @tracer = tracer @mapper = mapper end + # Creates a span with `kind: internal` for non-ingress or non-egress events + # + # @param name [String] of the event + # @param _id [Number] This argument is ignored + # @param payload [Hash] containing the ActiveSupport Notifications Event payload + # @return [Hash] containing the internal span and an array of context tokens to detach later def on_start(name, _id, payload) span = @tracer.start_span(name, attributes: @mapper.call(payload)) tokens = [OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span))] @@ -47,13 +53,19 @@ def on_start(name, _id, payload) end end - # Handles enqueue.active_job + # Handles enqueue.active_job and enqueue_at.active_job class EnqueueHandler def initialize(tracer, mapper) @tracer = tracer @mapper = mapper end + # Creates an egress span with `kind: producer` as well as injects the context into the job attributes using the specified propagation_style strategy.
+ # + # @param name [String] of the event, either `enqueue.active_job` or `enqueue_at.active_job` + # @param _id [Number] This argument is ignored + # @param payload [Hash] containing the ActiveSupport Notifications Event payload + # @return [Hash] containing the egress span and an array of context tokens to detach later def on_start(name, _id, payload) otel_config = ActiveJob::Instrumentation.instance.config span_name = "#{otel_config[:span_naming] == :job_class ? payload.fetch(:job).class.name : payload.fetch(:job).queue_name} publish" @@ -71,6 +83,12 @@ def initialize(tracer, mapper) @mapper = mapper end + # Creates an ingress span with `kind: consumer` as well as extracts the context from the job attributes using the specified propagation_style strategy. + # + # @param name [String] of the event `perform.active_job` + # @param _id [Number] This argument is ignored + # @param payload [Hash] containing the ActiveSupport Notifications Event payload + # @return [Hash] containing the ingress span and an array of context tokens to detach later def on_start(name, _id, payload) tokens = [] parent_context = OpenTelemetry.propagation.extract(payload.fetch(:job).__otel_headers) @@ -119,15 +137,12 @@ def initialize(...) def enqueue_at(...); end def enqueue(...); end def enqueue_retry(...); end - # This event causes much heartache as it is the first in a series of events that is triggered. - # It should not be the ingress span because it does not measure anything.
- # https://github.com/rails/rails/blob/v6.1.7.6/activejob/lib/active_job/instrumentation.rb#L14 - # https://github.com/rails/rails/blob/v7.0.8/activejob/lib/active_job/instrumentation.rb#L19 - # def perform_start(...); end + def perform_start(...); end def perform(...); end def retry_stopped(...); end def discard(...); end + # Overrides ActiveSupport::Subscriber#start to create a span for the current event def start(name, id, payload) begin payload.merge!(__otel: @handlers_by_pattern[name].on_start(name, id, payload)) @@ -138,6 +153,7 @@ def start(name, id, payload) super end + # Overrides ActiveSupport::Subscriber#finish to finish a span for the current event and cleans up the context stack def finish(_name, _id, payload) begin otel = payload.delete(:__otel) diff --git a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/v6/patches/active_job_callbacks.rb b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/v6/patches/active_job_callbacks.rb new file mode 100644 index 0000000000..988c6ced4b --- /dev/null +++ b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/v6/patches/active_job_callbacks.rb @@ -0,0 +1,96 @@ +# frozen_string_literal: true + +# Copyright The OpenTelemetry Authors +# +# SPDX-License-Identifier: Apache-2.0 + +module OpenTelemetry + module Instrumentation + module ActiveJob + module Patches + # Module to prepend to ActiveJob::Base for instrumentation. + module ActiveJobCallbacks + def self.prepended(base) + base.class_eval do + around_enqueue do |job, block| + span_kind = job.class.queue_adapter_name == 'inline' ? :client : :producer + span_name = "#{otel_config[:span_naming] == :job_class ? 
job.class : job.queue_name} publish" + span_attributes = job_attributes(job) + otel_tracer.in_span(span_name, attributes: span_attributes, kind: span_kind) do + OpenTelemetry.propagation.inject(job.__otel_headers) + block.call + end + end + end + end + + def perform_now + span_kind = self.class.queue_adapter_name == 'inline' ? :server : :consumer + span_name = "#{otel_config[:span_naming] == :job_class ? self.class : queue_name} process" + span_attributes = job_attributes(self).merge('messaging.operation' => 'process', 'code.function' => 'perform_now') + executions_count = (executions || 0) + 1 # because we run before the count is incremented in ActiveJob::Execution + + extracted_context = OpenTelemetry.propagation.extract(__otel_headers) + OpenTelemetry::Context.with_current(extracted_context) do + if otel_config[:propagation_style] == :child + otel_tracer.in_span(span_name, attributes: span_attributes, kind: span_kind) do |span| + span.set_attribute('messaging.active_job.executions', executions_count) + super + end + else + span_links = [] + if otel_config[:propagation_style] == :link + span_context = OpenTelemetry::Trace.current_span(extracted_context).context + span_links << OpenTelemetry::Trace::Link.new(span_context) if span_context.valid? + end + + root_span = otel_tracer.start_root_span(span_name, attributes: span_attributes, links: span_links, kind: span_kind) + OpenTelemetry::Trace.with_span(root_span) do |span| + span.set_attribute('messaging.active_job.executions', executions_count) + super + rescue Exception => e # rubocop:disable Lint/RescueException + span.record_exception(e) + span.status = OpenTelemetry::Trace::Status.error("Unhandled exception of type: #{e.class}") + raise e + ensure + root_span.finish + end + end + end + ensure + # We may be in a job system (eg: resque) that forks and kills worker processes often. + # We don't want to lose spans by not flushing any span processors, so we optionally force it here. 
+ OpenTelemetry.tracer_provider.force_flush if otel_config[:force_flush] + end + + private + + def job_attributes(job) + otel_attributes = { + 'code.namespace' => job.class.name, + 'messaging.destination_kind' => 'queue', + 'messaging.system' => job.class.queue_adapter_name, + 'messaging.destination' => job.queue_name, + 'messaging.message_id' => job.job_id, + 'messaging.active_job.provider_job_id' => job.provider_job_id, + 'messaging.active_job.scheduled_at' => job.scheduled_at, + 'messaging.active_job.priority' => job.priority + } + + otel_attributes['net.transport'] = 'inproc' if %w[async inline].include?(job.class.queue_adapter_name) + + otel_attributes.compact + end + + def otel_tracer + ActiveJob::Instrumentation.instance.tracer + end + + def otel_config + ActiveJob::Instrumentation.instance.config + end + end + end + end + end +end diff --git a/instrumentation/active_job/test/instrumentation/active_job/patches/base_test.rb b/instrumentation/active_job/test/instrumentation/active_job/patches/base_test.rb index 95a41d3cb3..3c767bc8fb 100644 --- a/instrumentation/active_job/test/instrumentation/active_job/patches/base_test.rb +++ b/instrumentation/active_job/test/instrumentation/active_job/patches/base_test.rb @@ -25,7 +25,7 @@ it 'handles jobs queued without instrumentation' do # e.g. 
during a rolling deployment job = TestJob.new serialized_job = job.serialize - serialized_job.delete('metadata') + serialized_job.delete('__otel_headers') job = TestJob.new job.deserialize(serialized_job) # should not raise an error diff --git a/instrumentation/active_job/test/instrumentation/active_job/subscriber_test.rb b/instrumentation/active_job/test/instrumentation/active_job/subscriber_test.rb index aad2bf2d97..5dbd0ed619 100644 --- a/instrumentation/active_job/test/instrumentation/active_job/subscriber_test.rb +++ b/instrumentation/active_job/test/instrumentation/active_job/subscriber_test.rb @@ -8,7 +8,7 @@ require_relative '../../../lib/opentelemetry/instrumentation/active_job' -describe OpenTelemetry::Instrumentation::ActiveJob::Subscriber do +describe 'OpenTelemetry::Instrumentation::ActiveJob::Subscriber' do let(:instrumentation) { OpenTelemetry::Instrumentation::ActiveJob::Instrumentation.instance } # Technically these are the defaults. But ActiveJob seems to act oddly if you re-install # the instrumentation over and over again - so we manipulate instance variables to @@ -21,7 +21,7 @@ let(:discard_span) { spans.find { |s| s.name == 'discard.active_job' } } before do - OpenTelemetry::Instrumentation::ActiveJob::Subscriber.uninstall + OpenTelemetry::Instrumentation::ActiveJob::Subscriber.uninstall if defined?(OpenTelemetry::Instrumentation::ActiveJob::Subscriber) instrumentation.instance_variable_set(:@config, config) instrumentation.instance_variable_set(:@installed, false)