Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add JWT validation leeway in token decoding #6

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions application/idp_client/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ def consume_idp_code(self) -> Tuple[str, int]:
logger.error(f'No session found based on id {state}')
return 'Bad request, No session found based on id ' + state, 400

hti_launch_token = pyjwt.decode(oauth2_session.launch, options={"verify_signature": False})
hti_launch_token = pyjwt.decode(oauth2_session.launch,
options={"verify_signature": False},
leeway=current_app.config.get('JWT_VALIDATION_LEEWAY', 10))
logger.info(f'[{oauth2_session.id}] Consuming idp oidc code for user {hti_launch_token["sub"]}')

if 'X-Trace-Id' not in trace_headers:
Expand All @@ -59,7 +61,9 @@ def consume_idp_code(self) -> Tuple[str, int]:
logger.error(f'[{oauth2_session.id}] no id_token found')
return 'Bad request, no id_token found', 400

id_token = pyjwt.decode(encoded_id_token, options={"verify_signature": False}) # TODO: Verify signature
id_token = pyjwt.decode(encoded_id_token,
options={"verify_signature": False},
leeway=current_app.config.get('JWT_VALIDATION_LEEWAY', 10)) # TODO: Verify signature

if oauth2_session.identity_provider:
identity_provider: IdentityProvider = IdentityProvider.query.filter_by(id=oauth2_session.identity_provider).first()
Expand Down
21 changes: 15 additions & 6 deletions application/oauth_server/verifiers.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@


def verify_token(encoded_token: str, auth_client_id: str) -> Optional[Dict[str, Any]]:
unverified_decoded_jwt = pyjwt.decode(encoded_token, options={"verify_signature": False})
unverified_decoded_jwt = pyjwt.decode(encoded_token,
options={"verify_signature": False},
leeway=current_app.config.get('JWT_VALIDATION_LEEWAY', 10))
iss = unverified_decoded_jwt.get('iss', '')
aud = unverified_decoded_jwt.get('aud', '')

Expand All @@ -47,7 +49,9 @@ class ClientCredentialsTokenVerifier:

def verify_and_get_token(self, encoded_token, auth_client_id=None):
try:
unverified_decoded_jwt = pyjwt.decode(encoded_token, options={"verify_signature": False})
unverified_decoded_jwt = pyjwt.decode(encoded_token,
options={"verify_signature": False},
leeway=current_app.config.get('JWT_VALIDATION_LEEWAY', 10))
except DecodeError as e:
logger.warning(f"Failed to decode JWT: {e}")
return
Expand Down Expand Up @@ -124,7 +128,9 @@ def _verify_aud(aud: str, auth_client_id):
class AccessTokenVerifier():
def verify_and_get_token(self, encoded_token):
try:
unverified_decoded_jwt = pyjwt.decode(encoded_token, options={"verify_signature": False})
unverified_decoded_jwt = pyjwt.decode(encoded_token,
options={"verify_signature": False},
leeway=current_app.config.get('JWT_VALIDATION_LEEWAY', 10))
except DecodeError as e:
logger.warning(f"Failed to decode JWT: {e}")
return
Expand Down Expand Up @@ -158,7 +164,8 @@ def _decode_with_jwks(smart_service, aud, encoded_token) -> Dict[str, Any]:
signing_key = jwks_client.get_signing_key_from_jwt(encoded_token)
decoded_jwt = pyjwt.decode(encoded_token, signing_key.key,
algorithms=current_app.config['OIDC_SMART_CONFIG_SIGNING_ALGS'],
audience=aud)
audience=aud,
leeway=current_app.config.get('JWT_VALIDATION_LEEWAY', 10))
logger.info(f'JWT for client_id {smart_service.client_id} is decoded by JWKS - valid key')
return decoded_jwt

Expand All @@ -176,7 +183,8 @@ def _decode_with_own_key(encoded_token):
decoded_jwt = pyjwt.decode(encoded_token, public_key.as_pem(),
algorithms=current_app.config[
'OIDC_SMART_CONFIG_SIGNING_ALGS'],
options={'verify_aud': False}) # TODO: check if correct
options={'verify_aud': False}, # TODO: check skipping aud is correct
leeway=current_app.config.get('JWT_VALIDATION_LEEWAY', 10))

logger.info(f'JWT signed by self is decoded by JWKS - valid key')
return decoded_jwt
Expand All @@ -192,7 +200,8 @@ def _decode_with_public_key(smart_service, aud, encoded_token) -> Dict[str, Any]

decoded_jwt = pyjwt.decode(encoded_token, public_key,
algorithms=["RS512"], ## TODO: check with spec
audience=aud)
audience=aud,
leeway=current_app.config.get('JWT_VALIDATION_LEEWAY', 10))

logger.info(f'JWT for client_id {smart_service.client_id} is decoded by PUBLIC KEY - valid key')
return decoded_jwt
Expand Down
4 changes: 3 additions & 1 deletion application/oauth_server/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,9 @@ def handle_introspect_request():
if not token:
return 'Bad Request, required field token missing', 400

unverified_decoded_jwt = pyjwt.decode(token, options={"verify_signature": False})
unverified_decoded_jwt = pyjwt.decode(token,
options={"verify_signature": False},
leeway=current_app.config.get('JWT_VALIDATION_LEEWAY', 10))
iss = unverified_decoded_jwt.get('iss')
if not iss:
return jsonify({'active': False})
Expand Down
2 changes: 2 additions & 0 deletions instance/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ def envget_list(key: str, dflt: list = '') -> list:
OIDC_JWT_PRIVATE_KEY = envget_str('OIDC_JWT_PRIVATE_KEY', '')
OIDC_JWT_EXP_TIME_ACCESS_TOKEN = envget_int('OIDC_JWT_EXP_TIME_ACCESS_TOKEN', 300)

JWT_VALIDATION_LEEWAY = envget_int('JWT_VALIDATION_LEEWAY', 10)

# IdP is used to retrieve the id_token of the logged-in user
IDP_AUTHORIZE_ENDPOINT = envget_str('IDP_AUTHORIZE_ENDPOINT', 'https://iam.koppeltaal.headease.nl/realms/Koppeltaal2/protocol/openid-connect/auth')
IDP_TOKEN_ENDPOINT = envget_str('IDP_TOKEN_ENDPOINT', 'https://iam.koppeltaal.headease.nl/realms/Koppeltaal2/protocol/openid-connect/token')
Expand Down
3 changes: 2 additions & 1 deletion test/test_oauth_flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ def testing_app(server_key: Key):
'SMART_BACKEND_SERVICE_DEVICE_ID': 'Device/' + str(uuid4()),
'OIDC_SMART_CONFIG_SIGNING_ALGS': ["RS384", "ES384", "RS512"],
'OIDC_JWT_PUBLIC_KEY': server_key.as_pem(),
'OIDC_JWT_PRIVATE_KEY': private_key_bytes})
'OIDC_JWT_PRIVATE_KEY': private_key_bytes,
'JWT_VALIDATION_LEEWAY': 0})
with app.test_client() as client:
with app.app_context():
yield client
Expand Down
Loading