Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use pyjwt for API tokens instead of itsdangerous's deprecated code #6267

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 23 additions & 14 deletions securedrop/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,20 @@
import binascii
import datetime
import base64
import jwt
import os
import pyotp
import qrcode
# Using svg because it doesn't require additional dependencies
import qrcode.image.svg
import time
import uuid
from io import BytesIO

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.kdf import scrypt
from flask import current_app, url_for
from flask_babel import gettext, ngettext
from itsdangerous import TimedJSONWebSignatureSerializer, BadData
from jinja2 import Markup
from passlib.hash import argon2
from sqlalchemy import ForeignKey
Expand Down Expand Up @@ -737,26 +738,34 @@ def login(cls,
return user

def generate_api_token(self, expiration: int) -> str:
s = TimedJSONWebSignatureSerializer(
current_app.config['SECRET_KEY'], expires_in=expiration)
return s.dumps({'id': self.id}).decode('ascii') # type:ignore
"""Generate a token that will expire in `expiration` seconds"""
return jwt.encode(
{'id': self.id, 'exp': time.time() + expiration},
current_app.config['SECRET_KEY'],
algorithm="HS512"
)

@staticmethod
def validate_token_is_not_expired_or_invalid(token: str) -> bool:
s = TimedJSONWebSignatureSerializer(current_app.config['SECRET_KEY'])
def _decode_jwt(token: str) -> 'Optional[dict]':
"""Decode a user-supplied jwt, verifying the signature and that it's not expired"""
try:
s.loads(token)
except BadData:
return False
return jwt.decode(
token, current_app.config['SECRET_KEY'],
algorithms=["HS512"],
options={'verify_exp': True, 'require': ["exp"]},
)
except (jwt.DecodeError, jwt.ExpiredSignatureError,
jwt.MissingRequiredClaimError, jwt.InvalidAlgorithmError):
return None

return True
@staticmethod
def validate_token_is_not_expired_or_invalid(token: str) -> bool:
return Journalist._decode_jwt(token) is not None

@staticmethod
def validate_api_token_and_get_user(token: str) -> 'Optional[Journalist]':
s = TimedJSONWebSignatureSerializer(current_app.config['SECRET_KEY'])
try:
data = s.loads(token)
except BadData:
data = Journalist._decode_jwt(token)
if data is None:
return None

revoked_token = RevokedToken.query.filter_by(token=token).one_or_none()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ mod_wsgi
passlib
pretty-bad-protocol>=3.1.1
psutil>=5.6.6
pyjwt>=2.3.0
pyotp>=2.6.0
qrcode
redis>=3.3.6
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,10 @@ psutil==5.7.0 \
pycparser==2.18 \
--hash=sha256:99a8ca03e29851d96616ad0404b4aad7d9ee16f25c9f9708a11faf2810f7b226
# via cffi
pyjwt==2.3.0 \
--hash=sha256:b888b4d56f06f6dcd777210c334e69c737be74755d3e5e9ee3fe67dc18a0ee41 \
--hash=sha256:e0c4bb8d9f0af0c7f5b1ec4c5036309617d03d56932877f2f7a0beeb5318322f
# via -r requirements/python3/securedrop-app-code-requirements.in
pyotp==2.6.0 \
--hash=sha256:9d144de0f8a601d6869abe1409f4a3f75f097c37b50a36a3bf165810a6e23f28 \
--hash=sha256:d28ddfd40e0c1b6a6b9da961c7d47a10261fb58f378cb00f05ce88b26df9c432
Expand Down
59 changes: 54 additions & 5 deletions securedrop/tests/test_journalist_api.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
# -*- coding: utf-8 -*-
import json
import jwt
import random
import time

from datetime import datetime
from flask import url_for
from itsdangerous import TimedJSONWebSignatureSerializer
from pyotp import TOTP
from uuid import UUID, uuid4

Expand Down Expand Up @@ -207,14 +208,16 @@ def test_attacker_cannot_create_valid_token_with_none_alg(journalist_app,
test_journo):
with journalist_app.test_client() as app:
uuid = test_source['source'].uuid
s = TimedJSONWebSignatureSerializer('not the secret key',
algorithm_name='none')
attacker_token = s.dumps({'id': test_journo['id']}).decode('ascii')

attacker_token = jwt.encode(
{'id': test_journo['id'], 'exp': time.time() + 300},
None,
algorithm='none'
)
response = app.delete(url_for('api.single_source', source_uuid=uuid),
headers=get_api_headers(attacker_token))

assert response.status_code == 403
assert Journalist._decode_jwt(attacker_token) is None


def test_attacker_cannot_use_token_after_admin_deletes(journalist_app,
Expand Down Expand Up @@ -1195,3 +1198,49 @@ def test_seen_bad_requests(journalist_app, journalist_api_token):
response = app.post(seen_url, data=json.dumps(data), headers=headers)
assert response.status_code == 404
assert response.json["message"] == "reply not found: not-a-reply"


def test_api_token_decodes(journalist_app, test_journo):
"""With a valid token, we get the correct journalist back"""
with journalist_app.test_client() as _:
journalist = test_journo['journalist']
token = journalist.generate_api_token(expiration=300)
found = Journalist.validate_api_token_and_get_user(token)
assert found is not None
assert found.uuid == journalist.uuid


def test_api_token_expires(journalist_app, test_journo):
"""An expired token is rejected"""
with journalist_app.test_client() as _:
journalist = test_journo['journalist']
# Create a token that is already expired
token = journalist.generate_api_token(expiration=-1)
assert Journalist._decode_jwt(token) is None
assert Journalist.validate_token_is_not_expired_or_invalid(token) is False
assert Journalist.validate_api_token_and_get_user(token) is None


def test_invalid_api_tokens(journalist_app):
"""Verify various invalid tokens are rejected"""
with journalist_app.test_client() as _:
# Not a real token
assert Journalist._decode_jwt("thisisnotarealtoken") is None
# Signed with wrong key
assert Journalist._decode_jwt(jwt.encode(
{'id': 123, 'exp': time.time() + 300},
"wrong secret key",
algorithm="HS512",
)) is None
# Using a different algorithm
assert Journalist._decode_jwt(jwt.encode(
{'id': 123, 'exp': time.time() + 300},
journalist_app.config['SECRET_KEY'],
algorithm="HS256",
)) is None
# No expiry set
assert Journalist._decode_jwt(jwt.encode(
{'id': 123},
journalist_app.config['SECRET_KEY'],
algorithm="HS512"
)) is None