Skip to content

Commit

Permalink
Merge pull request #1070 from ezcater/tjwp-kafka
Browse files Browse the repository at this point in the history
Add Kafka integration
  • Loading branch information
marcotc authored Jul 9, 2020
2 parents 61ef72b + 630cfce commit 0c7425b
Show file tree
Hide file tree
Showing 23 changed files with 1,346 additions and 0 deletions.
7 changes: 7 additions & 0 deletions Appraisals
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ elsif Gem::Version.new('2.1.0') <= Gem::Version.new(RUBY_VERSION) \
gem 'redis', '< 4.0'
gem 'rest-client'
gem 'resque', '< 2.0'
gem 'ruby-kafka', '>= 0.7.10'
gem 'sequel', '~> 4.0', '< 4.37'
gem 'shoryuken'
gem 'sidekiq', '~> 3.5.4'
Expand Down Expand Up @@ -353,6 +354,7 @@ elsif Gem::Version.new('2.2.0') <= Gem::Version.new(RUBY_VERSION) \
gem 'redis', '< 4.0'
gem 'rest-client'
gem 'resque', '< 2.0'
gem 'ruby-kafka', '>= 0.7.10'
gem 'sequel'
gem 'shoryuken'
gem 'sidekiq'
Expand Down Expand Up @@ -513,6 +515,7 @@ elsif Gem::Version.new('2.3.0') <= Gem::Version.new(RUBY_VERSION) \
gem 'redis', '< 4.0'
gem 'rest-client'
gem 'resque', '< 2.0'
gem 'ruby-kafka', '>= 0.7.10'
gem 'sequel'
gem 'shoryuken'
gem 'sidekiq'
Expand Down Expand Up @@ -592,6 +595,7 @@ elsif Gem::Version.new('2.4.0') <= Gem::Version.new(RUBY_VERSION) \
gem 'redis', '< 4.0'
gem 'rest-client'
gem 'resque', '< 2.0'
gem 'ruby-kafka', '>= 0.7.10'
gem 'sequel'
gem 'shoryuken'
gem 'sidekiq'
Expand Down Expand Up @@ -704,6 +708,7 @@ elsif Gem::Version.new('2.5.0') <= Gem::Version.new(RUBY_VERSION) \
gem 'redis', '< 4.0'
gem 'rest-client'
gem 'resque', '< 2.0'
gem 'ruby-kafka', '>= 0.7.10'
gem 'sequel'
gem 'shoryuken'
gem 'sidekiq'
Expand Down Expand Up @@ -816,6 +821,7 @@ elsif Gem::Version.new('2.6.0') <= Gem::Version.new(RUBY_VERSION) \
gem 'redis', '< 4.0'
gem 'rest-client'
gem 'resque', '< 2.0'
gem 'ruby-kafka', '>= 0.7.10'
gem 'sequel'
gem 'shoryuken'
gem 'sidekiq'
Expand Down Expand Up @@ -930,6 +936,7 @@ elsif Gem::Version.new('2.7.0') <= Gem::Version.new(RUBY_VERSION)
gem 'redis', '< 4.0'
gem 'rest-client'
gem 'resque', '< 2.0'
gem 'ruby-kafka', '>= 0.7.10'
gem 'sequel'
gem 'shoryuken'
gem 'sidekiq'
Expand Down
1 change: 1 addition & 0 deletions Rakefile
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ namespace :spec do
:graphql,
:grpc,
:http,
:kafka,
:mongodb,
:mysql2,
:presto,
Expand Down
26 changes: 26 additions & 0 deletions docs/GettingStarted.md
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ For a list of available integrations, and their configuration options, please re
| Grape | `grape` | `>= 1.0` | *[Link](#grape)* | *[Link](https://github.com/ruby-grape/grape)* |
| GraphQL | `graphql` | `>= 1.7.9` | *[Link](#graphql)* | *[Link](https://github.com/rmosolgo/graphql-ruby)* |
| gRPC | `grpc` | `>= 1.7` | *[Link](#grpc)* | *[Link](https://github.com/grpc/grpc/tree/master/src/rubyc)* |
| Kafka | `ruby-kafka` | `>= 0.7.10` | *[Link](#kafka)*
| MongoDB | `mongo` | `>= 2.1` | *[Link](#mongodb)* | *[Link](https://github.com/mongodb/mongo-ruby-driver)* |
| MySQL2 | `mysql2` | `>= 0.3.21` | *[Link](#mysql2)* | *[Link](https://github.com/brianmario/mysql2)* |
| Net/HTTP | `http` | *(Any supported Ruby)* | *[Link](#nethttp)* | *[Link](https://ruby-doc.org/stdlib-2.4.0/libdoc/net/http/rdoc/Net/HTTP.html)* |
Expand Down Expand Up @@ -929,6 +930,31 @@ alternate_client = Demo::Echo::Service.rpc_stub_class.new(
The integration will ensure that the `configured_interceptor` establishes a unique tracing setup for that client instance.
### Kafka
The Kafka integration provides tracing of the `ruby-kafka` gem:
You can enable it through `Datadog.configure`:
```ruby
require 'active_support/notifications' # required to enable 'ruby-kafka' instrumentation
require 'kafka'
require 'ddtrace'
Datadog.configure do |c|
c.use :kafka, options
end
```
Where `options` is an optional `Hash` that accepts the following parameters:
| Key | Description | Default |
| --- | ----------- | ------- |
| `analytics_enabled` | Enable analytics for spans produced by this integration. `true` for on, `nil` to defer to global setting, `false` for off. | `false` |
| `service_name` | Service name used for `kafka` instrumentation | `'kafka'` |
| `tracer` | `Datadog::Tracer` used to perform instrumentation. Usually you don't need to set this. | `Datadog.tracer` |


### MongoDB

The integration traces any `Command` that is sent from the [MongoDB Ruby Driver](https://github.com/mongodb/mongo-ruby-driver) to a MongoDB cluster. By extension, Object Document Mappers (ODM) such as Mongoid are automatically instrumented if they use the official Ruby driver. To activate the integration, simply:
Expand Down
1 change: 1 addition & 0 deletions lib/ddtrace.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ module Datadog
require 'ddtrace/contrib/grpc/integration'
require 'ddtrace/contrib/http/integration'
require 'ddtrace/contrib/integration'
require 'ddtrace/contrib/kafka/integration'
require 'ddtrace/contrib/presto/integration'
require 'ddtrace/contrib/mysql2/integration'
require 'ddtrace/contrib/mongodb/integration'
Expand Down
25 changes: 25 additions & 0 deletions lib/ddtrace/contrib/kafka/configuration/settings.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
require 'ddtrace/contrib/configuration/settings'
require 'ddtrace/contrib/kafka/ext'

module Datadog
module Contrib
module Kafka
module Configuration
# Custom settings for the Kafka integration
class Settings < Contrib::Configuration::Settings
option :analytics_enabled do |o|
o.default { env_to_bool(Ext::ENV_ANALYTICS_ENABLED, false) }
o.lazy
end

option :analytics_sample_rate do |o|
o.default { env_to_float(Ext::ENV_ANALYTICS_SAMPLE_RATE, 1.0) }
o.lazy
end

option :service_name, default: Ext::SERVICE_NAME
end
end
end
end
end
14 changes: 14 additions & 0 deletions lib/ddtrace/contrib/kafka/consumer_event.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
module Datadog
module Contrib
module Kafka
# Defines basic behaviors for an event for a consumer.
module ConsumerEvent
def process(span, _event, _id, payload)
super

span.set_tag(Ext::TAG_GROUP, payload[:group_id])
end
end
end
end
end
14 changes: 14 additions & 0 deletions lib/ddtrace/contrib/kafka/consumer_group_event.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
module Datadog
module Contrib
module Kafka
# Defines basic behaviors for an event for a consumer group.
module ConsumerGroupEvent
def process(span, _event, _id, payload)
super

span.resource = payload[:group_id]
end
end
end
end
end
51 changes: 51 additions & 0 deletions lib/ddtrace/contrib/kafka/event.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
require 'ddtrace/contrib/analytics'
require 'ddtrace/contrib/active_support/notifications/event'
require 'ddtrace/contrib/kafka/ext'

module Datadog
module Contrib
module Kafka
# Defines basic behaviors for an ActiveSupport event.
module Event
def self.included(base)
base.send(:include, ActiveSupport::Notifications::Event)
base.send(:extend, ClassMethods)
end

# Class methods for Kafka events.
module ClassMethods
def event_name
self::EVENT_NAME
end

def span_options
{ service: configuration[:service_name] }
end

def tracer
-> { configuration[:tracer] }
end

def configuration
Datadog.configuration[:kafka]
end

def process(span, _event, _id, payload)
span.service = configuration[:service_name]
span.set_tag(Ext::TAG_CLIENT, payload[:client_id])

# Set analytics sample rate
if Contrib::Analytics.enabled?(configuration[:analytics_enabled])
Contrib::Analytics.set_sample_rate(span, configuration[:analytics_sample_rate])
end

# Measure service stats
Contrib::Analytics.set_measured(span)

span.set_error(payload[:exception_object]) if payload[:exception_object]
end
end
end
end
end
end
44 changes: 44 additions & 0 deletions lib/ddtrace/contrib/kafka/events.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
require 'ddtrace/contrib/kafka/events/connection/request'
require 'ddtrace/contrib/kafka/events/consumer/process_batch'
require 'ddtrace/contrib/kafka/events/consumer/process_message'
require 'ddtrace/contrib/kafka/events/consumer_group/heartbeat'
require 'ddtrace/contrib/kafka/events/consumer_group/join_group'
require 'ddtrace/contrib/kafka/events/consumer_group/leave_group'
require 'ddtrace/contrib/kafka/events/consumer_group/sync_group'
require 'ddtrace/contrib/kafka/events/produce_operation/send_messages'
require 'ddtrace/contrib/kafka/events/producer/deliver_messages'

module Datadog
module Contrib
module Kafka
# Defines collection of instrumented Kafka events
module Events
ALL = [
Events::Connection::Request,
Events::Consumer::ProcessBatch,
Events::Consumer::ProcessMessage,
Events::ConsumerGroup::Heartbeat,
Events::ConsumerGroup::JoinGroup,
Events::ConsumerGroup::LeaveGroup,
Events::ConsumerGroup::SyncGroup,
Events::ProduceOperation::SendMessages,
Events::Producer::DeliverMessages
].freeze

module_function

def all
self::ALL
end

def subscriptions
all.collect(&:subscriptions).collect(&:to_a).flatten
end

def subscribe!
all.each(&:subscribe!)
end
end
end
end
end
34 changes: 34 additions & 0 deletions lib/ddtrace/contrib/kafka/events/connection/request.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
require 'ddtrace/contrib/kafka/ext'
require 'ddtrace/contrib/kafka/event'

module Datadog
module Contrib
module Kafka
module Events
module Connection
# Defines instrumentation for request.connection.kafka event
module Request
include Kafka::Event

EVENT_NAME = 'request.connection.kafka'.freeze

def self.process(span, _event, _id, payload)
super

span.resource = payload[:api]

span.set_tag(Ext::TAG_REQUEST_SIZE, payload[:request_size]) if payload.key?(:request_size)
span.set_tag(Ext::TAG_RESPONSE_SIZE, payload[:response_size]) if payload.key?(:response_size)
end

module_function

def span_name
Ext::SPAN_CONNECTION_REQUEST
end
end
end
end
end
end
end
41 changes: 41 additions & 0 deletions lib/ddtrace/contrib/kafka/events/consumer/process_batch.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
require 'ddtrace/contrib/kafka/ext'
require 'ddtrace/contrib/kafka/event'
require 'ddtrace/contrib/kafka/consumer_event'

module Datadog
module Contrib
module Kafka
module Events
module Consumer
# Defines instrumentation for process_batch.consumer.kafka event
module ProcessBatch
include Kafka::Event
extend Kafka::ConsumerEvent

EVENT_NAME = 'process_batch.consumer.kafka'.freeze

def self.process(span, _event, _id, payload)
super

span.resource = payload[:topic]

span.set_tag(Ext::TAG_TOPIC, payload[:topic]) if payload.key?(:topic)
span.set_tag(Ext::TAG_MESSAGE_COUNT, payload[:message_count]) if payload.key?(:message_count)
span.set_tag(Ext::TAG_PARTITION, payload[:partition]) if payload.key?(:partition)
if payload.key?(:highwater_mark_offset)
span.set_tag(Ext::TAG_HIGHWATER_MARK_OFFSET, payload[:highwater_mark_offset])
end
span.set_tag(Ext::TAG_OFFSET_LAG, payload[:offset_lag]) if payload.key?(:offset_lag)
end

module_function

def span_name
Ext::SPAN_PROCESS_BATCH
end
end
end
end
end
end
end
39 changes: 39 additions & 0 deletions lib/ddtrace/contrib/kafka/events/consumer/process_message.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
require 'ddtrace/contrib/kafka/ext'
require 'ddtrace/contrib/kafka/event'
require 'ddtrace/contrib/kafka/consumer_event'

module Datadog
module Contrib
module Kafka
module Events
module Consumer
# Defines instrumentation for process_message.consumer.kafka event
module ProcessMessage
include Kafka::Event
extend Kafka::ConsumerEvent

EVENT_NAME = 'process_message.consumer.kafka'.freeze

def self.process(span, _event, _id, payload)
super

span.resource = payload[:topic]

span.set_tag(Ext::TAG_TOPIC, payload[:topic]) if payload.key?(:topic)
span.set_tag(Ext::TAG_MESSAGE_KEY, payload[:key]) if payload.key?(:key)
span.set_tag(Ext::TAG_PARTITION, payload[:partition]) if payload.key?(:partition)
span.set_tag(Ext::TAG_OFFSET, payload[:offset]) if payload.key?(:offset)
span.set_tag(Ext::TAG_OFFSET_LAG, payload[:offset_lag]) if payload.key?(:offset_lag)
end

module_function

def span_name
Ext::SPAN_PROCESS_MESSAGE
end
end
end
end
end
end
end
Loading

0 comments on commit 0c7425b

Please sign in to comment.