From d7958fbcbcfc4906ecee5b46eec42f7e069c93ec Mon Sep 17 00:00:00 2001 From: Loic Nageleisen Date: Tue, 14 Mar 2023 13:13:34 +0100 Subject: [PATCH 1/5] Add remote configuration worker --- lib/datadog/core/remote/worker.rb | 79 +++++++++++++++++++++++++++++++ 1 file changed, 79 insertions(+) create mode 100644 lib/datadog/core/remote/worker.rb diff --git a/lib/datadog/core/remote/worker.rb b/lib/datadog/core/remote/worker.rb new file mode 100644 index 00000000000..3c6071b7b92 --- /dev/null +++ b/lib/datadog/core/remote/worker.rb @@ -0,0 +1,79 @@ +# frozen_string_literal: true + +module Datadog + module Core + module Remote + class Worker + def initialize(interval:, &block) + @mutex = Mutex.new + @thr = nil + + @starting = false + @stopping = false + @started = false + + @interval = interval + @block = block + end + + def start + Datadog.logger.debug { 'remote worker starting' } + + @mutex.lock + + return if @starting || @started + + @starting = true + + @thr = Thread.new { poll(@interval) } + + @started = true + @starting = false + + Datadog.logger.debug { 'remote worker started' } + ensure + @mutex.unlock + end + + def stop + Datadog.logger.debug { 'remote worker stopping' } + + @mutex.lock + + @stopping = true + + @thr.kill unless @thr.nil? + + @started = false + @stopping = false + + Datadog.logger.debug { 'remote worker stopped' } + ensure + @mutex.unlock + end + + def started? + @started + end + + private + + def poll(interval) + loop do + break unless @mutex.synchronize { @starting || @started } + + call + + sleep(interval) + end + end + + def call + Datadog.logger.debug { 'remote worker perform' } + + @block.call + end + end + end + end +end From 3c1849ccda4b6babfb81bf2bb761b4f8aa1e0df6 Mon Sep 17 00:00:00 2001 From: Gustavo Caso Date: Tue, 21 Mar 2023 11:33:21 +0100 Subject: [PATCH 2/5] Add RBS for Core::Remote::Worker --- lib/datadog/core/remote/worker.rb | 6 ++++-- sig/datadog/core/remote/worker.rbs | 29 +++++++++++++++++++++++++++++ 2 files changed, 33 insertions(+), 2 deletions(-) create mode 100644 sig/datadog/core/remote/worker.rbs diff --git a/lib/datadog/core/remote/worker.rb b/lib/datadog/core/remote/worker.rb index 3c6071b7b92..789b4a928ec 100644 --- a/lib/datadog/core/remote/worker.rb +++ b/lib/datadog/core/remote/worker.rb @@ -39,13 +39,15 @@ def stop Datadog.logger.debug { 'remote worker stopping' } @mutex.lock - @stopping = true - @thr.kill unless @thr.nil? + thread = @thr + + thread.kill if thread @started = false @stopping = false + @thr = nil Datadog.logger.debug { 'remote worker stopped' } ensure diff --git a/sig/datadog/core/remote/worker.rbs b/sig/datadog/core/remote/worker.rbs new file mode 100644 index 00000000000..c37bfb8e345 --- /dev/null +++ b/sig/datadog/core/remote/worker.rbs @@ -0,0 +1,29 @@ +module Datadog + module Core + module Remote + class Worker + attr_reader starting: bool + attr_reader stopping: bool + attr_reader started: bool + attr_reader thr: Thread? + attr_reader interval: Integer + attr_reader mutex: ::Thread::Mutex + attr_reader block: (^() -> untyped) + + def initialize: (interval: Integer) { () -> untyped } -> void + + def start: () -> void + + def stop: () -> void + + def started?: () -> bool + + private + + def poll: (Integer interval) -> void + + def call: () -> void + end + end + end +end From 9eb7f634a0668161747e7540def54a024ffe4982 Mon Sep 17 00:00:00 2001 From: Gustavo Caso Date: Tue, 21 Mar 2023 13:13:39 +0100 Subject: [PATCH 3/5] Add specs for Core::Remote::Worker --- lib/datadog/core/remote/worker.rb | 19 ++++++-- sig/datadog/core/remote/worker.rbs | 4 ++ spec/datadog/core/remote/worker_spec.rb | 64 +++++++++++++++++++++++++ spec/spec_helper.rb | 6 ++- 4 files changed, 88 insertions(+), 5 deletions(-) create mode 100644 spec/datadog/core/remote/worker_spec.rb diff --git a/lib/datadog/core/remote/worker.rb b/lib/datadog/core/remote/worker.rb index 789b4a928ec..55ab6d0bd52 100644 --- a/lib/datadog/core/remote/worker.rb +++ b/lib/datadog/core/remote/worker.rb @@ -13,13 +13,15 @@ def initialize(interval:, &block) @started = false @interval = interval + raise ArgumentError, 'can not initialize a worker without a block' unless block + @block = block end def start Datadog.logger.debug { 'remote worker starting' } - @mutex.lock + acquire_lock return if @starting || @started @@ -32,13 +34,14 @@ def start Datadog.logger.debug { 'remote worker started' } ensure - @mutex.unlock + release_lock end def stop Datadog.logger.debug { 'remote worker stopping' } - @mutex.lock + acquire_lock + @stopping = true thread = @thr @@ -51,7 +54,7 @@ def stop Datadog.logger.debug { 'remote worker stopped' } ensure - @mutex.unlock + release_lock end def started? @@ -60,6 +63,14 @@ def started? private + def acquire_lock + @mutex.lock + end + + def release_lock + @mutex.unlock + end + def poll(interval) loop do break unless @mutex.synchronize { @starting || @started } diff --git a/sig/datadog/core/remote/worker.rbs b/sig/datadog/core/remote/worker.rbs index c37bfb8e345..f9ebde5a56b 100644 --- a/sig/datadog/core/remote/worker.rbs +++ b/sig/datadog/core/remote/worker.rbs @@ -20,6 +20,10 @@ module Datadog private + def acquire_lock: () -> void + + def release_lock: () -> void + def poll: (Integer interval) -> void def call: () -> void diff --git a/spec/datadog/core/remote/worker_spec.rb b/spec/datadog/core/remote/worker_spec.rb new file mode 100644 index 00000000000..341c4121e6e --- /dev/null +++ b/spec/datadog/core/remote/worker_spec.rb @@ -0,0 +1,64 @@ +# frozen_string_literal: true + +require 'spec_helper' +require 'datadog/core/remote/worker' + +RSpec.describe Datadog::Core::Remote::Worker do + describe '#initialize' do + it 'raises ArgumentError when no block is provided' do + expect do + described_class.new(interval: 1) + end.to raise_error(ArgumentError) + end + end + + subject(:worker) do + described_class.new(interval: 1) do + 1 + 1 + end + end + + describe '#start' do + it 'mark worker as started' do + expect(worker).not_to be_started + worker.start + expect(worker).to be_started + worker.stop + end + + it 'acquire and release lock' do + expect(worker).to receive(:acquire_lock) + expect(worker).to receive(:release_lock) + worker.start + end + + it 'execute block when started' do + result = [] + queue = Queue.new + queue_worker = described_class.new(interval: 1) do + result << queue.pop + end + queue_worker.start + # Unblock worker thread + queue << 1 + expect(result).to eq([1]) + queue_worker.stop + end + end + + describe '#stop' do + it 'mark worker as stopped' do + expect(worker).not_to be_started + worker.start + expect(worker).to be_started + worker.stop + expect(worker).not_to be_started + end + + it 'acquire and release lock' do + expect(worker).to receive(:acquire_lock) + expect(worker).to receive(:release_lock) + worker.stop + end + end +end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 1e3c4bfcf69..fb618d2bc18 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -166,7 +166,11 @@ # teardown in those tests. # They currently flood the output, making our test # suite output unreadable. - if example.file_path.start_with?('./spec/datadog/core/workers/', './spec/ddtrace/workers/') + if example.file_path.start_with?( + './spec/datadog/core/workers/', + './spec/ddtrace/workers/', + './spec/datadog/core/remote/worker_spec.rb' + ) puts # Add newline so we get better output when the progress formatter is being used RSpec.warning("FIXME: #{example.file_path}:#{example.metadata[:line_number]} is leaking threads") next From 98c66ca6726e9bc9b2f7030d759904e64179d3f8 Mon Sep 17 00:00:00 2001 From: Gustavo Caso Date: Tue, 21 Mar 2023 13:14:03 +0100 Subject: [PATCH 4/5] Add documentation for Worker class --- lib/datadog/core/remote/worker.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/datadog/core/remote/worker.rb b/lib/datadog/core/remote/worker.rb index 55ab6d0bd52..9916ca94e5e 100644 --- a/lib/datadog/core/remote/worker.rb +++ b/lib/datadog/core/remote/worker.rb @@ -3,6 +3,7 @@ module Datadog module Core module Remote + # Worker executes a block every interval on a separate Thread class Worker def initialize(interval:, &block) @mutex = Mutex.new From 748ee58cbb2d79516c540e3b4b5ca67eb72648c6 Mon Sep 17 00:00:00 2001 From: Gustavo Caso Date: Tue, 21 Mar 2023 14:21:13 +0100 Subject: [PATCH 5/5] Apply feedback --- lib/datadog/core/remote/worker.rb | 5 ++- sig/datadog/core/remote/worker.rbs | 4 +-- spec/datadog/core/remote/worker_spec.rb | 42 ++++++++++++++----------- spec/spec_helper.rb | 3 +- 4 files changed, 30 insertions(+), 24 deletions(-) diff --git a/lib/datadog/core/remote/worker.rb b/lib/datadog/core/remote/worker.rb index 9916ca94e5e..6e2d702ef11 100644 --- a/lib/datadog/core/remote/worker.rb +++ b/lib/datadog/core/remote/worker.rb @@ -47,7 +47,10 @@ def stop thread = @thr - thread.kill if thread + if thread + thread.kill + thread.join + end @started = false @stopping = false diff --git a/sig/datadog/core/remote/worker.rbs b/sig/datadog/core/remote/worker.rbs index f9ebde5a56b..6a7561812fd 100644 --- a/sig/datadog/core/remote/worker.rbs +++ b/sig/datadog/core/remote/worker.rbs @@ -8,9 +8,9 @@ module Datadog attr_reader thr: Thread? attr_reader interval: Integer attr_reader mutex: ::Thread::Mutex - attr_reader block: (^() -> untyped) + attr_reader block: (^() -> void) - def initialize: (interval: Integer) { () -> untyped } -> void + def initialize: (interval: Integer) { () -> void } -> void def start: () -> void diff --git a/spec/datadog/core/remote/worker_spec.rb b/spec/datadog/core/remote/worker_spec.rb index 341c4121e6e..0e71135ee40 100644 --- a/spec/datadog/core/remote/worker_spec.rb +++ b/spec/datadog/core/remote/worker_spec.rb @@ -4,6 +4,9 @@ require 'datadog/core/remote/worker' RSpec.describe Datadog::Core::Remote::Worker do + let(:task) { proc { 1 + 1 } } + subject(:worker) { described_class.new(interval: 1, &task) } + describe '#initialize' do it 'raises ArgumentError when no block is provided' do expect do @@ -12,37 +15,38 @@ end end - subject(:worker) do - described_class.new(interval: 1) do - 1 + 1 - end - end - describe '#start' do + after { worker.stop } + it 'mark worker as started' do expect(worker).not_to be_started worker.start expect(worker).to be_started - worker.stop end it 'acquire and release lock' do - expect(worker).to receive(:acquire_lock) - expect(worker).to receive(:release_lock) + expect(worker).to receive(:acquire_lock).at_least(:once) + expect(worker).to receive(:release_lock).at_least(:once) worker.start end - it 'execute block when started' do - result = [] - queue = Queue.new - queue_worker = described_class.new(interval: 1) do - result << queue.pop + context 'execute block when started' do + let(:result) { [] } + let(:queue) { Queue.new } + let(:task) do + proc do + value = 1 + result << value + queue << value + end + end + + it 'runs block' do + worker.start + # Wait for the work task to execute once + queue.pop + expect(result).to eq([1]) end - queue_worker.start - # Unblock worker thread - queue << 1 - expect(result).to eq([1]) - queue_worker.stop end end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index fb618d2bc18..e57d0b9d3f3 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -168,8 +168,7 @@ # suite output unreadable. if example.file_path.start_with?( './spec/datadog/core/workers/', - './spec/ddtrace/workers/', - './spec/datadog/core/remote/worker_spec.rb' + './spec/ddtrace/workers/' ) puts # Add newline so we get better output when the progress formatter is being used RSpec.warning("FIXME: #{example.file_path}:#{example.metadata[:line_number]} is leaking threads")