Skip to content

Commit

Permalink
Merge pull request #3768 from DataDog/anmarchenko/telemetry_metrics_s…
Browse files Browse the repository at this point in the history
…upport

[SDTEST-409] Telemetry metrics support
  • Loading branch information
anmarchenko authored Jul 10, 2024
2 parents d9912ed + 3c61007 commit ba23d76
Show file tree
Hide file tree
Showing 21 changed files with 362 additions and 90 deletions.
2 changes: 2 additions & 0 deletions lib/datadog/core/configuration/components.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ def build_telemetry(settings, agent_settings, logger)

Telemetry::Component.new(
enabled: enabled,
metrics_enabled: enabled && settings.telemetry.metrics_enabled,
heartbeat_interval_seconds: settings.telemetry.heartbeat_interval_seconds,
metrics_aggregation_interval_seconds: settings.telemetry.metrics_aggregation_interval_seconds,
dependency_collection: settings.telemetry.dependency_collection
)
end
Expand Down
23 changes: 23 additions & 0 deletions lib/datadog/core/configuration/settings.rb
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,16 @@ def initialize(*_)
o.type :bool
end

# Enable metrics collection for telemetry. Metrics collection only works when telemetry is enabled and
# metrics are enabled.
# @default `DD_TELEMETRY_METRICS_ENABLED` environment variable, otherwise `true`.
# @return [Boolean]
option :metrics_enabled do |o|
o.type :bool
o.env Core::Telemetry::Ext::ENV_METRICS_ENABLED
o.default true
end

# The interval in seconds when telemetry must be sent.
#
# This method is used internally, for testing purposes only.
Expand All @@ -676,6 +686,19 @@ def initialize(*_)
o.default 60.0
end

# The interval in seconds when telemetry metrics are aggregated.
# Should be a divisor of `heartbeat_interval_seconds`.
#
# This method is used internally, for testing purposes only.
# @default `DD_TELEMETRY_METRICS_AGGREGATION_INTERVAL` environment variable, otherwise `10`.
# @return [Float]
# @!visibility private
option :metrics_aggregation_interval_seconds do |o|
o.type :float
o.env Core::Telemetry::Ext::ENV_METRICS_AGGREGATION_INTERVAL
o.default 10.0
end

# The install id of the application.
#
# This method is used internally, by library injection.
Expand Down
43 changes: 42 additions & 1 deletion lib/datadog/core/telemetry/component.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

require_relative 'emitter'
require_relative 'event'
require_relative 'metrics_manager'
require_relative 'worker'
require_relative '../utils/forking'

Expand All @@ -15,16 +16,31 @@ class Component
include Core::Utils::Forking

# @param enabled [Boolean] Determines whether telemetry events should be sent to the API
# @param metrics_enabled [Boolean] Determines whether telemetry metrics should be sent to the API
# @param heartbeat_interval_seconds [Float] How frequently heartbeats will be reported, in seconds.
# @param metrics_aggregation_interval_seconds [Float] How frequently metrics will be aggregated, in seconds.
# @param dependency_collection [Boolean] Whether to send the `app-dependencies-loaded` event
def initialize(heartbeat_interval_seconds:, dependency_collection:, enabled: true)
def initialize(
heartbeat_interval_seconds:,
metrics_aggregation_interval_seconds:,
dependency_collection:,
enabled: true,
metrics_enabled: true
)
@enabled = enabled
@stopped = false

@metrics_manager = MetricsManager.new(
enabled: enabled && metrics_enabled,
aggregation_interval: metrics_aggregation_interval_seconds
)

@worker = Telemetry::Worker.new(
enabled: @enabled,
heartbeat_interval_seconds: heartbeat_interval_seconds,
metrics_aggregation_interval_seconds: metrics_aggregation_interval_seconds,
emitter: Emitter.new,
metrics_manager: @metrics_manager,
dependency_collection: dependency_collection
)
@worker.start
Expand Down Expand Up @@ -60,6 +76,31 @@ def client_configuration_change!(changes)

@worker.enqueue(Event::AppClientConfigurationChange.new(changes, 'remote_config'))
end

# Increments a count metric.
# Delegates to the metrics manager's `inc` with the same arguments.
# @param namespace [String] metric namespace
# @param metric_name [String] metric name
# @param value value forwarded unchanged to the metrics manager
# @param tags [Array<String>|Hash{String=>String}] metric tags as hash or array of "tag:val" strings
# @param common [Boolean] true if the metric is common for all languages, false for Ruby-specific metric
def inc(namespace, metric_name, value, tags: {}, common: true)
  options = { tags: tags, common: common }
  @metrics_manager.inc(namespace, metric_name, value, **options)
