Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: All AWS services emit traces #1150

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -9,36 +9,32 @@ module Instrumentation
module AwsSdk
# Generates Spans for all interactions with AwsSdk
class Handler < Seahorse::Client::Handler
SQS_SEND_MESSAGE = 'SQS.SendMessage'
SQS_SEND_MESSAGE_BATCH = 'SQS.SendMessageBatch'
SQS_RECEIVE_MESSAGE = 'SQS.ReceiveMessage'
SNS_PUBLISH = 'SNS.Publish'

def call(context)
return super unless context

service_name = service_name(context)
service_id = service_name(context)
operation = context.operation&.name
client_method = "#{service_name}.#{operation}"
attributes = {
'aws.region' => context.config.region,
OpenTelemetry::SemanticConventions::Trace::RPC_SYSTEM => 'aws-api',
OpenTelemetry::SemanticConventions::Trace::RPC_METHOD => operation,
OpenTelemetry::SemanticConventions::Trace::RPC_SERVICE => service_name
}
attributes[SemanticConventions::Trace::DB_SYSTEM] = 'dynamodb' if service_name == 'DynamoDB'
MessagingHelper.apply_sqs_attributes(attributes, context, client_method) if service_name == 'SQS'
MessagingHelper.apply_sns_attributes(attributes, context, client_method) if service_name == 'SNS'
client_method = "#{service_id}.#{operation}"
kaylareopelle marked this conversation as resolved.
Show resolved Hide resolved

tracer.in_span(
span_name(context, client_method, service_id),
attributes: attributes(context, client_method, service_id, operation),
kind: span_kind(client_method, service_id)
) do |span|
if instrumentation_config[:inject_messaging_context] &&
%w[SQS SNS].include?(service_id)
MessagingHelper.inject_context(context, client_method)
end

tracer.in_span(span_name(context, client_method), attributes: attributes, kind: span_kind(client_method)) do |span|
inject_context(context, client_method)
if instrumentation_config[:suppress_internal_instrumentation]
OpenTelemetry::Common::Utilities.untraced { super }
else
super
end.tap do |response|
span.set_attribute(OpenTelemetry::SemanticConventions::Trace::HTTP_STATUS_CODE,
context.http_response.status_code)
span.set_attribute(
OpenTelemetry::SemanticConventions::Trace::HTTP_STATUS_CODE,
context.http_response.status_code
)

if (err = response.error)
span.record_exception(err)
Expand All @@ -65,48 +61,40 @@ def service_name(context)
context.client.class.api.metadata['serviceId'] || context.client.class.to_s.split('::')[1]
end

SEND_MESSAGE_CLIENT_METHODS = [SQS_SEND_MESSAGE, SQS_SEND_MESSAGE_BATCH, SNS_PUBLISH].freeze
def inject_context(context, client_method)
return unless SEND_MESSAGE_CLIENT_METHODS.include? client_method
return unless instrumentation_config[:inject_messaging_context]

if client_method == SQS_SEND_MESSAGE_BATCH
context.params[:entries].each do |entry|
entry[:message_attributes] ||= {}
OpenTelemetry.propagation.inject(entry[:message_attributes], setter: MessageAttributeSetter)
end
def span_kind(client_method, service_id)
case service_id
when 'SQS', 'SNS'
MessagingHelper.span_kind(client_method)
else
context.params[:message_attributes] ||= {}
OpenTelemetry.propagation.inject(context.params[:message_attributes], setter: MessageAttributeSetter)
OpenTelemetry::Trace::SpanKind::CLIENT
end
end

def span_kind(client_method)
case client_method
when SQS_SEND_MESSAGE, SQS_SEND_MESSAGE_BATCH, SNS_PUBLISH
OpenTelemetry::Trace::SpanKind::PRODUCER
when SQS_RECEIVE_MESSAGE
OpenTelemetry::Trace::SpanKind::CONSUMER
def span_name(context, client_method, service_id)
case service_id
when 'SQS', 'SNS'
MessagingHelper.legacy_span_name(context, client_method)
else
OpenTelemetry::Trace::SpanKind::CLIENT
client_method
end
end

