diff --git a/opentelemetry-otlp/CHANGELOG.md b/opentelemetry-otlp/CHANGELOG.md index e1e1be6955..bba1ab6190 100644 --- a/opentelemetry-otlp/CHANGELOG.md +++ b/opentelemetry-otlp/CHANGELOG.md @@ -4,6 +4,7 @@ ### Added - Add OTLP HTTP Metrics Exporter [#1020](https://github.com/open-telemetry/opentelemetry-rust/pull/1020). +- Add tonic compression support [#1165](https://github.com/open-telemetry/opentelemetry-rust/pull/1165). ## v0.12.0 diff --git a/opentelemetry-otlp/Cargo.toml b/opentelemetry-otlp/Cargo.toml index 0d580ae2bf..6bd18d9eef 100644 --- a/opentelemetry-otlp/Cargo.toml +++ b/opentelemetry-otlp/Cargo.toml @@ -70,6 +70,7 @@ default = ["grpc-tonic", "trace"] # grpc using tonic grpc-tonic = ["tonic", "prost", "http", "tokio", "opentelemetry-proto/gen-tonic"] +gzip-tonic = ["tonic/gzip"] tls = ["tonic/tls"] tls-roots = ["tls", "tonic/tls-roots"] diff --git a/opentelemetry-otlp/src/exporter/grpcio.rs b/opentelemetry-otlp/src/exporter/grpcio.rs index fd3e6a24f4..70a389b71d 100644 --- a/opentelemetry-otlp/src/exporter/grpcio.rs +++ b/opentelemetry-otlp/src/exporter/grpcio.rs @@ -1,3 +1,4 @@ +use crate::exporter::Compression; use crate::ExportConfig; #[cfg(feature = "serialize")] use serde::{Deserialize, Serialize}; @@ -47,14 +48,6 @@ pub struct Credentials { pub key: String, } -/// The compression algorithm to use when sending data. -#[cfg_attr(feature = "serialize", derive(Deserialize, Serialize))] -#[derive(Clone, Copy, Debug)] -pub enum Compression { - /// Compresses data using gzip. - Gzip, -} - impl From for grpcio::CompressionAlgorithms { fn from(compression: Compression) -> Self { match compression { diff --git a/opentelemetry-otlp/src/exporter/mod.rs b/opentelemetry-otlp/src/exporter/mod.rs index 1d03ebc6d0..88e74088b7 100644 --- a/opentelemetry-otlp/src/exporter/mod.rs +++ b/opentelemetry-otlp/src/exporter/mod.rs @@ -8,8 +8,11 @@ use crate::exporter::grpcio::GrpcioExporterBuilder; use crate::exporter::http::HttpExporterBuilder; #[cfg(feature = "grpc-tonic")] use crate::exporter::tonic::TonicExporterBuilder; -use crate::Protocol; +use crate::{Error, Protocol}; +#[cfg(feature = "serialize")] +use serde::{Deserialize, Serialize}; use std::collections::HashMap; +use std::fmt::{Display, Formatter}; use std::str::FromStr; use std::time::Duration; @@ -21,6 +24,8 @@ pub const OTEL_EXPORTER_OTLP_ENDPOINT: &str = "OTEL_EXPORTER_OTLP_ENDPOINT"; pub const OTEL_EXPORTER_OTLP_ENDPOINT_DEFAULT: &str = OTEL_EXPORTER_OTLP_HTTP_ENDPOINT_DEFAULT; /// Protocol the exporter will use. Either `http/protobuf` or `grpc`. pub const OTEL_EXPORTER_OTLP_PROTOCOL: &str = "OTEL_EXPORTER_OTLP_PROTOCOL"; +/// Compression algorithm to use, defaults to none. +pub const OTEL_EXPORTER_OTLP_COMPRESSION: &str = "OTEL_EXPORTER_OTLP_COMPRESSION"; #[cfg(feature = "http-proto")] /// Default protocol, using http-proto. @@ -79,6 +84,33 @@ impl Default for ExportConfig { } } +/// The compression algorithm to use when sending data. +#[cfg_attr(feature = "serialize", derive(Deserialize, Serialize))] +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum Compression { + /// Compresses data using gzip. + Gzip, +} + +impl Display for Compression { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + Compression::Gzip => write!(f, "gzip"), + } + } +} + +impl FromStr for Compression { + type Err = Error; + + fn from_str(s: &str) -> Result { + match s { + "gzip" => Ok(Compression::Gzip), + _ => Err(Error::UnsupportedCompressionAlgorithm(s.to_string())), + } + } +} + /// default protocol based on enabled features fn default_protocol() -> Protocol { match OTEL_EXPORTER_OTLP_PROTOCOL_DEFAULT { @@ -217,13 +249,15 @@ impl WithExportConfig for B { mod tests { // If an env test fails then the mutex will be poisoned and the following error will be displayed. const LOCK_POISONED_MESSAGE: &str = "one of the other pipeline builder from env tests failed"; + use crate::exporter::{ default_endpoint, default_protocol, WithExportConfig, OTEL_EXPORTER_OTLP_ENDPOINT, OTEL_EXPORTER_OTLP_GRPC_ENDPOINT_DEFAULT, OTEL_EXPORTER_OTLP_HTTP_ENDPOINT_DEFAULT, OTEL_EXPORTER_OTLP_PROTOCOL_GRPC, OTEL_EXPORTER_OTLP_PROTOCOL_HTTP_PROTOBUF, OTEL_EXPORTER_OTLP_TIMEOUT, OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT, }; - use crate::{new_exporter, Protocol, OTEL_EXPORTER_OTLP_PROTOCOL}; + use crate::{new_exporter, Compression, Protocol, OTEL_EXPORTER_OTLP_PROTOCOL}; + use std::str::FromStr; use std::sync::Mutex; // Make sure env tests are not running concurrently @@ -345,4 +379,15 @@ mod tests { std::env::remove_var(OTEL_EXPORTER_OTLP_TIMEOUT); assert!(std::env::var(OTEL_EXPORTER_OTLP_TIMEOUT).is_err()); } + + #[test] + fn test_compression_parse() { + assert_eq!(Compression::from_str("gzip").unwrap(), Compression::Gzip); + Compression::from_str("bad_compression").expect_err("bad compression"); + } + + #[test] + fn test_compression_to_str() { + assert_eq!(Compression::Gzip.to_string(), "gzip"); + } } diff --git a/opentelemetry-otlp/src/exporter/tonic.rs b/opentelemetry-otlp/src/exporter/tonic.rs index 1cf50a4a5c..ab1d3bfb2f 100644 --- a/opentelemetry-otlp/src/exporter/tonic.rs +++ b/opentelemetry-otlp/src/exporter/tonic.rs @@ -1,5 +1,7 @@ -use crate::ExportConfig; +use crate::exporter::Compression; +use crate::{ExportConfig, OTEL_EXPORTER_OTLP_COMPRESSION}; use std::fmt::{Debug, Formatter}; +use tonic::codec::CompressionEncoding; use tonic::metadata::MetadataMap; #[cfg(feature = "tls")] use tonic::transport::ClientTlsConfig; @@ -18,6 +20,39 @@ pub struct TonicConfig { /// TLS settings for the collector endpoint. #[cfg(feature = "tls")] pub tls_config: Option, + + /// The compression algorithm to use when communicating with the collector. + pub compression: Option, +} + +impl TryFrom for tonic::codec::CompressionEncoding { + type Error = crate::Error; + + fn try_from(value: Compression) -> Result { + match value { + #[cfg(feature = "gzip-tonic")] + Compression::Gzip => Ok(tonic::codec::CompressionEncoding::Gzip), + #[cfg(not(feature = "gzip-tonic"))] + Compression::Gzip => Err(crate::Error::UnsupportedCompressionAlgorithm( + value.to_string(), + )), + } + } +} + +pub(crate) fn resolve_compression( + tonic_config: &TonicConfig, + env_override: &'static str, +) -> Result, crate::Error> { + if let Some(compression) = tonic_config.compression { + Ok(Some(compression.try_into()?)) + } else if let Ok(compression) = std::env::var(env_override) { + Ok(Some(compression.parse::()?.try_into()?)) + } else if let Ok(compression) = std::env::var(OTEL_EXPORTER_OTLP_COMPRESSION) { + Ok(Some(compression.parse::()?.try_into()?)) + } else { + Ok(None) + } } /// Build a trace exporter that uses [tonic] as grpc layer and opentelemetry protocol. @@ -60,6 +95,7 @@ impl Default for TonicExporterBuilder { )), #[cfg(feature = "tls")] tls_config: None, + compression: None, }; TonicExporterBuilder { @@ -94,6 +130,12 @@ impl TonicExporterBuilder { self } + /// Set the compression algorithm to use when communicating with the collector. + pub fn with_compression(mut self, compression: Compression) -> Self { + self.tonic_config.compression = Some(compression); + self + } + /// Use `channel` as tonic's transport channel. /// this will override tls config and should only be used /// when working with non-HTTP transports. @@ -119,6 +161,8 @@ impl TonicExporterBuilder { #[cfg(test)] mod tests { + #[cfg(feature = "gzip-tonic")] + use crate::exporter::Compression; use crate::TonicExporterBuilder; use tonic::metadata::{MetadataMap, MetadataValue}; @@ -151,4 +195,14 @@ mod tests { .len() ); } + + #[test] + #[cfg(feature = "gzip-tonic")] + fn test_with_compression() { + // metadata should merge with the current one with priority instead of just replacing it + let mut metadata = MetadataMap::new(); + metadata.insert("foo", "bar".parse().unwrap()); + let builder = TonicExporterBuilder::default().with_compression(Compression::Gzip); + assert_eq!(builder.tonic_config.compression.unwrap(), Compression::Gzip); + } } diff --git a/opentelemetry-otlp/src/lib.rs b/opentelemetry-otlp/src/lib.rs index a70d565239..9429b99ab5 100644 --- a/opentelemetry-otlp/src/lib.rs +++ b/opentelemetry-otlp/src/lib.rs @@ -189,24 +189,28 @@ mod metric; mod span; mod transform; +pub use crate::exporter::Compression; pub use crate::exporter::ExportConfig; #[cfg(feature = "trace")] pub use crate::span::{ - OtlpTracePipeline, SpanExporter, SpanExporterBuilder, OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, - OTEL_EXPORTER_OTLP_TRACES_TIMEOUT, + OtlpTracePipeline, SpanExporter, SpanExporterBuilder, OTEL_EXPORTER_OTLP_TRACES_COMPRESSION, + OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, OTEL_EXPORTER_OTLP_TRACES_TIMEOUT, }; #[cfg(feature = "metrics")] pub use crate::metric::{ MetricsExporter, MetricsExporterBuilder, OtlpMetricPipeline, - OTEL_EXPORTER_OTLP_METRICS_ENDPOINT, OTEL_EXPORTER_OTLP_METRICS_TIMEOUT, + OTEL_EXPORTER_OTLP_METRICS_COMPRESSION, OTEL_EXPORTER_OTLP_METRICS_ENDPOINT, + OTEL_EXPORTER_OTLP_METRICS_TIMEOUT, }; #[cfg(feature = "logs")] -pub use crate::logs::*; +pub use crate::logs::{ + LogExporter, LogExporterBuilder, OtlpLogPipeline, OTEL_EXPORTER_OTLP_LOGS_COMPRESSION, +}; pub use crate::exporter::{ - HasExportConfig, WithExportConfig, OTEL_EXPORTER_OTLP_ENDPOINT, + HasExportConfig, WithExportConfig, OTEL_EXPORTER_OTLP_COMPRESSION, OTEL_EXPORTER_OTLP_ENDPOINT, OTEL_EXPORTER_OTLP_ENDPOINT_DEFAULT, OTEL_EXPORTER_OTLP_PROTOCOL, OTEL_EXPORTER_OTLP_PROTOCOL_DEFAULT, OTEL_EXPORTER_OTLP_TIMEOUT, OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT, @@ -217,7 +221,7 @@ use opentelemetry_sdk::export::ExportError; use std::time::{Duration, SystemTime, UNIX_EPOCH}; #[cfg(feature = "grpc-sys")] -pub use crate::exporter::grpcio::{Compression, Credentials, GrpcioConfig, GrpcioExporterBuilder}; +pub use crate::exporter::grpcio::{Credentials, GrpcioConfig, GrpcioExporterBuilder}; #[cfg(feature = "http-proto")] pub use crate::exporter::http::HttpExporterBuilder; #[cfg(feature = "grpc-tonic")] @@ -347,6 +351,10 @@ pub enum Error { /// The pipeline will need a exporter to complete setup. Throw this error if none is provided. #[error("no exporter builder is provided, please provide one using with_exporter() method")] NoExporterBuilder, + + /// Unsupported compression algorithm. + #[error("unsupported compression algorithm '{0}'")] + UnsupportedCompressionAlgorithm(String), } #[cfg(feature = "grpc-tonic")] diff --git a/opentelemetry-otlp/src/logs.rs b/opentelemetry-otlp/src/logs.rs index d73223a0b5..52430acc5f 100644 --- a/opentelemetry-otlp/src/logs.rs +++ b/opentelemetry-otlp/src/logs.rs @@ -4,7 +4,7 @@ #[cfg(feature = "grpc-tonic")] use { - crate::exporter::tonic::{TonicConfig, TonicExporterBuilder}, + crate::exporter::tonic::{resolve_compression, TonicConfig, TonicExporterBuilder}, opentelemetry_proto::tonic::collector::logs::v1::{ logs_service_client::LogsServiceClient as TonicLogsServiceClient, ExportLogsServiceRequest as TonicRequest, @@ -57,6 +57,9 @@ use std::{ use opentelemetry_api::logs::{LogError, LoggerProvider}; use opentelemetry_sdk::{self, export::logs::LogData, logs::BatchMessage, runtime::RuntimeChannel}; +/// Compression algorithm to use, defaults to none. +pub const OTEL_EXPORTER_OTLP_LOGS_COMPRESSION: &str = "OTEL_EXPORTER_OTLP_LOGS_COMPRESSION"; + impl OtlpPipeline { /// Create a OTLP logging pipeline. pub fn logging(self) -> OtlpLogPipeline { @@ -232,10 +235,16 @@ impl LogExporter { tonic_config: TonicConfig, channel: tonic::transport::Channel, ) -> Result { + let mut log_exporter = TonicLogsServiceClient::new(channel); + if let Some(compression) = + resolve_compression(&tonic_config, OTEL_EXPORTER_OTLP_LOGS_COMPRESSION)? + { + log_exporter = log_exporter.send_compressed(compression); + } Ok(LogExporter::Tonic { timeout: config.timeout, metadata: tonic_config.metadata, - log_exporter: TonicLogsServiceClient::new(channel), + log_exporter, }) } diff --git a/opentelemetry-otlp/src/metric.rs b/opentelemetry-otlp/src/metric.rs index 76b457d769..e6c15f75b5 100644 --- a/opentelemetry-otlp/src/metric.rs +++ b/opentelemetry-otlp/src/metric.rs @@ -4,7 +4,6 @@ //! //! Currently, OTEL metrics exporter only support GRPC connection via tonic on tokio runtime. -use crate::exporter::tonic::TonicExporterBuilder; use crate::transform::sink; use crate::{Error, OtlpPipeline}; use async_trait::async_trait; @@ -13,10 +12,7 @@ use opentelemetry_api::{ global, metrics::{MetricsError, Result}, }; -#[cfg(feature = "grpc-tonic")] -use opentelemetry_proto::tonic::collector::metrics::v1::{ - metrics_service_client::MetricsServiceClient, ExportMetricsServiceRequest, -}; + use opentelemetry_sdk::{ metrics::{ data::{ResourceMetrics, Temporality}, @@ -31,16 +27,23 @@ use opentelemetry_sdk::{ Resource, }; use std::fmt::{Debug, Formatter}; -#[cfg(feature = "grpc-tonic")] -use std::str::FromStr; use std::sync::Mutex; use std::time; use std::time::Duration; +use tonic::codegen::{Body, StdError}; use tonic::metadata::KeyAndValueRef; #[cfg(feature = "grpc-tonic")] -use tonic::transport::Channel; -#[cfg(feature = "grpc-tonic")] -use tonic::Request; +use { + crate::exporter::tonic::{resolve_compression, TonicExporterBuilder}, + opentelemetry_proto::tonic::collector::metrics::v1::{ + metrics_service_client::MetricsServiceClient, ExportMetricsServiceRequest, + }, + std::str::FromStr, + tonic::codegen::Bytes, + tonic::transport::Channel, + tonic::Request, +}; + #[cfg(feature = "http-proto")] use { crate::exporter::http::HttpExporterBuilder, @@ -61,7 +64,8 @@ use { pub const OTEL_EXPORTER_OTLP_METRICS_ENDPOINT: &str = "OTEL_EXPORTER_OTLP_METRICS_ENDPOINT"; /// Max waiting time for the backend to process each metrics batch, defaults to 10s. pub const OTEL_EXPORTER_OTLP_METRICS_TIMEOUT: &str = "OTEL_EXPORTER_OTLP_METRICS_TIMEOUT"; - +/// Compression algorithm to use, defaults to none. +pub const OTEL_EXPORTER_OTLP_METRICS_COMPRESSION: &str = "OTEL_EXPORTER_OTLP_METRICS_COMPRESSION"; impl OtlpPipeline { /// Create a OTLP metrics pipeline. pub fn metrics(self, rt: RT) -> OtlpMetricPipeline @@ -356,6 +360,8 @@ impl MetricsExporter { }, Err(_) => config.timeout, }; + let compression = + resolve_compression(&tonic_config, OTEL_EXPORTER_OTLP_METRICS_COMPRESSION)?; let endpoint = Channel::from_shared(endpoint).map_err::(Into::into)?; @@ -376,11 +382,18 @@ impl MetricsExporter { tokio::spawn(async move { match export_builder.interceptor { Some(interceptor) => { - let client = MetricsServiceClient::with_interceptor(channel, interceptor); + let mut client = MetricsServiceClient::with_interceptor(channel, interceptor); + if let Some(compression) = compression { + client = client.send_compressed(compression); + } + export_sink(client, receiver).await } None => { - let client = MetricsServiceClient::new(channel); + let mut client = MetricsServiceClient::new(channel); + if let Some(compression) = compression { + client = client.send_compressed(compression) + } export_sink(client, receiver).await } } @@ -465,7 +478,7 @@ async fn http_send_request( Ok(()) } -use tonic::codegen::{Body, Bytes, StdError}; +#[cfg(feature = "grpc-tonic")] async fn export_sink( mut client: MetricsServiceClient, mut receiver: tokio::sync::mpsc::Receiver, diff --git a/opentelemetry-otlp/src/span.rs b/opentelemetry-otlp/src/span.rs index 8b6870f3b0..d217ef3025 100644 --- a/opentelemetry-otlp/src/span.rs +++ b/opentelemetry-otlp/src/span.rs @@ -9,7 +9,7 @@ use std::time::Duration; use std::str::FromStr; #[cfg(feature = "grpc-tonic")] use { - crate::exporter::tonic::{TonicConfig, TonicExporterBuilder}, + crate::exporter::tonic::{resolve_compression, TonicConfig, TonicExporterBuilder}, opentelemetry_proto::tonic::collector::trace::v1::{ trace_service_client::TraceServiceClient as TonicTraceServiceClient, ExportTraceServiceRequest as TonicRequest, @@ -74,6 +74,8 @@ use sdk::runtime::RuntimeChannel; pub const OTEL_EXPORTER_OTLP_TRACES_ENDPOINT: &str = "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"; /// Max waiting time for the backend to process each spans batch, defaults to 10s. pub const OTEL_EXPORTER_OTLP_TRACES_TIMEOUT: &str = "OTEL_EXPORTER_OTLP_TRACES_TIMEOUT"; +/// Compression algorithm to use, defaults to none. +pub const OTEL_EXPORTER_OTLP_TRACES_COMPRESSION: &str = "OTEL_EXPORTER_OTLP_TRACES_COMPRESSION"; impl OtlpPipeline { /// Create a OTLP tracing pipeline. @@ -386,10 +388,17 @@ impl SpanExporter { tonic_config: TonicConfig, channel: tonic::transport::Channel, ) -> Result { + let mut trace_exporter = TonicTraceServiceClient::new(channel); + if let Some(compression) = + resolve_compression(&tonic_config, OTEL_EXPORTER_OTLP_TRACES_COMPRESSION)? + { + trace_exporter = trace_exporter.send_compressed(compression) + } + Ok(SpanExporter::Tonic { timeout: config.timeout, metadata: tonic_config.metadata, - trace_exporter: TonicTraceServiceClient::new(channel), + trace_exporter, }) } diff --git a/opentelemetry-otlp/tests/smoke.rs b/opentelemetry-otlp/tests/smoke.rs index d7e6c5a32d..278d61df0b 100644 --- a/opentelemetry-otlp/tests/smoke.rs +++ b/opentelemetry-otlp/tests/smoke.rs @@ -9,6 +9,8 @@ use opentelemetry_proto::tonic::collector::trace::v1::{ use std::{net::SocketAddr, sync::Mutex}; use tokio::sync::mpsc; use tokio_stream::wrappers::TcpListenerStream; +#[cfg(feature = "gzip-tonic")] +use tonic::codec::CompressionEncoding; struct MockServer { tx: Mutex>, @@ -57,6 +59,10 @@ async fn setup() -> (SocketAddr, mpsc::Receiver) { }); let (req_tx, req_rx) = mpsc::channel(10); + #[cfg(feature = "gzip-tonic")] + let service = TraceServiceServer::new(MockServer::new(req_tx)) + .accept_compressed(CompressionEncoding::Gzip); + #[cfg(not(feature = "gzip-tonic"))] let service = TraceServiceServer::new(MockServer::new(req_tx)); tokio::task::spawn(async move { tonic::transport::Server::builder() @@ -80,6 +86,13 @@ async fn smoke_tracer() { let tracer = opentelemetry_otlp::new_pipeline() .tracing() .with_exporter( + #[cfg(feature = "gzip-tonic")] + opentelemetry_otlp::new_exporter() + .tonic() + .with_compression(opentelemetry_otlp::Compression::Gzip) + .with_endpoint(format!("http://{}", addr)) + .with_metadata(metadata), + #[cfg(not(feature = "gzip-tonic"))] opentelemetry_otlp::new_exporter() .tonic() .with_endpoint(format!("http://{}", addr))