end

# Decrements a count metric.
# Delegates to the metrics manager's `dec` with the same arguments.
# @param namespace [String] metric namespace
# @param metric_name [String] metric name
# @param value value forwarded unchanged to the metrics manager
# @param tags [Array<String>|Hash{String=>String}] metric tags as hash or array of "tag:val" strings
# @param common [Boolean] true if the metric is common for all languages, false for Ruby-specific metric
def dec(namespace, metric_name, value, tags: {}, common: true)
  options = { tags: tags, common: common }
  @metrics_manager.dec(namespace, metric_name, value, **options)
end

# Tracks gauge metric.
# Delegates to the metrics manager's `gauge` with the same arguments.
# @param namespace [String] metric namespace
# @param metric_name [String] metric name
# @param value value forwarded unchanged to the metrics manager
# @param tags [Array<String>|Hash{String=>String}] metric tags as hash or array of "tag:val" strings
# @param common [Boolean] true if the metric is common for all languages, false for Ruby-specific metric
def gauge(namespace, metric_name, value, tags: {}, common: true)
  options = { tags: tags, common: common }
  @metrics_manager.gauge(namespace, metric_name, value, **options)
end

# Tracks rate metric.
# Delegates to the metrics manager's `rate` with the same arguments.
# @param namespace [String] metric namespace
# @param metric_name [String] metric name
# @param value value forwarded unchanged to the metrics manager
# @param tags [Array<String>|Hash{String=>String}] metric tags as hash or array of "tag:val" strings
# @param common [Boolean] true if the metric is common for all languages, false for Ruby-specific metric
def rate(namespace, metric_name, value, tags: {}, common: true)
  options = { tags: tags, common: common }
  @metrics_manager.rate(namespace, metric_name, value, **options)
end

# Tracks distribution metric.
# Delegates to the metrics manager's `distribution` with the same arguments.
# @param namespace [String] metric namespace
# @param metric_name [String] metric name
# @param value value forwarded unchanged to the metrics manager
# @param tags [Array<String>|Hash{String=>String}] metric tags as hash or array of "tag:val" strings
# @param common [Boolean] true if the metric is common for all languages, false for Ruby-specific metric
def distribution(namespace, metric_name, value, tags: {}, common: true)
  options = { tags: tags, common: common }
  @metrics_manager.distribution(namespace, metric_name, value, **options)
end
end
end
end
Expand Down
2 changes: 2 additions & 0 deletions lib/datadog/core/telemetry/ext.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ module Core
module Telemetry
module Ext
ENV_ENABLED = 'DD_INSTRUMENTATION_TELEMETRY_ENABLED'
ENV_METRICS_ENABLED = 'DD_TELEMETRY_METRICS_ENABLED'
ENV_HEARTBEAT_INTERVAL = 'DD_TELEMETRY_HEARTBEAT_INTERVAL'
ENV_METRICS_AGGREGATION_INTERVAL = 'DD_TELEMETRY_METRICS_AGGREGATION_INTERVAL'
ENV_DEPENDENCY_COLLECTION = 'DD_TELEMETRY_DEPENDENCY_COLLECTION_ENABLED'
ENV_INSTALL_ID = 'DD_INSTRUMENTATION_INSTALL_ID'
ENV_INSTALL_TYPE = 'DD_INSTRUMENTATION_INSTALL_TYPE'
Expand Down
2 changes: 1 addition & 1 deletion lib/datadog/core/telemetry/metric.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class Base
attr_reader :name, :tags, :values, :common

# @param name [String] metric name
# @param tags [Array<String>|Hash{String=>String}] metric tags as hash of array of "tag:val" strings
# @param tags [Array<String>|Hash{String=>String}] metric tags as hash or array of "tag:val" strings
# @param common [Boolean] true if the metric is common for all languages, false for Ruby-specific metric
def initialize(name, tags: {}, common: true)
@name = name
Expand Down
10 changes: 6 additions & 4 deletions lib/datadog/core/telemetry/metrics_collection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,17 @@ def distribution(metric_name, value, tags: {}, common: true)
fetch_or_add_distribution(metric, value)
end

