-
-
Notifications
You must be signed in to change notification settings - Fork 794
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
App specific scopes #395
App specific scopes #395
Changes from all commits
30aa3c1
0868bd2
d0e9726
e102579
2849b71
68c1cab
bbeb1f6
20a333d
a218b8c
76ba31d
89dd171
2d78b55
422c788
d08e442
37e89d8
5f45b74
5147c26
2e7c772
486ea36
b9e3c77
c7e1773
44f87be
6941270
8c5962e
5027287
1a90b03
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
# -*- coding: utf-8 -*- | ||
# Generated by Django 1.10b1 on 2016-06-29 15:18 | ||
from __future__ import unicode_literals | ||
|
||
from django.db import migrations, models | ||
|
||
|
||
class Migration(migrations.Migration): | ||
|
||
dependencies = [ | ||
('oauth2_provider', '0003_auto_20160316_1503'), | ||
] | ||
|
||
operations = [ | ||
migrations.AddField( | ||
model_name='application', | ||
name='allowed_scopes', | ||
field=models.TextField(blank=True, help_text='List of allowed scopes for this application, space separated'), | ||
), | ||
] |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,7 +6,7 @@ | |
from django.conf import settings | ||
from django.core.urlresolvers import reverse | ||
from django.db import models, transaction | ||
from django.utils import timezone | ||
from django.utils import timezone, six, six | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You're importing There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Perhaps you should use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Considering DOT already depends on |
||
|
||
from django.utils.translation import ugettext_lazy as _ | ||
from django.utils.encoding import python_2_unicode_compatible | ||
|
@@ -38,6 +38,9 @@ class AbstractApplication(models.Model): | |
* :attr:`client_secret` Confidential secret issued to the client during | ||
the registration process as described in :rfc:`2.2` | ||
* :attr:`name` Friendly name for the Application | ||
* :attr:`skip_authorization` Boolean indication that this application doesn't present | ||
the Authorize view to the user | ||
* :attr:`allowed_scopes` Space separated list of scopes this application can ever get | ||
""" | ||
CLIENT_CONFIDENTIAL = 'confidential' | ||
CLIENT_PUBLIC = 'public' | ||
|
@@ -72,6 +75,9 @@ class AbstractApplication(models.Model): | |
default=generate_client_secret, db_index=True) | ||
name = models.CharField(max_length=255, blank=True) | ||
skip_authorization = models.BooleanField(default=False) | ||
# only used if not blank | ||
allowed_scopes = models.TextField(help_text="List of allowed scopes for this application, space separated", | ||
blank=True) | ||
|
||
class Meta: | ||
abstract = True | ||
|
@@ -123,6 +129,22 @@ def clean(self): | |
def get_absolute_url(self): | ||
return reverse('oauth2_provider:detail', args=[str(self.id)]) | ||
|
||
def get_allowed_scopes_from_scopes(self, scopes): | ||
""" | ||
Filters the given list of scopes so it only contains the allowed scopes | ||
""" | ||
if isinstance(scopes, six.string_types): | ||
professorhaseeb marked this conversation as resolved.
Show resolved
Hide resolved
|
||
scopes = scopes.split(' ') | ||
allowed_scopes = None | ||
if self.allowed_scopes: | ||
# this will be [''], which evaluates to True if allowed_scopes is the empty string (but not None) | ||
allowed_scopes = self.allowed_scopes.split(' ') | ||
if allowed_scopes: | ||
# now reduce down to allowed scopes and make it a space separated list again | ||
scopes = set(allowed_scopes) & set(scopes) | ||
return ' '.join(scopes) | ||
|
||
|
||
def __str__(self): | ||
return self.name or self.client_id | ||
|
||
|
@@ -216,6 +238,8 @@ def allow_scopes(self, scopes): | |
""" | ||
if not scopes: | ||
return True | ||
if isinstance(scopes, six.string_types): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When is There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if it is not a string it has to be a list, so then there is no need to split it into a list. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I get that. Looking more closely at the existing code and docstring, this method expects an iterable of strings. You are weakening that contract to allow for a single string. However, the only place where you are consuming this is in tests. This change seems unnecessary. To keep the contract clean, if you do use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. well a string is in fact an iterable, using it by accident might show up weird errors, this is a bit more developer friendly in that it will do the thing that you wanted to happen when you pass a string. But you are also right that it might encourage the use of a single string and confuse more people when they see inconsitent usage of 'scopes' |
||
scopes = scopes.split() | ||
|
||
provided_scopes = set(self.scope.split()) | ||
resource_scopes = set(scopes) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,6 +5,7 @@ | |
from django.test import TestCase | ||
|
||
from ..models import get_application_model | ||
from ..views.base import AuthorizationView | ||
|
||
|
||
Application = get_application_model() | ||
|
@@ -76,6 +77,12 @@ def setUp(self): | |
self.app_bar_1 = self._create_application('app bar_user 1', self.bar_user) | ||
self.app_bar_2 = self._create_application('app bar_user 2', self.bar_user) | ||
|
||
def test_get_application(self): | ||
"""Test the get_application method from the AuthorizationView""" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would be helpful if the docstring explained what about the method you are testing. For example, |
||
self.assertEqual(self.app_foo_1, AuthorizationView().get_application(self.app_foo_1.client_id)) | ||
# test twice to test the cache | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unless you're checking DB query counts or calls to the cache, you aren't truly validating caching behavior. |
||
self.assertEqual(self.app_foo_1, AuthorizationView().get_application(self.app_foo_1.client_id)) | ||
|
||
def tearDown(self): | ||
super(TestApplicationViews, self).tearDown() | ||
Application.objects.all().delete() | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -41,6 +41,77 @@ def test_allow_scopes(self): | |
self.assertTrue(access_token.allow_scopes(['write', 'read', 'read'])) | ||
self.assertTrue(access_token.allow_scopes([])) | ||
self.assertFalse(access_token.allow_scopes(['write', 'destroy'])) | ||
self.assertTrue(access_token.allow_scopes('read write')) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This test is a great candidate for parameterization. |
||
self.assertTrue(access_token.allow_scopes('write read')) | ||
self.assertTrue(access_token.allow_scopes('write read read')) | ||
self.assertTrue(access_token.allow_scopes('')) | ||
self.assertFalse(access_token.allow_scopes('write destroy')) | ||
|
||
|
||
def test_get_allowed_scopes_from_scopes(self): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This test is also a great candidate for parameterization. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This test also needs a docstring, and, based on the length (60 lines!), should be split into multiple test cases. |
||
self.client.login(username="test_user", password="123456") | ||
app = Application.objects.create( | ||
name="test_app", | ||
redirect_uris="http://localhost http://example.com http://example.it", | ||
user=self.user, | ||
client_type=Application.CLIENT_CONFIDENTIAL, | ||
authorization_grant_type=Application.GRANT_AUTHORIZATION_CODE, | ||
) | ||
|
||
# app has no allowed scopes set, so everything specified should be allowed (but no more) | ||
scopes = app.get_allowed_scopes_from_scopes(['read', 'write']) | ||
self.assertTrue(all(x in scopes for x in ['read', 'write'])) | ||
scopes = app.get_allowed_scopes_from_scopes('read write') | ||
self.assertTrue(all(x in scopes for x in ['read', 'write'])) | ||
scopes = app.get_allowed_scopes_from_scopes(['write', 'read']) | ||
self.assertTrue(all(x in scopes for x in ['read', 'write'])) | ||
scopes = app.get_allowed_scopes_from_scopes(['write', 'read', 'read']) | ||
self.assertTrue(all(x in scopes for x in ['read', 'write'])) | ||
scopes = app.get_allowed_scopes_from_scopes('write read read') | ||
self.assertTrue(all(x in scopes for x in ['read', 'write'])) | ||
scopes = app.get_allowed_scopes_from_scopes([]) | ||
self.assertFalse(any(x in scopes for x in ['read', 'write'])) | ||
scopes = app.get_allowed_scopes_from_scopes('') | ||
self.assertFalse(any(x in scopes for x in ['read', 'write'])) | ||
scopes = app.get_allowed_scopes_from_scopes(['read', 'destroy']) | ||
self.assertTrue(all(x in scopes for x in ['read', 'destroy'])) | ||
self.assertFalse ('write' in scopes) | ||
|
||
app.allowed_scopes = 'read destroy' | ||
|
||
# app does not allow write scope, so it should never be in allowed scopes | ||
scopes = app.get_allowed_scopes_from_scopes(['read', 'write']) | ||
self.assertTrue('read' in scopes) | ||
self.assertFalse('write' in scopes) | ||
self.assertFalse('destroy' in scopes) | ||
|
||
scopes = app.get_allowed_scopes_from_scopes('read write') | ||
self.assertTrue('read' in scopes) | ||
self.assertFalse('write' in scopes) | ||
self.assertFalse('destroy' in scopes) | ||
|
||
scopes = app.get_allowed_scopes_from_scopes(['write', 'read']) | ||
self.assertTrue('read' in scopes) | ||
self.assertFalse('write' in scopes) | ||
self.assertFalse('destroy' in scopes) | ||
scopes = app.get_allowed_scopes_from_scopes(['write', 'read', 'read']) | ||
self.assertTrue('read' in scopes) | ||
self.assertFalse('write' in scopes) | ||
self.assertFalse('destroy' in scopes) | ||
|
||
scopes = app.get_allowed_scopes_from_scopes('write read read') | ||
self.assertTrue('read' in scopes) | ||
self.assertFalse('write' in scopes) | ||
self.assertFalse('destroy' in scopes) | ||
|
||
scopes = app.get_allowed_scopes_from_scopes([]) | ||
self.assertFalse(any(x in scopes for x in ['read', 'write', 'destroy'])) | ||
scopes = app.get_allowed_scopes_from_scopes('') | ||
self.assertFalse(any(x in scopes for x in ['read', 'write', 'destroy'])) | ||
scopes = app.get_allowed_scopes_from_scopes(['read', 'destroy']) | ||
self.assertTrue('read' in scopes) | ||
self.assertTrue('destroy' in scopes) | ||
self.assertFalse('write' in scopes) | ||
|
||
def test_grant_authorization_code_redirect_uris(self): | ||
app = Application( | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -72,12 +72,13 @@ class AuthorizationView(BaseAuthorizationView, FormView): | |
server_class = oauth2_settings.OAUTH2_SERVER_CLASS | ||
validator_class = oauth2_settings.OAUTH2_VALIDATOR_CLASS | ||
oauthlib_backend_class = oauth2_settings.OAUTH2_BACKEND_CLASS | ||
_application_cache = {} | ||
|
||
skip_authorization_completely = False | ||
def getscopes(self): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
return self.oauth2_data.get('scope', self.oauth2_data.get('scopes', [])) | ||
|
||
def get_initial(self): | ||
# TODO: move this scopes conversion from and to string into a utils function | ||
scopes = self.oauth2_data.get('scope', self.oauth2_data.get('scopes', [])) | ||
scopes = self.getscopes() | ||
initial_data = { | ||
'redirect_uri': self.oauth2_data.get('redirect_uri', None), | ||
'scope': ' '.join(scopes), | ||
|
@@ -87,6 +88,14 @@ def get_initial(self): | |
} | ||
return initial_data | ||
|
||
def get_application(self, client_id): | ||
""" | ||
get an application from a client_id, this caches the applications | ||
""" | ||
if client_id not in self._application_cache: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Suggested alternative: application = self._application_cache.get(client_id)
if not application:
application = get_application_model().objects.get(client_id=client_id)
self._application_cache[client_id] = application
return application There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think that's better. But that aside this cache can be pulled into its own PR. |
||
self._application_cache[client_id] = get_application_model().objects.get(client_id=client_id) | ||
return self._application_cache[client_id] | ||
|
||
def form_valid(self, form): | ||
try: | ||
credentials = { | ||
|
@@ -95,8 +104,9 @@ def form_valid(self, form): | |
'response_type': form.cleaned_data.get('response_type', None), | ||
'state': form.cleaned_data.get('state', None), | ||
} | ||
application = self.get_application(credentials['client_id']) | ||
scopes = application.get_allowed_scopes_from_scopes(form.cleaned_data.get('scope')) | ||
|
||
scopes = form.cleaned_data.get('scope') | ||
allow = form.cleaned_data.get('allow') | ||
uri, headers, body, status = self.create_authorization_response( | ||
request=self.request, scopes=scopes, credentials=credentials, allow=allow) | ||
|
@@ -113,7 +123,11 @@ def get(self, request, *args, **kwargs): | |
kwargs['scopes_descriptions'] = [oauth2_settings.SCOPES[scope] for scope in scopes] | ||
kwargs['scopes'] = scopes | ||
# at this point we know an Application instance with such client_id exists in the database | ||
application = get_application_model().objects.get(client_id=credentials['client_id']) # TODO: cache it! | ||
application = self.get_application(credentials['client_id']) | ||
# filter scopes here, so an application with skip_authorization still only gets the allowed scopes | ||
# returns a string | ||
scopes = application.get_allowed_scopes_from_scopes(scopes) | ||
|
||
kwargs['application'] = application | ||
kwargs.update(credentials) | ||
self.oauth2_data = kwargs | ||
|
@@ -131,7 +145,7 @@ def get(self, request, *args, **kwargs): | |
# are already approved. | ||
if application.skip_authorization: | ||
uri, headers, body, status = self.create_authorization_response( | ||
request=self.request, scopes=" ".join(scopes), | ||
request=self.request, scopes=scopes, | ||
credentials=credentials, allow=True) | ||
return HttpResponseUriRedirect(uri) | ||
|
||
|
@@ -142,7 +156,7 @@ def get(self, request, *args, **kwargs): | |
for token in tokens: | ||
if token.allow_scopes(scopes): | ||
uri, headers, body, status = self.create_authorization_response( | ||
request=self.request, scopes=" ".join(scopes), | ||
request=self.request, scopes=scopes, | ||
credentials=credentials, allow=True) | ||
return HttpResponseUriRedirect(uri) | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should there also be a data migration that sets the scopes for existing applications?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't believe so, if no custom scope is set, the default scopes will still be used.