diff --git a/kms/api-client/README.rst b/kms/api-client/README.rst index 28777c841d84..17e34befaf0c 100644 --- a/kms/api-client/README.rst +++ b/kms/api-client/README.rst @@ -92,18 +92,5 @@ To run this sample: $ python snippets.py - usage: snippets.py [-h] - {create_key_ring,create_crypto_key,encrypt,decrypt,disable_crypto_key_version,enable_crypto_key_version,destroy_crypto_key_version,restore_crypto_key_version,add_member_to_crypto_key_policy,get_key_ring_policy} - ... - positional arguments: - {create_key_ring,create_crypto_key,encrypt,decrypt,disable_crypto_key_version,enable_crypto_key_version,destroy_crypto_key_version,restore_crypto_key_version,add_member_to_crypto_key_policy,get_key_ring_policy} - - optional arguments: - -h, --help show this help message and exit - - - - - -.. _Google Cloud SDK: https://cloud.google.com/sdk/ \ No newline at end of file +.. _Google Cloud SDK: https://cloud.google.com/sdk/ diff --git a/kms/api-client/README.rst.in b/kms/api-client/README.rst.in index 6299b5a0a535..f8aef3a243f7 100644 --- a/kms/api-client/README.rst.in +++ b/kms/api-client/README.rst.in @@ -18,5 +18,7 @@ samples: - name: Snippets file: snippets.py show_help: True +- name: Asymmetric + file: asymmetric.py -folder: kms/api-client \ No newline at end of file +folder: kms/api-client diff --git a/kms/api-client/asymmetric.py b/kms/api-client/asymmetric.py index 7f0d11aa608f..9c8abd15cc9b 100644 --- a/kms/api-client/asymmetric.py +++ b/kms/api-client/asymmetric.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License.rom googleapiclient import discovery -import base64 import hashlib from cryptography.exceptions import InvalidSignature @@ -21,63 +20,96 @@ from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import ec, padding, utils +from google.cloud import kms_v1 +from google.cloud.kms_v1 import enums + + +# [START kms_create_asymmetric_key] +def create_asymmetric_key(project_id, location_id, key_ring_id, crypto_key_id): + """Creates an RSA encrypt/decrypt key pair within a specified KeyRing.""" + + # Creates an API client for the KMS API. + client = kms_v1.KeyManagementServiceClient() + + # The resource name of the KeyRing associated with the CryptoKey. + parent = client.key_ring_path(project_id, location_id, key_ring_id) + + # Create the CryptoKey object template + purpose = enums.CryptoKey.CryptoKeyPurpose.ASYMMETRIC_DECRYPT + algorithm = enums.CryptoKeyVersion.CryptoKeyVersionAlgorithm.\ + RSA_DECRYPT_OAEP_2048_SHA256 + crypto_key = {'purpose': purpose, + 'version_template': {'algorithm': algorithm}} + + # Create a CryptoKey for the given KeyRing. + response = client.create_crypto_key(parent, crypto_key_id, crypto_key) + + print('Created CryptoKey {}.'.format(response.name)) + return response +# [END kms_create_asymmetric_key] + # [START kms_get_asymmetric_public] -def getAsymmetricPublicKey(client, key_path): +def get_asymmetric_public_key(key_name): """ Retrieves the public key from a saved asymmetric key pair on Cloud KMS + Example key_name: + "projects/PROJECT_ID/locations/global/keyRings/RING_ID/cryptoKeys\ + /KEY_ID/cryptoKeyVersions/1" + Requires: cryptography.hazmat.backends.default_backend cryptography.hazmat.primitives.serialization """ - request = client.projects() \ - .locations() \ - .keyRings() \ - .cryptoKeys() \ - .cryptoKeyVersions() \ - .getPublicKey(name=key_path) - response = request.execute() - key_txt = response['pem'].encode('ascii') + + client = kms_v1.KeyManagementServiceClient() + response = client.get_public_key(key_name) + + key_txt = response.pem.encode('ascii') key = serialization.load_pem_public_key(key_txt, default_backend()) return key # [END kms_get_asymmetric_public] # [START kms_decrypt_rsa] -def decryptRSA(ciphertext, client, key_path): +def decrypt_rsa(ciphertext, key_name): """ Decrypt the input ciphertext (bytes) using an 'RSA_DECRYPT_OAEP_2048_SHA256' private key stored on Cloud KMS - Requires: - base64 + Example key_name: + "projects/PROJECT_ID/locations/global/keyRings/RING_ID/cryptoKeys\ + /KEY_ID/cryptoKeyVersions/1" """ - request_body = {'ciphertext': base64.b64encode(ciphertext).decode('utf-8')} - request = client.projects() \ - .locations() \ - .keyRings() \ - .cryptoKeys() \ - .cryptoKeyVersions() \ - .asymmetricDecrypt(name=key_path, - body=request_body) - response = request.execute() - plaintext = base64.b64decode(response['plaintext']) - return plaintext + + client = kms_v1.KeyManagementServiceClient() + response = client.asymmetric_decrypt(key_name, ciphertext) + return response.plaintext # [END kms_decrypt_rsa] # [START kms_encrypt_rsa] -def encryptRSA(plaintext, client, key_path): +def encrypt_rsa(plaintext, key_name): """ Encrypt the input plaintext (bytes) locally using an 'RSA_DECRYPT_OAEP_2048_SHA256' public key retrieved from Cloud KMS + Example key_name: + "projects/PROJECT_ID/locations/global/keyRings/RING_ID/cryptoKeys\ + /KEY_ID/cryptoKeyVersions/1" + Requires: cryptography.hazmat.primitives.asymmetric.padding cryptography.hazmat.primitives.hashes """ - public_key = getAsymmetricPublicKey(client, key_path) + # get the public key + client = kms_v1.KeyManagementServiceClient() + response = client.get_public_key(key_name) + key_txt = response.pem.encode('ascii') + public_key = serialization.load_pem_public_key(key_txt, default_backend()) + + # encrypt plaintext pad = padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None) @@ -86,38 +118,39 @@ def encryptRSA(plaintext, client, key_path): # [START kms_sign_asymmetric] -def signAsymmetric(message, client, key_path): +def sign_asymmetric(message, key_name): """ Create a signature for a message using a private key stored on Cloud KMS + Example key_name: + "projects/PROJECT_ID/locations/global/keyRings/RING_ID/cryptoKeys\ + /KEY_ID/cryptoKeyVersions/1" + Requires: - base64 hashlib """ # Note: some key algorithms will require a different hash function # For example, EC_SIGN_P384_SHA384 requires SHA384 + client = kms_v1.KeyManagementServiceClient() digest_bytes = hashlib.sha256(message).digest() - digest64 = base64.b64encode(digest_bytes) - - digest_JSON = {'sha256': digest64.decode('utf-8')} - request = client.projects() \ - .locations() \ - .keyRings() \ - .cryptoKeys() \ - .cryptoKeyVersions() \ - .asymmetricSign(name=key_path, - body={'digest': digest_JSON}) - response = request.execute() - return base64.b64decode(response.get('signature', None)) + + digest_json = {'sha256': digest_bytes} + + response = client.asymmetric_sign(key_name, digest_json) + return response.signature # [END kms_sign_asymmetric] # [START kms_verify_signature_rsa] -def verifySignatureRSA(signature, message, client, key_path): +def verify_signature_rsa(signature, message, key_name): """ Verify the validity of an 'RSA_SIGN_PSS_2048_SHA256' signature for the specified message + Example key_name: + "projects/PROJECT_ID/locations/global/keyRings/RING_ID/cryptoKeys\ + /KEY_ID/cryptoKeyVersions/1" + Requires: cryptography.exceptions.InvalidSignature cryptography.hazmat.primitives.asymmetric.padding @@ -125,7 +158,13 @@ def verifySignatureRSA(signature, message, client, key_path): cryptography.hazmat.primitives.hashes hashlib """ - public_key = getAsymmetricPublicKey(client, key_path) + # get the public key + client = kms_v1.KeyManagementServiceClient() + response = client.get_public_key(key_name) + key_txt = response.pem.encode('ascii') + public_key = serialization.load_pem_public_key(key_txt, default_backend()) + + # get the digest of the message digest_bytes = hashlib.sha256(message).digest() try: @@ -143,11 +182,15 @@ def verifySignatureRSA(signature, message, client, key_path): # [START kms_verify_signature_ec] -def verifySignatureEC(signature, message, client, key_path): +def verify_signature_ec(signature, message, key_name): """ Verify the validity of an 'EC_SIGN_P256_SHA256' signature for the specified message + Example key_name: + "projects/PROJECT_ID/locations/global/keyRings/RING_ID/cryptoKeys\ + /KEY_ID/cryptoKeyVersions/1" + Requires: cryptography.exceptions.InvalidSignature cryptography.hazmat.primitives.asymmetric.ec @@ -155,7 +198,13 @@ def verifySignatureEC(signature, message, client, key_path): cryptography.hazmat.primitives.hashes hashlib """ - public_key = getAsymmetricPublicKey(client, key_path) + # get the public key + client = kms_v1.KeyManagementServiceClient() + response = client.get_public_key(key_name) + key_txt = response.pem.encode('ascii') + public_key = serialization.load_pem_public_key(key_txt, default_backend()) + + # get the digest of the message digest_bytes = hashlib.sha256(message).digest() try: diff --git a/kms/api-client/asymmetric_test.py b/kms/api-client/asymmetric_test.py index 4ce9b32aed5b..cc621003c475 100644 --- a/kms/api-client/asymmetric_test.py +++ b/kms/api-client/asymmetric_test.py @@ -16,135 +16,118 @@ from os import environ from time import sleep +import asymmetric + from cryptography.hazmat.backends.openssl.ec import _EllipticCurvePublicKey from cryptography.hazmat.backends.openssl.rsa import _RSAPublicKey -from googleapiclient import discovery -from googleapiclient.errors import HttpError -import asymmetric as sample +from google.api_core.exceptions import GoogleAPICallError +from google.cloud.kms_v1 import enums +from snippets import create_key_ring -def create_key_helper(key_id, key_path, purpose, algorithm, t): - try: - t.client.projects() \ - .locations() \ - .keyRings() \ - .cryptoKeys() \ - .create(parent='{}/keyRings/{}'.format(t.parent, t.keyring), - body={'purpose': purpose, - 'versionTemplate': { - 'algorithm': algorithm - } - }, - cryptoKeyId=key_id) \ - .execute() - return True - except HttpError: - # key already exists - return False +from snippets_test import create_key_helper def setup_module(module): """ Set up keys in project if needed """ - t = TestKMSSamples() + t = TestKMSAsymmetric() try: # create keyring - t.client.projects() \ - .locations() \ - .keyRings() \ - .create(parent=t.parent, body={}, keyRingId=t.keyring) \ - .execute() - except HttpError: + create_key_ring(t.project_id, t.location, t.keyring_id) + except GoogleAPICallError: # keyring already exists pass - s1 = create_key_helper(t.rsaDecryptId, t.rsaDecrypt, 'ASYMMETRIC_DECRYPT', - 'RSA_DECRYPT_OAEP_2048_SHA256', t) - s2 = create_key_helper(t.rsaSignId, t.rsaSign, 'ASYMMETRIC_SIGN', - 'RSA_SIGN_PSS_2048_SHA256', t) - s3 = create_key_helper(t.ecSignId, t.ecSign, 'ASYMMETRIC_SIGN', - 'EC_SIGN_P256_SHA256', t) + s1 = create_key_helper(t.rsaDecryptId, + enums.CryptoKey.CryptoKeyPurpose.ASYMMETRIC_DECRYPT, + enums.CryptoKeyVersion.CryptoKeyVersionAlgorithm. + RSA_DECRYPT_OAEP_2048_SHA256, + t) + s2 = create_key_helper(t.rsaSignId, + enums.CryptoKey.CryptoKeyPurpose.ASYMMETRIC_SIGN, + enums.CryptoKeyVersion.CryptoKeyVersionAlgorithm. + RSA_SIGN_PSS_2048_SHA256, + t) + s3 = create_key_helper(t.ecSignId, + enums.CryptoKey.CryptoKeyPurpose.ASYMMETRIC_SIGN, + enums.CryptoKeyVersion.CryptoKeyVersionAlgorithm. + EC_SIGN_P256_SHA256, + t) + if s1 or s2 or s3: # leave time for keys to initialize sleep(20) -class TestKMSSamples: - +class TestKMSAsymmetric: project_id = environ['GCLOUD_PROJECT'] - keyring = 'kms-asymmetric-samples4' - parent = 'projects/{}/locations/global'.format(project_id) + keyring_id = 'kms-samples' + location = 'global' + parent = 'projects/{}/locations/{}'.format(project_id, location) rsaSignId = 'rsa-sign' rsaDecryptId = 'rsa-decrypt' ecSignId = 'ec-sign' rsaSign = '{}/keyRings/{}/cryptoKeys/{}/cryptoKeyVersions/1' \ - .format(parent, keyring, rsaSignId) + .format(parent, keyring_id, rsaSignId) rsaDecrypt = '{}/keyRings/{}/cryptoKeys/{}/cryptoKeyVersions/1' \ - .format(parent, keyring, rsaDecryptId) + .format(parent, keyring_id, rsaDecryptId) ecSign = '{}/keyRings/{}/cryptoKeys/{}/cryptoKeyVersions/1' \ - .format(parent, keyring, ecSignId) + .format(parent, keyring_id, ecSignId) message = 'test message 123' message_bytes = message.encode('utf-8') - client = discovery.build('cloudkms', 'v1') - def test_get_public_key(self): - rsa_key = sample.getAsymmetricPublicKey(self.client, self.rsaDecrypt) + rsa_key = asymmetric.get_asymmetric_public_key(self.rsaDecrypt) + assert isinstance(rsa_key, _RSAPublicKey), 'expected RSA key' + rsa_key = asymmetric.get_asymmetric_public_key(self.rsaSign) assert isinstance(rsa_key, _RSAPublicKey), 'expected RSA key' - ec_key = sample.getAsymmetricPublicKey(self.client, self.ecSign) + ec_key = asymmetric.get_asymmetric_public_key(self.ecSign) assert isinstance(ec_key, _EllipticCurvePublicKey), 'expected EC key' def test_rsa_encrypt_decrypt(self): - ciphertext = sample.encryptRSA(self.message_bytes, - self.client, - self.rsaDecrypt) - # ciphertext should be 256 characters with base64 and RSA 2048 + ciphertext = asymmetric.encrypt_rsa(self.message_bytes, + self.rsaDecrypt) + # signature should be 256 bytes for RSA 2048 assert len(ciphertext) == 256, \ 'ciphertext should be 256 chars; got {}'.format(len(ciphertext)) - plaintext_bytes = sample.decryptRSA(ciphertext, - self.client, - self.rsaDecrypt) + plaintext_bytes = asymmetric.decrypt_rsa(ciphertext, + self.rsaDecrypt) assert plaintext_bytes == self.message_bytes plaintext = plaintext_bytes.decode('utf-8') assert plaintext == self.message def test_rsa_sign_verify(self): - sig = sample.signAsymmetric(self.message_bytes, - self.client, - self.rsaSign) - # ciphertext should be 344 characters with base64 and RSA 2048 + sig = asymmetric.sign_asymmetric(self.message_bytes, + self.rsaSign) + # signature should be 256 bytes for RSA 2048 assert len(sig) == 256, \ 'sig should be 256 chars; got {}'.format(len(sig)) - success = sample.verifySignatureRSA(sig, - self.message_bytes, - self.client, - self.rsaSign) + success = asymmetric.verify_signature_rsa(sig, + self.message_bytes, + self.rsaSign) assert success is True, 'RSA verification failed' changed_bytes = self.message_bytes + b'.' - success = sample.verifySignatureRSA(sig, - changed_bytes, - self.client, - self.rsaSign) + success = asymmetric.verify_signature_rsa(sig, + changed_bytes, + self.rsaSign) assert success is False, 'verify should fail with modified message' def test_ec_sign_verify(self): - sig = sample.signAsymmetric(self.message_bytes, - self.client, - self.ecSign) + sig = asymmetric.sign_asymmetric(self.message_bytes, + self.ecSign) assert len(sig) > 50 and len(sig) < 300, \ 'sig outside expected length range' - success = sample.verifySignatureEC(sig, - self.message_bytes, - self.client, - self.ecSign) + success = asymmetric.verify_signature_ec(sig, + self.message_bytes, + self.ecSign) assert success is True, 'EC verification failed' changed_bytes = self.message_bytes + b'.' - success = sample.verifySignatureEC(sig, - changed_bytes, - self.client, - self.ecSign) + success = asymmetric.verify_signature_ec(sig, + changed_bytes, + self.ecSign) assert success is False, 'verify should fail with modified message' diff --git a/kms/api-client/quickstart.py b/kms/api-client/quickstart.py index 042bcae9ecb8..2f97f38f70de 100644 --- a/kms/api-client/quickstart.py +++ b/kms/api-client/quickstart.py @@ -13,32 +13,37 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and +import os + def run_quickstart(): # [START kms_quickstart] # Imports the Google APIs client library - import googleapiclient.discovery + from google.cloud import kms_v1 # Your Google Cloud Platform project ID project_id = 'YOUR_PROJECT_ID' + # [END kms_quickstart] + project_id = os.environ['GCLOUD_PROJECT'] + # [START kms_quickstart] # Lists keys in the "global" location. location = 'global' # Creates an API client for the KMS API. - kms_client = googleapiclient.discovery.build('cloudkms', 'v1') + client = kms_v1.KeyManagementServiceClient() # The resource name of the location associated with the key rings. - parent = 'projects/{}/locations/{}'.format(project_id, location) + parent = client.location_path(project_id, location) # Lists key rings - request = kms_client.projects().locations().keyRings().list(parent=parent) - response = request.execute() + response = client.list_key_rings(parent) + response_list = list(response) - if 'keyRings' in response and response['keyRings']: + if len(response_list) > 0: print('Key rings:') - for key_ring in response['keyRings']: - print(key_ring['name']) + for key_ring in response_list: + print(key_ring.name) else: print('No key rings found.') # [END kms_quickstart] diff --git a/kms/api-client/requirements.txt b/kms/api-client/requirements.txt index e75d6b4d639a..74f7afd79000 100644 --- a/kms/api-client/requirements.txt +++ b/kms/api-client/requirements.txt @@ -1,4 +1,2 @@ -google-api-python-client==1.7.4 -google-auth==1.6.1 -google-auth-httplib2==0.0.3 +google-cloud-kms==0.2 cryptography==2.4.2 diff --git a/kms/api-client/snippets.py b/kms/api-client/snippets.py index 73e4a65d812b..07b027051913 100644 --- a/kms/api-client/snippets.py +++ b/kms/api-client/snippets.py @@ -13,11 +13,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and -import argparse -import base64 -import io -import googleapiclient.discovery +from google.cloud import kms_v1 +from google.cloud.kms_v1 import enums # [START kms_create_keyring] @@ -25,17 +23,20 @@ def create_key_ring(project_id, location_id, key_ring_id): """Creates a KeyRing in the given location (e.g. global).""" # Creates an API client for the KMS API. - kms_client = googleapiclient.discovery.build('cloudkms', 'v1') + client = kms_v1.KeyManagementServiceClient() # The resource name of the location associated with the KeyRing. - parent = 'projects/{}/locations/{}'.format(project_id, location_id) + parent = client.location_path(project_id, location_id) - # Create KeyRing - request = kms_client.projects().locations().keyRings().create( - parent=parent, body={}, keyRingId=key_ring_id) - response = request.execute() + # The keyring object template + keyring_name = client.key_ring_path(project_id, location_id, key_ring_id) + keyring = {'name': keyring_name} - print('Created KeyRing {}.'.format(response['name'])) + # Create a KeyRing + response = client.create_key_ring(parent, key_ring_id, keyring) + + print('Created KeyRing {}.'.format(response.name)) + return response # [END kms_create_keyring] @@ -44,87 +45,55 @@ def create_crypto_key(project_id, location_id, key_ring_id, crypto_key_id): """Creates a CryptoKey within a KeyRing in the given location.""" # Creates an API client for the KMS API. - kms_client = googleapiclient.discovery.build('cloudkms', 'v1') + client = kms_v1.KeyManagementServiceClient() # The resource name of the KeyRing associated with the CryptoKey. - parent = 'projects/{}/locations/{}/keyRings/{}'.format( - project_id, location_id, key_ring_id) + parent = client.key_ring_path(project_id, location_id, key_ring_id) + + # Create the CryptoKey object template + purpose = enums.CryptoKey.CryptoKeyPurpose.ENCRYPT_DECRYPT + crypto_key = {'purpose': purpose} # Create a CryptoKey for the given KeyRing. - request = kms_client.projects().locations().keyRings().cryptoKeys().create( - parent=parent, body={'purpose': 'ENCRYPT_DECRYPT'}, - cryptoKeyId=crypto_key_id) - response = request.execute() + response = client.create_crypto_key(parent, crypto_key_id, crypto_key) - print('Created CryptoKey {}.'.format(response['name'])) + print('Created CryptoKey {}.'.format(response.name)) + return response # [END kms_create_cryptokey] # [START kms_encrypt] -def encrypt(project_id, location_id, key_ring_id, crypto_key_id, - plaintext_file_name, ciphertext_file_name): - """Encrypts data from plaintext_file_name using the provided CryptoKey and - saves it to ciphertext_file_name so it can only be recovered with a call to - decrypt. - """ +def encrypt_symmetric(project_id, location_id, key_ring_id, crypto_key_id, + plaintext): + """Encrypts input plaintext data using the provided symmetric CryptoKey.""" # Creates an API client for the KMS API. - kms_client = googleapiclient.discovery.build('cloudkms', 'v1') + client = kms_v1.KeyManagementServiceClient() # The resource name of the CryptoKey. - name = 'projects/{}/locations/{}/keyRings/{}/cryptoKeys/{}'.format( - project_id, location_id, key_ring_id, crypto_key_id) - - # Read data from the input file. - with io.open(plaintext_file_name, 'rb') as plaintext_file: - plaintext = plaintext_file.read() + name = client.crypto_key_path_path(project_id, location_id, key_ring_id, + crypto_key_id) # Use the KMS API to encrypt the data. - crypto_keys = kms_client.projects().locations().keyRings().cryptoKeys() - request = crypto_keys.encrypt( - name=name, - body={'plaintext': base64.b64encode(plaintext).decode('ascii')}) - response = request.execute() - ciphertext = base64.b64decode(response['ciphertext'].encode('ascii')) - - # Write the encrypted data to a file. - with io.open(ciphertext_file_name, 'wb') as ciphertext_file: - ciphertext_file.write(ciphertext) - - print('Saved ciphertext to {}.'.format(ciphertext_file_name)) + response = client.encrypt(name, plaintext) + return response.ciphertext # [END kms_encrypt] # [START kms_decrypt] -def decrypt(project_id, location_id, key_ring_id, crypto_key_id, - ciphertext_file_name, plaintext_file_name): - """Decrypts data from ciphertext_file_name that was previously encrypted - using the provided CryptoKey and saves it to plaintext_file_name.""" +def decrypt_symmetric(project_id, location_id, key_ring_id, crypto_key_id, + ciphertext): + """Decrypts input ciphertext using the provided symmetric CryptoKey.""" # Creates an API client for the KMS API. - kms_client = googleapiclient.discovery.build('cloudkms', 'v1') + client = kms_v1.KeyManagementServiceClient() # The resource name of the CryptoKey. - name = 'projects/{}/locations/{}/keyRings/{}/cryptoKeys/{}'.format( - project_id, location_id, key_ring_id, crypto_key_id) - - # Read encrypted data from the input file. - with io.open(ciphertext_file_name, 'rb') as ciphertext_file: - ciphertext = ciphertext_file.read() - + name = client.crypto_key_path_path(project_id, location_id, key_ring_id, + crypto_key_id) # Use the KMS API to decrypt the data. - crypto_keys = kms_client.projects().locations().keyRings().cryptoKeys() - request = crypto_keys.decrypt( - name=name, - body={'ciphertext': base64.b64encode(ciphertext).decode('ascii')}) - response = request.execute() - plaintext = base64.b64decode(response['plaintext'].encode('ascii')) - - # Write the decrypted data to a file. - with io.open(plaintext_file_name, 'wb') as plaintext_file: - plaintext_file.write(plaintext) - - print('Saved plaintext to {}.'.format(plaintext_file_name)) + response = client.decrypt(name, ciphertext) + return response.plaintext # [END kms_decrypt] @@ -135,23 +104,21 @@ def disable_crypto_key_version(project_id, location_id, key_ring_id, KeyRing.""" # Creates an API client for the KMS API. - kms_client = googleapiclient.discovery.build('cloudkms', 'v1') + client = kms_v1.KeyManagementServiceClient() # Construct the resource name of the CryptoKeyVersion. - name = ( - 'projects/{}/locations/{}/keyRings/{}/cryptoKeys/{}/' - 'cryptoKeyVersions/{}' - .format( - project_id, location_id, key_ring_id, crypto_key_id, version_id)) + name = client.crypto_key_version_path(project_id, location_id, key_ring_id, + crypto_key_id, version_id) # Use the KMS API to disable the CryptoKeyVersion. - crypto_keys = kms_client.projects().locations().keyRings().cryptoKeys() - request = crypto_keys.cryptoKeyVersions().patch( - name=name, body={'state': 'DISABLED'}, updateMask='state') - response = request.execute() + new_state = enums.CryptoKeyVersion.CryptoKeyVersionState.DISABLED + version = {'name': name, 'state': new_state} + update_mask = {'paths': ["state"]} + # Print results + response = client.update_crypto_key_version(version, update_mask) print('CryptoKeyVersion {}\'s state has been set to {}.'.format( - name, response['state'])) + name, response.state)) # [END kms_disable_cryptokey_version] @@ -162,23 +129,21 @@ def enable_crypto_key_version(project_id, location_id, key_ring_id, KeyRing.""" # Creates an API client for the KMS API. - kms_client = googleapiclient.discovery.build('cloudkms', 'v1') + client = kms_v1.KeyManagementServiceClient() # Construct the resource name of the CryptoKeyVersion. - name = ( - 'projects/{}/locations/{}/keyRings/{}/cryptoKeys/{}/' - 'cryptoKeyVersions/{}' - .format( - project_id, location_id, key_ring_id, crypto_key_id, version_id)) + name = client.crypto_key_version_path(project_id, location_id, key_ring_id, + crypto_key_id, version_id) # Use the KMS API to enable the CryptoKeyVersion. - crypto_keys = kms_client.projects().locations().keyRings().cryptoKeys() - request = crypto_keys.cryptoKeyVersions().patch( - name=name, body={'state': 'ENABLED'}, updateMask='state') - response = request.execute() + new_state = enums.CryptoKeyVersion.CryptoKeyVersionState.ENABLED + version = {'name': name, 'state': new_state} + update_mask = {'paths': ["state"]} + # Print results + response = client.update_crypto_key_version(version, update_mask) print('CryptoKeyVersion {}\'s state has been set to {}.'.format( - name, response['state'])) + name, response.state)) # [END kms_enable_cryptokey_version] @@ -189,22 +154,18 @@ def destroy_crypto_key_version( KeyRing for destruction 24 hours in the future.""" # Creates an API client for the KMS API. - kms_client = googleapiclient.discovery.build('cloudkms', 'v1') + client = kms_v1.KeyManagementServiceClient() # Construct the resource name of the CryptoKeyVersion. - name = ( - 'projects/{}/locations/{}/keyRings/{}/cryptoKeys/{}/' - 'cryptoKeyVersions/{}' - .format( - project_id, location_id, key_ring_id, crypto_key_id, version_id)) + name = client.crypto_key_version_path(project_id, location_id, key_ring_id, + crypto_key_id, version_id) - # Use the KMS API to schedule the CryptoKeyVersion for destruction. - crypto_keys = kms_client.projects().locations().keyRings().cryptoKeys() - request = crypto_keys.cryptoKeyVersions().destroy(name=name, body={}) - response = request.execute() + # Use the KMS API to mark the CryptoKeyVersion for destruction. + response = client.destroy_crypto_key_version(name) + # Print results print('CryptoKeyVersion {}\'s state has been set to {}.'.format( - name, response['state'])) + name, response.state)) # [END kms_destroy_cryptokey_version] @@ -214,22 +175,20 @@ def restore_crypto_key_version( """Restores a CryptoKeyVersion that is scheduled for destruction.""" # Creates an API client for the KMS API. - kms_client = googleapiclient.discovery.build('cloudkms', 'v1') + client = kms_v1.KeyManagementServiceClient() # Construct the resource name of the CryptoKeyVersion. - name = ( - 'projects/{}/locations/{}/keyRings/{}/cryptoKeys/{}/' - 'cryptoKeyVersions/{}' - .format( - project_id, location_id, key_ring_id, crypto_key_id, version_id)) + name = client.crypto_key_version_path(project_id, location_id, key_ring_id, + crypto_key_id, version_id) # Use the KMS API to restore the CryptoKeyVersion. - crypto_keys = kms_client.projects().locations().keyRings().cryptoKeys() - request = crypto_keys.cryptoKeyVersions().restore(name=name, body={}) - response = request.execute() + response = client.restore_crypto_key_version(name) + # Print results print('CryptoKeyVersion {}\'s state has been set to {}.'.format( - name, response['state'])) + name, response.state)) + + # [END kms_restore_cryptokey_version] @@ -240,217 +199,154 @@ def add_member_to_crypto_key_policy( (IAM) policy for a given CryptoKey associated with a KeyRing.""" # Creates an API client for the KMS API. - kms_client = googleapiclient.discovery.build('cloudkms', 'v1') + client = kms_v1.KeyManagementServiceClient() # The resource name of the CryptoKey. - parent = 'projects/{}/locations/{}/keyRings/{}/cryptoKeys/{}'.format( - project_id, location_id, key_ring_id, crypto_key_id) - - # Get the current IAM policy and add the new member to it. - crypto_keys = kms_client.projects().locations().keyRings().cryptoKeys() - policy_request = crypto_keys.getIamPolicy(resource=parent) - policy_response = policy_request.execute() - bindings = [] - if 'bindings' in policy_response.keys(): - bindings = policy_response['bindings'] - members = [] - members.append(member) - new_binding = dict() - new_binding['role'] = role - new_binding['members'] = members - bindings.append(new_binding) - policy_response['bindings'] = bindings - - # Set the new IAM Policy. - crypto_keys = kms_client.projects().locations().keyRings().cryptoKeys() - request = crypto_keys.setIamPolicy( - resource=parent, body={'policy': policy_response}) - request.execute() - - print_msg = ( - 'Member {} added with role {} to policy for CryptoKey {} in KeyRing {}' - .format(member, role, crypto_key_id, key_ring_id)) - print(print_msg) + resource = client.crypto_key_path_path(project_id, location_id, + key_ring_id, crypto_key_id) + # Get the current IAM policy. + policy = client.get_iam_policy(resource) + + # Add member + policy.bindings.add( + role=role, + members=[member]) + + # Update the IAM Policy. + client.set_iam_policy(resource, policy) + + # Print results + print('Member {} added with role {} to policy for CryptoKey {} \ + in KeyRing {}'.format(member, role, crypto_key_id, key_ring_id)) # [END kms_add_member_to_cryptokey_policy] +# [START kms_add_member_to_keyring_policy] +def add_member_to_key_ring_policy( + project_id, location_id, key_ring_id, member, role): + """Adds a member with a given role to the Identity and Access Management + (IAM) policy for a given KeyRing.""" + + # Creates an API client for the KMS API. + client = kms_v1.KeyManagementServiceClient() + + # The resource name of the KeyRing. + resource = client.key_ring_path(project_id, location_id, key_ring_id) + + # Get the current IAM policy. + policy = client.get_iam_policy(resource) + + # Add member + policy.bindings.add( + role=role, + members=[member]) + + # Update the IAM Policy. + client.set_iam_policy(resource, policy) + + # Print results + print('Member {} added with role {} to policy in KeyRing {}' + .format(member, role, key_ring_id)) + +# [END kms_add_member_to_keyring_policy] + + +# [START kms_remove_member_from_cryptokey_policy] +def remove_member_from_crypto_key_policy( + project_id, location_id, key_ring_id, crypto_key_id, member, role): + """Removes a member with a given role from the Identity and Access + Management (IAM) policy for a given CryptoKey associated with a KeyRing.""" + + # Creates an API client for the KMS API. + client = kms_v1.KeyManagementServiceClient() + + # The resource name of the CryptoKey. + resource = client.crypto_key_path_path(project_id, location_id, + key_ring_id, crypto_key_id) + # Get the current IAM policy. + policy = client.get_iam_policy(resource) + + # Remove member + for b in list(policy.bindings): + if b.role == role and member in b.members: + b.members.remove(member) + + # Update the IAM Policy. + client.set_iam_policy(resource, policy) + + # Print results + print('Member {} removed from role {} for CryptoKey in KeyRing {}' + .format(member, role, crypto_key_id, key_ring_id)) +# [END kms_remove_member_from_cryptokey_policy] + + +def remove_member_from_key_ring_policy(project_id, location_id, key_ring_id, + member, role): + """Removes a member with a given role from the Identity and Access + Management (IAM) policy for a given KeyRing.""" + + # Creates an API client for the KMS API. + client = kms_v1.KeyManagementServiceClient() + + # The resource name of the KeyRing. + resource = client.key_ring_path(project_id, location_id, key_ring_id) + + # Get the current IAM policy. + policy = client.get_iam_policy(resource) + + # Remove member + for b in list(policy.bindings): + if b.role == role and member in b.members: + b.members.remove(member) + + # Update the IAM Policy. + client.set_iam_policy(resource, policy) + + # Print results + print('Member {} removed from role {} for KeyRing {}' + .format(member, role, key_ring_id)) + + # [START kms_get_keyring_policy] def get_key_ring_policy(project_id, location_id, key_ring_id): """Gets the Identity and Access Management (IAM) policy for a given KeyRing and prints out roles and the members assigned to those roles.""" # Creates an API client for the KMS API. - kms_client = googleapiclient.discovery.build('cloudkms', 'v1') + client = kms_v1.KeyManagementServiceClient() # The resource name of the KeyRing. - parent = 'projects/{}/locations/{}/keyRings/{}'.format( - project_id, location_id, key_ring_id) + resource = client.key_ring_path(project_id, location_id, key_ring_id) # Get the current IAM policy. - request = kms_client.projects().locations().keyRings().getIamPolicy( - resource=parent) - response = request.execute() - - if 'bindings' in response.keys(): - print('Printing IAM policy for resource {}:'.format(parent)) - for binding in response['bindings']: - print('') - print('Role: {}'.format(binding['role'])) - print('Members:') - for member in binding['members']: - print(member) - print('') - else: - print('No roles found for resource {}.'.format(parent)) + policy = client.get_iam_policy(resource) + + # Print results + print('Printing IAM policy for resource {}:'.format(resource)) + for b in policy.bindings: + for m in b.members: + print('Role: {} Member: {}'.format(b.role, m)) + return policy # [END kms_get_keyring_policy] -if __name__ == '__main__': - parser = argparse.ArgumentParser( - description=__doc__, - formatter_class=argparse.RawDescriptionHelpFormatter) - subparsers = parser.add_subparsers(dest='command') - - create_key_ring_parser = subparsers.add_parser('create_key_ring') - create_key_ring_parser.add_argument('project') - create_key_ring_parser.add_argument('location') - create_key_ring_parser.add_argument('key_ring') - - create_crypto_key_parser = subparsers.add_parser('create_crypto_key') - create_crypto_key_parser.add_argument('project') - create_crypto_key_parser.add_argument('location') - create_crypto_key_parser.add_argument('key_ring') - create_crypto_key_parser.add_argument('crypto_key') - - encrypt_parser = subparsers.add_parser('encrypt') - encrypt_parser.add_argument('project') - encrypt_parser.add_argument('location') - encrypt_parser.add_argument('key_ring') - encrypt_parser.add_argument('crypto_key') - encrypt_parser.add_argument('infile') - encrypt_parser.add_argument('outfile') - - decrypt_parser = subparsers.add_parser('decrypt') - decrypt_parser.add_argument('project') - decrypt_parser.add_argument('location') - decrypt_parser.add_argument('key_ring') - decrypt_parser.add_argument('crypto_key') - decrypt_parser.add_argument('infile') - decrypt_parser.add_argument('outfile') - - disable_crypto_key_version_parser = subparsers.add_parser( - 'disable_crypto_key_version') - disable_crypto_key_version_parser.add_argument('project') - disable_crypto_key_version_parser.add_argument('location') - disable_crypto_key_version_parser.add_argument('key_ring') - disable_crypto_key_version_parser.add_argument('crypto_key') - disable_crypto_key_version_parser.add_argument('version') - - enable_crypto_key_version_parser = subparsers.add_parser( - 'enable_crypto_key_version') - enable_crypto_key_version_parser.add_argument('project') - enable_crypto_key_version_parser.add_argument('location') - enable_crypto_key_version_parser.add_argument('key_ring') - enable_crypto_key_version_parser.add_argument('crypto_key') - enable_crypto_key_version_parser.add_argument('version') - - destroy_crypto_key_version_parser = subparsers.add_parser( - 'destroy_crypto_key_version') - destroy_crypto_key_version_parser.add_argument('project') - destroy_crypto_key_version_parser.add_argument('location') - destroy_crypto_key_version_parser.add_argument('key_ring') - destroy_crypto_key_version_parser.add_argument('crypto_key') - destroy_crypto_key_version_parser.add_argument('version') - - restore_crypto_key_version_parser = subparsers.add_parser( - 'restore_crypto_key_version') - restore_crypto_key_version_parser.add_argument('project') - restore_crypto_key_version_parser.add_argument('location') - restore_crypto_key_version_parser.add_argument('key_ring') - restore_crypto_key_version_parser.add_argument('crypto_key') - restore_crypto_key_version_parser.add_argument('version') - - add_member_to_crypto_key_policy_parser = subparsers.add_parser( - 'add_member_to_crypto_key_policy') - add_member_to_crypto_key_policy_parser.add_argument('project') - add_member_to_crypto_key_policy_parser.add_argument('location') - add_member_to_crypto_key_policy_parser.add_argument('key_ring') - add_member_to_crypto_key_policy_parser.add_argument('crypto_key') - add_member_to_crypto_key_policy_parser.add_argument('member') - add_member_to_crypto_key_policy_parser.add_argument('role') - - get_key_ring_policy_parser = subparsers.add_parser('get_key_ring_policy') - get_key_ring_policy_parser.add_argument('project') - get_key_ring_policy_parser.add_argument('location') - get_key_ring_policy_parser.add_argument('key_ring') - - args = parser.parse_args() - - if args.command == 'create_key_ring': - create_key_ring( - args.project, - args.location, - args.key_ring) - elif args.command == 'create_crypto_key': - create_crypto_key( - args.project, - args.location, - args.key_ring, - args.crypto_key) - elif args.command == 'encrypt': - encrypt( - args.project, - args.location, - args.key_ring, - args.crypto_key, - args.infile, - args.outfile) - elif args.command == 'decrypt': - decrypt( - args.project, - args.location, - args.key_ring, - args.crypto_key, - args.infile, - args.outfile) - elif args.command == 'disable_crypto_key_version': - disable_crypto_key_version( - args.project, - args.location, - args.key_ring, - args.crypto_key, - args.version) - elif args.command == 'enable_crypto_key_version': - enable_crypto_key_version( - args.project, - args.location, - args.key_ring, - args.crypto_key, - args.version) - elif args.command == 'destroy_crypto_key_version': - destroy_crypto_key_version( - args.project, - args.location, - args.key_ring, - args.crypto_key, - args.version) - elif args.command == 'restore_crypto_key_version': - restore_crypto_key_version( - args.project, - args.location, - args.key_ring, - args.crypto_key, - args.version) - elif args.command == 'add_member_to_crypto_key_policy': - add_member_to_crypto_key_policy( - args.project, - args.location, - args.key_ring, - args.crypto_key, - args.member, - args.role) - elif args.command == 'get_key_ring_policy': - get_key_ring_policy( - args.project, - args.location, - args.key_ring) +def get_crypto_key_policy(project_id, location_id, key_ring_id, crypto_key_id): + """Gets the Identity and Access Management (IAM) policy for a given KeyRing + and prints out roles and the members assigned to those roles.""" + + # Creates an API client for the KMS API. + client = kms_v1.KeyManagementServiceClient() + + # The resource name of the CryptoKey. + resource = client.crypto_key_path_path(project_id, location_id, + key_ring_id, crypto_key_id) + + # Get the current IAM policy. + policy = client.get_iam_policy(resource) + + # Print results + print('Printing IAM policy for resource {}:'.format(resource)) + for b in policy.bindings: + for m in b.members: + print('Role: {} Member: {}'.format(b.role, m)) + return policy diff --git a/kms/api-client/snippets_test.py b/kms/api-client/snippets_test.py index b36d9644b9ef..9da03d9c5f2b 100644 --- a/kms/api-client/snippets_test.py +++ b/kms/api-client/snippets_test.py @@ -13,167 +13,208 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and -import os -import random -import string +import time +from os import environ -import googleapiclient.discovery +from google.api_core.exceptions import GoogleAPICallError +from google.cloud import kms_v1 +from google.cloud.kms_v1 import enums +from google.iam.v1.policy_pb2 import Policy + +import pytest import snippets -PROJECT = os.environ['GCLOUD_PROJECT'] - -# Your Google Cloud Platform Key Location -LOCATION = 'global' - -# Your Google Cloud Platform KeyRing name -KEY_RING = ''.join( - random.choice(string.ascii_lowercase + string.digits) for _ in range(12)) - -# Your Google Cloud Platform CryptoKey name -CRYPTO_KEY = ''.join( - random.choice(string.ascii_lowercase + string.digits) for _ in range(12)) - -# Your Google Cloud Platform CryptoKeyVersion name -VERSION = 1 - -# A member to add to our IAM policy -MEMBER = 'user:ryanmats@google.com' - -# The role we want our new member to have for our IAM policy -ROLE = 'roles/owner' - - -def test_create_key_ring(capsys): - snippets.create_key_ring(PROJECT, LOCATION, KEY_RING) - out, _ = capsys.readouterr() - expected = 'Created KeyRing projects/{}/locations/{}/keyRings/{}.'.format( - PROJECT, LOCATION, KEY_RING) - assert expected in out - - -def test_create_crypto_key(capsys): - snippets.create_crypto_key( - PROJECT, LOCATION, KEY_RING, CRYPTO_KEY) - out, _ = capsys.readouterr() - expected = ( - 'Created CryptoKey projects/{}/locations/{}/keyRings/{}/cryptoKeys/{}.' - .format(PROJECT, LOCATION, KEY_RING, CRYPTO_KEY)) - assert expected in out - - -def test_encrypt_decrypt(capsys, tmpdir): - # Write to a plaintext file. - tmpdir.join('in.txt').write('SampleText') - - # Construct temporary files. - plaintext_file = tmpdir.join('in.txt') - encrypted_file = tmpdir.join('out.txt') - decrypted_file = tmpdir.join('out2.txt') - - # Encrypt text and then decrypt it. - snippets.encrypt( - PROJECT, LOCATION, KEY_RING, CRYPTO_KEY, - str(plaintext_file), str(encrypted_file)) - snippets.decrypt( - PROJECT, LOCATION, KEY_RING, CRYPTO_KEY, - str(encrypted_file), str(decrypted_file)) - - # Make sure the decrypted text matches the original text. - decrypted_text = decrypted_file.read() - assert decrypted_text == 'SampleText' - - # Make sure other output is as expected. - out, _ = capsys.readouterr() - assert 'Saved ciphertext to {}.'.format(str(encrypted_file)) in out - assert 'Saved plaintext to {}.'.format(str(decrypted_file)) in out - - -def test_disable_crypto_key_version(capsys): - snippets.disable_crypto_key_version( - PROJECT, LOCATION, KEY_RING, CRYPTO_KEY, VERSION) - out, _ = capsys.readouterr() - expected = ( - 'CryptoKeyVersion projects/{}/locations/{}/keyRings/{}/cryptoKeys/{}/' - 'cryptoKeyVersions/{}\'s state has been set to {}.' - .format( - PROJECT, LOCATION, KEY_RING, CRYPTO_KEY, VERSION, - 'DISABLED')) - assert expected in out - - -def test_enable_crypto_key_version(capsys): - snippets.enable_crypto_key_version( - PROJECT, LOCATION, KEY_RING, CRYPTO_KEY, VERSION) - out, _ = capsys.readouterr() - expected = ( - 'CryptoKeyVersion projects/{}/locations/{}/keyRings/{}/cryptoKeys/{}/' - 'cryptoKeyVersions/{}\'s state has been set to {}.' - .format( - PROJECT, LOCATION, KEY_RING, CRYPTO_KEY, VERSION, - 'ENABLED')) - assert expected in out - - -def test_destroy_crypto_key_version(capsys): - snippets.destroy_crypto_key_version( - PROJECT, LOCATION, KEY_RING, CRYPTO_KEY, VERSION) - out, _ = capsys.readouterr() - expected = ( - 'CryptoKeyVersion projects/{}/locations/{}/keyRings/{}/cryptoKeys/{}/' - 'cryptoKeyVersions/{}\'s state has been set to {}.' - .format( - PROJECT, LOCATION, KEY_RING, CRYPTO_KEY, VERSION, - 'DESTROY_SCHEDULED')) - assert expected in out - - -def test_restore_crypto_key_version(capsys): - snippets.restore_crypto_key_version( - PROJECT, LOCATION, KEY_RING, CRYPTO_KEY, VERSION) - out, _ = capsys.readouterr() - expected = ( - 'CryptoKeyVersion projects/{}/locations/{}/keyRings/{}/cryptoKeys/{}/' - 'cryptoKeyVersions/{}\'s state has been set to {}.' - .format( - PROJECT, LOCATION, KEY_RING, CRYPTO_KEY, VERSION, - 'DISABLED')) - assert expected in out - - -def test_add_member_to_crypto_key_policy(capsys): - snippets.add_member_to_crypto_key_policy( - PROJECT, LOCATION, KEY_RING, CRYPTO_KEY, MEMBER, ROLE) - out, _ = capsys.readouterr() - expected = ( - 'Member {} added with role {} to policy for CryptoKey {} in KeyRing {}' - .format(MEMBER, ROLE, CRYPTO_KEY, KEY_RING)) - assert expected in out - - kms_client = googleapiclient.discovery.build('cloudkms', 'v1') - parent = 'projects/{}/locations/{}/keyRings/{}/cryptoKeys/{}'.format( - PROJECT, LOCATION, KEY_RING, CRYPTO_KEY) - crypto_keys = kms_client.projects().locations().keyRings().cryptoKeys() - policy_request = crypto_keys.getIamPolicy(resource=parent) - policy_response = policy_request.execute() - assert 'bindings' in policy_response.keys() - bindings = policy_response['bindings'] - found_member_role_pair = False - for binding in bindings: - if binding['role'] == ROLE: - for user in binding['members']: - if user == MEMBER: - found_member_role_pair = True - assert found_member_role_pair - - -def test_get_key_ring_policy(capsys): - snippets.get_key_ring_policy(PROJECT, LOCATION, KEY_RING) - out, _ = capsys.readouterr() - expected_roles_exist = ( - 'Printing IAM policy for resource projects/{}/locations/{}/keyRings/{}' - ':'.format(PROJECT, LOCATION, KEY_RING)) - expected_no_roles = ( - 'No roles found for resource projects/{}/locations/{}/keyRings/{}.' - .format(PROJECT, LOCATION, KEY_RING)) - assert (expected_roles_exist in out) or (expected_no_roles in out) + +def create_key_helper(key_id, purpose, algorithm, t): + try: + client = kms_v1.KeyManagementServiceClient() + parent = client.key_ring_path(t.project_id, t.location, t.keyring_id) + + crypto_key = {'purpose': purpose, + 'version_template': {'algorithm': algorithm}} + client.create_crypto_key(parent, key_id, crypto_key) + return True + except GoogleAPICallError: + # key already exists + return False + + +def setup_module(module): + """ + Set up keys in project if needed + """ + t = TestKMSSnippets() + try: + # create keyring + snippets.create_key_ring(t.project_id, t.location, t.keyring_id) + except GoogleAPICallError: + # keyring already exists + pass + s = create_key_helper(t.symId, + enums.CryptoKey.CryptoKeyPurpose.ENCRYPT_DECRYPT, + enums.CryptoKeyVersion.CryptoKeyVersionAlgorithm. + GOOGLE_SYMMETRIC_ENCRYPTION, + t) + if s: + # leave time for key to initialize + time.sleep(20) + + +class TestKMSSnippets: + project_id = environ['GCLOUD_PROJECT'] + keyring_id = 'kms-samples' + location = 'global' + parent = 'projects/{}/locations/{}'.format(project_id, location) + keyring_path = '{}/keyRings/{}'.format(parent, keyring_id) + version = '1' + + symId = 'symmetric' + + sym = '{}/cryptoKeys/{}'.format(keyring_path, symId) + sym_version = '{}/cryptoKeyVersions/{}'.format(sym, version) + + message = 'test message 123' + message_bytes = message.encode('utf-8') + + member = 'group:test@google.com' + role = 'roles/viewer' + + @pytest.mark.skip(reason="There's currently no method to delete keyrings, \ + so we should avoid creating resources") + def test_create_key_ring(self): + ring_id = self.keyring_id + '-testcreate' + str(int(time.time())) + snippets.create_key_ring(self.project_id, self.location, ring_id) + client = kms_v1.KeyManagementServiceClient() + result = client.get_key_ring(client.key_ring_path(self.project_id, + self.location, + ring_id)) + assert ring_id in result.name + + @pytest.mark.skip(reason="Deleting keys isn't instant, so we should avoid \ + creating a large number of them in our tests") + def test_create_crypto_key(self): + key_id = self.symId + '-test' + str(int(time.time())) + snippets.create_crypto_key(self.project_id, self.location, + self.keyring_id, key_id) + c = kms_v1.KeyManagementServiceClient() + result = c.get_crypto_key(c.crypto_key_path(self.project_id, + self.location, + self.keyring_id, + key_id)) + assert key_id in result.name + + # tests disable/enable/destroy/restore + def test_key_change_version_state(self): + client = kms_v1.KeyManagementServiceClient() + name = client.crypto_key_version_path(self.project_id, self.location, + self.keyring_id, self.symId, + self.version) + state_enum = enums.CryptoKeyVersion.CryptoKeyVersionState + # test disable + snippets.disable_crypto_key_version(self.project_id, self.location, + self.keyring_id, self.symId, + self.version) + response = client.get_crypto_key_version(name) + assert response.state == state_enum.DISABLED + # test destroy + snippets.destroy_crypto_key_version(self.project_id, self.location, + self.keyring_id, self.symId, + self.version) + response = client.get_crypto_key_version(name) + assert response.state == state_enum.DESTROY_SCHEDULED + # test restore + snippets.restore_crypto_key_version(self.project_id, self.location, + self.keyring_id, self.symId, + self.version) + response = client.get_crypto_key_version(name) + assert response.state == state_enum.DISABLED + # test re-enable + snippets.enable_crypto_key_version(self.project_id, self.location, + self.keyring_id, self.symId, + self.version) + response = client.get_crypto_key_version(name) + assert response.state == state_enum.ENABLED + + def test_get_ring_policy(self): + policy = snippets.get_key_ring_policy(self.project_id, + self.location, self.keyring_id) + assert type(policy) is Policy + + # tests get/add/remove policy members + def test_ring_policy(self): + # add member + snippets.add_member_to_key_ring_policy(self.project_id, self.location, + self.keyring_id, self.member, + self.role) + policy = snippets.get_key_ring_policy(self.project_id, + self.location, self.keyring_id) + found = False + for b in list(policy.bindings): + if b.role == self.role and self.member in b.members: + found = True + assert found + # remove member + snippets.remove_member_from_key_ring_policy(self.project_id, + self.location, + self.keyring_id, + self.member, + self.role) + policy = snippets.get_key_ring_policy(self.project_id, + self.location, self.keyring_id) + found = False + for b in list(policy.bindings): + if b.role == self.role and self.member in b.members: + found = True + assert not found + + # tests get/add/remove policy members + def test_key_policy(self): + # add member + snippets.add_member_to_crypto_key_policy(self.project_id, + self.location, + self.keyring_id, + self.symId, + self.member, + self.role) + policy = snippets.get_crypto_key_policy(self.project_id, + self.location, + self.keyring_id, + self.symId) + found = False + for b in list(policy.bindings): + if b.role == self.role and self.member in b.members: + found = True + assert found + # remove member + snippets.remove_member_from_crypto_key_policy(self.project_id, + self.location, + self.keyring_id, + self.symId, + self.member, + self.role) + policy = snippets.get_crypto_key_policy(self.project_id, + self.location, + self.keyring_id, + self.symId) + found = False + for b in list(policy.bindings): + if b.role == self.role and self.member in b.members: + found = True + assert not found + + def test_symmetric_encrypt_decrypt(self): + cipher_bytes = snippets.encrypt_symmetric(self.project_id, + self.location, + self.keyring_id, + self.symId, + self.message_bytes) + plain_bytes = snippets.decrypt_symmetric(self.project_id, + self.location, + self.keyring_id, + self.symId, + cipher_bytes) + assert plain_bytes == self.message_bytes + assert cipher_bytes != self.message_bytes + plaintext = plain_bytes.decode("utf-8") + assert plaintext == self.message