Skip to content

Commit

Permalink
[WIP] Implement new SpanExporter API for Jaeger
Browse files Browse the repository at this point in the history
Key points
- decouple exporter from uploaders via channel and spawned task
- some uploaders are a shared I/O resource and cannot be multiplexed
    - necessitates a task queue
    - eg, HttpClient will spawn many I/O tasks internally, AgentUploader
      is a single I/O resource. Different level of abstraction.
- Synchronous API not supported without a Runtime argument. I updated
  the API to thread one through, but maybe this is undesirable. I'm also
  exploiting the fact in the Actix examples that it uses Tokio under the
  hood to pass through the Tokio runtime token.
- Tests pass save for a couple of flakey environment ones which is
  likely a race condition.
  • Loading branch information
jwilm committed Apr 21, 2022
1 parent cc2175d commit 0617ca8
Show file tree
Hide file tree
Showing 10 changed files with 128 additions and 60 deletions.
18 changes: 9 additions & 9 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ members = [
"opentelemetry-datadog",
"opentelemetry-dynatrace",
"opentelemetry-http",
# "opentelemetry-jaeger",
"opentelemetry-jaeger",
"opentelemetry-otlp",
"opentelemetry-prometheus",
"opentelemetry-proto",
Expand All @@ -16,24 +16,24 @@ members = [
"opentelemetry-stackdriver",
"opentelemetry-zipkin",
"opentelemetry-zpages",
# "examples/actix-http",
# "examples/actix-http-tracing",
# "examples/actix-udp",
# "examples/async",
"examples/actix-http",
"examples/actix-http-tracing",
"examples/actix-udp",
"examples/async",
"examples/aws-xray",
# "examples/basic",
"examples/basic",
"examples/basic-otlp",
"examples/basic-otlp-with-selector",
"examples/basic-otlp-http",
"examples/datadog",
"examples/dynatrace",
"examples/external-otlp-tonic-tokio",
# "examples/grpc",
"examples/grpc",
"examples/http",
"examples/hyper-prometheus",
# "examples/tracing-grpc",
"examples/tracing-grpc",
"examples/zipkin",
# "examples/multiple-span-processors",
"examples/multiple-span-processors",
"examples/zpages"
]
exclude = ["examples/external-otlp-grpcio-async-std"]
8 changes: 5 additions & 3 deletions examples/actix-udp/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
use actix_service::Service;
use actix_web::middleware::Logger;
use actix_web::{web, App, HttpServer};
use opentelemetry::sdk::runtime::Tokio;
use opentelemetry::trace::TraceError;
use opentelemetry::{global, sdk::trace as sdktrace};
use opentelemetry::{
trace::{FutureExt, TraceContextExt, Tracer},
Key,
};
use opentelemetry_jaeger::JaegerTraceRuntime;

fn init_tracer() -> Result<sdktrace::Tracer, TraceError> {
fn init_tracer<R: JaegerTraceRuntime>(runtime: R) -> Result<sdktrace::Tracer, TraceError> {
opentelemetry_jaeger::new_agent_pipeline()
.with_endpoint("localhost:6831")
.with_service_name("trace-udp-demo")
Expand All @@ -19,7 +21,7 @@ fn init_tracer() -> Result<sdktrace::Tracer, TraceError> {
opentelemetry::KeyValue::new("exporter", "jaeger"),
]),
))
.install_simple()
.install_simple(runtime)
}

