-
Notifications
You must be signed in to change notification settings - Fork 310
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add unit-tests for GCE ID Token Credentials
Signed-off-by: Christophe Taton <[email protected]>
- Loading branch information
Showing
1 changed file
with
252 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,7 @@ | |
|
||
from google.auth import _helpers | ||
from google.auth import exceptions | ||
from google.auth import jwt | ||
from google.auth import transport | ||
from google.auth.compute_engine import credentials | ||
|
||
|
@@ -105,3 +106,254 @@ def test_before_request_refreshes(self, get): | |
|
||
# Credentials should now be valid. | ||
assert self.credentials.valid | ||
|
||
|
||
class TestIDTokenCredentials(object): | ||
credentials = None | ||
|
||
@mock.patch('google.auth.compute_engine._metadata.get', autospec=True) | ||
def test_default_state(self, get): | ||
get.side_effect = [{ | ||
'email': '[email protected]', | ||
'scope': ['one', 'two'], | ||
}] | ||
|
||
request = mock.create_autospec(transport.Request, instance=True) | ||
self.credentials = credentials.IDTokenCredentials( | ||
request=request, target_audience="https://example.com") | ||
|
||
assert not self.credentials.valid | ||
# Expiration hasn't been set yet | ||
assert not self.credentials.expired | ||
# Service account email hasn't been populated | ||
assert (self.credentials.service_account_email | ||
== '[email protected]') | ||
# Signer is initialized | ||
assert self.credentials.signer | ||
assert self.credentials.signer_email == '[email protected]' | ||
|
||
@mock.patch( | ||
'google.auth._helpers.utcnow', | ||
return_value=datetime.datetime.utcfromtimestamp(0)) | ||
@mock.patch('google.auth.compute_engine._metadata.get', autospec=True) | ||
@mock.patch('google.auth.iam.Signer.sign', autospec=True) | ||
def test_make_authorization_grant_assertion(self, sign, get, utcnow): | ||
get.side_effect = [{ | ||
'email': '[email protected]', | ||
'scopes': ['one', 'two'] | ||
}] | ||
sign.side_effect = [b'signature'] | ||
|
||
request = mock.create_autospec(transport.Request, instance=True) | ||
self.credentials = credentials.IDTokenCredentials( | ||
request=request, target_audience="https://audience.com") | ||
|
||
# Generate authorization grant: | ||
token = self.credentials._make_authorization_grant_assertion() | ||
payload = jwt.decode(token, verify=False) | ||
|
||
# The JWT token signature is 'signature' encoded in base 64: | ||
assert token.endswith(b'.c2lnbmF0dXJl') | ||
|
||
# Check that the credentials have the token and proper expiration | ||
assert payload == { | ||
'aud': 'https://www.googleapis.com/oauth2/v4/token', | ||
'exp': 3600, | ||
'iat': 0, | ||
'iss': '[email protected]', | ||
'target_audience': 'https://audience.com'} | ||
|
||
@mock.patch( | ||
'google.auth._helpers.utcnow', | ||
return_value=datetime.datetime.utcfromtimestamp(0)) | ||
@mock.patch('google.auth.compute_engine._metadata.get', autospec=True) | ||
@mock.patch('google.auth.iam.Signer.sign', autospec=True) | ||
def test_with_service_account(self, sign, get, utcnow): | ||
sign.side_effect = [b'signature'] | ||
|
||
request = mock.create_autospec(transport.Request, instance=True) | ||
self.credentials = credentials.IDTokenCredentials( | ||
request=request, target_audience="https://audience.com", | ||
service_account_email="[email protected]") | ||
|
||
# Generate authorization grant: | ||
token = self.credentials._make_authorization_grant_assertion() | ||
payload = jwt.decode(token, verify=False) | ||
|
||
# The JWT token signature is 'signature' encoded in base 64: | ||
assert token.endswith(b'.c2lnbmF0dXJl') | ||
|
||
# Check that the credentials have the token and proper expiration | ||
assert payload == { | ||
'aud': 'https://www.googleapis.com/oauth2/v4/token', | ||
'exp': 3600, | ||
'iat': 0, | ||
'iss': '[email protected]', | ||
'target_audience': 'https://audience.com'} | ||
|
||
@mock.patch( | ||
'google.auth._helpers.utcnow', | ||
return_value=datetime.datetime.utcfromtimestamp(0)) | ||
@mock.patch('google.auth.compute_engine._metadata.get', autospec=True) | ||
@mock.patch('google.auth.iam.Signer.sign', autospec=True) | ||
def test_additional_claims(self, sign, get, utcnow): | ||
get.side_effect = [{ | ||
'email': '[email protected]', | ||
'scopes': ['one', 'two'] | ||
}] | ||
sign.side_effect = [b'signature'] | ||
|
||
request = mock.create_autospec(transport.Request, instance=True) | ||
self.credentials = credentials.IDTokenCredentials( | ||
request=request, target_audience="https://audience.com", | ||
additional_claims={'foo': 'bar'}) | ||
|
||
# Generate authorization grant: | ||
token = self.credentials._make_authorization_grant_assertion() | ||
payload = jwt.decode(token, verify=False) | ||
|
||
# The JWT token signature is 'signature' encoded in base 64: | ||
assert token.endswith(b'.c2lnbmF0dXJl') | ||
|
||
# Check that the credentials have the token and proper expiration | ||
assert payload == { | ||
'aud': 'https://www.googleapis.com/oauth2/v4/token', | ||
'exp': 3600, | ||
'iat': 0, | ||
'iss': '[email protected]', | ||
'target_audience': 'https://audience.com', | ||
'foo': 'bar'} | ||
|
||
@mock.patch( | ||
'google.auth._helpers.utcnow', | ||
return_value=datetime.datetime.utcfromtimestamp(0)) | ||
@mock.patch('google.auth.compute_engine._metadata.get', autospec=True) | ||
@mock.patch('google.auth.iam.Signer.sign', autospec=True) | ||
def test_with_target_audience(self, sign, get, utcnow): | ||
get.side_effect = [{ | ||
'email': '[email protected]', | ||
'scopes': ['one', 'two'] | ||
}] | ||
sign.side_effect = [b'signature'] | ||
|
||
request = mock.create_autospec(transport.Request, instance=True) | ||
self.credentials = credentials.IDTokenCredentials( | ||
request=request, target_audience="https://audience.com") | ||
self.credentials = ( | ||
self.credentials.with_target_audience("https://actually.not")) | ||
|
||
# Generate authorization grant: | ||
token = self.credentials._make_authorization_grant_assertion() | ||
payload = jwt.decode(token, verify=False) | ||
|
||
# The JWT token signature is 'signature' encoded in base 64: | ||
assert token.endswith(b'.c2lnbmF0dXJl') | ||
|
||
# Check that the credentials have the token and proper expiration | ||
assert payload == { | ||
'aud': 'https://www.googleapis.com/oauth2/v4/token', | ||
'exp': 3600, | ||
'iat': 0, | ||
'iss': '[email protected]', | ||
'target_audience': 'https://actually.not'} | ||
|
||
@mock.patch( | ||
'google.auth._helpers.utcnow', | ||
return_value=datetime.datetime.utcfromtimestamp(0)) | ||
@mock.patch('google.auth.compute_engine._metadata.get', autospec=True) | ||
@mock.patch('google.auth.iam.Signer.sign', autospec=True) | ||
@mock.patch('google.oauth2._client.id_token_jwt_grant', autospec=True) | ||
def test_refresh_success(self, id_token_jwt_grant, sign, get, utcnow): | ||
get.side_effect = [{ | ||
'email': '[email protected]', | ||
'scopes': ['one', 'two'] | ||
}] | ||
sign.side_effect = [b'signature'] | ||
id_token_jwt_grant.side_effect = [( | ||
'idtoken', | ||
datetime.datetime.utcfromtimestamp(3600), | ||
{}, | ||
)] | ||
|
||
request = mock.create_autospec(transport.Request, instance=True) | ||
self.credentials = credentials.IDTokenCredentials( | ||
request=request, target_audience="https://audience.com") | ||
|
||
# Refresh credentials | ||
self.credentials.refresh(None) | ||
|
||
# Check that the credentials have the token and proper expiration | ||
assert self.credentials.token == 'idtoken' | ||
assert self.credentials.expiry == ( | ||
datetime.datetime.utcfromtimestamp(3600)) | ||
|
||
# Check the credential info | ||
assert (self.credentials.service_account_email == | ||
'[email protected]') | ||
|
||
# Check that the credentials are valid (have a token and are not | ||
# expired) | ||
assert self.credentials.valid | ||
|
||
@mock.patch( | ||
'google.auth._helpers.utcnow', | ||
return_value=datetime.datetime.utcfromtimestamp(0)) | ||
@mock.patch('google.auth.compute_engine._metadata.get', autospec=True) | ||
@mock.patch('google.auth.iam.Signer.sign', autospec=True) | ||
def test_refresh_error(self, sign, get, utcnow): | ||
get.side_effect = [{ | ||
'email': '[email protected]', | ||
'scopes': ['one', 'two'], | ||
}] | ||
sign.side_effect = [b'signature'] | ||
|
||
request = mock.create_autospec(transport.Request, instance=True) | ||
response = mock.Mock() | ||
response.data = b'{"error": "http error"}' | ||
response.status = 500 | ||
request.side_effect = [response] | ||
|
||
self.credentials = credentials.IDTokenCredentials( | ||
request=request, target_audience="https://audience.com") | ||
|
||
with pytest.raises(exceptions.RefreshError) as excinfo: | ||
self.credentials.refresh(request) | ||
|
||
assert excinfo.match(r'http error') | ||
|
||
@mock.patch( | ||
'google.auth._helpers.utcnow', | ||
return_value=datetime.datetime.utcfromtimestamp(0)) | ||
@mock.patch('google.auth.compute_engine._metadata.get', autospec=True) | ||
@mock.patch('google.auth.iam.Signer.sign', autospec=True) | ||
@mock.patch('google.oauth2._client.id_token_jwt_grant', autospec=True) | ||
def test_before_request_refreshes( | ||
self, id_token_jwt_grant, sign, get, utcnow): | ||
get.side_effect = [{ | ||
'email': '[email protected]', | ||
'scopes': 'one two' | ||
}] | ||
sign.side_effect = [b'signature'] | ||
id_token_jwt_grant.side_effect = [( | ||
'idtoken', | ||
datetime.datetime.utcfromtimestamp(3600), | ||
{}, | ||
)] | ||
|
||
request = mock.create_autospec(transport.Request, instance=True) | ||
self.credentials = credentials.IDTokenCredentials( | ||
request=request, target_audience="https://audience.com") | ||
|
||
# Credentials should start as invalid | ||
assert not self.credentials.valid | ||
|
||
# before_request should cause a refresh | ||
request = mock.create_autospec(transport.Request, instance=True) | ||
self.credentials.before_request( | ||
request, 'GET', 'http://example.com?a=1#3', {}) | ||
|
||
# The refresh endpoint should've been called. | ||
assert get.called | ||
|
||
# Credentials should now be valid. | ||
assert self.credentials.valid |