Skip to content

Commit

Permalink
squash: Add old callbacks in
Browse files Browse the repository at this point in the history
  • Loading branch information
arielvalentin committed Sep 30, 2023
1 parent 891fb32 commit 209aaf9
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ class Instrumentation < OpenTelemetry::Instrumentation::Base
MINIMUM_VERSION = Gem::Version.new('6.0.0')

install do |_config|
require_dependencies
patch_activejob
end

Expand Down Expand Up @@ -62,15 +61,29 @@ def gem_version
::ActiveJob.version
end

def require_dependencies
require_relative 'patches/base'
require_relative 'subscriber'
end

# Installs the ActiveJob instrumentation that matches the running ActiveJob version.
#
# `Patches::Base` is always prepended (at most once) to `::ActiveJob::Base`.
# ActiveJob 7.0.0 and newer then get the notification-based `Subscriber`;
# older versions get the callback-based `Patches::ActiveJobCallbacks` instead.
def patch_activejob
  require_relative 'latest/patches/base'
  ::ActiveJob::Base.prepend(Patches::Base) unless ::ActiveJob::Base.ancestors.include?(Patches::Base)

  # `>= 7` rather than `> 7`: Gem::Version treats 7.0.0 and 7 as equal, so a strict
  # comparison would send ActiveJob 7.0.0 down the Rails 6.x branch below.
  if Gem::Requirement.create('>= 7').satisfied_by?(::ActiveJob.version)
    require_relative 'latest/subscriber'
    Subscriber.install
  else
    # Rails 6.x instruments code quite differently than Rails 7.
    #
    # `perform_start` is the first in a series of events that is triggered, but it does not
    # measure the underlying `perform` block, therefore it is not suitable for being an ingress span.
    # https://github.com/rails/rails/blob/v6.1.7.6/activejob/lib/active_job/instrumentation.rb#L14
    #
    # Although in Rails 6.1 the `perform` block does indeed measure the job's perform method,
    # it _does not_ instrument any of the before or after callbacks, which is likely of interest
    # to end users.
    #
    # A specialized Rails 6.1 subscriber that measures all other events, while still creating the
    # ingress span by patching `perform_now`, is planned as a follow-up.
    #
    # https://github.com/rails/rails/blob/v7.0.8/activejob/lib/active_job/instrumentation.rb#L19
    require_relative 'v6/patches/active_job_callbacks'
    # Guarded like Patches::Base above so that a repeated install does not re-run the
    # `prepended` hook, which would register the enqueue callback a second time.
    ::ActiveJob::Base.prepend(Patches::ActiveJobCallbacks) unless ::ActiveJob::Base.ancestors.include?(Patches::ActiveJobCallbacks)
  end
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
module OpenTelemetry
module Instrumentation
module ActiveJob
# Provides helper methods
# Generates a set of semantic conventions attributes based on the contents of the ActiveSupport Notifications Event Payload
#
class AttributeMapper
TEST_ADAPTERS = %w[async inline].freeze
Expand All @@ -32,13 +32,19 @@ def call(payload)
end
end

# Default handler to creates internal spans for events
# Default handler that creates internal spans for events that are not used for ingress or egress
class DefaultHandler
def initialize(tracer, mapper)
@tracer = tracer
@mapper = mapper
end

# Creates a span with `kind: internal` for non-ingress or non-egress events
#
# @param name [String] of the event
# @param _id [Number] This argument is ignored
# @param payload [Hash<Symbol,Object>] containing the ActiveSupport Notifications Event payload
# @return [Hash<String, Object>] containing the internal span and an array of context tokens to detach later
def on_start(name, _id, payload)
span = @tracer.start_span(name, attributes: @mapper.call(payload))
tokens = [OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span))]
Expand All @@ -47,13 +53,19 @@ def on_start(name, _id, payload)
end
end

# Handles enqueue.active_job
# Handles enqueue.active_job and enqueue_at.active_job
class EnqueueHandler
def initialize(tracer, mapper)
@tracer = tracer
@mapper = mapper
end

