From eab7d986f354d76c31e423e4633bf956c8bb2a9a Mon Sep 17 00:00:00 2001
From: Snehil Chopra
Date: Thu, 23 Jul 2020 20:10:21 +0000
Subject: [PATCH] Addressed review comments

---
 .../sdk/trace/batch_span_processor.h        | 12 ++--
 sdk/src/trace/batch_span_processor.cc       | 31 +++++-----
 sdk/test/trace/batch_span_processor_test.cc | 57 +++++++++++++------
 3 files changed, 58 insertions(+), 42 deletions(-)

diff --git a/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h b/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h
index 8145426bdc..582af392ba 100644
--- a/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h
+++ b/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h
@@ -35,9 +35,9 @@ class BatchSpanProcessor : public SpanProcessor
    * @param max_export_batch_size - The maximum batch size of every export. It must be smaller or equal to max_queue_size
    */
   explicit BatchSpanProcessor(std::unique_ptr<SpanExporter> &&exporter,
-                              const int max_queue_size = 2048,
-                              const int schedule_delay_millis = 5000,
-                              const int max_export_batch_size = 512);
+                              const size_t max_queue_size = 2048,
+                              std::chrono::milliseconds schedule_delay_millis = std::chrono::milliseconds(5000),
+                              const size_t max_export_batch_size = 512);
 
   /**
    * Requests a Recordable(Span) from the configured exporter.
@@ -128,9 +128,9 @@ class BatchSpanProcessor : public SpanProcessor
   std::unique_ptr<std::thread> worker_thread_;
 
   /* Configurable parameters as per the official specs */
-  const int schedule_delay_millis_;
-  const int max_queue_size_;
-  const int max_export_batch_size_;
+  const std::chrono::milliseconds schedule_delay_millis_;
+  const size_t max_queue_size_;
+  const size_t max_export_batch_size_;
 
   /* Synchronization primitives */
   std::condition_variable cv_, force_flush_cv_;
diff --git a/sdk/src/trace/batch_span_processor.cc b/sdk/src/trace/batch_span_processor.cc
index b3981c432b..182562a78a 100644
--- a/sdk/src/trace/batch_span_processor.cc
+++ b/sdk/src/trace/batch_span_processor.cc
@@ -17,9 +17,9 @@ namespace trace
 {
 
 BatchSpanProcessor::BatchSpanProcessor(std::unique_ptr<SpanExporter> &&exporter,
-                                       const int max_queue_size,
-                                       const int schedule_delay_millis,
-                                       const int max_export_batch_size)
+                                       const size_t max_queue_size,
+                                       const std::chrono::milliseconds schedule_delay_millis,
+                                       const size_t max_export_batch_size)
 :exporter_(std::move(exporter)),
  max_queue_size_(max_queue_size),
  schedule_delay_millis_(schedule_delay_millis),
@@ -52,20 +52,17 @@ void BatchSpanProcessor::OnEnd(std::unique_ptr<Recordable> &&span) noexcept
 
   std::unique_lock<std::mutex> lk(cv_m_);
 
-  if(static_cast<int>(buffer_->size()) >= max_queue_size_){
+  if(buffer_->Add(span) == false)
+  {
     // TODO: glog that spans will likely be dropped
   }
 
-  buffer_->Add(span);
-
   // If the queue gets at least half full a preemptive notification is
   // sent to the worker thread to start a new export cycle.
-  if(static_cast<int>(buffer_->size()) >= max_queue_size_ / 2){
+  if(buffer_->size() >= max_queue_size_ / 2){
     // signal the worker thread
     cv_.notify_one();
   }
-
-  lk.unlock();
 }
 
 void BatchSpanProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept
@@ -118,7 +115,7 @@ void BatchSpanProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept
 }
 
 void BatchSpanProcessor::DoBackgroundWork(){
-  int timeout = schedule_delay_millis_;
+  auto timeout = schedule_delay_millis_;
 
   while (true)
   {
@@ -126,7 +123,7 @@ void BatchSpanProcessor::DoBackgroundWork(){
 
     // If we already have max_export_batch_size_ spans in the buffer, better to export them
    // now
-    if (static_cast<int>(buffer_->size()) < max_export_batch_size_)
+    if (buffer_->size() < max_export_batch_size_)
     {
       // In case of spurious wake up, we export only if we have atleast one span
       // in the batch. This is acceptable because batching is a best mechanism
@@ -170,7 +167,7 @@ void BatchSpanProcessor::DoBackgroundWork(){
 
     auto end      = std::chrono::steady_clock::now();
     auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
-    timeout = schedule_delay_millis_ - duration.count();
+    timeout = schedule_delay_millis_ - duration;
   }
 }
 
@@ -215,17 +212,15 @@ std::unique_ptr<common::CircularBuffer<Recordable>> BatchSpanProcessor::CopySpan
     new common::CircularBuffer<Recordable>(max_queue_size_)
   );
-  // Get the appropriate size.
-  const int buffer_size = static_cast<int>(buffer_->size());
-
   if(was_force_flush_called == true)
   {
     buffer_.swap(buffer_copy);
   }
   else
   {
-    const int num_spans_to_export = buffer_size >= max_export_batch_size_ ?
-                                    max_export_batch_size_ : buffer_size;
+    // Get the appropriate size
+    const size_t num_spans_to_export = buffer_->size() >= max_export_batch_size_ ?
+                                       max_export_batch_size_ : buffer_->size();
 
     buffer_->Consume(
       num_spans_to_export,
       [&](CircularBufferRange<AtomicUniquePtr<Recordable>> range) noexcept {
@@ -243,7 +238,7 @@ std::unique_ptr<common::CircularBuffer<Recordable>> BatchSpanProcessor::CopySpan
 
 void BatchSpanProcessor::DrainQueue()
 {
-  while(static_cast<int>(buffer_->size()) > 0) Export(buffer_, false);
+  while(buffer_->empty() == false) Export(buffer_, false);
 }
 
 void BatchSpanProcessor::Shutdown(std::chrono::microseconds timeout) noexcept
diff --git a/sdk/test/trace/batch_span_processor_test.cc b/sdk/test/trace/batch_span_processor_test.cc
index ab77e285f0..fcbef0b53b 100644
--- a/sdk/test/trace/batch_span_processor_test.cc
+++ b/sdk/test/trace/batch_span_processor_test.cc
@@ -16,8 +16,12 @@ class MockSpanExporter final : public sdk::trace::SpanExporter
 public:
   MockSpanExporter(std::shared_ptr<std::vector<std::unique_ptr<sdk::trace::SpanData>>> spans_received,
                    std::shared_ptr<bool> is_shutdown,
+                   std::shared_ptr<bool> is_export_completed,
                    const std::chrono::milliseconds export_delay = std::chrono::milliseconds(0)) noexcept
-      : spans_received_(spans_received), is_shutdown_(is_shutdown), export_delay_(export_delay)
+      : spans_received_(spans_received),
+        is_shutdown_(is_shutdown),
+        is_export_completed_(is_export_completed),
+        export_delay_(export_delay)
   {}
 
   std::unique_ptr<sdk::trace::Recordable> MakeRecordable() noexcept override
@@ -27,6 +31,8 @@ class MockSpanExporter final : public sdk::trace::SpanExporter
   sdk::trace::ExportResult Export(
       const nostd::span<std::unique_ptr<sdk::trace::Recordable>> &recordables) noexcept override
   {
+    *is_export_completed_ = false;
+
     std::this_thread::sleep_for(export_delay_);
 
     for (auto &recordable : recordables)
@@ -40,6 +46,7 @@ class MockSpanExporter final : public sdk::trace::SpanExporter
       }
     }
 
+    *is_export_completed_ = true;
     return sdk::trace::ExportResult::kSuccess;
   }
 
@@ -48,9 +55,15 @@ class MockSpanExporter final : public sdk::trace::SpanExporter
     *is_shutdown_ = true;
   }
 
+  bool IsExportCompleted()
+  {
+    return *is_export_completed_;
+  }
+
 private:
   std::shared_ptr<std::vector<std::unique_ptr<sdk::trace::SpanData>>> spans_received_;
   std::shared_ptr<bool> is_shutdown_;
+  std::shared_ptr<bool> is_export_completed_;
   // Meant exclusively to test force flush timeout
   const std::chrono::milliseconds export_delay_;
 };
@@ -64,14 +77,18 @@ class BatchSpanProcessorTestPeer : public testing::Test
   std::shared_ptr<sdk::trace::BatchSpanProcessor> GetMockProcessor(
      std::shared_ptr<std::vector<std::unique_ptr<sdk::trace::SpanData>>> spans_received,
      std::shared_ptr<bool> is_shutdown,
+     std::shared_ptr<bool> is_export_completed = std::shared_ptr<bool>(new bool(false)),
      const std::chrono::milliseconds export_delay = std::chrono::milliseconds(0),
-     const int schedule_delay_millis = 5000,
-     const int max_queue_size = 2048,
-     const int max_export_batch_size = 512
+     const std::chrono::milliseconds schedule_delay_millis = std::chrono::milliseconds(5000),
+     const size_t max_queue_size = 2048,
+     const size_t max_export_batch_size = 512
   )
   {
     return std::shared_ptr<sdk::trace::BatchSpanProcessor>(
-      new sdk::trace::BatchSpanProcessor(GetMockExporter(spans_received, is_shutdown, export_delay),
+      new sdk::trace::BatchSpanProcessor(GetMockExporter(spans_received,
+                                                         is_shutdown,
+                                                         is_export_completed,
+                                                         export_delay),
                                          max_queue_size,
                                          schedule_delay_millis,
                                          max_export_batch_size));
   }
@@ -100,10 +117,14 @@ class BatchSpanProcessorTestPeer : public testing::Test
   std::unique_ptr<sdk::trace::SpanExporter> GetMockExporter(
      std::shared_ptr<std::vector<std::unique_ptr<sdk::trace::SpanData>>> spans_received,
      std::shared_ptr<bool> is_shutdown,
+     std::shared_ptr<bool> is_export_completed,
      const std::chrono::milliseconds export_delay = std::chrono::milliseconds(0)
   )
   {
-    return std::unique_ptr<sdk::trace::SpanExporter>(new MockSpanExporter(spans_received, is_shutdown, export_delay));
+    return std::unique_ptr<sdk::trace::SpanExporter>(new MockSpanExporter(spans_received,
+                                                                          is_shutdown,
+                                                                          is_export_completed,
+                                                                          export_delay));
   }
 };
@@ -174,17 +195,16 @@ TEST_F(BatchSpanProcessorTestPeer, TestForceFlush)
   }
 }
 
-// TODO : Discuss and revise this test (maybe ForceFlush can return a bool)
 TEST_F(BatchSpanProcessorTestPeer, TestForceFlushTimeout)
 {
-  std::shared_ptr<bool> is_shutdown(new bool(false));
+  std::shared_ptr<bool> is_shutdown(new bool(false)), is_export_completed(new bool(false));
   std::shared_ptr<std::vector<std::unique_ptr<sdk::trace::SpanData>>> spans_received(
       new std::vector<std::unique_ptr<sdk::trace::SpanData>>);
 
   const std::chrono::milliseconds export_delay(400);
   const int num_spans = 3;
 
-  auto batch_processor = GetMockProcessor(spans_received, is_shutdown, export_delay);
+  auto batch_processor = GetMockProcessor(spans_received, is_shutdown, is_export_completed, export_delay);
 
   auto test_spans = GetTestSpans(batch_processor, num_spans);
 
@@ -195,7 +215,7 @@ TEST_F(BatchSpanProcessorTestPeer, TestForceFlushTimeout)
 
   // force flush
   batch_processor->ForceFlush(std::chrono::milliseconds(300));
-  // TODO: Discuss what to expect here. I am not so sure on this.
+  EXPECT_FALSE(*is_export_completed);
 }
 
 TEST_F(BatchSpanProcessorTestPeer, TestManySpansLoss)
@@ -257,31 +277,32 @@ TEST_F(BatchSpanProcessorTestPeer, TestScheduleDelayMillis)
   /* Test that max_export_batch_size spans are exported every schedule_delay_millis seconds */
 
-  std::shared_ptr<bool> is_shutdown(new bool(false));
+  std::shared_ptr<bool> is_shutdown(new bool(false)), is_export_completed(new bool(false));
   std::shared_ptr<std::vector<std::unique_ptr<sdk::trace::SpanData>>> spans_received(
       new std::vector<std::unique_ptr<sdk::trace::SpanData>>);
 
   const std::chrono::milliseconds export_delay(0);
-  const int schedule_delay_millis = 200;
-  const int max_export_batch_size = 512; // default value
+  const std::chrono::milliseconds schedule_delay_millis(200);
+  const size_t max_export_batch_size = 512; // default value
 
   auto batch_processor = GetMockProcessor(spans_received,
-                                          is_shutdown,
+                                          is_shutdown,
+                                          is_export_completed,
                                           export_delay,
                                           schedule_delay_millis);
 
   auto test_spans = GetTestSpans(batch_processor, max_export_batch_size);
 
-  for(int i = 0; i < max_export_batch_size; ++i){
+  for(size_t i = 0; i < max_export_batch_size; ++i){
     batch_processor->OnEnd(std::move(test_spans->at(i)));
   }
 
-  // Sleep for schedule_delay_millis milliseconds
-  std::this_thread::sleep_for(std::chrono::milliseconds(schedule_delay_millis + 10));
+  // Sleep for schedule_delay_millis milliseconds + small delay to give time to export
+  std::this_thread::sleep_for(schedule_delay_millis + std::chrono::milliseconds(10));
 
   // Spans should be exported by now
   EXPECT_EQ(max_export_batch_size, spans_received->size());
 
-  for(int i = 0; i < max_export_batch_size; ++i)
+  for(size_t i = 0; i < max_export_batch_size; ++i)
   {
     EXPECT_EQ("Span " + i, spans_received->at(i)->GetName());
   }
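
Usage sketch (illustrative, not part of the patch): after this change, callers pass the
schedule delay as std::chrono::milliseconds and the queue/batch sizes as size_t. A minimal
construction example under that assumption; the exporter type MyExporter is a hypothetical
placeholder, and the sdk::trace namespace alias is the one already used in the test file above.

    #include <chrono>
    #include <memory>
    #include "opentelemetry/sdk/trace/batch_span_processor.h"

    // MyExporter stands in for any concrete sdk::trace::SpanExporter implementation.
    std::unique_ptr<sdk::trace::SpanExporter> exporter(new MyExporter());

    // The values shown mirror the defaults in the updated declaration.
    sdk::trace::BatchSpanProcessor processor(std::move(exporter),
                                             2048,                             // max_queue_size (size_t)
                                             std::chrono::milliseconds(5000),  // schedule_delay_millis
                                             512);                             // max_export_batch_size (size_t)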