Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Signer and IDTokenCredentials implementation using default service account on GCE #236

Merged
merged 3 commits into from
May 31, 2018
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion google/auth/compute_engine/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
"""Google Compute Engine authentication."""

from google.auth.compute_engine.credentials import Credentials
from google.auth.compute_engine.credentials import IDTokenCredentials


__all__ = [
'Credentials'
'Credentials',
'IDTokenCredentials',
]
129 changes: 129 additions & 0 deletions google/auth/compute_engine/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,17 @@

"""

import datetime

import six

from google.auth import _helpers
from google.auth import credentials
from google.auth import exceptions
from google.auth import iam
from google.auth import jwt
from google.auth.compute_engine import _metadata
from google.oauth2 import _client


class Credentials(credentials.ReadOnlyScoped, credentials.Credentials):
Expand Down Expand Up @@ -108,3 +114,126 @@ def service_account_email(self):
def requires_scopes(self):
"""False: Compute Engine credentials can not be scoped."""
return False


_DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds
_DEFAULT_TOKEN_URI = 'https://www.googleapis.com/oauth2/v4/token'


class IDTokenCredentials(credentials.Credentials, credentials.Signing):
"""Open ID Connect ID Token-based service account credentials.

These credentials relies on the default service account of a GCE instance.

This comment was marked as spam.

This comment was marked as spam.


In order for this to work, the GCE instance must have been started with
a service account that has access to the IAM Cloud API.
"""
def __init__(self, request, target_audience,
token_uri=_DEFAULT_TOKEN_URI,
additional_claims=None,
service_account_email=None):
"""
Args:
request (google.auth.transport.Request): The object used to make
HTTP requests.
target_audience (str): The intended audience for these credentials,
used when requesting the ID Token. The ID Token's ``aud`` claim
will be set to this string.
token_uri (str): The OAuth 2.0 Token URI.
additional_claims (Mapping[str, str]): Any additional claims for
the JWT assertion used in the authorization grant.
service_account_email (str): Optional explicit service account to
use to sign JWT tokens.
By default, this is the default GCE service account.
"""
super(IDTokenCredentials, self).__init__()

if service_account_email is None:
sa_info = _metadata.get_service_account_info(request)
service_account_email = sa_info['email']
self._service_account_email = service_account_email

self._signer = iam.Signer(
request=request,
credentials=Credentials(),

This comment was marked as spam.

This comment was marked as spam.

service_account_email=service_account_email)

self._token_uri = token_uri
self._target_audience = target_audience

if additional_claims is not None:
self._additional_claims = additional_claims
else:
self._additional_claims = {}

def with_target_audience(self, target_audience):
"""Create a copy of these credentials with the specified target
audience.
Args:
target_audience (str): The intended audience for these credentials,
used when requesting the ID Token.
Returns:
google.auth.service_account.IDTokenCredentials: A new credentials
instance.
"""
return self.__class__(
self._signer,
service_account_email=self._service_account_email,
token_uri=self._token_uri,
target_audience=target_audience,
additional_claims=self._additional_claims.copy())

def _make_authorization_grant_assertion(self):
"""Create the OAuth 2.0 assertion.
This assertion is used during the OAuth 2.0 grant to acquire an
ID token.
Returns:
bytes: The authorization grant assertion.
"""
now = _helpers.utcnow()
lifetime = datetime.timedelta(seconds=_DEFAULT_TOKEN_LIFETIME_SECS)
expiry = now + lifetime

payload = {
'iat': _helpers.datetime_to_secs(now),
'exp': _helpers.datetime_to_secs(expiry),
# The issuer must be the service account email.
'iss': self.service_account_email,
# The audience must be the auth token endpoint's URI
'aud': self._token_uri,
# The target audience specifies which service the ID token is
# intended for.
'target_audience': self._target_audience
}

payload.update(self._additional_claims)

token = jwt.encode(self._signer, payload)

return token

@_helpers.copy_docstring(credentials.Credentials)
def refresh(self, request):
assertion = self._make_authorization_grant_assertion()
access_token, expiry, _ = _client.id_token_jwt_grant(
request, self._token_uri, assertion)
self.token = access_token
self.expiry = expiry

@property
@_helpers.copy_docstring(credentials.Signing)
def signer(self):
return self._signer

@_helpers.copy_docstring(credentials.Signing)
def sign_bytes(self, message):
return self._signer.sign(message)

@property
def service_account_email(self):
"""The service account email."""
return self._service_account_email

@property
def signer_email(self):
return self._service_account_email