Skip to content

Commit

Permalink
fix: bad refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
arielvalentin authored Sep 24, 2023
1 parent d8b874b commit 57f5d18
Showing 1 changed file with 37 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,42 @@ def deserialize(job_data)
module OpenTelemetry
module Instrumentation
module ActiveJob
class EnqueueSubscriber
def on_start(name, _id, payload, subscriber)
span = subscriber.tracer.start_span("#{payload.fetch(:job).queue_name} publish",
kind: :producer,
attributes: subscriber.job_attributes(payload.fetch(:job)))
tokens = [OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span))]
OpenTelemetry.propagation.inject(payload.fetch(:job).__otel_headers) # This must be transmitted over the wire
{ span: span, ctx_tokens: tokens }
end
end

class PerformSubscriber
def on_start(name, _id, payload, subscriber)
tokens = []
parent_context = OpenTelemetry.propagation.extract(payload.fetch(:job).__otel_headers)
span_context = OpenTelemetry::Trace.current_span(parent_context).context

if span_context.valid?
tokens << OpenTelemetry::Context.attach(parent_context)
links = [OpenTelemetry::Trace::Link.new(span_context)]
end

span = subscriber.tracer.start_span(
"#{payload.fetch(:job).queue_name} process",
kind: :consumer,
attributes: subscriber.job_attributes(payload.fetch(:job)),
links: links
)

tokens << OpenTelemetry::Context.attach(
OpenTelemetry::Trace.context_with_span(span)
)

{ span: span, ctx_tokens: tokens }
end
end

class GenericSubscriber < ::ActiveSupport::Subscriber
TEST_ADAPTERS = %w[async inline]
Expand All @@ -65,7 +101,7 @@ def perform(...);end

def start(name, id, payload)
begin
payload.merge!(__otel: EVENT_HANDLERS.fetch(name).on_start(name, id, payload)) # The payload is _not_ transmitted over the wire
payload.merge!(__otel: EVENT_HANDLERS.fetch(name).on_start(name, id, payload, self)) # The payload is _not_ transmitted over the wire
rescue StandardError => error
OpenTelemetry.handle_error(exception: error)
end
Expand Down Expand Up @@ -106,8 +142,6 @@ def on_start(name, _id, payload)
{ span: span, ctx_tokens: tokens }
end

private

def job_attributes(job)
otel_attributes = {
'code.namespace' => job.class.name,
Expand All @@ -129,43 +163,6 @@ def tracer
OpenTelemetry.tracer_provider.tracer('otel-active_job', '0.0.1')
end
end

class EnqueueSubscriber
def on_start(name, _id, payload)
span = tracer.start_span("#{payload.fetch(:job).queue_name} publish",
kind: :producer,
attributes: job_attributes(payload.fetch(:job)))
tokens = [OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span))]
OpenTelemetry.propagation.inject(payload.fetch(:job).__otel_headers) # This must be transmitted over the wire
{ span: span, ctx_tokens: tokens }
end
end

class PerformSubscriber
def on_start(name, _id, payload)
tokens = []
parent_context = OpenTelemetry.propagation.extract(payload.fetch(:job).__otel_headers)
span_context = OpenTelemetry::Trace.current_span(parent_context).context

if span_context.valid?
tokens << OpenTelemetry::Context.attach(parent_context)
links = [OpenTelemetry::Trace::Link.new(span_context)]
end

span = tracer.start_span(
"#{payload.fetch(:job).queue_name} process",
kind: :consumer,
attributes: job_attributes(payload.fetch(:job)),
links: links
)

tokens << OpenTelemetry::Context.attach(
OpenTelemetry::Trace.context_with_span(span)
)

{ span: span, ctx_tokens: tokens }
end
end
end
end
end

0 comments on commit 57f5d18

Please sign in to comment.