Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add compression support for otlp tonic #1165

Merged
merged 7 commits into from
Jul 29, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions opentelemetry-otlp/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

### Added
- Add OTLP HTTP Metrics Exporter [#1020](https://github.com/open-telemetry/opentelemetry-rust/pull/1020).
- Add tonic compression support [#1165](https://github.com/open-telemetry/opentelemetry-rust/pull/1165).

## v0.12.0

Expand Down
1 change: 1 addition & 0 deletions opentelemetry-otlp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ default = ["grpc-tonic", "trace"]

# grpc using tonic
grpc-tonic = ["tonic", "prost", "http", "tokio", "opentelemetry-proto/gen-tonic"]
gzip-tonic = ["tonic/gzip"]
tls = ["tonic/tls"]
tls-roots = ["tls", "tonic/tls-roots"]

Expand Down
9 changes: 1 addition & 8 deletions opentelemetry-otlp/src/exporter/grpcio.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::exporter::Compression;
use crate::ExportConfig;
#[cfg(feature = "serialize")]
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -47,14 +48,6 @@ pub struct Credentials {
pub key: String,
}

/// The compression algorithm to use when sending data.
#[cfg_attr(feature = "serialize", derive(Deserialize, Serialize))]
#[derive(Clone, Copy, Debug)]
pub enum Compression {
/// Compresses data using gzip.
Gzip,
}

impl From<Compression> for grpcio::CompressionAlgorithms {
fn from(compression: Compression) -> Self {
match compression {
Expand Down
49 changes: 47 additions & 2 deletions opentelemetry-otlp/src/exporter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ use crate::exporter::grpcio::GrpcioExporterBuilder;
use crate::exporter::http::HttpExporterBuilder;
#[cfg(feature = "grpc-tonic")]
use crate::exporter::tonic::TonicExporterBuilder;
use crate::Protocol;
use crate::{Error, Protocol};
#[cfg(feature = "serialize")]
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt::{Display, Formatter};
use std::str::FromStr;
use std::time::Duration;

Expand All @@ -21,6 +24,8 @@ pub const OTEL_EXPORTER_OTLP_ENDPOINT: &str = "OTEL_EXPORTER_OTLP_ENDPOINT";
pub const OTEL_EXPORTER_OTLP_ENDPOINT_DEFAULT: &str = OTEL_EXPORTER_OTLP_HTTP_ENDPOINT_DEFAULT;
/// Protocol the exporter will use. Either `http/protobuf` or `grpc`.
pub const OTEL_EXPORTER_OTLP_PROTOCOL: &str = "OTEL_EXPORTER_OTLP_PROTOCOL";
/// Compression algorithm to use, defaults to none.
pub const OTEL_EXPORTER_OTLP_COMPRESSION: &str = "OTEL_EXPORTER_OTLP_COMPRESSION";

#[cfg(feature = "http-proto")]
/// Default protocol, using http-proto.
Expand Down Expand Up @@ -79,6 +84,33 @@ impl Default for ExportConfig {
}
}

/// The compression algorithm to use when sending data.
#[cfg_attr(feature = "serialize", derive(Deserialize, Serialize))]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Compression {
/// Compresses data using gzip.
Gzip,
}

impl Display for Compression {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Compression::Gzip => write!(f, "gzip"),
}
}
}

impl FromStr for Compression {
type Err = Error;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"gzip" => Ok(Compression::Gzip),
_ => Err(Error::UnsupportedCompressionAlgorithm(s.to_string())),
}
}
}

