Skip to content
This repository has been archived by the owner on Nov 5, 2019. It is now read-only.

100% coverage for contrib.appengine #462

Merged
merged 1 commit into from
Mar 17, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion oauth2client/contrib/appengine.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@

XSRF_MEMCACHE_ID = 'xsrf_secret_key'

if _appengine_ndb is None:
if _appengine_ndb is None: # pragma: NO COVER
CredentialsNDBModel = None
CredentialsNDBProperty = None
FlowNDBProperty = None
Expand Down
214 changes: 198 additions & 16 deletions tests/contrib/test_appengine.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import os
import tempfile
import time
import unittest
import unittest2

from six.moves import urllib

Expand All @@ -39,14 +39,20 @@
from google.appengine.ext import testbed
from oauth2client.contrib import appengine
from oauth2client import GOOGLE_TOKEN_URI
from oauth2client import GOOGLE_REVOKE_URI
from oauth2client.clientsecrets import _loadfile
from oauth2client.clientsecrets import TYPE_WEB
from oauth2client.clientsecrets import InvalidClientSecretsError
from oauth2client.contrib.appengine import AppAssertionCredentials
from oauth2client.contrib.appengine import CredentialsModel
from oauth2client.contrib.appengine import CredentialsNDBModel
from oauth2client.contrib.appengine import CredentialsProperty
from oauth2client.contrib.appengine import FlowProperty
from oauth2client.contrib.appengine import (
InvalidClientSecretsError as AppEngineInvalidClientSecretsError)

This comment was marked as spam.

This comment was marked as spam.

from oauth2client.contrib.appengine import OAuth2Decorator
from oauth2client.contrib.appengine import OAuth2DecoratorFromClientSecrets
from oauth2client.contrib.appengine import oauth2decorator_from_clientsecrets
from oauth2client.contrib.appengine import StorageByKeyName
from oauth2client.client import _CLOUDSDK_CONFIG_ENV_VAR
from oauth2client.client import AccessTokenRefreshError
Expand Down Expand Up @@ -104,7 +110,7 @@ def request(self, token_uri, method, body, headers, *args, **kwargs):
return self, json.dumps(self.content)


class TestAppAssertionCredentials(unittest.TestCase):
class TestAppAssertionCredentials(unittest2.TestCase):
account_name = "[email protected]"
signature = "signature"

Expand Down Expand Up @@ -287,27 +293,99 @@ class TestFlowModel(db.Model):
flow = FlowProperty()


class FlowPropertyTest(unittest.TestCase):
class FlowPropertyTest(unittest2.TestCase):

def setUp(self):
self.testbed = testbed.Testbed()
self.testbed.activate()
self.testbed.init_datastore_v3_stub()

self.flow = flow_from_clientsecrets(
datafile('client_secrets.json'),
'foo',
redirect_uri='oob')

def tearDown(self):
self.testbed.deactivate()

def test_flow_get_put(self):
instance = TestFlowModel(
flow=flow_from_clientsecrets(datafile('client_secrets.json'),
'foo', redirect_uri='oob'),
flow=self.flow,
key_name='foo'
)
instance.put()
retrieved = TestFlowModel.get_by_key_name('foo')

self.assertEqual('foo_client_id', retrieved.flow.client_id)

def test_make_value_from_datastore_none(self):
self.assertIsNone(FlowProperty().make_value_from_datastore(None))

def test_validate(self):
FlowProperty().validate(None)
self.assertRaises(
db.BadValueError,
FlowProperty().validate, 42)


class TestCredentialsModel(db.Model):
credentials = CredentialsProperty()


class CredentialsPropertyTest(unittest2.TestCase):

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.


def setUp(self):
self.testbed = testbed.Testbed()
self.testbed.activate()
self.testbed.init_datastore_v3_stub()

access_token = 'foo'
client_id = 'some_client_id'
client_secret = 'cOuDdkfjxxnv+'
refresh_token = '1/0/a.df219fjls0'
token_expiry = datetime.datetime.utcnow()
user_agent = 'refresh_checker/1.0'
self.credentials = OAuth2Credentials(
access_token, client_id, client_secret,
refresh_token, token_expiry, GOOGLE_TOKEN_URI,
user_agent)

def tearDown(self):
self.testbed.deactivate()

def test_credentials_get_put(self):
instance = TestCredentialsModel(
credentials=self.credentials,
key_name='foo'
)
instance.put()
retrieved = TestCredentialsModel.get_by_key_name('foo')

