diff --git a/docs/settings.md b/docs/settings.md index 5f930900..0ba2317d 100644 --- a/docs/settings.md +++ b/docs/settings.md @@ -98,6 +98,10 @@ The default is `knox.AuthToken` [DATETIME_FORMAT]: https://www.django-rest-framework.org/api-guide/settings/#date-and-time-formatting [strftime format]: https://docs.python.org/3/library/time.html#time.strftime +## TOKEN_PREFIX +This is the prefix for the generated token that is used in the Authorization header. The default is just an empty string. +It can be up to `CONSTANTS.MAXIMUM_TOKEN_PREFIX_LENGTH` long. + # Constants `knox.settings` Knox also provides some constants for information. These must not be changed in external code; they are used in the model definitions in knox and an error will @@ -111,3 +115,6 @@ print(CONSTANTS.DIGEST_LENGTH) #=> 128 ## DIGEST_LENGTH This is the length of the digest that will be stored in the database for each token. + +## MAXIMUM_TOKEN_PREFIX_LENGTH +This is the maximum length of the token prefix. diff --git a/knox/crypto.py b/knox/crypto.py index 3ce8811e..02b53697 100644 --- a/knox/crypto.py +++ b/knox/crypto.py @@ -12,6 +12,14 @@ def create_token_string(): ).decode() +def make_hex_compatible(token: str) -> str: + """ + We need to make sure that the token, that is send is hex-compatible. + When a token prefix is used, we cannot garantee that. + """ + return binascii.unhexlify(binascii.hexlify(bytes(token, 'utf-8'))) + + def hash_token(token: str) -> str: """ Calculates the hash of a token. @@ -19,5 +27,5 @@ def hash_token(token: str) -> str: a binascii.Error exception will be raised. """ digest = hash_func() - digest.update(binascii.unhexlify(token)) + digest.update(make_hex_compatible(token)) return digest.hexdigest() diff --git a/knox/migrations/0009_extend_authtoken_field.py b/knox/migrations/0009_extend_authtoken_field.py new file mode 100644 index 00000000..18a33836 --- /dev/null +++ b/knox/migrations/0009_extend_authtoken_field.py @@ -0,0 +1,18 @@ +# Generated by Django 4.1rc1 on 2022-07-20 17:05 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("knox", "0008_remove_authtoken_salt"), + ] + + operations = [ + migrations.AlterField( + model_name="authtoken", + name="token_key", + field=models.CharField(db_index=True, max_length=25), + ), + ] diff --git a/knox/models.py b/knox/models.py index 4d316650..b054f289 100644 --- a/knox/models.py +++ b/knox/models.py @@ -7,20 +7,25 @@ from knox import crypto from knox.settings import CONSTANTS, knox_settings +sha = knox_settings.SECURE_HASH_ALGORITHM + User = settings.AUTH_USER_MODEL class AuthTokenManager(models.Manager): - def create(self, user, expiry=knox_settings.TOKEN_TTL, **kwargs): - token = crypto.create_token_string() + def create( + self, + user, + expiry=knox_settings.TOKEN_TTL, + prefix=knox_settings.TOKEN_PREFIX + ): + token = prefix + crypto.create_token_string() digest = crypto.hash_token(token) - if expiry is not None: expiry = timezone.now() + expiry - instance = super(AuthTokenManager, self).create( token_key=token[:CONSTANTS.TOKEN_KEY_LENGTH], digest=digest, - user=user, expiry=expiry, **kwargs) + user=user, expiry=expiry) return instance, token @@ -31,7 +36,10 @@ class AbstractAuthToken(models.Model): digest = models.CharField( max_length=CONSTANTS.DIGEST_LENGTH, primary_key=True) token_key = models.CharField( - max_length=CONSTANTS.TOKEN_KEY_LENGTH, db_index=True) + max_length=CONSTANTS.MAXIMUM_TOKEN_PREFIX_LENGTH + + CONSTANTS.TOKEN_KEY_LENGTH, + db_index=True + ) user = models.ForeignKey(User, null=False, blank=False, related_name='auth_token_set', on_delete=models.CASCADE) created = models.DateTimeField(auto_now_add=True) diff --git a/knox/settings.py b/knox/settings.py index b86e95bb..a5def499 100644 --- a/knox/settings.py +++ b/knox/settings.py @@ -17,6 +17,7 @@ 'AUTH_HEADER_PREFIX': 'Token', 'EXPIRY_DATETIME_FORMAT': api_settings.DATETIME_FORMAT, 'TOKEN_MODEL': getattr(settings, 'KNOX_TOKEN_MODEL', 'knox.AuthToken'), + 'TOKEN_PREFIX': '', } IMPORT_STRINGS = { @@ -32,6 +33,8 @@ def reload_api_settings(*args, **kwargs): setting, value = kwargs['setting'], kwargs['value'] if setting == 'REST_KNOX': knox_settings = APISettings(value, DEFAULTS, IMPORT_STRINGS) + if len(knox_settings.TOKEN_PREFIX) > CONSTANTS.MAXIMUM_TOKEN_PREFIX_LENGTH: + raise ValueError("Illegal TOKEN_PREFIX length") setting_changed.connect(reload_api_settings) @@ -41,8 +44,9 @@ class CONSTANTS: ''' Constants cannot be changed at runtime ''' - TOKEN_KEY_LENGTH = 8 + TOKEN_KEY_LENGTH = 15 DIGEST_LENGTH = 128 + MAXIMUM_TOKEN_PREFIX_LENGTH = 10 def __setattr__(self, *args, **kwargs): raise Exception(''' diff --git a/knox/views.py b/knox/views.py index 7b3d4920..7a6b5719 100644 --- a/knox/views.py +++ b/knox/views.py @@ -22,6 +22,9 @@ def get_context(self): def get_token_ttl(self): return knox_settings.TOKEN_TTL + def get_token_prefix(self): + return knox_settings.TOKEN_PREFIX + def get_token_limit_per_user(self): return knox_settings.TOKEN_LIMIT_PER_USER @@ -36,8 +39,9 @@ def format_expiry_datetime(self, expiry): return DateTimeField(format=datetime_format).to_representation(expiry) def create_token(self): + token_prefix = self.get_token_prefix() return get_token_model().objects.create( - user=self.request.user, expiry=self.get_token_ttl() + user=self.request.user, expiry=self.get_token_ttl(), prefix=token_prefix ) def get_post_response_data(self, request, token, instance): diff --git a/tests/tests.py b/tests/tests.py index 914ce204..574305c2 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -10,7 +10,7 @@ from rest_framework.test import APIRequestFactory, APITestCase as TestCase from six.moves import reload_module -from knox import auth, views +from knox import auth, crypto, views from knox.auth import TokenAuthentication from knox.models import AuthToken from knox.serializers import UserSerializer @@ -45,6 +45,13 @@ def get_basic_auth_header(username, password): expiry_datetime_format_knox = knox_settings.defaults.copy() expiry_datetime_format_knox["EXPIRY_DATETIME_FORMAT"] = EXPIRY_DATETIME_FORMAT +token_prefix = "TEST_" +token_prefix_knox = knox_settings.defaults.copy() +token_prefix_knox["TOKEN_PREFIX"] = token_prefix + +token_prefix_too_long = "a" * CONSTANTS.MAXIMUM_TOKEN_PREFIX_LENGTH + "a" +token_prefix_too_long_knox = knox_settings.defaults.copy() +token_prefix_too_long_knox["TOKEN_PREFIX"] = token_prefix_too_long class AuthTestCase(TestCase): @@ -419,3 +426,65 @@ def test_expiry_is_present(self): response.data['expiry'], DateTimeField().to_representation(AuthToken.objects.first().expiry) ) + + def test_login_returns_serialized_token_with_prefix_when_prefix_set(self): + with override_settings(REST_KNOX=token_prefix_knox): + reload_module(views) + reload_module(crypto) + self.assertEqual(AuthToken.objects.count(), 0) + url = reverse('knox_login') + self.client.credentials( + HTTP_AUTHORIZATION=get_basic_auth_header(self.username, self.password) + ) + response = self.client.post( + url, + {}, + format='json' + ) + self.assertEqual(response.status_code, 200) + self.assertTrue(response.data['token'].startswith(token_prefix)) + reload_module(views) + reload_module(crypto) + + def test_token_with_prefix_returns_200(self): + with override_settings(REST_KNOX=token_prefix_knox): + reload_module(views) + self.assertEqual(AuthToken.objects.count(), 0) + url = reverse('knox_login') + self.client.credentials( + HTTP_AUTHORIZATION=get_basic_auth_header(self.username, self.password) + ) + response = self.client.post( + url, + {}, + format='json' + ) + self.assertEqual(response.status_code, 200) + self.assertTrue(response.data['token'].startswith(token_prefix)) + self.client.credentials(HTTP_AUTHORIZATION=('Token %s' % response.data['token'])) + response = self.client.get(root_url, {}, format='json') + self.assertEqual(response.status_code, 200) + reload_module(views) + + def test_prefix_set_longer_than_max_length_raises_valueerror(self): + with self.assertRaises(ValueError): + with override_settings(REST_KNOX=token_prefix_too_long_knox): + pass + + def test_tokens_created_before_prefix_still_work(self): + self.client.credentials( + HTTP_AUTHORIZATION=get_basic_auth_header(self.username, self.password) + ) + url = reverse('knox_login') + response = self.client.post( + url, + {}, + format='json' + ) + self.assertFalse(response.data['token'].startswith(token_prefix)) + with override_settings(REST_KNOX=token_prefix_knox): + reload_module(views) + self.client.credentials(HTTP_AUTHORIZATION=('Token %s' % response.data['token'])) + response = self.client.get(root_url, {}, format='json') + self.assertEqual(response.status_code, 200) + reload_module(views)