diff --git a/apollo-router/src/uplink/mod.rs b/apollo-router/src/uplink/mod.rs index 8f10a44c280..b72ef919356 100644 --- a/apollo-router/src/uplink/mod.rs +++ b/apollo-router/src/uplink/mod.rs @@ -4,6 +4,7 @@ use std::time::Duration; use std::time::Instant; use futures::Stream; +use futures::StreamExt; use graphql_client::QueryBody; use thiserror::Error; use tokio::sync::mpsc::channel; @@ -169,6 +170,17 @@ where { let query = query_name::(); let (sender, receiver) = channel(2); + let client = match reqwest::Client::builder() + .timeout(uplink_config.timeout) + .build() + { + Ok(client) => client, + Err(err) => { + tracing::error!("unable to create client to query uplink: {err}", err = err); + return futures::stream::empty().boxed(); + } + }; + let task = async move { let mut last_id = None; let mut endpoints = uplink_config.endpoints.unwrap_or_default(); @@ -181,13 +193,7 @@ where let query_body = Query::build_query(variables.into()); - match fetch::( - &query_body, - &mut endpoints.iter(), - uplink_config.timeout, - ) - .await - { + match fetch::(&client, &query_body, &mut endpoints.iter()).await { Ok(response) => { tracing::info!( counter.apollo_router_uplink_fetch_count_total = 1, @@ -255,13 +261,13 @@ where }; drop(tokio::task::spawn(task.with_current_subscriber())); - ReceiverStream::new(receiver) + ReceiverStream::new(receiver).boxed() } pub(crate) async fn fetch( + client: &reqwest::Client, request_body: &QueryBody, urls: &mut impl Iterator, - timeout: Duration, ) -> Result, Error> where Query: graphql_client::GraphQLQuery, @@ -272,7 +278,7 @@ where let query = query_name::(); for url in urls { let now = Instant::now(); - match http_request::(url.as_str(), request_body, timeout).await { + match http_request::(client, url.as_str(), request_body).await { Ok(response) => { let response = response.data.map(Into::into); match &response { @@ -352,14 +358,13 @@ fn query_name() -> &'static str { } async fn http_request( + client: &reqwest::Client, url: &str, request_body: &QueryBody, - timeout: Duration, ) -> Result, reqwest::Error> where Query: graphql_client::GraphQLQuery, { - let client = reqwest::Client::builder().timeout(timeout).build()?; // It is possible that istio-proxy is re-configuring networking beneath us. If it is, we'll see an error something like this: // level: "ERROR" // message: "fetch failed from all endpoints"