diff --git a/cassandra/column_encryption/_policies.py b/cassandra/column_encryption/_policies.py index e049ba2d22..ef8097bfbd 100644 --- a/cassandra/column_encryption/_policies.py +++ b/cassandra/column_encryption/_policies.py @@ -35,15 +35,27 @@ class AES256ColumnEncryptionPolicy(ColumnEncryptionPolicy): - # CBC uses an IV that's the same size as the block size - # - # TODO: Need to find some way to expose mode options - # (CBC etc.) without leaking classes from the underlying - # impl here - def __init__(self, mode = modes.CBC, iv = os.urandom(AES256_BLOCK_SIZE_BYTES)): - - self.mode = mode + # Fix block cipher mode for now. IV size is a function of block cipher used + # so fixing this avoids (possibly unnecessary) validation logic here. + mode = modes.CBC + + # "iv" param here expects a bytearray that's the same size as the block + # size for AES-256 (128 bits or 16 bytes). If none is provided a new one + # will be randomly generated, but in this case the IV should be recorded and + # preserved or else you will not be able to decrypt any data encrypted by this + # policy. + def __init__(self, iv=None): + + # CBC uses an IV that's the same size as the block size + # + # Avoid defining IV with a default arg in order to stay away from + # any issues around the caching of default args self.iv = iv + if self.iv: + if not len(self.iv) == AES256_BLOCK_SIZE_BYTES: + raise ValueError("This policy uses AES-256 with CBC mode and therefore expects a 128-bit initialization vector") + else: + self.iv = os.urandom(AES256_BLOCK_SIZE_BYTES) # ColData for a given ColDesc is always preserved. We only create a Cipher # when there's an actual need to for a given ColDesc @@ -64,11 +76,13 @@ def encrypt(self, coldesc, obj_bytes): cipher = self._get_cipher(coldesc) encryptor = cipher.encryptor() - return encryptor.update(padded_bytes) + encryptor.finalize() + return self.iv + encryptor.update(padded_bytes) + encryptor.finalize() - def decrypt(self, coldesc, encrypted_bytes): + def decrypt(self, coldesc, bytes): - cipher = self._get_cipher(coldesc) + iv = bytes[:AES256_BLOCK_SIZE_BYTES] + encrypted_bytes = bytes[AES256_BLOCK_SIZE_BYTES:] + cipher = self._get_cipher(coldesc, iv=iv) decryptor = cipher.decryptor() padded_bytes = decryptor.update(encrypted_bytes) + decryptor.finalize() @@ -108,19 +122,18 @@ def cache_info(self): def column_type(self, coldesc): return self.coldata[coldesc].type - def _get_cipher(self, coldesc): + def _get_cipher(self, coldesc, iv=None): """ Access relevant state from this instance necessary to create a Cipher and then get one, hopefully returning a cached instance if we've already done so (and it hasn't been evicted) """ - try: coldata = self.coldata[coldesc] - return AES256ColumnEncryptionPolicy._build_cipher(coldata.key, self.mode, self.iv) + return AES256ColumnEncryptionPolicy._build_cipher(coldata.key, iv or self.iv) except KeyError: raise ValueError("Could not find column {}".format(coldesc)) # Explicitly use a class method here to avoid caching self @lru_cache(maxsize=128) - def _build_cipher(key, mode, iv): - return Cipher(algorithms.AES256(key), mode(iv)) + def _build_cipher(key, iv): + return Cipher(algorithms.AES256(key), AES256ColumnEncryptionPolicy.mode(iv)) diff --git a/tests/integration/standard/column_encryption/test_policies.py b/tests/integration/standard/column_encryption/test_policies.py index 87bfde3c31..bb84c0352c 100644 --- a/tests/integration/standard/column_encryption/test_policies.py +++ b/tests/integration/standard/column_encryption/test_policies.py @@ -20,7 +20,7 @@ from cassandra.policies import ColDesc from cassandra.column_encryption.policies import AES256ColumnEncryptionPolicy, \ - AES256_KEY_SIZE_BYTES + AES256_KEY_SIZE_BYTES, AES256_BLOCK_SIZE_BYTES def setup_module(): use_singledc() @@ -32,25 +32,28 @@ def _recreate_keyspace(self, session): session.execute("CREATE KEYSPACE foo WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}") session.execute("CREATE TABLE foo.bar(encrypted blob, unencrypted int, primary key(unencrypted))") + def _create_policy(self, key, iv = None): + cl_policy = AES256ColumnEncryptionPolicy() + col_desc = ColDesc('foo','bar','encrypted') + cl_policy.add_column(col_desc, key, "int") + return (col_desc, cl_policy) + def test_end_to_end_prepared(self): # We only currently perform testing on a single type/expected value pair since CLE functionality is essentially # independent of the underlying type. We intercept data after it's been encoded when it's going out and before it's # encoded when coming back; the actual types of the data involved don't impact us. - expected = 12345 - expected_type = "int" + expected = 0 key = os.urandom(AES256_KEY_SIZE_BYTES) - cl_policy = AES256ColumnEncryptionPolicy() - col_desc = ColDesc('foo','bar','encrypted') - cl_policy.add_column(col_desc, key, expected_type) - + (_, cl_policy) = self._create_policy(key) cluster = TestCluster(column_encryption_policy=cl_policy) session = cluster.connect() self._recreate_keyspace(session) prepared = session.prepare("insert into foo.bar (encrypted, unencrypted) values (?,?)") - session.execute(prepared, (expected,expected)) + for i in range(100): + session.execute(prepared, (i, i)) # A straight select from the database will now return the decrypted bits. We select both encrypted and unencrypted # values here to confirm that we don't interfere with regular processing of unencrypted vals. @@ -66,20 +69,19 @@ def test_end_to_end_prepared(self): def test_end_to_end_simple(self): - expected = 67890 - expected_type = "int" + expected = 1 key = os.urandom(AES256_KEY_SIZE_BYTES) - cl_policy = AES256ColumnEncryptionPolicy() - col_desc = ColDesc('foo','bar','encrypted') - cl_policy.add_column(col_desc, key, expected_type) - + (col_desc, cl_policy) = self._create_policy(key) cluster = TestCluster(column_encryption_policy=cl_policy) session = cluster.connect() self._recreate_keyspace(session) # Use encode_and_encrypt helper function to populate date - session.execute("insert into foo.bar (encrypted, unencrypted) values (%s,%s)",(cl_policy.encode_and_encrypt(col_desc, expected), expected)) + for i in range(1,100): + self.assertIsNotNone(i) + encrypted = cl_policy.encode_and_encrypt(col_desc, i) + session.execute("insert into foo.bar (encrypted, unencrypted) values (%s,%s)", (encrypted, i)) # A straight select from the database will now return the decrypted bits. We select both encrypted and unencrypted # values here to confirm that we don't interfere with regular processing of unencrypted vals. @@ -92,3 +94,42 @@ def test_end_to_end_simple(self): (encrypted,unencrypted) = session.execute(prepared, [expected]).one() self.assertEquals(expected, encrypted) self.assertEquals(expected, unencrypted) + + def test_end_to_end_different_cle_contexts(self): + + expected = 2 + + key = os.urandom(AES256_KEY_SIZE_BYTES) + + # Simulate the creation of two AES256 policies at two different times. Python caches + # default param args at function definition time so a single value will be used any time + # the default val is used. Upshot is that within the same test we'll always have the same + # IV if we rely on the default args, so manually introduce some variation here to simulate + # what actually happens if you have two distinct sessions created at two different times. + iv1 = os.urandom(AES256_BLOCK_SIZE_BYTES) + (col_desc1, cl_policy1) = self._create_policy(key, iv=iv1) + cluster1 = TestCluster(column_encryption_policy=cl_policy1) + session1 = cluster1.connect() + self._recreate_keyspace(session1) + + # Use encode_and_encrypt helper function to populate date + for i in range(1,100): + self.assertIsNotNone(i) + encrypted = cl_policy1.encode_and_encrypt(col_desc1, i) + session1.execute("insert into foo.bar (encrypted, unencrypted) values (%s,%s)", (encrypted, i)) + session1.shutdown() + cluster1.shutdown() + + # Explicitly clear the class-level cache here; we're trying to simulate a second connection from a completely new process and + # that would entail not re-using any cached ciphers + AES256ColumnEncryptionPolicy._build_cipher.cache_clear() + cache_info = cl_policy1.cache_info() + self.assertEqual(cache_info.currsize, 0) + + iv2 = os.urandom(AES256_BLOCK_SIZE_BYTES) + (_, cl_policy2) = self._create_policy(key, iv=iv2) + cluster2 = TestCluster(column_encryption_policy=cl_policy2) + session2 = cluster2.connect() + (encrypted,unencrypted) = session2.execute("select encrypted, unencrypted from foo.bar where unencrypted = %s allow filtering", (expected,)).one() + self.assertEquals(expected, encrypted) + self.assertEquals(expected, unencrypted) diff --git a/tests/unit/column_encryption/test_policies.py b/tests/unit/column_encryption/test_policies.py index f6b06a3ade..38136c69d4 100644 --- a/tests/unit/column_encryption/test_policies.py +++ b/tests/unit/column_encryption/test_policies.py @@ -55,6 +55,23 @@ def test_add_column_invalid_key_size_raises(self): with self.assertRaises(ValueError): policy.add_column(coldesc, os.urandom(key_size), "blob") + def test_add_column_invalid_iv_size_raises(self): + def test_iv_size(iv_size): + policy = AES256ColumnEncryptionPolicy(iv = os.urandom(iv_size)) + policy.add_column(coldesc, os.urandom(AES256_KEY_SIZE_BYTES), "blob") + policy.encrypt(coldesc, os.urandom(128)) + + coldesc = ColDesc('ks1','table1','col1') + for iv_size in range(1,AES256_BLOCK_SIZE_BYTES - 1): + with self.assertRaises(ValueError): + test_iv_size(iv_size) + for iv_size in range(AES256_BLOCK_SIZE_BYTES + 1,(2 * AES256_BLOCK_SIZE_BYTES) - 1): + with self.assertRaises(ValueError): + test_iv_size(iv_size) + + # Finally, confirm that the expected IV size has no issue + test_iv_size(AES256_BLOCK_SIZE_BYTES) + def test_add_column_null_coldesc_raises(self): with self.assertRaises(ValueError): policy = AES256ColumnEncryptionPolicy() @@ -125,6 +142,9 @@ def test_decrypt_unknown_column(self): policy.decrypt(ColDesc('ks2','table2','col2'), encrypted_bytes) def test_cache_info(self): + # Exclude any interference from tests above + AES256ColumnEncryptionPolicy._build_cipher.cache_clear() + coldesc1 = ColDesc('ks1','table1','col1') coldesc2 = ColDesc('ks2','table2','col2') coldesc3 = ColDesc('ks3','table3','col3')