diff --git a/sdk/.rubocop.yml b/sdk/.rubocop.yml index 2764094cd..377119fd5 100644 --- a/sdk/.rubocop.yml +++ b/sdk/.rubocop.yml @@ -1,6 +1,8 @@ AllCops: TargetRubyVersion: '2.4.0' +Metrics/BlockLength: + Enabled: false Metrics/LineLength: Enabled: false Metrics/ParameterLists: diff --git a/sdk/lib/opentelemetry/sdk/trace/export/batch_sampled_span_processor.rb b/sdk/lib/opentelemetry/sdk/trace/export/batch_sampled_span_processor.rb index 09da64360..11520eaa2 100644 --- a/sdk/lib/opentelemetry/sdk/trace/export/batch_sampled_span_processor.rb +++ b/sdk/lib/opentelemetry/sdk/trace/export/batch_sampled_span_processor.rb @@ -3,7 +3,6 @@ # Copyright 2019 OpenTelemetry Authors # # SPDX-License-Identifier: Apache-2.0 -require 'thread' module OpenTelemetry module SDK @@ -45,10 +44,12 @@ def on_start(span) # adds a span to the batcher, threadsafe may block on lock def on_end(span) + return unless span.recorded? + lock do spans.shift if spans.size >= max_queue_size spans << span - @condition.signal if spans.size > max_queue_size/2 + @condition.signal if spans.size > max_queue_size / 2 end end @@ -72,13 +73,10 @@ def work keep_running = nil batch = nil lock do - if spans.size < max_queue_size - loop do - break if !spans.empty? || !@keep_running - @condition.wait(@mutex, @delay) - end + if spans.size < max_queue_size + @condition.wait(@mutex, @delay) while spans.empty? && @keep_running + keep_running = @keep_running end - keep_running = @keep_running batch = fetch_batch end # this is done outside the lock to unblock the producers @@ -89,15 +87,14 @@ def work end def flush - until spans.empty? do - @exporter.export(fetch_batch) - end + @exporter.export(fetch_batch) until spans.empty? end def fetch_batch batch = [] loop do break if batch.size >= @batch_size || spans.empty? + batch << spans.shift.to_span_proto end batch diff --git a/sdk/opentelemetry-sdk.gemspec b/sdk/opentelemetry-sdk.gemspec index f9c228f5b..acd52b564 100644 --- a/sdk/opentelemetry-sdk.gemspec +++ b/sdk/opentelemetry-sdk.gemspec @@ -32,7 +32,7 @@ Gem::Specification.new do |spec| spec.add_development_dependency 'minitest', '~> 5.0' spec.add_development_dependency 'rake', '~> 12.0' spec.add_development_dependency 'rubocop', '~> 0.73.0' + spec.add_development_dependency 'simplecov', '~> 0.17' spec.add_development_dependency 'yard', '~> 0.9' spec.add_development_dependency 'yard-doctest', '~> 0.1.6' - spec.add_development_dependency 'simplecov', '~> 0.17' end diff --git a/sdk/test/opentelemetry/sdk/trace/export/batch_sampled_span_processor_test.rb b/sdk/test/opentelemetry/sdk/trace/export/batch_sampled_span_processor_test.rb index 8bc3c0ff0..b10af3ae7 100644 --- a/sdk/test/opentelemetry/sdk/trace/export/batch_sampled_span_processor_test.rb +++ b/sdk/test/opentelemetry/sdk/trace/export/batch_sampled_span_processor_test.rb @@ -19,6 +19,23 @@ def batches end end + class TestSpan + def initialize(id = nil, recorded = true) + @id = id + @recorded = recorded + end + + attr_reader :id + + def recorded? + @recorded + end + + def to_span_proto + self + end + end + describe 'lifecycle' do it 'should stop and start correctly' do bsp = BatchSampledSpanProcessor.new(exporter: TestExporter.new) @@ -28,26 +45,43 @@ def batches it 'should flush everything on shutdown' do te = TestExporter.new bsp = BatchSampledSpanProcessor.new(exporter: te, max_queue_size: 3) - bsp.on_end('foo') + ts = TestSpan.new + bsp.on_end(ts) bsp.shutdown - te.batches.must_equal [['foo']] + te.batches.must_equal [[ts]] end end describe 'batching' do - it 'should batch up to the max_batch' do + it 'should batch up to but not over the max_batch' do te = TestExporter.new bsp = BatchSampledSpanProcessor.new(exporter: te, max_queue_size: 6, max_export_batch_size: 3) - bsp.on_end('1') - bsp.on_end('2') - bsp.on_end('3') + tss = [[TestSpan.new, TestSpan.new, TestSpan.new, TestSpan.new]] + bsp.on_end(tss[0][0]) + bsp.on_end(tss[0][1]) + bsp.on_end(tss[0][2]) + bsp.on_end(tss[0][3]) bsp.shutdown te.batches[0].size.must_equal(3) + te.batches[1].size.must_equal(1) + end + + it 'should batch only recorded samples' do + te = TestExporter.new + + bsp = BatchSampledSpanProcessor.new(exporter: te, max_queue_size: 6, max_export_batch_size: 3) + + tss = [[TestSpan.new, TestSpan.new(nil, false)]] + bsp.on_end(tss[0][0]) + bsp.on_end(tss[0][1]) + bsp.shutdown + + te.batches[0].size.must_equal(1) end end @@ -60,15 +94,15 @@ def batches Thread.new do x = i * 10 0.upto(9) do |j| - bsp.on_end(x + j) + bsp.on_end(TestSpan.new(x + j)) end - sleep(rand 0.01) + sleep(rand(0.01)) end end producers.each(&:join) bsp.shutdown - out = te.batches.flatten.sort + out = te.batches.flatten.map(&:id).sort expected = 0.upto(99).map { |i| i }