Skip to content

Commit

Permalink
refactor: simplifty test consumers by changing waiting logic
Browse files Browse the repository at this point in the history
Rather than counting the messages received by the tests consumer the
tests will now wait count for an expected number of spans received at
the exporter. This has an equivalent outcome, but simplifies the
contents of the consumers.
  • Loading branch information
chrisholmes committed Oct 12, 2022
1 parent 83d935f commit d9819eb
Showing 1 changed file with 31 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,23 @@
let(:instrumentation) { OpenTelemetry::Instrumentation::Racecar::Instrumentation.instance }
let(:exporter) { EXPORTER }
let(:spans) { exporter.finished_spans }
let(:host) { ENV.fetch('TEST_KAFKA_HOST') { '127.0.0.1' } }
let(:port) { (ENV.fetch('TEST_KAFKA_PORT') { 29_092 }) }
let(:host) { ENV.fetch('TEST_KAFKA_HOST', '127.0.0.1') }
let(:port) { ENV.fetch('TEST_KAFKA_PORT', 29_092) }

def tracer
def wait_for_spans(count)
Timeout.timeout(60) do
sleep 0.1 while exporter.finished_spans.size < count
end
end

let(:consumer_class) do
klass = Class.new(Racecar::Consumer)
klass.define_method(:process, &process_method)
klass.subscribes_to(topic_name)
stub_const('TestConsumer', klass)
end

let(:tracer) do
OpenTelemetry.tracer_provider.tracer('test-tracer')
end

Expand Down Expand Up @@ -58,12 +71,6 @@ def stop_racecar(thread)
thread.join(60)
end

def wait_for_messages_seen_by_consumer(count)
Timeout.timeout(60) do
sleep 0.1 until consumer_class.messages_seen.size >= count
end
end

let(:topic_name) do
rand_hash = SecureRandom.hex(10)
"consumer-patch-trace-#{rand_hash}"
Expand All @@ -78,6 +85,7 @@ def wait_for_messages_seen_by_consumer(count)
produce(producer_messages)

@racecar_thread = run_racecar(racecar)
wait_for_spans(expected_spans)
end

after do
Expand All @@ -86,24 +94,14 @@ def wait_for_messages_seen_by_consumer(count)

describe '#process' do
describe 'when the consumer runs and publishes acks' do
let(:consumer_class) do
# a test class
class TestConsumer < Racecar::Consumer
def self.messages_seen
@messages_seen ||= []
end

def process(message)
produce(
'message seen',
topic: "ack-#{message.topic}"
)
deliver!
TestConsumer.messages_seen << message
end
let(:process_method) do
lambda do |message|
produce(
'message seen',
topic: "ack-#{message.topic}"
)
deliver!
end
TestConsumer.subscribes_to(topic_name)
TestConsumer
end

let(:producer_messages) do
Expand All @@ -118,14 +116,12 @@ def process(message)
}]
end

it 'traces each message and traces publishing' do
wait_for_messages_seen_by_consumer(2)
let(:expected_spans) { 6 }

it 'traces each message and traces publishing' do
process_spans = spans.select { |s| s.name == "#{topic_name} process" }
racecar_send_spans = spans.select { |s| s.name == "ack-#{topic_name} send" }

_(spans.size).must_equal(6)

# First pair for send and process spans
first_process_span = process_spans[0]
_(first_process_span.name).must_equal("#{topic_name} process")
Expand Down Expand Up @@ -173,20 +169,10 @@ def process(message)
end

describe 'for an erroring consumer' do
let(:consumer_class) do
# a test class
class ErrorConsumer < Racecar::Consumer
def self.messages_seen
@messages_seen ||= []
end

def process(message)
ErrorConsumer.messages_seen << message
raise 'oops'
end
let(:process_method) do
lambda do |_message|
raise 'oops'
end
ErrorConsumer.subscribes_to(topic_name)
ErrorConsumer
end

let(:producer_messages) do
Expand All @@ -197,13 +183,11 @@ def process(message)
}]
end

it 'can consume and publish a message' do
wait_for_messages_seen_by_consumer(1)
let(:expected_spans) { 2 }

it 'can consume and publish a message' do
process_spans = spans.select { |s| s.name == "#{topic_name} process" }

_(spans.size).must_equal(2)

# First pair for send and process spans
first_process_span = process_spans[0]
_(first_process_span.name).must_equal("#{topic_name} process")
Expand Down

0 comments on commit d9819eb

Please sign in to comment.