def flush!(queue)
def flush!
@mutex.synchronize do
queue.enqueue(Event::GenerateMetrics.new(@namespace, @metrics.values)) if @metrics.any?
queue.enqueue(Event::Distributions.new(@namespace, @distributions.values)) if @distributions.any?
events = []
events << Event::GenerateMetrics.new(@namespace, @metrics.values) if @metrics.any?
events << Event::Distributions.new(@namespace, @distributions.values) if @distributions.any?

@metrics = {}
@distributions = {}

events
end
nil
end

private
Expand Down
8 changes: 3 additions & 5 deletions lib/datadog/core/telemetry/metrics_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,11 @@ def distribution(namespace, metric_name, value, tags: {}, common: true)
collection.distribution(metric_name, value, tags: tags, common: common)
end

def flush!(queue)
return unless @enabled
def flush!
return [] unless @enabled

collections = @mutex.synchronize { @collections.values }
collections.each { |col| col.flush!(queue) }

nil
collections.reduce([]) { |events, collection| events + collection.flush! }
end

def disable!
Expand Down
29 changes: 23 additions & 6 deletions lib/datadog/core/telemetry/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,25 @@ class Worker

def initialize(
heartbeat_interval_seconds:,
metrics_aggregation_interval_seconds:,
emitter:,
metrics_manager:,
dependency_collection:,
enabled: true,
shutdown_timeout: Workers::Polling::DEFAULT_SHUTDOWN_TIMEOUT,
buffer_size: DEFAULT_BUFFER_MAX_SIZE
)
@emitter = emitter
@metrics_manager = metrics_manager
@dependency_collection = dependency_collection

@ticks_per_heartbeat = (heartbeat_interval_seconds / metrics_aggregation_interval_seconds).to_i
@current_ticks = 0

# Workers::Polling settings
self.enabled = enabled
# Workers::IntervalLoop settings
self.loop_base_interval = heartbeat_interval_seconds
self.loop_base_interval = metrics_aggregation_interval_seconds
self.fork_policy = Core::Workers::Async::Thread::FORK_POLICY_STOP

@shutdown_timeout = shutdown_timeout
Expand Down Expand Up @@ -76,13 +82,19 @@ def perform(*events)

started! unless sent_started_event?

heartbeat!
metric_events = @metrics_manager.flush!
events = [] if events.nil?
flush_events(events + metric_events)

flush_events(events)
@current_ticks += 1
return if @current_ticks < @ticks_per_heartbeat

@current_ticks = 0
heartbeat!
end

def flush_events(events)
return if events.nil? || events.empty?
return if events.empty?
return if !enabled? || !sent_started_event?

Datadog.logger.debug { "Sending #{events&.count} telemetry events" }
Expand All @@ -100,7 +112,7 @@ def started!

if failed_to_start?
Datadog.logger.debug('Telemetry app-started event exhausted retries, disabling telemetry worker')
self.enabled = false
disable!
return
end

Expand Down Expand Up @@ -144,11 +156,16 @@ def buffer_klass
end
end

# Disables this worker and the metrics manager it was constructed with.
#
# Called when the app-started event has exhausted its retries and when the
# agent's response reports "not found" for a telemetry request.
def disable!
self.enabled = false
@metrics_manager.disable!
end

def disable_on_not_found!(response)
return unless response.not_found?

Datadog.logger.debug('Agent does not support telemetry; disabling future telemetry events.')
self.enabled = false
disable!
end
end
end
Expand Down
3 changes: 2 additions & 1 deletion sig/datadog/core/telemetry/component.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ module Datadog
class Component
@enabled: bool
@stopped: bool
@metrics_manager: Datadog::Core::Telemetry::MetricsManager
@worker: Datadog::Core::Telemetry::Worker

attr_reader enabled: bool

include Core::Utils::Forking

def initialize: (heartbeat_interval_seconds: Numeric, dependency_collection: bool, ?enabled: bool) -> void
def initialize: (heartbeat_interval_seconds: Float, metrics_aggregation_interval_seconds: Float, dependency_collection: bool, ?enabled: bool, ?metrics_enabled: bool) -> void

def disable!: () -> void

Expand Down
2 changes: 2 additions & 0 deletions sig/datadog/core/telemetry/ext.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ module Datadog
module Ext
ENV_DEPENDENCY_COLLECTION: ::String
ENV_ENABLED: ::String
ENV_METRICS_ENABLED: ::String
ENV_HEARTBEAT_INTERVAL: ::String
ENV_METRICS_AGGREGATION_INTERVAL: ::String
ENV_INSTALL_ID: ::String
ENV_INSTALL_TIME: ::String
ENV_INSTALL_TYPE: ::String
Expand Down
6 changes: 3 additions & 3 deletions sig/datadog/core/telemetry/metric.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ module Datadog
end

