diff --git a/sdk/lib/opentelemetry/sdk/trace/export.rb b/sdk/lib/opentelemetry/sdk/trace/export.rb index 83ba06ef8..31575db65 100644 --- a/sdk/lib/opentelemetry/sdk/trace/export.rb +++ b/sdk/lib/opentelemetry/sdk/trace/export.rb @@ -27,7 +27,7 @@ module Export end end -require 'opentelemetry/sdk/trace/export/batch_sampled_span_processor' +require 'opentelemetry/sdk/trace/export/batch_span_processor' require 'opentelemetry/sdk/trace/export/in_memory_span_exporter' require 'opentelemetry/sdk/trace/export/multi_span_exporter' require 'opentelemetry/sdk/trace/export/noop_span_exporter' diff --git a/sdk/lib/opentelemetry/sdk/trace/export/batch_sampled_span_processor.rb b/sdk/lib/opentelemetry/sdk/trace/export/batch_sampled_span_processor.rb deleted file mode 100644 index 8eacb32b6..000000000 --- a/sdk/lib/opentelemetry/sdk/trace/export/batch_sampled_span_processor.rb +++ /dev/null @@ -1,29 +0,0 @@ -# frozen_string_literal: true - -# Copyright 2019 OpenTelemetry Authors -# -# SPDX-License-Identifier: Apache-2.0 - -module OpenTelemetry - module SDK - module Trace - module Export - # Implementation of the duck type SpanProcessor that batches spans - # exported by the SDK then pushes them to the exporter pipeline. - # - # All spans reported by the SDK implementation are first added to a - # synchronized queue (with a {max_queue_size} maximum size, after the - # size is reached spans are dropped) and exported every - # {schedule_delay_millis} to the exporter pipeline in batches of - # {max_export_batch_size}. - # - # If the queue gets half full a preemptive notification is sent to the - # worker thread that exports the spans to wake up and start a new - # export cycle. - class BatchSampledSpanProcessor - # TODO - end - end - end - end -end diff --git a/sdk/lib/opentelemetry/sdk/trace/export/batch_span_processor.rb b/sdk/lib/opentelemetry/sdk/trace/export/batch_span_processor.rb new file mode 100644 index 000000000..311724936 --- /dev/null +++ b/sdk/lib/opentelemetry/sdk/trace/export/batch_span_processor.rb @@ -0,0 +1,138 @@ +# frozen_string_literal: true + +# Copyright 2019 OpenTelemetry Authors +# +# SPDX-License-Identifier: Apache-2.0 + +module OpenTelemetry + module SDK + module Trace + module Export + # Implementation of the duck type SpanProcessor that batches spans + # exported by the SDK then pushes them to the exporter pipeline. + # + # All spans reported by the SDK implementation are first added to a + # synchronized queue (with a {max_queue_size} maximum size, after the + # size is reached spans are dropped) and exported every + # schedule_delay_millis to the exporter pipeline in batches of + # max_export_batch_size. + # + # If the queue gets half full a preemptive notification is sent to the + # worker thread that exports the spans to wake up and start a new + # export cycle. + # + # max_export_attempts attempts are made to export each batch, while + # export fails with {FAILED_RETRYABLE}, backing off linearly in 100ms + # increments. + class BatchSpanProcessor + SCHEDULE_DELAY_MILLIS = 5 + MAX_QUEUE_SIZE = 2048 + MAX_EXPORT_BATCH_SIZE = 512 + MAX_EXPORT_ATTEMPTS = 5 + private_constant(:SCHEDULE_DELAY_MILLIS, :MAX_QUEUE_SIZE, :MAX_EXPORT_BATCH_SIZE, :MAX_EXPORT_ATTEMPTS) + + def initialize(exporter:, + schedule_delay_millis: SCHEDULE_DELAY_MILLIS, + max_queue_size: MAX_QUEUE_SIZE, + max_export_batch_size: MAX_EXPORT_BATCH_SIZE, + max_export_attempts: MAX_EXPORT_ATTEMPTS) + raise ArgumentError if max_export_batch_size > max_queue_size + + @exporter = exporter + @mutex = Mutex.new + @condition = ConditionVariable.new + @keep_running = true + @delay_seconds = schedule_delay_millis / 1000.0 + @max_queue_size = max_queue_size + @batch_size = max_export_batch_size + @export_attempts = max_export_attempts + @spans = [] + @thread = Thread.new { work } + end + + # does nothing for this processor + def on_start(span) + # noop + end + + # adds a span to the batcher, threadsafe may block on lock + def on_finish(span) + return unless span.recording_events? + + lock do + n = spans.size + 1 - max_queue_size + spans.shift(n) if n.positive? + spans << span + @condition.signal if spans.size > max_queue_size / 2 + end + end + + # shuts the consumer thread down and flushes the current accumulated buffer + # will block until the thread is finished + def shutdown + lock do + @keep_running = false + @condition.signal + end + + @thread.join + flush + @exporter.shutdown + end + + private + + attr_reader :spans, :max_queue_size, :batch_size + + def work + loop do + batch = lock do + @condition.wait(@mutex, @delay_seconds) if spans.size < batch_size && @keep_running + @condition.wait(@mutex, @delay_seconds) while spans.empty? && @keep_running + return unless @keep_running + + fetch_batch + end + + export_batch(batch) + end + end + + def export_batch(batch) + result_code = nil + @export_attempts.times do |attempts| + result_code = @exporter.export(batch) + break unless result_code == FAILED_RETRYABLE + + sleep(0.1 * attempts) + end + report_result(result_code, batch) + end + + def report_result(result_code, batch) + OpenTelemetry.logger.error("Unable to export #{batch.size} spans") unless result_code == SUCCESS + end + + def flush + snapshot = lock { spans.shift(spans.size) } + until snapshot.empty? + batch = snapshot.shift(@batch_size).map!(&:to_span_data) + result_code = @exporter.export(batch) + report_result(result_code, batch) + end + end + + def fetch_batch + spans.shift(@batch_size).map!(&:to_span_data) + end + + def lock + @mutex.synchronize do + yield + end + end + end + end + end + end +end diff --git a/sdk/lib/opentelemetry/sdk/trace/export/multi_span_exporter.rb b/sdk/lib/opentelemetry/sdk/trace/export/multi_span_exporter.rb index 65a5c1b42..a380ce50c 100644 --- a/sdk/lib/opentelemetry/sdk/trace/export/multi_span_exporter.rb +++ b/sdk/lib/opentelemetry/sdk/trace/export/multi_span_exporter.rb @@ -13,7 +13,7 @@ module Export # # Can be used to export to multiple backends using the same # SpanProcessor like a {SimpleSampledSpanProcessor} or a - # {BatchSampledSpanProcessor}. + # {BatchSpanProcessor}. class MultiSpanExporter def initialize(span_exporters) @span_exporters = span_exporters.clone.freeze diff --git a/sdk/lib/opentelemetry/sdk/trace/export/noop_span_exporter.rb b/sdk/lib/opentelemetry/sdk/trace/export/noop_span_exporter.rb index eeb9d692a..f1e696196 100644 --- a/sdk/lib/opentelemetry/sdk/trace/export/noop_span_exporter.rb +++ b/sdk/lib/opentelemetry/sdk/trace/export/noop_span_exporter.rb @@ -13,7 +13,7 @@ module Export # recorded data for sampled spans in their own format. # # To export data an exporter MUST be registered to the {Tracer} using - # a {SimpleSampledSpanProcessor} or a {BatchSampledSpanProcessor}. + # a {SimpleSampledSpanProcessor} or a {BatchSpanProcessor}. class NoopSpanExporter # Called to export sampled {Span}s. # diff --git a/sdk/lib/opentelemetry/sdk/trace/export/simple_sampled_span_processor.rb b/sdk/lib/opentelemetry/sdk/trace/export/simple_sampled_span_processor.rb index 17f93eee7..9e62ab26e 100644 --- a/sdk/lib/opentelemetry/sdk/trace/export/simple_sampled_span_processor.rb +++ b/sdk/lib/opentelemetry/sdk/trace/export/simple_sampled_span_processor.rb @@ -46,12 +46,12 @@ def on_start(span) # not throw or block the execution thread. # # @param [Span] span the {Span} that just ended. - def on_end(span) + def on_finish(span) return unless span.context.trace_flags.sampled? @span_exporter.export([span.to_span_proto]) rescue => e # rubocop:disable Style/RescueStandardError - logger.error("unexpected error in span.on_end - #{e}") + logger.error("unexpected error in span.on_finish - #{e}") end # Called when {Tracer#shutdown} is called. diff --git a/sdk/lib/opentelemetry/sdk/trace/multi_span_processor.rb b/sdk/lib/opentelemetry/sdk/trace/multi_span_processor.rb index 5cae6f54a..f0f9bc5bc 100644 --- a/sdk/lib/opentelemetry/sdk/trace/multi_span_processor.rb +++ b/sdk/lib/opentelemetry/sdk/trace/multi_span_processor.rb @@ -37,8 +37,8 @@ def on_start(span) # not throw or block the execution thread. # # @param [Span] span the {Span} that just ended. - def on_end(span) - @span_processors.each { |processor| processor.on_end(span) } + def on_finish(span) + @span_processors.each { |processor| processor.on_finish(span) } end # Called when {Tracer#shutdown} is called. diff --git a/sdk/lib/opentelemetry/sdk/trace/noop_span_processor.rb b/sdk/lib/opentelemetry/sdk/trace/noop_span_processor.rb index 6f7818abf..0addc585b 100644 --- a/sdk/lib/opentelemetry/sdk/trace/noop_span_processor.rb +++ b/sdk/lib/opentelemetry/sdk/trace/noop_span_processor.rb @@ -29,7 +29,7 @@ def on_start(span); end # not throw or block the execution thread. # # @param [Span] span the {Span} that just ended. - def on_end(span); end + def on_finish(span); end # Called when {Tracer#shutdown} is called. def shutdown; end diff --git a/sdk/lib/opentelemetry/sdk/trace/span.rb b/sdk/lib/opentelemetry/sdk/trace/span.rb index 3fc0f575c..69ddcf9f1 100644 --- a/sdk/lib/opentelemetry/sdk/trace/span.rb +++ b/sdk/lib/opentelemetry/sdk/trace/span.rb @@ -174,7 +174,7 @@ def name=(new_name) # # (*) not actually non-blocking. In particular, it synchronizes on an # internal mutex, which will typically be uncontended, and - # {BatchSpanProcessor} will also synchronize on a mutex, if that + # {Export::BatchSpanProcessor} will also synchronize on a mutex, if that # processor is used. # # @param [Time] end_timestamp optional end timestamp for the span. @@ -191,7 +191,7 @@ def finish(end_timestamp: nil) @events.freeze @ended = true end - @span_processor.on_end(self) + @span_processor.on_finish(self) self end diff --git a/sdk/test/.rubocop.yml b/sdk/test/.rubocop.yml index dd9425858..4c8c0d91e 100644 --- a/sdk/test/.rubocop.yml +++ b/sdk/test/.rubocop.yml @@ -2,3 +2,5 @@ inherit_from: ../.rubocop.yml Metrics/BlockLength: Enabled: false +Metrics/LineLength: + Enabled: false diff --git a/sdk/test/opentelemetry/sdk/trace/export/batch_sampled_span_processor_test.rb b/sdk/test/opentelemetry/sdk/trace/export/batch_sampled_span_processor_test.rb deleted file mode 100644 index 229f30b62..000000000 --- a/sdk/test/opentelemetry/sdk/trace/export/batch_sampled_span_processor_test.rb +++ /dev/null @@ -1,10 +0,0 @@ -# frozen_string_literal: true - -# Copyright 2019 OpenTelemetry Authors -# -# SPDX-License-Identifier: Apache-2.0 - -require 'test_helper' - -describe OpenTelemetry::SDK::Trace::Export::BatchSampledSpanProcessor do -end diff --git a/sdk/test/opentelemetry/sdk/trace/export/batch_span_processor_test.rb b/sdk/test/opentelemetry/sdk/trace/export/batch_span_processor_test.rb new file mode 100644 index 000000000..5a45a198a --- /dev/null +++ b/sdk/test/opentelemetry/sdk/trace/export/batch_span_processor_test.rb @@ -0,0 +1,181 @@ +# frozen_string_literal: true + +# Copyright 2019 OpenTelemetry Authors +# +# SPDX-License-Identifier: Apache-2.0 + +require 'test_helper' + +describe OpenTelemetry::SDK::Trace::Export::BatchSpanProcessor do + BatchSpanProcessor = OpenTelemetry::SDK::Trace::Export::BatchSpanProcessor + SUCCESS = OpenTelemetry::SDK::Trace::Export::SUCCESS + FAILED_RETRYABLE = OpenTelemetry::SDK::Trace::Export::FAILED_RETRYABLE + FAILED_NOT_RETRYABLE = OpenTelemetry::SDK::Trace::Export::FAILED_NOT_RETRYABLE + + class TestExporter + def initialize(status_codes: nil) + @status_codes = status_codes || [] + @batches = [] + @failed_batches = [] + end + + attr_reader :batches + attr_reader :failed_batches + + def export(batch) + # If status codes is empty, its a success for less verbose testing + s = @status_codes.shift + if s.nil? || s == SUCCESS + @batches << batch + SUCCESS + else + @failed_batches << batch + s + end + end + + def shutdown; end + end + + class TestSpan + def initialize(id = nil, recording_events = true) + @id = id + @recording_events = recording_events + end + + attr_reader :id + + def recording_events? + @recording_events + end + + def to_span_data + self + end + end + + describe 'initialization' do + it 'if max batch size is gt max queue size raise' do + assert_raises ArgumentError do + BatchSpanProcessor.new(exporter: TestExporter.new, max_queue_size: 6, max_export_batch_size: 999) + end + end + end + + describe 'lifecycle' do + it 'should stop and start correctly' do + bsp = BatchSpanProcessor.new(exporter: TestExporter.new) + bsp.shutdown + end + + it 'should flush everything on shutdown' do + te = TestExporter.new + bsp = BatchSpanProcessor.new(exporter: te) + ts = TestSpan.new + bsp.on_finish(ts) + + bsp.shutdown + + te.batches.must_equal [[ts]] + end + end + + describe 'batching' do + it 'should batch up to but not over the max_batch' do + te = TestExporter.new + + bsp = BatchSpanProcessor.new(exporter: te, max_queue_size: 6, max_export_batch_size: 3) + + tss = [TestSpan.new, TestSpan.new, TestSpan.new, TestSpan.new] + tss.each { |ts| bsp.on_finish(ts) } + bsp.shutdown + + te.batches[0].size.must_equal(3) + te.batches[1].size.must_equal(1) + end + + it 'should batch only recording_events samples' do + te = TestExporter.new + + bsp = BatchSpanProcessor.new(exporter: te, max_queue_size: 6, max_export_batch_size: 3) + + tss = [TestSpan.new, TestSpan.new(nil, false)] + tss.each { |ts| bsp.on_finish(ts) } + bsp.shutdown + + te.batches[0].size.must_equal(1) + end + end + + describe 'export retry' do + it 'should retry on FAILED_RETRYABLE exports' do + te = TestExporter.new(status_codes: [FAILED_RETRYABLE, SUCCESS]) + + bsp = BatchSpanProcessor.new(schedule_delay_millis: 999, + exporter: te, + max_queue_size: 6, + max_export_batch_size: 3) + + tss = [TestSpan.new, TestSpan.new, TestSpan.new, TestSpan.new] + tss.each { |ts| bsp.on_finish(ts) } + + # Ensure that our work thread has time to loop + sleep(1) + bsp.shutdown + + te.batches.size.must_equal(2) + te.batches[0].size.must_equal(3) + te.batches[1].size.must_equal(1) + + te.failed_batches.size.must_equal(1) + te.failed_batches[0].size.must_equal(3) + end + + it 'should not retry on FAILED_NOT_RETRYABLE exports' do + te = TestExporter.new(status_codes: [FAILED_NOT_RETRYABLE, SUCCESS]) + + bsp = BatchSpanProcessor.new(schedule_delay_millis: 999, + exporter: te, + max_queue_size: 6, + max_export_batch_size: 3) + + tss = [TestSpan.new, TestSpan.new, TestSpan.new, TestSpan.new] + tss.each { |ts| bsp.on_finish(ts) } + + # Ensure that our work thread has time to loop + sleep(1) + bsp.shutdown + + te.batches.size.must_equal(1) + te.batches[0].size.must_equal(1) + + te.failed_batches.size.must_equal(1) + te.failed_batches[0].size.must_equal(3) + end + end + + describe 'stress test' do + it 'shouldnt blow up with a lot of things' do + te = TestExporter.new + + bsp = BatchSpanProcessor.new(exporter: te) + producers = 10.times.map do |i| + Thread.new do + x = i * 10 + 10.times do |j| + bsp.on_finish(TestSpan.new(x + j)) + end + sleep(rand(0.01)) + end + end + producers.each(&:join) + bsp.shutdown + + out = te.batches.flatten.map(&:id).sort + + expected = 100.times.map { |i| i } + + out.must_equal(expected) + end + end +end diff --git a/sdk/test/opentelemetry/sdk/trace/span_test.rb b/sdk/test/opentelemetry/sdk/trace/span_test.rb index 9e8a241c0..b58d39462 100644 --- a/sdk/test/opentelemetry/sdk/trace/span_test.rb +++ b/sdk/test/opentelemetry/sdk/trace/span_test.rb @@ -167,11 +167,11 @@ span.to_span_data.end_timestamp.wont_be_nil end - it 'calls the span processor #on_end callback' do + it 'calls the span processor #on_finish callback' do mock_span_processor.expect(:on_start, nil) { |_| true } span = Span.new(context, 'name', SpanKind::INTERNAL, nil, trace_config, mock_span_processor, nil, nil, Time.now) - mock_span_processor.expect(:on_end, nil, [span]) + mock_span_processor.expect(:on_finish, nil, [span]) span.finish mock_span_processor.verify end