Skip to content

Commit

Permalink
Create shared context for updating span pipeline from TracerProvider …
Browse files Browse the repository at this point in the history
…and affecting Tracer. (#650)
  • Loading branch information
jsuereth authored Apr 16, 2021
1 parent b0f27a4 commit 7b07013
Show file tree
Hide file tree
Showing 26 changed files with 327 additions and 267 deletions.
4 changes: 2 additions & 2 deletions examples/batch/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ void initTracer()
// We export `kNumSpans` after every `schedule_delay_millis` milliseconds.
options.max_export_batch_size = kNumSpans;

auto processor = std::shared_ptr<sdktrace::SpanProcessor>(
auto processor = std::unique_ptr<sdktrace::SpanProcessor>(
new sdktrace::BatchSpanProcessor(std::move(exporter), options));

auto provider = nostd::shared_ptr<opentelemetry::trace::TracerProvider>(
new sdktrace::TracerProvider(processor));
new sdktrace::TracerProvider(std::move(processor)));
// Set the global trace provider.
opentelemetry::trace::Provider::SetTracerProvider(provider);
}
Expand Down
42 changes: 42 additions & 0 deletions examples/http/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
cc_binary(
name = "example_http_client",
srcs = [
"client.cc",
"tracer_common.hpp",
],
# TODO: Move copts/linkopts for static CURL usage into shared bzl file.
copts = [
"-DCURL_STATICLIB",
"-DWITH_CURL",
],
linkopts = select({
"//bazel:windows": [
"-DEFAULTLIB:advapi32.lib",
"-DEFAULTLIB:crypt32.lib",
"-DEFAULTLIB:Normaliz.lib",
],
"//conditions:default": [],
}),
deps = [
"//api",
"//exporters/ostream:ostream_span_exporter",
"//ext:headers",
"//ext/src/http/client/curl:http_client_curl",
"//sdk/src/trace",
],
)

