Skip to content

Commit

Permalink
refactor: Register discard job
Browse files Browse the repository at this point in the history
  • Loading branch information
arielvalentin committed Oct 8, 2023
1 parent b27d408 commit 603c23a
Showing 1 changed file with 12 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ def start(name, _id, payload)
span = @tracer.start_span(name, attributes: @mapper.call(payload))
tokens = [OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span))]
OpenTelemetry.propagation.inject(payload.fetch(:job).__otel_headers) # This must be transmitted over the wire
{ span: span, ctx_tokens: tokens }

payload.merge!(__otel: { span: span, ctx_tokens: tokens })
end

def finish(_name, _id, payload)
Expand Down Expand Up @@ -88,7 +89,7 @@ def start(name, _id, payload)
span = @tracer.start_span(span_name, kind: :producer, attributes: @mapper.call(payload))
tokens = [OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span))]
OpenTelemetry.propagation.inject(payload.fetch(:job).__otel_headers) # This must be transmitted over the wire
{ span: span, ctx_tokens: tokens }
payload.merge!(__otel: { span: span, ctx_tokens: tokens })
end
end

Expand Down Expand Up @@ -116,7 +117,7 @@ def start(name, _id, payload)

tokens.concat([consumer_context, aj_context].map { |context| OpenTelemetry::Context.attach(context) })

{ span: span, ctx_tokens: tokens }
payload.merge!(__otel: { span: span, ctx_tokens: tokens })
end
end

Expand Down Expand Up @@ -151,10 +152,10 @@ def enqueue_retry(...); end
# def perform_start(...); end
def perform(...); end
def retry_stopped(...); end
def discard(...); end
# def discard(...); end

def start(name, id, payload)
payload.merge!(__otel: @handlers_by_pattern[name].start(name, id, payload))
@handlers_by_pattern[name].start(name, id, payload)
# This is nuts
super if @call_super
end
Expand All @@ -167,10 +168,16 @@ def finish(name, id, payload)

def self.install
attach_to :active_job
tracer = Instrumentation.instance.tracer
mapper = AttributeMapper.new
default_handler = DefaultHandler.new(tracer, mapper)
@subscriptions ||= []
@subscriptions << ActiveSupport::Notifications.subscribe('discard.active_job', default_handler)
end

def self.uninstall
detach_from :active_job
@subscriptions&.each { |subscriber| ActiveSupport::Notifications.unsubscribe(subscriber) }
end
end
end
Expand Down

0 comments on commit 603c23a

Please sign in to comment.