Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move processing of endpoints to UriEndpoint and SocketEndpoint #3950

Merged
merged 4 commits into from
Oct 3, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4771,7 +4771,6 @@ expression: "&schema"
},
"endpoint": {
"description": "The endpoint to send to",
"default": "default",
"type": "string"
}
},
Expand Down Expand Up @@ -4813,7 +4812,6 @@ expression: "&schema"
"properties": {
"endpoint": {
"description": "The endpoint to send to",
"default": "default",
"type": "string"
}
},
Expand Down Expand Up @@ -4949,8 +4947,7 @@ expression: "&schema"
"properties": {
"endpoint": {
"description": "The endpoint to send reports to",
"type": "string",
"format": "uri"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I may follow up to reintroduce format into the schema.

Copy link
Contributor Author

@BrynCooke BrynCooke Oct 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, this is a mistake. We accept default, which is not a URI, so this must just be a string.
We may move away from accepting default in future.

"type": "string"
},
"password": {
"description": "The optional password",
Expand Down Expand Up @@ -5336,6 +5333,9 @@ expression: "&schema"
"zipkin": {
"description": "Zipkin exporter configuration",
"type": "object",
"required": [
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should have been required all along.

"endpoint"
],
"properties": {
"batch_processor": {
"description": "Batch processor configuration",
Expand Down Expand Up @@ -5395,7 +5395,6 @@ expression: "&schema"
},
"endpoint": {
"description": "The endpoint to send to",
"default": "default",
"type": "string"
}
},
Expand Down
298 changes: 298 additions & 0 deletions apollo-router/src/plugins/telemetry/endpoint.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,298 @@
use std::fmt::Formatter;
use std::net::SocketAddr;
use std::str::FromStr;

use http::uri::Authority;
use http::Uri;
use schemars::gen::SchemaGenerator;
use schemars::schema::Schema;
use schemars::JsonSchema;
use serde::de::Error;
use serde::de::Visitor;
use serde::Deserialize;
use serde::Deserializer;

#[derive(Debug, Clone, Default, Eq, PartialEq)]
pub(crate) struct UriEndpoint {
uri: Option<Uri>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The option is only needed to support the legacy format with default ? If yes, could you document it here please ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

impl UriEndpoint {
/// Converts an endpoint to a URI using the default endpoint as reference for any URI parts that are missing.
pub(crate) fn to_uri(&self, default_endpoint: &Uri) -> Option<Uri> {
self.uri.as_ref().map(|uri| {
let mut parts = uri.clone().into_parts();
if parts.scheme.is_none() {
parts.scheme = default_endpoint.scheme().cloned();
}

match (&parts.authority, default_endpoint.authority()) {
(None, Some(default_authority)) => {
parts.authority = Some(default_authority.clone());
}
(Some(authority), Some(default_authority)) => {
let host = if authority.host().is_empty() {
default_authority.host()
} else {
authority.host()
};
let port = if authority.port().is_none() {
default_authority.port()
} else {
authority.port()
};

if let Some(port) = port {
parts.authority = Some(
Authority::from_str(format!("{}:{}", host, port).as_str())
.expect("host and port must have come from a valid uri, qed"),
)
} else {
parts.authority = Some(
Authority::from_str(host)
.expect("host must have come from a valid uri, qed"),
);
}
}
_ => {}
}

if parts.path_and_query.is_none() {
parts.path_and_query = default_endpoint.path_and_query().cloned();
}

Uri::from_parts(parts)
.expect("uri cannot be invalid as it was constructed from existing parts")
})
}
}

impl<'de> Deserialize<'de> for UriEndpoint {
fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
struct EndpointVisitor;

impl<'de> Visitor<'de> for EndpointVisitor {
type Value = UriEndpoint;

fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result {
formatter.write_str("a valid uri or 'default'")
}

fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
where
E: Error,
{
if v == "default" {
// This is a legacy of the old config format, where the 'default' was accepted.
// Users should just not set the endpoint if they want the default.
return Ok(UriEndpoint::default());
}
match Uri::from_str(v) {
Ok(uri) => Ok(UriEndpoint { uri: Some(uri) }),
Err(_) => Err(Error::custom(format!(
"invalid endpoint: {}. Expected a valid uri or 'default'",
v
))),
}
}
}

deserializer.deserialize_str(EndpointVisitor)
}
}

impl JsonSchema for UriEndpoint {
fn schema_name() -> String {
"UriEndpoint".to_string()
}

fn json_schema(gen: &mut SchemaGenerator) -> Schema {
gen.subschema_for::<String>()
}
}

impl From<Uri> for UriEndpoint {
fn from(uri: Uri) -> Self {
UriEndpoint { uri: Some(uri) }
}
}

#[derive(Debug, Clone, Default, Eq, PartialEq)]
pub(crate) struct SocketEndpoint {
socket: Option<SocketAddr>,
}

impl SocketEndpoint {
pub(crate) fn to_socket(&self) -> Option<SocketAddr> {
self.socket
}
}

impl<'de> Deserialize<'de> for SocketEndpoint {
fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
struct EndpointVisitor;

impl<'de> Visitor<'de> for EndpointVisitor {
type Value = SocketEndpoint;

fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result {
formatter.write_str("a valid uri or 'default'")
}

fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
where
E: Error,
{
if v == "default" {
// This is a legacy of the old config format, where the 'default' was accepted.
// Users should just not set the endpoint if they want the default.
return Ok(SocketEndpoint::default());
}
match SocketAddr::from_str(v) {
Ok(socket) => Ok(SocketEndpoint {
socket: Some(socket),
}),
Err(_) => Err(Error::custom(format!(
"invalid endpoint: {}. Expected a valid socket or 'default'",
v
))),
}
}
}

deserializer.deserialize_str(EndpointVisitor)
}
}

impl JsonSchema for SocketEndpoint {
fn schema_name() -> String {
"SocketEndpoint".to_string()
}

fn json_schema(gen: &mut SchemaGenerator) -> Schema {
gen.subschema_for::<String>()
}
}

impl From<SocketAddr> for SocketEndpoint {
fn from(socket: SocketAddr) -> Self {
SocketEndpoint {
socket: Some(socket),
}
}
}

#[cfg(test)]
mod test {
use std::net::SocketAddr;
use std::str::FromStr;

use http::Uri;

use crate::plugins::telemetry::endpoint::SocketEndpoint;
use crate::plugins::telemetry::endpoint::UriEndpoint;

#[test]
fn test_parse_uri_default() {
let endpoint = serde_yaml::from_str::<UriEndpoint>("default").unwrap();
assert_eq!(endpoint, UriEndpoint::default());
}
#[test]
fn test_parse_uri() {
let endpoint = serde_yaml::from_str::<UriEndpoint>("http://example.com:2000/path").unwrap();
assert_eq!(
endpoint,
Uri::from_static("http://example.com:2000/path").into()
);
}

#[test]
fn test_parse_uri_error() {
let error = serde_yaml::from_str::<UriEndpoint>("example.com:2000/path")
.expect_err("expected error");
assert_eq!(error.to_string(), "invalid endpoint: example.com:2000/path. Expected a valid uri or 'default' at line 1 column 1");
}

#[test]
fn test_to_url() {
assert_eq!(
UriEndpoint::from(Uri::from_static("example.com"))
.to_uri(&Uri::from_static("http://localhost:9411/path2"))
.unwrap(),
Uri::from_static("http://example.com:9411/path2")
);
assert_eq!(
UriEndpoint::from(Uri::from_static("example.com:2000"))
.to_uri(&Uri::from_static("http://localhost:9411/path2"))
.unwrap(),
Uri::from_static("http://example.com:2000/path2")
);
assert_eq!(
UriEndpoint::from(Uri::from_static("http://example.com:2000/"))
.to_uri(&Uri::from_static("http://localhost:9411/path2"))
.unwrap(),
Uri::from_static("http://example.com:2000/")
);
assert_eq!(
UriEndpoint::from(Uri::from_static("http://example.com:2000/path1"))
.to_uri(&Uri::from_static("http://localhost:9411/path2"))
.unwrap(),
Uri::from_static("http://example.com:2000/path1")
);
assert_eq!(
UriEndpoint::from(Uri::from_static("http://example.com:2000"))
.to_uri(&Uri::from_static("http://localhost:9411/path2"))
.unwrap(),
Uri::from_static("http://example.com:2000")
);
assert_eq!(
UriEndpoint::from(Uri::from_static("http://example.com/path1"))
.to_uri(&Uri::from_static("http://localhost:9411/path2"))
.unwrap(),
Uri::from_static("http://example.com:9411/path1")
);
assert_eq!(
UriEndpoint::from(Uri::from_static("http://:2000/path1"))
.to_uri(&Uri::from_static("http://localhost:9411/path2"))
.unwrap(),
Uri::from_static("http://localhost:2000/path1")
);
assert_eq!(
UriEndpoint::from(Uri::from_static("/path1"))
.to_uri(&Uri::from_static("http://localhost:9411/path2"))
.unwrap(),
Uri::from_static("http://localhost:9411/path1")
);
}

#[test]
fn test_parse_socket_default() {
let endpoint = serde_yaml::from_str::<SocketEndpoint>("default").unwrap();
assert_eq!(endpoint, SocketEndpoint::default());
}
#[test]
fn test_parse_socket() {
let endpoint = serde_yaml::from_str::<SocketEndpoint>("127.0.0.1:8000").unwrap();
assert_eq!(
endpoint,
SocketAddr::from_str("127.0.0.1:8000").unwrap().into()
);
}

#[test]
fn test_parse_socket_error() {
let error = serde_yaml::from_str::<SocketEndpoint>("example.com:2000/path")
.expect_err("expected error");
assert_eq!(error.to_string(), "invalid endpoint: example.com:2000/path. Expected a valid socket or 'default' at line 1 column 1");
}

#[test]
fn test_to_socket() {
assert_eq!(
SocketEndpoint::from(SocketAddr::from_str("127.0.0.1:8000").unwrap())
.to_socket()
.unwrap(),
SocketAddr::from_str("127.0.0.1:8000").unwrap()
);
}
}
1 change: 1 addition & 0 deletions apollo-router/src/plugins/telemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ use crate::ListenAddr;
pub(crate) mod apollo;
pub(crate) mod apollo_exporter;
pub(crate) mod config;
mod endpoint;
pub(crate) mod formatters;
pub(crate) mod metrics;
mod otlp;
Expand Down
Loading