# Creates an egress span with `kind: producer` and injects the context into the job attributes using the specified propagation_style strategy.
#
# @param name [String] of the event, either `enqueue.active_job` or `enqueue_at.active_job`
# @param _id [Number] This argument is ignored
# @param payload [Hash<Symbol,Object>] containing the ActiveSupport Notifications Event payload
# @return [Hash<String, Object>] containing the egress span and an array of context tokens to detach later
def on_start(name, _id, payload)
otel_config = ActiveJob::Instrumentation.instance.config
span_name = "#{otel_config[:span_naming] == :job_class ? payload.fetch(:job).class.name : payload.fetch(:job).queue_name} publish"
Expand All @@ -71,6 +83,12 @@ def initialize(tracer, mapper)
@mapper = mapper
end

# Creates an ingress span with `kind: consumer` and extracts the context from the job attributes using the specified propagation_style strategy.
#
# @param name [String] of the event `perform.active_job`
# @param _id [Number] This argument is ignored
# @param payload [Hash<Symbol,Object>] containing the ActiveSupport Notifications Event payload
# @return [Hash<String, Object>] containing the ingress span and an array of context tokens to detach later
def on_start(name, _id, payload)
tokens = []
parent_context = OpenTelemetry.propagation.extract(payload.fetch(:job).__otel_headers)
Expand Down Expand Up @@ -119,15 +137,12 @@ def initialize(...)
def enqueue_at(...); end
def enqueue(...); end
def enqueue_retry(...); end
# This event causes much heartache as it is the first in a series of events that is triggered.
# It should not be the ingress span because it does not measure anything.
# https://github.com/rails/rails/blob/v6.1.7.6/activejob/lib/active_job/instrumentation.rb#L14
# https://github.com/rails/rails/blob/v7.0.8/activejob/lib/active_job/instrumentation.rb#L19
# def perform_start(...); end
def perform_start(...); end
def perform(...); end
def retry_stopped(...); end
def discard(...); end

# Overrides ActiveSupport::Subscriber#start to create a span for the current event
def start(name, id, payload)
begin
payload.merge!(__otel: @handlers_by_pattern[name].on_start(name, id, payload))
Expand All @@ -138,6 +153,7 @@ def start(name, id, payload)
super
end

# Overrides ActiveSupport::Subscriber#finish to finish a span for the current event and cleans up the context stack
def finish(_name, _id, payload)
begin
otel = payload.delete(:__otel)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# frozen_string_literal: true

# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

module OpenTelemetry
module Instrumentation
module ActiveJob
module Patches
# Module to prepend to ActiveJob::Base for instrumentation.
module ActiveJobCallbacks
# Hook invoked when this module is prepended. Registers an `around_enqueue`
# callback on +base+ that wraps the enqueue in a "<name> publish" span and
# injects the propagation context into the job's `__otel_headers`.
#
# @param base [Class] the class this module is being prepended to
def self.prepended(base)
  base.class_eval do
    around_enqueue do |job, block|
      # The `inline` adapter is reported as :client; every other adapter as :producer.
      inline_adapter = job.class.queue_adapter_name == 'inline'
      kind = inline_adapter ? :client : :producer

      # Span is named after the job class or the queue, depending on `span_naming`.
      subject = otel_config[:span_naming] == :job_class ? job.class : job.queue_name
      publish_name = "#{subject} publish"
      attrs = job_attributes(job)

      otel_tracer.in_span(publish_name, attributes: attrs, kind: kind) do
        OpenTelemetry.propagation.inject(job.__otel_headers)
        block.call
      end
    end
  end
end

# Runs the job (via `super`) inside a "<name> process" span, where <name> is the job class
# when the `span_naming` option is `:job_class` and the queue name otherwise.
#
# The context carried in `__otel_headers` is extracted and made current for the duration of
# the call. How the span is created depends on the `propagation_style` option:
# - `:child` starts the span with `in_span` while the extracted context is current;
# - any other value starts a root span, and `:link` additionally links it to the span found
#   in the extracted context when that span's context is valid.
#
# When the `force_flush` option is set, the tracer provider is flushed before returning,
# whether or not the job raised.
def perform_now
# The `inline` adapter is reported as :server; every other adapter as :consumer.
span_kind = self.class.queue_adapter_name == 'inline' ? :server : :consumer
span_name = "#{otel_config[:span_naming] == :job_class ? self.class : queue_name} process"
span_attributes = job_attributes(self).merge('messaging.operation' => 'process', 'code.function' => 'perform_now')
executions_count = (executions || 0) + 1 # because we run before the count is incremented in ActiveJob::Execution