cc_binary(
name = "example_http_server",
srcs = [
"server.cc",
"server.hpp",
"tracer_common.hpp",
],
deps = [
"//api",
"//exporters/ostream:ostream_span_exporter",
"//ext:headers",
"//sdk/src/trace",
],
)
9 changes: 5 additions & 4 deletions examples/http/tracer_common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ namespace {
void initTracer() {
auto exporter = std::unique_ptr<sdktrace::SpanExporter>(
new opentelemetry::exporter::trace::OStreamSpanExporter);
auto processor = std::shared_ptr<sdktrace::SpanProcessor>(
auto processor = std::unique_ptr<sdktrace::SpanProcessor>(
new sdktrace::SimpleSpanProcessor(std::move(exporter)));
auto provider = nostd::shared_ptr<opentelemetry::trace::TracerProvider>(
new sdktrace::TracerProvider(processor, opentelemetry::sdk::resource::Resource::Create({}),
std::make_shared<opentelemetry::sdk::trace::AlwaysOnSampler>()));
// Default is an always-on sampler.
auto context = std::make_shared<sdktrace::TracerContext>(std::move(processor));
auto provider = nostd::shared_ptr<opentelemetry::trace::TracerProvider>(
new sdktrace::TracerProvider(context));
// Set the global trace provider
opentelemetry::trace::Provider::SetTracerProvider(provider);
}
Expand Down
7 changes: 4 additions & 3 deletions examples/multithreaded/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ void initTracer()
{
auto exporter = std::unique_ptr<sdktrace::SpanExporter>(
new opentelemetry::exporter::trace::OStreamSpanExporter);
auto processor = std::shared_ptr<sdktrace::SpanProcessor>(
auto processor = std::unique_ptr<sdktrace::SpanProcessor>(
new sdktrace::SimpleSpanProcessor(std::move(exporter)));
auto provider = nostd::shared_ptr<opentelemetry::trace::TracerProvider>(
new sdktrace::TracerProvider(processor, opentelemetry::sdk::resource::Resource::Create({})));
auto provider =
nostd::shared_ptr<opentelemetry::trace::TracerProvider>(new sdktrace::TracerProvider(
std::move(processor), opentelemetry::sdk::resource::Resource::Create({})));
// Set the global trace provider
opentelemetry::trace::Provider::SetTracerProvider(provider);
}
Expand Down
5 changes: 3 additions & 2 deletions examples/otlp/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ void InitTracer()
{
// Create OTLP exporter instance
auto exporter = std::unique_ptr<sdktrace::SpanExporter>(new otlp::OtlpExporter(opts));
auto processor = std::shared_ptr<sdktrace::SpanProcessor>(
auto processor = std::unique_ptr<sdktrace::SpanProcessor>(
new sdktrace::SimpleSpanProcessor(std::move(exporter)));
auto provider = nostd::shared_ptr<trace::TracerProvider>(new sdktrace::TracerProvider(processor));
auto provider =
nostd::shared_ptr<trace::TracerProvider>(new sdktrace::TracerProvider(std::move(processor)));
// Set the global trace provider
trace::Provider::SetTracerProvider(provider);
}
Expand Down
5 changes: 2 additions & 3 deletions examples/simple/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,10 @@ void initTracer()
{
auto exporter = std::unique_ptr<sdktrace::SpanExporter>(
new opentelemetry::exporter::trace::OStreamSpanExporter);
auto processor = std::shared_ptr<sdktrace::SpanProcessor>(
auto processor = std::unique_ptr<sdktrace::SpanProcessor>(
new sdktrace::SimpleSpanProcessor(std::move(exporter)));
auto provider = nostd::shared_ptr<opentelemetry::trace::TracerProvider>(
new sdktrace::TracerProvider(processor, opentelemetry::sdk::resource::Resource::Create({}),
std::make_shared<opentelemetry::sdk::trace::AlwaysOnSampler>()));
new sdktrace::TracerProvider(std::move(processor)));

// Set the global trace provider
opentelemetry::trace::Provider::SetTracerProvider(provider);
Expand Down
6 changes: 3 additions & 3 deletions exporters/otlp/test/otlp_exporter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,10 @@ TEST_F(OtlpExporterTestPeer, ExportIntegrationTest)

auto exporter = GetExporter(stub_interface);

auto processor = std::shared_ptr<sdk::trace::SpanProcessor>(
auto processor = std::unique_ptr<sdk::trace::SpanProcessor>(
new sdk::trace::SimpleSpanProcessor(std::move(exporter)));
auto provider =
nostd::shared_ptr<trace::TracerProvider>(new sdk::trace::TracerProvider(processor));
auto provider = nostd::shared_ptr<trace::TracerProvider>(
new sdk::trace::TracerProvider(std::move(processor)));
auto tracer = provider->GetTracer("test");

EXPECT_CALL(*mock_stub, Export(_, _, _))
Expand Down
5 changes: 3 additions & 2 deletions ext/include/opentelemetry/ext/zpages/zpages.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,11 @@ class ZPages
/** Replaces the global tracer provider with an instance that exports to tracez. */
void ReplaceGlobalProvider()
{
std::shared_ptr<opentelemetry::sdk::trace::SpanProcessor> tracez_processor(
// GCC 4.8 can't infer the type coercion.
std::unique_ptr<opentelemetry::sdk::trace::SpanProcessor> processor(
MakeSpanProcessor().release());
auto tracez_provider_ = opentelemetry::nostd::shared_ptr<opentelemetry::trace::TracerProvider>(
new opentelemetry::sdk::trace::TracerProvider(tracez_processor));
new opentelemetry::sdk::trace::TracerProvider(std::move(processor)));
opentelemetry::trace::Provider::SetTracerProvider(tracez_provider_);
}

Expand Down
14 changes: 14 additions & 0 deletions ext/src/http/client/curl/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,21 @@ cc_library(
"http_client_curl.cc",
"http_client_factory_curl.cc",
],
# TODO: Move copts/linkopts for static CURL usage into shared bzl file.
copts = [
"-DCURL_STATICLIB",
"-DWITH_CURL",
],
include_prefix = "src/http/client/curl",
linkopts = select({
"//bazel:windows": [
"-DEFAULTLIB:advapi32.lib",
"-DEFAULTLIB:crypt32.lib",
"-DEFAULTLIB:Normaliz.lib",
"-DEFAULTLIB:Ws2_32.lib",
],
"//conditions:default": [],
}),
deps = [
"//api",
"//ext:headers",
Expand Down
13 changes: 0 additions & 13 deletions ext/test/http/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,6 @@ cc_test(
srcs = [
"curl_http_test.cc",
],
# TODO: Move copts/linkopts for static CURL usage into shared bzl file.
copts = [
"-DCURL_STATICLIB",
"-DWITH_CURL",
],
linkopts = select({
"//bazel:windows": [
"-DEFAULTLIB:advapi32.lib",
"-DEFAULTLIB:crypt32.lib",
"-DEFAULTLIB:Normaliz.lib",
],
"//conditions:default": [],
}),
deps = [
"//ext:headers",
"//ext/src/http/client/curl:http_client_curl",
Expand Down
5 changes: 3 additions & 2 deletions ext/test/w3c_tracecontext_test/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,11 @@ void initTracer()
{
auto exporter = std::unique_ptr<sdktrace::SpanExporter>(
new opentelemetry::exporter::trace::OStreamSpanExporter);
auto processor = std::shared_ptr<sdktrace::SpanProcessor>(
auto processor = std::unique_ptr<sdktrace::SpanProcessor>(
new sdktrace::SimpleSpanProcessor(std::move(exporter)));
auto context = std::make_shared<sdktrace::TracerContext>(std::move(processor));
auto provider = nostd::shared_ptr<opentelemetry::trace::TracerProvider>(
new sdktrace::TracerProvider(processor));
new sdktrace::TracerProvider(context));
// Set the global trace provider
opentelemetry::trace::Provider::SetTracerProvider(provider);
}
Expand Down
5 changes: 3 additions & 2 deletions ext/test/zpages/tracez_data_aggregator_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,10 @@ class TracezDataAggregatorTest : public ::testing::Test
void SetUp() override
{
std::shared_ptr<TracezSharedData> shared_data(new TracezSharedData());
std::shared_ptr<TracezSpanProcessor> processor(new TracezSpanProcessor(shared_data));
auto resource = opentelemetry::sdk::resource::Resource::Create({});
tracer = std::shared_ptr<opentelemetry::trace::Tracer>(new Tracer(processor, resource));
auto context = std::make_shared<TracerContext>(
std::unique_ptr<SpanProcessor>(new TracezSpanProcessor(shared_data)), resource);
tracer = std::shared_ptr<opentelemetry::trace::Tracer>(new Tracer(context));
tracez_data_aggregator = std::unique_ptr<TracezDataAggregator>(
new TracezDataAggregator(shared_data, milliseconds(10)));
}
Expand Down
6 changes: 5 additions & 1 deletion ext/test/zpages/tracez_processor_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,12 @@ class TracezProcessor : public ::testing::Test
shared_data = std::shared_ptr<TracezSharedData>(new TracezSharedData());
processor = std::shared_ptr<TracezSpanProcessor>(new TracezSpanProcessor(shared_data));
auto resource = opentelemetry::sdk::resource::Resource::Create({});
// Note: we make a *different* processor for the tracercontext. THis is because
// all the tests use shared data, and we want to make sure this works correctly.
auto context = std::make_shared<TracerContext>(
std::unique_ptr<SpanProcessor>(new TracezSpanProcessor(shared_data)), resource);

tracer = std::shared_ptr<opentelemetry::trace::Tracer>(new Tracer(processor, resource));
tracer = std::shared_ptr<opentelemetry::trace::Tracer>(new Tracer(context));
auto spans = shared_data->GetSpanSnapshot();
running = spans.running;
completed = std::move(spans.completed);
Expand Down
2 changes: 2 additions & 0 deletions sdk/include/opentelemetry/sdk/common/atomic_unique_ptr.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ class AtomicUniquePtr
public:
AtomicUniquePtr() noexcept {}

explicit AtomicUniquePtr(std::unique_ptr<T> &&other) noexcept : ptr_(other.release()) {}

~AtomicUniquePtr() noexcept { Reset(); }

T &operator*() const noexcept { return *Get(); }
Expand Down
40 changes: 10 additions & 30 deletions sdk/include/opentelemetry/sdk/trace/tracer.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "opentelemetry/sdk/resource/resource.h"
#include "opentelemetry/sdk/trace/processor.h"
#include "opentelemetry/sdk/trace/samplers/always_on.h"
#include "opentelemetry/sdk/trace/tracer_context.h"
#include "opentelemetry/trace/noop.h"
#include "opentelemetry/trace/tracer.h"
#include "opentelemetry/version.h"
Expand All @@ -18,33 +19,8 @@ namespace trace
class Tracer final : public trace_api::Tracer, public std::enable_shared_from_this<Tracer>
{
public:
/**
* Initialize a new tracer.
* @param processor The span processor for this tracer. This must not be a
* nullptr.
*/
explicit Tracer(std::shared_ptr<SpanProcessor> processor,
const opentelemetry::sdk::resource::Resource &resource,
std::shared_ptr<Sampler> sampler = std::make_shared<AlwaysOnSampler>()) noexcept;

/**
* Set the span processor associated with this tracer.
* @param processor The new span processor for this tracer. This must not be
* a nullptr.
*/
void SetProcessor(std::shared_ptr<SpanProcessor> processor) noexcept;

/**
* Obtain the span processor associated with this tracer.
* @return The span processor for this tracer.
*/
std::shared_ptr<SpanProcessor> GetProcessor() const noexcept;

/**
* Obtain the sampler associated with this tracer.
* @return The sampler for this tracer.
*/
std::shared_ptr<Sampler> GetSampler() const noexcept;
/** Construct a new Tracer with the given context pipeline. */
explicit Tracer(std::shared_ptr<sdk::trace::TracerContext> context) noexcept;

nostd::shared_ptr<trace_api::Span> StartSpan(
nostd::string_view name,
Expand All @@ -56,10 +32,14 @@ class Tracer final : public trace_api::Tracer, public std::enable_shared_from_th

void CloseWithMicroseconds(uint64_t timeout) noexcept override;

/** Returns the currently active span processor. */
SpanProcessor &GetActiveProcessor() noexcept { return context_->GetActiveProcessor(); }

// Note: Test only
Sampler &GetSampler() { return context_->GetSampler(); }

private:
opentelemetry::sdk::common::AtomicSharedPtr<SpanProcessor> processor_;
const std::shared_ptr<Sampler> sampler_;
const opentelemetry::sdk::resource::Resource &resource_;
std::shared_ptr<sdk::trace::TracerContext> context_;
};
} // namespace trace
} // namespace sdk
Expand Down
84 changes: 84 additions & 0 deletions sdk/include/opentelemetry/sdk/trace/tracer_context.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
#pragma once

#include "opentelemetry/sdk/common/atomic_unique_ptr.h"
#include "opentelemetry/sdk/resource/resource.h"
#include "opentelemetry/sdk/trace/processor.h"
#include "opentelemetry/sdk/trace/samplers/always_on.h"
#include "opentelemetry/version.h"

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
{
namespace trace
{

/**
* A class which stores the TracerProvider context.
*
* This class meets the following design criteria:
* - A shared reference between TracerProvider and Tracers instantiated.
* - A thread-safe class that allows updating/altering processor/exporter pipelines
* and sampling config.
* - The owner/destroyer of Processors/Exporters. These will remain active until
* this class is destroyed. I.e. Sampling, Exporting, flushing etc. are all ok if this
* object is alive, and they will work together. If this object is destroyed, then
* no shared references to Processor, Exporter, Recordable etc. should exist, and all
* associated pipelines will have been flushed.
*/
class TracerContext
{
public:
explicit TracerContext(std::unique_ptr<SpanProcessor> processor,
opentelemetry::sdk::resource::Resource resource =
opentelemetry::sdk::resource::Resource::Create({}),
std::unique_ptr<Sampler> sampler =
std::unique_ptr<AlwaysOnSampler>(new AlwaysOnSampler)) noexcept;
/**
* Attaches a span processor to this tracer context.
*
* @param processor The new span processor for this tracer. This must not be
* a nullptr. Ownership is given to the `TracerContext`.
*/
void RegisterPipeline(std::unique_ptr<SpanProcessor> processor) noexcept;

/**
* Obtain the sampler associated with this tracer.
* @return The sampler for this tracer.
*/
Sampler &GetSampler() const noexcept;

/**
* Obtain the (conceptual) active processor.
*
* Note: When more than one processor is active, this will
* return an "aggregate" processor
*/
SpanProcessor &GetActiveProcessor() const noexcept;

/**
* Obtain the resource associated with this tracer context.
* @return The resource for this tracer context.
*/
const opentelemetry::sdk::resource::Resource &GetResource() const noexcept;

/**
* Force all active SpanProcessors to flush any buffered spans
* within the given timeout.
*/
bool ForceFlush(std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept;

/**
* Shutdown the span processor associated with this tracer provider.
*/
bool Shutdown() noexcept;

private:
// This is an atomic pointer so we can adapt the processor pipeline dynamically.
opentelemetry::sdk::common::AtomicUniquePtr<SpanProcessor> processor_;
opentelemetry::sdk::resource::Resource resource_;
std::unique_ptr<Sampler> sampler_;
};

} // namespace trace
} // namespace sdk
OPENTELEMETRY_END_NAMESPACE
Loading

0 comments on commit 7b07013

Please sign in to comment.