-
Notifications
You must be signed in to change notification settings - Fork 375
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Added: Datadog::Worker and extensions.
- Loading branch information
Showing
10 changed files
with
1,333 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
module Datadog | ||
# Base class for work tasks | ||
class Worker | ||
attr_reader \ | ||
:task | ||
|
||
def initialize(&block) | ||
@task = block | ||
end | ||
|
||
def perform(*args) | ||
task.call(*args) unless task.nil? | ||
end | ||
|
||
protected | ||
|
||
attr_writer \ | ||
:task | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,163 @@ | ||
require 'ddtrace/logger' | ||
|
||
module Datadog | ||
module Workers | ||
module Async | ||
# Adds threading behavior to workers | ||
# to run tasks asynchronously. | ||
# rubocop:disable Metrics/ModuleLength | ||
module Thread | ||
FORK_POLICY_STOP = :stop | ||
FORK_POLICY_RESTART = :restart | ||
SHUTDOWN_TIMEOUT = 1 | ||
|
||
def self.included(base) | ||
base.send(:prepend, PrependedMethods) | ||
end | ||
|
||
# Methods that must be prepended | ||
module PrependedMethods | ||
def perform(*args) | ||
start { self.result = super(*args) } if unstarted? | ||
end | ||
end | ||
|
||
attr_reader \ | ||
:error, | ||
:result | ||
|
||
attr_writer \ | ||
:fork_policy | ||
|
||
def join(timeout = nil) | ||
return true unless running? | ||
!worker.join(timeout).nil? | ||
end | ||
|
||
def terminate | ||
return false unless running? | ||
@run_async = false | ||
worker.terminate | ||
true | ||
end | ||
|
||
def run_async? | ||
@run_async = false unless instance_variable_defined?(:@run_async) | ||
@run_async == true | ||
end | ||
|
||
def unstarted? | ||
worker.nil? || forked? | ||
end | ||
|
||
def running? | ||
!worker.nil? && worker.alive? | ||
end | ||
|
||
def error? | ||
@error = nil unless instance_variable_defined?(:@error) | ||
!@error.nil? | ||
end | ||
|
||
def completed? | ||
!worker.nil? && worker.status == false && !error? | ||
end | ||
|
||
def failed? | ||
!worker.nil? && worker.status == false && error? | ||
end | ||
|
||
def forked? | ||
!pid.nil? && pid != Process.pid | ||
end | ||
|
||
def fork_policy | ||
@fork_policy ||= FORK_POLICY_STOP | ||
end | ||
|
||
protected | ||
|
||
attr_writer \ | ||
:result | ||
|
||
def mutex | ||
@mutex ||= Mutex.new | ||
end | ||
|
||
def after_fork | ||
# Do nothing by default | ||
end | ||
|
||
private | ||
|
||
attr_reader \ | ||
:pid | ||
|
||
def mutex_after_fork | ||
@mutex_after_fork ||= Mutex.new | ||
end | ||
|
||
def worker | ||
@worker ||= nil | ||
end | ||
|
||
def start(&block) | ||
mutex.synchronize do | ||
return if running? | ||
if forked? | ||
case fork_policy | ||
when FORK_POLICY_STOP | ||
stop_fork | ||
when FORK_POLICY_RESTART | ||
restart_after_fork(&block) | ||
end | ||
elsif !run_async? | ||
start_worker(&block) | ||
end | ||
end | ||
end | ||
|
||
def start_worker | ||
@run_async = true | ||
@pid = Process.pid | ||
@error = nil | ||
Logger.log.debug("Starting thread in the process: #{Process.pid}") | ||
|
||
@worker = ::Thread.new do | ||
begin | ||
yield | ||
rescue StandardError => e | ||
@error = e | ||
Logger.log.debug("Worker thread error. Cause #{e.message} Location: #{e.backtrace.first}") | ||
end | ||
end | ||
end | ||
|
||
def stop_fork | ||
mutex_after_fork.synchronize do | ||
if forked? | ||
# Trigger callback to allow workers to reset themselves accordingly | ||
after_fork | ||
|
||
# Reset and turn off | ||
@pid = Process.pid | ||
@run_async = false | ||
end | ||
end | ||
end | ||
|
||
def restart_after_fork(&block) | ||
mutex_after_fork.synchronize do | ||
if forked? | ||
# Trigger callback to allow workers to reset themselves accordingly | ||
after_fork | ||
|
||
# Start worker | ||
start_worker(&block) | ||
end | ||
end | ||
end | ||
end | ||
end | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
module Datadog | ||
module Workers | ||
# Adds looping behavior to workers, with a sleep | ||
# interval between each loop. | ||
module IntervalLoop | ||
BACK_OFF_RATIO = 1.2 | ||
BACK_OFF_MAX = 5 | ||
DEFAULT_INTERVAL = 1 | ||
|
||
def self.included(base) | ||
base.send(:prepend, PrependedMethods) | ||
end | ||
|
||
# Methods that must be prepended | ||
module PrependedMethods | ||
def perform(*args) | ||
perform_loop { super(*args) } | ||
end | ||
end | ||
|
||
def stop_loop | ||
mutex.synchronize do | ||
return false unless run_loop? | ||
@run_loop = false | ||
shutdown.signal | ||
end | ||
|
||
true | ||
end | ||
|
||
def work_pending? | ||
run_loop? | ||
end | ||
|
||
def run_loop? | ||
@run_loop = false unless instance_variable_defined?(:@run_loop) | ||
@run_loop == true | ||
end | ||
|
||
def loop_default_interval | ||
@loop_default_interval ||= DEFAULT_INTERVAL | ||
end | ||
|
||
def loop_back_off_ratio | ||
@loop_back_off_ratio ||= BACK_OFF_RATIO | ||
end | ||
|
||
def loop_back_off_max | ||
@loop_back_off_max ||= BACK_OFF_MAX | ||
end | ||
|
||
def loop_wait_time | ||
@loop_wait_time ||= loop_default_interval | ||
end | ||
|
||
def loop_back_off? | ||
false | ||
end | ||
|
||
def loop_back_off!(amount = nil) | ||
@loop_wait_time = amount || [loop_wait_time * BACK_OFF_RATIO, BACK_OFF_MAX].min | ||
end | ||
|
||
protected | ||
|
||
attr_writer \ | ||
:loop_back_off_max, | ||
:loop_back_off_ratio, | ||
:loop_default_interval | ||
|
||
def mutex | ||
@mutex ||= Mutex.new | ||
end | ||
|
||
private | ||
|
||
def perform_loop | ||
@run_loop = true | ||
|
||
loop do | ||
if work_pending? | ||
# Run the task | ||
yield | ||
|
||
# Reset the wait interval | ||
loop_back_off!(loop_default_interval) | ||
elsif loop_back_off? | ||
# Back off the wait interval a bit | ||
loop_back_off! | ||
end | ||
|
||
# Wait for an interval, unless shutdown has been signaled. | ||
mutex.synchronize do | ||
return unless run_loop? || work_pending? | ||
shutdown.wait(mutex, loop_wait_time) if run_loop? | ||
end | ||
end | ||
end | ||
|
||
def shutdown | ||
@shutdown ||= ConditionVariable.new | ||
end | ||
end | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
require 'ddtrace/workers/async' | ||
require 'ddtrace/workers/loop' | ||
|
||
module Datadog | ||
module Workers | ||
# Adds polling (async looping) behavior to workers | ||
module Polling | ||
SHUTDOWN_TIMEOUT = 1 | ||
|
||
def self.included(base) | ||
base.send(:include, Workers::IntervalLoop) | ||
base.send(:include, Workers::Async::Thread) | ||
base.send(:prepend, PrependedMethods) | ||
end | ||
|
||
# Methods that must be prepended | ||
module PrependedMethods | ||
def perform(*args) | ||
super if enabled? | ||
end | ||
end | ||
|
||
def stop(force_stop = false, timeout = SHUTDOWN_TIMEOUT) | ||
if running? | ||
# Attempt graceful stop and wait | ||
stop_loop | ||
graceful = join(timeout) | ||
|
||
# If timeout and force stop... | ||
!graceful && force_stop ? terminate : graceful | ||
else | ||
false | ||
end | ||
end | ||
|
||
def enabled? | ||
@enabled = true unless instance_variable_defined?(:@enabled) | ||
@enabled | ||
end | ||
|
||
# Allow worker to be started | ||
def enabled=(value) | ||
# Coerce to boolean | ||
@enabled = (value == true) | ||
end | ||
end | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
module Datadog | ||
module Workers | ||
# Adds queue behavior to workers, with a buffer | ||
# to which items can be queued then dequeued. | ||
module Queue | ||
def self.included(base) | ||
base.send(:prepend, PrependedMethods) | ||
end | ||
|
||
# Methods that must be prepended | ||
module PrependedMethods | ||
def perform(*args) | ||
super(*dequeue) if work_pending? | ||
end | ||
end | ||
|
||
def buffer | ||
@buffer ||= [] | ||
end | ||
|
||
def enqueue(*args) | ||
buffer.push(args) | ||
end | ||
|
||
def dequeue | ||
buffer.shift | ||
end | ||
|
||
def work_pending? | ||
!buffer.empty? | ||
end | ||
|
||
protected | ||
|
||
attr_writer \ | ||
:buffer | ||
end | ||
end | ||
end |
Oops, something went wrong.