Skip to content

Commit

Permalink
[WIP] Implement new SpanExporter API for Jaeger
Browse files Browse the repository at this point in the history
Key points
- decouple exporter from uploaders via channel and spawned task
- some uploaders are a shared I/O resource and cannot be multiplexed
    - necessitates a task queue
    - eg, HttpClient will spawn many I/O tasks internally, AgentUploader
      is a single I/O resource. Different level of abstraction.
- Synchronous API not supported without a Runtime argument (can we
  thread one through?)
  • Loading branch information
jwilm committed Apr 21, 2022
1 parent cc2175d commit 6f75887
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 67 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ members = [
"opentelemetry-datadog",
"opentelemetry-dynatrace",
"opentelemetry-http",
# "opentelemetry-jaeger",
"opentelemetry-jaeger",
"opentelemetry-otlp",
"opentelemetry-prometheus",
"opentelemetry-proto",
Expand Down
2 changes: 2 additions & 0 deletions opentelemetry-jaeger/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ rustdoc-args = ["--cfg", "docsrs"]
async-std = { version = "1.6", optional = true }
async-trait = "0.1"
base64 = { version = "0.13", optional = true }
futures = "0.3"
futures-channel = "0.3"
futures-util = { version = "0.3", default-features = false, features = ["std"], optional = true }
headers = { version = "0.3.2", optional = true }
http = { version = "0.2", optional = true }
Expand Down
88 changes: 45 additions & 43 deletions opentelemetry-jaeger/src/exporter/config/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,25 +228,25 @@ impl AgentPipeline {
/// Build a `TracerProvider` using a blocking exporter and configurations from the pipeline.
///
/// The exporter will send each span to the agent upon the span ends.
pub fn build_simple(mut self) -> Result<TracerProvider, TraceError> {
let mut builder = sdk::trace::TracerProvider::builder();

let (config, process) = build_config_and_process(
builder.sdk_provided_resource(),
self.trace_config.take(),
self.transformation_config.service_name.take(),
);
let exporter = Exporter::new(
process.into(),
self.transformation_config.export_instrument_library,
self.build_sync_agent_uploader()?,
);

builder = builder.with_simple_exporter(exporter);
builder = builder.with_config(config);

Ok(builder.build())
}
// pub fn build_simple(mut self) -> Result<TracerProvider, TraceError> {
// let mut builder = sdk::trace::TracerProvider::builder();

// let (config, process) = build_config_and_process(
// builder.sdk_provided_resource(),
// self.trace_config.take(),
// self.transformation_config.service_name.take(),
// );
// let exporter = Exporter::new(
// process.into(),
// self.transformation_config.export_instrument_library,
// self.build_sync_agent_uploader()?,
// );

// builder = builder.with_simple_exporter(exporter);
// builder = builder.with_config(config);

// Ok(builder.build())
// }

/// Build a `TracerProvider` using a async exporter and configurations from the pipeline.
///
Expand Down Expand Up @@ -275,7 +275,9 @@ impl AgentPipeline {
self.transformation_config.service_name.take(),
);
let uploader = self.build_async_agent_uploader(runtime.clone())?;
let exporter = Exporter::new(process.into(), export_instrument_library, uploader);
let (exporter, task) = Exporter::new(process.into(), export_instrument_library, uploader);

runtime.spawn(Box::pin(task));

builder = builder.with_batch_exporter(exporter, runtime);
builder = builder.with_config(config);
Expand All @@ -287,10 +289,10 @@ impl AgentPipeline {
/// tracer provider.
///
/// The tracer name is `opentelemetry-jaeger`. The tracer version will be the version of this crate.
pub fn install_simple(self) -> Result<sdk::trace::Tracer, TraceError> {
let tracer_provider = self.build_simple()?;
install_tracer_provider_and_get_tracer(tracer_provider)
}
// pub fn install_simple(self) -> Result<sdk::trace::Tracer, TraceError> {
// let tracer_provider = self.build_simple()?;
// install_tracer_provider_and_get_tracer(tracer_provider)
// }

/// Similar to [`build_batch`][AgentPipeline::build_batch] but also returns a tracer from the
/// tracer provider.
Expand Down Expand Up @@ -321,28 +323,27 @@ impl AgentPipeline {
self.trace_config.take(),
self.transformation_config.service_name.take(),
);
let uploader = self.build_async_agent_uploader(runtime)?;
Ok(Exporter::new(
process.into(),
export_instrument_library,
uploader,
))
let uploader = self.build_async_agent_uploader(runtime.clone())?;
let (exporter, task) = Exporter::new(process.into(), export_instrument_library, uploader);

runtime.spawn(Box::pin(task));
Ok(exporter)
}

/// Build an jaeger exporter targeting a jaeger agent and running on the sync runtime.
pub fn build_sync_agent_exporter(mut self) -> Result<crate::Exporter, TraceError> {
let builder = sdk::trace::TracerProvider::builder();
let (_, process) = build_config_and_process(
builder.sdk_provided_resource(),
self.trace_config.take(),
self.transformation_config.service_name.take(),
);
Ok(Exporter::new(
process.into(),
self.transformation_config.export_instrument_library,
self.build_sync_agent_uploader()?,
))
}
// pub fn build_sync_agent_exporter(mut self) -> Result<crate::Exporter, TraceError> {
// let builder = sdk::trace::TracerProvider::builder();
// let (_, process) = build_config_and_process(
// builder.sdk_provided_resource(),
// self.trace_config.take(),
// self.transformation_config.service_name.take(),
// );
// Ok(Exporter::new(
// process.into(),
// self.transformation_config.export_instrument_library,
// self.build_sync_agent_uploader()?,
// ))
// }

fn build_async_agent_uploader<R>(self, runtime: R) -> Result<Box<dyn Uploader>, TraceError>
where
Expand All @@ -358,6 +359,7 @@ impl AgentPipeline {
Ok(Box::new(AsyncUploader::Agent(agent)))
}

#[allow(dead_code)] // TODO jwilm
fn build_sync_agent_uploader(self) -> Result<Box<dyn Uploader>, TraceError> {
let agent = AgentSyncClientUdp::new(
self.agent_endpoint?.as_slice(),
Expand Down
1 change: 1 addition & 0 deletions opentelemetry-jaeger/src/exporter/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ mod tests {
// OTEL_SERVICE_NAME env var also works
env::set_var("OTEL_SERVICE_NAME", "test service");
let builder = new_agent_pipeline();
// TODO jwilm
let exporter = builder.build_sync_agent_exporter().unwrap();
assert_eq!(exporter.process.service_name, "test service");
env::set_var("OTEL_SERVICE_NAME", "")
Expand Down
79 changes: 56 additions & 23 deletions opentelemetry-jaeger/src/exporter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ use std::convert::TryFrom;

use self::runtime::JaegerTraceRuntime;
use self::thrift::jaeger;
use async_trait::async_trait;
use futures::channel::{mpsc, oneshot};
use futures::future::BoxFuture;
use futures::StreamExt;
use std::convert::TryInto;
use std::future::Future;

#[cfg(feature = "isahc_collector_client")]
#[allow(unused_imports)] // this is actually used to configure authentication
Expand All @@ -45,22 +48,59 @@ const INSTRUMENTATION_LIBRARY_VERSION: &str = "otel.library.version";
/// Jaeger span exporter
#[derive(Debug)]
pub struct Exporter {
process: jaeger::Process,
/// Whether or not to export instrumentation information.
export_instrumentation_lib: bool,
uploader: Box<dyn Uploader>,
tx: mpsc::Sender<(Vec<trace::SpanData>, oneshot::Sender<trace::ExportResult>)>,
}

impl Exporter {
fn new(
process: jaeger::Process,
export_instrumentation_lib: bool,
uploader: Box<dyn Uploader>,
) -> Exporter {
Exporter {
process,
export_instrumentation_lib,
uploader,
) -> (Exporter, impl Future<Output = ()>) {
let (tx, rx) = futures::channel::mpsc::channel(64);
(
Exporter { tx },
ExporterTask {
rx,
process,
export_instrumentation_lib,
uploader,
}
.run(),
)
}
}

struct ExporterTask {
rx: mpsc::Receiver<(Vec<trace::SpanData>, oneshot::Sender<trace::ExportResult>)>,
process: jaeger::Process,
/// Whether or not to export instrumentation information.
export_instrumentation_lib: bool,
uploader: Box<dyn Uploader>,
}

impl ExporterTask {
async fn run(mut self) {
// TODO jwilm: this might benefit from a ExporterMessage so that we can
// send Shutdown and break the loop.
while let Some((batch, tx)) = self.rx.next().await {
let mut jaeger_spans: Vec<jaeger::Span> = Vec::with_capacity(batch.len());
let process = self.process.clone();

for span in batch.into_iter() {
jaeger_spans.push(convert_otel_span_into_jaeger_span(
span,
self.export_instrumentation_lib,
));
}

let res = self
.uploader
.upload(jaeger::Batch::new(process, jaeger_spans))
.await;

// TODO jwilm: is ignoring the err (fail to send) correct here?
let _ = tx.send(res);
}
}
}
Expand All @@ -74,23 +114,16 @@ pub struct Process {
pub tags: Vec<KeyValue>,
}

#[async_trait]
impl trace::SpanExporter for Exporter {
/// Export spans to Jaeger
async fn export(&mut self, batch: Vec<trace::SpanData>) -> trace::ExportResult {
let mut jaeger_spans: Vec<jaeger::Span> = Vec::with_capacity(batch.len());
let process = self.process.clone();

for span in batch.into_iter() {
jaeger_spans.push(convert_otel_span_into_jaeger_span(
span,
self.export_instrumentation_lib,
));
fn export(&mut self, batch: Vec<trace::SpanData>) -> BoxFuture<'static, trace::ExportResult> {
let (tx, rx) = oneshot::channel();

if let Err(err) = self.tx.try_send((batch, tx)) {
return Box::pin(futures::future::ready(Err(Into::into(err))));
}

self.uploader
.upload(jaeger::Batch::new(process, jaeger_spans))
.await
Box::pin(async move { rx.await? })
}
}

Expand Down

0 comments on commit 6f75887

Please sign in to comment.