Skip to content

Commit

Permalink
Adding tests for pyOpenSSL import failures.
Browse files Browse the repository at this point in the history
  • Loading branch information
dhermes committed Feb 12, 2016
1 parent e56ecf4 commit a1de232
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 3 deletions.
16 changes: 13 additions & 3 deletions gcloud/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

try:
from OpenSSL import crypto
except ImportError:
except ImportError: # pragma: NO COVER
# pyOpenSSL can't be installed on App Engine, but it will not
# be needed there since app_identity is used.
crypto = None
Expand Down Expand Up @@ -175,6 +175,7 @@ def _get_pem_key(credentials):
:rtype: :class:`OpenSSL.crypto.PKey`
:returns: A PKey object used to sign text.
:raises: `TypeError` if `credentials` is the wrong type.
`EnvironmentError` if `crypto` did not import successfully.
"""
if isinstance(credentials, client.SignedJwtAssertionCredentials):
# Take our PKCS12 (.p12) text and convert to PEM text.
Expand All @@ -186,7 +187,11 @@ def _get_pem_key(credentials):
raise TypeError((credentials,
'not a valid service account credentials type'))

return crypto.load_privatekey(crypto.FILETYPE_PEM, pem_text)
if crypto is None:
raise EnvironmentError('pyOpenSSL must be installed to load a '
'private key')
else:
return crypto.load_privatekey(crypto.FILETYPE_PEM, pem_text)


def _get_signature_bytes(credentials, string_to_sign):
Expand All @@ -203,6 +208,7 @@ def _get_signature_bytes(credentials, string_to_sign):
:rtype: bytes
:returns: Signed bytes produced by the credentials.
:raises: `EnvironmentError` if `crypto` did not import successfully.
"""
if isinstance(credentials, _GAECreds):
_, signed_bytes = app_identity.sign_blob(string_to_sign)
Expand All @@ -212,7 +218,11 @@ def _get_signature_bytes(credentials, string_to_sign):
pkey = _get_pem_key(credentials)
if not isinstance(string_to_sign, six.binary_type):
string_to_sign = string_to_sign.encode('utf-8')
return crypto.sign(pkey, string_to_sign, 'SHA256')
if crypto is None:
raise EnvironmentError('pyOpenSSL must be installed to sign '
'content using a private key')
else:
return crypto.sign(pkey, string_to_sign, 'SHA256')


def _get_service_account_name(credentials):
Expand Down
34 changes: 34 additions & 0 deletions gcloud/test_credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,21 @@ def test_gae_type(self):
self.assertEqual(signed_bytes, STRING_TO_SIGN)
self.assertEqual(APP_IDENTITY._strings_signed, [STRING_TO_SIGN])

def test_without_pyopenssl(self):
from gcloud._testing import _Monkey
from gcloud import credentials as credentials_mod

mock_called = []
credentials = object()

def mock_pem_key(local_creds):
mock_called.append(local_creds)

with _Monkey(credentials_mod, crypto=None, _get_pem_key=mock_pem_key):
with self.assertRaises(EnvironmentError):
self._callFUT(credentials, b'STRING_TO_SIGN')
self.assertEqual(mock_called, [credentials])


class Test__get_service_account_name(unittest2.TestCase):

Expand Down Expand Up @@ -520,6 +535,25 @@ def _get_private_key(private_key_pkcs8_text):
[(openssl_crypto.FILETYPE_PEM, PRIVATE_TEXT)])
self.assertEqual(openssl_crypto._signed, [])

def test_without_pyopenssl(self):
from oauth2client import service_account
from gcloud._testing import _Monkey
from gcloud import credentials as credentials_mod

PRIVATE_TEXT = 'dummy_private_key_pkcs8_text'

def _get_private_key(private_key_pkcs8_text):
return private_key_pkcs8_text

with _Monkey(service_account, _get_private_key=_get_private_key):
credentials = service_account._ServiceAccountCredentials(
'dummy_service_account_id', 'dummy_service_account_email',
'dummy_private_key_id', PRIVATE_TEXT, '')

with _Monkey(credentials_mod, crypto=None):
with self.assertRaises(EnvironmentError):
self._callFUT(credentials)


class Test__get_expiration_seconds(unittest2.TestCase):

Expand Down

0 comments on commit a1de232

Please sign in to comment.