Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(jaeger): remove internal message queue between exporter and exporting tasks #848

Merged
merged 4 commits into from
Jul 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion opentelemetry-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ futures-channel = "0.3"
futures-util = { version = "0.3", default-features = false, features = ["std", "sink"] }
indexmap = "=1.8"
once_cell = "1.12.0"
pin-project-lite = { version = "0.2.9", optional = true }
pin-project-lite = { version = "0.2", optional = true }
thiserror = "1"
tokio-stream = { version = "0.1", optional = true }

Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-jaeger/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ once_cell = "1.12"
opentelemetry = { version = "0.17", default-features = false, features = ["trace"], path = "../opentelemetry" }
opentelemetry-http = { version = "0.6", path = "../opentelemetry-http", optional = true }
opentelemetry-semantic-conventions = { version = "0.9", path = "../opentelemetry-semantic-conventions" }
pin-project-lite = { version = "0.2.9", optional = true }
pin-project-lite = { version = "0.2", optional = true }
reqwest = { version = "0.11", default-features = false, optional = true }
surf = { version = "2.0", optional = true }
thiserror = "1.0"
thrift = "0.15"
thrift = "0.16"
tokio = { version = "1.0", features = ["net", "sync"], optional = true }
wasm-bindgen = { version = "0.2", optional = true }
wasm-bindgen-futures = { version = "0.4.18", optional = true }
Expand Down
27 changes: 12 additions & 15 deletions opentelemetry-jaeger/src/exporter/config/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use opentelemetry::sdk;
use opentelemetry::sdk::trace::{Config, TracerProvider};
use opentelemetry::trace::TraceError;
use std::borrow::BorrowMut;
use std::sync::Arc;
use std::{env, net};

