Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Uplink connections now reuse reqwest client #3703

Merged
merged 1 commit into from
Aug 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changesets/maint_bryn_uplink_client.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
### Uplink connections now reuse reqwest client ([Issue #3333](https://github.com/apollographql/router/issues/3333))

Previously uplink requests created a new reqwest client each time, this may cause CPU spikes especially on OSX.
A single client will now be shared between requests of the same type.

By [@BrynCooke](https://github.com/BrynCooke) in https://github.com/apollographql/router/pull/3703
29 changes: 17 additions & 12 deletions apollo-router/src/uplink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::time::Duration;
use std::time::Instant;

use futures::Stream;
use futures::StreamExt;
use graphql_client::QueryBody;
use thiserror::Error;
use tokio::sync::mpsc::channel;
Expand Down Expand Up @@ -169,6 +170,17 @@ where
{
let query = query_name::<Query>();
let (sender, receiver) = channel(2);
let client = match reqwest::Client::builder()
.timeout(uplink_config.timeout)
.build()
{
Ok(client) => client,
Err(err) => {
tracing::error!("unable to create client to query uplink: {err}", err = err);
return futures::stream::empty().boxed();
}
};

let task = async move {
let mut last_id = None;
let mut endpoints = uplink_config.endpoints.unwrap_or_default();
Expand All @@ -181,13 +193,7 @@ where

let query_body = Query::build_query(variables.into());

match fetch::<Query, Response>(
&query_body,
&mut endpoints.iter(),
uplink_config.timeout,
)
.await
{
match fetch::<Query, Response>(&client, &query_body, &mut endpoints.iter()).await {
Ok(response) => {
tracing::info!(
counter.apollo_router_uplink_fetch_count_total = 1,
Expand Down Expand Up @@ -255,13 +261,13 @@ where
};
drop(tokio::task::spawn(task.with_current_subscriber()));

ReceiverStream::new(receiver)
ReceiverStream::new(receiver).boxed()
}

pub(crate) async fn fetch<Query, Response>(
client: &reqwest::Client,
request_body: &QueryBody<Query::Variables>,
urls: &mut impl Iterator<Item = &Url>,
timeout: Duration,
) -> Result<UplinkResponse<Response>, Error>
where
Query: graphql_client::GraphQLQuery,
Expand All @@ -272,7 +278,7 @@ where
let query = query_name::<Query>();
for url in urls {
let now = Instant::now();
match http_request::<Query>(url.as_str(), request_body, timeout).await {
match http_request::<Query>(client, url.as_str(), request_body).await {
Ok(response) => {
let response = response.data.map(Into::into);
match &response {
Expand Down Expand Up @@ -352,14 +358,13 @@ fn query_name<Query>() -> &'static str {
}

async fn http_request<Query>(
client: &reqwest::Client,
url: &str,
request_body: &QueryBody<Query::Variables>,
timeout: Duration,
) -> Result<graphql_client::Response<Query::ResponseData>, reqwest::Error>
where
Query: graphql_client::GraphQLQuery,
{
let client = reqwest::Client::builder().timeout(timeout).build()?;
// It is possible that istio-proxy is re-configuring networking beneath us. If it is, we'll see an error something like this:
// level: "ERROR"
// message: "fetch failed from all endpoints"
Expand Down