Addressed further comments
snehilchopra committed Aug 4, 2020
1 parent fef4c4c commit 6d5eb05
Showing 5 changed files with 54 additions and 82 deletions.
10 changes: 5 additions & 5 deletions sdk/include/opentelemetry/sdk/trace/batch_span_processor.h
@@ -112,25 +112,25 @@ class BatchSpanProcessor : public SpanProcessor
/* The configured backend exporter */
std::unique_ptr<SpanExporter> exporter_;

/* The background worker thread */
std::thread worker_thread_;

/* Configurable parameters as per the official specs */
const std::chrono::milliseconds schedule_delay_millis_;
const size_t max_queue_size_;
const std::chrono::milliseconds schedule_delay_millis_;
const size_t max_export_batch_size_;

/* Synchronization primitives */
std::condition_variable cv_, force_flush_cv_;
std::mutex cv_m_, force_flush_cv_m_;

/* The buffer/queue to which the ended spans are added */
std::unique_ptr<common::CircularBuffer<Recordable>> buffer_;
common::CircularBuffer<Recordable> buffer_;

/* Important boolean flags to handle the workflow of the processor */
std::atomic<bool> is_shutdown_{false};
std::atomic<bool> is_force_flush_{false};
std::atomic<bool> is_force_flush_notified_{false};

/* The background worker thread */
std::thread worker_thread_;
};

} // namespace trace
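
The reordering above matters because C++ constructs data members in declaration order, not in initializer-list order: with the buffer and the flags declared before worker_thread_, the background thread launched from the constructor's initializer list can never observe a not-yet-constructed buffer. The following is a minimal stand-alone sketch of that pattern, with stand-in names (Worker, queue_, Run) rather than the SDK's actual types:

#include <atomic>
#include <cstddef>
#include <thread>
#include <vector>

class Worker
{
public:
  explicit Worker(std::size_t capacity)
      : capacity_(capacity),
        // thread_ is constructed last because it is declared last, so
        // queue_ and is_shutdown_ are already valid when Run() starts.
        thread_([this] { Run(); })
  {}

  ~Worker()
  {
    is_shutdown_ = true;
    thread_.join();
  }

private:
  void Run()
  {
    while (!is_shutdown_.load())
    {
      // ... consume from queue_ ...
    }
  }

  const std::size_t capacity_;
  std::vector<int> queue_;    // stand-in for the span buffer
  std::atomic<bool> is_shutdown_{false};
  std::thread thread_;        // declared last on purpose
};
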
5 changes: 3 additions & 2 deletions sdk/src/trace/CMakeLists.txt
@@ -1,3 +1,4 @@
add_library(
opentelemetry_trace tracer_provider.cc tracer.cc span.cc
samplers/parent_or_else.cc samplers/probability.cc)
opentelemetry_trace
tracer_provider.cc tracer.cc span.cc batch_span_processor.cc
samplers/parent_or_else.cc samplers/probability.cc)
102 changes: 37 additions & 65 deletions sdk/src/trace/batch_span_processor.cc
@@ -17,41 +17,36 @@ BatchSpanProcessor::BatchSpanProcessor(std::unique_ptr<SpanExporter> &&exporter,
: exporter_(std::move(exporter)),
max_queue_size_(max_queue_size),
schedule_delay_millis_(schedule_delay_millis),
max_export_batch_size_(max_export_batch_size)
{
buffer_ =
std::unique_ptr<CircularBuffer<Recordable>>(new CircularBuffer<Recordable>(max_queue_size_));

// Start the background worker thread
worker_thread_ = std::thread(&BatchSpanProcessor::DoBackgroundWork, this);
}
max_export_batch_size_(max_export_batch_size),
buffer_(max_queue_size_),
worker_thread_(&BatchSpanProcessor::DoBackgroundWork, this)
{}

std::unique_ptr<Recordable> BatchSpanProcessor::MakeRecordable() noexcept
{
return exporter_->MakeRecordable();
}

void BatchSpanProcessor::OnStart(Recordable &span) noexcept
void BatchSpanProcessor::OnStart(Recordable &) noexcept
{
// no-op
(void)span;
}

void BatchSpanProcessor::OnEnd(std::unique_ptr<Recordable> &&span) noexcept
{
if (is_shutdown_.load())
if (is_shutdown_.load() == true)
{
return;
}

if (buffer_->Add(span) == false)
if (buffer_.Add(span) == false)
{
return;
}

// If the queue gets at least half full, a preemptive notification is
// sent to the worker thread to start a new export cycle.
if (buffer_->size() >= max_queue_size_ / 2)
if (buffer_.size() >= max_queue_size_ / 2)
{
// signal the worker thread
cv_.notify_one();
@@ -60,7 +55,7 @@ void BatchSpanProcessor::OnEnd(std::unique_ptr<Recordable> &&span) noexcept

void BatchSpanProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept
{
if (is_shutdown_.load())
if (is_shutdown_.load() == true)
{
return;
}
@@ -90,55 +85,43 @@ void BatchSpanProcessor::DoBackgroundWork()

while (true)
{
// If we already have max_export_batch_size_ spans in the buffer, better to export them
// now
if (buffer_->size() < max_export_batch_size_)
// Wait for `timeout` milliseconds
std::unique_lock<std::mutex> lk(cv_m_);
cv_.wait_for(lk, timeout);

if (is_shutdown_.load() == true)
{
// In case of a spurious wake-up, we export only if we have at least one
// span in the batch. This is acceptable because batching is a best-effort
// mechanism here.
do
{
std::unique_lock<std::mutex> lk(cv_m_);

cv_.wait_for(lk, std::chrono::milliseconds(timeout));

// If shutdown has been invoked, break out of the loop
// and drain out the queue.
if (is_shutdown_.load() == true)
{
is_shutdown_ = false;
// ensure the main thread has been notified
while (is_shutdown_.load() == false)
;
DrainQueue();
return;
}

if (is_force_flush_.load() == true)
{
break;
}
} while (buffer_->empty() == true);
DrainQueue();
return;
}

// Get the value of is_force_flush_ to check whether this export is
// a result of a ForceFlush call. This flag is propagated to the Export
// method as well.
bool was_force_flush_called = is_force_flush_.load();

// Set the flag back to false to notify main thread in case ForceFlush
// was invoked.
if (is_force_flush_.load() == true)
// Check if this export was the result of a force flush.
if (was_force_flush_called == true)
{
// Since this export was the result of a force flush, signal the
// main thread that the worker thread has been notified
is_force_flush_ = false;
}
else
{
// If the buffer was empty during the entire `timeout` interval,
// go back to waiting. If this was a spurious wake-up, we export only if
// `buffer_` is not empty. This is acceptable because batching is a
// best-effort mechanism here.
if (buffer_.empty() == true)
{
continue;
}
}

auto start = std::chrono::steady_clock::now();
Export(was_force_flush_called);
auto end = std::chrono::steady_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);

// Subtract the duration of this export call from the next `timeout`.
timeout = schedule_delay_millis_ - duration;
}
}
@@ -151,15 +134,15 @@ void BatchSpanProcessor::Export(const bool was_force_flush_called)

if (was_force_flush_called == true)
{
num_spans_to_export = buffer_->size();
num_spans_to_export = buffer_.size();
}
else
{
num_spans_to_export =
buffer_->size() >= max_export_batch_size_ ? max_export_batch_size_ : buffer_->size();
buffer_.size() >= max_export_batch_size_ ? max_export_batch_size_ : buffer_.size();
}

buffer_->Consume(
buffer_.Consume(
num_spans_to_export, [&](CircularBufferRange<AtomicUniquePtr<Recordable>> range) noexcept {
range.ForEach([&](AtomicUniquePtr<Recordable> &ptr) {
std::unique_ptr<Recordable> swap_ptr = std::unique_ptr<Recordable>(nullptr);
@@ -180,16 +163,11 @@ void BatchSpanProcessor::Export(const bool was_force_flush_called)
force_flush_cv_.notify_one();
}
}

if (is_shutdown_.load() == true)
{
return;
}
}

void BatchSpanProcessor::DrainQueue()
{
while (buffer_->empty() == false)
while (buffer_.empty() == false)
{
Export(false);
}
@@ -199,13 +177,7 @@ void BatchSpanProcessor::Shutdown(std::chrono::microseconds timeout) noexcept
{
is_shutdown_ = true;

// Notify thread, drain the queue and wait for the thread.
while (is_shutdown_.load() == true)
{
cv_.notify_one();
}

is_shutdown_ = true;
cv_.notify_one();
worker_thread_.join();

exporter_->Shutdown();
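
For context on the OnEnd path in the diff above: a span is dropped when the processor is already shut down or the bounded queue is full, and the worker is woken early once the queue is at least half full. Below is a simplified sketch of that producer side; it uses a mutex-guarded std::deque where the SDK uses a lock-free CircularBuffer, and all names are illustrative rather than the SDK's own.

#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>

struct SpanQueue
{
  std::deque<int> items;                  // stand-in for the span buffer
  std::mutex m;
  std::condition_variable cv;             // the worker waits on this
  std::atomic<bool> is_shutdown{false};
  const std::size_t max_queue_size = 2048;

  bool Add(int span)
  {
    if (is_shutdown.load())
    {
      return false;                       // dropped: already shut down
    }
    std::lock_guard<std::mutex> lk(m);
    if (items.size() >= max_queue_size)
    {
      return false;                       // dropped: queue full (lossy by design)
    }
    items.push_back(span);
    if (items.size() >= max_queue_size / 2)
    {
      cv.notify_one();                    // preemptive wake-up of the worker
    }
    return true;
  }
};
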
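The reworked DoBackgroundWork loop waits on the condition variable for at most `timeout`, exports once per wake-up, and then shortens the next wait by however long the export itself took, so export cycles stay roughly `schedule_delay_millis_` apart. A condensed sketch of that scheduling pattern follows (assumed names, not the SDK code); the clamp to zero is an extra guard added here, whereas a negative duration passed to wait_for simply returns immediately.

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>

std::condition_variable cv;
std::mutex cv_m;
std::atomic<bool> shutdown_requested{false};

void ExportBatch()
{
  // stand-in for BatchSpanProcessor::Export
}

void WorkerLoop(std::chrono::milliseconds schedule_delay)
{
  auto timeout = schedule_delay;
  while (true)
  {
    {
      std::unique_lock<std::mutex> lk(cv_m);
      // Wakes early on notify (queue half full, force flush, or shutdown).
      cv.wait_for(lk, timeout);
    }
    if (shutdown_requested.load())
    {
      return;  // the real code drains the remaining spans before returning
    }

    auto start = std::chrono::steady_clock::now();
    ExportBatch();
    auto end = std::chrono::steady_clock::now();
    auto spent = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);

    // Subtract the export duration so the next cycle starts on schedule.
    timeout = spent < schedule_delay ? schedule_delay - spent
                                     : std::chrono::milliseconds(0);
  }
}
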
3 changes: 2 additions & 1 deletion sdk/test/trace/CMakeLists.txt
@@ -7,7 +7,8 @@ foreach(
always_off_sampler_test
always_on_sampler_test
parent_or_else_sampler_test
probability_sampler_test)
probability_sampler_test
batch_span_processor_test)
add_executable(${testname} "${testname}.cc")
target_link_libraries(
${testname} ${GTEST_BOTH_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT}
16 changes: 7 additions & 9 deletions sdk/test/trace/batch_span_processor_test.cc
@@ -135,7 +135,7 @@ TEST_F(BatchSpanProcessorTestPeer, TestShutdown)
batch_processor->OnEnd(std::move(test_spans->at(i)));
}

batch_processor->Shutdown(std::chrono::milliseconds(50));
batch_processor->Shutdown();

EXPECT_EQ(num_spans, spans_received->size());
for (int i = 0; i < num_spans; ++i)
@@ -162,10 +162,9 @@ TEST_F(BatchSpanProcessorTestPeer, TestForceFlush)
batch_processor->OnEnd(std::move(test_spans->at(i)));
}

// give some time to export
// Give some time to export
std::this_thread::sleep_for(std::chrono::milliseconds(50));

// force flush
batch_processor->ForceFlush();

EXPECT_EQ(num_spans, spans_received->size());
@@ -181,15 +180,16 @@ TEST_F(BatchSpanProcessorTestPeer, TestForceFlush)
batch_processor->OnEnd(std::move(more_test_spans->at(i)));
}

// give some time to export the spans
// Give some time to export the spans
std::this_thread::sleep_for(std::chrono::milliseconds(50));

batch_processor->ForceFlush();

EXPECT_EQ(num_spans * 2, spans_received->size());
for (int i = 0; i < num_spans; ++i)
{
EXPECT_EQ("Span " + std::to_string(i % num_spans), spans_received->at(i)->GetName());
EXPECT_EQ("Span " + std::to_string(i % num_spans),
spans_received->at(num_spans + i)->GetName());
}
}

@@ -212,10 +212,9 @@ TEST_F(BatchSpanProcessorTestPeer, TestManySpansLoss)
batch_processor->OnEnd(std::move(test_spans->at(i)));
}

// give some time to export the spans
// Give some time to export the spans
std::this_thread::sleep_for(std::chrono::milliseconds(700));

// force flush
batch_processor->ForceFlush();

// Span should be exported by now
@@ -241,10 +240,9 @@ TEST_F(BatchSpanProcessorTestPeer, TestManySpansLossLess)
batch_processor->OnEnd(std::move(test_spans->at(i)));
}

// give some time to export the spans
// Give some time to export the spans
std::this_thread::sleep_for(std::chrono::milliseconds(50));

// force flush
batch_processor->ForceFlush();

EXPECT_EQ(num_spans, spans_received->size());
