Skip to content

Commit

Permalink
Batch span processor (#74)
Browse files Browse the repository at this point in the history
  • Loading branch information
ibawt authored and fbogsany committed Sep 25, 2019
1 parent dde48bb commit 645d99c
Show file tree
Hide file tree
Showing 13 changed files with 333 additions and 51 deletions.
2 changes: 1 addition & 1 deletion sdk/lib/opentelemetry/sdk/trace/export.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ module Export
end
end

require 'opentelemetry/sdk/trace/export/batch_sampled_span_processor'
require 'opentelemetry/sdk/trace/export/batch_span_processor'
require 'opentelemetry/sdk/trace/export/in_memory_span_exporter'
require 'opentelemetry/sdk/trace/export/multi_span_exporter'
require 'opentelemetry/sdk/trace/export/noop_span_exporter'
Expand Down

This file was deleted.

138 changes: 138 additions & 0 deletions sdk/lib/opentelemetry/sdk/trace/export/batch_span_processor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
# frozen_string_literal: true

# Copyright 2019 OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

module OpenTelemetry
module SDK
module Trace
module Export
# Implementation of the duck type SpanProcessor that batches spans
# exported by the SDK then pushes them to the exporter pipeline.
#
# All spans reported by the SDK implementation are first added to a
# synchronized queue (with a {max_queue_size} maximum size, after the
# size is reached spans are dropped) and exported every
# schedule_delay_millis to the exporter pipeline in batches of
# max_export_batch_size.
#
# If the queue gets half full a preemptive notification is sent to the
# worker thread that exports the spans to wake up and start a new
# export cycle.
#
# max_export_attempts attempts are made to export each batch, while
# export fails with {FAILED_RETRYABLE}, backing off linearly in 100ms
# increments.
class BatchSpanProcessor
SCHEDULE_DELAY_MILLIS = 5
MAX_QUEUE_SIZE = 2048
MAX_EXPORT_BATCH_SIZE = 512
MAX_EXPORT_ATTEMPTS = 5
private_constant(:SCHEDULE_DELAY_MILLIS, :MAX_QUEUE_SIZE, :MAX_EXPORT_BATCH_SIZE, :MAX_EXPORT_ATTEMPTS)

def initialize(exporter:,
schedule_delay_millis: SCHEDULE_DELAY_MILLIS,
max_queue_size: MAX_QUEUE_SIZE,
max_export_batch_size: MAX_EXPORT_BATCH_SIZE,
max_export_attempts: MAX_EXPORT_ATTEMPTS)
raise ArgumentError if max_export_batch_size > max_queue_size

@exporter = exporter
@mutex = Mutex.new
@condition = ConditionVariable.new
@keep_running = true
@delay_seconds = schedule_delay_millis / 1000.0
@max_queue_size = max_queue_size
@batch_size = max_export_batch_size
@export_attempts = max_export_attempts
@spans = []
@thread = Thread.new { work }
end

# does nothing for this processor
def on_start(span)
# noop
end

# adds a span to the batcher, threadsafe may block on lock
def on_finish(span)
return unless span.recording_events?

lock do
n = spans.size + 1 - max_queue_size
spans.shift(n) if n.positive?
spans << span
@condition.signal if spans.size > max_queue_size / 2
end
end

# shuts the consumer thread down and flushes the current accumulated buffer
# will block until the thread is finished
def shutdown
lock do
@keep_running = false
@condition.signal
end

@thread.join
flush
@exporter.shutdown
end

private

attr_reader :spans, :max_queue_size, :batch_size

def work
loop do
batch = lock do
@condition.wait(@mutex, @delay_seconds) if spans.size < batch_size && @keep_running
@condition.wait(@mutex, @delay_seconds) while spans.empty? && @keep_running
return unless @keep_running

fetch_batch
end

export_batch(batch)
end
end

def export_batch(batch)
result_code = nil
@export_attempts.times do |attempts|
result_code = @exporter.export(batch)
break unless result_code == FAILED_RETRYABLE

sleep(0.1 * attempts)
end
report_result(result_code, batch)
end

def report_result(result_code, batch)
OpenTelemetry.logger.error("Unable to export #{batch.size} spans") unless result_code == SUCCESS
end

def flush
snapshot = lock { spans.shift(spans.size) }
until snapshot.empty?
batch = snapshot.shift(@batch_size).map!(&:to_span_data)
result_code = @exporter.export(batch)
report_result(result_code, batch)
end
end

def fetch_batch
spans.shift(@batch_size).map!(&:to_span_data)
end

def lock
@mutex.synchronize do
yield
end
end
end
end
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ module Export
#
# Can be used to export to multiple backends using the same
# SpanProcessor like a {SimpleSampledSpanProcessor} or a
# {BatchSampledSpanProcessor}.
# {BatchSpanProcessor}.
class MultiSpanExporter
def initialize(span_exporters)
@span_exporters = span_exporters.clone.freeze
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ module Export
# recorded data for sampled spans in their own format.
#
# To export data an exporter MUST be registered to the {Tracer} using
# a {SimpleSampledSpanProcessor} or a {BatchSampledSpanProcessor}.
# a {SimpleSampledSpanProcessor} or a {BatchSpanProcessor}.
class NoopSpanExporter
# Called to export sampled {Span}s.
#
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ def on_start(span)
# not throw or block the execution thread.
#
# @param [Span] span the {Span} that just ended.
def on_end(span)
def on_finish(span)
return unless span.context.trace_flags.sampled?

@span_exporter.export([span.to_span_proto])
rescue => e # rubocop:disable Style/RescueStandardError
logger.error("unexpected error in span.on_end - #{e}")
logger.error("unexpected error in span.on_finish - #{e}")
end

# Called when {Tracer#shutdown} is called.
Expand Down
4 changes: 2 additions & 2 deletions sdk/lib/opentelemetry/sdk/trace/multi_span_processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ def on_start(span)
# not throw or block the execution thread.
#
# @param [Span] span the {Span} that just ended.
def on_end(span)
@span_processors.each { |processor| processor.on_end(span) }
def on_finish(span)
@span_processors.each { |processor| processor.on_finish(span) }
end

# Called when {Tracer#shutdown} is called.
Expand Down
2 changes: 1 addition & 1 deletion sdk/lib/opentelemetry/sdk/trace/noop_span_processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def on_start(span); end
# not throw or block the execution thread.
#
# @param [Span] span the {Span} that just ended.
def on_end(span); end
def on_finish(span); end

# Called when {Tracer#shutdown} is called.
def shutdown; end
Expand Down
4 changes: 2 additions & 2 deletions sdk/lib/opentelemetry/sdk/trace/span.rb
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ def name=(new_name)
#
# (*) not actually non-blocking. In particular, it synchronizes on an
# internal mutex, which will typically be uncontended, and
# {BatchSpanProcessor} will also synchronize on a mutex, if that
# {Export::BatchSpanProcessor} will also synchronize on a mutex, if that
# processor is used.
#
# @param [Time] end_timestamp optional end timestamp for the span.
Expand All @@ -191,7 +191,7 @@ def finish(end_timestamp: nil)
@events.freeze
@ended = true
end
@span_processor.on_end(self)
@span_processor.on_finish(self)
self
end

Expand Down
2 changes: 2 additions & 0 deletions sdk/test/.rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@ inherit_from: ../.rubocop.yml

Metrics/BlockLength:
Enabled: false
Metrics/LineLength:
Enabled: false

This file was deleted.

Loading

0 comments on commit 645d99c

Please sign in to comment.