diff --git a/ext/include/opentelemetry/ext/zpages/latency_boundaries.h b/ext/include/opentelemetry/ext/zpages/latency_boundaries.h new file mode 100644 index 00000000000..0548406e183 --- /dev/null +++ b/ext/include/opentelemetry/ext/zpages/latency_boundaries.h @@ -0,0 +1,57 @@ +#pragma once + +#include +#include + +#include "opentelemetry/version.h" + +using std::chrono::microseconds; +using std::chrono::milliseconds; +using std::chrono::nanoseconds; +using std::chrono::seconds; + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace ext +{ +namespace zpages +{ +/** + * kLatencyBoundaries is a constant array that contains the 9 latency + * boundaries. Each value in the array represents the lower limit(inclusive) of + * the boundary(in nano seconds) and the upper limit(exclusive) of the boundary + * is the lower limit of the next one. The upper limit of the last boundary is + * INF. + */ +const std::array kLatencyBoundaries = { + nanoseconds(0), + nanoseconds(microseconds(10)), + nanoseconds(microseconds(100)), + nanoseconds(milliseconds(1)), + nanoseconds(milliseconds(10)), + nanoseconds(milliseconds(100)), + nanoseconds(seconds(1)), + nanoseconds(seconds(10)), + nanoseconds(seconds(100)), +}; + +/** + * LatencyBoundary enum is used to index into the kLatencyBoundaries container. 
+ * Using this enum lets you access the latency boundary at each index without + * using magic numbers + */ +enum LatencyBoundary +{ + k0MicroTo10Micro, + k10MicroTo100Micro, + k100MicroTo1Milli, + k1MilliTo10Milli, + k10MilliTo100Milli, + k100MilliTo1Second, + k1SecondTo10Second, + k10SecondTo100Second, + k100SecondToMax +}; + +} // namespace zpages +} // namespace ext +OPENTELEMETRY_END_NAMESPACE diff --git a/ext/include/opentelemetry/ext/zpages/tracez_data.h b/ext/include/opentelemetry/ext/zpages/tracez_data.h new file mode 100644 index 00000000000..1e6696d8ade --- /dev/null +++ b/ext/include/opentelemetry/ext/zpages/tracez_data.h @@ -0,0 +1,83 @@ +#pragma once + +#include +#include +#include +#include + +#include "opentelemetry/ext/zpages/threadsafe_span_data.h" +#include "opentelemetry/nostd/span.h" +#include "opentelemetry/nostd/string_view.h" +#include "opentelemetry/sdk/trace/span_data.h" +#include "opentelemetry/trace/canonical_code.h" +#include "opentelemetry/trace/span_id.h" +#include "opentelemetry/trace/trace_id.h" +#include "opentelemetry/version.h" + +using opentelemetry::ext::zpages::ThreadsafeSpanData; +using opentelemetry::trace::CanonicalCode; +using opentelemetry::trace::SpanId; +using opentelemetry::trace::TraceId; + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace ext +{ +namespace zpages +{ + +/** + * kMaxNumberOfSampleSpans is the maximum number of running, completed or error + * sample spans stored at any given time for a given span name. + * This limit is introduced to reduce memory usage by trimming sample spans + * stored. + */ +const int kMaxNumberOfSampleSpans = 5; + +/** + * TracezData is the data to be displayed for tracez zpages that is stored for + * each span name. 
+ */ +struct TracezData +{ + /** + * TODO: At this time the maximum count is unknown but a larger data type + * might have to be used in the future to store these counts to avoid overflow + */ + unsigned int running_span_count; + unsigned int error_span_count; + + /** + * completed_span_count_per_latency_bucket is an array that stores the count + * of spans for each of the 9 latency buckets. + */ + std::array completed_span_count_per_latency_bucket; + + /** + * sample_latency_spans is an array of lists, each index of the array + * corresponds to a latency boundary(of which there are 9). + * The list in each index stores the sample spans for that latency boundary. + */ + std::array, kLatencyBoundaries.size()> sample_latency_spans; + + /** + * sample_error_spans is a list that stores the error samples for a span name. + */ + std::list sample_error_spans; + + /** + * sample_running_spans is a list that stores the running span samples for a + * span name. + */ + std::list sample_running_spans; + + TracezData() + { + running_span_count = 0; + error_span_count = 0; + completed_span_count_per_latency_bucket.fill(0); + } +}; + +} // namespace zpages +} // namespace ext +OPENTELEMETRY_END_NAMESPACE diff --git a/ext/include/opentelemetry/ext/zpages/tracez_data_aggregator.h b/ext/include/opentelemetry/ext/zpages/tracez_data_aggregator.h new file mode 100644 index 00000000000..6d2d0a054e3 --- /dev/null +++ b/ext/include/opentelemetry/ext/zpages/tracez_data_aggregator.h @@ -0,0 +1,168 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "opentelemetry/ext/zpages/latency_boundaries.h" +#include "opentelemetry/ext/zpages/tracez_data.h" +#include "opentelemetry/ext/zpages/tracez_processor.h" +#include "opentelemetry/nostd/span.h" +#include "opentelemetry/nostd/string_view.h" +#include "opentelemetry/sdk/trace/span_data.h" +#include "opentelemetry/trace/canonical_code.h" + +using 
opentelemetry::trace::CanonicalCode; + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace ext +{ +namespace zpages +{ +/** + * TracezDataAggregator object is responsible for collecting raw data and + * converting it to useful information that can be made available to + * display on the tracez zpage. + * + * When this object is created it starts a thread that calls a function + * periodically to update the aggregated data with new spans. + * + * The only exposed function is a getter that returns a copy of the aggregated + * data when requested. This function is ensured to be called in sequence to the + * aggregate spans function which is called periodically. + * + * TODO: Consider a singleton pattern for this class, not sure if multiple + * instances of this class should exist. + */ +class TracezDataAggregator +{ +public: + /** + * Constructor creates a thread that calls a function to aggregate span data + * at regular intervals. + * @param span_processor is the tracez span processor to be set + * @param update_interval the time duration for updating the aggregated data. + */ + TracezDataAggregator(std::shared_ptr span_processor, + milliseconds update_interval = milliseconds(10)); + + /** Ends the thread set up in the constructor and destroys the object **/ + ~TracezDataAggregator(); + + /** + * GetAggregatedTracezData returns a copy of the updated data. + * @returns a map with the span name as key and the tracez span data as value. + */ + std::map GetAggregatedTracezData(); + +private: + /** + * AggregateSpans is the function that is called to update the aggregated data + * with newly completed and running span data + */ + void AggregateSpans(); + + /** + * AggregateCompletedSpans is the function that is called to update the + * aggregation with the data of newly completed spans. + * @param completed_spans are the newly completed spans. 
+ */ + void AggregateCompletedSpans(std::vector> &completed_spans); + + /** + * AggregateRunningSpans aggregates the data for all running spans received + * from the span processor. Running spans are not cleared by the span + * processor and multiple calls to this function may contain running spans for + * which data has already been collected in a previous call. Additionally, + * span names can change while a span is running and there seems to be + * no trivial way to know if it is a new or old running span so at every + * call to this function the available running span data is reset and + * recalculated. At this time there is no unique way to identify a span + * object once this is done, there might be some better ways to do this. + * TODO : SpanProcessor is never notified when a span name is changed while it + * is running and that is propagated to the data aggregator. The running span + * name if changed while it is running will not be updated in the data + * aggregator till the span is completed. + * @param running_spans are the running spans to be aggregated. + */ + void AggregateRunningSpans(std::unordered_set &running_spans); + + /** + * AggregateStatusOKSpan is the function called to update the data of a span + * with status code OK. + * @param ok_span is the span whose data is to be aggregated + */ + void AggregateStatusOKSpan(std::unique_ptr &ok_span); + + /** + * AggregateStatusErrorSpan is the function that is called to update the + * data of an error span + * @param error_span is the error span whose data is to be aggregated + */ + void AggregateStatusErrorSpan(std::unique_ptr &error_span); + + /** + * ClearRunningSpanData is a function that is used to clear all running span + * data at the beginning of a call to AggregateSpans. + * Running span data has to be cleared before aggregation because running + * span data is recalculated at every call to AggregateSpans. 
+ */ + void ClearRunningSpanData(); + + /** + * FindLatencyBoundary finds the latency boundary to which the duration of + * the given span belongs + * @param ok_span is the ThreadsafeSpanData for whose duration the latency + * boundary is to be found + * @returns the LatencyBoundary that the duration belongs + * to + */ + LatencyBoundary FindLatencyBoundary(std::unique_ptr &ok_span); + + /** + * InsertIntoSampleSpanList is a helper function that is called to insert + * a given span into a sample span list. A function is used for insertion + * because list size is to be limited at a set maximum. + * @param sample_spans the sample span list into which span is to be inserted + * @param span_data the span_data to be inserted into list + */ + void InsertIntoSampleSpanList(std::list &sample_spans, + ThreadsafeSpanData &span_data); + + /** Instance of span processor used to collect raw data **/ + std::shared_ptr tracez_span_processor_; + + /** + * Tree map with key being the name of the span and value being the TracezData + * that stores the tracez span data for the given span name + * A tree map is preferred to a hash map because the data is to be ordered + * in alphabetical order of span name. + * TODO : A possible memory concern if there are too many unique + * span names, one solution could be to implement a LRU cache that trims the + * DS based on frequency of usage of a span name. 
+ */ + std::map aggregated_tracez_data_; + std::mutex mtx_; + + /** A boolean that is set to true in the constructor and false in the + * destructor to start and end execution of aggregate spans **/ + std::atomic execute_; + + /** Thread that executes aggregate spans at regular intervals during this + object's lifetime**/ + std::thread aggregate_spans_thread_; + + /** Condition variable that notifies the thread when object is about to be + destroyed **/ + std::condition_variable cv_; +}; + +} // namespace zpages +} // namespace ext +OPENTELEMETRY_END_NAMESPACE diff --git a/ext/src/zpages/CMakeLists.txt b/ext/src/zpages/CMakeLists.txt index eeef261fba0..851ccd927d3 100644 --- a/ext/src/zpages/CMakeLists.txt +++ b/ext/src/zpages/CMakeLists.txt @@ -1,6 +1,8 @@ add_library( opentelemetry_zpages - tracez_processor.cc ../../include/opentelemetry/ext/zpages/tracez_processor.h) + tracez_processor.cc tracez_data_aggregator.cc + ../../include/opentelemetry/ext/zpages/tracez_processor.h + ../../include/opentelemetry/ext/zpages/tracez_data_aggregator.h) target_include_directories(opentelemetry_zpages PUBLIC ../../include) diff --git a/ext/src/zpages/tracez_data_aggregator.cc b/ext/src/zpages/tracez_data_aggregator.cc new file mode 100644 index 00000000000..478b38eb963 --- /dev/null +++ b/ext/src/zpages/tracez_data_aggregator.cc @@ -0,0 +1,185 @@ +#include "opentelemetry/ext/zpages/tracez_data_aggregator.h" + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace ext +{ +namespace zpages +{ + +TracezDataAggregator::TracezDataAggregator(std::shared_ptr span_processor, + milliseconds update_interval) +{ + tracez_span_processor_ = span_processor; + + // Start a thread that calls AggregateSpans periodically or till notified. 
+ execute_.store(true, std::memory_order_release); + aggregate_spans_thread_ = std::thread([this, update_interval]() { + while (execute_.load(std::memory_order_acquire)) + { + std::unique_lock lock(mtx_); + AggregateSpans(); + cv_.wait_for(lock, update_interval); + } + }); +} + +TracezDataAggregator::~TracezDataAggregator() +{ + // Notify and join the thread so object can be destroyed without wait for wake + if (execute_.load(std::memory_order_acquire)) + { + execute_.store(false, std::memory_order_release); + cv_.notify_one(); + aggregate_spans_thread_.join(); + } +} + +std::map TracezDataAggregator::GetAggregatedTracezData() +{ + std::unique_lock lock(mtx_); + return aggregated_tracez_data_; +} + +LatencyBoundary TracezDataAggregator::FindLatencyBoundary( + std::unique_ptr &span_data) +{ + const auto &span_data_duration = span_data->GetDuration(); + for (unsigned int boundary = 0; boundary < kLatencyBoundaries.size() - 1; boundary++) + { + if (span_data_duration < kLatencyBoundaries[boundary + 1]) + return (LatencyBoundary)boundary; + } + return LatencyBoundary::k100SecondToMax; +} + +void TracezDataAggregator::InsertIntoSampleSpanList(std::list &sample_spans, + ThreadsafeSpanData &span_data) +{ + /** + * Check to see if the sample span list size exceeds the set limit, if it does + * free up memory and remove the earliest inserted sample before appending + */ + if (sample_spans.size() == kMaxNumberOfSampleSpans) + { + sample_spans.pop_front(); + } + sample_spans.push_back(ThreadsafeSpanData(span_data)); +} + +void TracezDataAggregator::ClearRunningSpanData() +{ + auto it = aggregated_tracez_data_.begin(); + while (it != aggregated_tracez_data_.end()) + { + it->second.running_span_count = 0; + it->second.sample_running_spans.clear(); + + // Check if any data exists in the struct, if not delete entry + bool is_completed_span_count_zero = true; + for (const auto &completed_span_count : it->second.completed_span_count_per_latency_bucket) + { + if 
(completed_span_count > 0) + is_completed_span_count_zero = false; + } + + if (it->second.error_span_count == 0 && is_completed_span_count_zero) + { + it = aggregated_tracez_data_.erase(it); + } + else + { + ++it; + } + } +} + +void TracezDataAggregator::AggregateStatusOKSpan(std::unique_ptr &ok_span) +{ + // Find and update boundary of aggregated data that span belongs to + auto boundary_name = FindLatencyBoundary(ok_span); + + // Get the data for name in aggregation and update count and sample spans + auto &tracez_data = aggregated_tracez_data_.at(ok_span->GetName().data()); + InsertIntoSampleSpanList(tracez_data.sample_latency_spans[boundary_name], *ok_span.get()); + tracez_data.completed_span_count_per_latency_bucket[boundary_name]++; +} + +void TracezDataAggregator::AggregateStatusErrorSpan(std::unique_ptr &error_span) +{ + // Get data for name in aggregation and update count and sample spans + auto &tracez_data = aggregated_tracez_data_.at(error_span->GetName().data()); + InsertIntoSampleSpanList(tracez_data.sample_error_spans, *error_span.get()); + tracez_data.error_span_count++; +} + +void TracezDataAggregator::AggregateCompletedSpans( + std::vector> &completed_spans) +{ + for (auto &completed_span : completed_spans) + { + std::string span_name = completed_span->GetName().data(); + + if (aggregated_tracez_data_.find(span_name) == aggregated_tracez_data_.end()) + { + aggregated_tracez_data_[span_name] = TracezData(); + } + + if (completed_span->GetStatus() == CanonicalCode::OK) + AggregateStatusOKSpan(completed_span); + else + AggregateStatusErrorSpan(completed_span); + } +} + +void TracezDataAggregator::AggregateRunningSpans( + std::unordered_set &running_spans) +{ + for (auto &running_span : running_spans) + { + std::string span_name = running_span->GetName().data(); + + if (aggregated_tracez_data_.find(span_name) == aggregated_tracez_data_.end()) + { + aggregated_tracez_data_[span_name] = TracezData(); + } + + auto &tracez_data = 
aggregated_tracez_data_[span_name]; + InsertIntoSampleSpanList(aggregated_tracez_data_[span_name].sample_running_spans, + *running_span); + tracez_data.running_span_count++; + } +} + +void TracezDataAggregator::AggregateSpans() +{ + auto span_snapshot = tracez_span_processor_->GetSpanSnapshot(); + /** + * TODO: At this time in the project, there is no way of uniquely identifying + * a span (their ids are not being set yet). + * If in the future this is added then clearing of running spans will not be + * required. + * For now this step of clearing and recalculating running span data is + * required because it is unknown which spans have moved from running to + * completed since the previous call. Additionally, the span name can change + * for spans while they are running. + * + * A better approach for identifying moved spans would have been to map + * span id to span name, find these span names in the aggregated data and then + * delete only this information for running span data as opposed to clearing + * all running span data. However this cannot be done at this time because, + * unique identifiers to span data have not been added yet. + * + * A few things to note: + * i) Duplicate running spans may be received from the span processor in + * multiple successive calls to this function. + * ii) Only the newly completed spans are received by this function. 
+ * Completed spans will not be seen more than once + **/ + ClearRunningSpanData(); + AggregateCompletedSpans(span_snapshot.completed); + AggregateRunningSpans(span_snapshot.running); +} + +} // namespace zpages +} // namespace ext +OPENTELEMETRY_END_NAMESPACE diff --git a/ext/test/zpages/BUILD b/ext/test/zpages/BUILD index 1e71b420d9b..07c287d2fa9 100644 --- a/ext/test/zpages/BUILD +++ b/ext/test/zpages/BUILD @@ -10,6 +10,18 @@ cc_test( ], ) +cc_test( + name = "tracez_data_aggregator_tests", + srcs = [ + "tracez_data_aggregator_test.cc", + ], + deps = [ + "//ext/src/zpages", + "//sdk/src/trace", + "@com_google_googletest//:gtest_main", + ], +) + cc_test( name = "tracez_processor_tests", srcs = [ diff --git a/ext/test/zpages/CMakeLists.txt b/ext/test/zpages/CMakeLists.txt index 6f45e656d3c..34c36373165 100644 --- a/ext/test/zpages/CMakeLists.txt +++ b/ext/test/zpages/CMakeLists.txt @@ -1,4 +1,5 @@ -foreach(testname tracez_processor_test threadsafe_span_data_test) +foreach(testname tracez_processor_test tracez_data_aggregator_test + threadsafe_span_data_test) add_executable(${testname} "${testname}.cc") target_link_libraries(${testname} ${GTEST_BOTH_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT} opentelemetry_zpages) diff --git a/ext/test/zpages/tracez_data_aggregator_test.cc b/ext/test/zpages/tracez_data_aggregator_test.cc new file mode 100644 index 00000000000..e1e3d132929 --- /dev/null +++ b/ext/test/zpages/tracez_data_aggregator_test.cc @@ -0,0 +1,665 @@ +#include "opentelemetry/ext/zpages/tracez_data_aggregator.h" + +#include + +#include "opentelemetry/ext/zpages/tracez_processor.h" +#include "opentelemetry/sdk/trace/recordable.h" +#include "opentelemetry/sdk/trace/tracer.h" + +using namespace opentelemetry::sdk::trace; +using namespace opentelemetry::ext::zpages; +namespace nostd = opentelemetry::nostd; +namespace common = opentelemetry::common; +using opentelemetry::core::SteadyTimestamp; +using opentelemetry::v0::trace::Span; + +const std::string span_name1 = "span 
1"; +const std::string span_name2 = "span 2"; +const std::string span_name3 = "span 3"; + +/** + * TODO: Due to the absence of way to simulate the passing of time in the + * testing framework, synthetic delays had to be added in the tests to get the + * object in question to perform correctly. Later on if something like this is + * added the tests should be modified accordingly so that there is no external + * dependency. + * Additionally later on it would be better check for the span id(when set) + * rather than span name. + */ + +/** Test fixture for setting up the data aggregator and tracer for each test **/ +class TracezDataAggregatorTest : public ::testing::Test +{ +protected: + void SetUp() override + { + std::shared_ptr processor(new TracezSpanProcessor()); + tracer = std::shared_ptr(new Tracer(processor)); + tracez_data_aggregator = std::unique_ptr( + new TracezDataAggregator(processor, milliseconds(10))); + } + + std::unique_ptr tracez_data_aggregator; + std::shared_ptr tracer; +}; + +/** + * Helper function to check if the counts of running, error and latency spans + * match what is expected + */ +void VerifySpanCountsInTracezData( + const std::string &span_name, + const TracezData &aggregated_data, + unsigned int running_span_count, + unsigned int error_span_count, + std::array completed_span_count_per_latency_bucket) +{ + // Asserts are needed to check the size of the container because they may need + // to be checked and if size checks fail it must be stopped + EXPECT_EQ(aggregated_data.running_span_count, running_span_count) + << " Count of running spans incorrect for " << span_name << "\n"; + + EXPECT_EQ(aggregated_data.sample_running_spans.size(), + std::min(running_span_count, kMaxNumberOfSampleSpans)) + << " Size of sample running spans incorrect for " << span_name << "\n"; + + EXPECT_EQ(aggregated_data.error_span_count, error_span_count) + << " Count of error spans incorrect for " << span_name << "\n"; + + 
EXPECT_EQ(aggregated_data.sample_error_spans.size(), + std::min(error_span_count, kMaxNumberOfSampleSpans)) + << " Count of running spans incorrect for " << span_name << "\n"; + + for (unsigned int boundary = 0; boundary < kLatencyBoundaries.size(); boundary++) + { + EXPECT_EQ(aggregated_data.completed_span_count_per_latency_bucket[boundary], + completed_span_count_per_latency_bucket[boundary]) + << " Count of completed spans in latency boundary " << boundary << " incorrect for " + << span_name << "\n"; + EXPECT_EQ(aggregated_data.sample_latency_spans[boundary].size(), + std::min(completed_span_count_per_latency_bucket[boundary], + kMaxNumberOfSampleSpans)) + << " Count of sample completed spans in latency boundary " << boundary << " incorrect for " + << span_name << "\n"; + } +} + +/**************************** No Span Test ************************************/ + +/** Test to check if data aggregator works as expected when there are no spans + * **/ +TEST_F(TracezDataAggregatorTest, NoSpans) +{ + auto data = tracez_data_aggregator->GetAggregatedTracezData(); + ASSERT_EQ(data.size(), 0); +} + +/*********************** Single span tests ************************************/ + +/** Test to check if data aggregator works as expected when there are + * is exactly a single running span **/ +TEST_F(TracezDataAggregatorTest, SingleRunningSpan) +{ + // Start the span get the data + auto span_first = tracer->StartSpan(span_name1); + std::this_thread::sleep_for(milliseconds(500)); + auto data = tracez_data_aggregator->GetAggregatedTracezData(); + + // Check to see if span name exists + ASSERT_EQ(data.size(), 1); + ASSERT_TRUE(data.find(span_name1) != data.end()); + auto &aggregated_data = data.at(span_name1); + + // Verify span counts then content of spans + VerifySpanCountsInTracezData(span_name1, aggregated_data, 1, 0, {0, 0, 0, 0, 0, 0, 0, 0, 0}); + + ASSERT_EQ(aggregated_data.sample_running_spans.size(), 1); + 
ASSERT_EQ(aggregated_data.sample_running_spans.front().GetName().data(), span_name1); +} + +/** Test to check if data aggregator works as expected when there is exactly one + * completed span **/ +TEST_F(TracezDataAggregatorTest, SingleCompletedSpan) +{ + // Start and end the span at a specified times + opentelemetry::trace::StartSpanOptions start; + start.start_steady_time = SteadyTimestamp(nanoseconds(10)); + opentelemetry::trace::EndSpanOptions end; + end.end_steady_time = SteadyTimestamp(nanoseconds(40)); + tracer->StartSpan(span_name1, start)->End(end); + + // Get the data and make sure span name exists in the data + std::this_thread::sleep_for(milliseconds(500)); + auto data = tracez_data_aggregator->GetAggregatedTracezData(); + ASSERT_EQ(data.size(), 1); + ASSERT_TRUE(data.find(span_name1) != data.end()); + + auto &aggregated_data = data.at(span_name1); + // Make sure counts of spans are in order + VerifySpanCountsInTracezData(span_name1, aggregated_data, 0, 0, {1, 0, 0, 0, 0, 0, 0, 0, 0}); + + // Check if the span is correctly updated in the first boundary + ASSERT_EQ(aggregated_data.sample_latency_spans[LatencyBoundary::k0MicroTo10Micro].size(), 1); + ASSERT_EQ(aggregated_data.sample_latency_spans[LatencyBoundary::k0MicroTo10Micro] + .front() + .GetDuration() + .count(), + 30); +} + +/** Test to check if data aggregator works as expected when there is exactly + * one error span **/ +TEST_F(TracezDataAggregatorTest, SingleErrorSpan) +{ + // Start and end a single error span + tracer->StartSpan(span_name1) + ->SetStatus(opentelemetry::trace::CanonicalCode::CANCELLED, "span cancelled"); + std::this_thread::sleep_for(milliseconds(500)); + auto data = tracez_data_aggregator->GetAggregatedTracezData(); + + // Check to see if span name can be found in aggregation + ASSERT_EQ(data.size(), 1); + ASSERT_TRUE(data.find(span_name1) != data.end()); + + auto &aggregated_data = data.at(span_name1); + // Make sure counts of spans are in order + 
VerifySpanCountsInTracezData(span_name1, aggregated_data, 0, 1, {0, 0, 0, 0, 0, 0, 0, 0, 0}); + + // Check the value of the error span introduced + ASSERT_EQ(aggregated_data.sample_error_spans.size(), 1); + ASSERT_EQ(aggregated_data.sample_error_spans.front().GetName().data(), span_name1); +} + +/************************* Multiple span tests ********************************/ + +/** Test to check if multiple running spans behaves as expected**/ +TEST_F(TracezDataAggregatorTest, MultipleRunningSpans) +{ + // A container that maps a span name to the number of spans to start with that + // span name + std::unordered_map running_span_name_to_count({ + {span_name1, 1}, + {span_name2, 2}, + {span_name3, 3}, + }); + + // Start and store spans based on the above map + std::vector> running_span_container; + for (auto span_name : running_span_name_to_count) + { + for (int count = 0; count < span_name.second; count++) + running_span_container.push_back(tracer->StartSpan(span_name.first)); + } + + // give time for aggregation and then get data + std::this_thread::sleep_for(milliseconds(500)); + auto data = tracez_data_aggregator->GetAggregatedTracezData(); + ASSERT_EQ(data.size(), running_span_name_to_count.size()); + + // Check to see if the running span counts were updated correctly + for (auto &span_name : running_span_name_to_count) + { + ASSERT_TRUE(data.find(span_name.first) != data.end()); + + // Make sure counts of spans are in order + VerifySpanCountsInTracezData(span_name.first, data.at(span_name.first), span_name.second, 0, + {0, 0, 0, 0, 0, 0, 0, 0, 0}); + + ASSERT_EQ(data.at(span_name.first).sample_running_spans.size(), span_name.second); + for (auto &span_sample : data.at(span_name.first).sample_running_spans) + { + ASSERT_EQ(span_sample.GetName().data(), span_name.first); + } + } +} + +/** Test to check if multiple completed spans updates the aggregated data + * correctly **/ +TEST_F(TracezDataAggregatorTest, MultipleCompletedSpan) +{ + // Start spans with span 
name and the corresponding durations in one of the 9 + // latency buckets + const std::unordered_map>> + span_name_to_duration( + {{span_name1, {{nanoseconds(10), nanoseconds(4600)}, {}, {}, {}, {}, {}, {}, {}, {}}}, + {span_name2, + {{}, + {nanoseconds(38888), nanoseconds(98768)}, + {nanoseconds(983251)}, + {}, + {}, + {}, + {}, + {}, + {}}}, + {span_name3, + {{}, + {}, + {}, + {nanoseconds(1234567), nanoseconds(1234567)}, + {}, + {}, + {}, + {}, + {nanoseconds(9999999999999)}}}}); + opentelemetry::trace::StartSpanOptions start; + opentelemetry::trace::EndSpanOptions end; + for (auto &span : span_name_to_duration) + { + for (auto &buckets : span.second) + { + for (auto &duration : buckets) + { + long long int end_time = duration.count() + 1; + start.start_steady_time = SteadyTimestamp(nanoseconds(1)); + end.end_steady_time = SteadyTimestamp(nanoseconds(end_time)); + tracer->StartSpan(span.first, start)->End(end); + } + } + } + + // Give time for aggregation and get data + std::this_thread::sleep_for(milliseconds(500)); + auto data = tracez_data_aggregator->GetAggregatedTracezData(); + + ASSERT_EQ(data.size(), span_name_to_duration.size()); + + for (auto &span : span_name_to_duration) + { + ASSERT_TRUE(data.find(span.first) != data.end()); + auto &aggregated_data = data.at(span.first); + + // Make sure counts of spans are in order + VerifySpanCountsInTracezData( + span.first, aggregated_data, 0, 0, + {(unsigned int)span.second[0].size(), (unsigned int)span.second[1].size(), + (unsigned int)span.second[2].size(), (unsigned int)span.second[3].size(), + (unsigned int)span.second[4].size(), (unsigned int)span.second[5].size(), + (unsigned int)span.second[6].size(), (unsigned int)span.second[7].size(), + (unsigned int)span.second[8].size()}); + + for (unsigned int boundary = 0; boundary < kLatencyBoundaries.size(); boundary++) + { + ASSERT_EQ(aggregated_data.sample_latency_spans[boundary].size(), + span.second[boundary].size()); + auto latency_sample = 
aggregated_data.sample_latency_spans[boundary].begin(); + for (unsigned int idx = 0; idx < span.second[boundary].size(); idx++) + { + ASSERT_EQ(span.second[boundary][idx].count(), latency_sample->GetDuration().count()); + latency_sample = std::next(latency_sample); + } + } + } +} + +/** + * This test checks to see if the aggregated data is updated correctly + * when there are multiple error spans. + * It checks both the count of error spans and the error samples + */ +TEST_F(TracezDataAggregatorTest, MultipleErrorSpans) +{ + // Container to store the span names --> error messages for the span name + std::unordered_map> span_name_to_error( + {{span_name1, {"span 1 error"}}, + {span_name2, {"span 2 error 1", "span 2 error 2"}}, + {span_name3, + {"span 3 error 1", "span 3 error 2", "span 3 error 3", "span 3 error 4", + "span 3 error 5"}}}); + + // Start spans with the error messages based on the map + for (auto &span_error : span_name_to_error) + { + for (auto error_desc : span_error.second) + tracer->StartSpan(span_error.first) + ->SetStatus(opentelemetry::trace::CanonicalCode::CANCELLED, error_desc); + } + + // Give some time and then get data + std::this_thread::sleep_for(milliseconds(500)); + auto data = tracez_data_aggregator->GetAggregatedTracezData(); + ASSERT_EQ(data.size(), span_name_to_error.size()); + + // Check if error spans were updated correctly for the different span names + for (auto &span_error : span_name_to_error) + { + // First try to find the span name in aggregation, then check the count of + // the error spans and then check values + ASSERT_TRUE(data.find(span_error.first) != data.end()); + + auto &aggregated_data = data.at(span_error.first); + + // Make sure counts of spans are in order + VerifySpanCountsInTracezData(span_error.first, aggregated_data, 0, span_error.second.size(), + {0, 0, 0, 0, 0, 0, 0, 0, 0}); + ASSERT_EQ(aggregated_data.error_span_count, span_error.second.size()); + + auto error_sample = 
aggregated_data.sample_error_spans.begin(); + for (unsigned int idx = 0; idx < span_error.second.size(); idx++) + { + ASSERT_EQ(span_error.second[idx], error_sample->GetDescription()); + error_sample = std::next(error_sample); + } + } +} + +/************************ Sample spans tests **********************************/ + +/** + * This test checks to see that the maximum number of running samples(5) for a + * bucket is not exceeded. If there are more spans than this for a single bucket + * it removes the earliest span that was received + */ +TEST_F(TracezDataAggregatorTest, RunningSampleSpansOverCapacity) +{ + int running_span_count = 6; + // Start and store running_span_count running spans + std::vector> running_span_container; + for (int count = 0; count < running_span_count; count++) + running_span_container.push_back(tracer->StartSpan(span_name1)); + + std::this_thread::sleep_for(milliseconds(500)); + // Fetch data and check if span name is present + auto data = tracez_data_aggregator->GetAggregatedTracezData(); + ASSERT_EQ(data.size(), 1); + ASSERT_TRUE(data.find(span_name1) != data.end()); + + // Check if running spans are updated according to spans started + auto &aggregated_data = data.at(span_name1); + VerifySpanCountsInTracezData(span_name1, aggregated_data, 6, 0, {0, 0, 0, 0, 0, 0, 0, 0, 0}); + + ASSERT_EQ(aggregated_data.sample_running_spans.size(), kMaxNumberOfSampleSpans); +} + +/** + * This test checks to see that the maximum number of error samples(5) for a + * bucket is not exceeded. 
If there are more spans than this for a single bucket + * it removes the earliest span that was recieved + */ +TEST_F(TracezDataAggregatorTest, ErrorSampleSpansOverCapacity) +{ + // Create error spans with the descriptions in the vector + std::vector span_error_descriptions = {"error span 1", "error span 2", + "error span 3", "error span 4", + "error span 5", "error span 6"}; + for (auto span_error_description : span_error_descriptions) + tracer->StartSpan(span_name1) + ->SetStatus(opentelemetry::trace::CanonicalCode::CANCELLED, span_error_description); + + std::this_thread::sleep_for(milliseconds(500)); + + // Fetch data and check if span name is spresent + auto data = tracez_data_aggregator->GetAggregatedTracezData(); + ASSERT_EQ(data.size(), 1); + ASSERT_TRUE(data.find(span_name1) != data.end()); + + std::this_thread::sleep_for(milliseconds(500)); + + // Check if error spans are updated according to spans started + auto &aggregated_data = data.at(span_name1); + VerifySpanCountsInTracezData(span_name1, aggregated_data, 0, span_error_descriptions.size(), + {0, 0, 0, 0, 0, 0, 0, 0, 0}); + + // Check if the latest 5 error spans exist out of the total 6 that were + // introduced + auto error_sample = aggregated_data.sample_error_spans.begin(); + for (unsigned int idx = 1; idx < span_error_descriptions.size(); idx++) + { + ASSERT_EQ(error_sample->GetDescription(), span_error_descriptions[idx]); + error_sample = std::next(error_sample); + } +} + +/** + * This test checks to see that the maximum number of latency samples(5) for a + * bucket is not exceeded. 
If there are more spans than this for a single bucket
 * it removes the earliest span that was received
 */
TEST_F(TracezDataAggregatorTest, CompletedSampleSpansOverCapacity)
{
  opentelemetry::trace::StartSpanOptions start;
  opentelemetry::trace::EndSpanOptions end;

  // (start, end) steady timestamps for 6 spans with the same name. Every
  // duration is below 10 microseconds, so all of them fall into the first
  // latency bucket.
  const std::vector<std::pair<nanoseconds, nanoseconds>> timestamps = {
      {nanoseconds(10), nanoseconds(100)},   {nanoseconds(1), nanoseconds(10000)},
      {nanoseconds(1000), nanoseconds(3000)}, {nanoseconds(12), nanoseconds(12)},
      {nanoseconds(10), nanoseconds(5000)},  {nanoseconds(10), nanoseconds(60)}};
  for (const auto &timestamp : timestamps)
  {
    start.start_steady_time = SteadyTimestamp(timestamp.first);
    end.end_steady_time     = SteadyTimestamp(timestamp.second);
    tracer->StartSpan(span_name1, start)->End(end);
  }

  // Give some time and get data
  std::this_thread::sleep_for(milliseconds(500));
  auto data = tracez_data_aggregator->GetAggregatedTracezData();

  ASSERT_EQ(data.size(), 1);
  ASSERT_TRUE(data.find(span_name1) != data.end());

  // NOTE(review): `data` is already a by-value snapshot at this point, so this
  // second wait looks redundant - confirm before removing.
  std::this_thread::sleep_for(milliseconds(500));
  auto &aggregated_data = data.at(span_name1);
  VerifySpanCountsInTracezData(
      span_name1, aggregated_data, 0, 0,
      {static_cast<unsigned int>(timestamps.size()), 0, 0, 0, 0, 0, 0, 0, 0});

  // The first bucket's sample list must be capped; this also guarantees the
  // loop below never advances past the end of the list
  auto &first_bucket_samples =
      aggregated_data.sample_latency_spans[LatencyBoundary::k0MicroTo10Micro];
  ASSERT_EQ(first_bucket_samples.size(), kMaxNumberOfSampleSpans);

  // Check the samples stored for the bucket.
  // idx starts from 1 and not 0 because there are 6 completed spans in the same
  // bucket and the first one is removed
  auto latency_sample = first_bucket_samples.begin();
  for (unsigned int idx = 1; idx < timestamps.size(); idx++)
  {
    ASSERT_EQ(latency_sample->GetDuration().count(),
              timestamps[idx].second.count() - timestamps[idx].first.count());
    latency_sample = std::next(latency_sample);
  }
}

/************************* Miscellaneous tests ********************************/

+/** Test to see if the span names are in alphabetical order **/ +TEST_F(TracezDataAggregatorTest, SpanNameInAlphabeticalOrder) +{ + std::vector span_names = {span_name1, span_name2, span_name3}; + + auto span_first = tracer->StartSpan(span_name2); + tracer->StartSpan(span_name1)->End(); + tracer->StartSpan(span_name3) + ->SetStatus(opentelemetry::trace::CanonicalCode::CANCELLED, "span cancelled"); + std::this_thread::sleep_for(milliseconds(500)); + // Get data and check if span name exists in aggregation + auto data = tracez_data_aggregator->GetAggregatedTracezData(); + ASSERT_EQ(data.size(), span_names.size()); + + int span_names_idx = 0; + for (auto &spans : data) + { + ASSERT_EQ(spans.first, span_names[span_names_idx]); + span_names_idx++; + } +} + +/** This test checks to see that there is no double counting of running spans + * when get aggregated data is called twice**/ +TEST_F(TracezDataAggregatorTest, AdditionToRunningSpans) +{ + // Start a span and check the data + auto span_first = tracer->StartSpan(span_name1); + std::this_thread::sleep_for(milliseconds(500)); + auto data = tracez_data_aggregator->GetAggregatedTracezData(); + ASSERT_EQ(data.size(), 1); + ASSERT_TRUE(data.find(span_name1) != data.end()); + VerifySpanCountsInTracezData(span_name1, data.at(span_name1), 1, 0, {0, 0, 0, 0, 0, 0, 0, 0, 0}); + + // Start another span and check to see if there is no double counting of spans + auto span_second = tracer->StartSpan(span_name1); + + // Give some time and get updated data + std::this_thread::sleep_for(milliseconds(500)); + data = tracez_data_aggregator->GetAggregatedTracezData(); + ASSERT_EQ(data.size(), 1); + ASSERT_TRUE(data.find(span_name1) != data.end()); + auto &aggregated_data = data.at(span_name1); + VerifySpanCountsInTracezData(span_name1, aggregated_data, 2, 0, {0, 0, 0, 0, 0, 0, 0, 0, 0}); + + ASSERT_EQ(aggregated_data.sample_running_spans.size(), 2); + for (auto &sample_span : aggregated_data.sample_running_spans) + { + 
ASSERT_EQ(sample_span.GetName().data(), span_name1); + } +} + +/** This test checks to see that once a running span is completed it the + * aggregated data is updated correctly **/ +TEST_F(TracezDataAggregatorTest, RemovalOfRunningSpanWhenCompleted) +{ + opentelemetry::trace::StartSpanOptions start; + start.start_steady_time = SteadyTimestamp(nanoseconds(10)); + opentelemetry::trace::EndSpanOptions end; + end.end_steady_time = SteadyTimestamp(nanoseconds(40)); + + // Start a span and make sure data is updated + auto span_first = tracer->StartSpan(span_name1, start); + std::this_thread::sleep_for(milliseconds(500)); + auto data = tracez_data_aggregator->GetAggregatedTracezData(); + ASSERT_EQ(data.size(), 1); + ASSERT_TRUE(data.find(span_name1) != data.end()); + VerifySpanCountsInTracezData(span_name1, data.at(span_name1), 1, 0, {0, 0, 0, 0, 0, 0, 0, 0, 0}); + ASSERT_EQ(data.at(span_name1).sample_running_spans.front().GetName().data(), span_name1); + // End the span and make sure running span is removed and completed span is + // updated, there should be only one completed span + span_first->End(end); + std::this_thread::sleep_for(milliseconds(500)); + + // Make sure sample span still exists before next aggregation + ASSERT_TRUE(data.find(span_name1) != data.end()); + ASSERT_EQ(data.at(span_name1).sample_running_spans.front().GetName().data(), span_name1); + + data = tracez_data_aggregator->GetAggregatedTracezData(); + + ASSERT_EQ(data.size(), 1); + ASSERT_TRUE(data.find(span_name1) != data.end()); + + // Check if completed span fields are correctly updated + auto &aggregated_data = data.at(span_name1); + VerifySpanCountsInTracezData(span_name1, aggregated_data, 0, 0, {1, 0, 0, 0, 0, 0, 0, 0, 0}); + ASSERT_EQ(aggregated_data.sample_latency_spans[LatencyBoundary::k0MicroTo10Micro] + .front() + .GetDuration() + .count(), + 30); +} + +TEST_F(TracezDataAggregatorTest, RunningSpanChangesNameBeforeCompletion) +{ + opentelemetry::trace::StartSpanOptions start; + 
start.start_steady_time = SteadyTimestamp(nanoseconds(10)); + opentelemetry::trace::EndSpanOptions end; + end.end_steady_time = SteadyTimestamp(nanoseconds(40)); + + // Start a span and make sure data is updated + auto span_first = tracer->StartSpan(span_name1, start); + std::this_thread::sleep_for(milliseconds(500)); + auto data = tracez_data_aggregator->GetAggregatedTracezData(); + ASSERT_EQ(data.size(), 1); + ASSERT_TRUE(data.find(span_name1) != data.end()); + VerifySpanCountsInTracezData(span_name1, data.at(span_name1), 1, 0, {0, 0, 0, 0, 0, 0, 0, 0, 0}); + ASSERT_EQ(data.at(span_name1).sample_running_spans.front().GetName().data(), span_name1); + + // End the span and make sure running span is removed and completed span is + // updated, there should be only one completed span + span_first->UpdateName(span_name2); + span_first->End(end); + + // Check if sample span is present before fetching updated data + std::this_thread::sleep_for(milliseconds(500)); + ASSERT_TRUE(data.find(span_name1) != data.end()); + ASSERT_EQ(data.at(span_name1).sample_running_spans.front().GetName(), span_name1); + + data = tracez_data_aggregator->GetAggregatedTracezData(); + + ASSERT_EQ(data.size(), 1); + ASSERT_TRUE(data.find(span_name2) != data.end()); + + // Check if completed span fields are correctly updated + auto &aggregated_data = data.at(span_name2); + VerifySpanCountsInTracezData(span_name2, aggregated_data, 0, 0, {1, 0, 0, 0, 0, 0, 0, 0, 0}); + ASSERT_EQ(aggregated_data.sample_latency_spans[LatencyBoundary::k0MicroTo10Micro] + .front() + .GetDuration() + .count(), + 30); +} + +/** Test to check if the span latencies with duration at the edge of boundaries + * fall in the correct bucket **/ +TEST_F(TracezDataAggregatorTest, EdgeSpanLatenciesFallInCorrectBoundaries) +{ + opentelemetry::trace::StartSpanOptions start; + opentelemetry::trace::EndSpanOptions end; + + // Start and end 6 spans with the same name that fall into the first latency + // bucket + std::vector durations = 
{ + nanoseconds(0), nanoseconds(10000), nanoseconds(100000), + nanoseconds(1000000), nanoseconds(10000000), nanoseconds(100000000), + nanoseconds(1000000000), nanoseconds(10000000000), nanoseconds(100000000000)}; + for (auto duration : durations) + { + start.start_steady_time = SteadyTimestamp(nanoseconds(1)); + end.end_steady_time = SteadyTimestamp(nanoseconds(duration.count() + 1)); + tracer->StartSpan(span_name1, start)->End(end); + } + + std::this_thread::sleep_for(milliseconds(500)); + auto data = tracez_data_aggregator->GetAggregatedTracezData(); + ASSERT_EQ(data.size(), 1); + ASSERT_TRUE(data.find(span_name1) != data.end()); + + std::this_thread::sleep_for(milliseconds(500)); + auto &aggregated_data = data.at(span_name1); + VerifySpanCountsInTracezData(span_name1, aggregated_data, 0, 0, {1, 1, 1, 1, 1, 1, 1, 1, 1}); + + // Check if the latency boundary is updated correctly + for (unsigned int boundary = 0; boundary < kLatencyBoundaries.size(); boundary++) + { + ASSERT_EQ(aggregated_data.sample_latency_spans[boundary].front().GetDuration().count(), + durations[boundary].count()); + } +} + +/** This test makes sure that the data is consistent when there are multiple + * calls to the data aggegator with no change in data **/ +TEST_F(TracezDataAggregatorTest, NoChangeInBetweenCallsToAggregator) +{ + opentelemetry::trace::StartSpanOptions start; + start.start_steady_time = SteadyTimestamp(nanoseconds(1)); + + opentelemetry::trace::EndSpanOptions end; + end.end_steady_time = SteadyTimestamp(nanoseconds(1)); + + tracer->StartSpan(span_name1, start)->End(end); + auto running_span = tracer->StartSpan(span_name2); + tracer->StartSpan(span_name3) + ->SetStatus(opentelemetry::trace::CanonicalCode::CANCELLED, "span cancelled"); + std::this_thread::sleep_for(milliseconds(500)); + auto data = tracez_data_aggregator->GetAggregatedTracezData(); + std::this_thread::sleep_for(milliseconds(500)); + // Get data and check if span name exists in aggregation + data = 
tracez_data_aggregator->GetAggregatedTracezData(); + ASSERT_TRUE(data.find(span_name1) != data.end()); + VerifySpanCountsInTracezData(span_name1, data.at(span_name1), 0, 0, {1, 0, 0, 0, 0, 0, 0, 0, 0}); + + ASSERT_TRUE(data.find(span_name2) != data.end()); + VerifySpanCountsInTracezData(span_name2, data.at(span_name2), 1, 0, {0, 0, 0, 0, 0, 0, 0, 0, 0}); + + ASSERT_TRUE(data.find(span_name3) != data.end()); + VerifySpanCountsInTracezData(span_name3, data.at(span_name3), 0, 1, {0, 0, 0, 0, 0, 0, 0, 0, 0}); +}