Skip to content

Commit

Permalink
Add support for concurrent exports
Browse files Browse the repository at this point in the history
Applications generating significant span volume can end up dropping data
due to the synchronous export step. According to the opentelemetry spec,

    This function will never be called concurrently for the same exporter
    instance. It can be called again only after the current call returns.

However, it does not place a restriction on concurrent I/O or anything
of that nature. There is an [ongoing discussion] about tweaking the
language to make this more clear.

With that in mind, this commit makes the exporters return a future that
can be spawned concurrently. Unfortunately, this means that the
`export()` method can no longer be async while taking &mut self. The
latter is desirable to enforce the no concurrent calls line of the spec,
so the choice is made here to return a future instead with the lifetime
decoupled from self. This resulted in a bit of additional verbosity, but
for the most part the async code can still be shoved into an async fn
for the ergonomics.

The main exception to this is the `jaeger` exporter which internally
requires a bunch of mutable references. I plan to discuss with the
opentelemetry team the overall goal of this PR and get buy-in before
making more invasive changes to support this in the jaeger exporter.

[ongoing discussion]: open-telemetry/opentelemetry-specification#2434
  • Loading branch information
jwilm committed Apr 20, 2022
1 parent fdf6401 commit d18c5c5
Show file tree
Hide file tree
Showing 17 changed files with 259 additions and 150 deletions.
18 changes: 9 additions & 9 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ members = [
"opentelemetry-datadog",
"opentelemetry-dynatrace",
"opentelemetry-http",
"opentelemetry-jaeger",
# "opentelemetry-jaeger",
"opentelemetry-otlp",
"opentelemetry-prometheus",
"opentelemetry-proto",
Expand All @@ -16,24 +16,24 @@ members = [
"opentelemetry-stackdriver",
"opentelemetry-zipkin",
"opentelemetry-zpages",
"examples/actix-http",
"examples/actix-http-tracing",
"examples/actix-udp",
"examples/async",
# "examples/actix-http",
# "examples/actix-http-tracing",
# "examples/actix-udp",
# "examples/async",
"examples/aws-xray",
"examples/basic",
# "examples/basic",
"examples/basic-otlp",
"examples/basic-otlp-with-selector",
"examples/basic-otlp-http",
"examples/datadog",
"examples/dynatrace",
"examples/external-otlp-tonic-tokio",
"examples/grpc",
# "examples/grpc",
"examples/http",
"examples/hyper-prometheus",
"examples/tracing-grpc",
# "examples/tracing-grpc",
"examples/zipkin",
"examples/multiple-span-processors",
# "examples/multiple-span-processors",
"examples/zpages"
]
exclude = ["examples/external-otlp-grpcio-async-std"]
1 change: 1 addition & 0 deletions opentelemetry-datadog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ thiserror = "1.0"
itertools = "0.10"
http = "0.2"
lazy_static = "1.4"
futures = "0.3"

[dev-dependencies]
base64 = "0.13"
Expand Down
74 changes: 45 additions & 29 deletions opentelemetry-datadog/src/exporter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub use model::FieldMappingFn;
use std::fmt::{Debug, Formatter};

use crate::exporter::model::FieldMapping;
use async_trait::async_trait;
use futures::future::BoxFuture;
use http::{Method, Request, Uri};
use itertools::Itertools;
use opentelemetry::sdk::export::trace;
Expand All @@ -32,7 +32,7 @@ const DATADOG_TRACE_COUNT_HEADER: &str = "X-Datadog-Trace-Count";

/// Datadog span exporter
pub struct DatadogExporter {
client: Box<dyn HttpClient>,
client: Arc<dyn HttpClient>,
request_url: Uri,
model_config: ModelConfig,
version: ApiVersion,
Expand All @@ -47,7 +47,7 @@ impl DatadogExporter {
model_config: ModelConfig,
request_url: Uri,
version: ApiVersion,
client: Box<dyn HttpClient>,
client: Arc<dyn HttpClient>,
resource_mapping: Option<FieldMapping>,
name_mapping: Option<FieldMapping>,
service_name_mapping: Option<FieldMapping>,
Expand All @@ -62,6 +62,27 @@ impl DatadogExporter {
service_name_mapping,
}
}

fn build_request(&self, batch: Vec<SpanData>) -> Result<http::Request<Vec<u8>>, TraceError> {
let traces: Vec<Vec<SpanData>> = group_into_traces(batch);
let trace_count = traces.len();
let data = self.version.encode(
&self.model_config,
traces,
self.service_name_mapping.clone(),
self.name_mapping.clone(),
self.resource_mapping.clone(),
)?;
let req = Request::builder()
.method(Method::POST)
.uri(self.request_url.clone())
.header(http::header::CONTENT_TYPE, self.version.content_type())
.header(DATADOG_TRACE_COUNT_HEADER, trace_count)
.body(data)
.map_err::<Error, _>(Into::into)?;

Ok(req)
}
}

impl Debug for DatadogExporter {
Expand Down Expand Up @@ -92,8 +113,7 @@ pub struct DatadogPipelineBuilder {
agent_endpoint: String,
trace_config: Option<sdk::trace::Config>,
version: ApiVersion,
client: Option<Box<dyn HttpClient>>,

client: Option<Arc<dyn HttpClient>>,
resource_mapping: Option<FieldMapping>,
name_mapping: Option<FieldMapping>,
service_name_mapping: Option<FieldMapping>,
Expand All @@ -120,15 +140,15 @@ impl Default for DatadogPipelineBuilder {
not(feature = "reqwest-blocking-client"),
feature = "surf-client"
))]
client: Some(Box::new(surf::Client::new())),
client: Some(Arc::new(surf::Client::new())),
#[cfg(all(
not(feature = "surf-client"),
not(feature = "reqwest-blocking-client"),
feature = "reqwest-client"
))]
client: Some(Box::new(reqwest::Client::new())),
client: Some(Arc::new(reqwest::Client::new())),
#[cfg(feature = "reqwest-blocking-client")]
client: Some(Box::new(reqwest::blocking::Client::new())),
client: Some(Arc::new(reqwest::blocking::Client::new())),
}
}
}
Expand Down Expand Up @@ -275,7 +295,7 @@ impl DatadogPipelineBuilder {
/// Choose the http client used by uploader
pub fn with_http_client<T: HttpClient + 'static>(
mut self,
client: Box<dyn HttpClient>,
client: Arc<dyn HttpClient>,
) -> Self {
self.client = Some(client);
self
Expand Down Expand Up @@ -333,28 +353,24 @@ fn group_into_traces(spans: Vec<SpanData>) -> Vec<Vec<SpanData>> {
.collect()
}

#[async_trait]
async fn send_request(
client: Arc<dyn HttpClient>,
request: http::Request<Vec<u8>>,
) -> trace::ExportResult {
let _ = client.send(request).await?.error_for_status()?;
Ok(())
}

impl trace::SpanExporter for DatadogExporter {
/// Export spans to datadog-agent
async fn export(&mut self, batch: Vec<SpanData>) -> trace::ExportResult {
let traces: Vec<Vec<SpanData>> = group_into_traces(batch);
let trace_count = traces.len();
let data = self.version.encode(
&self.model_config,
traces,
self.service_name_mapping.clone(),
self.name_mapping.clone(),
self.resource_mapping.clone(),
)?;
let req = Request::builder()
.method(Method::POST)
.uri(self.request_url.clone())
.header(http::header::CONTENT_TYPE, self.version.content_type())
.header(DATADOG_TRACE_COUNT_HEADER, trace_count)
.body(data)
.map_err::<Error, _>(Into::into)?;
let _ = self.client.send(req).await?.error_for_status()?;
Ok(())
fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, trace::ExportResult> {
let request = match self.build_request(batch) {
Ok(req) => req,
Err(err) => return Box::pin(std::future::ready(Err(err))),
};

let client = self.client.clone();
Box::pin(send_request(client, request))
}
}

Expand Down
11 changes: 6 additions & 5 deletions opentelemetry-otlp/src/exporter/http.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{ExportConfig, Protocol};
use opentelemetry_http::HttpClient;
use std::collections::HashMap;
use std::sync::Arc;

/// Configuration of the http transport
#[cfg(feature = "http-proto")]
Expand All @@ -15,7 +16,7 @@ use std::collections::HashMap;
)]
pub struct HttpConfig {
/// Select the HTTP client
pub client: Option<Box<dyn HttpClient>>,
pub client: Option<Arc<dyn HttpClient>>,

/// Additional headers to send to the collector.
pub headers: Option<HashMap<String, String>>,
Expand All @@ -30,19 +31,19 @@ impl Default for HttpConfig {
fn default() -> Self {
HttpConfig {
#[cfg(feature = "reqwest-blocking-client")]
client: Some(Box::new(reqwest::blocking::Client::new())),
client: Some(Arc::new(reqwest::blocking::Client::new())),
#[cfg(all(
not(feature = "reqwest-blocking-client"),
not(feature = "surf-client"),
feature = "reqwest-client"
))]
client: Some(Box::new(reqwest::Client::new())),
client: Some(Arc::new(reqwest::Client::new())),
#[cfg(all(
not(feature = "reqwest-client"),
not(feature = "reqwest-blocking-client"),
feature = "surf-client"
))]
client: Some(Box::new(surf::Client::new())),
client: Some(Arc::new(surf::Client::new())),
#[cfg(all(
not(feature = "reqwest-client"),
not(feature = "surf-client"),
Expand Down Expand Up @@ -78,7 +79,7 @@ impl Default for HttpExporterBuilder {
impl HttpExporterBuilder {
/// Assign client implementation
pub fn with_http_client<T: HttpClient + 'static>(mut self, client: T) -> Self {
self.http_config.client = Some(Box::new(client));
self.http_config.client = Some(Arc::new(client));
self
}

Expand Down
Loading

0 comments on commit d18c5c5

Please sign in to comment.