This repository has been archived by the owner on Nov 5, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 431
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Populate scopes for gce.AppAssertionCredentials (#524)
* Populate Scopes for gce.AppAssertionCredentials * _retrieve_scopes -> _retrieve_info * Add note about credentials being initially invalid
- Loading branch information
Showing
4 changed files
with
126 additions
and
99 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,14 +15,13 @@ | |
"""Unit tests for oauth2client.contrib.gce.""" | ||
|
||
import datetime | ||
import httplib2 | ||
import json | ||
|
||
import mock | ||
from six.moves import http_client | ||
from six.moves import urllib | ||
import unittest2 | ||
|
||
from oauth2client.client import Credentials | ||
from oauth2client.client import save_to_well_known_file | ||
from oauth2client.client import HttpAccessTokenRefreshError | ||
from oauth2client.contrib.gce import _SCOPES_WARNING | ||
|
@@ -31,44 +30,60 @@ | |
|
||
__author__ = '[email protected] (Joe Gregorio)' | ||
|
||
SERVICE_ACCOUNT_INFO = { | ||
'scopes': ['a', 'b'], | ||
'email': '[email protected]', | ||
'aliases': ['default'] | ||
} | ||
|
||
class AppAssertionCredentialsTests(unittest2.TestCase): | ||
|
||
def test_constructor(self): | ||
credentials = AppAssertionCredentials(foo='bar') | ||
self.assertEqual(credentials.scope, '') | ||
self.assertEqual(credentials.kwargs, {'foo': 'bar'}) | ||
self.assertEqual(credentials.assertion_type, None) | ||
credentials = AppAssertionCredentials() | ||
self.assertIsNone(credentials.assertion_type, None) | ||
self.assertIsNone(credentials.service_account_email) | ||
self.assertIsNone(credentials.scopes) | ||
self.assertTrue(credentials.invalid) | ||
|
||
@mock.patch('warnings.warn') | ||
def test_constructor_with_scopes(self, warn_mock): | ||
scope = 'http://example.com/a http://example.com/b' | ||
scopes = scope.split() | ||
credentials = AppAssertionCredentials(scope=scopes, foo='bar') | ||
self.assertEqual(credentials.scope, scope) | ||
self.assertEqual(credentials.kwargs, {'foo': 'bar'}) | ||
credentials = AppAssertionCredentials(scopes=scopes) | ||
self.assertEqual(credentials.scopes, None) | ||
self.assertEqual(credentials.assertion_type, None) | ||
warn_mock.assert_called_once_with(_SCOPES_WARNING) | ||
|
||
def test_to_json_and_from_json(self): | ||
def test_to_json(self): | ||
credentials = AppAssertionCredentials() | ||
json = credentials.to_json() | ||
credentials_from_json = Credentials.new_from_json(json) | ||
self.assertEqual(credentials.access_token, | ||
credentials_from_json.access_token) | ||
with self.assertRaises(NotImplementedError): | ||
credentials.to_json() | ||
|
||
def test_from_json(self): | ||
with self.assertRaises(NotImplementedError): | ||
AppAssertionCredentials.from_json({}) | ||
|
||
@mock.patch('oauth2client.contrib._metadata.get_token', | ||
side_effect=[('A', datetime.datetime.min), | ||
('B', datetime.datetime.max)]) | ||
def test_refresh_token(self, metadata): | ||
@mock.patch('oauth2client.contrib._metadata.get_service_account_info', | ||
return_value=SERVICE_ACCOUNT_INFO) | ||
def test_refresh_token(self, get_info, get_token): | ||
http_request = mock.MagicMock() | ||
http_mock = mock.MagicMock(request=http_request) | ||
credentials = AppAssertionCredentials() | ||
credentials.invalid = False | ||
credentials.service_account_email = '[email protected]' | ||
self.assertIsNone(credentials.access_token) | ||
credentials.get_access_token() | ||
credentials.get_access_token(http=http_mock) | ||
self.assertEqual(credentials.access_token, 'A') | ||
self.assertTrue(credentials.access_token_expired) | ||
credentials.get_access_token() | ||
get_token.assert_called_with(http_request, service_account='[email protected]') | ||
credentials.get_access_token(http=http_mock) | ||
self.assertEqual(credentials.access_token, 'B') | ||
self.assertFalse(credentials.access_token_expired) | ||
get_token.assert_called_with(http_request, service_account='[email protected]') | ||
get_info.assert_not_called() | ||
|
||
def test_refresh_token_failed_fetch(self): | ||
http_request = request_mock( | ||
|
@@ -77,46 +92,50 @@ def test_refresh_token_failed_fetch(self): | |
json.dumps({'access_token': 'a', 'expires_in': 100}) | ||
) | ||
credentials = AppAssertionCredentials() | ||
|
||
credentials.invalid = False | ||
credentials.service_account_email = '[email protected]' | ||
with self.assertRaises(HttpAccessTokenRefreshError): | ||
credentials._refresh(http_request=http_request) | ||
credentials._refresh(http_request) | ||
|
||
def test_serialization_data(self): | ||
credentials = AppAssertionCredentials() | ||
self.assertRaises(NotImplementedError, getattr, | ||
credentials, 'serialization_data') | ||
|
||
def test_create_scoped_required_without_scopes(self): | ||
def test_create_scoped_required(self): | ||
credentials = AppAssertionCredentials() | ||
self.assertFalse(credentials.create_scoped_required()) | ||
|
||
@mock.patch('warnings.warn') | ||
def test_create_scoped_required_with_scopes(self, warn_mock): | ||
credentials = AppAssertionCredentials(['dummy_scope']) | ||
self.assertFalse(credentials.create_scoped_required()) | ||
warn_mock.assert_called_once_with(_SCOPES_WARNING) | ||
|
||
@mock.patch('warnings.warn') | ||
def test_create_scoped(self, warn_mock): | ||
credentials = AppAssertionCredentials() | ||
new_credentials = credentials.create_scoped(['dummy_scope']) | ||
self.assertNotEqual(credentials, new_credentials) | ||
self.assertTrue(isinstance(new_credentials, AppAssertionCredentials)) | ||
self.assertEqual('dummy_scope', new_credentials.scope) | ||
warn_mock.assert_called_once_with(_SCOPES_WARNING) | ||
|
||
def test_sign_blob_not_implemented(self): | ||
credentials = AppAssertionCredentials([]) | ||
with self.assertRaises(NotImplementedError): | ||
credentials.sign_blob(b'blob') | ||
|
||
@mock.patch('oauth2client.contrib._metadata.get_service_account_info', | ||
return_value={'email': '[email protected]'}) | ||
def test_service_account_email(self, metadata): | ||
return_value=SERVICE_ACCOUNT_INFO) | ||
def test_retrieve_scopes(self, metadata): | ||
http_request = mock.MagicMock() | ||
http_mock = mock.MagicMock(request=http_request) | ||
credentials = AppAssertionCredentials() | ||
# Assert that service account isn't pre-fetched | ||
metadata.assert_not_called() | ||
self.assertEqual(credentials.service_account_email, '[email protected]') | ||
self.assertTrue(credentials.invalid) | ||
self.assertIsNone(credentials.scopes) | ||
scopes = credentials.retrieve_scopes(http_mock) | ||
self.assertEqual(scopes, SERVICE_ACCOUNT_INFO['scopes']) | ||
self.assertFalse(credentials.invalid) | ||
credentials.retrieve_scopes(http_mock) | ||
# Assert scopes weren't refetched | ||
metadata.assert_called_once_with(http_request, service_account='default') | ||
|
||
@mock.patch('oauth2client.contrib._metadata.get_service_account_info', | ||
side_effect=httplib2.HttpLib2Error('No Such Email')) | ||
def test_retrieve_scopes_bad_email(self, metadata): | ||
http_request = mock.MagicMock() | ||
http_mock = mock.MagicMock(request=http_request) | ||
credentials = AppAssertionCredentials(email='[email protected]') | ||
with self.assertRaises(httplib2.HttpLib2Error): | ||
credentials.retrieve_scopes(http_mock) | ||
|
||
metadata.assert_called_once_with(http_request, service_account='[email protected]') | ||
|
||
def test_save_to_well_known_file(self): | ||
import os | ||
|
Oops, something went wrong.