Skip to content

Commit

Permalink
Add recorded? logic to BatchSampledSpanProcessor
Browse files Browse the repository at this point in the history
* also some rubocop fixes
* fleshed out some tests as well
  • Loading branch information
rdooley committed Sep 13, 2019
1 parent a6a8842 commit c3430b7
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 21 deletions.
2 changes: 2 additions & 0 deletions sdk/.rubocop.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
AllCops:
TargetRubyVersion: '2.4.0'

Metrics/BlockLength:
Enabled: false
Metrics/LineLength:
Enabled: false
Metrics/ParameterLists:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
# Copyright 2019 OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0
require 'thread'

module OpenTelemetry
module SDK
Expand Down Expand Up @@ -45,10 +44,12 @@ def on_start(span)

# adds a span to the batcher, threadsafe may block on lock
def on_end(span)
return unless span.recorded?

lock do
spans.shift if spans.size >= max_queue_size
spans << span
@condition.signal if spans.size > max_queue_size/2
@condition.signal if spans.size > max_queue_size / 2
end
end

Expand All @@ -72,13 +73,10 @@ def work
keep_running = nil
batch = nil
lock do
if spans.size < max_queue_size
loop do
break if !spans.empty? || !@keep_running
@condition.wait(@mutex, @delay)
end
if spans.size < max_queue_size
@condition.wait(@mutex, @delay) while spans.empty? && @keep_running
keep_running = @keep_running
end
keep_running = @keep_running
batch = fetch_batch
end
# this is done outside the lock to unblock the producers
Expand All @@ -89,15 +87,14 @@ def work
end

def flush
until spans.empty? do
@exporter.export(fetch_batch)
end
@exporter.export(fetch_batch) until spans.empty?
end

def fetch_batch
batch = []
loop do
break if batch.size >= @batch_size || spans.empty?

batch << spans.shift.to_span_proto
end
batch
Expand Down
2 changes: 1 addition & 1 deletion sdk/opentelemetry-sdk.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ Gem::Specification.new do |spec|
spec.add_development_dependency 'minitest', '~> 5.0'
spec.add_development_dependency 'rake', '~> 12.0'
spec.add_development_dependency 'rubocop', '~> 0.73.0'
spec.add_development_dependency 'simplecov', '~> 0.17'
spec.add_development_dependency 'yard', '~> 0.9'
spec.add_development_dependency 'yard-doctest', '~> 0.1.6'
spec.add_development_dependency 'simplecov', '~> 0.17'
end
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,23 @@ def batches
end
end

class TestSpan
def initialize(id = nil, recorded = true)
@id = id
@recorded = recorded
end

attr_reader :id

def recorded?
@recorded
end

def to_span_proto
self
end
end

describe 'lifecycle' do
it 'should stop and start correctly' do
bsp = BatchSampledSpanProcessor.new(exporter: TestExporter.new)
Expand All @@ -28,26 +45,43 @@ def batches
it 'should flush everything on shutdown' do
te = TestExporter.new
bsp = BatchSampledSpanProcessor.new(exporter: te, max_queue_size: 3)
bsp.on_end('foo')
ts = TestSpan.new
bsp.on_end(ts)

bsp.shutdown

te.batches.must_equal [['foo']]
te.batches.must_equal [[ts]]
end
end

describe 'batching' do
it 'should batch up to the max_batch' do
it 'should batch up to but not over the max_batch' do
te = TestExporter.new

bsp = BatchSampledSpanProcessor.new(exporter: te, max_queue_size: 6, max_export_batch_size: 3)

bsp.on_end('1')
bsp.on_end('2')
bsp.on_end('3')
tss = [[TestSpan.new, TestSpan.new, TestSpan.new, TestSpan.new]]
bsp.on_end(tss[0][0])
bsp.on_end(tss[0][1])
bsp.on_end(tss[0][2])
bsp.on_end(tss[0][3])
bsp.shutdown

te.batches[0].size.must_equal(3)
te.batches[1].size.must_equal(1)
end

it 'should batch only recorded samples' do
te = TestExporter.new

bsp = BatchSampledSpanProcessor.new(exporter: te, max_queue_size: 6, max_export_batch_size: 3)

tss = [[TestSpan.new, TestSpan.new(nil, false)]]
bsp.on_end(tss[0][0])
bsp.on_end(tss[0][1])
bsp.shutdown

te.batches[0].size.must_equal(1)
end
end

Expand All @@ -60,15 +94,15 @@ def batches
Thread.new do
x = i * 10
0.upto(9) do |j|
bsp.on_end(x + j)
bsp.on_end(TestSpan.new(x + j))
end
sleep(rand 0.01)
sleep(rand(0.01))
end
end
producers.each(&:join)
bsp.shutdown

out = te.batches.flatten.sort
out = te.batches.flatten.map(&:id).sort

expected = 0.upto(99).map { |i| i }

Expand Down

0 comments on commit c3430b7

Please sign in to comment.