Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fewer header manipulations and allocations #3844

Merged
merged 2 commits into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions apollo-router/src/http_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ use bytes::Bytes;
use http::header;
use http::header::HeaderName;
use http::HeaderValue;
use mime::APPLICATION_JSON;
use multimap::MultiMap;

use crate::graphql;
use crate::services::APPLICATION_JSON_HEADER_VALUE;

/// Delayed-fallibility wrapper for conversion to [`http::header::HeaderName`].
///
Expand Down Expand Up @@ -441,10 +441,9 @@ impl IntoResponse for Response<graphql::Response> {
let (mut parts, body) = http::Response::from(self).into_parts();
let json_body_bytes =
Bytes::from(serde_json::to_vec(&body).expect("body should be serializable; qed"));
parts.headers.insert(
header::CONTENT_TYPE,
HeaderValue::from_static(APPLICATION_JSON.essence_str()),
);
parts
.headers
.insert(header::CONTENT_TYPE, APPLICATION_JSON_HEADER_VALUE.clone());

axum::response::Response::from_parts(parts, boxed(http_body::Full::new(json_body_bytes)))
}
Expand Down
63 changes: 31 additions & 32 deletions apollo-router/src/plugins/telemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,10 @@ pub(crate) const LOGGING_DISPLAY_BODY: &str = "apollo_telemetry::logging::displa
const DEFAULT_SERVICE_NAME: &str = "apollo-router";
const GLOBAL_TRACER_NAME: &str = "apollo-router";
const DEFAULT_EXPOSE_TRACE_ID_HEADER: &str = "apollo-trace-id";
static DEFAULT_EXPOSE_TRACE_ID_HEADER_NAME: HeaderName =
HeaderName::from_static(DEFAULT_EXPOSE_TRACE_ID_HEADER);
static FTV1_HEADER_NAME: HeaderName = HeaderName::from_static("apollo-federation-include-trace");
static FTV1_HEADER_VALUE: HeaderValue = HeaderValue::from_static("ftv1");

