Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allowing pyOpenSSL import to fail for GAE. #1446

Merged
merged 1 commit into from
Feb 12, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion gcloud/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@
import six
from six.moves.urllib.parse import urlencode # pylint: disable=F0401

from OpenSSL import crypto
try:
from OpenSSL import crypto
except ImportError: # pragma: NO COVER
# pyOpenSSL can't be installed on App Engine, but it will not
# be needed there since app_identity is used.
crypto = None

from oauth2client import client
from oauth2client.client import _get_application_default_credential_from_file
Expand Down Expand Up @@ -170,6 +175,7 @@ def _get_pem_key(credentials):
:rtype: :class:`OpenSSL.crypto.PKey`
:returns: A PKey object used to sign text.
:raises: `TypeError` if `credentials` is the wrong type.
`EnvironmentError` if `crypto` did not import successfully.
"""
if isinstance(credentials, client.SignedJwtAssertionCredentials):
# Take our PKCS12 (.p12) text and convert to PEM text.
Expand All @@ -181,6 +187,9 @@ def _get_pem_key(credentials):
raise TypeError((credentials,
'not a valid service account credentials type'))

if crypto is None:
raise EnvironmentError(
'pyOpenSSL must be installed to load a private key')
return crypto.load_privatekey(crypto.FILETYPE_PEM, pem_text)


Expand All @@ -198,6 +207,7 @@ def _get_signature_bytes(credentials, string_to_sign):

:rtype: bytes
:returns: Signed bytes produced by the credentials.
:raises: `EnvironmentError` if `crypto` did not import successfully.
"""
if isinstance(credentials, _GAECreds):
_, signed_bytes = app_identity.sign_blob(string_to_sign)
Expand All @@ -207,6 +217,10 @@ def _get_signature_bytes(credentials, string_to_sign):
pkey = _get_pem_key(credentials)
if not isinstance(string_to_sign, six.binary_type):
string_to_sign = string_to_sign.encode('utf-8')
if crypto is None:

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

raise EnvironmentError(
'pyOpenSSL must be installed to sign content using a '
'private key')
return crypto.sign(pkey, string_to_sign, 'SHA256')


Expand Down
34 changes: 34 additions & 0 deletions gcloud/test_credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,21 @@ def test_gae_type(self):
self.assertEqual(signed_bytes, STRING_TO_SIGN)
self.assertEqual(APP_IDENTITY._strings_signed, [STRING_TO_SIGN])

def test_without_pyopenssl(self):
from gcloud._testing import _Monkey
from gcloud import credentials as credentials_mod

mock_called = []
credentials = object()

def mock_pem_key(local_creds):
mock_called.append(local_creds)

with _Monkey(credentials_mod, crypto=None, _get_pem_key=mock_pem_key):
with self.assertRaises(EnvironmentError):
self._callFUT(credentials, b'STRING_TO_SIGN')
self.assertEqual(mock_called, [credentials])


class Test__get_service_account_name(unittest2.TestCase):

Expand Down Expand Up @@ -520,6 +535,25 @@ def _get_private_key(private_key_pkcs8_text):
[(openssl_crypto.FILETYPE_PEM, PRIVATE_TEXT)])
self.assertEqual(openssl_crypto._signed, [])

def test_without_pyopenssl(self):
from oauth2client import service_account
from gcloud._testing import _Monkey
from gcloud import credentials as credentials_mod

PRIVATE_TEXT = 'dummy_private_key_pkcs8_text'

def _get_private_key(private_key_pkcs8_text):
return private_key_pkcs8_text

with _Monkey(service_account, _get_private_key=_get_private_key):
credentials = service_account._ServiceAccountCredentials(
'dummy_service_account_id', 'dummy_service_account_email',
'dummy_private_key_id', PRIVATE_TEXT, '')

with _Monkey(credentials_mod, crypto=None):
with self.assertRaises(EnvironmentError):
self._callFUT(credentials)


class Test__get_expiration_seconds(unittest2.TestCase):

Expand Down