/// The max size of UDP packet we want to send, synced with jaeger-agent
Expand Down Expand Up @@ -235,7 +236,7 @@ impl AgentPipeline {
self.trace_config.take(),
self.transformation_config.service_name.take(),
);
let exporter = Exporter::new_sync(
let exporter = Exporter::new(
process.into(),
self.transformation_config.export_instrument_library,
self.build_sync_agent_uploader()?,
Expand Down Expand Up @@ -273,12 +274,7 @@ impl AgentPipeline {
self.transformation_config.service_name.take(),
);
let uploader = self.build_async_agent_uploader(runtime.clone())?;
let exporter = Exporter::new_async(
process.into(),
export_instrument_library,
runtime.clone(),
uploader,
);
let exporter = Exporter::new(process.into(), export_instrument_library, uploader);

builder = builder.with_batch_exporter(exporter, runtime);
builder = builder.with_config(config);
Expand Down Expand Up @@ -322,11 +318,10 @@ impl AgentPipeline {
self.trace_config.take(),
self.transformation_config.service_name.take(),
);
let uploader = self.build_async_agent_uploader(runtime.clone())?;
Ok(Exporter::new_async(
let uploader = self.build_async_agent_uploader(runtime)?;
Ok(Exporter::new(
process.into(),
export_instrument_library,
runtime,
uploader,
))
}
Expand All @@ -337,14 +332,14 @@ impl AgentPipeline {
self.trace_config.take(),
self.transformation_config.service_name.take(),
);
Ok(Exporter::new_sync(
Ok(Exporter::new(
process.into(),
self.transformation_config.export_instrument_library,
self.build_sync_agent_uploader()?,
))
}

fn build_async_agent_uploader<R>(self, runtime: R) -> Result<Box<dyn Uploader>, TraceError>
fn build_async_agent_uploader<R>(self, runtime: R) -> Result<Arc<dyn Uploader>, TraceError>
where
R: JaegerTraceRuntime,
{
Expand All @@ -355,17 +350,19 @@ impl AgentPipeline {
self.auto_split_batch,
)
.map_err::<Error, _>(Into::into)?;
Ok(Box::new(AsyncUploader::Agent(agent)))
Ok(Arc::new(AsyncUploader::Agent(futures::lock::Mutex::new(
agent,
))))
}

fn build_sync_agent_uploader(self) -> Result<Box<dyn Uploader>, TraceError> {
fn build_sync_agent_uploader(self) -> Result<Arc<dyn Uploader>, TraceError> {
let agent = AgentSyncClientUdp::new(
self.agent_endpoint?.as_slice(),
self.max_packet_size,
self.auto_split_batch,
)
.map_err::<Error, _>(Into::into)?;
Ok(Box::new(SyncUploader::Agent(agent)))
Ok(Arc::new(SyncUploader::Agent(std::sync::Mutex::new(agent))))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ mod collector_client_tests {
.with_endpoint("localhost:6831")
.with_http_client(test_http_client::TestHttpClient);
let (_, process) = build_config_and_process(None, None);
let mut uploader = invalid_uri_builder.build_uploader::<Tokio>()?;
let uploader = invalid_uri_builder.build_uploader::<Tokio>()?;
let res = futures_executor::block_on(async {
uploader
.upload(Batch::new(process.into(), Vec::new()))
Expand Down
14 changes: 5 additions & 9 deletions opentelemetry-jaeger/src/exporter/config/collector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use opentelemetry::{sdk, sdk::trace::Config as TraceConfig, trace::TraceError};
use std::borrow::BorrowMut;
use std::convert::TryFrom;
use std::env;
use std::sync::Arc;
#[cfg(feature = "collector_client")]
use std::time::Duration;

Expand Down Expand Up @@ -411,12 +412,7 @@ impl CollectorPipeline {
self.transformation_config.service_name.take(),
);
let uploader = self.build_uploader::<R>()?;
let exporter = Exporter::new_async(
process.into(),
export_instrument_library,
runtime.clone(),
uploader,
);
let exporter = Exporter::new(process.into(), export_instrument_library, uploader);

builder = builder.with_batch_exporter(exporter, runtime);
builder = builder.with_config(config);
Expand All @@ -436,7 +432,7 @@ impl CollectorPipeline {
install_tracer_provider_and_get_tracer(tracer_provider)
}

fn build_uploader<R>(self) -> Result<Box<dyn Uploader>, crate::Error>
fn build_uploader<R>(self) -> Result<Arc<dyn Uploader>, crate::Error>
where
R: JaegerTraceRuntime,
{
Expand All @@ -461,14 +457,14 @@ impl CollectorPipeline {
)?;

let collector = AsyncHttpClient::new(endpoint, client);
Ok(Box::new(AsyncUploader::<R>::Collector(collector)))
Ok(Arc::new(AsyncUploader::<R>::Collector(collector)))
}
#[cfg(feature = "wasm_collector_client")]
ClientConfig::Wasm => {
let collector =
WasmCollector::new(endpoint, self.collector_username, self.collector_password)
.map_err::<crate::Error, _>(Into::into)?;
Ok(Box::new(AsyncUploader::<R>::WasmCollector(collector)))
Ok(Arc::new(AsyncUploader::<R>::WasmCollector(collector)))
}
}
}
Expand Down
124 changes: 22 additions & 102 deletions opentelemetry-jaeger/src/exporter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@ use std::convert::TryFrom;

use self::runtime::JaegerTraceRuntime;
use self::thrift::jaeger;
use futures::channel::{mpsc, oneshot};
use futures::future::BoxFuture;
use futures::StreamExt;
use std::convert::TryInto;
use std::sync::Arc;

#[cfg(feature = "isahc_collector_client")]
#[allow(unused_imports)] // this is actually used to configure authentication
Expand All @@ -44,108 +43,25 @@ const INSTRUMENTATION_LIBRARY_NAME: &str = "otel.library.name";
/// Instrument Library version MUST be reported in Jaeger Span tags with the following key
const INSTRUMENTATION_LIBRARY_VERSION: &str = "otel.library.version";

#[derive(Debug)]
enum ExportMessage {
Export {
batch: Vec<trace::SpanData>,
tx: oneshot::Sender<trace::ExportResult>,
},
Shutdown,
}

/// Jaeger span exporter
#[derive(Debug)]
pub struct Exporter {
tx: mpsc::Sender<ExportMessage>,

// In the switch to concurrent exports, the non-test code which used this
// value was moved into the ExporterTask implementation. However, there's
// still a test that relies on this value being here, thus the
// allow(dead_code).
#[allow(dead_code)]
/// Whether or not to export instrumentation information.
export_instrumentation_lib: bool,
uploader: Arc<dyn Uploader>,
process: jaeger::Process,
}

impl Exporter {
fn new_async<R>(
process: jaeger::Process,
export_instrumentation_lib: bool,
runtime: R,
uploader: Box<dyn Uploader>,
) -> Exporter
where
R: JaegerTraceRuntime,
{
let (tx, rx) = mpsc::channel(64);

let exporter_task = ExporterTask {
rx,
export_instrumentation_lib,
uploader,
process: process.clone(),
};

runtime.spawn(Box::pin(exporter_task.run()));

Exporter { tx, process }
}

fn new_sync(
fn new(
process: jaeger::Process,
export_instrumentation_lib: bool,
uploader: Box<dyn Uploader>,
uploader: Arc<dyn Uploader>,
) -> Exporter {
let (tx, rx) = mpsc::channel(64);

let exporter_task = ExporterTask {
rx,
Exporter {
export_instrumentation_lib,
uploader,
process: process.clone(),
};

std::thread::spawn(move || {
futures_executor::block_on(exporter_task.run());
});

Exporter { tx, process }
}
}

struct ExporterTask {
rx: mpsc::Receiver<ExportMessage>,
process: jaeger::Process,
/// Whether or not to export instrumentation information.
export_instrumentation_lib: bool,
uploader: Box<dyn Uploader>,
}

impl ExporterTask {
async fn run(mut self) {
while let Some(message) = self.rx.next().await {
match message {
ExportMessage::Export { batch, tx } => {
let mut jaeger_spans: Vec<jaeger::Span> = Vec::with_capacity(batch.len());
let process = self.process.clone();

for span in batch.into_iter() {
jaeger_spans.push(convert_otel_span_into_jaeger_span(
span,
self.export_instrumentation_lib,
));
}

let res = self
.uploader
.upload(jaeger::Batch::new(process, jaeger_spans))
.await;

// Errors here might be completely expected if the receiver didn't
// care about the result.
let _ = tx.send(res);
}
ExportMessage::Shutdown => break,
}
process,
}
}
}
Expand All @@ -160,19 +76,23 @@ pub struct Process {
}

impl trace::SpanExporter for Exporter {
/// Export spans to Jaeger
fn export(&mut self, batch: Vec<trace::SpanData>) -> BoxFuture<'static, trace::ExportResult> {
let (tx, rx) = oneshot::channel();

if let Err(err) = self.tx.try_send(ExportMessage::Export { batch, tx }) {
return Box::pin(futures::future::ready(Err(Into::into(err))));
let mut jaeger_spans: Vec<jaeger::Span> = Vec::with_capacity(batch.len());
let process = self.process.clone();

for span in batch.into_iter() {
jaeger_spans.push(convert_otel_span_into_jaeger_span(
span,
self.export_instrumentation_lib,
));
}

Box::pin(async move { rx.await? })
}

fn shutdown(&mut self) {
let _ = self.tx.try_send(ExportMessage::Shutdown);
let uploader = self.uploader.clone();
Box::pin(async move {
uploader
.upload(jaeger::Batch::new(process, jaeger_spans))
.await
})
}
}

Expand Down
16 changes: 10 additions & 6 deletions opentelemetry-jaeger/src/exporter/uploader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,24 @@ use crate::exporter::thrift::jaeger::Batch;
use crate::exporter::JaegerTraceRuntime;

#[async_trait]
pub(crate) trait Uploader: std::fmt::Debug + Send {
async fn upload(&mut self, batch: jaeger::Batch) -> trace::ExportResult;
pub(crate) trait Uploader: Debug + Send + Sync {
async fn upload(&self, batch: jaeger::Batch) -> trace::ExportResult;
}

#[derive(Debug)]
pub(crate) enum SyncUploader {
Agent(agent::AgentSyncClientUdp),
Agent(std::sync::Mutex<agent::AgentSyncClientUdp>),
}

#[async_trait]
impl Uploader for SyncUploader {
async fn upload(&mut self, batch: jaeger::Batch) -> trace::ExportResult {
async fn upload(&self, batch: jaeger::Batch) -> trace::ExportResult {
match self {
SyncUploader::Agent(client) => {
// TODO Implement retry behaviour
client
.lock()
.expect("Failed to lock agent client")
.emit_batch(batch)
.map_err::<crate::Error, _>(Into::into)?;
}
Expand All @@ -39,7 +41,7 @@ impl Uploader for SyncUploader {
#[derive(Debug)]
pub(crate) enum AsyncUploader<R: JaegerTraceRuntime> {
/// Agent async client
Agent(agent::AgentAsyncClientUdp<R>),
Agent(futures::lock::Mutex<agent::AgentAsyncClientUdp<R>>),
/// Collector sync client
#[cfg(feature = "collector_client")]
Collector(collector::AsyncHttpClient),
Expand All @@ -49,11 +51,13 @@ pub(crate) enum AsyncUploader<R: JaegerTraceRuntime> {

#[async_trait]
impl<R: JaegerTraceRuntime> Uploader for AsyncUploader<R> {
async fn upload(&mut self, batch: Batch) -> ExportResult {
async fn upload(&self, batch: Batch) -> ExportResult {
match self {
Self::Agent(client) => {
// TODO Implement retry behaviour
client
.lock()
.await
.emit_batch(batch)
.await
.map_err::<crate::Error, _>(Into::into)?;
Expand Down