def span_name(context, client_method)
case client_method
when SQS_SEND_MESSAGE, SQS_SEND_MESSAGE_BATCH, SNS_PUBLISH
"#{MessagingHelper.queue_name(context)} publish"
when SQS_RECEIVE_MESSAGE
"#{MessagingHelper.queue_name(context)} receive"
else
client_method
def attributes(context, client_method, service_id, operation)
{
'aws.region' => context.config.region,
OpenTelemetry::SemanticConventions::Trace::RPC_SYSTEM => 'aws-api',
OpenTelemetry::SemanticConventions::Trace::RPC_METHOD => operation,
OpenTelemetry::SemanticConventions::Trace::RPC_SERVICE => service_id
}.tap do |attrs|
attrs[SemanticConventions::Trace::DB_SYSTEM] = 'dynamodb' if service_id == 'DynamoDB'
MessagingHelper.apply_span_attributes(context, attrs, client_method, service_id) if %w[SQS SNS].include?(service_id)
end
end
end

# A Seahorse::Client::Plugin that enables instrumentation for all AWS services
class Plugin < Seahorse::Client::Plugin
def add_handlers(handlers, config)
def add_handlers(handlers, _config)
# run before Seahorse::Client::Plugin::ParamValidator (priority 50)
handlers.add Handler, step: :validate, priority: 49
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ module AwsSdk
# Instrumentation class that detects and installs the AwsSdk instrumentation
class Instrumentation < OpenTelemetry::Instrumentation::Base
MINIMUM_VERSION = Gem::Version.new('2.0.0')
CURRENT_MAJOR_VERSION = Gem::Version.new('3.0.0')

install do |_config|
require_dependencies
add_plugin(Seahorse::Client::Base, *loaded_constants)
add_plugins(Seahorse::Client::Base, *loaded_constants)
end

present do
Expand Down Expand Up @@ -46,14 +47,42 @@ def require_dependencies
require_relative 'messaging_helper'
end

def add_plugin(*targets)
def add_plugins(*targets)
targets.each { |klass| klass.add_plugin(AwsSdk::Plugin) }
end

def loaded_constants
jterapin marked this conversation as resolved.
Show resolved Hide resolved
if gem_version >= CURRENT_MAJOR_VERSION
load_v3_constants
else
load_legacy_constants
jterapin marked this conversation as resolved.
Show resolved Hide resolved
end
end

def load_v3_constants
jterapin marked this conversation as resolved.
Show resolved Hide resolved
::Aws.constants.each_with_object([]) do |c, constants|
m = ::Aws.const_get(c)
next unless unloaded_service?(c, m)

begin
constants << m.const_get(:Client)
rescue StandardError => e
OpenTelemetry.logger.warn("Constant could not be loaded: #{e}")
end
end
end

def unloaded_service?(constant, service_module)
jterapin marked this conversation as resolved.
Show resolved Hide resolved
!::Aws.autoload?(constant) &&
service_module.is_a?(Module) &&
service_module.const_defined?(:Client) &&
(service_module.const_get(:Client).superclass == Seahorse::Client::Base)
end

def load_legacy_constants
# Cross-check services against loaded AWS constants
# Module#const_get can return a constant from ancestors when there's a miss.
# If this conincidentally matches another constant, it will attempt to patch
# If this coincidentally matches another constant, it will attempt to patch
# the wrong constant, resulting in patch failure.
available_services = ::Aws.constants & SERVICES.map(&:to_sym)
available_services.each_with_object([]) do |service, constants|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,15 @@
module OpenTelemetry
module Instrumentation
module AwsSdk
# MessagingHelper class provides methods for calculating messaging span attributes
# An utility class to help SQS/SNS-related span attributes/context injection
class MessagingHelper
class << self
SQS_SEND_MESSAGE = 'SQS.SendMessage'
SQS_SEND_MESSAGE_BATCH = 'SQS.SendMessageBatch'
SQS_RECEIVE_MESSAGE = 'SQS.ReceiveMessage'
SNS_PUBLISH = 'SNS.Publish'
SEND_MESSAGE_CLIENT_METHODS = [SQS_SEND_MESSAGE, SQS_SEND_MESSAGE_BATCH, SNS_PUBLISH].freeze

def queue_name(context)
topic_arn = context.params[:topic_arn]
target_arn = context.params[:target_arn]
Expand All @@ -28,19 +34,65 @@ def queue_name(context)
'unknown'
end

