Skip to content

Commit

Permalink
ensure that app-started event is sent at most once, flush events before stopping worker
Browse files Browse the repository at this point in the history
  • Loading branch information
anmarchenko committed Jun 17, 2024
1 parent c33435c commit 2457491
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 31 deletions.
17 changes: 8 additions & 9 deletions lib/datadog/core/telemetry/component.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ def initialize(heartbeat_interval_seconds:, dependency_collection:, enabled: tru
heartbeat_interval_seconds: heartbeat_interval_seconds,
emitter: Emitter.new
)
@worker.start
end

def disable!
Expand All @@ -38,26 +39,24 @@ def disable!
# Signals that the application has started.
#
# Returns early when @enabled is falsy or forked? is true. Otherwise it
# starts the worker, enqueues an AppDependenciesLoaded event when
# @dependency_collection is set, and sets @started to true.
#
# NOTE(review): this listing is a diff view, and `@worker.start` also shows up
# in the constructor hunk above. The call below may be the removed
# (pre-change) line -- confirm against the real file.
def started!
# nothing to do when telemetry is off or we are in a forked process
return if !@enabled || forked?

@worker.start
@worker.enqueue(Event::AppDependenciesLoaded.new) if @dependency_collection

@started = true
end

# Enqueues an AppClosing event on the worker.
#
# Returns early (without enqueuing) when @enabled is falsy or forked? is true.
#
# NOTE(review): an identical emit_closing! definition is listed again right
# after stop! in this diff view; presumably the method was moved and both
# positions are shown -- confirm which one the real file keeps.
def emit_closing!
return if !@enabled || forked?

@worker.enqueue(Event::AppClosing.new)
end

# Stops the telemetry worker and sets @stopped, so that later calls return
# immediately.
#
# NOTE(review): both `@worker.stop` and `@worker.stop(true)` are listed below.
# In this diff view they are presumably the before/after versions of a single
# line rather than two consecutive calls -- confirm against the real file.
def stop!
return if @stopped

# gracefully stop the worker and send leftover events
@worker.stop
@worker.stop(true)
@stopped = true
end

# Queues an AppClosing telemetry event on the worker.
#
# Nothing is queued (and nil is returned) when telemetry is disabled or when
# forked? is true. forked? is only consulted when telemetry is enabled.
def emit_closing!
  return unless @enabled
  return if forked?

  closing_event = Event::AppClosing.new
  @worker.enqueue(closing_event)
end

def integrations_change!
return if !@enabled || forked?

Expand Down
43 changes: 27 additions & 16 deletions lib/datadog/core/telemetry/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

require_relative 'event'

require_relative '../utils/only_once_successful'
require_relative '../workers/polling'
require_relative '../workers/queue'

Expand All @@ -15,6 +16,8 @@ class Worker

DEFAULT_BUFFER_MAX_SIZE = 1000

TELEMETRY_STARTED_ONCE = Utils::OnlyOnceSuccessful.new