async fn index() -> &'static str {
Expand All @@ -34,7 +36,7 @@ async fn index() -> &'static str {
async fn main() -> std::io::Result<()> {
std::env::set_var("RUST_LOG", "debug");
env_logger::init();
let _tracer = init_tracer().expect("Failed to initialise tracer.");
let _tracer = init_tracer(Tokio).expect("Failed to initialise tracer.");

HttpServer::new(|| {
App::new()
Expand Down
8 changes: 5 additions & 3 deletions examples/grpc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ use hello_world::HelloRequest;
use opentelemetry::global;
use opentelemetry::global::shutdown_tracer_provider;
use opentelemetry::sdk::propagation::TraceContextPropagator;
use opentelemetry::sdk::runtime::Tokio;
use opentelemetry::trace::TraceResult;
use opentelemetry::{
propagation::Injector,
sdk::trace::Tracer,
trace::{TraceContextExt, Tracer as _},
Context, KeyValue,
};
use opentelemetry_jaeger::JaegerTraceRuntime;

struct MetadataMap<'a>(&'a mut tonic::metadata::MetadataMap);

Expand All @@ -28,16 +30,16 @@ pub mod hello_world {
tonic::include_proto!("helloworld");
}

fn tracing_init() -> TraceResult<Tracer> {
fn tracing_init<R: JaegerTraceRuntime>(runtime: R) -> TraceResult<Tracer> {
global::set_text_map_propagator(TraceContextPropagator::new());
opentelemetry_jaeger::new_agent_pipeline()
.with_service_name("grpc-client")
.install_simple()
.install_simple(runtime)
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
let tracer = tracing_init()?;
let tracer = tracing_init(Tokio)?;
let mut client = GreeterClient::connect("http://[::1]:50051").await?;
let span = tracer.start("client-request");
let cx = Context::current_with_span(span);
Expand Down
8 changes: 5 additions & 3 deletions examples/grpc/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ use hello_world::greeter_server::{Greeter, GreeterServer};
use hello_world::{HelloReply, HelloRequest};
use opentelemetry::global;
use opentelemetry::sdk::propagation::TraceContextPropagator;
use opentelemetry::sdk::runtime::Tokio;
use opentelemetry::trace::TraceError;
use opentelemetry::{
propagation::Extractor,
trace::{Span, Tracer},
KeyValue,
};
use opentelemetry_jaeger::JaegerTraceRuntime;
use std::error::Error;

pub mod hello_world {
Expand Down Expand Up @@ -59,16 +61,16 @@ impl Greeter for MyGreeter {
}
}

fn tracing_init() -> Result<impl Tracer, TraceError> {
fn tracing_init<R: JaegerTraceRuntime>(runtime: R) -> Result<impl Tracer, TraceError> {
global::set_text_map_propagator(TraceContextPropagator::new());
opentelemetry_jaeger::new_agent_pipeline()
.with_service_name("grpc-server")
.install_simple()
.install_simple(runtime)
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
let _tracer = tracing_init()?;
let _tracer = tracing_init(Tokio)?;
let addr = "[::1]:50051".parse()?;
let greeter = MyGreeter::default();

Expand Down
2 changes: 2 additions & 0 deletions opentelemetry-jaeger/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ rustdoc-args = ["--cfg", "docsrs"]
async-std = { version = "1.6", optional = true }
async-trait = "0.1"
base64 = { version = "0.13", optional = true }
futures = "0.3"
futures-channel = "0.3"
futures-util = { version = "0.3", default-features = false, features = ["std"], optional = true }
headers = { version = "0.3.2", optional = true }
http = { version = "0.2", optional = true }
Expand Down
43 changes: 29 additions & 14 deletions opentelemetry-jaeger/src/exporter/config/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,20 +228,24 @@ impl AgentPipeline {
/// Build a `TracerProvider` using a blocking exporter and configurations from the pipeline.
///
/// The exporter will send each span to the agent upon the span ends.
pub fn build_simple(mut self) -> Result<TracerProvider, TraceError> {
pub fn build_simple<R: JaegerTraceRuntime>(
mut self,
runtime: R,
) -> Result<TracerProvider, TraceError> {
let mut builder = sdk::trace::TracerProvider::builder();

let (config, process) = build_config_and_process(
builder.sdk_provided_resource(),
self.trace_config.take(),
self.transformation_config.service_name.take(),
);
let exporter = Exporter::new(
let (exporter, task) = Exporter::new(
process.into(),
self.transformation_config.export_instrument_library,
self.build_sync_agent_uploader()?,
);

runtime.spawn(Box::pin(task));
builder = builder.with_simple_exporter(exporter);
builder = builder.with_config(config);

Expand Down Expand Up @@ -275,7 +279,9 @@ impl AgentPipeline {
self.transformation_config.service_name.take(),
);
let uploader = self.build_async_agent_uploader(runtime.clone())?;
let exporter = Exporter::new(process.into(), export_instrument_library, uploader);
let (exporter, task) = Exporter::new(process.into(), export_instrument_library, uploader);

runtime.spawn(Box::pin(task));

builder = builder.with_batch_exporter(exporter, runtime);
builder = builder.with_config(config);
Expand All @@ -287,8 +293,11 @@ impl AgentPipeline {
/// tracer provider.
///
/// The tracer name is `opentelemetry-jaeger`. The tracer version will be the version of this crate.
pub fn install_simple(self) -> Result<sdk::trace::Tracer, TraceError> {
let tracer_provider = self.build_simple()?;
pub fn install_simple<R: JaegerTraceRuntime>(
self,
runtime: R,
) -> Result<sdk::trace::Tracer, TraceError> {
let tracer_provider = self.build_simple(runtime)?;
install_tracer_provider_and_get_tracer(tracer_provider)
}

Expand Down Expand Up @@ -321,27 +330,32 @@ impl AgentPipeline {
self.trace_config.take(),
self.transformation_config.service_name.take(),
);
let uploader = self.build_async_agent_uploader(runtime)?;
Ok(Exporter::new(
process.into(),
export_instrument_library,
uploader,
))
let uploader = self.build_async_agent_uploader(runtime.clone())?;
let (exporter, task) = Exporter::new(process.into(), export_instrument_library, uploader);

runtime.spawn(Box::pin(task));
Ok(exporter)
}

/// Build an jaeger exporter targeting a jaeger agent and running on the sync runtime.
pub fn build_sync_agent_exporter(mut self) -> Result<crate::Exporter, TraceError> {
pub fn build_sync_agent_exporter<R: JaegerTraceRuntime>(
mut self,
runtime: R,
) -> Result<crate::Exporter, TraceError> {
let builder = sdk::trace::TracerProvider::builder();
let (_, process) = build_config_and_process(
builder.sdk_provided_resource(),
self.trace_config.take(),
self.transformation_config.service_name.take(),
);
Ok(Exporter::new(
let (exporter, task) = Exporter::new(
process.into(),
self.transformation_config.export_instrument_library,
self.build_sync_agent_uploader()?,
))
);

runtime.spawn(Box::pin(task));
Ok(exporter)
}

fn build_async_agent_uploader<R>(self, runtime: R) -> Result<Box<dyn Uploader>, TraceError>
Expand All @@ -358,6 +372,7 @@ impl AgentPipeline {
Ok(Box::new(AsyncUploader::Agent(agent)))
}

#[allow(dead_code)] // TODO jwilm
fn build_sync_agent_uploader(self) -> Result<Box<dyn Uploader>, TraceError> {
let agent = AgentSyncClientUdp::new(
self.agent_endpoint?.as_slice(),
Expand Down
3 changes: 2 additions & 1 deletion opentelemetry-jaeger/src/exporter/config/collector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,8 +412,9 @@ impl CollectorPipeline {
self.transformation_config.service_name.take(),
);
let uploader = self.build_uploader::<R>()?;
let exporter = Exporter::new(process.into(), export_instrument_library, uploader);
let (exporter, task) = Exporter::new(process.into(), export_instrument_library, uploader);

runtime.spawn(Box::pin(task));
builder = builder.with_batch_exporter(exporter, runtime);
builder = builder.with_config(config);

Expand Down
8 changes: 5 additions & 3 deletions opentelemetry-jaeger/src/exporter/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,12 +148,14 @@ mod tests {
);
}

#[test]
fn test_read_from_env() {
#[tokio::test]
async fn test_read_from_env() {
// OTEL_SERVICE_NAME env var also works
env::set_var("OTEL_SERVICE_NAME", "test service");
let builder = new_agent_pipeline();
let exporter = builder.build_sync_agent_exporter().unwrap();
let exporter = builder
.build_sync_agent_exporter(opentelemetry::runtime::Tokio)
.unwrap();
assert_eq!(exporter.process.service_name, "test service");
env::set_var("OTEL_SERVICE_NAME", "")
}
Expand Down
Loading

0 comments on commit 0617ca8

Please sign in to comment.