def legacy_span_name(context, client_method)
case client_method
when SQS_SEND_MESSAGE, SQS_SEND_MESSAGE_BATCH, SNS_PUBLISH
"#{MessagingHelper.queue_name(context)} publish"
when SQS_RECEIVE_MESSAGE
"#{MessagingHelper.queue_name(context)} receive"
else
client_method
end
end

# def apply_span_attributes(attrs, context, service_id, client_method)
jterapin marked this conversation as resolved.
Show resolved Hide resolved
def apply_span_attributes(context, attrs, client_method, service_id)
case service_id
when 'SQS'
apply_sqs_attributes(attrs, context, client_method)
when 'SNS'
apply_sns_attributes(attrs, context, client_method)
end
end

def span_kind(client_method)
case client_method
when SQS_SEND_MESSAGE, SQS_SEND_MESSAGE_BATCH, SNS_PUBLISH
OpenTelemetry::Trace::SpanKind::PRODUCER
when SQS_RECEIVE_MESSAGE
OpenTelemetry::Trace::SpanKind::CONSUMER
else
OpenTelemetry::Trace::SpanKind::CLIENT
end
end

def inject_context(context, client_method)
return unless SEND_MESSAGE_CLIENT_METHODS.include?(client_method)

if client_method == SQS_SEND_MESSAGE_BATCH
context.params[:entries].each do |entry|
entry[:message_attributes] ||= {}
OpenTelemetry.propagation.inject(entry[:message_attributes], setter: MessageAttributeSetter)
end
else
context.params[:message_attributes] ||= {}
OpenTelemetry.propagation.inject(context.params[:message_attributes], setter: MessageAttributeSetter)
end
end

private

def apply_sqs_attributes(attributes, context, client_method)
attributes[SemanticConventions::Trace::MESSAGING_SYSTEM] = 'aws.sqs'
attributes[SemanticConventions::Trace::MESSAGING_DESTINATION_KIND] = 'queue'
attributes[SemanticConventions::Trace::MESSAGING_DESTINATION] = queue_name(context)
attributes[SemanticConventions::Trace::MESSAGING_URL] = context.params[:queue_url] if context.params[:queue_url]

attributes[SemanticConventions::Trace::MESSAGING_OPERATION] = 'receive' if client_method == 'SQS.ReceiveMessage'
attributes[SemanticConventions::Trace::MESSAGING_OPERATION] = 'receive' if client_method == SQS_RECEIVE_MESSAGE
end

def apply_sns_attributes(attributes, context, client_method)
attributes[SemanticConventions::Trace::MESSAGING_SYSTEM] = 'aws.sns'

return unless client_method == 'SNS.Publish'
return unless client_method == SNS_PUBLISH

attributes[SemanticConventions::Trace::MESSAGING_DESTINATION_KIND] = 'topic'
attributes[SemanticConventions::Trace::MESSAGING_DESTINATION] = queue_name(context)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,21 @@ Gem::Specification.new do |spec|
spec.homepage = 'https://github.com/open-telemetry/opentelemetry-ruby-contrib'
spec.license = 'Apache-2.0'

spec.files = Dir.glob('lib/**/*.rb') +
Dir.glob('*.md') +
['LICENSE', '.yardopts']
spec.files =
Dir.glob('lib/**/*.rb') +
Dir.glob('*.md') +
['LICENSE', '.yardopts']
spec.require_paths = ['lib']
spec.required_ruby_version = '>= 3.0'

spec.add_dependency 'opentelemetry-api', '~> 1.0'
spec.add_dependency 'opentelemetry-instrumentation-base', '~> 0.22.1'

spec.add_development_dependency 'appraisal', '~> 2.5'
spec.add_development_dependency 'aws-sdk-dynamodb'
spec.add_development_dependency 'aws-sdk-s3'
spec.add_development_dependency 'aws-sdk-sns'
spec.add_development_dependency 'aws-sdk-sqs'
spec.add_development_dependency 'bundler', '~> 2.4'
spec.add_development_dependency 'minitest', '~> 5.0'
spec.add_development_dependency 'opentelemetry-sdk', '~> 1.1'
Expand Down
Loading