Skip to content

Commit

Permalink
zPages: Source and Test files for Tracez DataAggregator (open-telemet…
Browse files Browse the repository at this point in the history
  • Loading branch information
kmanghat authored Aug 4, 2020
1 parent a34106a commit 102567e
Show file tree
Hide file tree
Showing 8 changed files with 1,175 additions and 2 deletions.
57 changes: 57 additions & 0 deletions ext/include/opentelemetry/ext/zpages/latency_boundaries.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#pragma once

#include <array>
#include <chrono>

#include "opentelemetry/version.h"

using std::chrono::microseconds;
using std::chrono::milliseconds;
using std::chrono::nanoseconds;
using std::chrono::seconds;

OPENTELEMETRY_BEGIN_NAMESPACE
namespace ext
{
namespace zpages
{
/**
 * kLatencyBoundaries lists the lower limit (inclusive), in nanoseconds, of each
 * of the 9 latency buckets. The upper limit (exclusive) of a bucket is the
 * lower limit of the bucket that follows it; the last bucket is unbounded
 * above.
 */
const std::array<nanoseconds, 9> kLatencyBoundaries = {{
    nanoseconds(0),
    // Coarser durations convert to nanoseconds implicitly and without loss.
    microseconds(10),
    microseconds(100),
    milliseconds(1),
    milliseconds(10),
    milliseconds(100),
    seconds(1),
    seconds(10),
    seconds(100),
}};

/**
 * LatencyBoundary gives a name to every index of the kLatencyBoundaries
 * container, so a latency boundary can be looked up without magic numbers.
 * Each enumerator's value is the array index of the boundary it names.
 */
enum LatencyBoundary
{
  k0MicroTo10Micro     = 0,
  k10MicroTo100Micro   = 1,
  k100MicroTo1Milli    = 2,
  k1MilliTo10Milli     = 3,
  k10MilliTo100Milli   = 4,
  k100MilliTo1Second   = 5,
  k1SecondTo10Second   = 6,
  k10SecondTo100Second = 7,
  k100SecondToMax      = 8
};

} // namespace zpages
} // namespace ext
OPENTELEMETRY_END_NAMESPACE
83 changes: 83 additions & 0 deletions ext/include/opentelemetry/ext/zpages/tracez_data.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
#pragma once

#include <array>
#include <iostream>
#include <list>
#include <string>

#include "opentelemetry/ext/zpages/latency_boundaries.h"
#include "opentelemetry/ext/zpages/threadsafe_span_data.h"
#include "opentelemetry/nostd/span.h"
#include "opentelemetry/nostd/string_view.h"
#include "opentelemetry/sdk/trace/span_data.h"
#include "opentelemetry/trace/canonical_code.h"
#include "opentelemetry/trace/span_id.h"
#include "opentelemetry/trace/trace_id.h"
#include "opentelemetry/version.h"

using opentelemetry::ext::zpages::ThreadsafeSpanData;
using opentelemetry::trace::CanonicalCode;
using opentelemetry::trace::SpanId;
using opentelemetry::trace::TraceId;

OPENTELEMETRY_BEGIN_NAMESPACE
namespace ext
{
namespace zpages
{

/**
 * kMaxNumberOfSampleSpans is the upper bound on how many running, completed or
 * error sample spans are kept at any given time for a single span name.
 * Trimming the stored samples to this limit keeps memory usage down.
 */
constexpr int kMaxNumberOfSampleSpans = 5;

/**
 * TracezData is the data to be displayed on the tracez zpage. One instance is
 * stored for each span name.
 */
struct TracezData
{
  /**
   * Number of running spans and of error spans recorded for the span name.
   * Both start at zero through their default member initializers, so they can
   * never be read uninitialized regardless of which constructor is used.
   * TODO: At this time the maximum count is unknown but a larger data type
   * might have to be used in the future to store these counts to avoid overflow
   */
  unsigned int running_span_count = 0;
  unsigned int error_span_count   = 0;

  /**
   * completed_span_count_per_latency_bucket stores the count of completed
   * spans for each latency bucket. It has one slot per entry of
   * kLatencyBoundaries (declared in latency_boundaries.h).
   */
  std::array<unsigned int, kLatencyBoundaries.size()> completed_span_count_per_latency_bucket;

  /**
   * sample_latency_spans is an array of lists with one list per entry of
   * kLatencyBoundaries. The list at each index stores the sample spans for
   * that latency boundary.
   */
  std::array<std::list<ThreadsafeSpanData>, kLatencyBoundaries.size()> sample_latency_spans;

  /**
   * sample_error_spans is a list that stores the error samples for a span name.
   */
  std::list<ThreadsafeSpanData> sample_error_spans;

  /**
   * sample_running_spans is a list that stores the running span samples for a
   * span name.
   */
  std::list<ThreadsafeSpanData> sample_running_spans;

  /**
   * Creates an empty record: the per-bucket counts are zeroed here, the two
   * scalar counters are zeroed by their default member initializers and the
   * sample lists are default-constructed (empty).
   */
  TracezData() { completed_span_count_per_latency_bucket.fill(0); }
};

} // namespace zpages
} // namespace ext
OPENTELEMETRY_END_NAMESPACE
168 changes: 168 additions & 0 deletions ext/include/opentelemetry/ext/zpages/tracez_data_aggregator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
#pragma once

#include <array>
#include <atomic>
#include <condition_variable>
#include <list>
#include <map>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_set>

#include "opentelemetry/ext/zpages/latency_boundaries.h"
#include "opentelemetry/ext/zpages/tracez_data.h"
#include "opentelemetry/ext/zpages/tracez_processor.h"
#include "opentelemetry/nostd/span.h"
#include "opentelemetry/nostd/string_view.h"
#include "opentelemetry/sdk/trace/span_data.h"
#include "opentelemetry/trace/canonical_code.h"

using opentelemetry::trace::CanonicalCode;

OPENTELEMETRY_BEGIN_NAMESPACE
namespace ext
{
namespace zpages
{
/**
* TracezDataAggregator object is responsible for collecting raw data and
* converting it to useful information that can be made available to
* display on the tracez zpage.
*
* When this object is created it starts a thread that calls a function
* periodically to update the aggregated data with new spans.
*
* The only exposed function is a getter that returns a copy of the aggregated
* data when requested. Calls to the getter are meant to be serialized with the
* periodically called aggregate spans function.
* NOTE(review): presumably this is done through mtx_; the definitions are not
* part of this header, confirm in the implementation file.
*
* TODO: Consider a singleton pattern for this class, not sure if multiple
* instances of this class should exist.
*/
class TracezDataAggregator
{
public:
/**
* Constructor creates a thread that calls a function to aggregate span data
* at regular intervals.
* @param span_processor is the tracez span processor to be set
* @param update_interval the time duration between updates of the aggregated
* data; defaults to 10 milliseconds.
*/
TracezDataAggregator(std::shared_ptr<TracezSpanProcessor> span_processor,
milliseconds update_interval = milliseconds(10));

/** Ends the thread set up in the constructor and destroys the object **/
~TracezDataAggregator();

/**
* GetAggregatedTracezData returns a copy of the updated data.
* @returns a map (returned by value) with the span name as key and the tracez
* span data as value.
*/
std::map<std::string, TracezData> GetAggregatedTracezData();

private:
/**
* AggregateSpans is the function that is called to update the aggregated data
* with newly completed and running span data
*/
void AggregateSpans();

/**
* AggregateCompletedSpans is the function that is called to update the
* aggregation with the data of newly completed spans.
* @param completed_spans are the newly completed spans.
*/
void AggregateCompletedSpans(std::vector<std::unique_ptr<ThreadsafeSpanData>> &completed_spans);

/**
* AggregateRunningSpans aggregates the data for all running spans received
* from the span processor. Running spans are not cleared by the span
* processor, so multiple calls to this function may contain running spans for
* which data has already been collected in a previous call. Additionally,
* span names can change while a span is running and there seems to be
* no trivial way to know whether it is a new or an old running span, so at
* every call to this function the available running span data is reset and
* recalculated. At this time there is no unique way to identify a span
* object once this is done, there might be some better ways to do this.
* TODO : SpanProcessor is never notified when a span name is changed while it
* is running, so the change is not propagated to the data aggregator. The
* running span name, if changed while it is running, will not be updated in
* the data aggregator till the span is completed.
* @param running_spans is the running spans to be aggregated.
*/
void AggregateRunningSpans(std::unordered_set<ThreadsafeSpanData *> &running_spans);

/**
* AggregateStatusOKSpan is the function called to update the data of spans
* with status code OK.
* @param ok_span is the span whose data is to be aggregated
*/
void AggregateStatusOKSpan(std::unique_ptr<ThreadsafeSpanData> &ok_span);

/**
* AggregateStatusErrorSpan is the function that is called to update the
* data of error spans
* @param error_span is the error span whose data is to be aggregated
*/
void AggregateStatusErrorSpan(std::unique_ptr<ThreadsafeSpanData> &error_span);

/**
* ClearRunningSpanData is a function that is used to clear all running span
* data at the beginning of a call to AggregateSpans.
* Running span data has to be cleared before aggregation because running
* span data is recalculated at every call to AggregateSpans.
*/
void ClearRunningSpanData();

/**
* FindLatencyBoundary finds the latency boundary to which the duration of
* the given span belongs
* @param ok_span is the ThreadsafeSpanData whose duration is used to find the
* latency boundary
* @returns LatencyBoundary is the latency boundary that the duration belongs
* to
*/
LatencyBoundary FindLatencyBoundary(std::unique_ptr<ThreadsafeSpanData> &ok_span);

/**
* InsertIntoSampleSpanList is a helper function that is called to insert
* a given span into a sample span list. A function is used for insertion
* because list size is to be limited at a set maximum.
* @param sample_spans the sample span list into which span is to be inserted
* @param span_data the span_data to be inserted into list
*/
void InsertIntoSampleSpanList(std::list<ThreadsafeSpanData> &sample_spans,
ThreadsafeSpanData &span_data);

/** Instance of span processor used to collect raw data **/
std::shared_ptr<TracezSpanProcessor> tracez_span_processor_;

/**
* Tree map with key being the name of the span and value being the tracez
* span data (stored by value) for the given span name.
* A tree map is preferred to a hash map because the data is to be ordered
* in alphabetical order of span name.
* TODO : A possible memory concern if there are too many unique
* span names, one solution could be to implement a LRU cache that trims the
* DS based on frequency of usage of a span name.
*/
std::map<std::string, TracezData> aggregated_tracez_data_;
/** NOTE(review): presumably guards aggregated_tracez_data_ and is the mutex
* paired with cv_; its use is not visible in this header, confirm in the
* implementation file **/
std::mutex mtx_;

/** A boolean that is set to true in the constructor and false in the
* destructor to start and end execution of aggregate spans **/
std::atomic<bool> execute_;

/** Thread that executes aggregate spans at regular intervals during this
object's lifetime**/
std::thread aggregate_spans_thread_;

/** Condition variable that notifies the thread when object is about to be
destroyed **/
std::condition_variable cv_;
};

} // namespace zpages
} // namespace ext
OPENTELEMETRY_END_NAMESPACE
4 changes: 3 additions & 1 deletion ext/src/zpages/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
add_library(
opentelemetry_zpages
tracez_processor.cc ../../include/opentelemetry/ext/zpages/tracez_processor.h)
tracez_processor.cc tracez_data_aggregator.cc
../../include/opentelemetry/ext/zpages/tracez_processor.h
../../include/opentelemetry/ext/zpages/tracez_data_aggregator.h)

target_include_directories(opentelemetry_zpages PUBLIC ../../include)

Expand Down
Loading

0 comments on commit 102567e

Please sign in to comment.