Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enforce JWT expiration for subscriptions #4166

Merged
merged 16 commits into from
Nov 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changesets/fix_garypen_3947_jwt_sub_expire.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
### Enfore JWT expiration for subscriptions ([Issue #3947](https://github.com/apollographql/router/issues/3947))

If a JWT expires whilst a subscription is executing, the subscription should be terminated. This also applies to deferred responses.

By [@garypen](https://github.com/garypen) in https://github.com/apollographql/router/pull/4166
38 changes: 38 additions & 0 deletions apollo-router/src/plugins/authentication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use std::collections::HashMap;
use std::ops::ControlFlow;
use std::str::FromStr;
use std::time::Duration;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;

use displaydoc::Display;
use http::StatusCode;
Expand All @@ -23,6 +25,7 @@ use once_cell::sync::Lazy;
use reqwest::Client;
use schemars::JsonSchema;
use serde::Deserialize;
use serde_json::Value;
use thiserror::Error;
use tower::BoxError;
use tower::ServiceBuilder;
Expand Down Expand Up @@ -698,6 +701,41 @@ fn decode_jwt(
}
}

pub(crate) fn jwt_expires_in(context: &Context) -> Duration {
let claims = context
.get(APOLLO_AUTHENTICATION_JWT_CLAIMS)
.map_err(|err| tracing::error!("could not read JWT claims: {err}"))
.ok()
.flatten();
let ts_opt = claims.as_ref().and_then(|x: &Value| {
if !x.is_object() {
tracing::error!("JWT claims should be an object");
return None;
}
let claims = x.as_object().expect("claims should be an object");
let exp = claims.get("exp")?;
if !exp.is_number() {
tracing::error!("JWT 'exp' (expiry) claim should be a number");
return None;
}
exp.as_i64()
});
match ts_opt {
Some(ts) => {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("we should not run before EPOCH")
.as_secs() as i64;
if now < ts {
Duration::from_secs((ts - now) as u64)
} else {
Duration::ZERO
}
}
None => Duration::MAX,
}
}

// This macro allows us to use it in our plugin registry!
// register_plugin takes a group name, and a plugin name.
//
Expand Down
46 changes: 46 additions & 0 deletions apollo-router/src/services/execution_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;

use futures::channel::mpsc;
use futures::channel::mpsc::Receiver;
Expand Down Expand Up @@ -36,6 +38,7 @@ use crate::json_ext::Object;
use crate::json_ext::Path;
use crate::json_ext::PathElement;
use crate::json_ext::ValueExt;
use crate::plugins::authentication::APOLLO_AUTHENTICATION_JWT_CLAIMS;
use crate::plugins::subscription::Subscription;
use crate::plugins::subscription::SubscriptionConfig;
use crate::plugins::subscription::APOLLO_SUBSCRIPTION_PLUGIN;
Expand Down Expand Up @@ -117,6 +120,10 @@ impl ExecutionService {
.query_plan
.is_deferred(operation_name.as_deref(), &variables);
let is_subscription = req.query_plan.is_subscription(operation_name.as_deref());
let mut claims = None;
if is_deferred {
claims = context.get(APOLLO_AUTHENTICATION_JWT_CLAIMS)?
}
let (tx_close_signal, subscription_handle) = if is_subscription {
let (tx_close_signal, rx_close_signal) = broadcast::channel(1);
(
Expand Down Expand Up @@ -175,6 +182,45 @@ impl ExecutionService {
let execution_span = Span::current();

let stream = stream
.map(move |mut response: Response| {
// Enforce JWT expiry for deferred responses
if is_deferred {
let ts_opt = claims.as_ref().and_then(|x: &Value| {
if !x.is_object() {
tracing::error!("JWT claims should be an object");
return None;
}
let claims = x.as_object().expect("claims should be an object");
let exp = claims.get("exp")?;
if !exp.is_number() {
tracing::error!("JWT 'exp' (expiry) claim should be a number");
return None;
}
exp.as_i64()
});
if let Some(ts) = ts_opt {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("we should not run before EPOCH")
.as_secs() as i64;
if ts < now {
tracing::debug!("token has expired, shut down the subscription");
response = Response::builder()
.has_next(false)
.error(
Error::builder()
.message(
"deferred response closed because the JWT has expired",
)
.extension_code("DEFERRED_RESPONSE_JWT_EXPIRED")
.build(),
)
.build()
}
}
}
response
})
.filter_map(move |response: Response| {
ready(execution_span.in_scope(|| {
Self::process_graphql_response(
Expand Down
19 changes: 19 additions & 0 deletions apollo-router/src/services/supergraph_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,11 +424,30 @@ async fn subscription_task(
let mut configuration_updated_rx = notify.subscribe_configuration();
let mut schema_updated_rx = notify.subscribe_schema();

let expires_in = crate::plugins::authentication::jwt_expires_in(&supergraph_req.context);

let mut timeout = Box::pin(tokio::time::sleep(expires_in));

loop {
tokio::select! {
// We prefer to specify the order of checks within the select
biased;
_ = subscription_handle.closed_signal.recv() => {
break;
}
_ = &mut timeout => {
let response = Response::builder()
.subscribed(false)
.error(
crate::error::Error::builder()
.message("subscription closed because the JWT has expired")
.extension_code("SUBSCRIPTION_JWT_EXPIRED")
.build(),
)
.build();
let _ = sender.send(response).await;
break;
},
message = receiver.next() => {
match message {
Some(mut val) => {
Expand Down