Skip to content

Commit

Permalink
refactor: extract to files
Browse files Browse the repository at this point in the history
  • Loading branch information
arielvalentin committed Oct 9, 2023
1 parent 465aa3c commit 7c9e20c
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 125 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# frozen_string_literal: true

module OpenTelemetry
module Instrumentation
module ActiveJob
# Default handler to creates internal spans for events
class DefaultHandler
def initialize(tracer, mapper)
@tracer = tracer
@mapper = mapper
end

def start(name, id, payload)
payload.merge!(__otel: on_start(name, id, payload))
rescue StandardError => e
OpenTelemetry.handle_error(exception: e)
end

def on_start(name, _id, payload)
span = @tracer.start_span(name, attributes: @mapper.call(payload))
tokens = [OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span))]

{ span: span, ctx_tokens: tokens }
end

def finish(_name, _id, payload)
otel = payload.delete(:__otel)
span = otel&.fetch(:span)
tokens = otel&.fetch(:ctx_tokens)

exception = payload[:error] || payload[:exception_object]
on_exception(exception, span) if exception
rescue StandardError => e
OpenTelemetry.handle_error(exception: e)
ensure
finish_span(span, tokens)
end

def finish_span(span, tokens)
# closes the span after all attributes have been finalized
begin
span&.status = OpenTelemetry::Trace::Status.ok if span&.status&.code == OpenTelemetry::Trace::Status::UNSET
span&.finish
rescue StandardError => e
OpenTelemetry.handle_error(exception: e)
end

# pops the context stack
tokens&.reverse&.each do |token|
OpenTelemetry::Context.detach(token)
rescue StandardError => e
OpenTelemetry.handle_error(exception: e)
end
end

# Records exceptions on spans and sets Span statuses to `Error`
#
# Handled exceptions are recorded on internal spans related to the event. E.g. `discard` events are recorded on the `discard.active_job` span
# Handled exceptions _are not_ copied to the ingress span, but it does set the status to `Error` making it easier to know that a job has failed
# Unhandled exceptions bubble up to the ingress span and are recorded there.
#
# @param [Exception] exception to report as a Span Event
# @param [OpenTelemetry::Trace::Span] the currently active span used to record the exception and set the status
def on_exception(exception, span)
status = OpenTelemetry::Trace::Status.error(exception.message)
OpenTelemetry::Instrumentation::ActiveJob.current_span.status = status
span&.record_exception(exception)
span&.status = status
end
end
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# frozen_string_literal: true

module OpenTelemetry
module Instrumentation
module ActiveJob
# Handles enqueue.active_job
class EnqueueHandler < DefaultHandler
def on_start(name, _id, payload)
otel_config = ActiveJob::Instrumentation.instance.config
span_name = "#{otel_config[:span_naming] == :job_class ? payload.fetch(:job).class.name : payload.fetch(:job).queue_name} publish"
span = @tracer.start_span(span_name, kind: :producer, attributes: @mapper.call(payload))
tokens = [OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span))]
OpenTelemetry.propagation.inject(payload.fetch(:job).__otel_headers) # This must be transmitted over the wire
{ span: span, ctx_tokens: tokens }
end
end
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# frozen_string_literal: true

module OpenTelemetry
module Instrumentation
module ActiveJob
# Handles perform.active_job
class PerformHandler < DefaultHandler
def on_start(name, _id, payload)
tokens = []
parent_context = OpenTelemetry.propagation.extract(payload.fetch(:job).__otel_headers)

span_name = span_name_from(payload)

# TODO: Refactor into a propagation strategy
propagation_style = otel_config[:propagation_style]
if propagation_style == :child
tokens << OpenTelemetry::Context.attach(parent_context)
span = @tracer.start_span(span_name, kind: :consumer, attributes: @mapper.call(payload))
else
span_context = OpenTelemetry::Trace.current_span(parent_context).context
links = [OpenTelemetry::Trace::Link.new(span_context)] if span_context.valid? && propagation_style == :link
span = @tracer.start_root_span(span_name, kind: :consumer, attributes: @mapper.call(payload), links: links)
end

tokens.concat(attach_consumer_context(span))

{ span: span, ctx_tokens: tokens }
end

# This method attaches a span to multiple contexts:
# 1. Registers the ingress span as the top level ActiveJob span.
# This is used later to enrich the ingress span in children, e.g. setting span status to error when a child event like `discard` terminates due to an error
# 2. Registers the ingress span as the "active" span, which is the default behavior of the SDK.
# @param span [OpenTelemetry::Trace::Span] the currently active span used to record the exception and set the status
# @return [Array] Context tokens that must be detached when finished
def attach_consumer_context(span)
consumer_context = OpenTelemetry::Trace.context_with_span(span)
internal_context = OpenTelemetry::Instrumentation::ActiveJob.context_with_span(span, parent_context: consumer_context)

[consumer_context, internal_context].map { |context| OpenTelemetry::Context.attach(context) }
end

# TODO: refactor into a strategy
def span_name_from(payload)
"#{otel_config[:span_naming] == :job_class ? payload.fetch(:job).class.name : payload.fetch(:job).queue_name} process"
end

