diff --git a/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h b/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h new file mode 100644 index 00000000000..f27bf231fcd --- /dev/null +++ b/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h @@ -0,0 +1,139 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "opentelemetry/sdk/common/circular_buffer.h" +#include "opentelemetry/sdk/logs/exporter.h" +#include "opentelemetry/sdk/logs/processor.h" + +#include +#include +#include + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk +{ + +namespace logs +{ + +/** + * This is an implementation of the LogProcessor which creates batches of finished logs and passes + * the export-friendly log data representations to the configured LogExporter. + */ +class BatchLogProcessor : public LogProcessor +{ +public: + /** + * Creates a batch log processor by configuring the specified exporter and other parameters + * as per the official, language-agnostic opentelemetry specs. + * + * @param exporter - The backend exporter to pass the logs to + * @param max_queue_size - The maximum buffer/queue size. After the size is reached, logs are + * dropped. + * @param scheduled_delay_millis - The time interval between two consecutive exports. + * @param max_export_batch_size - The maximum batch size of every export. It must be smaller or + * equal to max_queue_size + */ + explicit BatchLogProcessor( + std::unique_ptr &&exporter, + const size_t max_queue_size = 2048, + const std::chrono::milliseconds scheduled_delay_millis = std::chrono::milliseconds(5000), + const size_t max_export_batch_size = 512); + + /** Makes a new recordable **/ + std::unique_ptr MakeRecordable() noexcept override; + + /** + * Called when the Logger's log method creates a log record + * @param record the log record + */ + + void OnReceive(std::unique_ptr &&record) noexcept override; + + /** + * Export all log records that have not been exported yet. + * + * NOTE: Timeout functionality not supported yet. + */ + bool ForceFlush( + std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override; + + /** + * Shuts down the processor and does any cleanup required. Completely drains the buffer/queue of + * all its logs and passes them to the exporter. Any subsequent calls to + * ForceFlush or Shutdown will return immediately without doing anything. + * + * NOTE: Timeout functionality not supported yet. + */ + bool Shutdown( + std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override; + + /** + * Class destructor which invokes the Shutdown() method. + */ + virtual ~BatchLogProcessor() override; + +private: + /** + * The background routine performed by the worker thread. + */ + void DoBackgroundWork(); + + /** + * Exports all logs to the configured exporter. + * + * @param was_force_flush_called - A flag to check if the current export is the result + * of a call to ForceFlush method. If true, then we have to + * notify the main thread to wake it up in the ForceFlush + * method. + */ + void Export(const bool was_for_flush_called); + + /** + * Called when Shutdown() is invoked. Completely drains the queue of all log records and + * passes them to the exporter. + */ + void DrainQueue(); + + /* The configured backend log exporter */ + std::unique_ptr exporter_; + + /* Configurable parameters as per the official *trace* specs */ + const size_t max_queue_size_; + const std::chrono::milliseconds scheduled_delay_millis_; + const size_t max_export_batch_size_; + + /* Synchronization primitives */ + std::condition_variable cv_, force_flush_cv_; + std::mutex cv_m_, force_flush_cv_m_; + + /* The buffer/queue to which the ended logs are added */ + common::CircularBuffer buffer_; + + /* Important boolean flags to handle the workflow of the processor */ + std::atomic is_shutdown_{false}; + std::atomic is_force_flush_{false}; + std::atomic is_force_flush_notified_{false}; + + /* The background worker thread */ + std::thread worker_thread_; +}; + +} // namespace logs +} // namespace sdk +OPENTELEMETRY_END_NAMESPACE diff --git a/sdk/src/logs/CMakeLists.txt b/sdk/src/logs/CMakeLists.txt index e2e7c2c9158..44d8909472b 100644 --- a/sdk/src/logs/CMakeLists.txt +++ b/sdk/src/logs/CMakeLists.txt @@ -1,4 +1,4 @@ add_library(opentelemetry_logs logger_provider.cc logger.cc - simple_log_processor.cc) + simple_log_processor.cc batch_log_processor.cc) target_link_libraries(opentelemetry_logs opentelemetry_common) diff --git a/sdk/src/logs/batch_log_processor.cc b/sdk/src/logs/batch_log_processor.cc new file mode 100644 index 00000000000..e61f9a31fd3 --- /dev/null +++ b/sdk/src/logs/batch_log_processor.cc @@ -0,0 +1,213 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "opentelemetry/sdk/logs/batch_log_processor.h" + +#include +using opentelemetry::sdk::common::AtomicUniquePtr; +using opentelemetry::sdk::common::CircularBufferRange; + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk +{ +namespace logs +{ +BatchLogProcessor::BatchLogProcessor(std::unique_ptr &&exporter, + const size_t max_queue_size, + const std::chrono::milliseconds scheduled_delay_millis, + const size_t max_export_batch_size) + : exporter_(std::move(exporter)), + max_queue_size_(max_queue_size), + scheduled_delay_millis_(scheduled_delay_millis), + max_export_batch_size_(max_export_batch_size), + buffer_(max_queue_size_), + worker_thread_(&BatchLogProcessor::DoBackgroundWork, this) +{} + +std::unique_ptr BatchLogProcessor::MakeRecordable() noexcept +{ + return exporter_->MakeRecordable(); +} + +void BatchLogProcessor::OnReceive(std::unique_ptr &&record) noexcept +{ + if (is_shutdown_.load() == true) + { + return; + } + + if (buffer_.Add(record) == false) + { + return; + } + + // If the queue gets at least half full a preemptive notification is + // sent to the worker thread to start a new export cycle. + if (buffer_.size() >= max_queue_size_ / 2) + { + // signal the worker thread + cv_.notify_one(); + } +} + +bool BatchLogProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept +{ + if (is_shutdown_.load() == true) + { + return false; + } + + is_force_flush_ = true; + + // Keep attempting to wake up the worker thread + while (is_force_flush_.load() == true) + { + cv_.notify_one(); + } + + // Now wait for the worker thread to signal back from the Export method + std::unique_lock lk(force_flush_cv_m_); + while (is_force_flush_notified_.load() == false) + { + force_flush_cv_.wait(lk); + } + + // Notify the worker thread + is_force_flush_notified_ = false; + + return true; +} + +void BatchLogProcessor::DoBackgroundWork() +{ + auto timeout = scheduled_delay_millis_; + + while (true) + { + // Wait for `timeout` milliseconds + std::unique_lock lk(cv_m_); + cv_.wait_for(lk, timeout); + + if (is_shutdown_.load() == true) + { + DrainQueue(); + return; + } + + bool was_force_flush_called = is_force_flush_.load(); + + // Check if this export was the result of a force flush. + if (was_force_flush_called == true) + { + // Since this export was the result of a force flush, signal the + // main thread that the worker thread has been notified + is_force_flush_ = false; + } + else + { + // If the buffer was empty during the entire `timeout` time interval, + // go back to waiting. If this was a spurious wake-up, we export only if + // `buffer_` is not empty. This is acceptable because batching is a best + // mechanism effort here. + if (buffer_.empty() == true) + { + continue; + } + } + + auto start = std::chrono::steady_clock::now(); + Export(was_force_flush_called); + auto end = std::chrono::steady_clock::now(); + auto duration = std::chrono::duration_cast(end - start); + + // Subtract the duration of this export call from the next `timeout`. + timeout = scheduled_delay_millis_ - duration; + } +} + +void BatchLogProcessor::Export(const bool was_force_flush_called) +{ + std::vector> records_arr; + + size_t num_records_to_export; + + if (was_force_flush_called == true) + { + num_records_to_export = buffer_.size(); + } + else + { + num_records_to_export = + buffer_.size() >= max_export_batch_size_ ? max_export_batch_size_ : buffer_.size(); + } + + buffer_.Consume( + num_records_to_export, [&](CircularBufferRange> range) noexcept { + range.ForEach([&](AtomicUniquePtr &ptr) { + std::unique_ptr swap_ptr = std::unique_ptr(nullptr); + ptr.Swap(swap_ptr); + records_arr.push_back(std::unique_ptr(swap_ptr.release())); + return true; + }); + }); + + exporter_->Export( + nostd::span>(records_arr.data(), records_arr.size())); + + // Notify the main thread in case this export was the result of a force flush. + if (was_force_flush_called == true) + { + is_force_flush_notified_ = true; + while (is_force_flush_notified_.load() == true) + { + force_flush_cv_.notify_one(); + } + } +} + +void BatchLogProcessor::DrainQueue() +{ + while (buffer_.empty() == false) + { + Export(false); + } +} + +bool BatchLogProcessor::Shutdown(std::chrono::microseconds timeout) noexcept +{ + is_shutdown_.store(true); + + cv_.notify_one(); + worker_thread_.join(); + if (exporter_ != nullptr) + { + return exporter_->Shutdown(); + } + + return true; +} + +BatchLogProcessor::~BatchLogProcessor() +{ + if (is_shutdown_.load() == false) + { + Shutdown(); + } +} + +} // namespace logs +} // namespace sdk +OPENTELEMETRY_END_NAMESPACE diff --git a/sdk/test/logs/BUILD b/sdk/test/logs/BUILD index 9e12e08ab95..e70527243f0 100644 --- a/sdk/test/logs/BUILD +++ b/sdk/test/logs/BUILD @@ -42,3 +42,14 @@ cc_test( "@com_google_googletest//:gtest_main", ], ) + +cc_test( + name = "batch_log_processor_test", + srcs = [ + "batch_log_processor_test.cc", + ], + deps = [ + "//sdk/src/logs", + "@com_google_googletest//:gtest_main", + ], +) diff --git a/sdk/test/logs/CMakeLists.txt b/sdk/test/logs/CMakeLists.txt index f59c6a1926b..84b865d226e 100644 --- a/sdk/test/logs/CMakeLists.txt +++ b/sdk/test/logs/CMakeLists.txt @@ -1,5 +1,5 @@ -foreach(testname logger_provider_sdk_test logger_sdk_test - simple_log_processor_test log_record_test) +foreach(testname logger_provider_sdk_test logger_sdk_test log_record_test + simple_log_processor_test batch_log_processor_test) add_executable(${testname} "${testname}.cc") target_link_libraries(${testname} ${GTEST_BOTH_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT} opentelemetry_logs) diff --git a/sdk/test/logs/batch_log_processor_test.cc b/sdk/test/logs/batch_log_processor_test.cc new file mode 100644 index 00000000000..baf27de8395 --- /dev/null +++ b/sdk/test/logs/batch_log_processor_test.cc @@ -0,0 +1,276 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "opentelemetry/sdk/logs/batch_log_processor.h" +#include "opentelemetry/sdk/logs/exporter.h" +#include "opentelemetry/sdk/logs/log_record.h" + +#include +#include +#include + +using namespace opentelemetry::sdk::logs; + +/** + * A sample log exporter + * for testing the batch log processor + */ +class MockLogExporter final : public LogExporter +{ +public: + MockLogExporter(std::shared_ptr>> logs_received, + std::shared_ptr> is_shutdown, + std::shared_ptr> is_export_completed, + const std::chrono::milliseconds export_delay = std::chrono::milliseconds(0)) + : logs_received_(logs_received), + is_shutdown_(is_shutdown), + is_export_completed_(is_export_completed), + export_delay_(export_delay) + {} + + std::unique_ptr MakeRecordable() noexcept + { + return std::unique_ptr(new LogRecord()); + } + + // Export method stores the logs received into a shared list of record names + ExportResult Export( + const opentelemetry::nostd::span> &records) noexcept override + { + *is_export_completed_ = false; // Meant exclusively to test scheduled_delay_millis + + for (auto &record : records) + { + auto log = std::unique_ptr(static_cast(record.release())); + if (log != nullptr) + { + logs_received_->push_back(std::move(log)); + } + } + + *is_export_completed_ = true; + return ExportResult::kSuccess; + } + + // toggles the boolean flag marking this exporter as shut down + bool Shutdown( + std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override + { + *is_shutdown_ = true; + return true; + } + +private: + std::shared_ptr>> logs_received_; + std::shared_ptr> is_shutdown_; + std::shared_ptr> is_export_completed_; + const std::chrono::milliseconds export_delay_; +}; + +/** + * A fixture class for testing the BatchLogProcessor class that uses the TestExporter defined above. + */ +class BatchLogProcessorTest : public testing::Test // ::testing::Test +{ +public: + // returns a batch log processor that received a batch of log records, a shared pointer to a + // is_shutdown flag, and the processor configuration options (default if unspecified) + std::shared_ptr GetMockProcessor( + std::shared_ptr>> logs_received, + std::shared_ptr> is_shutdown, + std::shared_ptr> is_export_completed = + std::shared_ptr>(new std::atomic(false)), + const std::chrono::milliseconds export_delay = std::chrono::milliseconds(0), + const std::chrono::milliseconds scheduled_delay_millis = std::chrono::milliseconds(5000), + const size_t max_queue_size = 2048, + const size_t max_export_batch_size = 512) + { + return std::shared_ptr( + new BatchLogProcessor(std::unique_ptr(new MockLogExporter( + logs_received, is_shutdown, is_export_completed, export_delay)), + max_queue_size, scheduled_delay_millis, max_export_batch_size)); + } +}; + +TEST_F(BatchLogProcessorTest, TestShutdown) +{ + // initialize a batch log processor with the test exporter + std::shared_ptr>> logs_received( + new std::vector>); + std::shared_ptr> is_shutdown(new std::atomic(false)); + + auto batch_processor = GetMockProcessor(logs_received, is_shutdown); + + // Create a few test log records and send them to the processor + const int num_logs = 3; + + for (int i = 0; i < num_logs; ++i) + { + auto log = batch_processor->MakeRecordable(); + log->SetName("Log" + std::to_string(i)); + batch_processor->OnReceive(std::move(log)); + } + + // Test that shutting down the processor will first wait for the + // current batch of logs to be sent to the log exporter + // by checking the number of logs sent and the names of the logs sent + EXPECT_EQ(true, batch_processor->Shutdown()); + + EXPECT_EQ(num_logs, logs_received->size()); + + // Assume logs are received by exporter in same order as sent by processor + for (int i = 0; i < num_logs; ++i) + { + EXPECT_EQ("Log" + std::to_string(i), logs_received->at(i)->GetName()); + } + + // Also check that the processor is shut down at the end + EXPECT_TRUE(is_shutdown->load()); +} + +TEST_F(BatchLogProcessorTest, TestForceFlush) +{ + std::shared_ptr> is_shutdown(new std::atomic(false)); + std::shared_ptr>> logs_received( + new std::vector>); + + auto batch_processor = GetMockProcessor(logs_received, is_shutdown); + const int num_logs = 2048; + + for (int i = 0; i < num_logs; ++i) + { + auto log = batch_processor->MakeRecordable(); + log->SetName("Log" + std::to_string(i)); + batch_processor->OnReceive(std::move(log)); + } + + EXPECT_TRUE(batch_processor->ForceFlush()); + + EXPECT_EQ(num_logs, logs_received->size()); + for (int i = 0; i < num_logs; ++i) + { + EXPECT_EQ("Log" + std::to_string(i), logs_received->at(i)->GetName()); + } + + // Create some more logs to make sure that the processor still works + for (int i = 0; i < num_logs; ++i) + { + auto log = batch_processor->MakeRecordable(); + log->SetName("Log" + std::to_string(i)); + batch_processor->OnReceive(std::move(log)); + } + + EXPECT_TRUE(batch_processor->ForceFlush()); + + EXPECT_EQ(num_logs * 2, logs_received->size()); + for (int i = 0; i < num_logs * 2; ++i) + { + EXPECT_EQ("Log" + std::to_string(i % num_logs), logs_received->at(i)->GetName()); + } +} + +TEST_F(BatchLogProcessorTest, TestManyLogsLoss) +{ + /* Test that when exporting more than max_queue_size logs, some are most likely lost*/ + + std::shared_ptr> is_shutdown(new std::atomic(false)); + std::shared_ptr>> logs_received( + new std::vector>); + + const int max_queue_size = 4096; + + auto batch_processor = GetMockProcessor(logs_received, is_shutdown); + + // Create max_queue_size log records + for (int i = 0; i < max_queue_size; ++i) + { + auto log = batch_processor->MakeRecordable(); + log->SetName("Log" + std::to_string(i)); + batch_processor->OnReceive(std::move(log)); + } + + EXPECT_TRUE(batch_processor->ForceFlush()); + + // Log should be exported by now + EXPECT_GE(max_queue_size, logs_received->size()); +} + +TEST_F(BatchLogProcessorTest, TestManyLogsLossLess) +{ + /* Test that no logs are lost when sending max_queue_size logs */ + + std::shared_ptr> is_shutdown(new std::atomic(false)); + std::shared_ptr>> logs_received( + new std::vector>); + auto batch_processor = GetMockProcessor(logs_received, is_shutdown); + + const int num_logs = 2048; + + for (int i = 0; i < num_logs; ++i) + { + auto log = batch_processor->MakeRecordable(); + log->SetName("Log" + std::to_string(i)); + batch_processor->OnReceive(std::move(log)); + } + + EXPECT_TRUE(batch_processor->ForceFlush()); + + EXPECT_EQ(num_logs, logs_received->size()); + for (int i = 0; i < num_logs; ++i) + { + EXPECT_EQ("Log" + std::to_string(i), logs_received->at(i)->GetName()); + } +} + +TEST_F(BatchLogProcessorTest, TestScheduledDelayMillis) +{ + /* Test that max_export_batch_size logs are exported every scheduled_delay_millis + seconds */ + + std::shared_ptr> is_shutdown(new std::atomic(false)); + std::shared_ptr> is_export_completed(new std::atomic(false)); + std::shared_ptr>> logs_received( + new std::vector>); + + const std::chrono::milliseconds export_delay(0); + const std::chrono::milliseconds scheduled_delay_millis(2000); + const size_t max_export_batch_size = 512; + + auto batch_processor = GetMockProcessor(logs_received, is_shutdown, is_export_completed, + export_delay, scheduled_delay_millis); + + for (int i = 0; i < max_export_batch_size; ++i) + { + auto log = batch_processor->MakeRecordable(); + log->SetName("Log" + std::to_string(i)); + batch_processor->OnReceive(std::move(log)); + } + // Sleep for scheduled_delay_millis milliseconds + std::this_thread::sleep_for(scheduled_delay_millis); + + // small delay to give time to export, which is being performed + // asynchronously by the worker thread (this thread will not + // forcibly join() the main thread unless processor's shutdown() is called). + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + + // Logs should be exported by now + EXPECT_TRUE(is_export_completed->load()); + EXPECT_EQ(max_export_batch_size, logs_received->size()); + for (size_t i = 0; i < max_export_batch_size; ++i) + { + EXPECT_EQ("Log" + std::to_string(i), logs_received->at(i)->GetName()); + } +} diff --git a/third_party/opentelemetry-proto b/third_party/opentelemetry-proto index 59c488bfb8f..f11e0538fd7 160000 --- a/third_party/opentelemetry-proto +++ b/third_party/opentelemetry-proto @@ -1 +1 @@ -Subproject commit 59c488bfb8fb6d0458ad6425758b70259ff4a2bd +Subproject commit f11e0538fd7dc30127ca6bfb2062e5d9f782b77b