Skip to content

Commit

Permalink
Addressed reviews
Browse files Browse the repository at this point in the history
  • Loading branch information
snehilchopra committed Aug 1, 2020
1 parent e136f83 commit 9bf95d8
Show file tree
Hide file tree
Showing 13 changed files with 110 additions and 231 deletions.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
#include <cstdint>
#include <memory>

#include "opentelemetry/sdk/common/atomic_unique_ptr.h"
#include "opentelemetry/sdk/common/circular_buffer_range.h"
#include "opentelemetry/version.h"
#include "src/common/atomic_unique_ptr.h"
#include "src/common/circular_buffer_range.h"

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
Expand Down
File renamed without changes.
44 changes: 12 additions & 32 deletions sdk/include/opentelemetry/sdk/trace/batch_span_processor.h
Original file line number Diff line number Diff line change
@@ -1,23 +1,17 @@
#pragma once

#include "opentelemetry/sdk/common/circular_buffer.h"
#include "opentelemetry/sdk/trace/exporter.h"
#include "opentelemetry/sdk/trace/processor.h"

#include <atomic>
#include <condition_variable>
#include <thread>

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
{

/* Forward declaration of the Circular Buffer to avoid pulling in external dependencies in
* //sdk:headers */
namespace common
{
template <class T>
class CircularBuffer;
}

namespace trace
{

Expand Down Expand Up @@ -54,6 +48,7 @@ class BatchSpanProcessor : public SpanProcessor

/**
* Called when a span is started.
*
* NOTE: This method is a no-op.
*
* @param span - The span that just started
Expand All @@ -70,19 +65,19 @@ class BatchSpanProcessor : public SpanProcessor
/**
* Export all ended spans that have not been exported yet.
*
* @param timeout - An optional timeout. A default timeout of 0 means no timeout.
* NOTE: Timeout functionality not supported yet.
*/
void ForceFlush(
std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept override;
std::chrono::microseconds timeout = std::chrono::milliseconds(0)) noexcept override;

/**
* Shuts down the processor and does any cleanup required. Completely drains the buffer/queue of
* all its ended spans and passes them to the exporter. Any subsequent calls to OnStart, OnEnd,
* ForceFlush or Shutdown will return immediately without doing anything.
*
* @param timeout - An optional timeout. A default timeout of 0 means no timeout.
* NOTE: Timeout functionality not supported yet.
*/
void Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept override;
void Shutdown(std::chrono::microseconds timeout = std::chrono::milliseconds(0)) noexcept override;

/**
* Class destructor which invokes the Shutdown() method. The Shutdown() method is supposed to be
Expand All @@ -101,39 +96,24 @@ class BatchSpanProcessor : public SpanProcessor
/**
* Exports all ended spans to the configured exporter.
*
* @param buffer - The buffer with ended spans to export
* @param was_force_flush_called - A flag to check if the current export is the result
* of a call to ForceFlush method. If true, then we have to
* notify the main thread to wake it up in the ForceFlush
* method.
*/
void Export(std::unique_ptr<common::CircularBuffer<Recordable>> &buffer,
const bool was_for_flush_called);
void Export(const bool was_for_flush_called);

/**
* Called when Shutdown() is invoked. Completely drains the queue of all its ended spans and
* passes them to the exporter.
*/
void DrainQueue();

/**
* Consumes and copies the appropriate amount of spans from the buffer_
* to a copy buffer which is then exported. This helps unblock all producers
* and increases the overall synchronization performance.
*
* @param was_force_flush_called - A flag to check if the current export is the result
* of a call to ForceFlush method. If true, we consume
* and copy the entire buffer_ to the copy buffer. Otherwise,
* we calculate the appropriate size to export.
* @return A unique pointer to the copy buffer
*/
std::unique_ptr<common::CircularBuffer<Recordable>> CopySpans(const bool was_force_flush_called);

/* The configured backend exporter */
std::unique_ptr<SpanExporter> exporter_;

/* The background worker thread */
std::unique_ptr<std::thread> worker_thread_;
std::thread worker_thread_;

/* Configurable parameters as per the official specs */
const std::chrono::milliseconds schedule_delay_millis_;
Expand All @@ -148,9 +128,9 @@ class BatchSpanProcessor : public SpanProcessor
std::unique_ptr<common::CircularBuffer<Recordable>> buffer_;

/* Important boolean flags to handle the workflow of the processor */
volatile bool is_shutdown_{false};
volatile bool is_force_flush_{false};
volatile bool is_force_flush_notified_{false};
std::atomic<bool> is_shutdown_{false};
std::atomic<bool> is_force_flush_{false};
std::atomic<bool> is_force_flush_notified_{false};
};

} // namespace trace
Expand Down
35 changes: 0 additions & 35 deletions sdk/src/common/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,6 @@

package(default_visibility = ["//visibility:public"])

cc_library(
name = "atomic_unique_ptr",
hdrs = [
"atomic_unique_ptr.h",
],
include_prefix = "src/common",
deps = [
"//api",
],
)

cc_library(
name = "random",
srcs = ["random.cc"],
Expand All @@ -38,27 +27,3 @@ cc_library(
"//sdk/src/common/platform:fork",
],
)

cc_library(
name = "circular_buffer_range",
hdrs = [
"circular_buffer_range.h",
],
include_prefix = "src/common",
deps = [
"//api",
],
)

cc_library(
name = "circular_buffer",
hdrs = [
"circular_buffer.h",
],
include_prefix = "src/common",
deps = [
":atomic_unique_ptr",
":circular_buffer_range",
"//api",
],
)
1 change: 0 additions & 1 deletion sdk/src/trace/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,5 @@ cc_library(
deps = [
"//api",
"//sdk:headers",
"//sdk/src/common:circular_buffer",
],
)
Loading

0 comments on commit 9bf95d8

Please sign in to comment.