Skip to content

Commit

Permalink
Add batch log processor implementation with test coverage (open-telem…
Browse files Browse the repository at this point in the history
  • Loading branch information
Karen Xu authored Dec 22, 2020
1 parent b80dccb commit 5e946f9
Show file tree
Hide file tree
Showing 7 changed files with 643 additions and 4 deletions.
139 changes: 139 additions & 0 deletions sdk/include/opentelemetry/sdk/logs/batch_log_processor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "opentelemetry/sdk/common/circular_buffer.h"
#include "opentelemetry/sdk/logs/exporter.h"
#include "opentelemetry/sdk/logs/processor.h"

#include <atomic>
#include <condition_variable>
#include <thread>

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
{

namespace logs
{

/**
* This is an implementation of the LogProcessor which creates batches of finished logs and passes
* the export-friendly log data representations to the configured LogExporter.
*/
class BatchLogProcessor : public LogProcessor
{
public:
/**
* Creates a batch log processor by configuring the specified exporter and other parameters
* as per the official, language-agnostic opentelemetry specs.
*
* @param exporter - The backend exporter to pass the logs to
* @param max_queue_size - The maximum buffer/queue size. After the size is reached, logs are
* dropped.
* @param scheduled_delay_millis - The time interval between two consecutive exports.
* @param max_export_batch_size - The maximum batch size of every export. It must be smaller or
* equal to max_queue_size
*/
explicit BatchLogProcessor(
std::unique_ptr<LogExporter> &&exporter,
const size_t max_queue_size = 2048,
const std::chrono::milliseconds scheduled_delay_millis = std::chrono::milliseconds(5000),
const size_t max_export_batch_size = 512);

/** Makes a new recordable **/
std::unique_ptr<Recordable> MakeRecordable() noexcept override;

/**
* Called when the Logger's log method creates a log record
* @param record the log record
*/

void OnReceive(std::unique_ptr<Recordable> &&record) noexcept override;

/**
* Export all log records that have not been exported yet.
*
* NOTE: Timeout functionality not supported yet.
*/
bool ForceFlush(
std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override;

/**
* Shuts down the processor and does any cleanup required. Completely drains the buffer/queue of
* all its logs and passes them to the exporter. Any subsequent calls to
* ForceFlush or Shutdown will return immediately without doing anything.
*
* NOTE: Timeout functionality not supported yet.
*/
bool Shutdown(
std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override;

/**
* Class destructor which invokes the Shutdown() method.
*/
virtual ~BatchLogProcessor() override;

private:
/**
* The background routine performed by the worker thread.
*/
void DoBackgroundWork();

/**
* Exports all logs to the configured exporter.
*
* @param was_force_flush_called - A flag to check if the current export is the result
* of a call to ForceFlush method. If true, then we have to
* notify the main thread to wake it up in the ForceFlush
* method.
*/
void Export(const bool was_for_flush_called);

/**
* Called when Shutdown() is invoked. Completely drains the queue of all log records and
* passes them to the exporter.
*/
void DrainQueue();

/* The configured backend log exporter */
std::unique_ptr<LogExporter> exporter_;

/* Configurable parameters as per the official *trace* specs */
const size_t max_queue_size_;
const std::chrono::milliseconds scheduled_delay_millis_;
const size_t max_export_batch_size_;

/* Synchronization primitives */
std::condition_variable cv_, force_flush_cv_;
std::mutex cv_m_, force_flush_cv_m_;

/* The buffer/queue to which the ended logs are added */
common::CircularBuffer<Recordable> buffer_;

/* Important boolean flags to handle the workflow of the processor */
std::atomic<bool> is_shutdown_{false};
std::atomic<bool> is_force_flush_{false};
std::atomic<bool> is_force_flush_notified_{false};

/* The background worker thread */
std::thread worker_thread_;
};

} // namespace logs
} // namespace sdk
OPENTELEMETRY_END_NAMESPACE
2 changes: 1 addition & 1 deletion sdk/src/logs/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
add_library(opentelemetry_logs logger_provider.cc logger.cc
simple_log_processor.cc)
simple_log_processor.cc batch_log_processor.cc)

target_link_libraries(opentelemetry_logs opentelemetry_common)
213 changes: 213 additions & 0 deletions sdk/src/logs/batch_log_processor.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "opentelemetry/sdk/logs/batch_log_processor.h"

#include <vector>
using opentelemetry::sdk::common::AtomicUniquePtr;
using opentelemetry::sdk::common::CircularBufferRange;

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
{
namespace logs
{
/**
 * Stores the exporter and the batching parameters, sizes the record buffer and starts
 * the background worker thread.
 *
 * The relationship between max_export_batch_size and max_queue_size is not validated here.
 *
 * @param exporter the exporter that receives the batches; ownership is taken
 * @param max_queue_size capacity passed to the record buffer
 * @param scheduled_delay_millis wait interval used by the worker between export cycles
 * @param max_export_batch_size upper bound used when sizing a regular (non-flush) batch
 */
BatchLogProcessor::BatchLogProcessor(std::unique_ptr<LogExporter> &&exporter,
const size_t max_queue_size,
const std::chrono::milliseconds scheduled_delay_millis,
const size_t max_export_batch_size)
: exporter_(std::move(exporter)),
max_queue_size_(max_queue_size),
scheduled_delay_millis_(scheduled_delay_millis),
max_export_batch_size_(max_export_batch_size),
// Sized from the member (not the parameter); relies on max_queue_size_ being
// declared before buffer_ in the class so that it is already initialized.
buffer_(max_queue_size_),
// The worker starts running DoBackgroundWork() right away, so it has to be
// initialized after every member it touches (it is declared last in the class).
worker_thread_(&BatchLogProcessor::DoBackgroundWork, this)
{}

std::unique_ptr<Recordable> BatchLogProcessor::MakeRecordable() noexcept
{
return exporter_->MakeRecordable();
}

/**
 * Queues a log record for a later batched export.
 *
 * @param record the log record to enqueue
 */
void BatchLogProcessor::OnReceive(std::unique_ptr<Recordable> &&record) noexcept
{
  // Nothing more to do when the processor is already shut down or when the buffer
  // did not accept the record (the second check only runs if the first one fails).
  if (is_shutdown_.load() || !buffer_.Add(record))
  {
    return;
  }

  // Once the queue is at least half full, nudge the worker thread so that it can
  // start an export cycle before the regular delay has elapsed.
  const size_t wake_threshold = max_queue_size_ / 2;
  if (buffer_.size() >= wake_threshold)
  {
    cv_.notify_one();
  }
}

bool BatchLogProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept
{
if (is_shutdown_.load() == true)
{
return false;
}

is_force_flush_ = true;

// Keep attempting to wake up the worker thread
while (is_force_flush_.load() == true)
{
cv_.notify_one();
}

// Now wait for the worker thread to signal back from the Export method
std::unique_lock<std::mutex> lk(force_flush_cv_m_);
while (is_force_flush_notified_.load() == false)
{
force_flush_cv_.wait(lk);
}

// Notify the worker thread
is_force_flush_notified_ = false;

return true;
}

void BatchLogProcessor::DoBackgroundWork()
{
auto timeout = scheduled_delay_millis_;

while (true)
{
// Wait for `timeout` milliseconds
std::unique_lock<std::mutex> lk(cv_m_);
cv_.wait_for(lk, timeout);

if (is_shutdown_.load() == true)
{
DrainQueue();
return;
}

bool was_force_flush_called = is_force_flush_.load();

// Check if this export was the result of a force flush.
if (was_force_flush_called == true)
{
// Since this export was the result of a force flush, signal the
// main thread that the worker thread has been notified
is_force_flush_ = false;
}
else
{
// If the buffer was empty during the entire `timeout` time interval,
// go back to waiting. If this was a spurious wake-up, we export only if
// `buffer_` is not empty. This is acceptable because batching is a best
// mechanism effort here.
if (buffer_.empty() == true)
{
continue;
}
}

auto start = std::chrono::steady_clock::now();
Export(was_force_flush_called);
auto end = std::chrono::steady_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);

// Subtract the duration of this export call from the next `timeout`.
timeout = scheduled_delay_millis_ - duration;
}
}

void BatchLogProcessor::Export(const bool was_force_flush_called)
{
std::vector<std::unique_ptr<Recordable>> records_arr;

size_t num_records_to_export;

if (was_force_flush_called == true)
{
num_records_to_export = buffer_.size();
}
else
{
num_records_to_export =
buffer_.size() >= max_export_batch_size_ ? max_export_batch_size_ : buffer_.size();
}

buffer_.Consume(
num_records_to_export, [&](CircularBufferRange<AtomicUniquePtr<Recordable>> range) noexcept {
range.ForEach([&](AtomicUniquePtr<Recordable> &ptr) {
std::unique_ptr<Recordable> swap_ptr = std::unique_ptr<Recordable>(nullptr);
ptr.Swap(swap_ptr);
records_arr.push_back(std::unique_ptr<Recordable>(swap_ptr.release()));
return true;
});
});

exporter_->Export(
nostd::span<std::unique_ptr<Recordable>>(records_arr.data(), records_arr.size()));

// Notify the main thread in case this export was the result of a force flush.
if (was_force_flush_called == true)
{
is_force_flush_notified_ = true;
while (is_force_flush_notified_.load() == true)
{
force_flush_cv_.notify_one();
}
}
}

/**
 * Empties the buffer by exporting regular (non-flush) batches until nothing is left.
 */
void BatchLogProcessor::DrainQueue()
{
  for (; !buffer_.empty();)
  {
    Export(false);
  }
}

/**
 * Stops the worker thread (which drains the queue before exiting) and then shuts down
 * the exporter.
 *
 * Only the first call performs the shutdown. Later calls return true immediately:
 * joining the worker thread a second time would throw std::system_error out of this
 * noexcept function and terminate the process, and the exporter must only be shut
 * down once.
 *
 * @param timeout not used by this implementation
 * @return the result of the exporter's Shutdown() on the first call (true if there is
 *         no exporter); true on every subsequent call
 */
bool BatchLogProcessor::Shutdown(std::chrono::microseconds timeout) noexcept
{
  // Atomically claim the shutdown so that exactly one caller proceeds.
  const bool already_shutdown = is_shutdown_.exchange(true);
  if (already_shutdown)
  {
    return true;
  }

  // Wake the worker so it notices the flag; if it is busy right now it will see the
  // flag after its current wait expires.
  cv_.notify_one();
  if (worker_thread_.joinable())
  {
    worker_thread_.join();
  }

  if (exporter_ != nullptr)
  {
    return exporter_->Shutdown();
  }

  return true;
}

/**
 * Makes sure the processor is shut down before it is destroyed; does nothing when
 * the shutdown flag is already set.
 */
BatchLogProcessor::~BatchLogProcessor()
{
  const bool needs_shutdown = !is_shutdown_.load();
  if (needs_shutdown)
  {
    Shutdown();
  }
}

} // namespace logs
} // namespace sdk
OPENTELEMETRY_END_NAMESPACE
11 changes: 11 additions & 0 deletions sdk/test/logs/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,14 @@ cc_test(
"@com_google_googletest//:gtest_main",
],
)

# Test target built from batch_log_processor_test.cc; depends on the SDK logs
# library target and on googletest's gtest_main.
cc_test(
name = "batch_log_processor_test",
srcs = [
"batch_log_processor_test.cc",
],
deps = [
"//sdk/src/logs",
"@com_google_googletest//:gtest_main",
],
)
4 changes: 2 additions & 2 deletions sdk/test/logs/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
foreach(testname logger_provider_sdk_test logger_sdk_test
simple_log_processor_test log_record_test)
foreach(testname logger_provider_sdk_test logger_sdk_test log_record_test
simple_log_processor_test batch_log_processor_test)
add_executable(${testname} "${testname}.cc")
target_link_libraries(${testname} ${GTEST_BOTH_LIBRARIES}
${CMAKE_THREAD_LIBS_INIT} opentelemetry_logs)
Expand Down
Loading

0 comments on commit 5e946f9

Please sign in to comment.