Skip to content

Commit

Permalink
Revert "squash: Add old callbacks in"
Browse files Browse the repository at this point in the history
This reverts commit 209aaf9.
  • Loading branch information
arielvalentin committed Oct 7, 2023
1 parent 209aaf9 commit f5b140d
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class Instrumentation < OpenTelemetry::Instrumentation::Base
MINIMUM_VERSION = Gem::Version.new('6.0.0')

install do |_config|
require_dependencies
patch_activejob
end

Expand Down Expand Up @@ -61,29 +62,15 @@ def gem_version
::ActiveJob.version
end

def require_dependencies
require_relative 'patches/base'
require_relative 'subscriber'
end

def patch_activejob
require_relative 'latest/patches/base'
::ActiveJob::Base.prepend(Patches::Base) unless ::ActiveJob::Base.ancestors.include?(Patches::Base)

if Gem::Requirement.create('> 7').satisfied_by?(::ActiveJob.version)
require_relative 'latest/subscriber'
Subscriber.install
else
# Rails 6.x intruments code quite differently than Rails 7
#
# `perform_start` is the first in a series of events that is triggered, but it does not measure the underlying `perform` block,
# therefore is not suitable for being an ingress span.
# https://github.com/rails/rails/blob/v6.1.7.6/activejob/lib/active_job/instrumentation.rb#L14
#
# Although in Rails 6.1 the `perform` block does indeed measure the job's perform method,
# It _does not_ instrument any of the before or after callbacks, which I think is something of interest to our end users.
#
# I will follow up with a specialized Rails 6.1 subscriber that measures all other events but continues to create an ingress span that patches `perform_now`
#
# https://github.com/rails/rails/blob/v7.0.8/activejob/lib/active_job/instrumentation.rb#L19
require_relative 'v6/patches/active_job_callbacks'
::ActiveJob::Base.prepend(Patches::ActiveJobCallbacks)
end
Subscriber.install
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
module OpenTelemetry
module Instrumentation
module ActiveJob
# Generates a set of semantic conventions attributes basd on the contents of the ActiveSupport Notifications Event Payload
# Provides helper methods
#
class AttributeMapper
TEST_ADAPTERS = %w[async inline].freeze
Expand All @@ -32,19 +32,13 @@ def call(payload)
end
end

# Default handler to creates internal spans for events that would not be used for ingress or egress
# Default handler to creates internal spans for events
class DefaultHandler
def initialize(tracer, mapper)
@tracer = tracer
@mapper = mapper
end

# Creates a span with `kind: internal` for non-ingress or non-egress events
#
# @param name [String] of the event
# @param _id [Number] This argument is ignored
# @param payload [Hash<String,Object>] containing the ActiveSupport Notifications Event payload
# @return [Hash<String, Object>] containing the egress span and an array of context tokens to detach later
def on_start(name, _id, payload)
span = @tracer.start_span(name, attributes: @mapper.call(payload))
tokens = [OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span))]
Expand All @@ -53,19 +47,13 @@ def on_start(name, _id, payload)
end
end

# Handles enqueue.active_job and enqueue_at.active_job
# Handles enqueue.active_job
class EnqueueHandler
def initialize(tracer, mapper)
@tracer = tracer
@mapper = mapper
end

# Creates an engress span with `kind: producer` as well as injects the context into the job attributes using the specified propagation_style strategy.
#
# @param name [String] of the event either `enqueue.active_job` or `enqueue_at.active_job`
# @param _id [Number] This argument is ignored
# @param payload [Hash<String,Object>] containing the ActiveSupport Notifications Event payload
# @return [Hash<String, Object>] containing the egress span and an array of context tokens to detach later
def on_start(name, _id, payload)
otel_config = ActiveJob::Instrumentation.instance.config
span_name = "#{otel_config[:span_naming] == :job_class ? payload.fetch(:job).class.name : payload.fetch(:job).queue_name} publish"
Expand All @@ -83,12 +71,6 @@ def initialize(tracer, mapper)
@mapper = mapper
end

# Creates an ingress span with `kind: consumer` as well as extracts the context from the job attributes using the specified propagation_style strategy.
#
# @param name [String] of the event `perform.active_job`
# @param _id [Number] This argument is ignored
# @param payload [Hash<String,Object>] containing the ActiveSupport Notifications Event payload
# @return [Hash<String, Object>] containing the egress span and an array of context tokens to detach later
def on_start(name, _id, payload)
tokens = []
parent_context = OpenTelemetry.propagation.extract(payload.fetch(:job).__otel_headers)
Expand Down Expand Up @@ -137,12 +119,15 @@ def initialize(...)
def enqueue_at(...); end
def enqueue(...); end
def enqueue_retry(...); end
def perform_start(...); end
# This event causes much heartache as it is the first in a series of events that is triggered.
# It should not be the ingress span because it does not measure anything.
# https://github.com/rails/rails/blob/v6.1.7.6/activejob/lib/active_job/instrumentation.rb#L14
# https://github.com/rails/rails/blob/v7.0.8/activejob/lib/active_job/instrumentation.rb#L19
# def perform_start(...); end
def perform(...); end
def retry_stopped(...); end
def discard(...); end

# Overrides ActiveSupport::Subscriber#start to create a span for the current event
def start(name, id, payload)
begin
payload.merge!(__otel: @handlers_by_pattern[name].on_start(name, id, payload))
Expand All @@ -153,7 +138,6 @@ def start(name, id, payload)
super
end

# Overrides ActiveSupport::Subscriber#finish to finish a span for the current event and cleans up the context stack
def finish(_name, _id, payload)
begin
otel = payload.delete(:__otel)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
it 'handles jobs queued without instrumentation' do # e.g. during a rolling deployment
job = TestJob.new
serialized_job = job.serialize
serialized_job.delete('__otel_headers')
serialized_job.delete('metadata')

job = TestJob.new
job.deserialize(serialized_job) # should not raise an error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

require_relative '../../../lib/opentelemetry/instrumentation/active_job'

describe 'OpenTelemetry::Instrumentation::ActiveJob::Subscriber' do
describe OpenTelemetry::Instrumentation::ActiveJob::Subscriber do
let(:instrumentation) { OpenTelemetry::Instrumentation::ActiveJob::Instrumentation.instance }
# Technically these are the defaults. But ActiveJob seems to act oddly if you re-install
# the instrumentation over and over again - so we manipulate instance variables to
Expand All @@ -21,7 +21,7 @@
let(:discard_span) { spans.find { |s| s.name == 'discard.active_job' } }

before do
OpenTelemetry::Instrumentation::ActiveJob::Subscriber.uninstall if defined?(OpenTelemetry::Instrumentation::ActiveJob::Subscriber)
OpenTelemetry::Instrumentation::ActiveJob::Subscriber.uninstall
instrumentation.instance_variable_set(:@config, config)
instrumentation.instance_variable_set(:@installed, false)

Expand Down

0 comments on commit f5b140d

Please sign in to comment.