From f5b140d8b8e59d67634d0eb3051a803288a87f78 Mon Sep 17 00:00:00 2001 From: Ariel Valentin Date: Fri, 6 Oct 2023 22:21:01 -0500 Subject: [PATCH] Revert "squash: Add old callbacks in" This reverts commit 209aaf961e67597795900c4131c6c54a0aad6329. --- .../active_job/instrumentation.rb | 27 ++---- .../active_job/{latest => }/patches/base.rb | 0 .../active_job/{latest => }/subscriber.rb | 32 ++----- .../v6/patches/active_job_callbacks.rb | 96 ------------------- .../active_job/patches/base_test.rb | 2 +- .../active_job/subscriber_test.rb | 4 +- 6 files changed, 18 insertions(+), 143 deletions(-) rename instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/{latest => }/patches/base.rb (100%) rename instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/{latest => }/subscriber.rb (74%) delete mode 100644 instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/v6/patches/active_job_callbacks.rb diff --git a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/instrumentation.rb b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/instrumentation.rb index d687c1fb83..8b721ec01f 100644 --- a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/instrumentation.rb +++ b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/instrumentation.rb @@ -12,6 +12,7 @@ class Instrumentation < OpenTelemetry::Instrumentation::Base MINIMUM_VERSION = Gem::Version.new('6.0.0') install do |_config| + require_dependencies patch_activejob end @@ -61,29 +62,15 @@ def gem_version ::ActiveJob.version end + def require_dependencies + require_relative 'patches/base' + require_relative 'subscriber' + end + def patch_activejob - require_relative 'latest/patches/base' ::ActiveJob::Base.prepend(Patches::Base) unless ::ActiveJob::Base.ancestors.include?(Patches::Base) - if Gem::Requirement.create('> 7').satisfied_by?(::ActiveJob.version) - require_relative 'latest/subscriber' - Subscriber.install - else - # Rails 6.x intruments code quite differently than Rails 7 - # - # `perform_start` is the first in a series of events that is triggered, but it does not measure the underlying `perform` block, - # therefore is not suitable for being an ingress span. - # https://github.com/rails/rails/blob/v6.1.7.6/activejob/lib/active_job/instrumentation.rb#L14 - # - # Although in Rails 6.1 the `perform` block does indeed measure the job's perform method, - # It _does not_ instrument any of the before or after callbacks, which I think is something of interest to our end users. - # - # I will follow up with a specialized Rails 6.1 subscriber that measures all other events but continues to create an ingress span that patches `perform_now` - # - # https://github.com/rails/rails/blob/v7.0.8/activejob/lib/active_job/instrumentation.rb#L19 - require_relative 'v6/patches/active_job_callbacks' - ::ActiveJob::Base.prepend(Patches::ActiveJobCallbacks) - end + Subscriber.install end end end diff --git a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/latest/patches/base.rb b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/patches/base.rb similarity index 100% rename from instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/latest/patches/base.rb rename to instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/patches/base.rb diff --git a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/latest/subscriber.rb b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/subscriber.rb similarity index 74% rename from instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/latest/subscriber.rb rename to instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/subscriber.rb index 1e3b916f51..b672f2b21c 100644 --- a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/latest/subscriber.rb +++ b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/subscriber.rb @@ -5,7 +5,7 @@ module OpenTelemetry module Instrumentation module ActiveJob - # Generates a set of semantic conventions attributes basd on the contents of the ActiveSupport Notifications Event Payload + # Provides helper methods # class AttributeMapper TEST_ADAPTERS = %w[async inline].freeze @@ -32,19 +32,13 @@ def call(payload) end end - # Default handler to creates internal spans for events that would not be used for ingress or egress + # Default handler to creates internal spans for events class DefaultHandler def initialize(tracer, mapper) @tracer = tracer @mapper = mapper end - # Creates a span with `kind: internal` for non-ingress or non-egress events - # - # @param name [String] of the event - # @param _id [Number] This argument is ignored - # @param payload [Hash] containing the ActiveSupport Notifications Event payload - # @return [Hash] containing the egress span and an array of context tokens to detach later def on_start(name, _id, payload) span = @tracer.start_span(name, attributes: @mapper.call(payload)) tokens = [OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span))] @@ -53,19 +47,13 @@ def on_start(name, _id, payload) end end - # Handles enqueue.active_job and enqueue_at.active_job + # Handles enqueue.active_job class EnqueueHandler def initialize(tracer, mapper) @tracer = tracer @mapper = mapper end - # Creates an engress span with `kind: producer` as well as injects the context into the job attributes using the specified propagation_style strategy. - # - # @param name [String] of the event either `enqueue.active_job` or `enqueue_at.active_job` - # @param _id [Number] This argument is ignored - # @param payload [Hash] containing the ActiveSupport Notifications Event payload - # @return [Hash] containing the egress span and an array of context tokens to detach later def on_start(name, _id, payload) otel_config = ActiveJob::Instrumentation.instance.config span_name = "#{otel_config[:span_naming] == :job_class ? payload.fetch(:job).class.name : payload.fetch(:job).queue_name} publish" @@ -83,12 +71,6 @@ def initialize(tracer, mapper) @mapper = mapper end - # Creates an ingress span with `kind: consumer` as well as extracts the context from the job attributes using the specified propagation_style strategy. - # - # @param name [String] of the event `perform.active_job` - # @param _id [Number] This argument is ignored - # @param payload [Hash] containing the ActiveSupport Notifications Event payload - # @return [Hash] containing the egress span and an array of context tokens to detach later def on_start(name, _id, payload) tokens = [] parent_context = OpenTelemetry.propagation.extract(payload.fetch(:job).__otel_headers) @@ -137,12 +119,15 @@ def initialize(...) def enqueue_at(...); end def enqueue(...); end def enqueue_retry(...); end - def perform_start(...); end + # This event causes much heartache as it is the first in a series of events that is triggered. + # It should not be the ingress span because it does not measure anything. + # https://github.com/rails/rails/blob/v6.1.7.6/activejob/lib/active_job/instrumentation.rb#L14 + # https://github.com/rails/rails/blob/v7.0.8/activejob/lib/active_job/instrumentation.rb#L19 + # def perform_start(...); end def perform(...); end def retry_stopped(...); end def discard(...); end - # Overrides ActiveSupport::Subscriber#start to create a span for the current event def start(name, id, payload) begin payload.merge!(__otel: @handlers_by_pattern[name].on_start(name, id, payload)) @@ -153,7 +138,6 @@ def start(name, id, payload) super end - # Overrides ActiveSupport::Subscriber#finish to finish a span for the current event and cleans up the context stack def finish(_name, _id, payload) begin otel = payload.delete(:__otel) diff --git a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/v6/patches/active_job_callbacks.rb b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/v6/patches/active_job_callbacks.rb deleted file mode 100644 index 988c6ced4b..0000000000 --- a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/v6/patches/active_job_callbacks.rb +++ /dev/null @@ -1,96 +0,0 @@ -# frozen_string_literal: true - -# Copyright The OpenTelemetry Authors -# -# SPDX-License-Identifier: Apache-2.0 - -module OpenTelemetry - module Instrumentation - module ActiveJob - module Patches - # Module to prepend to ActiveJob::Base for instrumentation. - module ActiveJobCallbacks - def self.prepended(base) - base.class_eval do - around_enqueue do |job, block| - span_kind = job.class.queue_adapter_name == 'inline' ? :client : :producer - span_name = "#{otel_config[:span_naming] == :job_class ? job.class : job.queue_name} publish" - span_attributes = job_attributes(job) - otel_tracer.in_span(span_name, attributes: span_attributes, kind: span_kind) do - OpenTelemetry.propagation.inject(job.__otel_headers) - block.call - end - end - end - end - - def perform_now - span_kind = self.class.queue_adapter_name == 'inline' ? :server : :consumer - span_name = "#{otel_config[:span_naming] == :job_class ? self.class : queue_name} process" - span_attributes = job_attributes(self).merge('messaging.operation' => 'process', 'code.function' => 'perform_now') - executions_count = (executions || 0) + 1 # because we run before the count is incremented in ActiveJob::Execution - - extracted_context = OpenTelemetry.propagation.extract(__otel_headers) - OpenTelemetry::Context.with_current(extracted_context) do - if otel_config[:propagation_style] == :child - otel_tracer.in_span(span_name, attributes: span_attributes, kind: span_kind) do |span| - span.set_attribute('messaging.active_job.executions', executions_count) - super - end - else - span_links = [] - if otel_config[:propagation_style] == :link - span_context = OpenTelemetry::Trace.current_span(extracted_context).context - span_links << OpenTelemetry::Trace::Link.new(span_context) if span_context.valid? - end - - root_span = otel_tracer.start_root_span(span_name, attributes: span_attributes, links: span_links, kind: span_kind) - OpenTelemetry::Trace.with_span(root_span) do |span| - span.set_attribute('messaging.active_job.executions', executions_count) - super - rescue Exception => e # rubocop:disable Lint/RescueException - span.record_exception(e) - span.status = OpenTelemetry::Trace::Status.error("Unhandled exception of type: #{e.class}") - raise e - ensure - root_span.finish - end - end - end - ensure - # We may be in a job system (eg: resque) that forks and kills worker processes often. - # We don't want to lose spans by not flushing any span processors, so we optionally force it here. - OpenTelemetry.tracer_provider.force_flush if otel_config[:force_flush] - end - - private - - def job_attributes(job) - otel_attributes = { - 'code.namespace' => job.class.name, - 'messaging.destination_kind' => 'queue', - 'messaging.system' => job.class.queue_adapter_name, - 'messaging.destination' => job.queue_name, - 'messaging.message_id' => job.job_id, - 'messaging.active_job.provider_job_id' => job.provider_job_id, - 'messaging.active_job.scheduled_at' => job.scheduled_at, - 'messaging.active_job.priority' => job.priority - } - - otel_attributes['net.transport'] = 'inproc' if %w[async inline].include?(job.class.queue_adapter_name) - - otel_attributes.compact - end - - def otel_tracer - ActiveJob::Instrumentation.instance.tracer - end - - def otel_config - ActiveJob::Instrumentation.instance.config - end - end - end - end - end -end diff --git a/instrumentation/active_job/test/instrumentation/active_job/patches/base_test.rb b/instrumentation/active_job/test/instrumentation/active_job/patches/base_test.rb index 3c767bc8fb..95a41d3cb3 100644 --- a/instrumentation/active_job/test/instrumentation/active_job/patches/base_test.rb +++ b/instrumentation/active_job/test/instrumentation/active_job/patches/base_test.rb @@ -25,7 +25,7 @@ it 'handles jobs queued without instrumentation' do # e.g. during a rolling deployment job = TestJob.new serialized_job = job.serialize - serialized_job.delete('__otel_headers') + serialized_job.delete('metadata') job = TestJob.new job.deserialize(serialized_job) # should not raise an error diff --git a/instrumentation/active_job/test/instrumentation/active_job/subscriber_test.rb b/instrumentation/active_job/test/instrumentation/active_job/subscriber_test.rb index 5dbd0ed619..aad2bf2d97 100644 --- a/instrumentation/active_job/test/instrumentation/active_job/subscriber_test.rb +++ b/instrumentation/active_job/test/instrumentation/active_job/subscriber_test.rb @@ -8,7 +8,7 @@ require_relative '../../../lib/opentelemetry/instrumentation/active_job' -describe 'OpenTelemetry::Instrumentation::ActiveJob::Subscriber' do +describe OpenTelemetry::Instrumentation::ActiveJob::Subscriber do let(:instrumentation) { OpenTelemetry::Instrumentation::ActiveJob::Instrumentation.instance } # Technically these are the defaults. But ActiveJob seems to act oddly if you re-install # the instrumentation over and over again - so we manipulate instance variables to @@ -21,7 +21,7 @@ let(:discard_span) { spans.find { |s| s.name == 'discard.active_job' } } before do - OpenTelemetry::Instrumentation::ActiveJob::Subscriber.uninstall if defined?(OpenTelemetry::Instrumentation::ActiveJob::Subscriber) + OpenTelemetry::Instrumentation::ActiveJob::Subscriber.uninstall instrumentation.instance_variable_set(:@config, config) instrumentation.instance_variable_set(:@installed, false)