Skip to content

Commit

Permalink
feat(active_job): Normalize event messages
Browse files Browse the repository at this point in the history
  • Loading branch information
lavoiesl committed Jul 22, 2024
1 parent cb76703 commit baf2497
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ module Instrumentation
module ActiveJob
# Module that contains custom event handlers, which are used to generate spans per event
module Handlers
EVENT_NAMESPACE = 'active_job'.freeze

module_function

# Subscribes Event Handlers to relevant ActiveJob notifications
Expand Down Expand Up @@ -57,7 +59,7 @@ def subscribe
}

@subscriptions = handlers_by_pattern.map do |key, handler|
::ActiveSupport::Notifications.subscribe("#{key}.active_job", handler)
::ActiveSupport::Notifications.subscribe("#{key}.#{EVENT_NAMESPACE}", handler)
end
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ def start(name, id, payload)
# @param payload [Hash] containing job run information
# @return [Hash] with the span and generated context tokens
def start_span(name, _id, payload)
span = tracer.start_span(name, attributes: @mapper.call(payload))
job = payload.fetch(:job)
event_name = name.delete_suffix(".#{EVENT_NAMESPACE}")
span_name = span_name(job, event_name)
span = tracer.start_span(span_name, attributes: @mapper.call(payload))
token = OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span))

{ span: span, ctx_token: token }
Expand Down Expand Up @@ -106,6 +109,18 @@ def on_exception(exception, span)
def tracer
OpenTelemetry::Instrumentation::ActiveJob::Instrumentation.instance.tracer
end

private

def span_name(job, event_name)
prefix = if @config[:span_naming] == :job_class
job.class.name
else
job.queue_name
end

"#{prefix} #{event_name}"
end
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ module ActiveJob
module Handlers
# Handles `enqueue.active_job` and `enqueue_at.active_job` to generate egress spans
class Enqueue < Default
EVENT_NAME = 'publish'.freeze

# Overrides the `Default#start_span` method to create an egress span
# and registers it with the current context
#
Expand All @@ -19,22 +21,11 @@ class Enqueue < Default
# @return [Hash] with the span and generated context tokens
def start_span(name, _id, payload)
job = payload.fetch(:job)
span = tracer.start_span(span_name(job), kind: :producer, attributes: @mapper.call(payload))
span_name = span_name(job, EVENT_NAME)
span = tracer.start_span(span_name, kind: :producer, attributes: @mapper.call(payload))
OpenTelemetry.propagation.inject(job.__otel_headers) # This must be transmitted over the wire
{ span: span, ctx_token: OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span)) }
end

private

def span_name(job)
prefix = if @config[:span_naming] == :job_class
job.class.name
else
job.queue_name
end

"#{prefix} publish"
end
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ module ActiveJob
module Handlers
# Handles perform.active_job to generate ingress spans
class Perform < Default
EVENT_NAME = 'process'.freeze

# Overrides the `Default#start_span` method to create an ingress span
# and registers it with the current context
#
Expand All @@ -19,10 +21,9 @@ class Perform < Default
# @return [Hash] with the span and generated context tokens
def start_span(name, _id, payload)
job = payload.fetch(:job)
span_name = span_name(job, EVENT_NAME)
parent_context = OpenTelemetry.propagation.extract(job.__otel_headers)

span_name = span_name(job)

# TODO: Refactor into a propagation strategy
propagation_style = @config[:propagation_style]
if propagation_style == :child
Expand All @@ -48,18 +49,6 @@ def attach_consumer_context(span)

OpenTelemetry::Context.attach(internal_context)
end

private

def span_name(job)
prefix = if @config[:span_naming] == :job_class
job.class.name
else
job.queue_name
end

"#{prefix} process"
end
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
let(:spans) { exporter.finished_spans }
let(:publish_span) { spans.find { |s| s.name == 'default publish' } }
let(:process_span) { spans.find { |s| s.name == 'default process' } }
let(:discard_span) { spans.find { |s| s.name == 'discard.active_job' } }
let(:discard_span) { spans.find { |s| s.name == 'default discard' } }

before do
OpenTelemetry::Instrumentation::ActiveJob::Handlers.unsubscribe
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
let(:spans) { exporter.finished_spans }
let(:publish_span) { spans.find { |s| s.name == 'default publish' } }
let(:process_span) { spans.find { |s| s.name == 'default process' } }
let(:retry_span) { spans.find { |s| s.name == 'retry_stopped.active_job' } }
let(:retry_span) { spans.find { |s| s.name == 'default retry_stopped' } }

before do
OpenTelemetry::Instrumentation::ActiveJob::Handlers.unsubscribe
Expand Down

0 comments on commit baf2497

Please sign in to comment.