diff --git a/lib/datadog/core/workers/polling.rb b/lib/datadog/core/workers/polling.rb index 4433ead51ec..fd516f16f79 100644 --- a/lib/datadog/core/workers/polling.rb +++ b/lib/datadog/core/workers/polling.rb @@ -6,7 +6,7 @@ module Core module Workers # Adds polling (async looping) behavior to workers module Polling - SHUTDOWN_TIMEOUT = 1 + DEFAULT_SHUTDOWN_TIMEOUT = 1 def self.included(base) base.include(Workers::IntervalLoop) @@ -21,7 +21,7 @@ def perform(*args) end end - def stop(force_stop = false, timeout = SHUTDOWN_TIMEOUT) + def stop(force_stop = false, timeout = DEFAULT_SHUTDOWN_TIMEOUT) if running? # Attempt graceful stop and wait stop_loop diff --git a/lib/datadog/tracing/component.rb b/lib/datadog/tracing/component.rb index 81e933f6cc3..26e86b128f5 100644 --- a/lib/datadog/tracing/component.rb +++ b/lib/datadog/tracing/component.rb @@ -134,12 +134,12 @@ def ensure_priority_sampling(sampler, settings) # process, but can take a variety of options (including # a fully custom instance) that makes the Tracer # initialization process complex. - def build_writer(settings, agent_settings) + def build_writer(settings, agent_settings, options = settings.tracing.writer_options) if (writer = settings.tracing.writer) return writer end - Tracing::Writer.new(agent_settings: agent_settings, **settings.tracing.writer_options) + Tracing::Writer.new(agent_settings: agent_settings, **options) end def subscribe_to_writer_events!(writer, sampler_delegator, test_mode) @@ -223,8 +223,11 @@ def build_test_mode_sampler end def build_test_mode_writer(settings, agent_settings) - # Flush traces synchronously, to guarantee they are written. writer_options = settings.tracing.test_mode.writer_options || {} + + return build_writer(settings, agent_settings, writer_options) if settings.tracing.test_mode.async + + # Flush traces synchronously, to guarantee they are written. Tracing::SyncWriter.new(agent_settings: agent_settings, **writer_options) end end diff --git a/lib/datadog/tracing/configuration/settings.rb b/lib/datadog/tracing/configuration/settings.rb index 1419f657752..3ecf6d420cd 100644 --- a/lib/datadog/tracing/configuration/settings.rb +++ b/lib/datadog/tracing/configuration/settings.rb @@ -371,6 +371,12 @@ def self.extended(base) o.env Tracing::Configuration::Ext::Test::ENV_MODE_ENABLED end + # Use async writer in test mode + option :async do |o| + o.type :bool + o.default false + end + option :trace_flush option :writer_options do |o| diff --git a/lib/datadog/tracing/workers.rb b/lib/datadog/tracing/workers.rb index b72ef42d7e0..c950a8b7e95 100644 --- a/lib/datadog/tracing/workers.rb +++ b/lib/datadog/tracing/workers.rb @@ -14,7 +14,7 @@ class AsyncTransport DEFAULT_TIMEOUT = 5 BACK_OFF_RATIO = 1.2 BACK_OFF_MAX = 5 - SHUTDOWN_TIMEOUT = 1 + DEFAULT_SHUTDOWN_TIMEOUT = 1 attr_reader \ :trace_buffer @@ -36,6 +36,7 @@ def initialize(options = {}) # Threading @shutdown = ConditionVariable.new + @shutdown_timeout = options.fetch(:shutdown_timeout, DEFAULT_SHUTDOWN_TIMEOUT) @mutex = Mutex.new @worker = nil @run = false @@ -89,7 +90,7 @@ def stop # Block until executor shutdown is complete or until timeout seconds have passed. def join - @worker.join(SHUTDOWN_TIMEOUT) + @worker.join(@shutdown_timeout) end # Enqueue an item in the trace internal buffer. This operation is thread-safe diff --git a/lib/datadog/tracing/workers/trace_writer.rb b/lib/datadog/tracing/workers/trace_writer.rb index fc25ab14885..4ece7038e00 100644 --- a/lib/datadog/tracing/workers/trace_writer.rb +++ b/lib/datadog/tracing/workers/trace_writer.rb @@ -104,6 +104,8 @@ def initialize(options = {}) # Workers::Queue settings @buffer_size = options.fetch(:buffer_size, DEFAULT_BUFFER_MAX_SIZE) self.buffer = TraceBuffer.new(@buffer_size) + + @shutdown_timeout = options.fetch(:shutdown_timeout, Core::Workers::Polling::DEFAULT_SHUTDOWN_TIMEOUT) end # NOTE: #perform is wrapped by other modules: @@ -119,7 +121,7 @@ def perform(traces) nil end - def stop(*args) + def stop(force_stop = false, timeout = @shutdown_timeout) buffer.close if running? super end diff --git a/lib/datadog/tracing/writer.rb b/lib/datadog/tracing/writer.rb index 34ad931d46d..17771c21255 100644 --- a/lib/datadog/tracing/writer.rb +++ b/lib/datadog/tracing/writer.rb @@ -28,6 +28,8 @@ def initialize(options = {}) Transport::HTTP.default(**transport_options) end + @shutdown_timeout = options.fetch(:shutdown_timeout, Workers::AsyncTransport::DEFAULT_SHUTDOWN_TIMEOUT) + # handles the thread creation after an eventual fork @mutex_after_fork = Mutex.new @pid = nil @@ -72,7 +74,8 @@ def start_worker transport: @transport, buffer_size: @buff_size, on_trace: @trace_handler, - interval: @flush_interval + interval: @flush_interval, + shutdown_timeout: @shutdown_timeout ) @worker.start diff --git a/sig/datadog/tracing/component.rbs b/sig/datadog/tracing/component.rbs index f1ceb727984..f757266b0fe 100644 --- a/sig/datadog/tracing/component.rbs +++ b/sig/datadog/tracing/component.rbs @@ -7,7 +7,7 @@ module Datadog def build_sampler: (untyped settings) -> untyped def ensure_priority_sampling: (untyped sampler, untyped settings) -> untyped - def build_writer: (untyped settings, untyped agent_settings) -> untyped + def build_writer: (untyped settings, untyped agent_settings, ?Hash[Symbol, untyped] options) -> untyped def subscribe_to_writer_events!: (untyped writer, untyped sampler, untyped test_mode) -> (nil | untyped) diff --git a/spec/datadog/core/configuration/components_spec.rb b/spec/datadog/core/configuration/components_spec.rb index 6522502c1c4..c0c3735a746 100644 --- a/spec/datadog/core/configuration/components_spec.rb +++ b/spec/datadog/core/configuration/components_spec.rb @@ -785,73 +785,104 @@ context 'set to true' do let(:enabled) { true } - let(:sync_writer) { Datadog::Tracing::SyncWriter.new } - before do - expect(Datadog::Tracing::SyncWriter) - .to receive(:new) - .with(agent_settings: agent_settings, **writer_options) - .and_return(writer) - end + context 'and :async' do + context 'is set' do + let(:writer) { Datadog::Tracing::Writer.new } + let(:writer_options) { { transport_options: :bar } } + let(:writer_options_test_mode) { { transport_options: :baz } } - context 'and :trace_flush' do - before do - allow(settings.tracing.test_mode) - .to receive(:trace_flush) - .and_return(trace_flush) + before do + allow(settings.tracing.test_mode) + .to receive(:async) + .and_return(true) + + allow(settings.tracing.test_mode) + .to receive(:writer_options) + .and_return(writer_options_test_mode) + + expect(Datadog::Tracing::SyncWriter) + .not_to receive(:new) + + expect(Datadog::Tracing::Writer) + .to receive(:new) + .with(agent_settings: agent_settings, **writer_options_test_mode) + .and_return(writer) + end + + it_behaves_like 'event publishing writer' end context 'is not set' do - let(:trace_flush) { nil } + let(:sync_writer) { Datadog::Tracing::SyncWriter.new } - it_behaves_like 'new tracer' do - let(:options) do - { - writer: kind_of(Datadog::Tracing::SyncWriter) - } + before do + expect(Datadog::Tracing::SyncWriter) + .to receive(:new) + .with(agent_settings: agent_settings, **writer_options) + .and_return(writer) + end + + context 'and :trace_flush' do + before do + allow(settings.tracing.test_mode) + .to receive(:trace_flush) + .and_return(trace_flush) end - let(:writer) { sync_writer } - it_behaves_like 'event publishing writer' - end - end + context 'is not set' do + let(:trace_flush) { nil } - context 'is set' do - let(:trace_flush) { instance_double(Datadog::Tracing::Flush::Finished) } + it_behaves_like 'new tracer' do + let(:options) do + { + writer: kind_of(Datadog::Tracing::SyncWriter) + } + end + let(:writer) { sync_writer } - it_behaves_like 'new tracer' do - let(:options) do - { - trace_flush: trace_flush, - writer: kind_of(Datadog::Tracing::SyncWriter) - } + it_behaves_like 'event publishing writer' + end end - let(:writer) { sync_writer } - it_behaves_like 'event publishing writer' - end - end - end + context 'is set' do + let(:trace_flush) { instance_double(Datadog::Tracing::Flush::Finished) } - context 'and :writer_options' do - before do - allow(settings.tracing.test_mode) - .to receive(:writer_options) - .and_return(writer_options) - end + it_behaves_like 'new tracer' do + let(:options) do + { + trace_flush: trace_flush, + writer: kind_of(Datadog::Tracing::SyncWriter) + } + end + let(:writer) { sync_writer } - context 'are set' do - let(:writer_options) { { transport_options: :bar } } + it_behaves_like 'event publishing writer' + end + end + end - it_behaves_like 'new tracer' do - let(:options) do - { - writer: writer - } + context 'and :writer_options' do + before do + allow(settings.tracing.test_mode) + .to receive(:writer_options) + .and_return(writer_options) end - let(:writer) { sync_writer } - it_behaves_like 'event publishing writer' + context 'are set' do + let(:writer_options) { { transport_options: :bar } } + + it_behaves_like 'new tracer' do + let(:options) do + { + writer: writer + } + end + let(:writer) { sync_writer } + + it_behaves_like 'event publishing writer' + end + end end end end diff --git a/spec/datadog/core/workers/polling_spec.rb b/spec/datadog/core/workers/polling_spec.rb index 072bb07bcc7..e4059d95eeb 100644 --- a/spec/datadog/core/workers/polling_spec.rb +++ b/spec/datadog/core/workers/polling_spec.rb @@ -47,7 +47,7 @@ shared_context 'graceful stop' do before do allow(worker).to receive(:join) - .with(described_class::SHUTDOWN_TIMEOUT) + .with(described_class::DEFAULT_SHUTDOWN_TIMEOUT) .and_return(true) end end @@ -55,7 +55,7 @@ context 'when the worker has not been started' do before do allow(worker).to receive(:join) - .with(described_class::SHUTDOWN_TIMEOUT) + .with(described_class::DEFAULT_SHUTDOWN_TIMEOUT) .and_return(true) end @@ -113,6 +113,22 @@ end end end + + context 'given shutdown timeout' do + subject(:stop) { worker.stop(false, 1000) } + include_context 'graceful stop' + + before do + expect(worker).to receive(:join) + .with(1000) + .and_return(true) + + worker.perform + try_wait_until { worker.running? && worker.run_loop? } + end + + it { is_expected.to be true } + end end describe '#enabled?' do diff --git a/spec/datadog/tracing/configuration/settings_spec.rb b/spec/datadog/tracing/configuration/settings_spec.rb index a9fa105f5c9..a9c689c6751 100644 --- a/spec/datadog/tracing/configuration/settings_spec.rb +++ b/spec/datadog/tracing/configuration/settings_spec.rb @@ -631,6 +631,21 @@ def propagation_inject_style end end + describe '#async' do + subject(:enabled) { settings.tracing.test_mode.async } + + it { is_expected.to be false } + end + + describe '#async=' do + it 'updates the #async setting' do + expect { settings.tracing.test_mode.async = true } + .to change { settings.tracing.test_mode.async } + .from(false) + .to(true) + end + end + describe '#writer_options' do subject(:writer_options) { settings.tracing.test_mode.writer_options } diff --git a/spec/datadog/tracing/workers/trace_writer_spec.rb b/spec/datadog/tracing/workers/trace_writer_spec.rb index 1f41db4a297..60b33b4b474 100644 --- a/spec/datadog/tracing/workers/trace_writer_spec.rb +++ b/spec/datadog/tracing/workers/trace_writer_spec.rb @@ -397,6 +397,22 @@ end end end + + context 'given shutdown_timeout' do + let(:options) { { shutdown_timeout: 1000 } } + include_context 'shuts down the worker' + + context 'and the worker has been started' do + before do + expect(writer).to receive(:join).with(1000).and_return(true) + + writer.perform + try_wait_until { writer.running? && writer.run_loop? } + end + + it { is_expected.to be true } + end + end end describe '#work_pending?' do diff --git a/spec/datadog/tracing/workers_integration_spec.rb b/spec/datadog/tracing/workers_integration_spec.rb index ab6aa7339b5..e1a1972dea1 100644 --- a/spec/datadog/tracing/workers_integration_spec.rb +++ b/spec/datadog/tracing/workers_integration_spec.rb @@ -236,7 +236,7 @@ def wait_for_flush(num = 1, period = 0.1) expect(trace_task).to have_received(:call).once expect(service_task).to_not have_received(:call) expect(@shutdown_end - @shutdown_beg) - .to be < Datadog::Tracing::Workers::AsyncTransport::SHUTDOWN_TIMEOUT + .to be < Datadog::Tracing::Workers::AsyncTransport::DEFAULT_SHUTDOWN_TIMEOUT end end @@ -248,7 +248,7 @@ def wait_for_flush(num = 1, period = 0.1) it 'interrupts the worker to speed up shutdown' do expect(@shutdown_end - @shutdown_beg) .to be_within(5).of( - Datadog::Tracing::Workers::AsyncTransport::SHUTDOWN_TIMEOUT + Datadog::Tracing::Workers::AsyncTransport::DEFAULT_SHUTDOWN_TIMEOUT ) end end diff --git a/spec/datadog/tracing/workers_spec.rb b/spec/datadog/tracing/workers_spec.rb index ccb0e61ba9f..45199566f0c 100644 --- a/spec/datadog/tracing/workers_spec.rb +++ b/spec/datadog/tracing/workers_spec.rb @@ -72,4 +72,36 @@ expect(worker.start).to be nil end end + + describe '#stop' do + before { skip if PlatformHelpers.jruby? } # DEV: this test causes jruby-9.2 to fail + + it 'stops underlying thread with default timeout' do + expect_any_instance_of(Thread).to receive(:join).with( + Datadog::Tracing::Workers::AsyncTransport::DEFAULT_SHUTDOWN_TIMEOUT + ).and_call_original + + worker.start + worker.stop + end + + context 'with shutdown timeout configured' do + let(:worker) do + described_class.new( + transport: nil, + buffer_size: 100, + on_trace: task, + interval: 0.5, + shutdown_timeout: 1000 + ) + end + + it 'stops underlying thread with configured timeout' do + expect_any_instance_of(Thread).to receive(:join).with(1000).and_call_original + + worker.start + worker.stop + end + end + end end diff --git a/spec/datadog/tracing/writer_spec.rb b/spec/datadog/tracing/writer_spec.rb index cecf8f3b2ad..448a1b62e2c 100644 --- a/spec/datadog/tracing/writer_spec.rb +++ b/spec/datadog/tracing/writer_spec.rb @@ -7,6 +7,7 @@ require 'datadog/tracing/runtime/metrics' require 'datadog/tracing/trace_segment' require 'datadog/tracing/writer' +require 'datadog/tracing/workers' require 'datadog/tracing/transport/http' require 'datadog/tracing/transport/http/traces' require 'datadog/core/transport/response' @@ -62,6 +63,42 @@ end end + describe '#start_worker' do + let(:worker) { double(:async_transport, start: nil) } + let(:async_transport_params) do + { + transport: transport, + buffer_size: Datadog::Tracing::Workers::AsyncTransport::DEFAULT_BUFFER_MAX_SIZE, + on_trace: anything, + interval: Datadog::Tracing::Workers::AsyncTransport::DEFAULT_FLUSH_INTERVAL, + shutdown_timeout: Datadog::Tracing::Workers::AsyncTransport::DEFAULT_SHUTDOWN_TIMEOUT + } + end + + before do + expect(Datadog::Tracing::Workers::AsyncTransport).to( + receive(:new).with(**expected_async_transport_params).and_return(worker) + ) + end + + context 'without shutdown timeout' do + let(:expected_async_transport_params) { async_transport_params } + + it 'creates worker with default shutdown timeout' do + writer.start + end + end + + context 'with shutdown timeout provided in options' do + let(:options) { { transport: transport, shutdown_timeout: 1000 } } + let(:expected_async_transport_params) { async_transport_params.merge(shutdown_timeout: 1000) } + + it 'creates worker with configured shutdown timeout' do + writer.start + end + end + end + describe '#send_spans' do subject(:send_spans) { writer.send_spans(traces, writer.transport) }