/// default protocol based on enabled features
fn default_protocol() -> Protocol {
match OTEL_EXPORTER_OTLP_PROTOCOL_DEFAULT {
Expand Down Expand Up @@ -217,13 +249,15 @@ impl<B: HasExportConfig> WithExportConfig for B {
mod tests {
// If an env test fails then the mutex will be poisoned and the following error will be displayed.
const LOCK_POISONED_MESSAGE: &str = "one of the other pipeline builder from env tests failed";

use crate::exporter::{
default_endpoint, default_protocol, WithExportConfig, OTEL_EXPORTER_OTLP_ENDPOINT,
OTEL_EXPORTER_OTLP_GRPC_ENDPOINT_DEFAULT, OTEL_EXPORTER_OTLP_HTTP_ENDPOINT_DEFAULT,
OTEL_EXPORTER_OTLP_PROTOCOL_GRPC, OTEL_EXPORTER_OTLP_PROTOCOL_HTTP_PROTOBUF,
OTEL_EXPORTER_OTLP_TIMEOUT, OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT,
};
use crate::{new_exporter, Protocol, OTEL_EXPORTER_OTLP_PROTOCOL};
use crate::{new_exporter, Compression, Protocol, OTEL_EXPORTER_OTLP_PROTOCOL};
use std::str::FromStr;
use std::sync::Mutex;

// Make sure env tests are not running concurrently
Expand Down Expand Up @@ -345,4 +379,15 @@ mod tests {
std::env::remove_var(OTEL_EXPORTER_OTLP_TIMEOUT);
assert!(std::env::var(OTEL_EXPORTER_OTLP_TIMEOUT).is_err());
}

#[test]
fn test_compression_parse() {
assert_eq!(Compression::from_str("gzip").unwrap(), Compression::Gzip);
Compression::from_str("bad_compression").expect_err("bad compression");
}

#[test]
fn test_compression_to_str() {
assert_eq!(Compression::Gzip.to_string(), "gzip");
}
}
56 changes: 55 additions & 1 deletion opentelemetry-otlp/src/exporter/tonic.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::ExportConfig;
use crate::exporter::Compression;
use crate::{ExportConfig, OTEL_EXPORTER_OTLP_COMPRESSION};
use std::fmt::{Debug, Formatter};
use tonic::codec::CompressionEncoding;
use tonic::metadata::MetadataMap;
#[cfg(feature = "tls")]
use tonic::transport::ClientTlsConfig;
Expand All @@ -18,6 +20,39 @@ pub struct TonicConfig {
/// TLS settings for the collector endpoint.
#[cfg(feature = "tls")]
pub tls_config: Option<ClientTlsConfig>,

/// The compression algorithm to use when communicating with the collector.
pub compression: Option<Compression>,
}

impl TryFrom<Compression> for tonic::codec::CompressionEncoding {
type Error = crate::Error;

fn try_from(value: Compression) -> Result<Self, Self::Error> {
match value {
#[cfg(feature = "gzip-tonic")]
Compression::Gzip => Ok(tonic::codec::CompressionEncoding::Gzip),
#[cfg(not(feature = "gzip-tonic"))]
Compression::Gzip => Err(crate::Error::UnsupportedCompressionAlgorithm(
value.to_string(),
)),
}
}
}

pub(crate) fn resolve_compression(
tonic_config: &TonicConfig,
env_override: &'static str,
) -> Result<Option<CompressionEncoding>, crate::Error> {
if let Some(compression) = tonic_config.compression {
BrynCooke marked this conversation as resolved.
Show resolved Hide resolved
return Ok(Some(compression.try_into()?));
}
if let Ok(compression) = std::env::var(env_override) {
return Ok(Some(compression.parse::<Compression>()?.try_into()?));
} else if let Ok(compression) = std::env::var(OTEL_EXPORTER_OTLP_COMPRESSION) {
return Ok(Some(compression.parse::<Compression>()?.try_into()?));
};
Ok(None)
}

/// Build a trace exporter that uses [tonic] as grpc layer and opentelemetry protocol.
Expand Down Expand Up @@ -60,6 +95,7 @@ impl Default for TonicExporterBuilder {
)),
#[cfg(feature = "tls")]
tls_config: None,
compression: None,
};

TonicExporterBuilder {
Expand Down Expand Up @@ -94,6 +130,12 @@ impl TonicExporterBuilder {
self
}

/// Set the compression algorithm to use when communicating with the collector.
pub fn with_compression(mut self, compression: Compression) -> Self {
self.tonic_config.compression = Some(compression);
self
}

/// Use `channel` as tonic's transport channel.
/// this will override tls config and should only be used
/// when working with non-HTTP transports.
Expand All @@ -119,6 +161,8 @@ impl TonicExporterBuilder {

#[cfg(test)]
mod tests {
#[cfg(feature = "gzip-tonic")]
use crate::exporter::Compression;
use crate::TonicExporterBuilder;
use tonic::metadata::{MetadataMap, MetadataValue};

Expand Down Expand Up @@ -151,4 +195,14 @@ mod tests {
.len()
);
}

#[test]
#[cfg(feature = "gzip-tonic")]
fn test_with_compression() {
// metadata should merge with the current one with priority instead of just replacing it
let mut metadata = MetadataMap::new();
metadata.insert("foo", "bar".parse().unwrap());
let builder = TonicExporterBuilder::default().with_compression(Compression::Gzip);
assert_eq!(builder.tonic_config.compression.unwrap(), Compression::Gzip);
}
}
20 changes: 14 additions & 6 deletions opentelemetry-otlp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,24 +189,28 @@ mod metric;
mod span;
mod transform;

pub use crate::exporter::Compression;
pub use crate::exporter::ExportConfig;
#[cfg(feature = "trace")]
pub use crate::span::{
OtlpTracePipeline, SpanExporter, SpanExporterBuilder, OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
OTEL_EXPORTER_OTLP_TRACES_TIMEOUT,
OtlpTracePipeline, SpanExporter, SpanExporterBuilder, OTEL_EXPORTER_OTLP_TRACES_COMPRESSION,
OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, OTEL_EXPORTER_OTLP_TRACES_TIMEOUT,
};

#[cfg(feature = "metrics")]
pub use crate::metric::{
MetricsExporter, MetricsExporterBuilder, OtlpMetricPipeline,
OTEL_EXPORTER_OTLP_METRICS_ENDPOINT, OTEL_EXPORTER_OTLP_METRICS_TIMEOUT,
OTEL_EXPORTER_OTLP_METRICS_COMPRESSION, OTEL_EXPORTER_OTLP_METRICS_ENDPOINT,
OTEL_EXPORTER_OTLP_METRICS_TIMEOUT,
};

#[cfg(feature = "logs")]
pub use crate::logs::*;
pub use crate::logs::{
LogExporter, LogExporterBuilder, OtlpLogPipeline, OTEL_EXPORTER_OTLP_LOGS_COMPRESSION,
};

pub use crate::exporter::{
HasExportConfig, WithExportConfig, OTEL_EXPORTER_OTLP_ENDPOINT,
HasExportConfig, WithExportConfig, OTEL_EXPORTER_OTLP_COMPRESSION, OTEL_EXPORTER_OTLP_ENDPOINT,
OTEL_EXPORTER_OTLP_ENDPOINT_DEFAULT, OTEL_EXPORTER_OTLP_PROTOCOL,
OTEL_EXPORTER_OTLP_PROTOCOL_DEFAULT, OTEL_EXPORTER_OTLP_TIMEOUT,
OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT,
Expand All @@ -217,7 +221,7 @@ use opentelemetry_sdk::export::ExportError;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

#[cfg(feature = "grpc-sys")]
pub use crate::exporter::grpcio::{Compression, Credentials, GrpcioConfig, GrpcioExporterBuilder};
pub use crate::exporter::grpcio::{Credentials, GrpcioConfig, GrpcioExporterBuilder};
#[cfg(feature = "http-proto")]
pub use crate::exporter::http::HttpExporterBuilder;
#[cfg(feature = "grpc-tonic")]
Expand Down Expand Up @@ -347,6 +351,10 @@ pub enum Error {
/// The pipeline will need a exporter to complete setup. Throw this error if none is provided.
#[error("no exporter builder is provided, please provide one using with_exporter() method")]
NoExporterBuilder,

/// Unsupported compression algorithm.
#[error("unsupported compression algorithm '{0}'")]
UnsupportedCompressionAlgorithm(String),
}

#[cfg(feature = "grpc-tonic")]
Expand Down
13 changes: 11 additions & 2 deletions opentelemetry-otlp/src/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

#[cfg(feature = "grpc-tonic")]
use {
crate::exporter::tonic::{TonicConfig, TonicExporterBuilder},
crate::exporter::tonic::{resolve_compression, TonicConfig, TonicExporterBuilder},
opentelemetry_proto::tonic::collector::logs::v1::{
logs_service_client::LogsServiceClient as TonicLogsServiceClient,
ExportLogsServiceRequest as TonicRequest,
Expand Down Expand Up @@ -57,6 +57,9 @@ use std::{
use opentelemetry_api::logs::{LogError, LoggerProvider};
use opentelemetry_sdk::{self, export::logs::LogData, logs::BatchMessage, runtime::RuntimeChannel};

/// Compression algorithm to use, defaults to none.
pub const OTEL_EXPORTER_OTLP_LOGS_COMPRESSION: &str = "OTEL_EXPORTER_OTLP_LOGS_COMPRESSION";

impl OtlpPipeline {
/// Create a OTLP logging pipeline.
pub fn logging(self) -> OtlpLogPipeline {
Expand Down Expand Up @@ -232,10 +235,16 @@ impl LogExporter {
tonic_config: TonicConfig,
channel: tonic::transport::Channel,
) -> Result<Self, crate::Error> {
let mut log_exporter = TonicLogsServiceClient::new(channel);
if let Some(compression) =
resolve_compression(&tonic_config, OTEL_EXPORTER_OTLP_LOGS_COMPRESSION)?
{
log_exporter = log_exporter.send_compressed(compression);
}
Ok(LogExporter::Tonic {
timeout: config.timeout,
metadata: tonic_config.metadata,
log_exporter: TonicLogsServiceClient::new(channel),
log_exporter,
})
}

Expand Down
Loading