diff --git a/lib/datadog/core/remote/worker.rb b/lib/datadog/core/remote/worker.rb new file mode 100644 index 00000000000..6e2d702ef11 --- /dev/null +++ b/lib/datadog/core/remote/worker.rb @@ -0,0 +1,96 @@ +# frozen_string_literal: true + +module Datadog + module Core + module Remote + # Worker executes a block every interval on a separate Thread + class Worker + def initialize(interval:, &block) + @mutex = Mutex.new + @thr = nil + + @starting = false + @stopping = false + @started = false + + @interval = interval + raise ArgumentError, 'can not initialize a worker without a block' unless block + + @block = block + end + + def start + Datadog.logger.debug { 'remote worker starting' } + + acquire_lock + + return if @starting || @started + + @starting = true + + @thr = Thread.new { poll(@interval) } + + @started = true + @starting = false + + Datadog.logger.debug { 'remote worker started' } + ensure + release_lock + end + + def stop + Datadog.logger.debug { 'remote worker stopping' } + + acquire_lock + + @stopping = true + + thread = @thr + + if thread + thread.kill + thread.join + end + + @started = false + @stopping = false + @thr = nil + + Datadog.logger.debug { 'remote worker stopped' } + ensure + release_lock + end + + def started? + @started + end + + private + + def acquire_lock + @mutex.lock + end + + def release_lock + @mutex.unlock + end + + def poll(interval) + loop do + break unless @mutex.synchronize { @starting || @started } + + call + + sleep(interval) + end + end + + def call + Datadog.logger.debug { 'remote worker perform' } + + @block.call + end + end + end + end +end diff --git a/sig/datadog/core/remote/worker.rbs b/sig/datadog/core/remote/worker.rbs new file mode 100644 index 00000000000..6a7561812fd --- /dev/null +++ b/sig/datadog/core/remote/worker.rbs @@ -0,0 +1,33 @@ +module Datadog + module Core + module Remote + class Worker + attr_reader starting: bool + attr_reader stopping: bool + attr_reader started: bool + attr_reader thr: Thread? + attr_reader interval: Integer + attr_reader mutex: ::Thread::Mutex + attr_reader block: (^() -> void) + + def initialize: (interval: Integer) { () -> void } -> void + + def start: () -> void + + def stop: () -> void + + def started?: () -> bool + + private + + def acquire_lock: () -> void + + def release_lock: () -> void + + def poll: (Integer interval) -> void + + def call: () -> void + end + end + end +end diff --git a/spec/datadog/core/remote/worker_spec.rb b/spec/datadog/core/remote/worker_spec.rb new file mode 100644 index 00000000000..0e71135ee40 --- /dev/null +++ b/spec/datadog/core/remote/worker_spec.rb @@ -0,0 +1,68 @@ +# frozen_string_literal: true + +require 'spec_helper' +require 'datadog/core/remote/worker' + +RSpec.describe Datadog::Core::Remote::Worker do + let(:task) { proc { 1 + 1 } } + subject(:worker) { described_class.new(interval: 1, &task) } + + describe '#initialize' do + it 'raises ArgumentError when no block is provided' do + expect do + described_class.new(interval: 1) + end.to raise_error(ArgumentError) + end + end + + describe '#start' do + after { worker.stop } + + it 'mark worker as started' do + expect(worker).not_to be_started + worker.start + expect(worker).to be_started + end + + it 'acquire and release lock' do + expect(worker).to receive(:acquire_lock).at_least(:once) + expect(worker).to receive(:release_lock).at_least(:once) + worker.start + end + + context 'execute block when started' do + let(:result) { [] } + let(:queue) { Queue.new } + let(:task) do + proc do + value = 1 + result << value + queue << value + end + end + + it 'runs block' do + worker.start + # Wait for the work task to execute once + queue.pop + expect(result).to eq([1]) + end + end + end + + describe '#stop' do + it 'mark worker as stopped' do + expect(worker).not_to be_started + worker.start + expect(worker).to be_started + worker.stop + expect(worker).not_to be_started + end + + it 'acquire and release lock' do + expect(worker).to receive(:acquire_lock) + expect(worker).to receive(:release_lock) + worker.stop + end + end +end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 1e3c4bfcf69..e57d0b9d3f3 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -166,7 +166,10 @@ # teardown in those tests. # They currently flood the output, making our test # suite output unreadable. - if example.file_path.start_with?('./spec/datadog/core/workers/', './spec/ddtrace/workers/') + if example.file_path.start_with?( + './spec/datadog/core/workers/', + './spec/ddtrace/workers/' + ) puts # Add newline so we get better output when the progress formatter is being used RSpec.warning("FIXME: #{example.file_path}:#{example.metadata[:line_number]} is leaking threads") next