Skip to content

Commit

Permalink
feat: add default handler
Browse files Browse the repository at this point in the history
  • Loading branch information
arielvalentin authored Sep 24, 2023
1 parent 63b902f commit daf5d1b
Showing 1 changed file with 21 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,20 @@ module OpenTelemetry
module Instrumentation
module ActiveJob

class EnqueueSubscriber
class DefaultHandler
def initialize(tracer)
@tracer = tracer
end

def on_start(name, _id, payload, subscriber)
span = @tracer.start_span(name, attributes: subscriber.as_otel_semconv_attrs(payload.fetch(:job)))
tokens = [OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span))]
OpenTelemetry.propagation.inject(payload.fetch(:job).__otel_headers) # This must be transmitted over the wire
{ span: span, ctx_tokens: tokens }
end
end

class EnqueueHandler
def initialize(tracer)
@tracer = tracer
end
Expand All @@ -65,7 +78,7 @@ def on_start(name, _id, payload, subscriber)
end
end

class PerformSubscriber
class PerformHandler
def initialize(tracer)
@tracer = tracer
end
Expand Down Expand Up @@ -101,19 +114,22 @@ class Subscriber < ::ActiveSupport::Subscriber
def initialize(...)
super
@tracer = OpenTelemetry.tracer_provider.tracer('otel-active_job', '0.0.1')
default_handler = DefaultHandler.new(@tracer)
@handlers_by_pattern = {
'enqueue.active_job' => EnqueueSubscriber.new(@tracer),
'perform.active_job' => PerformSubscriber.new(@tracer),
'enqueue.active_job' => EnqueueHandler.new(@tracer),
'perform.active_job' => PerformHandler.new(@tracer),
}
@handlers_by_pattern.default = default_handler
end

# The methods below are the events the Subscriber is interested in.
def enqueue(...); end
def perform_start(...); end
def perform(...);end

def start(name, id, payload)
begin
payload.merge!(__otel: @handlers_by_pattern.fetch(name).on_start(name, id, payload, self)) # The payload is _not_ transmitted over the wire
payload.merge!(__otel: @handlers_by_pattern[name].on_start(name, id, payload, self)) # The payload is _not_ transmitted over the wire
rescue StandardError => error
OpenTelemetry.handle_error(exception: error)
end
Expand Down Expand Up @@ -147,12 +163,6 @@ def finish(_name, _id, payload)
end
end

def on_start(name, _id, payload)
span = tracer.start_span(name, attributes: as_otel_semconv_attrs(payload.fetch(:job)))
tokens = [OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span))]
OpenTelemetry.propagation.inject(payload.fetch(:job).__otel_headers) # This must be transmitted over the wire
{ span: span, ctx_tokens: tokens }
end

def as_otel_semconv_attrs(job)
test_adapters = %w[async inline]
Expand Down

0 comments on commit daf5d1b

Please sign in to comment.