#[doc(hidden)] // Only public for integration tests
pub(crate) struct Telemetry {
Expand Down Expand Up @@ -316,21 +320,21 @@ impl Plugin for Telemetry {
.unwrap_or_default();
let router_request = &request.router_request;
let headers = router_request.headers();
let client_name = headers
let client_name: &str = headers
.get(&apollo.client_name_header)
.cloned()
.unwrap_or_else(|| HeaderValue::from_static(""));
.and_then(|h| h.to_str().ok())
.unwrap_or("");
let client_version = headers
.get(&apollo.client_version_header)
.cloned()
.unwrap_or_else(|| HeaderValue::from_static(""));
.and_then(|h| h.to_str().ok())
.unwrap_or("");
let span = ::tracing::info_span!(ROUTER_SPAN_NAME,
"http.method" = %router_request.method(),
"http.route" = %router_request.uri(),
"http.flavor" = ?router_request.version(),
"trace_id" = %trace_id,
"client.name" = client_name.to_str().unwrap_or_default(),
"client.version" = client_version.to_str().unwrap_or_default(),
"client.name" = client_name,
"client.version" = client_version,
"otel.kind" = "INTERNAL",
"otel.status_code" = ::tracing::field::Empty,
"apollo_private.duration_ns" = ::tracing::field::Empty,
Expand Down Expand Up @@ -412,7 +416,7 @@ impl Plugin for Telemetry {
t.response_trace_id
.header_name
.clone()
.unwrap_or(HeaderName::from_static(DEFAULT_EXPOSE_TRACE_ID_HEADER))
.unwrap_or_else(||DEFAULT_EXPOSE_TRACE_ID_HEADER_NAME.clone())
})
});
if let (Some(header_name), Some(trace_id)) = (
Expand Down Expand Up @@ -942,26 +946,22 @@ impl Telemetry {
let headers = http_request.headers();
let client_name_header = &apollo_config.client_name_header;
let client_version_header = &apollo_config.client_version_header;
let _ = context.insert(
CLIENT_NAME,
headers
.get(client_name_header)
.cloned()
.unwrap_or_else(|| HeaderValue::from_static(""))
.to_str()
.unwrap_or_default()
.to_string(),
);
let _ = context.insert(
CLIENT_VERSION,
headers
.get(client_version_header)
.cloned()
.unwrap_or_else(|| HeaderValue::from_static(""))
.to_str()
.unwrap_or_default()
.to_string(),
);
if let Some(name) = headers
.get(client_name_header)
.and_then(|h| h.to_str().ok())
.map(|s| s.to_owned())
{
let _ = context.insert(CLIENT_NAME, name);
}

if let Some(version) = headers
.get(client_version_header)
.and_then(|h| h.to_str().ok())
.map(|s| s.to_owned())
{
let _ = context.insert(CLIENT_VERSION, version);
}

let (should_log_headers, should_log_body) = config.logging.should_log(req);
if should_log_headers {
::tracing::info!(http.request.headers = ?req.supergraph_request.headers(), "Supergraph request headers");
Expand Down Expand Up @@ -1896,10 +1896,9 @@ fn request_ftv1(mut req: SubgraphRequest) -> SubgraphRequest {
.contains_key::<EnableSubgraphFtv1>()
&& Span::current().context().span().span_context().is_sampled()
{
req.subgraph_request.headers_mut().insert(
"apollo-federation-include-trace",
HeaderValue::from_static("ftv1"),
);
req.subgraph_request
.headers_mut()
.insert(FTV1_HEADER_NAME.clone(), FTV1_HEADER_VALUE.clone());
}
req
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ impl Exporter {
SUBGRAPH_SPAN_NAME => {
let subgraph_name = span
.attributes
.get(&Key::from_static_str("apollo.subgraph.name"))
.get(&SUBGRAPH_NAME)
.and_then(extract_string)
.unwrap_or_default();
let error_configuration = self
Expand Down
6 changes: 2 additions & 4 deletions apollo-router/src/services/layers/apq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::services::SupergraphRequest;
use crate::services::SupergraphResponse;

const DONT_CACHE_RESPONSE_VALUE: &str = "private, no-cache, must-revalidate";
static DONT_CACHE_HEADER_VALUE: HeaderValue = HeaderValue::from_static(DONT_CACHE_RESPONSE_VALUE);

/// A persisted query.
#[derive(Deserialize, Clone, Debug)]
Expand Down Expand Up @@ -134,10 +135,7 @@ async fn apq_request(
// Persisted query errors (especially "not found") need to be uncached, because
// hopefully we're about to fill in the APQ cache and the same request will
// succeed next time.
.header(
CACHE_CONTROL,
HeaderValue::from_static(DONT_CACHE_RESPONSE_VALUE),
)
.header(CACHE_CONTROL, DONT_CACHE_HEADER_VALUE.clone())
.context(request.context)
.build()
.expect("response is valid");
Expand Down
27 changes: 14 additions & 13 deletions apollo-router/src/services/layers/content_negotiation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::ops::ControlFlow;
use http::header::ACCEPT;
use http::header::CONTENT_TYPE;
use http::HeaderMap;
use http::HeaderValue;
use http::Method;
use http::StatusCode;
use mediatype::names::APPLICATION;
Expand All @@ -24,7 +23,10 @@ use crate::layers::sync_checkpoint::CheckpointService;
use crate::layers::ServiceExt as _;
use crate::services::router;
use crate::services::router::ClientRequestAccepts;
use crate::services::router_service::MULTIPART_DEFER_HEADER_VALUE;
use crate::services::router_service::MULTIPART_SUBSCRIPTION_HEADER_VALUE;
use crate::services::supergraph;
use crate::services::APPLICATION_JSON_HEADER_VALUE;
use crate::services::MULTIPART_DEFER_CONTENT_TYPE;
use crate::services::MULTIPART_DEFER_SPEC_PARAMETER;
use crate::services::MULTIPART_DEFER_SPEC_VALUE;
Expand Down Expand Up @@ -138,20 +140,17 @@ where
.unwrap_or_default();

if !res.has_next.unwrap_or_default() && (accepts_json || accepts_wildcard) {
parts.headers.insert(
CONTENT_TYPE,
HeaderValue::from_static(APPLICATION_JSON.essence_str()),
);
parts
.headers
.insert(CONTENT_TYPE, APPLICATION_JSON_HEADER_VALUE.clone());
} else if accepts_multipart_defer {
parts.headers.insert(
CONTENT_TYPE,
HeaderValue::from_static(MULTIPART_DEFER_CONTENT_TYPE),
);
parts
.headers
.insert(CONTENT_TYPE, MULTIPART_DEFER_HEADER_VALUE.clone());
} else if accepts_multipart_subscription {
parts.headers.insert(
CONTENT_TYPE,
HeaderValue::from_static(MULTIPART_SUBSCRIPTION_CONTENT_TYPE),
);
parts
.headers
.insert(CONTENT_TYPE, MULTIPART_SUBSCRIPTION_HEADER_VALUE.clone());
}
(parts, res)
})
Expand Down Expand Up @@ -236,6 +235,8 @@ fn parse_accept(headers: &HeaderMap) -> ClientRequestAccepts {

#[cfg(test)]
mod tests {
use http::HeaderValue;

use super::*;

#[test]
Expand Down
8 changes: 4 additions & 4 deletions apollo-router/src/services/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ use serde_json_bytes::Value;
use static_assertions::assert_impl_all;
use tower::BoxError;

use super::router_service::MULTIPART_DEFER_HEADER_VALUE;
use super::router_service::MULTIPART_SUBSCRIPTION_HEADER_VALUE;
use super::supergraph;
use super::MULTIPART_DEFER_CONTENT_TYPE;
use super::MULTIPART_SUBSCRIPTION_CONTENT_TYPE;
use crate::graphql;
use crate::json_ext::Path;
use crate::services::TryIntoHeaderName;
Expand Down Expand Up @@ -220,8 +220,8 @@ impl Response {
.get(CONTENT_TYPE)
.iter()
.any(|value| {
*value == HeaderValue::from_static(MULTIPART_DEFER_CONTENT_TYPE)
|| *value == HeaderValue::from_static(MULTIPART_SUBSCRIPTION_CONTENT_TYPE)
*value == MULTIPART_DEFER_HEADER_VALUE
|| *value == MULTIPART_SUBSCRIPTION_HEADER_VALUE
})
{
let multipart = Multipart::new(self.response.into_body(), "graphql");
Expand Down
36 changes: 21 additions & 15 deletions apollo-router/src/services/router_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use super::HasPlugins;
#[cfg(test)]
use super::HasSchema;
use super::SupergraphCreator;
use super::APPLICATION_JSON_HEADER_VALUE;
use super::MULTIPART_DEFER_CONTENT_TYPE;
use super::MULTIPART_SUBSCRIPTION_CONTENT_TYPE;
use crate::cache::DeduplicatingCache;
Expand All @@ -63,6 +64,14 @@ use crate::Configuration;
use crate::Endpoint;
use crate::ListenAddr;

pub(crate) static MULTIPART_DEFER_HEADER_VALUE: HeaderValue =
HeaderValue::from_static(MULTIPART_DEFER_CONTENT_TYPE);
pub(crate) static MULTIPART_SUBSCRIPTION_HEADER_VALUE: HeaderValue =
HeaderValue::from_static(MULTIPART_SUBSCRIPTION_CONTENT_TYPE);
static ACCEL_BUFFERING_HEADER_NAME: HeaderName = HeaderName::from_static("x-accel-buffering");
static ACCEL_BUFFERING_HEADER_VALUE: HeaderValue = HeaderValue::from_static("no");
static ORIGIN_HEADER_VALUE: HeaderValue = HeaderValue::from_static("origin");

/// Containing [`Service`] in the request lifecyle.
#[derive(Clone)]
pub(crate) struct RouterService {
Expand Down Expand Up @@ -284,10 +293,9 @@ impl RouterService {
&& !response.subscribed.unwrap_or(false)
&& (accepts_json || accepts_wildcard)
{
parts.headers.insert(
CONTENT_TYPE,
HeaderValue::from_static(APPLICATION_JSON.essence_str()),
);
parts
.headers
.insert(CONTENT_TYPE, APPLICATION_JSON_HEADER_VALUE.clone());
tracing::trace_span!("serialize_response").in_scope(|| {
let body = serde_json::to_string(&response)?;
Ok(router::Response {
Expand All @@ -297,20 +305,18 @@ impl RouterService {
})
} else if accepts_multipart_defer || accepts_multipart_subscription {
if accepts_multipart_defer {
parts.headers.insert(
CONTENT_TYPE,
HeaderValue::from_static(MULTIPART_DEFER_CONTENT_TYPE),
);
parts
.headers
.insert(CONTENT_TYPE, MULTIPART_DEFER_HEADER_VALUE.clone());
} else if accepts_multipart_subscription {
parts.headers.insert(
CONTENT_TYPE,
HeaderValue::from_static(MULTIPART_SUBSCRIPTION_CONTENT_TYPE),
);
parts
.headers
.insert(CONTENT_TYPE, MULTIPART_SUBSCRIPTION_HEADER_VALUE.clone());
}
// Useful when you're using a proxy like nginx which enable proxy_buffering by default (http://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_buffering)
parts.headers.insert(
HeaderName::from_static("x-accel-buffering"),
HeaderValue::from_static("no"),
ACCEL_BUFFERING_HEADER_NAME.clone(),
ACCEL_BUFFERING_HEADER_VALUE.clone(),
);
let multipart_stream = match response.subscribed {
Some(true) => {
Expand Down Expand Up @@ -454,7 +460,7 @@ impl RouterService {
fn process_vary_header(headers: &mut HeaderMap<HeaderValue>) {
if headers.get(VARY).is_none() {
// We don't have a VARY header, add one with value "origin"
headers.insert(VARY, HeaderValue::from_static("origin"));
headers.insert(VARY, ORIGIN_HEADER_VALUE.clone());
}
}

Expand Down
23 changes: 15 additions & 8 deletions apollo-router/src/services/subgraph_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,10 @@ const POOL_IDLE_TIMEOUT_DURATION: Option<Duration> = Some(Duration::from_secs(5)

// interior mutability is not a concern here, the value is never modified
#[allow(clippy::declare_interior_mutable_const)]
const ACCEPTED_ENCODINGS: HeaderValue = HeaderValue::from_static("gzip, br, deflate");
static ACCEPTED_ENCODINGS: HeaderValue = HeaderValue::from_static("gzip, br, deflate");
pub(crate) static APPLICATION_JSON_HEADER_VALUE: HeaderValue =
HeaderValue::from_static("application/json");
static APP_GRAPHQL_JSON: HeaderValue = HeaderValue::from_static(GRAPHQL_JSON_RESPONSE_HEADER_VALUE);

enum APQError {
PersistedQueryNotSupported,
Expand Down Expand Up @@ -643,15 +646,19 @@ async fn call_http(
})?;

let mut request = http::request::Request::from_parts(parts, compressed_body.into());
let app_json: HeaderValue = HeaderValue::from_static(APPLICATION_JSON.essence_str());
let app_graphql_json: HeaderValue =
HeaderValue::from_static(GRAPHQL_JSON_RESPONSE_HEADER_VALUE);
request.headers_mut().insert(CONTENT_TYPE, app_json.clone());
request.headers_mut().insert(ACCEPT, app_json);
request.headers_mut().append(ACCEPT, app_graphql_json);

request
.headers_mut()
.insert(CONTENT_TYPE, APPLICATION_JSON_HEADER_VALUE.clone());
request
.headers_mut()
.insert(ACCEPT, APPLICATION_JSON_HEADER_VALUE.clone());
request
.headers_mut()
.append(ACCEPT, APP_GRAPHQL_JSON.clone());
request
.headers_mut()
.insert(ACCEPT_ENCODING, ACCEPTED_ENCODINGS);
.insert(ACCEPT_ENCODING, ACCEPTED_ENCODINGS.clone());

let schema_uri = request.uri();
let host = schema_uri.host().unwrap_or_default();
Expand Down