Addressed review comments
snehilchopra committed Jul 23, 2020
1 parent a8ac7a1 commit eab7d98
Showing 3 changed files with 58 additions and 42 deletions.
12 changes: 6 additions & 6 deletions sdk/include/opentelemetry/sdk/trace/batch_span_processor.h
@@ -35,9 +35,9 @@ class BatchSpanProcessor : public SpanProcessor
* @param max_export_batch_size - The maximum batch size of every export. It must be smaller than or equal to max_queue_size
*/
explicit BatchSpanProcessor(std::unique_ptr<SpanExporter>&& exporter,
const int max_queue_size = 2048,
const int schedule_delay_millis = 5000,
const int max_export_batch_size = 512);
const size_t max_queue_size = 2048,
std::chrono::milliseconds schedule_delay_millis = std::chrono::milliseconds(5000),
const size_t max_export_batch_size = 512);

/**
* Requests a Recordable(Span) from the configured exporter.
@@ -128,9 +128,9 @@ class BatchSpanProcessor : public SpanProcessor
std::unique_ptr<std::thread> worker_thread_;

/* Configurable parameters as per the official specs */
const int schedule_delay_millis_;
const int max_queue_size_;
const int max_export_batch_size_;
const std::chrono::milliseconds schedule_delay_millis_;
const size_t max_queue_size_;
const size_t max_export_batch_size_;

/* Synchronization primitives */
std::condition_variable cv_, force_flush_cv_;
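With this change the schedule delay is carried as a std::chrono::milliseconds value and the size limits as size_t. A minimal construction sketch against the updated signature follows; the MakeBatchProcessor helper and the namespace alias are illustrative only, and the exporter argument is assumed to be any concrete SpanExporter implementation.

#include <chrono>
#include <memory>
#include <utility>

#include "opentelemetry/sdk/trace/batch_span_processor.h"

namespace sdktrace = opentelemetry::sdk::trace;

// Hypothetical helper: wraps a concrete SpanExporter in a BatchSpanProcessor
// using the new parameter types (size_t limits, std::chrono::milliseconds delay).
std::shared_ptr<sdktrace::SpanProcessor> MakeBatchProcessor(
    std::unique_ptr<sdktrace::SpanExporter> &&exporter)
{
  return std::shared_ptr<sdktrace::SpanProcessor>(
      new sdktrace::BatchSpanProcessor(std::move(exporter),
                                       /* max_queue_size = */ 2048,
                                       std::chrono::milliseconds(5000),
                                       /* max_export_batch_size = */ 512));
}

Passing a named duration makes the unit explicit at the call site, which is the main point of the type change.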
31 changes: 13 additions & 18 deletions sdk/src/trace/batch_span_processor.cc
@@ -17,9 +17,9 @@ namespace trace
{

BatchSpanProcessor::BatchSpanProcessor(std::unique_ptr<SpanExporter>&& exporter,
const int max_queue_size,
const int schedule_delay_millis,
const int max_export_batch_size)
const size_t max_queue_size,
const std::chrono::milliseconds schedule_delay_millis,
const size_t max_export_batch_size)
:exporter_(std::move(exporter)),
max_queue_size_(max_queue_size),
schedule_delay_millis_(schedule_delay_millis),
@@ -52,20 +52,17 @@ void BatchSpanProcessor::OnEnd(std::unique_ptr<Recordable> &&span) noexcept

std::unique_lock<std::mutex> lk(cv_m_);

if(static_cast<int>(buffer_->size()) >= max_queue_size_){
if(buffer_->Add(span) == false)
{
// TODO: glog that spans will likely be dropped
}

buffer_->Add(span);

// If the queue gets at least half full a preemptive notification is
// sent to the worker thread to start a new export cycle.
if(static_cast<int>(buffer_->size()) >= max_queue_size_ / 2){
if(buffer_->size() >= max_queue_size_ / 2){
// signal the worker thread
cv_.notify_one();
}

lk.unlock();
}

void BatchSpanProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept
@@ -118,15 +115,15 @@ void BatchSpanProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept
}

void BatchSpanProcessor::DoBackgroundWork(){
int timeout = schedule_delay_millis_;
auto timeout = schedule_delay_millis_;

while (true)
{
std::unique_lock<std::mutex> lk(cv_m_);

// If we already have max_export_batch_size_ spans in the buffer, better to export them
// now
if (static_cast<int>(buffer_->size()) < max_export_batch_size_)
if (buffer_->size() < max_export_batch_size_)
{
// In case of spurious wake up, we export only if we have at least one span
// in the batch. This is acceptable because batching is a best mechanism
@@ -170,7 +167,7 @@ void BatchSpanProcessor::DoBackgroundWork(){
auto end = std::chrono::steady_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);

timeout = schedule_delay_millis_ - duration.count();
timeout = schedule_delay_millis_ - duration;
}
}

@@ -215,17 +212,15 @@ std::unique_ptr<common::CircularBuffer<Recordable>> BatchSpanProcessor::CopySpan
new common::CircularBuffer<Recordable>(max_queue_size_)
);

// Get the appropriate size.
const int buffer_size = static_cast<int>(buffer_->size());

if(was_force_flush_called == true)
{
buffer_.swap(buffer_copy);
}
else
{
const int num_spans_to_export = buffer_size >= max_export_batch_size_ ?
max_export_batch_size_ : buffer_size;
// Get the appropriate size
const size_t num_spans_to_export = buffer_->size() >= max_export_batch_size_ ?
max_export_batch_size_ : buffer_->size();

buffer_->Consume(
num_spans_to_export, [&](CircularBufferRange<AtomicUniquePtr<Recordable>> range) noexcept {
@@ -243,7 +238,7 @@

void BatchSpanProcessor::DrainQueue()
{
while(static_cast<int>(buffer_->size()) > 0) Export(buffer_, false);
while(buffer_->empty() == false) Export(buffer_, false);
}

void BatchSpanProcessor::Shutdown(std::chrono::microseconds timeout) noexcept
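One effect of the .cc changes is that the worker-thread timeout now stays in duration types end to end, so no .count() round-trips are needed. A standalone sketch of that arithmetic in plain standard C++ follows; it is not the library code, and the names only mirror the processor's fields.

#include <chrono>
#include <iostream>
#include <thread>

int main()
{
  const std::chrono::milliseconds schedule_delay_millis(5000);

  auto start = std::chrono::steady_clock::now();
  std::this_thread::sleep_for(std::chrono::milliseconds(20));  // stand-in for export work
  auto end = std::chrono::steady_clock::now();

  // Subtracting a duration from a duration yields another duration, so the
  // remaining wait can be passed straight to a condition variable's wait_for.
  auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
  auto timeout  = schedule_delay_millis - duration;

  std::cout << "next wait: " << timeout.count() << " ms\n";
  return 0;
}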
57 changes: 39 additions & 18 deletions sdk/test/trace/batch_span_processor_test.cc
@@ -16,8 +16,12 @@ class MockSpanExporter final : public sdk::trace::SpanExporter
public:
MockSpanExporter(std::shared_ptr<std::vector<std::unique_ptr<sdk::trace::SpanData>>> spans_received,
std::shared_ptr<bool> is_shutdown,
std::shared_ptr<bool> is_export_completed,
const std::chrono::milliseconds export_delay = std::chrono::milliseconds(0)) noexcept
: spans_received_(spans_received), is_shutdown_(is_shutdown), export_delay_(export_delay)
: spans_received_(spans_received),
is_shutdown_(is_shutdown),
is_export_completed_(is_export_completed),
export_delay_(export_delay)
{}

std::unique_ptr<sdk::trace::Recordable> MakeRecordable() noexcept override
@@ -27,6 +31,8 @@ class MockSpanExporter final : public sdk::trace::SpanExporter

sdk::trace::ExportResult Export(const nostd::span<std::unique_ptr<sdk::trace::Recordable>> &recordables) noexcept override
{
*is_export_completed_ = false;

std::this_thread::sleep_for(export_delay_);

for (auto &recordable : recordables)
@@ -40,6 +46,7 @@
}
}

*is_export_completed_ = true;
return sdk::trace::ExportResult::kSuccess;
}

@@ -48,9 +55,15 @@ class MockSpanExporter final : public sdk::trace::SpanExporter
*is_shutdown_ = true;
}

bool IsExportCompleted()
{
return *is_export_completed_;
}

private:
std::shared_ptr<std::vector<std::unique_ptr<sdk::trace::SpanData>>> spans_received_;
std::shared_ptr<bool> is_shutdown_;
std::shared_ptr<bool> is_export_completed_;
// Meant exclusively to test force flush timeout
const std::chrono::milliseconds export_delay_;
};
@@ -64,14 +77,18 @@ class BatchSpanProcessorTestPeer : public testing::Test
std::shared_ptr<sdk::trace::SpanProcessor> GetMockProcessor(
std::shared_ptr<std::vector<std::unique_ptr<sdk::trace::SpanData>>> spans_received,
std::shared_ptr<bool> is_shutdown,
std::shared_ptr<bool> is_export_completed = std::shared_ptr<bool>(new bool(false)),
const std::chrono::milliseconds export_delay = std::chrono::milliseconds(0),
const int schedule_delay_millis = 5000,
const int max_queue_size = 2048,
const int max_export_batch_size = 512
const std::chrono::milliseconds schedule_delay_millis = std::chrono::milliseconds(5000),
const size_t max_queue_size = 2048,
const size_t max_export_batch_size = 512
)
{
return std::shared_ptr<sdk::trace::SpanProcessor>(
new sdk::trace::BatchSpanProcessor(GetMockExporter(spans_received, is_shutdown, export_delay),
new sdk::trace::BatchSpanProcessor(GetMockExporter(spans_received,
is_shutdown,
is_export_completed,
export_delay),
max_queue_size,
schedule_delay_millis,
max_export_batch_size));
@@ -100,10 +117,14 @@ class BatchSpanProcessorTestPeer : public testing::Test
std::unique_ptr<sdk::trace::SpanExporter> GetMockExporter(
std::shared_ptr<std::vector<std::unique_ptr<sdk::trace::SpanData>>> spans_received,
std::shared_ptr<bool> is_shutdown,
std::shared_ptr<bool> is_export_completed,
const std::chrono::milliseconds export_delay = std::chrono::milliseconds(0)
)
{
return std::unique_ptr<sdk::trace::SpanExporter>(new MockSpanExporter(spans_received, is_shutdown, export_delay));
return std::unique_ptr<sdk::trace::SpanExporter>(new MockSpanExporter(spans_received,
is_shutdown,
is_export_completed,
export_delay));
}
};

@@ -174,17 +195,16 @@ TEST_F(BatchSpanProcessorTestPeer, TestForceFlush)
}
}

// TODO : Discuss and revise this test (maybe ForceFlush can return a bool)
TEST_F(BatchSpanProcessorTestPeer, TestForceFlushTimeout)
{
std::shared_ptr<bool> is_shutdown(new bool(false));
std::shared_ptr<bool> is_shutdown(new bool(false)), is_export_completed(new bool(false));
std::shared_ptr<std::vector<std::unique_ptr<sdk::trace::SpanData>>> spans_received(
new std::vector<std::unique_ptr<sdk::trace::SpanData>>);

const std::chrono::milliseconds export_delay(400);
const int num_spans = 3;

auto batch_processor = GetMockProcessor(spans_received, is_shutdown, export_delay);
auto batch_processor = GetMockProcessor(spans_received, is_shutdown, is_export_completed, export_delay);

auto test_spans = GetTestSpans(batch_processor, num_spans);

@@ -195,7 +215,7 @@ TEST_F(BatchSpanProcessorTestPeer, TestForceFlushTimeout)
// force flush
batch_processor->ForceFlush(std::chrono::milliseconds(300));

// TODO: Discuss what to expect here. I am not so sure on this.
EXPECT_FALSE(*is_export_completed);
}

TEST_F(BatchSpanProcessorTestPeer, TestManySpansLoss)
@@ -257,31 +277,32 @@ TEST_F(BatchSpanProcessorTestPeer, TestScheduleDelayMillis)
/* Test that max_export_batch_size spans are exported every schedule_delay_millis
milliseconds */

std::shared_ptr<bool> is_shutdown(new bool(false));
std::shared_ptr<bool> is_shutdown(new bool(false)), is_export_completed(new bool(false));
std::shared_ptr<std::vector<std::unique_ptr<sdk::trace::SpanData>>> spans_received(
new std::vector<std::unique_ptr<sdk::trace::SpanData>>);

const std::chrono::milliseconds export_delay(0);
const int schedule_delay_millis = 200;
const int max_export_batch_size = 512; // default value
const std::chrono::milliseconds schedule_delay_millis(200);
const size_t max_export_batch_size = 512; // default value

auto batch_processor = GetMockProcessor(spans_received,
is_shutdown,
is_shutdown,
is_export_completed,
export_delay,
schedule_delay_millis);

auto test_spans = GetTestSpans(batch_processor, max_export_batch_size);

for(int i = 0; i < max_export_batch_size; ++i){
for(size_t i = 0; i < max_export_batch_size; ++i){
batch_processor->OnEnd(std::move(test_spans->at(i)));
}

// Sleep for schedule_delay_millis milliseconds
std::this_thread::sleep_for(std::chrono::milliseconds(schedule_delay_millis + 10));
// Sleep for schedule_delay_millis milliseconds + small delay to give time to export
std::this_thread::sleep_for(schedule_delay_millis + std::chrono::milliseconds(10));

// Spans should be exported by now
EXPECT_EQ(max_export_batch_size, spans_received->size());
for(int i = 0; i < max_export_batch_size; ++i)
for(size_t i = 0; i < max_export_batch_size; ++i)
{
EXPECT_EQ("Span " + i, spans_received->at(i)->GetName());
}
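The timeout test now asserts EXPECT_FALSE on the shared is_export_completed flag instead of leaving a TODO. A standalone sketch of that completion-flag pattern follows; it is not the test code itself — the test shares a plain bool through a shared_ptr, whereas an atomic is used here to keep the sketch race-free.

#include <atomic>
#include <chrono>
#include <iostream>
#include <memory>
#include <thread>

int main()
{
  auto is_export_completed = std::make_shared<std::atomic<bool>>(false);

  // Plays the role of the mock exporter: clears the flag when the export starts
  // and sets it only once the deliberately slow export has finished.
  std::thread slow_exporter([is_export_completed] {
    *is_export_completed = false;
    std::this_thread::sleep_for(std::chrono::milliseconds(400));  // simulated export delay
    *is_export_completed = true;
  });

  // The "force flush" side gives up after 300 ms, sooner than the export completes,
  // so the flag is still false when it is checked.
  std::this_thread::sleep_for(std::chrono::milliseconds(300));
  std::cout << std::boolalpha << "export completed? " << is_export_completed->load() << "\n";

  slow_exporter.join();
  return 0;
}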
