diff --git a/.changesets/fix_garypen_3947_jwt_sub_expire.md b/.changesets/fix_garypen_3947_jwt_sub_expire.md new file mode 100644 index 0000000000..785f837265 --- /dev/null +++ b/.changesets/fix_garypen_3947_jwt_sub_expire.md @@ -0,0 +1,5 @@ +### Enfore JWT expiration for subscriptions ([Issue #3947](https://github.com/apollographql/router/issues/3947)) + +If a JWT expires whilst a subscription is executing, the subscription should be terminated. This also applies to deferred responses. + +By [@garypen](https://github.com/garypen) in https://github.com/apollographql/router/pull/4166 \ No newline at end of file diff --git a/apollo-router/src/plugins/authentication/mod.rs b/apollo-router/src/plugins/authentication/mod.rs index 5874dd0284..ec2b135f8d 100644 --- a/apollo-router/src/plugins/authentication/mod.rs +++ b/apollo-router/src/plugins/authentication/mod.rs @@ -4,6 +4,8 @@ use std::collections::HashMap; use std::ops::ControlFlow; use std::str::FromStr; use std::time::Duration; +use std::time::SystemTime; +use std::time::UNIX_EPOCH; use displaydoc::Display; use http::StatusCode; @@ -23,6 +25,7 @@ use once_cell::sync::Lazy; use reqwest::Client; use schemars::JsonSchema; use serde::Deserialize; +use serde_json::Value; use thiserror::Error; use tower::BoxError; use tower::ServiceBuilder; @@ -698,6 +701,41 @@ fn decode_jwt( } } +pub(crate) fn jwt_expires_in(context: &Context) -> Duration { + let claims = context + .get(APOLLO_AUTHENTICATION_JWT_CLAIMS) + .map_err(|err| tracing::error!("could not read JWT claims: {err}")) + .ok() + .flatten(); + let ts_opt = claims.as_ref().and_then(|x: &Value| { + if !x.is_object() { + tracing::error!("JWT claims should be an object"); + return None; + } + let claims = x.as_object().expect("claims should be an object"); + let exp = claims.get("exp")?; + if !exp.is_number() { + tracing::error!("JWT 'exp' (expiry) claim should be a number"); + return None; + } + exp.as_i64() + }); + match ts_opt { + Some(ts) => { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("we should not run before EPOCH") + .as_secs() as i64; + if now < ts { + Duration::from_secs((ts - now) as u64) + } else { + Duration::ZERO + } + } + None => Duration::MAX, + } +} + // This macro allows us to use it in our plugin registry! // register_plugin takes a group name, and a plugin name. // diff --git a/apollo-router/src/services/execution_service.rs b/apollo-router/src/services/execution_service.rs index 9d5480508b..5832b6e834 100644 --- a/apollo-router/src/services/execution_service.rs +++ b/apollo-router/src/services/execution_service.rs @@ -5,6 +5,8 @@ use std::pin::Pin; use std::sync::Arc; use std::task::Context; use std::task::Poll; +use std::time::SystemTime; +use std::time::UNIX_EPOCH; use futures::channel::mpsc; use futures::channel::mpsc::Receiver; @@ -36,6 +38,7 @@ use crate::json_ext::Object; use crate::json_ext::Path; use crate::json_ext::PathElement; use crate::json_ext::ValueExt; +use crate::plugins::authentication::APOLLO_AUTHENTICATION_JWT_CLAIMS; use crate::plugins::subscription::Subscription; use crate::plugins::subscription::SubscriptionConfig; use crate::plugins::subscription::APOLLO_SUBSCRIPTION_PLUGIN; @@ -117,6 +120,10 @@ impl ExecutionService { .query_plan .is_deferred(operation_name.as_deref(), &variables); let is_subscription = req.query_plan.is_subscription(operation_name.as_deref()); + let mut claims = None; + if is_deferred { + claims = context.get(APOLLO_AUTHENTICATION_JWT_CLAIMS)? + } let (tx_close_signal, subscription_handle) = if is_subscription { let (tx_close_signal, rx_close_signal) = broadcast::channel(1); ( @@ -175,6 +182,45 @@ impl ExecutionService { let execution_span = Span::current(); let stream = stream + .map(move |mut response: Response| { + // Enforce JWT expiry for deferred responses + if is_deferred { + let ts_opt = claims.as_ref().and_then(|x: &Value| { + if !x.is_object() { + tracing::error!("JWT claims should be an object"); + return None; + } + let claims = x.as_object().expect("claims should be an object"); + let exp = claims.get("exp")?; + if !exp.is_number() { + tracing::error!("JWT 'exp' (expiry) claim should be a number"); + return None; + } + exp.as_i64() + }); + if let Some(ts) = ts_opt { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("we should not run before EPOCH") + .as_secs() as i64; + if ts < now { + tracing::debug!("token has expired, shut down the subscription"); + response = Response::builder() + .has_next(false) + .error( + Error::builder() + .message( + "deferred response closed because the JWT has expired", + ) + .extension_code("DEFERRED_RESPONSE_JWT_EXPIRED") + .build(), + ) + .build() + } + } + } + response + }) .filter_map(move |response: Response| { ready(execution_span.in_scope(|| { Self::process_graphql_response( diff --git a/apollo-router/src/services/supergraph_service.rs b/apollo-router/src/services/supergraph_service.rs index 63a2e6ddc3..94440c31e0 100644 --- a/apollo-router/src/services/supergraph_service.rs +++ b/apollo-router/src/services/supergraph_service.rs @@ -424,11 +424,30 @@ async fn subscription_task( let mut configuration_updated_rx = notify.subscribe_configuration(); let mut schema_updated_rx = notify.subscribe_schema(); + let expires_in = crate::plugins::authentication::jwt_expires_in(&supergraph_req.context); + + let mut timeout = Box::pin(tokio::time::sleep(expires_in)); + loop { tokio::select! { + // We prefer to specify the order of checks within the select + biased; _ = subscription_handle.closed_signal.recv() => { break; } + _ = &mut timeout => { + let response = Response::builder() + .subscribed(false) + .error( + crate::error::Error::builder() + .message("subscription closed because the JWT has expired") + .extension_code("SUBSCRIPTION_JWT_EXPIRED") + .build(), + ) + .build(); + let _ = sender.send(response).await; + break; + }, message = receiver.next() => { match message { Some(mut val) => {