Skip to content

Commit

Permalink
Merge pull request #1091 from fledman/writer_thread_safety
Browse files Browse the repository at this point in the history
writer thread safety
  • Loading branch information
marcotc authored Jul 7, 2020
2 parents f5733c8 + a88bda3 commit 16643b9
Showing 1 changed file with 26 additions and 12 deletions.
38 changes: 26 additions & 12 deletions lib/ddtrace/writer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,18 @@ def initialize(options = {})
@worker = nil
end

# Ensures a worker is running for the current process.
#
# The check and the (re)start both run while holding @mutex_after_fork, so
# concurrent callers cannot start two workers. A worker is started when none
# exists yet, or when the pid recorded by the previous start differs from the
# current pid (e.g. the writer was carried into a forked child).
#
# @return [true, nil] true when a worker was started, nil when the existing
#   worker already belongs to this process.
def start
  @mutex_after_fork.synchronize do
    pid = Process.pid
    # Compare against the previously recorded pid *before* overwriting it;
    # refreshing @pid first would make this guard always succeed and a
    # process with a stale pid would never get its own worker.
    return if @worker && pid == @pid

    @pid = pid
    start_worker
    true
  end
end

# spawns a worker for spans; they share the same transport which is thread-safe
def start_worker
@trace_handler = ->(items, transport) { send_spans(items, transport) }
@worker = Datadog::Workers::AsyncTransport.new(
transport: @transport,
Expand All @@ -57,14 +66,19 @@ def start
@worker.start
end

# Stops the span worker, if one is running.
#
# The "is there a worker?" decision is left to #stop_worker so that it is
# made while holding @mutex_after_fork, rather than being checked once
# outside the lock and again inside it.
#
# @return [true, nil] true when a worker was stopped, nil when there was none.
def stop
  @mutex_after_fork.synchronize { stop_worker }
end

# Halts the current worker and forgets it.
#
# @return [true, nil] true once the worker has been stopped and cleared,
#   nil when no worker was set.
def stop_worker
  active = @worker
  return if active.nil?

  active.stop
  @worker = nil
  true
end

# Worker lifecycle helpers are internal: #start and #stop invoke them while
# holding @mutex_after_fork.
private :start_worker, :stop_worker

# flush spans to the trace-agent, handles spans only
def send_spans(traces, transport)
return true if traces.empty?
Expand Down Expand Up @@ -106,13 +120,7 @@ def write(trace, services = nil)
#
# This check ensures that if a process doesn't own the current +Writer+, async workers
# will be initialized again (but only once for each process).
pid = Process.pid
if pid != @pid # avoid using Mutex when pids are equal
@mutex_after_fork.synchronize do
# we should start threads because the worker doesn't own this
start if pid != @pid
end
end
start if @worker.nil? || @pid != Process.pid

# TODO: Remove this, and have the tracer pump traces directly to runtime metrics
# instead of working through the trace writer.
Expand All @@ -121,7 +129,13 @@ def write(trace, services = nil)
Datadog.runtime_metrics.associate_with_span(trace.first)
end

@worker.enqueue_trace(trace)
worker_local = @worker

if worker_local
worker_local.enqueue_trace(trace)
else
Datadog.logger.debug('Writer either failed to start or was stopped before #write could complete')
end
end

# stats returns a dictionary of stats about the writer.
Expand Down

0 comments on commit 16643b9

Please sign in to comment.