diff --git a/application/idp_client/service.py b/application/idp_client/service.py index 4b8f9b8..ea153e5 100644 --- a/application/idp_client/service.py +++ b/application/idp_client/service.py @@ -37,7 +37,9 @@ def consume_idp_code(self) -> Tuple[str, int]: logger.error(f'No session found based on id {state}') return 'Bad request, No session found based on id ' + state, 400 - hti_launch_token = pyjwt.decode(oauth2_session.launch, options={"verify_signature": False}) + hti_launch_token = pyjwt.decode(oauth2_session.launch, + options={"verify_signature": False}, + leeway=current_app.config.get('JWT_VALIDATION_LEEWAY', 10)) logger.info(f'[{oauth2_session.id}] Consuming idp oidc code for user {hti_launch_token["sub"]}') if 'X-Trace-Id' not in trace_headers: @@ -59,7 +61,9 @@ def consume_idp_code(self) -> Tuple[str, int]: logger.error(f'[{oauth2_session.id}] no id_token found') return 'Bad request, no id_token found', 400 - id_token = pyjwt.decode(encoded_id_token, options={"verify_signature": False}) # TODO: Verify signature + id_token = pyjwt.decode(encoded_id_token, + options={"verify_signature": False}, + leeway=current_app.config.get('JWT_VALIDATION_LEEWAY', 10)) # TODO: Verify signature if oauth2_session.identity_provider: identity_provider: IdentityProvider = IdentityProvider.query.filter_by(id=oauth2_session.identity_provider).first() diff --git a/application/oauth_server/verifiers.py b/application/oauth_server/verifiers.py index 45d64e0..d1a1e3d 100644 --- a/application/oauth_server/verifiers.py +++ b/application/oauth_server/verifiers.py @@ -20,7 +20,9 @@ def verify_token(encoded_token: str, auth_client_id: str) -> Optional[Dict[str, Any]]: - unverified_decoded_jwt = pyjwt.decode(encoded_token, options={"verify_signature": False}) + unverified_decoded_jwt = pyjwt.decode(encoded_token, + options={"verify_signature": False}, + leeway=current_app.config.get('JWT_VALIDATION_LEEWAY', 10)) iss = unverified_decoded_jwt.get('iss', '') aud = unverified_decoded_jwt.get('aud', '') @@ -47,7 +49,9 @@ class ClientCredentialsTokenVerifier: def verify_and_get_token(self, encoded_token, auth_client_id=None): try: - unverified_decoded_jwt = pyjwt.decode(encoded_token, options={"verify_signature": False}) + unverified_decoded_jwt = pyjwt.decode(encoded_token, + options={"verify_signature": False}, + leeway=current_app.config.get('JWT_VALIDATION_LEEWAY', 10)) except DecodeError as e: logger.warning(f"Failed to decode JWT: {e}") return @@ -124,7 +128,9 @@ def _verify_aud(aud: str, auth_client_id): class AccessTokenVerifier(): def verify_and_get_token(self, encoded_token): try: - unverified_decoded_jwt = pyjwt.decode(encoded_token, options={"verify_signature": False}) + unverified_decoded_jwt = pyjwt.decode(encoded_token, + options={"verify_signature": False}, + leeway=current_app.config.get('JWT_VALIDATION_LEEWAY', 10)) except DecodeError as e: logger.warning(f"Failed to decode JWT: {e}") return @@ -158,7 +164,8 @@ def _decode_with_jwks(smart_service, aud, encoded_token) -> Dict[str, Any]: signing_key = jwks_client.get_signing_key_from_jwt(encoded_token) decoded_jwt = pyjwt.decode(encoded_token, signing_key.key, algorithms=current_app.config['OIDC_SMART_CONFIG_SIGNING_ALGS'], - audience=aud) + audience=aud, + leeway=current_app.config.get('JWT_VALIDATION_LEEWAY', 10)) logger.info(f'JWT for client_id {smart_service.client_id} is decoded by JWKS - valid key') return decoded_jwt @@ -176,7 +183,8 @@ def _decode_with_own_key(encoded_token): decoded_jwt = pyjwt.decode(encoded_token, public_key.as_pem(), algorithms=current_app.config[ 'OIDC_SMART_CONFIG_SIGNING_ALGS'], - options={'verify_aud': False}) # TODO: check if correct + options={'verify_aud': False}, # TODO: check skipping aud is correct + leeway=current_app.config.get('JWT_VALIDATION_LEEWAY', 10)) logger.info(f'JWT signed by self is decoded by JWKS - valid key') return decoded_jwt @@ -192,7 +200,8 @@ def _decode_with_public_key(smart_service, aud, encoded_token) -> Dict[str, Any] decoded_jwt = pyjwt.decode(encoded_token, public_key, algorithms=["RS512"], ## TODO: check with spec - audience=aud) + audience=aud, + leeway=current_app.config.get('JWT_VALIDATION_LEEWAY', 10)) logger.info(f'JWT for client_id {smart_service.client_id} is decoded by PUBLIC KEY - valid key') return decoded_jwt diff --git a/application/oauth_server/views.py b/application/oauth_server/views.py index ab60465..e6d3b80 100644 --- a/application/oauth_server/views.py +++ b/application/oauth_server/views.py @@ -166,7 +166,9 @@ def handle_introspect_request(): if not token: return 'Bad Request, required field token missing', 400 - unverified_decoded_jwt = pyjwt.decode(token, options={"verify_signature": False}) + unverified_decoded_jwt = pyjwt.decode(token, + options={"verify_signature": False}, + leeway=current_app.config.get('JWT_VALIDATION_LEEWAY', 10)) iss = unverified_decoded_jwt.get('iss') if not iss: return jsonify({'active': False}) diff --git a/instance/config.py b/instance/config.py index fe5bcda..8c03a71 100644 --- a/instance/config.py +++ b/instance/config.py @@ -63,6 +63,8 @@ def envget_list(key: str, dflt: list = '') -> list: OIDC_JWT_PRIVATE_KEY = envget_str('OIDC_JWT_PRIVATE_KEY', '') OIDC_JWT_EXP_TIME_ACCESS_TOKEN = envget_int('OIDC_JWT_EXP_TIME_ACCESS_TOKEN', 300) +JWT_VALIDATION_LEEWAY = envget_int('JWT_VALIDATION_LEEWAY', 10) + # IdP is used to retrieve the id_token of the logged-in user IDP_AUTHORIZE_ENDPOINT = envget_str('IDP_AUTHORIZE_ENDPOINT', 'https://iam.koppeltaal.headease.nl/realms/Koppeltaal2/protocol/openid-connect/auth') IDP_TOKEN_ENDPOINT = envget_str('IDP_TOKEN_ENDPOINT', 'https://iam.koppeltaal.headease.nl/realms/Koppeltaal2/protocol/openid-connect/token') diff --git a/test/test_oauth_flows.py b/test/test_oauth_flows.py index ce99043..2fdaa0f 100644 --- a/test/test_oauth_flows.py +++ b/test/test_oauth_flows.py @@ -40,7 +40,8 @@ def testing_app(server_key: Key): 'SMART_BACKEND_SERVICE_DEVICE_ID': 'Device/' + str(uuid4()), 'OIDC_SMART_CONFIG_SIGNING_ALGS': ["RS384", "ES384", "RS512"], 'OIDC_JWT_PUBLIC_KEY': server_key.as_pem(), - 'OIDC_JWT_PRIVATE_KEY': private_key_bytes}) + 'OIDC_JWT_PRIVATE_KEY': private_key_bytes, + 'JWT_VALIDATION_LEEWAY': 0}) with app.test_client() as client: with app.app_context(): yield client