From 94709682e050847bf0f49ee7c4253d61a547ff0f Mon Sep 17 00:00:00 2001 From: Tim Perkins Date: Sat, 6 Jun 2020 16:00:43 -0400 Subject: [PATCH 1/4] Kafka contrib integration --- Appraisals | 1 + Rakefile | 1 + docs/GettingStarted.md | 24 + lib/ddtrace.rb | 1 + .../contrib/kafka/configuration/settings.rb | 25 + lib/ddtrace/contrib/kafka/consumer_event.rb | 14 + .../contrib/kafka/consumer_group_event.rb | 14 + lib/ddtrace/contrib/kafka/event.rb | 51 ++ lib/ddtrace/contrib/kafka/events.rb | 44 ++ .../kafka/events/connection/request.rb | 34 + .../kafka/events/consumer/process_batch.rb | 41 ++ .../kafka/events/consumer/process_message.rb | 39 ++ .../kafka/events/consumer_group/heartbeat.rb | 39 ++ .../kafka/events/consumer_group/join_group.rb | 29 + .../events/consumer_group/leave_group.rb | 29 + .../kafka/events/consumer_group/sync_group.rb | 29 + .../events/produce_operation/send_messages.rb | 32 + .../kafka/events/producer/deliver_messages.rb | 35 + lib/ddtrace/contrib/kafka/ext.rb | 38 + lib/ddtrace/contrib/kafka/integration.rb | 39 ++ lib/ddtrace/contrib/kafka/patcher.rb | 26 + .../ddtrace/contrib/kafka/integration_spec.rb | 94 +++ spec/ddtrace/contrib/kafka/patcher_spec.rb | 659 ++++++++++++++++++ 23 files changed, 1338 insertions(+) create mode 100644 lib/ddtrace/contrib/kafka/configuration/settings.rb create mode 100644 lib/ddtrace/contrib/kafka/consumer_event.rb create mode 100644 lib/ddtrace/contrib/kafka/consumer_group_event.rb create mode 100644 lib/ddtrace/contrib/kafka/event.rb create mode 100644 lib/ddtrace/contrib/kafka/events.rb create mode 100644 lib/ddtrace/contrib/kafka/events/connection/request.rb create mode 100644 lib/ddtrace/contrib/kafka/events/consumer/process_batch.rb create mode 100644 lib/ddtrace/contrib/kafka/events/consumer/process_message.rb create mode 100644 lib/ddtrace/contrib/kafka/events/consumer_group/heartbeat.rb create mode 100644 lib/ddtrace/contrib/kafka/events/consumer_group/join_group.rb create mode 100644 
lib/ddtrace/contrib/kafka/events/consumer_group/leave_group.rb create mode 100644 lib/ddtrace/contrib/kafka/events/consumer_group/sync_group.rb create mode 100644 lib/ddtrace/contrib/kafka/events/produce_operation/send_messages.rb create mode 100644 lib/ddtrace/contrib/kafka/events/producer/deliver_messages.rb create mode 100644 lib/ddtrace/contrib/kafka/ext.rb create mode 100644 lib/ddtrace/contrib/kafka/integration.rb create mode 100644 lib/ddtrace/contrib/kafka/patcher.rb create mode 100644 spec/ddtrace/contrib/kafka/integration_spec.rb create mode 100644 spec/ddtrace/contrib/kafka/patcher_spec.rb diff --git a/Appraisals b/Appraisals index a7c90ffd34f..e8c74d40350 100644 --- a/Appraisals +++ b/Appraisals @@ -353,6 +353,7 @@ elsif Gem::Version.new('2.2.0') <= Gem::Version.new(RUBY_VERSION) \ gem 'redis', '< 4.0' gem 'rest-client' gem 'resque', '< 2.0' + gem 'ruby-kafka', '>= 0.7.10' gem 'sequel' gem 'shoryuken' gem 'sidekiq' diff --git a/Rakefile b/Rakefile index 38ec48a0d0c..ccc9ad8aade 100644 --- a/Rakefile +++ b/Rakefile @@ -95,6 +95,7 @@ namespace :spec do :graphql, :grpc, :http, + :kafka, :mongodb, :mysql2, :presto, diff --git a/docs/GettingStarted.md b/docs/GettingStarted.md index be3555e6155..ae25accb261 100644 --- a/docs/GettingStarted.md +++ b/docs/GettingStarted.md @@ -344,6 +344,7 @@ For a list of available integrations, and their configuration options, please re | Grape | `grape` | `>= 1.0` | *[Link](#grape)* | *[Link](https://github.com/ruby-grape/grape)* | | GraphQL | `graphql` | `>= 1.7.9` | *[Link](#graphql)* | *[Link](https://github.com/rmosolgo/graphql-ruby)* | | gRPC | `grpc` | `>= 1.7` | *[Link](#grpc)* | *[Link](https://github.com/grpc/grpc/tree/master/src/rubyc)* | +| Kafka | `ruby-kafka` | `>= 0.7.10` | *[Link](#kafka)* | *[Link](https://github.com/zendesk/ruby-kafka)* | MongoDB | `mongo` | `>= 2.1` | *[Link](#mongodb)* | *[Link](https://github.com/mongodb/mongo-ruby-driver)* | | MySQL2 | `mysql2` | `>= 0.3.21` | *[Link](#mysql2)* | *[Link](https://github.com/brianmario/mysql2)* | | 
Net/HTTP | `http` | *(Any supported Ruby)* | *[Link](#nethttp)* | *[Link](https://ruby-doc.org/stdlib-2.4.0/libdoc/net/http/rdoc/Net/HTTP.html)* | @@ -946,6 +947,29 @@ alternate_client = Demo::Echo::Service.rpc_stub_class.new( The integration will ensure that the `configured_interceptor` establishes a unique tracing setup for that client instance. +### Kafka + +The Kafka integration provides tracing of the `ruby-kafka` gem: + +You can enable it through `Datadog.configure`: + +```ruby +require 'ddtrace' + +Datadog.configure do |c| + c.use :kafka, options +end +``` + +Where `options` is an optional `Hash` that accepts the following parameters: + +| Key | Description | Default | +| --- | ----------- | ------- | +| `analytics_enabled` | Enable analytics for spans produced by this integration. `true` for on, `nil` to defer to global setting, `false` for off. | `false` | +| `service_name` | Service name used for `kafka` instrumentation | `'kafka'` | +| `tracer` | `Datadog::Tracer` used to perform instrumentation. Usually you don't need to set this. | `Datadog.tracer` | + + ### MongoDB The integration traces any `Command` that is sent from the [MongoDB Ruby Driver](https://github.com/mongodb/mongo-ruby-driver) to a MongoDB cluster. By extension, Object Document Mappers (ODM) such as Mongoid are automatically instrumented if they use the official Ruby driver. 
To activate the integration, simply: diff --git a/lib/ddtrace.rb b/lib/ddtrace.rb index 32f53da8b85..755494cf53c 100644 --- a/lib/ddtrace.rb +++ b/lib/ddtrace.rb @@ -56,6 +56,7 @@ module Datadog require 'ddtrace/contrib/grpc/integration' require 'ddtrace/contrib/http/integration' require 'ddtrace/contrib/integration' +require 'ddtrace/contrib/kafka/integration' require 'ddtrace/contrib/presto/integration' require 'ddtrace/contrib/mysql2/integration' require 'ddtrace/contrib/mongodb/integration' diff --git a/lib/ddtrace/contrib/kafka/configuration/settings.rb b/lib/ddtrace/contrib/kafka/configuration/settings.rb new file mode 100644 index 00000000000..f2bafd8b9c9 --- /dev/null +++ b/lib/ddtrace/contrib/kafka/configuration/settings.rb @@ -0,0 +1,25 @@ +require 'ddtrace/contrib/configuration/settings' +require 'ddtrace/contrib/kafka/ext' + +module Datadog + module Contrib + module Kafka + module Configuration + # Custom settings for the Kafka integration + class Settings < Contrib::Configuration::Settings + option :analytics_enabled do |o| + o.default { env_to_bool(Ext::ENV_ANALYTICS_ENABLED, false) } + o.lazy + end + + option :analytics_sample_rate do |o| + o.default { env_to_float(Ext::ENV_ANALYTICS_SAMPLE_RATE, 1.0) } + o.lazy + end + + option :service_name, default: Ext::SERVICE_NAME + end + end + end + end +end diff --git a/lib/ddtrace/contrib/kafka/consumer_event.rb b/lib/ddtrace/contrib/kafka/consumer_event.rb new file mode 100644 index 00000000000..d06eaf7bb18 --- /dev/null +++ b/lib/ddtrace/contrib/kafka/consumer_event.rb @@ -0,0 +1,14 @@ +module Datadog + module Contrib + module Kafka + # Defines basic behaviors for an event for a consumer. 
+ module ConsumerEvent + def process(span, _event, _id, payload) + super + + span.set_tag(Ext::TAG_GROUP, payload[:group_id]) + end + end + end + end +end diff --git a/lib/ddtrace/contrib/kafka/consumer_group_event.rb b/lib/ddtrace/contrib/kafka/consumer_group_event.rb new file mode 100644 index 00000000000..46da336182a --- /dev/null +++ b/lib/ddtrace/contrib/kafka/consumer_group_event.rb @@ -0,0 +1,14 @@ +module Datadog + module Contrib + module Kafka + # Defines basic behaviors for an event for a consumer group. + module ConsumerGroupEvent + def process(span, _event, _id, payload) + super + + span.resource = payload[:group_id] + end + end + end + end +end diff --git a/lib/ddtrace/contrib/kafka/event.rb b/lib/ddtrace/contrib/kafka/event.rb new file mode 100644 index 00000000000..608e5ae468d --- /dev/null +++ b/lib/ddtrace/contrib/kafka/event.rb @@ -0,0 +1,51 @@ +require 'ddtrace/contrib/analytics' +require 'ddtrace/contrib/active_support/notifications/event' +require 'ddtrace/contrib/kafka/ext' + +module Datadog + module Contrib + module Kafka + # Defines basic behaviors for an ActiveSupport event. + module Event + def self.included(base) + base.send(:include, ActiveSupport::Notifications::Event) + base.send(:extend, ClassMethods) + end + + # Class methods for Kafka events. 
+ module ClassMethods + def event_name + self::EVENT_NAME + end + + def span_options + { service: configuration[:service_name] } + end + + def tracer + -> { configuration[:tracer] } + end + + def configuration + Datadog.configuration[:kafka] + end + + def process(span, _event, _id, payload) + span.service = configuration[:service_name] + span.set_tag(Ext::TAG_CLIENT, payload[:client_id]) + + # Set analytics sample rate + if Contrib::Analytics.enabled?(configuration[:analytics_enabled]) + Contrib::Analytics.set_sample_rate(span, configuration[:analytics_sample_rate]) + end + + # Measure service stats + Contrib::Analytics.set_measured(span) + + span.set_error(payload[:exception_object]) if payload[:exception_object] + end + end + end + end + end +end diff --git a/lib/ddtrace/contrib/kafka/events.rb b/lib/ddtrace/contrib/kafka/events.rb new file mode 100644 index 00000000000..3b961d596cd --- /dev/null +++ b/lib/ddtrace/contrib/kafka/events.rb @@ -0,0 +1,44 @@ +require 'ddtrace/contrib/kafka/events/connection/request' +require 'ddtrace/contrib/kafka/events/consumer/process_batch' +require 'ddtrace/contrib/kafka/events/consumer/process_message' +require 'ddtrace/contrib/kafka/events/consumer_group/heartbeat' +require 'ddtrace/contrib/kafka/events/consumer_group/join_group' +require 'ddtrace/contrib/kafka/events/consumer_group/leave_group' +require 'ddtrace/contrib/kafka/events/consumer_group/sync_group' +require 'ddtrace/contrib/kafka/events/produce_operation/send_messages' +require 'ddtrace/contrib/kafka/events/producer/deliver_messages' + +module Datadog + module Contrib + module Kafka + # Defines collection of instrumented Kafka events + module Events + ALL = [ + Events::Connection::Request, + Events::Consumer::ProcessBatch, + Events::Consumer::ProcessMessage, + Events::ConsumerGroup::Heartbeat, + Events::ConsumerGroup::JoinGroup, + Events::ConsumerGroup::LeaveGroup, + Events::ConsumerGroup::SyncGroup, + Events::ProduceOperation::SendMessages, + 
Events::Producer::DeliverMessages + ].freeze + + module_function + + def all + self::ALL + end + + def subscriptions + all.collect(&:subscriptions).collect(&:to_a).flatten + end + + def subscribe! + all.each(&:subscribe!) + end + end + end + end +end diff --git a/lib/ddtrace/contrib/kafka/events/connection/request.rb b/lib/ddtrace/contrib/kafka/events/connection/request.rb new file mode 100644 index 00000000000..f7ae74cb155 --- /dev/null +++ b/lib/ddtrace/contrib/kafka/events/connection/request.rb @@ -0,0 +1,34 @@ +require 'ddtrace/contrib/kafka/ext' +require 'ddtrace/contrib/kafka/event' + +module Datadog + module Contrib + module Kafka + module Events + module Connection + # Defines instrumentation for request.connection.kafka event + module Request + include Kafka::Event + + EVENT_NAME = 'request.connection.kafka'.freeze + + def self.process(span, _event, _id, payload) + super + + span.resource = payload[:api] + + span.set_tag(Ext::TAG_REQUEST_SIZE, payload[:request_size]) if payload.key?(:request_size) + span.set_tag(Ext::TAG_RESPONSE_SIZE, payload[:response_size]) if payload.key?(:response_size) + end + + module_function + + def span_name + Ext::SPAN_CONNECTION_REQUEST + end + end + end + end + end + end +end diff --git a/lib/ddtrace/contrib/kafka/events/consumer/process_batch.rb b/lib/ddtrace/contrib/kafka/events/consumer/process_batch.rb new file mode 100644 index 00000000000..b6001bc9364 --- /dev/null +++ b/lib/ddtrace/contrib/kafka/events/consumer/process_batch.rb @@ -0,0 +1,41 @@ +require 'ddtrace/contrib/kafka/ext' +require 'ddtrace/contrib/kafka/event' +require 'ddtrace/contrib/kafka/consumer_event' + +module Datadog + module Contrib + module Kafka + module Events + module Consumer + # Defines instrumentation for process_batch.consumer.kafka event + module ProcessBatch + include Kafka::Event + extend Kafka::ConsumerEvent + + EVENT_NAME = 'process_batch.consumer.kafka'.freeze + + def self.process(span, _event, _id, payload) + super + + span.resource = 
payload[:topic] + + span.set_tag(Ext::TAG_TOPIC, payload[:topic]) if payload.key?(:topic) + span.set_tag(Ext::TAG_MESSAGE_COUNT, payload[:message_count]) if payload.key?(:message_count) + span.set_tag(Ext::TAG_PARTITION, payload[:partition]) if payload.key?(:partition) + if payload.key?(:highwater_mark_offset) + span.set_tag(Ext::TAG_HIGHWATER_MARK_OFFSET, payload[:highwater_mark_offset]) + end + span.set_tag(Ext::TAG_OFFSET_LAG, payload[:offset_lag]) if payload.key?(:offset_lag) + end + + module_function + + def span_name + Ext::SPAN_PROCESS_BATCH + end + end + end + end + end + end +end diff --git a/lib/ddtrace/contrib/kafka/events/consumer/process_message.rb b/lib/ddtrace/contrib/kafka/events/consumer/process_message.rb new file mode 100644 index 00000000000..d732fda5a4e --- /dev/null +++ b/lib/ddtrace/contrib/kafka/events/consumer/process_message.rb @@ -0,0 +1,39 @@ +require 'ddtrace/contrib/kafka/ext' +require 'ddtrace/contrib/kafka/event' +require 'ddtrace/contrib/kafka/consumer_event' + +module Datadog + module Contrib + module Kafka + module Events + module Consumer + # Defines instrumentation for process_message.consumer.kafka event + module ProcessMessage + include Kafka::Event + extend Kafka::ConsumerEvent + + EVENT_NAME = 'process_message.consumer.kafka'.freeze + + def self.process(span, _event, _id, payload) + super + + span.resource = payload[:topic] + + span.set_tag(Ext::TAG_TOPIC, payload[:topic]) if payload.key?(:topic) + span.set_tag(Ext::TAG_MESSAGE_KEY, payload[:key]) if payload.key?(:key) + span.set_tag(Ext::TAG_PARTITION, payload[:partition]) if payload.key?(:partition) + span.set_tag(Ext::TAG_OFFSET, payload[:offset]) if payload.key?(:offset) + span.set_tag(Ext::TAG_OFFSET_LAG, payload[:offset_lag]) if payload.key?(:offset_lag) + end + + module_function + + def span_name + Ext::SPAN_PROCESS_MESSAGE + end + end + end + end + end + end +end diff --git a/lib/ddtrace/contrib/kafka/events/consumer_group/heartbeat.rb 
b/lib/ddtrace/contrib/kafka/events/consumer_group/heartbeat.rb new file mode 100644 index 00000000000..bc31a717c6c --- /dev/null +++ b/lib/ddtrace/contrib/kafka/events/consumer_group/heartbeat.rb @@ -0,0 +1,39 @@ +require 'ddtrace/contrib/kafka/ext' +require 'ddtrace/contrib/kafka/event' +require 'ddtrace/contrib/kafka/consumer_event' +require 'ddtrace/contrib/kafka/consumer_group_event' + +module Datadog + module Contrib + module Kafka + module Events + module ConsumerGroup + # Defines instrumentation for heartbeat.consumer.kafka event + module Heartbeat + include Kafka::Event + extend Kafka::ConsumerEvent + extend Kafka::ConsumerGroupEvent + + EVENT_NAME = 'heartbeat.consumer.kafka'.freeze + + def self.process(span, _event, _id, payload) + super + + if payload.key?(:topic_partitions) + payload[:topic_partitions].each do |topic, partitions| + span.set_tag("#{Ext::TAG_TOPIC_PARTITIONS}.#{topic}", partitions) + end + end + end + + module_function + + def span_name + Ext::SPAN_CONSUMER_HEARTBEAT + end + end + end + end + end + end +end diff --git a/lib/ddtrace/contrib/kafka/events/consumer_group/join_group.rb b/lib/ddtrace/contrib/kafka/events/consumer_group/join_group.rb new file mode 100644 index 00000000000..cbdb4fb1ef3 --- /dev/null +++ b/lib/ddtrace/contrib/kafka/events/consumer_group/join_group.rb @@ -0,0 +1,29 @@ +require 'ddtrace/contrib/kafka/ext' +require 'ddtrace/contrib/kafka/event' +require 'ddtrace/contrib/kafka/consumer_event' +require 'ddtrace/contrib/kafka/consumer_group_event' + +module Datadog + module Contrib + module Kafka + module Events + module ConsumerGroup + # Defines instrumentation for join_group.consumer.kafka event + module JoinGroup + include Kafka::Event + extend Kafka::ConsumerEvent + extend Kafka::ConsumerGroupEvent + + EVENT_NAME = 'join_group.consumer.kafka'.freeze + + module_function + + def span_name + Ext::SPAN_CONSUMER_JOIN_GROUP + end + end + end + end + end + end +end diff --git 
a/lib/ddtrace/contrib/kafka/events/consumer_group/leave_group.rb b/lib/ddtrace/contrib/kafka/events/consumer_group/leave_group.rb new file mode 100644 index 00000000000..99b9a51d1c4 --- /dev/null +++ b/lib/ddtrace/contrib/kafka/events/consumer_group/leave_group.rb @@ -0,0 +1,29 @@ +require 'ddtrace/contrib/kafka/ext' +require 'ddtrace/contrib/kafka/event' +require 'ddtrace/contrib/kafka/consumer_event' +require 'ddtrace/contrib/kafka/consumer_group_event' + +module Datadog + module Contrib + module Kafka + module Events + module ConsumerGroup + # Defines instrumentation for leave_group.consumer.kafka event + module LeaveGroup + include Kafka::Event + extend Kafka::ConsumerEvent + extend Kafka::ConsumerGroupEvent + + EVENT_NAME = 'leave_group.consumer.kafka'.freeze + + module_function + + def span_name + Ext::SPAN_CONSUMER_LEAVE_GROUP + end + end + end + end + end + end +end diff --git a/lib/ddtrace/contrib/kafka/events/consumer_group/sync_group.rb b/lib/ddtrace/contrib/kafka/events/consumer_group/sync_group.rb new file mode 100644 index 00000000000..6809a94cdfe --- /dev/null +++ b/lib/ddtrace/contrib/kafka/events/consumer_group/sync_group.rb @@ -0,0 +1,29 @@ +require 'ddtrace/contrib/kafka/ext' +require 'ddtrace/contrib/kafka/event' +require 'ddtrace/contrib/kafka/consumer_event' +require 'ddtrace/contrib/kafka/consumer_group_event' + +module Datadog + module Contrib + module Kafka + module Events + module ConsumerGroup + # Defines instrumentation for sync_group.consumer.kafka event + module SyncGroup + include Kafka::Event + extend Kafka::ConsumerEvent + extend Kafka::ConsumerGroupEvent + + EVENT_NAME = 'sync_group.consumer.kafka'.freeze + + module_function + + def span_name + Ext::SPAN_CONSUMER_SYNC_GROUP + end + end + end + end + end + end +end diff --git a/lib/ddtrace/contrib/kafka/events/produce_operation/send_messages.rb b/lib/ddtrace/contrib/kafka/events/produce_operation/send_messages.rb new file mode 100644 index 00000000000..db16642848b --- /dev/null +++ 
b/lib/ddtrace/contrib/kafka/events/produce_operation/send_messages.rb @@ -0,0 +1,32 @@ +require 'ddtrace/contrib/kafka/ext' +require 'ddtrace/contrib/kafka/event' + +module Datadog + module Contrib + module Kafka + module Events + module ProduceOperation + # Defines instrumentation for send_messages.producer.kafka event + module SendMessages + include Kafka::Event + + EVENT_NAME = 'send_messages.producer.kafka'.freeze + + def self.process(span, _event, _id, payload) + super + + span.set_tag(Ext::TAG_MESSAGE_COUNT, payload[:message_count]) if payload.key?(:message_count) + span.set_tag(Ext::TAG_SENT_MESSAGE_COUNT, payload[:sent_message_count]) if payload.key?(:sent_message_count) + end + + module_function + + def span_name + Ext::SPAN_SEND_MESSAGES + end + end + end + end + end + end +end diff --git a/lib/ddtrace/contrib/kafka/events/producer/deliver_messages.rb b/lib/ddtrace/contrib/kafka/events/producer/deliver_messages.rb new file mode 100644 index 00000000000..fe81299f90d --- /dev/null +++ b/lib/ddtrace/contrib/kafka/events/producer/deliver_messages.rb @@ -0,0 +1,35 @@ +require 'ddtrace/contrib/kafka/ext' +require 'ddtrace/contrib/kafka/event' + +module Datadog + module Contrib + module Kafka + module Events + module Producer + # Defines instrumentation for deliver_messages.producer.kafka event + module DeliverMessages + include Kafka::Event + + EVENT_NAME = 'deliver_messages.producer.kafka'.freeze + + def self.process(span, _event, _id, payload) + super + + span.set_tag(Ext::TAG_ATTEMPTS, payload[:attempts]) if payload.key?(:attempts) + span.set_tag(Ext::TAG_MESSAGE_COUNT, payload[:message_count]) if payload.key?(:message_count) + if payload.key?(:delivered_message_count) + span.set_tag(Ext::TAG_DELIVERED_MESSAGE_COUNT, payload[:delivered_message_count]) + end + end + + module_function + + def span_name + Ext::SPAN_DELIVER_MESSAGES + end + end + end + end + end + end +end diff --git a/lib/ddtrace/contrib/kafka/ext.rb b/lib/ddtrace/contrib/kafka/ext.rb new file 
mode 100644 index 00000000000..2b7440c2383 --- /dev/null +++ b/lib/ddtrace/contrib/kafka/ext.rb @@ -0,0 +1,38 @@ +module Datadog + module Contrib + module Kafka + # Kafka integration constants + module Ext + APP = 'kafka'.freeze + ENV_ANALYTICS_ENABLED = 'DD_KAFKA_ANALYTICS_ENABLED'.freeze + ENV_ANALYTICS_SAMPLE_RATE = 'DD_KAFKA_ANALYTICS_SAMPLE_RATE'.freeze + SERVICE_NAME = 'kafka'.freeze + SPAN_CONNECTION_REQUEST = 'kafka.connection.request'.freeze + SPAN_CONSUMER_HEARTBEAT = 'kafka.consumer.heartbeat'.freeze + SPAN_CONSUMER_JOIN_GROUP = 'kafka.consumer.join_group'.freeze + SPAN_CONSUMER_LEAVE_GROUP = 'kafka.consumer.leave_group'.freeze + SPAN_CONSUMER_SYNC_GROUP = 'kafka.consumer.sync_group'.freeze + SPAN_DELIVER_MESSAGES = 'kafka.producer.deliver_messages'.freeze + SPAN_PROCESS_BATCH = 'kafka.consumer.process_batch'.freeze + SPAN_PROCESS_MESSAGE = 'kafka.consumer.process_message'.freeze + SPAN_SEND_MESSAGES = 'kafka.producer.send_messages'.freeze + TAG_ATTEMPTS = 'kafka.attempts'.freeze + TAG_API = 'kafka.api'.freeze + TAG_CLIENT = 'kafka.client'.freeze + TAG_GROUP = 'kafka.group'.freeze + TAG_HIGHWATER_MARK_OFFSET = 'kafka.highwater_mark_offset'.freeze + TAG_MESSAGE_COUNT = 'kafka.message_count'.freeze + TAG_MESSAGE_KEY = 'kafka.message_key'.freeze + TAG_DELIVERED_MESSAGE_COUNT = 'kafka.delivered_message_count'.freeze + TAG_OFFSET = 'kafka.offset'.freeze + TAG_OFFSET_LAG = 'kafka.offset_lag'.freeze + TAG_PARTITION = 'kafka.partition'.freeze + TAG_REQUEST_SIZE = 'kafka.request_size'.freeze + TAG_RESPONSE_SIZE = 'kafka.response_size'.freeze + TAG_SENT_MESSAGE_COUNT = 'kafka.sent_message_count'.freeze + TAG_TOPIC = 'kafka.topic'.freeze + TAG_TOPIC_PARTITIONS = 'kafka.topic_partitions'.freeze + end + end + end +end diff --git a/lib/ddtrace/contrib/kafka/integration.rb b/lib/ddtrace/contrib/kafka/integration.rb new file mode 100644 index 00000000000..a1d4ef632bb --- /dev/null +++ b/lib/ddtrace/contrib/kafka/integration.rb @@ -0,0 +1,39 @@ +require 
'ddtrace/contrib/integration' +require 'ddtrace/contrib/kafka/configuration/settings' +require 'ddtrace/contrib/kafka/patcher' + +module Datadog + module Contrib + module Kafka + # Description of Kafka integration + class Integration + include Contrib::Integration + + MINIMUM_VERSION = Gem::Version.new('0.7.10') + + register_as :kafka, auto_patch: false + + def self.version + Gem.loaded_specs['ruby-kafka'] && Gem.loaded_specs['ruby-kafka'].version + end + + def self.loaded? + !defined?(::Kafka).nil? \ + && !defined?(::ActiveSupport::Notifications).nil? + end + + def self.compatible? + super && version >= MINIMUM_VERSION + end + + def default_configuration + Configuration::Settings.new + end + + def patcher + Patcher + end + end + end + end +end diff --git a/lib/ddtrace/contrib/kafka/patcher.rb b/lib/ddtrace/contrib/kafka/patcher.rb new file mode 100644 index 00000000000..4ba2836744d --- /dev/null +++ b/lib/ddtrace/contrib/kafka/patcher.rb @@ -0,0 +1,26 @@ +require 'ddtrace/contrib/patcher' +require 'ddtrace/ext/app_types' +require 'ddtrace/contrib/kafka/ext' +require 'ddtrace/contrib/kafka/events' + +module Datadog + module Contrib + module Kafka + # Patcher enables patching of 'kafka' module. + module Patcher + include Contrib::Patcher + + module_function + + def target_version + Integration.version + end + + def patch + # Subscribe to Kafka events + Events.subscribe! 
+ end + end + end + end +end diff --git a/spec/ddtrace/contrib/kafka/integration_spec.rb b/spec/ddtrace/contrib/kafka/integration_spec.rb new file mode 100644 index 00000000000..7ccd9e5eb2f --- /dev/null +++ b/spec/ddtrace/contrib/kafka/integration_spec.rb @@ -0,0 +1,94 @@ +require 'spec_helper' + +require 'ddtrace/contrib/kafka/integration' + +RSpec.describe Datadog::Contrib::Kafka::Integration do + extend ConfigurationHelpers + + let(:integration) { described_class.new(:kafka) } + + describe '.version' do + subject(:version) { described_class.version } + + context 'when the "ruby-kafka" gem is loaded' do + include_context 'loaded gems', :'ruby-kafka' => described_class::MINIMUM_VERSION + it { is_expected.to be_a_kind_of(Gem::Version) } + end + + context 'when "ruby-kafka" gem is not loaded' do + include_context 'loaded gems', :'ruby-kafka' => nil + it { is_expected.to be nil } + end + end + + describe '.loaded?' do + subject(:loaded?) { described_class.loaded? } + + context 'when neither Kafka or ActiveSupport::Notifications are defined' do + before do + hide_const('Kafka') + hide_const('ActiveSupport::Notifications') + end + + it { is_expected.to be false } + end + + context 'when only Kafka is defined' do + before do + stub_const('Kafka', Class.new) + hide_const('ActiveSupport::Notifications') + end + + it { is_expected.to be false } + end + + context 'when only ActiveSupport::Notifications is defined' do + before do + hide_const('Kafka') + stub_const('ActiveSupport::Notifications', Class.new) + end + + it { is_expected.to be false } + end + + context 'when both Kafka and ActiveSupport::Notifications are defined' do + before do + stub_const('Kafka', Class.new) + stub_const('ActiveSupport::Notifications', Class.new) + end + + it { is_expected.to be true } + end + end + + describe '.compatible?' do + subject(:compatible?) { described_class.compatible? 
} + + context 'when "ruby-kafka" gem is loaded with a version' do + context 'that is less than the minimum' do + include_context 'loaded gems', :'ruby-kafka' => decrement_gem_version(described_class::MINIMUM_VERSION) + it { is_expected.to be false } + end + + context 'that meets the minimum version' do + include_context 'loaded gems', :'ruby-kafka' => described_class::MINIMUM_VERSION + it { is_expected.to be true } + end + end + + context 'when gem is not loaded' do + include_context 'loaded gems', :'ruby-kafka' => nil + it { is_expected.to be false } + end + end + + describe '#default_configuration' do + subject(:default_configuration) { integration.default_configuration } + it { is_expected.to be_a_kind_of(Datadog::Contrib::Kafka::Configuration::Settings) } + end + + describe '#patcher' do + subject(:patcher) { integration.patcher } + it { is_expected.to be Datadog::Contrib::Kafka::Patcher } + end +end diff --git a/spec/ddtrace/contrib/kafka/patcher_spec.rb b/spec/ddtrace/contrib/kafka/patcher_spec.rb new file mode 100644 index 00000000000..5e2b5b2ba06 --- /dev/null +++ b/spec/ddtrace/contrib/kafka/patcher_spec.rb @@ -0,0 +1,659 @@ +require 'spec_helper' +require 'ddtrace/contrib/analytics_examples' + +require 'ruby-kafka' +require 'active_support' +require 'ddtrace' + +RSpec.describe 'Kafka patcher' do + let(:tracer) { get_test_tracer } + let(:configuration_options) { { tracer: tracer } } + let(:client_id) { SecureRandom.uuid } + let(:span) do + all_spans.select { |s| s.name == span_name }.first + end + + def all_spans + tracer.writer.spans(:keep) + end + + before(:each) do + Datadog.configure do |c| + c.use :kafka, configuration_options + end + end + + around do |example| + # Reset before and after each example; don't allow global state to linger. + Datadog.registry[:kafka].reset_configuration! + example.run + Datadog.registry[:kafka].reset_configuration! 
+ end + + describe 'connection.request' do + let(:api) { 'api' } + let(:request_size) { rand(1..1000) } + let(:response_size) { rand(1..1000) } + let(:payload) do + { + client_id: client_id, + api: api, + request_size: request_size, + response_size: response_size + } + end + let(:span_name) { Datadog::Contrib::Kafka::Ext::SPAN_CONNECTION_REQUEST } + + context 'that doesn\'t raise an error' do + it 'is expected to send a span' do + ActiveSupport::Notifications.instrument('request.connection.kafka', payload) + + span.tap do |span| + expect(span).to_not be nil + expect(span.service).to eq('kafka') + expect(span.name).to eq('kafka.connection.request') + expect(span.resource).to eq(api) + expect(span.get_tag('kafka.client')).to eq(client_id) + expect(span.get_tag('kafka.request_size')).to eq(request_size) + expect(span.get_tag('kafka.response_size')).to eq(response_size) + expect(span).to_not have_error + end + end + end + + context 'that raises an error' do + let(:error_class) { Class.new(StandardError) } + + it 'is expected to send a span' do + # Emulate failure + begin + ActiveSupport::Notifications.instrument('request.connection.kafka', payload) do + raise error_class + end + rescue error_class + nil + end + + span.tap do |span| + expect(span).to_not be nil + expect(span.service).to eq('kafka') + expect(span.name).to eq('kafka.connection.request') + expect(span.resource).to eq(api) + expect(span.get_tag('kafka.client')).to eq(client_id) + expect(span.get_tag('kafka.request_size')).to eq(request_size) + expect(span.get_tag('kafka.response_size')).to eq(response_size) + expect(span).to have_error + end + end + end + + it_behaves_like 'analytics for integration' do + before { ActiveSupport::Notifications.instrument('request.connection.kafka', payload) } + let(:analytics_enabled_var) { Datadog::Contrib::Kafka::Ext::ENV_ANALYTICS_ENABLED } + let(:analytics_sample_rate_var) { Datadog::Contrib::Kafka::Ext::ENV_ANALYTICS_SAMPLE_RATE } + end + + it_behaves_like 'measured 
span for integration', true do + before { ActiveSupport::Notifications.instrument('request.connection.kafka', payload) } + end + end + + describe 'consumer.process_batch' do + let(:group_id) { SecureRandom.uuid } + let(:topic) { 'my-topic' } + let(:message_count) { rand(1..10) } + let(:partition) { rand(0..100) } + let(:highwater_mark_offset) { rand(100..1000) } + let(:offset_lag) { rand(1..1000) } + let(:payload) do + { + client_id: client_id, + group_id: group_id, + topic: topic, + message_count: message_count, + partition: partition, + highwater_mark_offset: highwater_mark_offset, + offset_lag: offset_lag + } + end + let(:span_name) { Datadog::Contrib::Kafka::Ext::SPAN_PROCESS_BATCH } + + context 'that doesn\'t raise an error' do + it 'is expected to send a span' do + ActiveSupport::Notifications.instrument('process_batch.consumer.kafka', payload) + + span.tap do |span| + expect(span).to_not be nil + expect(span.service).to eq('kafka') + expect(span.name).to eq('kafka.consumer.process_batch') + expect(span.resource).to eq(topic) + expect(span.get_tag('kafka.client')).to eq(client_id) + expect(span.get_tag('kafka.group')).to eq(group_id) + expect(span.get_tag('kafka.topic')).to eq(topic) + expect(span.get_tag('kafka.message_count')).to eq(message_count) + expect(span.get_tag('kafka.partition')).to eq(partition) + expect(span.get_tag('kafka.highwater_mark_offset')).to eq(highwater_mark_offset) + expect(span.get_tag('kafka.offset_lag')).to eq(offset_lag) + expect(span).to_not have_error + end + end + end + + context 'that raises an error' do + let(:error_class) { Class.new(StandardError) } + + it 'is expected to send a span' do + # Emulate failure + begin + ActiveSupport::Notifications.instrument('process_batch.consumer.kafka', payload) do + raise error_class + end + rescue error_class + nil + end + + span.tap do |span| + expect(span).to_not be nil + expect(span.service).to eq('kafka') + expect(span.name).to eq('kafka.consumer.process_batch') + 
expect(span.resource).to eq(topic) + expect(span.get_tag('kafka.client')).to eq(client_id) + expect(span.get_tag('kafka.group')).to eq(group_id) + expect(span.get_tag('kafka.topic')).to eq(topic) + expect(span.get_tag('kafka.message_count')).to eq(message_count) + expect(span.get_tag('kafka.partition')).to eq(partition) + expect(span.get_tag('kafka.highwater_mark_offset')).to eq(highwater_mark_offset) + expect(span.get_tag('kafka.offset_lag')).to eq(offset_lag) + expect(span).to have_error + end + end + end + + it_behaves_like 'analytics for integration' do + before { ActiveSupport::Notifications.instrument('process_batch.consumer.kafka', payload) } + let(:analytics_enabled_var) { Datadog::Contrib::Kafka::Ext::ENV_ANALYTICS_ENABLED } + let(:analytics_sample_rate_var) { Datadog::Contrib::Kafka::Ext::ENV_ANALYTICS_SAMPLE_RATE } + end + + it_behaves_like 'measured span for integration', true do + before { ActiveSupport::Notifications.instrument('process_batch.consumer.kafka', payload) } + end + end + + describe 'consumer.process_message' do + let(:group_id) { SecureRandom.uuid } + let(:topic) { 'my-topic' } + let(:key) { SecureRandom.hex } + let(:partition) { rand(0..100) } + let(:offset) { rand(1..1000) } + let(:offset_lag) { rand(1..1000) } + let(:payload) do + { + client_id: client_id, + group_id: group_id, + key: key, + topic: topic, + partition: partition, + offset: offset, + offset_lag: offset_lag + } + end + let(:span_name) { Datadog::Contrib::Kafka::Ext::SPAN_PROCESS_MESSAGE } + + context 'that doesn\'t raise an error' do + it 'is expected to send a span' do + ActiveSupport::Notifications.instrument('process_message.consumer.kafka', payload) + + span.tap do |span| + expect(span).to_not be nil + expect(span.service).to eq('kafka') + expect(span.name).to eq('kafka.consumer.process_message') + expect(span.resource).to eq(topic) + expect(span.get_tag('kafka.client')).to eq(client_id) + expect(span.get_tag('kafka.group')).to eq(group_id) + 
expect(span.get_tag('kafka.topic')).to eq(topic) + expect(span.get_tag('kafka.message_key')).to eq(key) + expect(span.get_tag('kafka.partition')).to eq(partition) + expect(span.get_tag('kafka.offset')).to eq(offset) + expect(span.get_tag('kafka.offset_lag')).to eq(offset_lag) + expect(span).to_not have_error + end + end + end + + context 'that raises an error' do + let(:error_class) { Class.new(StandardError) } + + it 'is expected to send a span' do + # Emulate failure + begin + ActiveSupport::Notifications.instrument('process_message.consumer.kafka', payload) do + raise error_class + end + rescue error_class + nil + end + + span.tap do |span| + expect(span).to_not be nil + expect(span.service).to eq('kafka') + expect(span.name).to eq('kafka.consumer.process_message') + expect(span.resource).to eq(topic) + expect(span.get_tag('kafka.client')).to eq(client_id) + expect(span.get_tag('kafka.group')).to eq(group_id) + expect(span.get_tag('kafka.topic')).to eq(topic) + expect(span.get_tag('kafka.message_key')).to eq(key) + expect(span.get_tag('kafka.partition')).to eq(partition) + expect(span.get_tag('kafka.offset')).to eq(offset) + expect(span.get_tag('kafka.offset_lag')).to eq(offset_lag) + expect(span).to have_error + end + end + end + + it_behaves_like 'analytics for integration' do + before { ActiveSupport::Notifications.instrument('process_message.consumer.kafka', payload) } + let(:analytics_enabled_var) { Datadog::Contrib::Kafka::Ext::ENV_ANALYTICS_ENABLED } + let(:analytics_sample_rate_var) { Datadog::Contrib::Kafka::Ext::ENV_ANALYTICS_SAMPLE_RATE } + end + + it_behaves_like 'measured span for integration', true do + before { ActiveSupport::Notifications.instrument('process_message.consumer.kafka', payload) } + end + end + + describe 'consumer.heartbeat' do + let(:group_id) { SecureRandom.uuid } + let(:topic_partitions) do + { + 'foo' => [0, 2], + 'bar' => [1, 3] + } + end + let(:payload) do + { + client_id: client_id, + group_id: group_id, + topic_partitions: 
topic_partitions + } + end + let(:span_name) { Datadog::Contrib::Kafka::Ext::SPAN_CONSUMER_HEARTBEAT } + + context 'that doesn\'t raise an error' do + it 'is expected to send a span' do + ActiveSupport::Notifications.instrument('heartbeat.consumer.kafka', payload) + + span.tap do |span| + expect(span).to_not be nil + expect(span.service).to eq('kafka') + expect(span.name).to eq('kafka.consumer.heartbeat') + expect(span.resource).to eq(group_id) + expect(span.get_tag('kafka.client')).to eq(client_id) + expect(span.get_tag('kafka.group')).to eq(group_id) + expect(span.get_tag('kafka.topic_partitions.foo')).to eq(topic_partitions['foo'].to_s) + expect(span.get_tag('kafka.topic_partitions.bar')).to eq(topic_partitions['bar'].to_s) + expect(span).to_not have_error + end + end + end + + context 'that raises an error' do + let(:error_class) { Class.new(StandardError) } + + it 'is expected to send a span' do + # Emulate failure + begin + ActiveSupport::Notifications.instrument('heartbeat.consumer.kafka', payload) do + raise error_class + end + rescue error_class + nil + end + + span.tap do |span| + expect(span).to_not be nil + expect(span.service).to eq('kafka') + expect(span.name).to eq('kafka.consumer.heartbeat') + expect(span.resource).to eq(group_id) + expect(span.get_tag('kafka.client')).to eq(client_id) + expect(span.get_tag('kafka.group')).to eq(group_id) + expect(span.get_tag('kafka.topic_partitions.foo')).to eq(topic_partitions['foo'].to_s) + expect(span.get_tag('kafka.topic_partitions.bar')).to eq(topic_partitions['bar'].to_s) + expect(span).to have_error + end + end + end + + it_behaves_like 'analytics for integration' do + before { ActiveSupport::Notifications.instrument('heartbeat.consumer.kafka', payload) } + let(:analytics_enabled_var) { Datadog::Contrib::Kafka::Ext::ENV_ANALYTICS_ENABLED } + let(:analytics_sample_rate_var) { Datadog::Contrib::Kafka::Ext::ENV_ANALYTICS_SAMPLE_RATE } + end + + it_behaves_like 'measured span for integration', true do + before 
{ ActiveSupport::Notifications.instrument('heartbeat.consumer.kafka', payload) } + end + end + + describe 'consumer.join_group' do + let(:group_id) { SecureRandom.uuid } + let(:payload) do + { + client_id: client_id, + group_id: group_id + } + end + let(:span_name) { Datadog::Contrib::Kafka::Ext::SPAN_CONSUMER_JOIN_GROUP } + + context 'that doesn\'t raise an error' do + it 'is expected to send a span' do + ActiveSupport::Notifications.instrument('join_group.consumer.kafka', payload) + + span.tap do |span| + expect(span).to_not be nil + expect(span.service).to eq('kafka') + expect(span.name).to eq('kafka.consumer.join_group') + expect(span.resource).to eq(group_id) + expect(span.get_tag('kafka.client')).to eq(client_id) + expect(span.get_tag('kafka.group')).to eq(group_id) + expect(span).to_not have_error + end + end + end + + context 'that raises an error' do + let(:error_class) { Class.new(StandardError) } + + it 'is expected to send a span' do + # Emulate failure + begin + ActiveSupport::Notifications.instrument('join_group.consumer.kafka', payload) do + raise error_class + end + rescue error_class + nil + end + + span.tap do |span| + expect(span).to_not be nil + expect(span.service).to eq('kafka') + expect(span.name).to eq('kafka.consumer.join_group') + expect(span.resource).to eq(group_id) + expect(span.get_tag('kafka.client')).to eq(client_id) + expect(span.get_tag('kafka.group')).to eq(group_id) + expect(span).to have_error + end + end + end + + it_behaves_like 'analytics for integration' do + before { ActiveSupport::Notifications.instrument('join_group.consumer.kafka', payload) } + let(:analytics_enabled_var) { Datadog::Contrib::Kafka::Ext::ENV_ANALYTICS_ENABLED } + let(:analytics_sample_rate_var) { Datadog::Contrib::Kafka::Ext::ENV_ANALYTICS_SAMPLE_RATE } + end + + it_behaves_like 'measured span for integration', true do + before { ActiveSupport::Notifications.instrument('join_group.consumer.kafka', payload) } + end + end + + describe 'consumer.leave_group' 
do + let(:group_id) { SecureRandom.uuid } + let(:payload) do + { + client_id: client_id, + group_id: group_id + } + end + let(:span_name) { Datadog::Contrib::Kafka::Ext::SPAN_CONSUMER_LEAVE_GROUP } + + context 'that doesn\'t raise an error' do + it 'is expected to send a span' do + ActiveSupport::Notifications.instrument('leave_group.consumer.kafka', payload) + + span.tap do |span| + expect(span).to_not be nil + expect(span.service).to eq('kafka') + expect(span.name).to eq('kafka.consumer.leave_group') + expect(span.resource).to eq(group_id) + expect(span.get_tag('kafka.client')).to eq(client_id) + expect(span.get_tag('kafka.group')).to eq(group_id) + expect(span).to_not have_error + end + end + end + + context 'that raises an error' do + let(:error_class) { Class.new(StandardError) } + + it 'is expected to send a span' do + # Emulate failure + begin + ActiveSupport::Notifications.instrument('leave_group.consumer.kafka', payload) do + raise error_class + end + rescue error_class + nil + end + + span.tap do |span| + expect(span).to_not be nil + expect(span.service).to eq('kafka') + expect(span.name).to eq('kafka.consumer.leave_group') + expect(span.resource).to eq(group_id) + expect(span.get_tag('kafka.client')).to eq(client_id) + expect(span.get_tag('kafka.group')).to eq(group_id) + expect(span).to have_error + end + end + end + + it_behaves_like 'analytics for integration' do + before { ActiveSupport::Notifications.instrument('leave_group.consumer.kafka', payload) } + let(:analytics_enabled_var) { Datadog::Contrib::Kafka::Ext::ENV_ANALYTICS_ENABLED } + let(:analytics_sample_rate_var) { Datadog::Contrib::Kafka::Ext::ENV_ANALYTICS_SAMPLE_RATE } + end + + it_behaves_like 'measured span for integration', true do + before { ActiveSupport::Notifications.instrument('leave_group.consumer.kafka', payload) } + end + end + + describe 'consumer.sync_group' do + let(:group_id) { SecureRandom.uuid } + let(:payload) do + { + client_id: client_id, + group_id: group_id + } + end + 
let(:span_name) { Datadog::Contrib::Kafka::Ext::SPAN_CONSUMER_SYNC_GROUP } + + context 'that doesn\'t raise an error' do + it 'is expected to send a span' do + ActiveSupport::Notifications.instrument('sync_group.consumer.kafka', payload) + + span.tap do |span| + expect(span).to_not be nil + expect(span.service).to eq('kafka') + expect(span.name).to eq('kafka.consumer.sync_group') + expect(span.resource).to eq(group_id) + expect(span.get_tag('kafka.client')).to eq(client_id) + expect(span.get_tag('kafka.group')).to eq(group_id) + expect(span).to_not have_error + end + end + end + + context 'that raises an error' do + let(:error_class) { Class.new(StandardError) } + + it 'is expected to send a span' do + # Emulate failure + begin + ActiveSupport::Notifications.instrument('sync_group.consumer.kafka', payload) do + raise error_class + end + rescue error_class + nil + end + + span.tap do |span| + expect(span).to_not be nil + expect(span.service).to eq('kafka') + expect(span.name).to eq('kafka.consumer.sync_group') + expect(span.resource).to eq(group_id) + expect(span.get_tag('kafka.client')).to eq(client_id) + expect(span.get_tag('kafka.group')).to eq(group_id) + expect(span).to have_error + end + end + end + + it_behaves_like 'analytics for integration' do + before { ActiveSupport::Notifications.instrument('sync_group.consumer.kafka', payload) } + let(:analytics_enabled_var) { Datadog::Contrib::Kafka::Ext::ENV_ANALYTICS_ENABLED } + let(:analytics_sample_rate_var) { Datadog::Contrib::Kafka::Ext::ENV_ANALYTICS_SAMPLE_RATE } + end + + it_behaves_like 'measured span for integration', true do + before { ActiveSupport::Notifications.instrument('sync_group.consumer.kafka', payload) } + end + end + + describe 'producer.send_messages' do + let(:message_count) { rand(10..100) } + let(:sent_message_count) { rand(1..message_count) } + let(:payload) do + { + client_id: client_id, + message_count: message_count, + sent_message_count: sent_message_count + } + end + let(:span_name) { 
Datadog::Contrib::Kafka::Ext::SPAN_SEND_MESSAGES } + + context 'that doesn\'t raise an error' do + it 'is expected to send a span' do + ActiveSupport::Notifications.instrument('send_messages.producer.kafka', payload) + + span.tap do |span| + expect(span).to_not be nil + expect(span.service).to eq('kafka') + expect(span.name).to eq('kafka.producer.send_messages') + expect(span.resource).to eq(span.name) + expect(span.get_tag('kafka.client')).to eq(client_id) + expect(span.get_tag('kafka.message_count')).to eq(message_count) + expect(span.get_tag('kafka.sent_message_count')).to eq(sent_message_count) + expect(span).to_not have_error + end + end + end + + context 'that raises an error' do + let(:error_class) { Class.new(StandardError) } + + it 'is expected to send a span' do + # Emulate failure + begin + ActiveSupport::Notifications.instrument('send_messages.producer.kafka', payload) do + raise error_class + end + rescue error_class + nil + end + + span.tap do |span| + expect(span).to_not be nil + expect(span.service).to eq('kafka') + expect(span.name).to eq('kafka.producer.send_messages') + expect(span.resource).to eq(span.name) + expect(span.get_tag('kafka.client')).to eq(client_id) + expect(span.get_tag('kafka.message_count')).to eq(message_count) + expect(span.get_tag('kafka.sent_message_count')).to eq(sent_message_count) + expect(span).to have_error + end + end + end + + it_behaves_like 'analytics for integration' do + before { ActiveSupport::Notifications.instrument('send_messages.producer.kafka', payload) } + let(:analytics_enabled_var) { Datadog::Contrib::Kafka::Ext::ENV_ANALYTICS_ENABLED } + let(:analytics_sample_rate_var) { Datadog::Contrib::Kafka::Ext::ENV_ANALYTICS_SAMPLE_RATE } + end + + it_behaves_like 'measured span for integration', true do + before { ActiveSupport::Notifications.instrument('send_messages.producer.kafka', payload) } + end + end + + describe 'producer.deliver_messages' do + let(:attempts) { rand(1..10) } + let(:message_count) { 
rand(10..100) } + let(:delivered_message_count) { rand(1..message_count) } + let(:payload) do + { + client_id: client_id, + attempts: attempts, + message_count: message_count, + delivered_message_count: delivered_message_count + } + end + let(:span_name) { Datadog::Contrib::Kafka::Ext::SPAN_DELIVER_MESSAGES } + + context 'that doesn\'t raise an error' do + it 'is expected to send a span' do + ActiveSupport::Notifications.instrument('deliver_messages.producer.kafka', payload) + + span.tap do |span| + expect(span).to_not be nil + expect(span.service).to eq('kafka') + expect(span.name).to eq('kafka.producer.deliver_messages') + expect(span.resource).to eq(span.name) + expect(span.get_tag('kafka.client')).to eq(client_id) + expect(span.get_tag('kafka.attempts')).to eq(attempts) + expect(span.get_tag('kafka.message_count')).to eq(message_count) + expect(span.get_tag('kafka.delivered_message_count')).to eq(delivered_message_count) + expect(span).to_not have_error + end + end + end + + context 'that raises an error' do + let(:error_class) { Class.new(StandardError) } + + it 'is expected to send a span' do + # Emulate failure + begin + ActiveSupport::Notifications.instrument('deliver_messages.producer.kafka', payload) do + raise error_class + end + rescue error_class + nil + end + + span.tap do |span| + expect(span).to_not be nil + expect(span.service).to eq('kafka') + expect(span.name).to eq('kafka.producer.deliver_messages') + expect(span.resource).to eq(span.name) + expect(span.get_tag('kafka.client')).to eq(client_id) + expect(span.get_tag('kafka.attempts')).to eq(attempts) + expect(span.get_tag('kafka.message_count')).to eq(message_count) + expect(span.get_tag('kafka.delivered_message_count')).to eq(delivered_message_count) + expect(span).to have_error + end + end + end + + it_behaves_like 'analytics for integration' do + before { ActiveSupport::Notifications.instrument('deliver_messages.producer.kafka', payload) } + let(:analytics_enabled_var) { 
Datadog::Contrib::Kafka::Ext::ENV_ANALYTICS_ENABLED } + let(:analytics_sample_rate_var) { Datadog::Contrib::Kafka::Ext::ENV_ANALYTICS_SAMPLE_RATE } + end + + it_behaves_like 'measured span for integration', true do + before { ActiveSupport::Notifications.instrument('deliver_messages.producer.kafka', payload) } + end + end +end From a8b7a8fe54a29c2b0444dc0a1cd9f56734bc51d2 Mon Sep 17 00:00:00 2001 From: Tim Perkins Date: Thu, 11 Jun 2020 06:41:13 -0400 Subject: [PATCH 2/4] Use contrib spec_helper --- spec/ddtrace/contrib/kafka/integration_spec.rb | 2 +- spec/ddtrace/contrib/kafka/patcher_spec.rb | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/spec/ddtrace/contrib/kafka/integration_spec.rb b/spec/ddtrace/contrib/kafka/integration_spec.rb index 7ccd9e5eb2f..fc594c8aacf 100644 --- a/spec/ddtrace/contrib/kafka/integration_spec.rb +++ b/spec/ddtrace/contrib/kafka/integration_spec.rb @@ -1,4 +1,4 @@ -require 'spec_helper' +require 'ddtrace/contrib/support/spec_helper' require 'ddtrace/contrib/kafka/integration' diff --git a/spec/ddtrace/contrib/kafka/patcher_spec.rb b/spec/ddtrace/contrib/kafka/patcher_spec.rb index 5e2b5b2ba06..dae43ca7da0 100644 --- a/spec/ddtrace/contrib/kafka/patcher_spec.rb +++ b/spec/ddtrace/contrib/kafka/patcher_spec.rb @@ -1,4 +1,4 @@ -require 'spec_helper' +require 'ddtrace/contrib/support/spec_helper' require 'ddtrace/contrib/analytics_examples' require 'ruby-kafka' From 5d08b34821f54e9c7994aebc6c5ebf270cc6c812 Mon Sep 17 00:00:00 2001 From: Tim Perkins Date: Wed, 17 Jun 2020 06:40:44 -0400 Subject: [PATCH 3/4] Additional requires in GettingStarted --- docs/GettingStarted.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/GettingStarted.md b/docs/GettingStarted.md index ae25accb261..f059f2c24d3 100644 --- a/docs/GettingStarted.md +++ b/docs/GettingStarted.md @@ -954,6 +954,8 @@ The Kafka integration provides tracing of the `ruby-kafka` gem: You can enable it through `Datadog.configure`: ```ruby +require 
'active_support/notifications' # required to enable 'ruby-kafka' instrumentation +require 'kafka' require 'ddtrace' Datadog.configure do |c| From 630cfce1a526729203413b5a6c5e1ec17146c39b Mon Sep 17 00:00:00 2001 From: Tim Perkins Date: Wed, 17 Jun 2020 06:41:24 -0400 Subject: [PATCH 4/4] Add ruby-kafka to all supported versions in Appraisals --- Appraisals | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/Appraisals b/Appraisals index e8c74d40350..b11ccb65ee8 100644 --- a/Appraisals +++ b/Appraisals @@ -193,6 +193,7 @@ elsif Gem::Version.new('2.1.0') <= Gem::Version.new(RUBY_VERSION) \ gem 'redis', '< 4.0' gem 'rest-client' gem 'resque', '< 2.0' + gem 'ruby-kafka', '>= 0.7.10' gem 'sequel', '~> 4.0', '< 4.37' gem 'shoryuken' gem 'sidekiq', '~> 3.5.4' @@ -514,6 +515,7 @@ elsif Gem::Version.new('2.3.0') <= Gem::Version.new(RUBY_VERSION) \ gem 'redis', '< 4.0' gem 'rest-client' gem 'resque', '< 2.0' + gem 'ruby-kafka', '>= 0.7.10' gem 'sequel' gem 'shoryuken' gem 'sidekiq' @@ -593,6 +595,7 @@ elsif Gem::Version.new('2.4.0') <= Gem::Version.new(RUBY_VERSION) \ gem 'redis', '< 4.0' gem 'rest-client' gem 'resque', '< 2.0' + gem 'ruby-kafka', '>= 0.7.10' gem 'sequel' gem 'shoryuken' gem 'sidekiq' @@ -705,6 +708,7 @@ elsif Gem::Version.new('2.5.0') <= Gem::Version.new(RUBY_VERSION) \ gem 'redis', '< 4.0' gem 'rest-client' gem 'resque', '< 2.0' + gem 'ruby-kafka', '>= 0.7.10' gem 'sequel' gem 'shoryuken' gem 'sidekiq' @@ -817,6 +821,7 @@ elsif Gem::Version.new('2.6.0') <= Gem::Version.new(RUBY_VERSION) \ gem 'redis', '< 4.0' gem 'rest-client' gem 'resque', '< 2.0' + gem 'ruby-kafka', '>= 0.7.10' gem 'sequel' gem 'shoryuken' gem 'sidekiq' @@ -931,6 +936,7 @@ elsif Gem::Version.new('2.7.0') <= Gem::Version.new(RUBY_VERSION) gem 'redis', '< 4.0' gem 'rest-client' gem 'resque', '< 2.0' + gem 'ruby-kafka', '>= 0.7.10' gem 'sequel' gem 'shoryuken' gem 'sidekiq'