extracted_context = OpenTelemetry.propagation.extract(__otel_headers)
OpenTelemetry::Context.with_current(extracted_context) do
if otel_config[:propagation_style] == :child
otel_tracer.in_span(span_name, attributes: span_attributes, kind: span_kind) do |span|
span.set_attribute('messaging.active_job.executions', executions_count)
super
end
else
# Non-child styles get a new root span; links stay empty unless the style is :link.
span_links = []
if otel_config[:propagation_style] == :link
span_context = OpenTelemetry::Trace.current_span(extracted_context).context
span_links << OpenTelemetry::Trace::Link.new(span_context) if span_context.valid?
end

root_span = otel_tracer.start_root_span(span_name, attributes: span_attributes, links: span_links, kind: span_kind)
OpenTelemetry::Trace.with_span(root_span) do |span|
span.set_attribute('messaging.active_job.executions', executions_count)
super
# The root span is started manually, so the failure is recorded on it here before re-raising.
rescue Exception => e # rubocop:disable Lint/RescueException
span.record_exception(e)
span.status = OpenTelemetry::Trace::Status.error("Unhandled exception of type: #{e.class}")
raise e
ensure
# A manually started span must be finished explicitly, on success and on failure.
root_span.finish
end
end
end
ensure
# We may be in a job system (eg: resque) that forks and kills worker processes often.
# We don't want to lose spans by not flushing any span processors, so we optionally force it here.
OpenTelemetry.tracer_provider.force_flush if otel_config[:force_flush]
end

private

# Builds the span attributes that describe +job+.
#
# Attributes whose value is nil are left out. `net.transport` is set to
# `inproc` only when the queue adapter name is `async` or `inline`.
#
# @param job [Object] the job to describe; its class must respond to `name` and `queue_adapter_name`
# @return [Hash<String, Object>] attribute names mapped to non-nil values
def job_attributes(job)
  job_class = job.class

  attributes = {
    'code.namespace' => job_class.name,
    'messaging.destination_kind' => 'queue',
    'messaging.system' => job_class.queue_adapter_name,
    'messaging.destination' => job.queue_name,
    'messaging.message_id' => job.job_id,
    'messaging.active_job.provider_job_id' => job.provider_job_id,
    'messaging.active_job.scheduled_at' => job.scheduled_at,
    'messaging.active_job.priority' => job.priority
  }

  in_process = %w[async inline].include?(job_class.queue_adapter_name)
  attributes['net.transport'] = 'inproc' if in_process

  attributes.reject { |_name, value| value.nil? }
end

# Returns the tracer held by the ActiveJob instrumentation singleton.
def otel_tracer
  instrumentation = ActiveJob::Instrumentation.instance
  instrumentation.tracer
end

# Returns the configuration held by the ActiveJob instrumentation singleton.
def otel_config
  instrumentation = ActiveJob::Instrumentation.instance
  instrumentation.config
end
end
end
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
it 'handles jobs queued without instrumentation' do # e.g. during a rolling deployment
job = TestJob.new
serialized_job = job.serialize
serialized_job.delete('metadata')
serialized_job.delete('__otel_headers')

job = TestJob.new
job.deserialize(serialized_job) # should not raise an error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

require_relative '../../../lib/opentelemetry/instrumentation/active_job'

describe OpenTelemetry::Instrumentation::ActiveJob::Subscriber do
describe 'OpenTelemetry::Instrumentation::ActiveJob::Subscriber' do
let(:instrumentation) { OpenTelemetry::Instrumentation::ActiveJob::Instrumentation.instance }
# Technically these are the defaults. But ActiveJob seems to act oddly if you re-install
# the instrumentation over and over again - so we manipulate instance variables to
Expand All @@ -21,7 +21,7 @@
let(:discard_span) { spans.find { |s| s.name == 'discard.active_job' } }

before do
OpenTelemetry::Instrumentation::ActiveJob::Subscriber.uninstall
OpenTelemetry::Instrumentation::ActiveJob::Subscriber.uninstall if defined?(OpenTelemetry::Instrumentation::ActiveJob::Subscriber)
instrumentation.instance_variable_set(:@config, config)
instrumentation.instance_variable_set(:@installed, false)

Expand Down

0 comments on commit 209aaf9

Please sign in to comment.