diff --git a/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap b/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap index 1b16fd8ef0e..b6abe5209a6 100644 --- a/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap +++ b/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap @@ -4771,7 +4771,6 @@ expression: "&schema" }, "endpoint": { "description": "The endpoint to send to", - "default": "default", "type": "string" } }, @@ -4813,7 +4812,6 @@ expression: "&schema" "properties": { "endpoint": { "description": "The endpoint to send to", - "default": "default", "type": "string" } }, @@ -4949,8 +4947,7 @@ expression: "&schema" "properties": { "endpoint": { "description": "The endpoint to send reports to", - "type": "string", - "format": "uri" + "type": "string" }, "password": { "description": "The optional password", @@ -5336,6 +5333,9 @@ expression: "&schema" "zipkin": { "description": "Zipkin exporter configuration", "type": "object", + "required": [ + "endpoint" + ], "properties": { "batch_processor": { "description": "Batch processor configuration", @@ -5395,7 +5395,6 @@ expression: "&schema" }, "endpoint": { "description": "The endpoint to send to", - "default": "default", "type": "string" } }, diff --git a/apollo-router/src/plugins/telemetry/endpoint.rs b/apollo-router/src/plugins/telemetry/endpoint.rs new file mode 100644 index 00000000000..de25bcb006b --- /dev/null +++ b/apollo-router/src/plugins/telemetry/endpoint.rs @@ -0,0 +1,275 @@ +use std::fmt::Formatter; +use std::net::SocketAddr; +use std::str::FromStr; + +use http::uri::Authority; +use http::Uri; +use schemars::gen::SchemaGenerator; +use schemars::schema::Schema; +use schemars::JsonSchema; +use serde::de::Error; +use serde::de::Visitor; +use serde::Deserialize; +use serde::Deserializer; + +#[derive(Debug, Clone, Default, Eq, PartialEq)] +pub(crate) struct UriEndpoint { + uri: Option, +} + +impl UriEndpoint { + pub(crate) fn to_uri(&self, default_endpoint: &Uri) -> Option { + self.uri.as_ref().map(|uri| { + let mut parts = uri.clone().into_parts(); + if parts.scheme.is_none() { + parts.scheme = default_endpoint.scheme().cloned(); + } + + match (&parts.authority, default_endpoint.authority()) { + (None, Some(default_authority)) => { + parts.authority = Some(default_authority.clone()); + } + (Some(authority), Some(default_authority)) => { + let host = if authority.host().is_empty() { + default_authority.host() + } else { + authority.host() + }; + let port = if authority.port().is_none() { + default_authority.port() + } else { + authority.port() + }; + + if let Some(port) = port { + parts.authority = Some( + Authority::from_str(format!("{}:{}", host, port).as_str()) + .expect("host and port must have come from a valid uri, qed"), + ) + } else { + parts.authority = Some( + Authority::from_str(host) + .expect("host must have come from a valid uri, qed"), + ); + } + } + _ => {} + } + + Uri::from_parts(parts) + .expect("uri cannot be invalid as it was constructed from existing parts") + }) + } +} + +impl<'de> Deserialize<'de> for UriEndpoint { + fn deserialize>(deserializer: D) -> Result { + struct EndpointVisitor; + + impl<'de> Visitor<'de> for EndpointVisitor { + type Value = UriEndpoint; + + fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result { + formatter.write_str("a valid uri or 'default'") + } + + fn visit_str(self, v: &str) -> Result + where + E: Error, + { + if v == "default" { + // This is a legacy of the old config format, where the 'default' was accepted. + // Users should just not set the endpoint if they want the default. + return Ok(UriEndpoint::default()); + } + match Uri::from_str(v) { + Ok(uri) => Ok(UriEndpoint { uri: Some(uri) }), + Err(_) => Err(Error::custom(format!( + "invalid endpoint: {}. Expected a valid uri or 'default'", + v + ))), + } + } + } + + deserializer.deserialize_str(EndpointVisitor) + } +} + +impl JsonSchema for UriEndpoint { + fn schema_name() -> String { + "UriEndpoint".to_string() + } + + fn json_schema(gen: &mut SchemaGenerator) -> Schema { + gen.subschema_for::() + } +} + +impl From for UriEndpoint { + fn from(uri: Uri) -> Self { + UriEndpoint { uri: Some(uri) } + } +} + +#[derive(Debug, Clone, Default, Eq, PartialEq)] +pub(crate) struct SocketEndpoint { + socket: Option, +} + +impl SocketEndpoint { + pub(crate) fn to_socket(&self) -> Option { + self.socket + } +} + +impl<'de> Deserialize<'de> for SocketEndpoint { + fn deserialize>(deserializer: D) -> Result { + struct EndpointVisitor; + + impl<'de> Visitor<'de> for EndpointVisitor { + type Value = SocketEndpoint; + + fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result { + formatter.write_str("a valid uri or 'default'") + } + + fn visit_str(self, v: &str) -> Result + where + E: Error, + { + if v == "default" { + // This is a legacy of the old config format, where the 'default' was accepted. + // Users should just not set the endpoint if they want the default. + return Ok(SocketEndpoint::default()); + } + match SocketAddr::from_str(v) { + Ok(socket) => Ok(SocketEndpoint { + socket: Some(socket), + }), + Err(_) => Err(Error::custom(format!( + "invalid endpoint: {}. Expected a valid socket or 'default'", + v + ))), + } + } + } + + deserializer.deserialize_str(EndpointVisitor) + } +} + +impl JsonSchema for SocketEndpoint { + fn schema_name() -> String { + "SocketEndpoint".to_string() + } + + fn json_schema(gen: &mut SchemaGenerator) -> Schema { + gen.subschema_for::() + } +} + +impl From for SocketEndpoint { + fn from(socket: SocketAddr) -> Self { + SocketEndpoint { + socket: Some(socket), + } + } +} + +#[cfg(test)] +mod test { + use std::net::SocketAddr; + use std::str::FromStr; + + use http::Uri; + + use crate::plugins::telemetry::endpoint::SocketEndpoint; + use crate::plugins::telemetry::endpoint::UriEndpoint; + + #[test] + fn test_parse_uri_default() { + let endpoint = serde_yaml::from_str::("default").unwrap(); + assert_eq!(endpoint, UriEndpoint::default()); + } + #[test] + fn test_parse_uri() { + let endpoint = serde_yaml::from_str::("http://example.com:2000/path").unwrap(); + assert_eq!( + endpoint, + Uri::from_static("http://example.com:2000/path").into() + ); + } + + #[test] + fn test_parse_uri_error() { + let error = serde_yaml::from_str::("example.com:2000/path") + .expect_err("expected error"); + assert_eq!(error.to_string(), "invalid endpoint: example.com:2000/path. Expected a valid uri or 'default' at line 1 column 1"); + } + + #[test] + fn test_to_url() { + assert_eq!( + UriEndpoint::from(Uri::from_static("http://example.com:2000/path1")) + .to_uri(&Uri::from_static("http://localhost:9411/path2")) + .unwrap(), + Uri::from_static("http://example.com:2000/path1") + ); + assert_eq!( + UriEndpoint::from(Uri::from_static("http://example.com:2000")) + .to_uri(&Uri::from_static("http://localhost:9411/path2")) + .unwrap(), + Uri::from_static("http://example.com:2000") + ); + assert_eq!( + UriEndpoint::from(Uri::from_static("http://example.com/path1")) + .to_uri(&Uri::from_static("http://localhost:9411/path2")) + .unwrap(), + Uri::from_static("http://example.com:9411/path1") + ); + assert_eq!( + UriEndpoint::from(Uri::from_static("http://:2000/path1")) + .to_uri(&Uri::from_static("http://localhost:9411/path2")) + .unwrap(), + Uri::from_static("http://localhost:2000/path1") + ); + assert_eq!( + UriEndpoint::from(Uri::from_static("/path1")) + .to_uri(&Uri::from_static("http://localhost:9411/path2")) + .unwrap(), + Uri::from_static("http://localhost:9411/path1") + ); + } + + #[test] + fn test_parse_socket_default() { + let endpoint = serde_yaml::from_str::("default").unwrap(); + assert_eq!(endpoint, SocketEndpoint::default()); + } + #[test] + fn test_parse_socket() { + let endpoint = serde_yaml::from_str::("127.0.0.1:8000").unwrap(); + assert_eq!( + endpoint, + SocketAddr::from_str("127.0.0.1:8000").unwrap().into() + ); + } + + #[test] + fn test_parse_socket_error() { + let error = serde_yaml::from_str::("example.com:2000/path") + .expect_err("expected error"); + assert_eq!(error.to_string(), "invalid endpoint: example.com:2000/path. Expected a valid socket or 'default' at line 1 column 1"); + } + + #[test] + fn test_to_socket() { + assert_eq!( + SocketEndpoint::from(SocketAddr::from_str("127.0.0.1:8000").unwrap()) + .to_socket() + .unwrap(), + SocketAddr::from_str("127.0.0.1:8000").unwrap() + ); + } +} diff --git a/apollo-router/src/plugins/telemetry/mod.rs b/apollo-router/src/plugins/telemetry/mod.rs index 9a35aeabeee..afba0ca250a 100644 --- a/apollo-router/src/plugins/telemetry/mod.rs +++ b/apollo-router/src/plugins/telemetry/mod.rs @@ -127,6 +127,7 @@ use crate::ListenAddr; pub(crate) mod apollo; pub(crate) mod apollo_exporter; pub(crate) mod config; +mod endpoint; pub(crate) mod formatters; pub(crate) mod metrics; mod otlp; diff --git a/apollo-router/src/plugins/telemetry/otlp.rs b/apollo-router/src/plugins/telemetry/otlp.rs index 28df4095852..69ccfa52585 100644 --- a/apollo-router/src/plugins/telemetry/otlp.rs +++ b/apollo-router/src/plugins/telemetry/otlp.rs @@ -1,8 +1,10 @@ //! Shared configuration for Otlp tracing and metrics. use std::collections::HashMap; +use http::Uri; use indexmap::map::Entry; use indexmap::IndexMap; +use lazy_static::lazy_static; use opentelemetry::sdk::metrics::reader::TemporalitySelector; use opentelemetry::sdk::metrics::InstrumentKind; use opentelemetry_otlp::HttpExporterBuilder; @@ -10,7 +12,6 @@ use opentelemetry_otlp::TonicExporterBuilder; use opentelemetry_otlp::WithExportConfig; use schemars::JsonSchema; use serde::Deserialize; -use serde::Deserializer; use serde::Serialize; use serde_json::Value; use tonic::metadata::MetadataMap; @@ -21,16 +22,19 @@ use tower::BoxError; use url::Url; use crate::plugins::telemetry::config::GenericWith; -use crate::plugins::telemetry::tracing::parse_url_for_endpoint; +use crate::plugins::telemetry::endpoint::UriEndpoint; use crate::plugins::telemetry::tracing::BatchProcessorConfig; -#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)] +lazy_static! { + static ref DEFAULT_GRPC_ENDPOINT: Uri = Uri::from_static("http://127.0.0.1:4317"); + static ref DEFAULT_HTTP_ENDPOINT: Uri = Uri::from_static("http://127.0.0.1:4318"); +} + +#[derive(Debug, Clone, Deserialize, JsonSchema, Default)] #[serde(deny_unknown_fields)] pub(crate) struct Config { /// The endpoint to send data to - #[serde(deserialize_with = "deser_endpoint")] - #[schemars(with = "String")] - pub(crate) endpoint: Endpoint, + pub(crate) endpoint: UriEndpoint, /// The protocol to use when sending data #[serde(default)] @@ -58,28 +62,17 @@ impl Config { pub(crate) fn exporter + From>( &self, ) -> Result { - let endpoint = match (self.endpoint.clone(), &self.protocol) { - // # https://github.com/apollographql/router/issues/2036 - // Opentelemetry rust incorrectly defaults to https - // This will override the defaults to that of the spec - // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/exporter.md - (Endpoint::Default(_), Protocol::Http) => { - Url::parse("http://localhost:4318").expect("default url is valid") - } - // Default is GRPC - (Endpoint::Default(_), Protocol::Grpc) => { - Url::parse("http://localhost:4317").expect("default url is valid") - } - (Endpoint::Url(s), _) => s, - }; match self.protocol { Protocol::Grpc => { + let endpoint = self.endpoint.to_uri(&DEFAULT_GRPC_ENDPOINT); let grpc = self.grpc.clone(); let exporter = opentelemetry_otlp::new_exporter() .tonic() .with_env() .with_timeout(self.batch_processor.max_export_timeout) - .with_endpoint(endpoint.as_str()) + .with(&endpoint, |b, endpoint| { + b.with_endpoint(endpoint.to_string()) + }) .with(&grpc.try_from(&endpoint)?, |b, t| { b.with_tls_config(t.clone()) }) @@ -88,12 +81,15 @@ impl Config { Ok(exporter) } Protocol::Http => { + let endpoint = self.endpoint.to_uri(&DEFAULT_HTTP_ENDPOINT); let http = self.http.clone(); let exporter = opentelemetry_otlp::new_exporter() .http() .with_env() .with_timeout(self.batch_processor.max_export_timeout) - .with_endpoint(endpoint.as_str()) + .with(&endpoint, |b, endpoint| { + b.with_endpoint(endpoint.to_string()) + }) .with_headers(http.headers) .into(); @@ -103,33 +99,6 @@ impl Config { } } -#[derive(Debug, Clone, PartialEq, Deserialize, Serialize, JsonSchema)] -#[serde(deny_unknown_fields, rename_all = "snake_case", untagged)] -pub(crate) enum Endpoint { - Default(EndpointDefault), - Url(Url), -} - -fn deser_endpoint<'de, D>(deserializer: D) -> Result -where - D: Deserializer<'de>, -{ - let s = String::deserialize(deserializer)?; - if s == "default" { - return Ok(Endpoint::Default(EndpointDefault::Default)); - } - - let url = parse_url_for_endpoint(s).map_err(serde::de::Error::custom)?; - - Ok(Endpoint::Url(url)) -} - -#[derive(Debug, Clone, PartialEq, Deserialize, Serialize, JsonSchema)] -#[serde(deny_unknown_fields, rename_all = "snake_case")] -pub(crate) enum EndpointDefault { - Default, -} - #[derive(Debug, Clone, Deserialize, Serialize, Default, JsonSchema)] #[serde(deny_unknown_fields, default)] pub(crate) struct HttpExporter { @@ -165,25 +134,36 @@ fn header_map(gen: &mut schemars::gen::SchemaGenerator) -> schemars::schema::Sch impl GrpcExporter { // Return a TlsConfig if it has something actually set. - pub(crate) fn try_from(self, endpoint: &Url) -> Result, BoxError> { - let domain_name = self.default_tls_domain(endpoint); - - if self.ca.is_some() || self.key.is_some() || self.cert.is_some() || domain_name.is_some() { - Some( - ClientTlsConfig::new() - .with(&domain_name, |b, d| b.domain_name(*d)) - .try_with(&self.ca, |b, c| { - Ok(b.ca_certificate(Certificate::from_pem(c))) - })? - .try_with( - &self.cert.clone().zip(self.key.clone()), - |b, (cert, key)| Ok(b.identity(Identity::from_pem(cert, key))), - ), - ) - .transpose() - } else { - Ok(None) + pub(crate) fn try_from( + self, + endpoint: &Option, + ) -> Result, BoxError> { + if let Some(endpoint) = endpoint { + let endpoint = endpoint.to_string().parse::().map_err(|e| { + BoxError::from(format!("invalid GRPC endpoint {}, {}", endpoint, e)) + })?; + let domain_name = self.default_tls_domain(&endpoint); + + if self.ca.is_some() + || self.key.is_some() + || self.cert.is_some() + || domain_name.is_some() + { + return Some( + ClientTlsConfig::new() + .with(&domain_name, |b, d| b.domain_name(*d)) + .try_with(&self.ca, |b, c| { + Ok(b.ca_certificate(Certificate::from_pem(c))) + })? + .try_with( + &self.cert.clone().zip(self.key.clone()), + |b, (cert, key)| Ok(b.identity(Identity::from_pem(cert, key))), + ), + ) + .transpose(); + } } + Ok(None) } fn default_tls_domain<'a>(&'a self, endpoint: &'a Url) -> Option<&'a str> { @@ -317,30 +297,6 @@ mod metadata_map_serde { mod tests { use super::*; - #[test] - fn endpoint_configuration() { - let config: Config = serde_yaml::from_str("endpoint: default").unwrap(); - assert_eq!(config.endpoint, Endpoint::Default(EndpointDefault::Default)); - - let config: Config = serde_yaml::from_str("endpoint: collector:1234").unwrap(); - assert_eq!( - config.endpoint, - Endpoint::Url(Url::parse("http://collector:1234").unwrap()) - ); - - let config: Config = serde_yaml::from_str("endpoint: https://collector:1234").unwrap(); - assert_eq!( - config.endpoint, - Endpoint::Url(Url::parse("https://collector:1234").unwrap()) - ); - - let config: Config = serde_yaml::from_str("endpoint: 127.0.0.1:1234").unwrap(); - assert_eq!( - config.endpoint, - Endpoint::Url(Url::parse("http://127.0.0.1:1234").unwrap()) - ); - } - #[test] fn endpoint_grpc_defaulting_no_scheme() { let url = Url::parse("api.apm.com:433").unwrap(); diff --git a/apollo-router/src/plugins/telemetry/tracing/datadog.rs b/apollo-router/src/plugins/telemetry/tracing/datadog.rs index 4d4cddbc29d..4142ccc7636 100644 --- a/apollo-router/src/plugins/telemetry/tracing/datadog.rs +++ b/apollo-router/src/plugins/telemetry/tracing/datadog.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; +use http::Uri; use lazy_static::lazy_static; use opentelemetry::sdk; use opentelemetry::sdk::trace::BatchSpanProcessor; @@ -12,14 +13,11 @@ use opentelemetry_semantic_conventions::resource::SERVICE_NAME; use opentelemetry_semantic_conventions::resource::SERVICE_VERSION; use schemars::JsonSchema; use serde::Deserialize; -use serde::Serialize; use tower::BoxError; -use super::agent_endpoint; -use super::deser_endpoint; -use super::AgentEndpoint; use crate::plugins::telemetry::config::GenericWith; use crate::plugins::telemetry::config::Trace; +use crate::plugins::telemetry::endpoint::UriEndpoint; use crate::plugins::telemetry::tracing::BatchProcessorConfig; use crate::plugins::telemetry::tracing::SpanProcessorExt; use crate::plugins::telemetry::tracing::TracingConfigurator; @@ -34,15 +32,14 @@ lazy_static! { map.insert("subgraph_request", "graphql.operation.name"); map }; + static ref DEFAULT_ENDPOINT: Uri = Uri::from_static("http://localhost:8126/v0.4/traces"); } -#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)] +#[derive(Debug, Clone, Deserialize, JsonSchema, Default)] #[serde(deny_unknown_fields)] pub(crate) struct Config { /// The endpoint to send to - #[serde(deserialize_with = "deser_endpoint")] - #[schemars(schema_with = "agent_endpoint")] - pub(crate) endpoint: AgentEndpoint, + pub(crate) endpoint: UriEndpoint, /// batch processor configuration #[serde(default)] @@ -56,14 +53,10 @@ pub(crate) struct Config { impl TracingConfigurator for Config { fn apply(&self, builder: Builder, trace: &Trace) -> Result { tracing::info!("Configuring Datadog tracing: {}", self.batch_processor); - let url = match &self.endpoint { - AgentEndpoint::Default(_) => None, - AgentEndpoint::Url(s) => Some(s), - }; let enable_span_mapping = self.enable_span_mapping.then_some(true); let trace_config: sdk::trace::Config = trace.into(); let exporter = opentelemetry_datadog::new_pipeline() - .with(&url, |builder, e| { + .with(&self.endpoint.to_uri(&DEFAULT_ENDPOINT), |builder, e| { builder.with_agent_endpoint(e.to_string().trim_end_matches('/')) }) .with(&enable_span_mapping, |builder, _e| { @@ -105,32 +98,3 @@ impl TracingConfigurator for Config { )) } } - -#[cfg(test)] -mod tests { - use reqwest::Url; - - use super::*; - use crate::plugins::telemetry::tracing::AgentDefault; - - #[test] - fn endpoint_configuration() { - let config: Config = serde_yaml::from_str("endpoint: default").unwrap(); - assert_eq!( - AgentEndpoint::Default(AgentDefault::Default), - config.endpoint - ); - - let config: Config = serde_yaml::from_str("endpoint: collector:1234").unwrap(); - assert_eq!( - AgentEndpoint::Url(Url::parse("http://collector:1234").unwrap()), - config.endpoint - ); - - let config: Config = serde_yaml::from_str("endpoint: https://collector:1234").unwrap(); - assert_eq!( - AgentEndpoint::Url(Url::parse("https://collector:1234").unwrap()), - config.endpoint - ); - } -} diff --git a/apollo-router/src/plugins/telemetry/tracing/jaeger.rs b/apollo-router/src/plugins/telemetry/tracing/jaeger.rs index ac123a0e6cc..d7106561f54 100644 --- a/apollo-router/src/plugins/telemetry/tracing/jaeger.rs +++ b/apollo-router/src/plugins/telemetry/tracing/jaeger.rs @@ -1,25 +1,27 @@ //! Configuration for jaeger tracing. use std::fmt::Debug; +use http::Uri; +use lazy_static::lazy_static; use opentelemetry::runtime; use opentelemetry::sdk::trace::BatchSpanProcessor; use opentelemetry::sdk::trace::Builder; use schemars::JsonSchema; use serde::Deserialize; -use serde::Serialize; use tower::BoxError; -use url::Url; -use super::agent_endpoint; -use super::deser_endpoint; -use super::AgentEndpoint; use crate::plugins::telemetry::config::GenericWith; use crate::plugins::telemetry::config::Trace; +use crate::plugins::telemetry::endpoint::SocketEndpoint; +use crate::plugins::telemetry::endpoint::UriEndpoint; use crate::plugins::telemetry::tracing::BatchProcessorConfig; use crate::plugins::telemetry::tracing::SpanProcessorExt; use crate::plugins::telemetry::tracing::TracingConfigurator; -#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)] +lazy_static! { + static ref DEFAULT_ENDPOINT: Uri = Uri::from_static("http://localhost:14268/api/traces"); +} +#[derive(Debug, Clone, Deserialize, JsonSchema)] #[serde(deny_unknown_fields, untagged)] pub(crate) enum Config { Agent { @@ -40,20 +42,18 @@ pub(crate) enum Config { }, } -#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)] +#[derive(Debug, Clone, Deserialize, JsonSchema)] #[serde(deny_unknown_fields)] pub(crate) struct AgentConfig { /// The endpoint to send to - #[schemars(schema_with = "agent_endpoint")] - #[serde(deserialize_with = "deser_endpoint")] - endpoint: AgentEndpoint, + endpoint: SocketEndpoint, } -#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)] +#[derive(Debug, Clone, Deserialize, JsonSchema)] #[serde(deny_unknown_fields)] pub(crate) struct CollectorConfig { /// The endpoint to send reports to - endpoint: Url, + endpoint: UriEndpoint, /// The optional username username: Option, /// The optional password @@ -68,20 +68,10 @@ impl TracingConfigurator for Config { batch_processor, } => { tracing::info!("Configuring Jaeger tracing: {}", batch_processor); - let socket = match &agent.endpoint { - AgentEndpoint::Default(_) => None, - AgentEndpoint::Url(u) => { - let socket_addr = u - .socket_addrs(|| None)? - .pop() - .ok_or_else(|| format!("cannot resolve url ({u}) for jaeger agent"))?; - Some(socket_addr) - } - }; let exporter = opentelemetry_jaeger::new_agent_pipeline() .with_trace_config(trace_config.into()) .with_service_name(trace_config.service_name.clone()) - .with(&socket, |b, s| b.with_endpoint(s)) + .with(&agent.endpoint.to_socket(), |b, s| b.with_endpoint(s)) .build_async_agent_exporter(opentelemetry::runtime::Tokio)?; Ok(builder.with_span_processor( BatchSpanProcessor::builder(exporter, opentelemetry::runtime::Tokio) @@ -95,14 +85,14 @@ impl TracingConfigurator for Config { batch_processor, } => { tracing::info!("Configuring Jaeger tracing: {}", batch_processor); - // We are waiting for a release of https://github.com/open-telemetry/opentelemetry-rust/issues/894 - // Until that time we need to wrap a tracer provider with Jeager in. let exporter = opentelemetry_jaeger::new_collector_pipeline() .with_trace_config(trace_config.into()) .with_service_name(trace_config.service_name.clone()) .with(&collector.username, |b, u| b.with_username(u)) .with(&collector.password, |b, p| b.with_password(p)) - .with_endpoint(&collector.endpoint.to_string()) + .with(&collector.endpoint.to_uri(&DEFAULT_ENDPOINT), |b, p| { + b.with_endpoint(p.to_string()) + }) .with_reqwest() .with_batch_processor_config(batch_processor.clone().into()) .build_collector_exporter::()?; diff --git a/apollo-router/src/plugins/telemetry/tracing/mod.rs b/apollo-router/src/plugins/telemetry/tracing/mod.rs index efdf05ef146..4f9ebd21a3c 100644 --- a/apollo-router/src/plugins/telemetry/tracing/mod.rs +++ b/apollo-router/src/plugins/telemetry/tracing/mod.rs @@ -11,14 +11,10 @@ use opentelemetry::sdk::trace::SpanProcessor; use opentelemetry::trace::TraceResult; use opentelemetry::Context; use opentelemetry::KeyValue; -use reqwest::Url; use schemars::JsonSchema; use serde::Deserialize; -use serde::Deserializer; use serde::Serialize; -use serde_json::Value; use tower::BoxError; -use url::ParseError; use crate::plugins::telemetry::config::Trace; @@ -34,68 +30,6 @@ pub(crate) trait TracingConfigurator { fn apply(&self, builder: Builder, trace_config: &Trace) -> Result; } -schemar_fn!( - agent_endpoint, - String, - Some(Value::String("default".to_string())), - "The agent endpoint to send reports to" -); -/// The endpoint to send reports to -#[derive(Debug, Clone, PartialEq, Deserialize, Serialize, JsonSchema)] -#[serde(deny_unknown_fields, rename_all = "snake_case", untagged)] -pub(crate) enum AgentEndpoint { - /// The default agent endpoint - Default(AgentDefault), - /// A custom URL endpoint - Url(Url), -} - -/// The default agent endpoint -#[derive(Debug, Clone, PartialEq, Deserialize, Serialize, JsonSchema)] -#[serde(deny_unknown_fields, rename_all = "snake_case")] -pub(crate) enum AgentDefault { - /// The default agent endpoint - Default, -} - -pub(crate) fn parse_url_for_endpoint(mut s: String) -> Result { - match Url::parse(&s) { - Ok(url) => { - // support the case of 'collector:4317' where url parses 'collector' - // as the scheme instead of the host - if url.host().is_none() && (url.scheme() != "http" || url.scheme() != "https") { - s = format!("http://{s}"); - Url::parse(&s) - } else { - Ok(url) - } - } - Err(err) => { - match err { - // support the case of '127.0.0.1:4317' where url is interpreted - // as a relative url without a base - ParseError::RelativeUrlWithoutBase => { - s = format!("http://{s}"); - Url::parse(&s) - } - _ => Err(err), - } - } - } -} - -pub(crate) fn deser_endpoint<'de, D>(deserializer: D) -> Result -where - D: Deserializer<'de>, -{ - let s = String::deserialize(deserializer)?; - if s == "default" { - return Ok(AgentEndpoint::Default(AgentDefault::Default)); - } - let url = parse_url_for_endpoint(s).map_err(serde::de::Error::custom)?; - Ok(AgentEndpoint::Url(url)) -} - #[derive(Debug)] struct ApolloFilterSpanProcessor { delegate: T, diff --git a/apollo-router/src/plugins/telemetry/tracing/zipkin.rs b/apollo-router/src/plugins/telemetry/tracing/zipkin.rs index 935f4fc10e2..da5d97cbdd0 100644 --- a/apollo-router/src/plugins/telemetry/tracing/zipkin.rs +++ b/apollo-router/src/plugins/telemetry/tracing/zipkin.rs @@ -1,70 +1,42 @@ //! Configuration for zipkin tracing. +use http::Uri; +use lazy_static::lazy_static; use opentelemetry::sdk::trace::BatchSpanProcessor; use opentelemetry::sdk::trace::Builder; use schemars::JsonSchema; use serde::Deserialize; -use serde::Deserializer; -use serde::Serialize; use tower::BoxError; -use url::Url; -use super::AgentDefault; -use super::AgentEndpoint; use crate::plugins::telemetry::config::GenericWith; use crate::plugins::telemetry::config::Trace; +use crate::plugins::telemetry::endpoint::UriEndpoint; use crate::plugins::telemetry::tracing::BatchProcessorConfig; use crate::plugins::telemetry::tracing::SpanProcessorExt; use crate::plugins::telemetry::tracing::TracingConfigurator; -#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)] +lazy_static! { + static ref DEFAULT_ENDPOINT: Uri = Uri::from_static("http://localhost:9411/api/v2/spans"); +} + +#[derive(Debug, Clone, Deserialize, JsonSchema, Default)] #[serde(deny_unknown_fields)] pub(crate) struct Config { /// The endpoint to send to - #[schemars(with = "String", default = "default_agent_endpoint")] - #[serde(deserialize_with = "deser_endpoint")] - pub(crate) endpoint: AgentEndpoint, + pub(crate) endpoint: UriEndpoint, /// Batch processor configuration #[serde(default)] pub(crate) batch_processor: BatchProcessorConfig, } -const fn default_agent_endpoint() -> &'static str { - "default" -} - -pub(crate) fn deser_endpoint<'de, D>(deserializer: D) -> Result -where - D: Deserializer<'de>, -{ - let mut s = String::deserialize(deserializer)?; - if s == "default" { - return Ok(AgentEndpoint::Default(AgentDefault::Default)); - } - let mut url = Url::parse(&s).map_err(serde::de::Error::custom)?; - - // support the case of 'collector:4317' where url parses 'collector' - // as the scheme instead of the host - if url.host().is_none() && (url.scheme() != "http" || url.scheme() != "https") { - s = format!("http://{s}/api/v2/spans"); - - url = Url::parse(&s).map_err(serde::de::Error::custom)?; - } - Ok(AgentEndpoint::Url(url)) -} - impl TracingConfigurator for Config { fn apply(&self, builder: Builder, trace_config: &Trace) -> Result { tracing::info!("configuring Zipkin tracing: {}", self.batch_processor); - let collector_endpoint = match &self.endpoint { - AgentEndpoint::Default(_) => None, - AgentEndpoint::Url(url) => Some(url), - }; let exporter = opentelemetry_zipkin::new_pipeline() .with_trace_config(trace_config.into()) .with_service_name(trace_config.service_name.clone()) - .with(&collector_endpoint, |b, endpoint| { + .with(&self.endpoint.to_uri(&DEFAULT_ENDPOINT), |b, endpoint| { b.with_collector_endpoint(endpoint.to_string()) }) .init_exporter()?;