Skip to content

Commit

Permalink
Added: Datadog::Worker and extensions.
Browse files Browse the repository at this point in the history
  • Loading branch information
delner committed Mar 10, 2020
1 parent cfa40e5 commit 5432b63
Show file tree
Hide file tree
Showing 10 changed files with 1,333 additions and 0 deletions.
20 changes: 20 additions & 0 deletions lib/ddtrace/worker.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
module Datadog
# Base class for work tasks
class Worker
attr_reader \
:task

def initialize(&block)
@task = block
end

def perform(*args)
task.call(*args) unless task.nil?
end

protected

attr_writer \
:task
end
end
163 changes: 163 additions & 0 deletions lib/ddtrace/workers/async.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
require 'ddtrace/logger'

module Datadog
module Workers
module Async
# Adds threading behavior to workers
# to run tasks asynchronously.
# rubocop:disable Metrics/ModuleLength
module Thread
FORK_POLICY_STOP = :stop
FORK_POLICY_RESTART = :restart
SHUTDOWN_TIMEOUT = 1

def self.included(base)
base.send(:prepend, PrependedMethods)
end

# Methods that must be prepended
module PrependedMethods
def perform(*args)
start { self.result = super(*args) } if unstarted?
end
end

attr_reader \
:error,
:result

attr_writer \
:fork_policy

def join(timeout = nil)
return true unless running?
!worker.join(timeout).nil?
end

def terminate
return false unless running?
@run_async = false
worker.terminate
true
end

def run_async?
@run_async = false unless instance_variable_defined?(:@run_async)
@run_async == true
end

def unstarted?
worker.nil? || forked?
end

def running?
!worker.nil? && worker.alive?
end

def error?
@error = nil unless instance_variable_defined?(:@error)
!@error.nil?
end

def completed?
!worker.nil? && worker.status == false && !error?
end

def failed?
!worker.nil? && worker.status == false && error?
end

def forked?
!pid.nil? && pid != Process.pid
end

def fork_policy
@fork_policy ||= FORK_POLICY_STOP
end

protected

attr_writer \
:result

def mutex
@mutex ||= Mutex.new
end

def after_fork
# Do nothing by default
end

private

attr_reader \
:pid

def mutex_after_fork
@mutex_after_fork ||= Mutex.new
end

def worker
@worker ||= nil
end

def start(&block)
mutex.synchronize do
return if running?
if forked?
case fork_policy
when FORK_POLICY_STOP
stop_fork
when FORK_POLICY_RESTART
restart_after_fork(&block)
end
elsif !run_async?
start_worker(&block)
end
end
end

def start_worker
@run_async = true
@pid = Process.pid
@error = nil
Logger.log.debug("Starting thread in the process: #{Process.pid}")

@worker = ::Thread.new do
begin
yield
rescue StandardError => e
@error = e
Logger.log.debug("Worker thread error. Cause #{e.message} Location: #{e.backtrace.first}")
end
end
end

def stop_fork
mutex_after_fork.synchronize do
if forked?
# Trigger callback to allow workers to reset themselves accordingly
after_fork

# Reset and turn off
@pid = Process.pid
@run_async = false
end
end
end

def restart_after_fork(&block)
mutex_after_fork.synchronize do
if forked?
# Trigger callback to allow workers to reset themselves accordingly
after_fork

# Start worker
start_worker(&block)
end
end
end
end
end
end
end
105 changes: 105 additions & 0 deletions lib/ddtrace/workers/loop.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
module Datadog
module Workers
# Adds looping behavior to workers, with a sleep
# interval between each loop.
module IntervalLoop
BACK_OFF_RATIO = 1.2
BACK_OFF_MAX = 5
DEFAULT_INTERVAL = 1

def self.included(base)
base.send(:prepend, PrependedMethods)
end

# Methods that must be prepended
module PrependedMethods
def perform(*args)
perform_loop { super(*args) }
end
end

def stop_loop
mutex.synchronize do
return false unless run_loop?
@run_loop = false
shutdown.signal
end

true
end

def work_pending?
run_loop?
end

def run_loop?
@run_loop = false unless instance_variable_defined?(:@run_loop)
@run_loop == true
end

def loop_default_interval
@loop_default_interval ||= DEFAULT_INTERVAL
end

def loop_back_off_ratio
@loop_back_off_ratio ||= BACK_OFF_RATIO
end

def loop_back_off_max
@loop_back_off_max ||= BACK_OFF_MAX
end

def loop_wait_time
@loop_wait_time ||= loop_default_interval
end

def loop_back_off?
false
end

def loop_back_off!(amount = nil)
@loop_wait_time = amount || [loop_wait_time * BACK_OFF_RATIO, BACK_OFF_MAX].min
end

protected

attr_writer \
:loop_back_off_max,
:loop_back_off_ratio,
:loop_default_interval

def mutex
@mutex ||= Mutex.new
end

private

def perform_loop
@run_loop = true

loop do
if work_pending?
# Run the task
yield

# Reset the wait interval
loop_back_off!(loop_default_interval)
elsif loop_back_off?
# Back off the wait interval a bit
loop_back_off!
end

# Wait for an interval, unless shutdown has been signaled.
mutex.synchronize do
return unless run_loop? || work_pending?
shutdown.wait(mutex, loop_wait_time) if run_loop?
end
end
end

def shutdown
@shutdown ||= ConditionVariable.new
end
end
end
end
48 changes: 48 additions & 0 deletions lib/ddtrace/workers/polling.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
require 'ddtrace/workers/async'
require 'ddtrace/workers/loop'

module Datadog
module Workers
# Adds polling (async looping) behavior to workers
module Polling
SHUTDOWN_TIMEOUT = 1

def self.included(base)
base.send(:include, Workers::IntervalLoop)
base.send(:include, Workers::Async::Thread)
base.send(:prepend, PrependedMethods)
end

# Methods that must be prepended
module PrependedMethods
def perform(*args)
super if enabled?
end
end

def stop(force_stop = false, timeout = SHUTDOWN_TIMEOUT)
if running?
# Attempt graceful stop and wait
stop_loop
graceful = join(timeout)

# If timeout and force stop...
!graceful && force_stop ? terminate : graceful
else
false
end
end

def enabled?
@enabled = true unless instance_variable_defined?(:@enabled)
@enabled
end

# Allow worker to be started
def enabled=(value)
# Coerce to boolean
@enabled = (value == true)
end
end
end
end
39 changes: 39 additions & 0 deletions lib/ddtrace/workers/queue.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
module Datadog
module Workers
# Adds queue behavior to workers, with a buffer
# to which items can be queued then dequeued.
module Queue
def self.included(base)
base.send(:prepend, PrependedMethods)
end

# Methods that must be prepended
module PrependedMethods
def perform(*args)
super(*dequeue) if work_pending?
end
end

def buffer
@buffer ||= []
end

def enqueue(*args)
buffer.push(args)
end

def dequeue
buffer.shift
end

def work_pending?
!buffer.empty?
end

protected

attr_writer \
:buffer
end
end
end
Loading

0 comments on commit 5432b63

Please sign in to comment.