Skip to content

Commit

Permalink
fix!(instrumentation): Align messaging instrumentation operation names (
Browse files Browse the repository at this point in the history
#648)

* fix!(active_job): rename 'send' operation to 'publish'

* fix!(aws_sdk): rename 'send' operation to 'publish'

* fix!(bunny): rename 'send' operation to 'publish'

* fix!(delayed_job): rename 'send' operation to 'publish'

* fix!(queue): rename 'send' operation to 'publish'

* fix!(racecar): rename 'send' operation to 'publish'

* fix!(rdkafka): rename 'send' operation to 'publish'

* fix!(ruby_kafka): rename 'send' operation to 'publish'

* fix!(sidekiq): rename 'send' operation to 'publish'

---------

Co-authored-by: Sam <[email protected]>
  • Loading branch information
michal-kazmierczak and plantfansam authored Sep 7, 2023
1 parent b14901e commit da351f9
Show file tree
Hide file tree
Showing 27 changed files with 182 additions and 182 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def self.prepended(base)
base.class_eval do
around_enqueue do |job, block|
span_kind = job.class.queue_adapter_name == 'inline' ? :client : :producer
span_name = "#{otel_config[:span_naming] == :job_class ? job.class : job.queue_name} send"
span_name = "#{otel_config[:span_naming] == :job_class ? job.class : job.queue_name} publish"
span_attributes = job_attributes(job)
otel_tracer.in_span(span_name, attributes: span_attributes, kind: span_kind) do
OpenTelemetry.propagation.inject(job.metadata)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
let(:config) { { propagation_style: :link, force_flush: false, span_naming: :queue } }
let(:exporter) { EXPORTER }
let(:spans) { exporter.finished_spans }
let(:send_span) { spans.find { |s| s.name == 'default send' } }
let(:publish_span) { spans.find { |s| s.name == 'default publish' } }
let(:process_span) { spans.find { |s| s.name == 'default process' } }

before do
Expand All @@ -41,7 +41,7 @@
it 'traces enqueuing and processing the job' do
TestJob.perform_later

_(send_span).wont_be_nil
_(publish_span).wont_be_nil
_(process_span).wont_be_nil
end
end
Expand All @@ -50,7 +50,7 @@
it 'only traces processing the job' do
TestJob.perform_now

_(send_span).must_be_nil
_(publish_span).must_be_nil
_(process_span).wont_be_nil
_(process_span.attributes['code.namespace']).must_equal('TestJob')
_(process_span.attributes['code.function']).must_equal('perform_now')
Expand Down Expand Up @@ -97,14 +97,14 @@

TestJob.perform_later

_(send_span.kind).must_equal(:client)
_(publish_span.kind).must_equal(:client)
_(process_span.kind).must_equal(:server)
end

it 'sets correct span kinds for all other jobs' do
TestJob.perform_later

_(send_span.kind).must_equal(:producer)
_(publish_span.kind).must_equal(:producer)
_(process_span.kind).must_equal(:consumer)
end
end
Expand All @@ -113,23 +113,23 @@
it 'sets the messaging.operation attribute only when processing the job' do
TestJob.perform_later

_(send_span.attributes['messaging.operation']).must_be_nil
_(publish_span.attributes['messaging.operation']).must_be_nil
_(process_span.attributes['messaging.operation']).must_equal('process')
end

describe 'net.transport' do
it 'is sets correctly for inline jobs' do
TestJob.perform_later

[send_span, process_span].each do |span|
[publish_span, process_span].each do |span|
_(span.attributes['net.transport']).must_equal('inproc')
end
end

it 'is set correctly for async jobs' do
TestJob.perform_later

[send_span, process_span].each do |span|
[publish_span, process_span].each do |span|
_(span.attributes['net.transport']).must_equal('inproc')
end
end
Expand All @@ -139,15 +139,15 @@
it 'is unset for unprioritized jobs' do
TestJob.perform_later

[send_span, process_span].each do |span|
[publish_span, process_span].each do |span|
_(span.attributes['messaging.active_job.priority']).must_be_nil
end
end

it 'is set for jobs with a priority' do
TestJob.set(priority: 1).perform_later

[send_span, process_span].each do |span|
[publish_span, process_span].each do |span|
_(span.attributes['messaging.active_job.priority']).must_equal(1)
end
end
Expand All @@ -157,7 +157,7 @@
it 'is unset for jobs that do not specify a wait time' do
TestJob.perform_later

[send_span, process_span].each do |span|
[publish_span, process_span].each do |span|
_(span.attributes['messaging.active_job.scheduled_at']).must_be_nil
end
end
Expand All @@ -166,8 +166,8 @@
job = TestJob.set(wait: 0.second).perform_later

# Only the sending span is a 'scheduled' thing
_(send_span.attributes['messaging.active_job.scheduled_at']).must_equal(job.scheduled_at)
assert(send_span.attributes['messaging.active_job.scheduled_at'])
_(publish_span.attributes['messaging.active_job.scheduled_at']).must_equal(job.scheduled_at)
assert(publish_span.attributes['messaging.active_job.scheduled_at'])

# The processing span isn't a 'scheduled' thing
_(process_span.attributes['messaging.active_job.scheduled_at']).must_be_nil
Expand All @@ -185,15 +185,15 @@
ActiveJob::Base.queue_adapter = :inline
TestJob.perform_later

[send_span, process_span].each do |span|
[publish_span, process_span].each do |span|
_(span.attributes['messaging.system']).must_equal('inline')
end
end

it 'is set correctly for the async adapter' do
TestJob.perform_later

[send_span, process_span].each do |span|
[publish_span, process_span].each do |span|
_(span.attributes['messaging.system']).must_equal('async')
end
end
Expand Down Expand Up @@ -232,7 +232,7 @@
it 'generally sets other attributes as expected' do
job = TestJob.perform_later

[send_span, process_span].each do |span|
[publish_span, process_span].each do |span|
_(span.attributes['code.namespace']).must_equal('TestJob')
_(span.attributes['messaging.destination_kind']).must_equal('queue')
_(span.attributes['messaging.system']).must_equal('async')
Expand All @@ -245,8 +245,8 @@
describe 'when queue - default' do
it 'names spans according to the job queue' do
TestJob.set(queue: :foo).perform_later
send_span = exporter.finished_spans.find { |s| s.name == 'foo send' }
_(send_span).wont_be_nil
publish_span = exporter.finished_spans.find { |s| s.name == 'foo publish' }
_(publish_span).wont_be_nil

process_span = exporter.finished_spans.find { |s| s.name == 'foo process' }
_(process_span).wont_be_nil
Expand All @@ -258,8 +258,8 @@

it 'names span according to the job class' do
TestJob.set(queue: :foo).perform_later
send_span = exporter.finished_spans.find { |s| s.name == 'TestJob send' }
_(send_span).wont_be_nil
publish_span = exporter.finished_spans.find { |s| s.name == 'TestJob publish' }
_(publish_span).wont_be_nil

process_span = exporter.finished_spans.find { |s| s.name == 'TestJob process' }
_(process_span).wont_be_nil
Expand Down Expand Up @@ -309,11 +309,11 @@
it 'creates span links in separate traces' do
TestJob.perform_later

_(send_span.trace_id).wont_equal(process_span.trace_id)
_(publish_span.trace_id).wont_equal(process_span.trace_id)

_(process_span.total_recorded_links).must_equal(1)
_(process_span.links[0].span_context.trace_id).must_equal(send_span.trace_id)
_(process_span.links[0].span_context.span_id).must_equal(send_span.span_id)
_(process_span.links[0].span_context.trace_id).must_equal(publish_span.trace_id)
_(process_span.links[0].span_context.span_id).must_equal(publish_span.span_id)
end

it 'propagates baggage' do
Expand All @@ -322,11 +322,11 @@
BaggageJob.perform_later
end

_(send_span.trace_id).wont_equal(process_span.trace_id)
_(publish_span.trace_id).wont_equal(process_span.trace_id)

_(process_span.total_recorded_links).must_equal(1)
_(process_span.links[0].span_context.trace_id).must_equal(send_span.trace_id)
_(process_span.links[0].span_context.span_id).must_equal(send_span.span_id)
_(process_span.links[0].span_context.trace_id).must_equal(publish_span.trace_id)
_(process_span.links[0].span_context.span_id).must_equal(publish_span.span_id)
_(process_span.attributes['success']).must_equal(true)
end
end
Expand All @@ -339,8 +339,8 @@

_(process_span.total_recorded_links).must_equal(0)

_(send_span.trace_id).must_equal(process_span.trace_id)
_(process_span.parent_span_id).must_equal(send_span.span_id)
_(publish_span.trace_id).must_equal(process_span.trace_id)
_(process_span.parent_span_id).must_equal(publish_span.span_id)
end

it 'propagates baggage' do
Expand All @@ -350,8 +350,8 @@
end
_(process_span.total_recorded_links).must_equal(0)

_(send_span.trace_id).must_equal(process_span.trace_id)
_(process_span.parent_span_id).must_equal(send_span.span_id)
_(publish_span.trace_id).must_equal(process_span.trace_id)
_(process_span.parent_span_id).must_equal(publish_span.span_id)
_(process_span.attributes['success']).must_equal(true)
end
end
Expand All @@ -364,8 +364,8 @@

_(process_span.total_recorded_links).must_equal(0)

_(send_span.trace_id).wont_equal(process_span.trace_id)
_(process_span.parent_span_id).wont_equal(send_span.span_id)
_(publish_span.trace_id).wont_equal(process_span.trace_id)
_(process_span.parent_span_id).wont_equal(publish_span.span_id)
end

it 'still propagates baggage' do
Expand All @@ -376,8 +376,8 @@

_(process_span.total_recorded_links).must_equal(0)

_(send_span.trace_id).wont_equal(process_span.trace_id)
_(process_span.parent_span_id).wont_equal(send_span.span_id)
_(publish_span.trace_id).wont_equal(process_span.trace_id)
_(process_span.parent_span_id).wont_equal(publish_span.span_id)
_(process_span.attributes['success']).must_equal(true)
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def span_kind(client_method)
def span_name(context, client_method)
case client_method
when SQS_SEND_MESSAGE, SQS_SEND_MESSAGE_BATCH, SNS_PUBLISH
"#{MessagingHelper.queue_name(context)} send"
"#{MessagingHelper.queue_name(context)} publish"
when SQS_RECEIVE_MESSAGE
"#{MessagingHelper.queue_name(context)} receive"
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@
sns_client.publish message: 'msg', phone_number: '123456'

_(last_span.attributes['messaging.destination']).must_equal 'phone_number'
_(last_span.name).must_equal 'phone_number send'
_(last_span.name).must_equal 'phone_number publish'
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def self.with_send_span(channel, tracer, exchange, routing_key, &block)
attributes = basic_attributes(channel, channel.connection, exchange, routing_key)
destination = destination_name(exchange, routing_key)

tracer.in_span("#{destination} send", attributes: attributes, kind: :producer, &block)
tracer.in_span("#{destination} publish", attributes: attributes, kind: :producer, &block)
end

def self.with_process_span(channel, tracer, delivery_info, properties, &block)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,17 @@

_(spans.size >= 3).must_equal(true)

send_span = spans.find { |span| span.name == "#{topic}.ruby.news send" }
_(send_span).wont_be_nil
_(send_span.kind).must_equal(:producer)
_(send_span.attributes['messaging.system']).must_equal('rabbitmq')
_(send_span.attributes['messaging.destination']).must_equal(topic)
_(send_span.attributes['messaging.destination_kind']).must_equal('topic')
_(send_span.attributes['messaging.protocol']).must_equal('AMQP')
_(send_span.attributes['messaging.protocol_version']).must_equal('0.9.1')
_(send_span.attributes['messaging.rabbitmq.routing_key']).must_equal('ruby.news')
_(send_span.attributes['net.peer.name']).must_equal(host)
_(send_span.attributes['net.peer.port']).must_equal(port.to_i)
publish_span = spans.find { |span| span.name == "#{topic}.ruby.news publish" }
_(publish_span).wont_be_nil
_(publish_span.kind).must_equal(:producer)
_(publish_span.attributes['messaging.system']).must_equal('rabbitmq')
_(publish_span.attributes['messaging.destination']).must_equal(topic)
_(publish_span.attributes['messaging.destination_kind']).must_equal('topic')
_(publish_span.attributes['messaging.protocol']).must_equal('AMQP')
_(publish_span.attributes['messaging.protocol_version']).must_equal('0.9.1')
_(publish_span.attributes['messaging.rabbitmq.routing_key']).must_equal('ruby.news')
_(publish_span.attributes['net.peer.name']).must_equal(host)
_(publish_span.attributes['net.peer.port']).must_equal(port.to_i)

receive_span = spans.find { |span| span.name == "#{topic}.ruby.news receive" }
_(receive_span).wont_be_nil
Expand All @@ -78,7 +78,7 @@
_(process_span.trace_id).must_equal(receive_span.trace_id)

linked_span_context = process_span.links.first.span_context
_(linked_span_context.trace_id).must_equal(send_span.trace_id)
_(linked_span_context.span_id).must_equal(send_span.span_id)
_(linked_span_context.trace_id).must_equal(publish_span.trace_id)
_(linked_span_context.span_id).must_equal(publish_span.span_id)
end
end unless ENV['OMIT_SERVICES']
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@

_(spans.size >= 3).must_equal(true)

send_span = spans.find { |span| span.name == "#{topic}.ruby.news send" }
_(send_span).wont_be_nil
_(send_span.kind).must_equal(:producer)
publish_span = spans.find { |span| span.name == "#{topic}.ruby.news publish" }
_(publish_span).wont_be_nil
_(publish_span.kind).must_equal(:producer)

receive_span = spans.find { |span| span.name == "#{topic}.ruby.news receive" }
_(receive_span).wont_be_nil
Expand All @@ -68,7 +68,7 @@
_(process_span.trace_id).must_equal(receive_span.trace_id)

linked_span_context = process_span.links.first.span_context
_(linked_span_context.trace_id).must_equal(send_span.trace_id)
_(linked_span_context.span_id).must_equal(send_span.span_id)
_(linked_span_context.trace_id).must_equal(publish_span.trace_id)
_(linked_span_context.span_id).must_equal(publish_span.span_id)
end
end unless ENV['OMIT_SERVICES']
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@

queue.pop { |_delivery_info, _metadata, _payload| break }

send_span = spans.find { |span| span.name == ".#{queue_name} send" }
_(send_span).wont_be_nil
publish_span = spans.find { |span| span.name == ".#{queue_name} publish" }
_(publish_span).wont_be_nil

receive_span = spans.find { |span| span.name == ".#{queue_name} receive" }
_(receive_span).wont_be_nil
Expand All @@ -58,7 +58,7 @@
_(process_span.kind).must_equal(:consumer)

linked_span_context = process_span.links.first.span_context
_(linked_span_context.trace_id).must_equal(send_span.trace_id)
_(linked_span_context.trace_id).must_equal(publish_span.trace_id)
end

it 'traces messages returned' do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ def instrument_enqueue(job, &block)
return block.call(job) unless enabled?

attributes = build_attributes(job)
attributes['messaging.operation'] = 'send'
attributes['messaging.operation'] = 'publish'
attributes.compact!

tracer.in_span("#{job_queue(job)} send", attributes: attributes, kind: :producer) do |span|
tracer.in_span("#{job_queue(job)} publish", attributes: attributes, kind: :producer) do |span|
yield job
span.set_attribute('messaging.message_id', job.id.to_s)
add_events(span, job)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,13 @@ def job_data
_(exporter.finished_spans.size).must_equal 1

_(span).must_be_kind_of OpenTelemetry::SDK::Trace::SpanData
_(span.name).must_equal 'default send'
_(span.name).must_equal 'default publish'
_(span.attributes['messaging.system']).must_equal 'delayed_job'
_(span.attributes['messaging.destination']).must_equal 'default'
_(span.attributes['messaging.destination_kind']).must_equal 'queue'
_(span.attributes['messaging.delayed_job.name']).must_equal 'BasicPayload'
_(span.attributes['messaging.delayed_job.priority']).must_equal 0
_(span.attributes['messaging.operation']).must_equal 'send'
_(span.attributes['messaging.operation']).must_equal 'publish'
_(span.attributes['messaging.message_id']).must_be_kind_of String

_(span.events.size).must_equal 2
Expand Down Expand Up @@ -122,7 +122,7 @@ def job_data
_(exporter.finished_spans).must_equal []
job_enqueue
_(exporter.finished_spans.size).must_equal 1
_(exporter.finished_spans.first.name).must_equal 'default send'
_(exporter.finished_spans.first.name).must_equal 'default publish'
job_run
_(exporter.finished_spans.size).must_equal 2

Expand Down
Loading

0 comments on commit da351f9

Please sign in to comment.