self.assertEqual(
self.credentials.to_json(),
retrieved.credentials.to_json())

def test_make_value_from_datastore(self):
self.assertIsNone(
CredentialsProperty().make_value_from_datastore(None))
self.assertIsNone(
CredentialsProperty().make_value_from_datastore(''))
self.assertIsNone(
CredentialsProperty().make_value_from_datastore('{'))

decoded = CredentialsProperty().make_value_from_datastore(
self.credentials.to_json())
self.assertEqual(
self.credentials.to_json(),
decoded.to_json())

def test_validate(self):
CredentialsProperty().validate(self.credentials)
CredentialsProperty().validate(None)
self.assertRaises(
db.BadValueError,
CredentialsProperty().validate, 42)


def _http_request(*args, **kwargs):
resp = httplib2.Response({'status': '200'})
Expand All @@ -316,7 +394,7 @@ def _http_request(*args, **kwargs):
return resp, content


class StorageByKeyNameTest(unittest.TestCase):
class StorageByKeyNameTest(unittest2.TestCase):

def setUp(self):
self.testbed = testbed.Testbed()
Expand All @@ -339,6 +417,27 @@ def setUp(self):
def tearDown(self):
self.testbed.deactivate()

def test_bad_ctor(self):
with self.assertRaises(ValueError):
StorageByKeyName(CredentialsModel, None, None)

This comment was marked as spam.

This comment was marked as spam.


def test__is_ndb(self):
storage = StorageByKeyName(
object(), 'foo', 'credentials')

self.assertRaises(
TypeError, storage._is_ndb)

storage._model = type(object)
self.assertRaises(
TypeError, storage._is_ndb)

storage._model = CredentialsModel
self.assertFalse(storage._is_ndb())

storage._model = CredentialsNDBModel
self.assertTrue(storage._is_ndb())

def test_get_and_put_simple(self):
storage = StorageByKeyName(
CredentialsModel, 'foo', 'credentials')
Expand Down Expand Up @@ -492,7 +591,7 @@ class MockRequestHandler(object):
request = MockRequest()


class DecoratorTests(unittest.TestCase):
class DecoratorTests(unittest2.TestCase):

def setUp(self):
self.testbed = testbed.Testbed()
Expand Down Expand Up @@ -522,7 +621,7 @@ def get(self):
parent.had_credentials = True
parent.found_credentials = decorator.credentials
if parent.should_raise:
raise Exception('')
raise parent.should_raise

This comment was marked as spam.


class TestAwareHandler(webapp2.RequestHandler):
@decorator.oauth_aware
Expand All @@ -534,7 +633,7 @@ def get(self, *args, **kwargs):
parent.had_credentials = True
parent.found_credentials = decorator.credentials
if parent.should_raise:
raise Exception('')
raise parent.should_raise

