Skip to content

Commit

Permalink
PYTHON-1350 Store IV along with encrypted text when using column-leve…
Browse files Browse the repository at this point in the history
…l encryption (datastax#1160)
  • Loading branch information
absurdfarce authored and dkropachev committed Nov 18, 2024
1 parent 7e47772 commit a1c6bf7
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 31 deletions.
45 changes: 29 additions & 16 deletions cassandra/column_encryption/_policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,27 @@

class AES256ColumnEncryptionPolicy(ColumnEncryptionPolicy):

# CBC uses an IV that's the same size as the block size
#
# TODO: Need to find some way to expose mode options
# (CBC etc.) without leaking classes from the underlying
# impl here
def __init__(self, mode = modes.CBC, iv = os.urandom(AES256_BLOCK_SIZE_BYTES)):

self.mode = mode
# Fix block cipher mode for now. IV size is a function of block cipher used
# so fixing this avoids (possibly unnecessary) validation logic here.
mode = modes.CBC

# "iv" param here expects a bytearray that's the same size as the block
# size for AES-256 (128 bits or 16 bytes). If none is provided a new one
# will be randomly generated, but in this case the IV should be recorded and
# preserved or else you will not be able to decrypt any data encrypted by this
# policy.
def __init__(self, iv=None):

# CBC uses an IV that's the same size as the block size
#
# Avoid defining IV with a default arg in order to stay away from
# any issues around the caching of default args
self.iv = iv
if self.iv:
if not len(self.iv) == AES256_BLOCK_SIZE_BYTES:
raise ValueError("This policy uses AES-256 with CBC mode and therefore expects a 128-bit initialization vector")
else:
self.iv = os.urandom(AES256_BLOCK_SIZE_BYTES)

# ColData for a given ColDesc is always preserved. We only create a Cipher
# when there's an actual need to for a given ColDesc
Expand All @@ -64,11 +76,13 @@ def encrypt(self, coldesc, obj_bytes):

cipher = self._get_cipher(coldesc)
encryptor = cipher.encryptor()
return encryptor.update(padded_bytes) + encryptor.finalize()
return self.iv + encryptor.update(padded_bytes) + encryptor.finalize()

def decrypt(self, coldesc, encrypted_bytes):
def decrypt(self, coldesc, bytes):

cipher = self._get_cipher(coldesc)
iv = bytes[:AES256_BLOCK_SIZE_BYTES]
encrypted_bytes = bytes[AES256_BLOCK_SIZE_BYTES:]
cipher = self._get_cipher(coldesc, iv=iv)
decryptor = cipher.decryptor()
padded_bytes = decryptor.update(encrypted_bytes) + decryptor.finalize()

Expand Down Expand Up @@ -108,19 +122,18 @@ def cache_info(self):
def column_type(self, coldesc):
return self.coldata[coldesc].type

def _get_cipher(self, coldesc):
def _get_cipher(self, coldesc, iv=None):
"""
Access relevant state from this instance necessary to create a Cipher and then get one,
hopefully returning a cached instance if we've already done so (and it hasn't been evicted)
"""

try:
coldata = self.coldata[coldesc]
return AES256ColumnEncryptionPolicy._build_cipher(coldata.key, self.mode, self.iv)
return AES256ColumnEncryptionPolicy._build_cipher(coldata.key, iv or self.iv)
except KeyError:
raise ValueError("Could not find column {}".format(coldesc))

# Explicitly use a class method here to avoid caching self
@lru_cache(maxsize=128)
def _build_cipher(key, mode, iv):
return Cipher(algorithms.AES256(key), mode(iv))
def _build_cipher(key, iv):
return Cipher(algorithms.AES256(key), AES256ColumnEncryptionPolicy.mode(iv))
71 changes: 56 additions & 15 deletions tests/integration/standard/column_encryption/test_policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from cassandra.policies import ColDesc

from cassandra.column_encryption.policies import AES256ColumnEncryptionPolicy, \
AES256_KEY_SIZE_BYTES
AES256_KEY_SIZE_BYTES, AES256_BLOCK_SIZE_BYTES

def setup_module():
use_singledc()
Expand All @@ -32,25 +32,28 @@ def _recreate_keyspace(self, session):
session.execute("CREATE KEYSPACE foo WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}")
session.execute("CREATE TABLE foo.bar(encrypted blob, unencrypted int, primary key(unencrypted))")

def _create_policy(self, key, iv = None):
cl_policy = AES256ColumnEncryptionPolicy()
col_desc = ColDesc('foo','bar','encrypted')
cl_policy.add_column(col_desc, key, "int")
return (col_desc, cl_policy)

def test_end_to_end_prepared(self):

# We only currently perform testing on a single type/expected value pair since CLE functionality is essentially
# independent of the underlying type. We intercept data after it's been encoded when it's going out and before it's
# encoded when coming back; the actual types of the data involved don't impact us.
expected = 12345
expected_type = "int"
expected = 0

key = os.urandom(AES256_KEY_SIZE_BYTES)
cl_policy = AES256ColumnEncryptionPolicy()
col_desc = ColDesc('foo','bar','encrypted')
cl_policy.add_column(col_desc, key, expected_type)

(_, cl_policy) = self._create_policy(key)
cluster = TestCluster(column_encryption_policy=cl_policy)
session = cluster.connect()
self._recreate_keyspace(session)

prepared = session.prepare("insert into foo.bar (encrypted, unencrypted) values (?,?)")
session.execute(prepared, (expected,expected))
for i in range(100):
session.execute(prepared, (i, i))

# A straight select from the database will now return the decrypted bits. We select both encrypted and unencrypted
# values here to confirm that we don't interfere with regular processing of unencrypted vals.
Expand All @@ -66,20 +69,19 @@ def test_end_to_end_prepared(self):

def test_end_to_end_simple(self):

expected = 67890
expected_type = "int"
expected = 1

key = os.urandom(AES256_KEY_SIZE_BYTES)
cl_policy = AES256ColumnEncryptionPolicy()
col_desc = ColDesc('foo','bar','encrypted')
cl_policy.add_column(col_desc, key, expected_type)

(col_desc, cl_policy) = self._create_policy(key)
cluster = TestCluster(column_encryption_policy=cl_policy)
session = cluster.connect()
self._recreate_keyspace(session)

# Use encode_and_encrypt helper function to populate date
session.execute("insert into foo.bar (encrypted, unencrypted) values (%s,%s)",(cl_policy.encode_and_encrypt(col_desc, expected), expected))
for i in range(1,100):
self.assertIsNotNone(i)
encrypted = cl_policy.encode_and_encrypt(col_desc, i)
session.execute("insert into foo.bar (encrypted, unencrypted) values (%s,%s)", (encrypted, i))

# A straight select from the database will now return the decrypted bits. We select both encrypted and unencrypted
# values here to confirm that we don't interfere with regular processing of unencrypted vals.
Expand All @@ -92,3 +94,42 @@ def test_end_to_end_simple(self):
(encrypted,unencrypted) = session.execute(prepared, [expected]).one()
self.assertEquals(expected, encrypted)
self.assertEquals(expected, unencrypted)

def test_end_to_end_different_cle_contexts(self):

expected = 2

key = os.urandom(AES256_KEY_SIZE_BYTES)

# Simulate the creation of two AES256 policies at two different times. Python caches
# default param args at function definition time so a single value will be used any time
# the default val is used. Upshot is that within the same test we'll always have the same
# IV if we rely on the default args, so manually introduce some variation here to simulate
# what actually happens if you have two distinct sessions created at two different times.
iv1 = os.urandom(AES256_BLOCK_SIZE_BYTES)
(col_desc1, cl_policy1) = self._create_policy(key, iv=iv1)
cluster1 = TestCluster(column_encryption_policy=cl_policy1)
session1 = cluster1.connect()
self._recreate_keyspace(session1)

# Use encode_and_encrypt helper function to populate date
for i in range(1,100):
self.assertIsNotNone(i)
encrypted = cl_policy1.encode_and_encrypt(col_desc1, i)
session1.execute("insert into foo.bar (encrypted, unencrypted) values (%s,%s)", (encrypted, i))
session1.shutdown()
cluster1.shutdown()

# Explicitly clear the class-level cache here; we're trying to simulate a second connection from a completely new process and
# that would entail not re-using any cached ciphers
AES256ColumnEncryptionPolicy._build_cipher.cache_clear()
cache_info = cl_policy1.cache_info()
self.assertEqual(cache_info.currsize, 0)

iv2 = os.urandom(AES256_BLOCK_SIZE_BYTES)
(_, cl_policy2) = self._create_policy(key, iv=iv2)
cluster2 = TestCluster(column_encryption_policy=cl_policy2)
session2 = cluster2.connect()
(encrypted,unencrypted) = session2.execute("select encrypted, unencrypted from foo.bar where unencrypted = %s allow filtering", (expected,)).one()
self.assertEquals(expected, encrypted)
self.assertEquals(expected, unencrypted)
20 changes: 20 additions & 0 deletions tests/unit/column_encryption/test_policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,23 @@ def test_add_column_invalid_key_size_raises(self):
with self.assertRaises(ValueError):
policy.add_column(coldesc, os.urandom(key_size), "blob")

def test_add_column_invalid_iv_size_raises(self):
def test_iv_size(iv_size):
policy = AES256ColumnEncryptionPolicy(iv = os.urandom(iv_size))
policy.add_column(coldesc, os.urandom(AES256_KEY_SIZE_BYTES), "blob")
policy.encrypt(coldesc, os.urandom(128))

coldesc = ColDesc('ks1','table1','col1')
for iv_size in range(1,AES256_BLOCK_SIZE_BYTES - 1):
with self.assertRaises(ValueError):
test_iv_size(iv_size)
for iv_size in range(AES256_BLOCK_SIZE_BYTES + 1,(2 * AES256_BLOCK_SIZE_BYTES) - 1):
with self.assertRaises(ValueError):
test_iv_size(iv_size)

# Finally, confirm that the expected IV size has no issue
test_iv_size(AES256_BLOCK_SIZE_BYTES)

def test_add_column_null_coldesc_raises(self):
with self.assertRaises(ValueError):
policy = AES256ColumnEncryptionPolicy()
Expand Down Expand Up @@ -125,6 +142,9 @@ def test_decrypt_unknown_column(self):
policy.decrypt(ColDesc('ks2','table2','col2'), encrypted_bytes)

def test_cache_info(self):
# Exclude any interference from tests above
AES256ColumnEncryptionPolicy._build_cipher.cache_clear()

coldesc1 = ColDesc('ks1','table1','col1')
coldesc2 = ColDesc('ks2','table2','col2')
coldesc3 = ColDesc('ks3','table3','col3')
Expand Down

0 comments on commit a1c6bf7

Please sign in to comment.