def otel_config
ActiveJob::Instrumentation.instance.config
end
end
end
end
end
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# frozen_string_literal: true

require_relative 'default_handler'
require_relative 'enqueue_handler'
require_relative 'perform_handler'

module OpenTelemetry
module Instrumentation
module ActiveJob
Expand Down Expand Up @@ -28,131 +32,6 @@ def call(payload)
end
end

# Default handler to creates internal spans for events
class DefaultHandler
def initialize(tracer, mapper)
@tracer = tracer
@mapper = mapper
end

def start(name, id, payload)
payload.merge!(__otel: on_start(name, id, payload))
rescue StandardError => e
OpenTelemetry.handle_error(exception: e)
end

def on_start(name, _id, payload)
span = @tracer.start_span(name, attributes: @mapper.call(payload))
tokens = [OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span))]

{ span: span, ctx_tokens: tokens }
end

def finish(_name, _id, payload)
otel = payload.delete(:__otel)
span = otel&.fetch(:span)
tokens = otel&.fetch(:ctx_tokens)

exception = payload[:error] || payload[:exception_object]
on_exception(exception, span) if exception
rescue StandardError => e
OpenTelemetry.handle_error(exception: e)
ensure
finish_span(span, tokens)
end

def finish_span(span, tokens)
# closes the span after all attributes have been finalized
begin
span&.status = OpenTelemetry::Trace::Status.ok if span&.status&.code == OpenTelemetry::Trace::Status::UNSET
span&.finish
rescue StandardError => e
OpenTelemetry.handle_error(exception: e)
end

# pops the context stack
tokens&.reverse&.each do |token|
OpenTelemetry::Context.detach(token)
rescue StandardError => e
OpenTelemetry.handle_error(exception: e)
end
end

# Records exceptions on spans and sets Span statuses to `Error`
#
# Handled exceptions are recorded on internal spans related to the event. E.g. `discard` events are recorded on the `discard.active_job` span
# Handled exceptions _are not_ copied to the ingress span, but it does set the status to `Error` making it easier to know that a job has failed
# Unhandled exceptions bubble up to the ingress span and are recorded there.
#
# @param [Exception] exception to report as a Span Event
# @param [OpenTelemetry::Trace::Span] the currently active span used to record the exception and set the status
def on_exception(exception, span)
status = OpenTelemetry::Trace::Status.error(exception.message)
OpenTelemetry::Instrumentation::ActiveJob.current_span.status = status
span&.record_exception(exception)
span&.status = status
end
end

# Handles enqueue.active_job
class EnqueueHandler < DefaultHandler
def on_start(name, _id, payload)
otel_config = ActiveJob::Instrumentation.instance.config
span_name = "#{otel_config[:span_naming] == :job_class ? payload.fetch(:job).class.name : payload.fetch(:job).queue_name} publish"
span = @tracer.start_span(span_name, kind: :producer, attributes: @mapper.call(payload))
tokens = [OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span))]
OpenTelemetry.propagation.inject(payload.fetch(:job).__otel_headers) # This must be transmitted over the wire
{ span: span, ctx_tokens: tokens }
end
end

# Handles perform.active_job
class PerformHandler < DefaultHandler
def on_start(name, _id, payload)
tokens = []
parent_context = OpenTelemetry.propagation.extract(payload.fetch(:job).__otel_headers)

span_name = span_name_from(payload)

# TODO: Refactor into a propagation strategy
propagation_style = otel_config[:propagation_style]
if propagation_style == :child
tokens << OpenTelemetry::Context.attach(parent_context)
span = @tracer.start_span(span_name, kind: :consumer, attributes: @mapper.call(payload))
else
span_context = OpenTelemetry::Trace.current_span(parent_context).context
links = [OpenTelemetry::Trace::Link.new(span_context)] if span_context.valid? && propagation_style == :link
span = @tracer.start_root_span(span_name, kind: :consumer, attributes: @mapper.call(payload), links: links)
end

tokens.concat(attach_consumer_context(span))

{ span: span, ctx_tokens: tokens }
end

# This method attaches a span to multiple contexts:
# 1. Registers the ingress span as the top level ActiveJob span.
# This is used later to enrich the ingress span in children, e.g. setting span status to error when a child event like `discard` terminates due to an error
# 2. Registers the ingress span as the "active" span, which is the default behavior of the SDK.
# @param span [OpenTelemetry::Trace::Span] the currently active span used to record the exception and set the status
# @return [Array] Context tokens that must be detached when finished
def attach_consumer_context(span)
consumer_context = OpenTelemetry::Trace.context_with_span(span)
internal_context = OpenTelemetry::Instrumentation::ActiveJob.context_with_span(span, parent_context: consumer_context)

[consumer_context, internal_context].map { |context| OpenTelemetry::Context.attach(context) }
end

# TODO: refactor into a strategy
def span_name_from(payload)
"#{otel_config[:span_naming] == :job_class ? payload.fetch(:job).class.name : payload.fetch(:job).queue_name} process"
end

def otel_config
ActiveJob::Instrumentation.instance.config
end
end

# Custom subscriber that handles ActiveJob notifications
class Subscriber
def self.install
Expand Down

0 comments on commit 7c9e20c

Please sign in to comment.