Skip to content

Commit

Permalink
Merge branch 'main' into release/multi/20240723151827
Browse files Browse the repository at this point in the history
  • Loading branch information
kaylareopelle authored Jul 23, 2024
2 parents df2b0be + df6e43f commit 654f934
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,6 @@ module ActiveJob
module Handlers
# Handles `enqueue.active_job` and `enqueue_at.active_job` to generate egress spans
class Enqueue < Default
def initialize(...)
super
@span_name_formatter = if @config[:span_naming] == :job_class
->(job) { "#{job.class.name} publish" }
else
->(job) { "#{job.queue_name} publish" }
end
end

# Overrides the `Default#start_span` method to create an egress span
# and registers it with the current context
#
Expand All @@ -28,10 +19,22 @@ def initialize(...)
# @return [Hash] with the span and generated context tokens
def start_span(name, _id, payload)
job = payload.fetch(:job)
span = tracer.start_span(@span_name_formatter.call(job), kind: :producer, attributes: @mapper.call(payload))
span = tracer.start_span(span_name(job), kind: :producer, attributes: @mapper.call(payload))
OpenTelemetry.propagation.inject(job.__otel_headers) # This must be transmitted over the wire
{ span: span, ctx_token: OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span)) }
end

private

def span_name(job)
prefix = if @config[:span_naming] == :job_class
job.class.name
else
job.queue_name
end

"#{prefix} publish"
end
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,6 @@ module ActiveJob
module Handlers
# Handles perform.active_job to generate ingress spans
class Perform < Default
def initialize(...)
super
@span_name_formatter = if @config[:span_naming] == :job_class
->(job) { "#{job.class.name} process" }
else
->(job) { "#{job.queue_name} process" }
end
end

# Overrides the `Default#start_span` method to create an ingress span
# and registers it with the current context
#
Expand All @@ -30,7 +21,7 @@ def start_span(name, _id, payload)
job = payload.fetch(:job)
parent_context = OpenTelemetry.propagation.extract(job.__otel_headers)

span_name = @span_name_formatter.call(job)
span_name = span_name(job)

# TODO: Refactor into a propagation strategy
propagation_style = @config[:propagation_style]
Expand All @@ -57,6 +48,18 @@ def attach_consumer_context(span)

OpenTelemetry::Context.attach(internal_context)
end

private

def span_name(job)
prefix = if @config[:span_naming] == :job_class
job.class.name
else
job.queue_name
end

"#{prefix} process"
end
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,4 +261,22 @@
_(CallbacksJob.context_after).must_be :valid?
end
end

describe 'with a configuration modified after installation' do
let(:job_class) { TestJob }
let(:publish_span) { spans.find { |s| s.name == "#{job_class.name} publish" } }
let(:process_span) { spans.find { |s| s.name == "#{job_class.name} process" } }

before do
instance_config = instrumentation.instance_variable_get(:@config)
instance_config[:span_naming] = :job_class
end

it 'uses the updated configuration' do
TestJob.perform_later

_(publish_span).wont_be_nil
_(process_span).wont_be_nil
end
end
end
1 change: 1 addition & 0 deletions instrumentation/aws_lambda/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ To run the example:
* `bundle install`
2. Run the sample client script
* `ruby trace_demonstration.rb`
* or `bundle exec ruby trace_demonstration.rb`

This will run SNS publish command, printing OpenTelemetry traces to the console as it goes.

Expand Down
2 changes: 1 addition & 1 deletion instrumentation/aws_lambda/example/trace_demonstration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,6 @@ def otel_wrapper(event:, context:)
"version" => "1.0"
}

context = MockLambdaContext.new(aws_request_id: "aws_request_id",invoked_function_arn: "invoked_function_arn",function_name: "function")
context = MockLambdaContext.new(aws_request_id: "aws_request_id",invoked_function_arn: "arn:aws:lambda:us-west-2:123456789012:function:HelloWorld",function_name: "function")

otel_wrapper(event: event, context: context) # you should see Success before the trace
Original file line number Diff line number Diff line change
Expand Up @@ -40,33 +40,25 @@ def call_wrapped(event:, context:)
original_handler_error = nil
original_response = nil
OpenTelemetry::Context.with_current(parent_context) do
span_attributes = otel_attributes(event, context)
span = tracer.start_span(
@original_handler,
attributes: span_attributes,
kind: span_kind
)

begin
response = call_original_handler(event: event, context: context)
status_code = response['statusCode'] || response[:statusCode] if response.is_a?(Hash)
span.set_attribute(OpenTelemetry::SemanticConventions::Trace::HTTP_STATUS_CODE, status_code) if status_code
rescue StandardError => e
original_handler_error = e
ensure
original_response = response
tracer.in_span(@original_handler, attributes: otel_attributes(event, context), kind: span_kind) do |span|
begin
response = call_original_handler(event: event, context: context)
status_code = response['statusCode'] || response[:statusCode] if response.is_a?(Hash)
span.set_attribute(OpenTelemetry::SemanticConventions::Trace::HTTP_STATUS_CODE, status_code) if status_code
rescue StandardError => e
original_handler_error = e
ensure
original_response = response
end
if original_handler_error
span.record_exception(original_handler_error)
span.status = OpenTelemetry::Trace::Status.error(original_handler_error.message)
end
end
rescue StandardError => e
OpenTelemetry.logger.error("aws-lambda instrumentation #{e.class}: #{e.message}")
ensure
if original_handler_error
span&.record_exception(original_handler_error)
span&.status = OpenTelemetry::Trace::Status.error(original_handler_error.message)
end
span&.finish
OpenTelemetry.tracer_provider.force_flush(timeout: @flush_timeout)
end

OpenTelemetry.tracer_provider.force_flush(timeout: @flush_timeout)

raise original_handler_error if original_handler_error

original_response
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,4 +214,25 @@
end
end
end

describe 'validate_if_span_is_registered' do
it 'add_span_attributes_to_lambda_span' do
stub = proc do
span = OpenTelemetry::Trace.current_span
span.set_attribute('test.attribute', 320)
end

otel_wrapper = OpenTelemetry::Instrumentation::AwsLambda::Handler.new
otel_wrapper.stub(:call_original_handler, stub) do
otel_wrapper.call_wrapped(event: sqs_record, context: context)

_(last_span.name).must_equal 'sample.test'
_(last_span.kind).must_equal :consumer
_(last_span.status.code).must_equal 1
_(last_span.hex_parent_span_id).must_equal '0000000000000000'

_(last_span.attributes['test.attribute']).must_equal 320
end
end
end
end

0 comments on commit 654f934

Please sign in to comment.