routes = [
('/oauth2callback', self.decorator.callback_handler()),
Expand All @@ -557,6 +656,24 @@ def tearDown(self):
self.testbed.deactivate()
httplib2.Http = self.httplib2_orig

def test_in_error(self):
# NOTE: This branch is never reached. _in_error is not set by any code
# path. It appears to be intended to be set during construction.
self.decorator._in_error = True
self.decorator._message = 'foobar'

response = self.app.get('http://localhost/foo_path')
self.assertIn('foobar', response.body)

response = self.app.get('http://localhost/bar_path/1234/56')
self.assertIn('foobar', response.body)

def test_callback_application(self):
app = self.decorator.callback_application()
self.assertEqual(
app.router.match_routes[0].handler.__name__,
'OAuth2Handler')

def test_required(self):
# An initial request to an oauth_required decorated path should be a
# redirect to start the OAuth dance.
Expand Down Expand Up @@ -610,22 +727,32 @@ def test_required(self):
self.assertEqual(None, self.decorator.credentials)

# Raising an exception still clears the Credentials.
self.should_raise = True
self.should_raise = Exception('')

This comment was marked as spam.

self.assertRaises(Exception, self.app.get, '/foo_path')
self.should_raise = False
self.assertEqual(None, self.decorator.credentials)

# Access token refresh error should start the dance again
self.should_raise = AccessTokenRefreshError()
response = self.app.get('/foo_path')
self.should_raise = False
self.assertTrue(response.status.startswith('302'))
query_params = urllib.parse.parse_qs(
response.headers['Location'].split('?', 1)[1])

This comment was marked as spam.

This comment was marked as spam.

self.assertEqual('http://localhost/oauth2callback',
query_params['redirect_uri'][0])

# Invalidate the stored Credentials.
self.found_credentials.invalid = True
self.found_credentials.store.put(self.found_credentials)

# Invalid Credentials should start the OAuth dance again.
response = self.app.get('/foo_path')
self.assertTrue(response.status.startswith('302'))
q = urllib.parse.parse_qs(
query_params = urllib.parse.parse_qs(
response.headers['Location'].split('?', 1)[1])
self.assertEqual('http://localhost/oauth2callback',
q['redirect_uri'][0])
query_params['redirect_uri'][0])

This comment was marked as spam.

This comment was marked as spam.

def test_storage_delete(self):
# An initial request to an oauth_required decorated path should be a
Expand Down Expand Up @@ -710,7 +837,7 @@ def test_aware(self):
self.assertEqual(None, self.decorator.credentials)

# Raising an exception still clears the Credentials.
self.should_raise = True
self.should_raise = Exception('')
self.assertRaises(Exception, self.app.get, '/bar_path/2012/01')
self.should_raise = False
self.assertEqual(None, self.decorator.credentials)
Expand Down Expand Up @@ -769,6 +896,38 @@ def test_decorator_from_client_secrets(self):
self.assertEqual(self.decorator._revoke_uri,
self.decorator.credentials.revoke_uri)

def test_decorator_from_client_secrets_toplevel(self):
decorator_patch = mock.patch(
'oauth2client.contrib.appengine.OAuth2DecoratorFromClientSecrets')

with decorator_patch as decorator_mock:

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

filename = datafile('client_secrets.json')
decorator = oauth2decorator_from_clientsecrets(
filename,
scope='foo_scope')
decorator_mock.assert_called_once_with(
filename,
'foo_scope',
cache=None,
message=None)

def test_decorator_from_client_secrets_bad_type(self):
# NOTE: this code path is not currently reachable, as the only types
# that oauth2client.clientsecrets can load is web and installed, so
# this test forces execution of this code path. Despite not being
# normally reachable, this should remain in case future types of
# credentials are added.

loadfile_patch = mock.patch(
'oauth2client.contrib.appengine.clientsecrets.loadfile')
with loadfile_patch as loadfile_mock:
loadfile_mock.return_value = ('badtype', None)
self.assertRaises(
AppEngineInvalidClientSecretsError,
OAuth2DecoratorFromClientSecrets,
'doesntmatter.json',
scope=['foo_scope', 'bar_scope'])

def test_decorator_from_client_secrets_kwargs(self):
decorator = OAuth2DecoratorFromClientSecrets(
datafile('client_secrets.json'),
Expand Down Expand Up @@ -830,8 +989,31 @@ def test_decorator_from_unfilled_client_secrets_aware(self):
except InvalidClientSecretsError:
pass

def test_decorator_from_client_secrets_with_optional_settings(self):
# Test that the decorator works with the absense of a revoke_uri in
# the client secrets.
loadfile_patch = mock.patch(
'oauth2client.contrib.appengine.clientsecrets.loadfile')
with loadfile_patch as loadfile_mock:
loadfile_mock.return_value = (TYPE_WEB, {
"client_id": "foo_client_id",
"client_secret": "foo_client_secret",
"redirect_uris": [],
"auth_uri": "https://accounts.google.com/o/oauth2/v2/auth",
"token_uri": "https://www.googleapis.com/oauth2/v4/token",
# No revoke URI
})

decorator = OAuth2DecoratorFromClientSecrets(
'doesntmatter.json',
scope=['foo_scope', 'bar_scope'])

self.assertEqual(decorator._revoke_uri, GOOGLE_REVOKE_URI)
# This is never set, but it's consistent with other tests.
self.assertFalse(decorator._in_error)


class DecoratorXsrfSecretTests(unittest.TestCase):
class DecoratorXsrfSecretTests(unittest2.TestCase):
"""Test xsrf_secret_key."""

def setUp(self):
Expand Down Expand Up @@ -880,7 +1062,7 @@ def test_db_insert_ndb_get(self):
self.assertEqual(site_key.secret, secret)


class DecoratorXsrfProtectionTests(unittest.TestCase):
class DecoratorXsrfProtectionTests(unittest2.TestCase):
"""Test _build_state_value and _parse_state_value."""

def setUp(self):
Expand All @@ -902,4 +1084,4 @@ def test_build_and_parse_state(self):


if __name__ == '__main__': # pragma: NO COVER
unittest.main()
unittest2.main()