From 32c0aa829cb8ccd3e54afffc9315ff6a1a9e1309 Mon Sep 17 00:00:00 2001 From: Juli Tera Date: Thu, 5 Sep 2024 09:56:26 -0700 Subject: [PATCH] refactor: Clean up Handler and MessageHelper --- .../instrumentation/aws_sdk/handler.rb | 84 ++++++++----------- .../aws_sdk/messaging_helper.rb | 62 ++++++++++++-- ...ntelemetry-instrumentation-aws_sdk.gemspec | 3 +- 3 files changed, 95 insertions(+), 54 deletions(-) diff --git a/instrumentation/aws_sdk/lib/opentelemetry/instrumentation/aws_sdk/handler.rb b/instrumentation/aws_sdk/lib/opentelemetry/instrumentation/aws_sdk/handler.rb index a973adeb5..c0b040956 100644 --- a/instrumentation/aws_sdk/lib/opentelemetry/instrumentation/aws_sdk/handler.rb +++ b/instrumentation/aws_sdk/lib/opentelemetry/instrumentation/aws_sdk/handler.rb @@ -9,36 +9,32 @@ module Instrumentation module AwsSdk # Generates Spans for all interactions with AwsSdk class Handler < Seahorse::Client::Handler - SQS_SEND_MESSAGE = 'SQS.SendMessage' - SQS_SEND_MESSAGE_BATCH = 'SQS.SendMessageBatch' - SQS_RECEIVE_MESSAGE = 'SQS.ReceiveMessage' - SNS_PUBLISH = 'SNS.Publish' - def call(context) return super unless context - service_name = service_name(context) + service_id = service_name(context) operation = context.operation&.name - client_method = "#{service_name}.#{operation}" - attributes = { - 'aws.region' => context.config.region, - OpenTelemetry::SemanticConventions::Trace::RPC_SYSTEM => 'aws-api', - OpenTelemetry::SemanticConventions::Trace::RPC_METHOD => operation, - OpenTelemetry::SemanticConventions::Trace::RPC_SERVICE => service_name - } - attributes[SemanticConventions::Trace::DB_SYSTEM] = 'dynamodb' if service_name == 'DynamoDB' - MessagingHelper.apply_sqs_attributes(attributes, context, client_method) if service_name == 'SQS' - MessagingHelper.apply_sns_attributes(attributes, context, client_method) if service_name == 'SNS' + client_method = "#{service_id}.#{operation}" + + tracer.in_span( + span_name(context, client_method, service_id), + attributes: attributes(context, client_method, service_id, operation), + kind: span_kind(client_method, service_id) + ) do |span| + if instrumentation_config[:inject_messaging_context] && + %w[SQS SNS].include?(service_id) + MessagingHelper.inject_context(context, client_method) + end - tracer.in_span(span_name(context, client_method), attributes: attributes, kind: span_kind(client_method)) do |span| - inject_context(context, client_method) if instrumentation_config[:suppress_internal_instrumentation] OpenTelemetry::Common::Utilities.untraced { super } else super end.tap do |response| - span.set_attribute(OpenTelemetry::SemanticConventions::Trace::HTTP_STATUS_CODE, - context.http_response.status_code) + span.set_attribute( + OpenTelemetry::SemanticConventions::Trace::HTTP_STATUS_CODE, + context.http_response.status_code + ) if (err = response.error) span.record_exception(err) @@ -65,48 +61,40 @@ def service_name(context) context.client.class.api.metadata['serviceId'] || context.client.class.to_s.split('::')[1] end - SEND_MESSAGE_CLIENT_METHODS = [SQS_SEND_MESSAGE, SQS_SEND_MESSAGE_BATCH, SNS_PUBLISH].freeze - def inject_context(context, client_method) - return unless SEND_MESSAGE_CLIENT_METHODS.include? client_method - return unless instrumentation_config[:inject_messaging_context] - - if client_method == SQS_SEND_MESSAGE_BATCH - context.params[:entries].each do |entry| - entry[:message_attributes] ||= {} - OpenTelemetry.propagation.inject(entry[:message_attributes], setter: MessageAttributeSetter) - end + def span_kind(client_method, service_id) + case service_id + when 'SQS', 'SNS' + MessagingHelper.span_kind(client_method) else - context.params[:message_attributes] ||= {} - OpenTelemetry.propagation.inject(context.params[:message_attributes], setter: MessageAttributeSetter) + OpenTelemetry::Trace::SpanKind::CLIENT end end - def span_kind(client_method) - case client_method - when SQS_SEND_MESSAGE, SQS_SEND_MESSAGE_BATCH, SNS_PUBLISH - OpenTelemetry::Trace::SpanKind::PRODUCER - when SQS_RECEIVE_MESSAGE - OpenTelemetry::Trace::SpanKind::CONSUMER + def span_name(context, client_method, service_id) + case service_id + when 'SQS', 'SNS' + MessagingHelper.legacy_span_name(context, client_method) else - OpenTelemetry::Trace::SpanKind::CLIENT + client_method end end - def span_name(context, client_method) - case client_method - when SQS_SEND_MESSAGE, SQS_SEND_MESSAGE_BATCH, SNS_PUBLISH - "#{MessagingHelper.queue_name(context)} publish" - when SQS_RECEIVE_MESSAGE - "#{MessagingHelper.queue_name(context)} receive" - else - client_method + def attributes(context, client_method, service_id, operation) + { + 'aws.region' => context.config.region, + OpenTelemetry::SemanticConventions::Trace::RPC_SYSTEM => 'aws-api', + OpenTelemetry::SemanticConventions::Trace::RPC_METHOD => operation, + OpenTelemetry::SemanticConventions::Trace::RPC_SERVICE => service_id + }.tap do |attrs| + attrs[SemanticConventions::Trace::DB_SYSTEM] = 'dynamodb' if service_id == 'DynamoDB' + MessagingHelper.apply_span_attributes(context, attrs, client_method, service_id) if %w[SQS SNS].include?(service_id) end end end # A Seahorse::Client::Plugin that enables instrumentation for all AWS services class Plugin < Seahorse::Client::Plugin - def add_handlers(handlers, config) + def add_handlers(handlers, _config) # run before Seahorse::Client::Plugin::ParamValidator (priority 50) handlers.add Handler, step: :validate, priority: 49 end diff --git a/instrumentation/aws_sdk/lib/opentelemetry/instrumentation/aws_sdk/messaging_helper.rb b/instrumentation/aws_sdk/lib/opentelemetry/instrumentation/aws_sdk/messaging_helper.rb index e6fb8d0e4..7e9225cac 100644 --- a/instrumentation/aws_sdk/lib/opentelemetry/instrumentation/aws_sdk/messaging_helper.rb +++ b/instrumentation/aws_sdk/lib/opentelemetry/instrumentation/aws_sdk/messaging_helper.rb @@ -7,9 +7,15 @@ module OpenTelemetry module Instrumentation module AwsSdk - # MessagingHelper class provides methods for calculating messaging span attributes + # An utility class to help SQS/SNS-related span attributes/context injection class MessagingHelper class << self + SQS_SEND_MESSAGE = 'SQS.SendMessage' + SQS_SEND_MESSAGE_BATCH = 'SQS.SendMessageBatch' + SQS_RECEIVE_MESSAGE = 'SQS.ReceiveMessage' + SNS_PUBLISH = 'SNS.Publish' + SEND_MESSAGE_CLIENT_METHODS = [SQS_SEND_MESSAGE, SQS_SEND_MESSAGE_BATCH, SNS_PUBLISH].freeze + def queue_name(context) topic_arn = context.params[:topic_arn] target_arn = context.params[:target_arn] @@ -28,19 +34,65 @@ def queue_name(context) 'unknown' end + def legacy_span_name(context, client_method) + case client_method + when SQS_SEND_MESSAGE, SQS_SEND_MESSAGE_BATCH, SNS_PUBLISH + "#{MessagingHelper.queue_name(context)} publish" + when SQS_RECEIVE_MESSAGE + "#{MessagingHelper.queue_name(context)} receive" + else + client_method + end + end + + # def apply_span_attributes(attrs, context, service_id, client_method) + def apply_span_attributes(context, attrs, client_method, service_id) + case service_id + when 'SQS' + apply_sqs_attributes(attrs, context, client_method) + when 'SNS' + apply_sns_attributes(attrs, context, client_method) + end + end + + def span_kind(client_method) + case client_method + when SQS_SEND_MESSAGE, SQS_SEND_MESSAGE_BATCH, SNS_PUBLISH + OpenTelemetry::Trace::SpanKind::PRODUCER + when SQS_RECEIVE_MESSAGE + OpenTelemetry::Trace::SpanKind::CONSUMER + else + OpenTelemetry::Trace::SpanKind::CLIENT + end + end + + def inject_context(context, client_method) + return unless SEND_MESSAGE_CLIENT_METHODS.include?(client_method) + + if client_method == SQS_SEND_MESSAGE_BATCH + context.params[:entries].each do |entry| + entry[:message_attributes] ||= {} + OpenTelemetry.propagation.inject(entry[:message_attributes], setter: MessageAttributeSetter) + end + else + context.params[:message_attributes] ||= {} + OpenTelemetry.propagation.inject(context.params[:message_attributes], setter: MessageAttributeSetter) + end + end + + private + def apply_sqs_attributes(attributes, context, client_method) attributes[SemanticConventions::Trace::MESSAGING_SYSTEM] = 'aws.sqs' attributes[SemanticConventions::Trace::MESSAGING_DESTINATION_KIND] = 'queue' attributes[SemanticConventions::Trace::MESSAGING_DESTINATION] = queue_name(context) attributes[SemanticConventions::Trace::MESSAGING_URL] = context.params[:queue_url] if context.params[:queue_url] - - attributes[SemanticConventions::Trace::MESSAGING_OPERATION] = 'receive' if client_method == 'SQS.ReceiveMessage' + attributes[SemanticConventions::Trace::MESSAGING_OPERATION] = 'receive' if client_method == SQS_RECEIVE_MESSAGE end def apply_sns_attributes(attributes, context, client_method) attributes[SemanticConventions::Trace::MESSAGING_SYSTEM] = 'aws.sns' - - return unless client_method == 'SNS.Publish' + return unless client_method == SNS_PUBLISH attributes[SemanticConventions::Trace::MESSAGING_DESTINATION_KIND] = 'topic' attributes[SemanticConventions::Trace::MESSAGING_DESTINATION] = queue_name(context) diff --git a/instrumentation/aws_sdk/opentelemetry-instrumentation-aws_sdk.gemspec b/instrumentation/aws_sdk/opentelemetry-instrumentation-aws_sdk.gemspec index 4fba14c34..83cac55c6 100644 --- a/instrumentation/aws_sdk/opentelemetry-instrumentation-aws_sdk.gemspec +++ b/instrumentation/aws_sdk/opentelemetry-instrumentation-aws_sdk.gemspec @@ -19,7 +19,8 @@ Gem::Specification.new do |spec| spec.homepage = 'https://github.com/open-telemetry/opentelemetry-ruby-contrib' spec.license = 'Apache-2.0' - spec.files = Dir.glob('lib/**/*.rb') + + spec.files = + Dir.glob('lib/**/*.rb') + Dir.glob('*.md') + ['LICENSE', '.yardopts'] spec.require_paths = ['lib']