class IntervalMetric < Base
@interval: Integer
@interval: Float

attr_reader interval: Integer
attr_reader interval: Float

def initialize: (String name, ?tags: tags_input, ?common: bool, interval: Integer) -> void
def initialize: (String name, ?tags: tags_input, ?common: bool, interval: Float) -> void
end

class Count < Base
Expand Down
10 changes: 3 additions & 7 deletions sig/datadog/core/telemetry/metrics_collection.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,9 @@ module Datadog
module Core
module Telemetry
class MetricsCollection
interface _Queue
def enqueue: (Datadog::Core::Telemetry::Event::Base event) -> void
end

@namespace: String

@interval: Integer
@interval: Float

@mutex: Thread::Mutex

Expand All @@ -18,7 +14,7 @@ module Datadog

attr_reader namespace: String

def initialize: (String namespace, aggregation_interval: Integer) -> void
def initialize: (String namespace, aggregation_interval: Float) -> void

def inc: (String metric_name, Datadog::Core::Telemetry::Metric::input_value value, ?tags: Datadog::Core::Telemetry::Metric::tags_input, ?common: bool) -> void

Expand All @@ -30,7 +26,7 @@ module Datadog

def distribution: (String metric_name, Datadog::Core::Telemetry::Metric::input_value value, ?tags: Datadog::Core::Telemetry::Metric::tags_input, ?common: bool) -> void

def flush!: (_Queue queue) -> void
def flush!: () -> Array[Datadog::Core::Telemetry::Event::Base]

private

Expand Down
10 changes: 3 additions & 7 deletions sig/datadog/core/telemetry/metrics_manager.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,7 @@ module Datadog
module Core
module Telemetry
class MetricsManager
interface _Queue
def enqueue: (Datadog::Core::Telemetry::Event::Base event) -> void
end

@interval: Integer
@interval: Float

@enabled: bool

Expand All @@ -16,7 +12,7 @@ module Datadog

attr_reader enabled: bool

def initialize: (aggregation_interval: Integer, enabled: bool) -> void
def initialize: (aggregation_interval: Float, enabled: bool) -> void

def inc: (String namespace, String metric_name, Datadog::Core::Telemetry::Metric::input_value value, ?tags: Datadog::Core::Telemetry::Metric::tags_input, ?common: bool) -> void

Expand All @@ -28,7 +24,7 @@ module Datadog

def distribution: (String namespace, String metric_name, Datadog::Core::Telemetry::Metric::input_value value, ?tags: Datadog::Core::Telemetry::Metric::tags_input, ?common: bool) -> void

def flush!: (_Queue queue) -> void
def flush!: () -> Array[Datadog::Core::Telemetry::Event::Base]

def disable!: () -> void

Expand Down
11 changes: 8 additions & 3 deletions sig/datadog/core/telemetry/worker.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,15 @@ module Datadog
DEFAULT_BUFFER_MAX_SIZE: 1000

@emitter: Emitter
@metrics_manager: MetricsManager
@sent_started_event: bool
@shutdown_timeout: Integer
@buffer_size: Integer
@dependency_collection: bool
@ticks_per_heartbeat: Integer
@current_ticks: Integer

def initialize: (?enabled: bool, heartbeat_interval_seconds: Numeric, emitter: Emitter, ?shutdown_timeout: Integer, ?buffer_size: Integer, dependency_collection: bool) -> void
def initialize: (?enabled: bool, heartbeat_interval_seconds: Float, metrics_aggregation_interval_seconds: Float, emitter: Emitter, metrics_manager: MetricsManager, ?shutdown_timeout: Integer, ?buffer_size: Integer, dependency_collection: bool) -> void

def start: () -> void

Expand All @@ -38,9 +41,11 @@ module Datadog

def flush_events: (Array[Event::Base] events) -> void

def send_event: (Event::Base event) -> Datadog::Core::Telemetry::Http::Adapters::Net::Response
def send_event: (Event::Base event) -> Http::Adapters::Net::Response

def disable_on_not_found!: (Datadog::Core::Telemetry::Http::Adapters::Net::Response response) -> void
def disable!: () -> void

def disable_on_not_found!: (Http::Adapters::Net::Response response) -> void

def buffer_klass: () -> untyped
end
Expand Down
Loading

0 comments on commit ba23d76

Please sign in to comment.