def initialize(
heartbeat_interval_seconds:,
emitter:,
Expand All @@ -24,8 +27,6 @@ def initialize(
)
@emitter = emitter

@sent_started_event = false

# Workers::Polling settings
self.enabled = enabled
# Workers::IntervalLoop settings
Expand All @@ -48,6 +49,8 @@ def start
# Stops the worker.
#
# Closes the buffer when the worker is running, flushes whatever dequeue
# returns when work is pending, and then delegates to the parent
# implementation.
#
# @param force_stop passed through unchanged to super (defaults to false)
# @param timeout passed through unchanged to super (defaults to @shutdown_timeout)
# @return whatever the parent implementation returns
def stop(force_stop = false, timeout = @shutdown_timeout)
  if running?
    buffer.close
  end

  if work_pending?
    leftover_events = dequeue
    flush_events(leftover_events)
  end

  # bare super forwards force_stop and timeout exactly as received
  super
end

Expand All @@ -56,7 +59,7 @@ def enqueue(event)
end

# Returns TELEMETRY_STARTED_ONCE.ran? (the last expression below).
#
# NOTE(review): two result expressions are listed. As written, the
# `@sent_started_event` line has no effect on the return value; in this diff
# view it is presumably the removed (pre-change) line -- confirm against the
# real file.
def sent_started_event?
@sent_started_event
TELEMETRY_STARTED_ONCE.ran?
end

private
Expand Down Expand Up @@ -89,24 +92,25 @@ def heartbeat!
def started!
return unless enabled?

res = send_event(Event::AppStarted.new)
TELEMETRY_STARTED_ONCE.run do
res = send_event(Event::AppStarted.new)

if res.not_found? # Telemetry is only supported by agent versions 7.34 and up
Datadog.logger.debug('Agent does not support telemetry; disabling future telemetry events.')
self.enabled = false
elsif res.ok?
Datadog.logger.debug('Telemetry app-started event is successfully sent')
@sent_started_event = true
else
Datadog.logger.debug('Error sending telemetry app-started event, retry after heartbeat interval...')
if res.ok?
Datadog.logger.debug('Telemetry app-started event is successfully sent')
true
else
Datadog.logger.debug('Error sending telemetry app-started event, retry after heartbeat interval...')
false
end
end
end

# Sends one telemetry event through @emitter and returns the emitter's
# response, after passing that response to disable_on_not_found!.
#
# NOTE(review): `@emitter.request(event)` is listed twice below (once assigned
# to `response`, once to `res`). In this diff view the first group of lines is
# presumably the removed (pre-change) body and the second group the new one;
# taken literally the event would be requested twice -- confirm against the
# real file.
def send_event(event)
Datadog.logger.debug { "Sending telemetry event: #{event}" }
response = @emitter.request(event)
Datadog.logger.debug { "Received response: #{response}" }
response
res = @emitter.request(event)

# turns telemetry off when the response reports not_found?
disable_on_not_found!(res)

res
end

def dequeue
Expand All @@ -120,6 +124,13 @@ def buffer_klass
Core::Buffer::ThreadSafe
end
end

# Turns the worker off when the given response reports not_found?.
#
# For any other response nothing happens and nil is returned. For a
# not-found response a debug message is logged and `enabled` is set to false.
#
# @param response object responding to not_found?
def disable_on_not_found!(response)
  if response.not_found?
    Datadog.logger.debug('Agent does not support telemetry; disabling future telemetry events.')
    self.enabled = false
  end
end
end
end
end
Expand Down
5 changes: 5 additions & 0 deletions sig/datadog/core/telemetry/worker.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ module Datadog
include Core::Workers::IntervalLoop
include Core::Workers::Queue

TELEMETRY_STARTED_ONCE: Datadog::Core::Utils::OnlyOnceSuccessful
DEFAULT_BUFFER_MAX_SIZE: 1000

@emitter: Emitter
Expand All @@ -23,6 +24,8 @@ module Datadog

def enqueue: (Event::Base event) -> void

def dequeue: () -> Array[Event::Base]

private

def heartbeat!: () -> void
Expand All @@ -33,6 +36,8 @@ module Datadog

def send_event: (Event::Base event) -> Datadog::Core::Telemetry::Http::Adapters::Net::Response

def disable_on_not_found!: (Datadog::Core::Telemetry::Http::Adapters::Net::Response response) -> void

def buffer_klass: () -> untyped
end
end
Expand Down
4 changes: 2 additions & 2 deletions spec/datadog/core/telemetry/component_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@
# Calls started! and checks that the worker double did not receive the listed
# messages.
#
# NOTE(review): both a :start and an :enqueue expectation are listed; in this
# diff view they are presumably the before/after versions of one line --
# confirm against the real spec.
it do
started!

expect(worker).to_not have_received(:start)
expect(worker).to_not have_received(:enqueue)
end
end

Expand Down Expand Up @@ -118,7 +118,7 @@
# Inside expect_in_fork: calls telemetry.started! and checks that the worker
# did not receive the listed messages.
#
# NOTE(review): both a :start and an :enqueue expectation are listed; in this
# diff view they are presumably the before/after versions of one line --
# confirm against the real spec.
expect_in_fork do
telemetry.started!

expect(worker).to_not have_received(:start)
expect(worker).to_not have_received(:enqueue)
end
end
end
Expand Down
78 changes: 74 additions & 4 deletions spec/datadog/core/telemetry/worker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@
end

# Per-example cleanup: stops and joins the worker, then resets the
# class-level TELEMETRY_STARTED_ONCE state so it does not carry over between
# examples.
#
# NOTE(review): both `worker.stop(true, 0)` and `worker.stop(true)` are
# listed; in this diff view they are presumably the before/after versions of
# one line -- confirm against the real spec.
after do
worker.stop(true, 0)
worker.stop(true)
worker.join

# reset_ran_once_state_for_tests is invoked via send
Datadog::Core::Telemetry::Worker::TELEMETRY_STARTED_ONCE.send(:reset_ran_once_state_for_tests)
end

describe '.new' do
Expand Down Expand Up @@ -97,19 +99,19 @@
end

# Starts the worker and waits until a heartbeat request reaches the emitter.
# Inside the stubbed request it asserts that worker.sent_started_event? is
# already true.
#
# NOTE(review): each flag line appears twice, once as the instance variable
# `@sent_hearbeat` and once as the local `sent_hearbeat`. In this diff view
# these are presumably the before/after versions of the same lines -- confirm
# against the real spec. The identifier is spelled "hearbeat" (sic).
it 'always sends heartbeat event after started event' do
@sent_hearbeat = false
sent_hearbeat = false
allow(emitter).to receive(:request).with(kind_of(Datadog::Core::Telemetry::Event::AppHeartbeat)) do
# app-started was already sent by now
expect(worker.sent_started_event?).to be(true)

@sent_hearbeat = true
sent_hearbeat = true

response
end

worker.start

try_wait_until { @sent_hearbeat }
try_wait_until { sent_hearbeat }
end
end

Expand All @@ -124,6 +126,42 @@
expect(@received_heartbeat).to be(false)
end
end

# Starts three extra workers sharing one emitter and checks that only a single
# AppStarted request is made once at least three heartbeats have been seen.
context 'several workers running' do
  it 'sends single started event' do
    started_events = 0
    allow(emitter).to receive(:request).with(kind_of(Datadog::Core::Telemetry::Event::AppStarted)) do
      started_events += 1

      response
    end

    heartbeat_events = 0
    allow(emitter).to receive(:request).with(kind_of(Datadog::Core::Telemetry::Event::AppHeartbeat)) do
      heartbeat_events += 1

      response
    end

    workers = Array.new(3) do
      described_class.new(
        enabled: enabled,
        heartbeat_interval_seconds: heartbeat_interval_seconds,
        emitter: emitter
      )
    end

    begin
      workers.each(&:start)

      try_wait_until { heartbeat_events >= 3 }

      expect(started_events).to be(1)
    ensure
      # These workers are local to this example, so they are stopped here.
      # Doing it in `ensure` keeps a failed wait or expectation from leaving
      # their threads running into the following examples.
      workers.each do |w|
        w.stop(true, 0)
        w.join
      end
    end
  end
end
end

context 'when disabled' do
Expand All @@ -137,13 +175,45 @@
end
end

# Covers Worker#stop: starts the worker, enqueues one AppIntegrationsChange
# event, stops the worker and checks its resulting state.
describe '#stop' do
let(:heartbeat_interval_seconds) { 3 }

it 'flushes events and stops the worker' do
# NOTE(review): events_received is incremented by the stub below but never
# asserted, so this example does not actually verify that the event was
# flushed. Adding an expectation may need the app-started event to have gone
# out before stop is called -- confirm against the worker's flush conditions.
events_received = 0
allow(emitter).to receive(:request).with(
an_instance_of(Datadog::Core::Telemetry::Event::AppIntegrationsChange)
) do
events_received += 1

response
end

worker.start

worker.enqueue(Datadog::Core::Telemetry::Event::AppIntegrationsChange.new)
worker.stop(true)

try_wait_until { !worker.running? }

# the worker stays enabled and marked as started, but is no longer running
expect(worker).to have_attributes(
enabled?: true,
loop_base_interval: heartbeat_interval_seconds,
run_async?: false,
running?: false,
started?: true
)
end
end

describe '#enqueue' do
it 'adds events to the buffer and flushes them later' do
events_received = 0
allow(emitter).to receive(:request).with(
an_instance_of(Datadog::Core::Telemetry::Event::AppIntegrationsChange)
) do
events_received += 1

response
end

worker.start
Expand Down

0 comments on commit 2457491

Please sign in to comment.