-
Notifications
You must be signed in to change notification settings - Fork 373
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #2691 from DataDog/add-remote-config-worker
Add remote configuration worker
- Loading branch information
Showing
4 changed files
with
201 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
# frozen_string_literal: true | ||
|
||
module Datadog | ||
module Core | ||
module Remote | ||
# Worker executes a block every interval on a separate Thread | ||
class Worker | ||
def initialize(interval:, &block) | ||
@mutex = Mutex.new | ||
@thr = nil | ||
|
||
@starting = false | ||
@stopping = false | ||
@started = false | ||
|
||
@interval = interval | ||
raise ArgumentError, 'can not initialize a worker without a block' unless block | ||
|
||
@block = block | ||
end | ||
|
||
def start | ||
Datadog.logger.debug { 'remote worker starting' } | ||
|
||
acquire_lock | ||
|
||
return if @starting || @started | ||
|
||
@starting = true | ||
|
||
@thr = Thread.new { poll(@interval) } | ||
|
||
@started = true | ||
@starting = false | ||
|
||
Datadog.logger.debug { 'remote worker started' } | ||
ensure | ||
release_lock | ||
end | ||
|
||
def stop | ||
Datadog.logger.debug { 'remote worker stopping' } | ||
|
||
acquire_lock | ||
|
||
@stopping = true | ||
|
||
thread = @thr | ||
|
||
if thread | ||
thread.kill | ||
thread.join | ||
end | ||
|
||
@started = false | ||
@stopping = false | ||
@thr = nil | ||
|
||
Datadog.logger.debug { 'remote worker stopped' } | ||
ensure | ||
release_lock | ||
end | ||
|
||
def started? | ||
@started | ||
end | ||
|
||
private | ||
|
||
def acquire_lock | ||
@mutex.lock | ||
end | ||
|
||
def release_lock | ||
@mutex.unlock | ||
end | ||
|
||
def poll(interval) | ||
loop do | ||
break unless @mutex.synchronize { @starting || @started } | ||
|
||
call | ||
|
||
sleep(interval) | ||
end | ||
end | ||
|
||
def call | ||
Datadog.logger.debug { 'remote worker perform' } | ||
|
||
@block.call | ||
end | ||
end | ||
end | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
module Datadog | ||
module Core | ||
module Remote | ||
class Worker | ||
attr_reader starting: bool | ||
attr_reader stopping: bool | ||
attr_reader started: bool | ||
attr_reader thr: Thread? | ||
attr_reader interval: Integer | ||
attr_reader mutex: ::Thread::Mutex | ||
attr_reader block: (^() -> void) | ||
|
||
def initialize: (interval: Integer) { () -> void } -> void | ||
|
||
def start: () -> void | ||
|
||
def stop: () -> void | ||
|
||
def started?: () -> bool | ||
|
||
private | ||
|
||
def acquire_lock: () -> void | ||
|
||
def release_lock: () -> void | ||
|
||
def poll: (Integer interval) -> void | ||
|
||
def call: () -> void | ||
end | ||
end | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
# frozen_string_literal: true | ||
|
||
require 'spec_helper' | ||
require 'datadog/core/remote/worker' | ||
|
||
RSpec.describe Datadog::Core::Remote::Worker do | ||
let(:task) { proc { 1 + 1 } } | ||
subject(:worker) { described_class.new(interval: 1, &task) } | ||
|
||
describe '#initialize' do | ||
it 'raises ArgumentError when no block is provided' do | ||
expect do | ||
described_class.new(interval: 1) | ||
end.to raise_error(ArgumentError) | ||
end | ||
end | ||
|
||
describe '#start' do | ||
after { worker.stop } | ||
|
||
it 'mark worker as started' do | ||
expect(worker).not_to be_started | ||
worker.start | ||
expect(worker).to be_started | ||
end | ||
|
||
it 'acquire and release lock' do | ||
expect(worker).to receive(:acquire_lock).at_least(:once) | ||
expect(worker).to receive(:release_lock).at_least(:once) | ||
worker.start | ||
end | ||
|
||
context 'execute block when started' do | ||
let(:result) { [] } | ||
let(:queue) { Queue.new } | ||
let(:task) do | ||
proc do | ||
value = 1 | ||
result << value | ||
queue << value | ||
end | ||
end | ||
|
||
it 'runs block' do | ||
worker.start | ||
# Wait for the work task to execute once | ||
queue.pop | ||
expect(result).to eq([1]) | ||
end | ||
end | ||
end | ||
|
||
describe '#stop' do | ||
it 'mark worker as stopped' do | ||
expect(worker).not_to be_started | ||
worker.start | ||
expect(worker).to be_started | ||
worker.stop | ||
expect(worker).not_to be_started | ||
end | ||
|
||
it 'acquire and release lock' do | ||
expect(worker).to receive(:acquire_lock) | ||
expect(worker).to receive(:release_lock) | ||
worker.stop | ||
end | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters