diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 5599e37dcb..c8a4fd3e90 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,3 +1,25 @@ +3.28.0 +====== +June 5, 2023 + +Features +-------- +* Add support for vector type (PYTHON-1352) +* Cryptography module is now an optional dependency (PYTHON-1351) + +Bug Fixes +--------- +* Store IV along with encrypted text when using column-level encryption (PYTHON-1350) +* Create session-specific protocol handlers to contain session-specific CLE policies (PYTHON-1356) + +Others +------ +* Use Cython for smoke builds (PYTHON-1343) +* Don't fail when inserting UDTs with prepared queries with some missing fields (PR 1151) +* Convert print statement to function in docs (PR 1157) +* Update comment for retry policy (DOC-3278) +* Added error handling blog reference (DOC-2813) + 3.27.0 ====== May 1, 2023 diff --git a/Jenkinsfile b/Jenkinsfile index 41a10f4098..4c3f6629ad 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -13,9 +13,10 @@ Test Profiles: Matrix Types: Full: All server versions, python runtimes tested with and without Cython. - Develop: Smaller matrix for dev purpose. Cassandra: All cassandra server versions. Dse: All dse server versions. + Smoke: CI-friendly configurations. Currently-supported Python version + modern Cassandra/DSE instances. + We also avoid cython since it's tested as part of the nightlies Parameters: @@ -29,35 +30,30 @@ import com.datastax.jenkins.drivers.python.Slack slack = new Slack() -// Define our predefined matrices -// -// Smoke tests are CI-friendly test configuration. Currently-supported Python version + modern C*/DSE instances. -// We also avoid cython since it's tested as part of the nightlies. +DEFAULT_CASSANDRA = ['2.1', '2.2', '3.0', '3.11', '4.0'] +DEFAULT_DSE = ['dse-5.0.15', 'dse-5.1.35', 'dse-6.0.18', 'dse-6.7.17', 'dse-6.8.30'] +DEFAULT_RUNTIME = ['3.7.7', '3.8.3'] +DEFAULT_CYTHON = ["True", "False"] matrices = [ "FULL": [ - "SERVER": ['2.1', '2.2', '3.0', '3.11', '4.0', 'dse-5.0.15', 'dse-5.1.35', 'dse-6.0.18', 'dse-6.7.17', 'dse-6.8.30'], - "RUNTIME": ['2.7.18', '3.5.9', '3.6.10', '3.7.7', '3.8.3'], - "CYTHON": ["True", "False"] - ], - "DEVELOP": [ - "SERVER": ['2.1', '3.11', 'dse-6.8.30'], - "RUNTIME": ['2.7.18', '3.6.10'], - "CYTHON": ["True", "False"] + "SERVER": DEFAULT_CASSANDRA + DEFAULT_DSE, + "RUNTIME": DEFAULT_RUNTIME, + "CYTHON": DEFAULT_CYTHON ], "CASSANDRA": [ - "SERVER": ['2.1', '2.2', '3.0', '3.11', '4.0'], - "RUNTIME": ['2.7.18', '3.5.9', '3.6.10', '3.7.7', '3.8.3'], - "CYTHON": ["True", "False"] + "SERVER": DEFAULT_CASSANDRA, + "RUNTIME": DEFAULT_RUNTIME, + "CYTHON": DEFAULT_CYTHON ], "DSE": [ - "SERVER": ['dse-5.0.15', 'dse-5.1.35', 'dse-6.0.18', 'dse-6.7.17', 'dse-6.8.30'], - "RUNTIME": ['2.7.18', '3.5.9', '3.6.10', '3.7.7', '3.8.3'], - "CYTHON": ["True", "False"] + "SERVER": DEFAULT_DSE, + "RUNTIME": DEFAULT_RUNTIME, + "CYTHON": DEFAULT_CYTHON ], "SMOKE": [ - "SERVER": ['3.11', '4.0', 'dse-6.8.30'], - "RUNTIME": ['3.7.7', '3.8.3'], - "CYTHON": ["True", "False"] + "SERVER": DEFAULT_CASSANDRA.takeRight(2) + DEFAULT_DSE.takeRight(1), + "RUNTIME": DEFAULT_RUNTIME.takeRight(2), + "CYTHON": ["True"] ] ] @@ -84,24 +80,13 @@ def getBuildContext() { Based on schedule and parameters, configure the build context and env vars. */ - def profile = "${params.PROFILE}" + def PROFILE = "${params.PROFILE}" def EVENT_LOOP = "${params.EVENT_LOOP.toLowerCase()}" - matrixType = "SMOKE" - developBranchPattern = ~"((dev|long)-)?python-.*" - if (developBranchPattern.matcher(env.BRANCH_NAME).matches()) { - matrixType = "DEVELOP" - if (env.BRANCH_NAME.contains("long")) { - profile = "FULL" - } - } + matrixType = params.MATRIX != "DEFAULT" ? params.MATRIX : "SMOKE" + matrix = matrices[matrixType].clone() // Check if parameters were set explicitly - if (params.MATRIX != "DEFAULT") { - matrixType = params.MATRIX - } - - matrix = matrices[matrixType].clone() if (params.CYTHON != "DEFAULT") { matrix["CYTHON"] = [params.CYTHON] } @@ -121,7 +106,7 @@ def getBuildContext() { context = [ vars: [ - "PROFILE=${profile}", + "PROFILE=${PROFILE}", "EVENT_LOOP=${EVENT_LOOP}" ], matrix: matrix @@ -401,8 +386,9 @@ def describeBuild(buildContext) { } } -def scheduleTriggerJobName() { - "drivers/python/oss/master/disabled" +// branch pattern for cron +def branchPatternCron() { + ~"(master)" } pipeline { @@ -467,7 +453,7 @@ pipeline { ''') choice( name: 'MATRIX', - choices: ['DEFAULT', 'SMOKE', 'FULL', 'DEVELOP', 'CASSANDRA', 'DSE'], + choices: ['DEFAULT', 'SMOKE', 'FULL', 'CASSANDRA', 'DSE'], description: '''

The matrix for the build.

@@ -488,10 +474,6 @@ pipeline { - - - - @@ -503,22 +485,11 @@ pipeline {
FULL All server versions, python runtimes tested with and without Cython.
DEVELOPSmaller matrix for dev purpose.
CASSANDRA All cassandra server versions.
''') choice( name: 'PYTHON_VERSION', - choices: ['DEFAULT', '2.7.18', '3.5.9', '3.6.10', '3.7.7', '3.8.3'], + choices: ['DEFAULT'] + DEFAULT_RUNTIME, description: 'Python runtime version. Default to the build context.') choice( name: 'SERVER_VERSION', - choices: ['DEFAULT', - '2.1', // Legacy Apache CassandraⓇ - '2.2', // Legacy Apache CassandraⓇ - '3.0', // Previous Apache CassandraⓇ - '3.11', // Current Apache CassandraⓇ - '4.0', // Development Apache CassandraⓇ - 'dse-5.0.15', // Long Term Support DataStax Enterprise - 'dse-5.1.35', // Legacy DataStax Enterprise - 'dse-6.0.18', // Previous DataStax Enterprise - 'dse-6.7.17', // Previous DataStax Enterprise - 'dse-6.8.30', // Current DataStax Enterprise - ], + choices: ['DEFAULT'] + DEFAULT_CASSANDRA + DEFAULT_DSE, description: '''Apache CassandraⓇ and DataStax Enterprise server version to use for adhoc BUILD-AND-EXECUTE-TESTS ONLY! @@ -549,7 +520,7 @@ pipeline { - + @@ -574,7 +545,7 @@ pipeline {
4.0Apache CassandraⓇ v4.x (CURRENTLY UNDER DEVELOPMENT)Apache CassandraⓇ v4.0.x
dse-5.0.15
''') choice( name: 'CYTHON', - choices: ['DEFAULT', 'True', 'False'], + choices: ['DEFAULT'] + DEFAULT_CYTHON, description: '''

Flag to determine if Cython should be enabled

@@ -647,10 +618,10 @@ pipeline { } triggers { - parameterizedCron((scheduleTriggerJobName() == env.JOB_NAME) ? """ + parameterizedCron(branchPatternCron().matcher(env.BRANCH_NAME).matches() ? """ # Every weeknight (Monday - Friday) around 4:00 AM - # These schedules will run with and without Cython enabled for Python v2.7.18 and v3.5.9 - H 4 * * 1-5 %CI_SCHEDULE=WEEKNIGHTS;EVENT_LOOP=LIBEV;CI_SCHEDULE_PYTHON_VERSION=2.7.18 3.5.9;CI_SCHEDULE_SERVER_VERSION=2.2 3.11 dse-5.1.35 dse-6.0.18 dse-6.7.17 + # These schedules will run with and without Cython enabled for Python 3.7.7 and 3.8.3 + H 4 * * 1-5 %CI_SCHEDULE=WEEKNIGHTS;EVENT_LOOP=LIBEV;CI_SCHEDULE_PYTHON_VERSION=3.7.7 3.8.3;CI_SCHEDULE_SERVER_VERSION=2.2 3.11 dse-5.1.35 dse-6.0.18 dse-6.7.17 """ : "") } diff --git a/README.rst b/README.rst index 8e33b285e5..cfb68ffade 100644 --- a/README.rst +++ b/README.rst @@ -66,6 +66,10 @@ Contributing ------------ See `CONTRIBUTING `_. +Error Handling +------------ +While originally written for the Java driver, users may reference the `Cassandra error handling done right blog `_ for resolving error handling scenarios with Apache Cassandra. + Reporting Problems ------------------ Please report any bugs and make any feature requests by clicking the New Issue button in diff --git a/cassandra/__init__.py b/cassandra/__init__.py index 301bcbf1c1..89fee0a9ef 100644 --- a/cassandra/__init__.py +++ b/cassandra/__init__.py @@ -23,7 +23,7 @@ def emit(self, record): logging.getLogger('cassandra').addHandler(NullHandler()) -__version_info__ = (3, 27, 0) +__version_info__ = (3, 28, 0) __version__ = '.'.join(map(str, __version_info__)) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 229f1d0145..404d486e2c 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -1017,7 +1017,7 @@ def default_retry_policy(self, policy): cloud = None """ A dict of the cloud configuration. Example:: - + { # path to the secure connect bundle 'secure_connect_bundle': '/path/to/secure-connect-dbname.zip', @@ -1531,7 +1531,7 @@ def __init__(self, street, zipcode): # results will include Address instances results = session.execute("SELECT * FROM users") row = results[0] - print row.id, row.location.street, row.location.zipcode + print(row.id, row.location.street, row.location.zipcode) """ if self.protocol_version < 3: @@ -2656,12 +2656,6 @@ def __init__(self, cluster, hosts, keyspace=None): self.encoder = Encoder() - if self.cluster.column_encryption_policy is not None: - try: - self.client_protocol_handler.column_encryption_policy = self.cluster.column_encryption_policy - except AttributeError: - log.info("Unable to set column encryption policy for session") - # create connection pools in parallel self._initial_connect_futures = set() for host in hosts: @@ -2682,6 +2676,15 @@ def __init__(self, cluster, hosts, keyspace=None): self.session_id = uuid.uuid4() self._graph_paging_available = self._check_graph_paging_available() + if self.cluster.column_encryption_policy is not None: + try: + self.client_protocol_handler = type( + str(self.session_id) + "-ProtocolHandler", + (ProtocolHandler,), + {"column_encryption_policy": self.cluster.column_encryption_policy}) + except AttributeError: + log.info("Unable to set column encryption policy for session") + if self.cluster.monitor_reporting_enabled: cc_host = self.cluster.get_control_connection_host() valid_insights_version = (cc_host and version_supports_insights(cc_host.dse_version)) diff --git a/cassandra/column_encryption/_policies.py b/cassandra/column_encryption/_policies.py new file mode 100644 index 0000000000..ef8097bfbd --- /dev/null +++ b/cassandra/column_encryption/_policies.py @@ -0,0 +1,139 @@ +# Copyright DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from collections import namedtuple +from functools import lru_cache + +import logging +import os + +log = logging.getLogger(__name__) + +from cassandra.cqltypes import _cqltypes +from cassandra.policies import ColumnEncryptionPolicy + +from cryptography.hazmat.primitives import padding +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes + +AES256_BLOCK_SIZE = 128 +AES256_BLOCK_SIZE_BYTES = int(AES256_BLOCK_SIZE / 8) +AES256_KEY_SIZE = 256 +AES256_KEY_SIZE_BYTES = int(AES256_KEY_SIZE / 8) + +ColData = namedtuple('ColData', ['key','type']) + +class AES256ColumnEncryptionPolicy(ColumnEncryptionPolicy): + + # Fix block cipher mode for now. IV size is a function of block cipher used + # so fixing this avoids (possibly unnecessary) validation logic here. + mode = modes.CBC + + # "iv" param here expects a bytearray that's the same size as the block + # size for AES-256 (128 bits or 16 bytes). If none is provided a new one + # will be randomly generated, but in this case the IV should be recorded and + # preserved or else you will not be able to decrypt any data encrypted by this + # policy. + def __init__(self, iv=None): + + # CBC uses an IV that's the same size as the block size + # + # Avoid defining IV with a default arg in order to stay away from + # any issues around the caching of default args + self.iv = iv + if self.iv: + if not len(self.iv) == AES256_BLOCK_SIZE_BYTES: + raise ValueError("This policy uses AES-256 with CBC mode and therefore expects a 128-bit initialization vector") + else: + self.iv = os.urandom(AES256_BLOCK_SIZE_BYTES) + + # ColData for a given ColDesc is always preserved. We only create a Cipher + # when there's an actual need to for a given ColDesc + self.coldata = {} + self.ciphers = {} + + def encrypt(self, coldesc, obj_bytes): + + # AES256 has a 128-bit block size so if the input bytes don't align perfectly on + # those blocks we have to pad them. There's plenty of room for optimization here: + # + # * Instances of the PKCS7 padder should be managed in a bounded pool + # * It would be nice if we could get a flag from encrypted data to indicate + # whether it was padded or not + # * Might be able to make this happen with a leading block of flags in encrypted data + padder = padding.PKCS7(AES256_BLOCK_SIZE).padder() + padded_bytes = padder.update(obj_bytes) + padder.finalize() + + cipher = self._get_cipher(coldesc) + encryptor = cipher.encryptor() + return self.iv + encryptor.update(padded_bytes) + encryptor.finalize() + + def decrypt(self, coldesc, bytes): + + iv = bytes[:AES256_BLOCK_SIZE_BYTES] + encrypted_bytes = bytes[AES256_BLOCK_SIZE_BYTES:] + cipher = self._get_cipher(coldesc, iv=iv) + decryptor = cipher.decryptor() + padded_bytes = decryptor.update(encrypted_bytes) + decryptor.finalize() + + unpadder = padding.PKCS7(AES256_BLOCK_SIZE).unpadder() + return unpadder.update(padded_bytes) + unpadder.finalize() + + def add_column(self, coldesc, key, type): + + if not coldesc: + raise ValueError("ColDesc supplied to add_column cannot be None") + if not key: + raise ValueError("Key supplied to add_column cannot be None") + if not type: + raise ValueError("Type supplied to add_column cannot be None") + if type not in _cqltypes.keys(): + raise ValueError("Type %s is not a supported type".format(type)) + if not len(key) == AES256_KEY_SIZE_BYTES: + raise ValueError("AES256 column encryption policy expects a 256-bit encryption key") + self.coldata[coldesc] = ColData(key, _cqltypes[type]) + + def contains_column(self, coldesc): + return coldesc in self.coldata + + def encode_and_encrypt(self, coldesc, obj): + if not coldesc: + raise ValueError("ColDesc supplied to encode_and_encrypt cannot be None") + if not obj: + raise ValueError("Object supplied to encode_and_encrypt cannot be None") + coldata = self.coldata.get(coldesc) + if not coldata: + raise ValueError("Could not find ColData for ColDesc %s".format(coldesc)) + return self.encrypt(coldesc, coldata.type.serialize(obj, None)) + + def cache_info(self): + return AES256ColumnEncryptionPolicy._build_cipher.cache_info() + + def column_type(self, coldesc): + return self.coldata[coldesc].type + + def _get_cipher(self, coldesc, iv=None): + """ + Access relevant state from this instance necessary to create a Cipher and then get one, + hopefully returning a cached instance if we've already done so (and it hasn't been evicted) + """ + try: + coldata = self.coldata[coldesc] + return AES256ColumnEncryptionPolicy._build_cipher(coldata.key, iv or self.iv) + except KeyError: + raise ValueError("Could not find column {}".format(coldesc)) + + # Explicitly use a class method here to avoid caching self + @lru_cache(maxsize=128) + def _build_cipher(key, iv): + return Cipher(algorithms.AES256(key), AES256ColumnEncryptionPolicy.mode(iv)) diff --git a/cassandra/column_encryption/policies.py b/cassandra/column_encryption/policies.py new file mode 100644 index 0000000000..770084bd48 --- /dev/null +++ b/cassandra/column_encryption/policies.py @@ -0,0 +1,20 @@ +# Copyright DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +try: + import cryptography + from cassandra.column_encryption._policies import * +except ImportError: + # Cryptography is not installed + pass diff --git a/cassandra/cqlengine/query.py b/cassandra/cqlengine/query.py index 11f664ec02..1978d319f4 100644 --- a/cassandra/cqlengine/query.py +++ b/cassandra/cqlengine/query.py @@ -286,15 +286,15 @@ class ContextQuery(object): with ContextQuery(Automobile, keyspace='test2') as A: A.objects.create(manufacturer='honda', year=2008, model='civic') - print len(A.objects.all()) # 1 result + print(len(A.objects.all())) # 1 result with ContextQuery(Automobile, keyspace='test4') as A: - print len(A.objects.all()) # 0 result + print(len(A.objects.all())) # 0 result # Multiple models with ContextQuery(Automobile, Automobile2, connection='cluster2') as (A, A2): - print len(A.objects.all()) - print len(A2.objects.all()) + print(len(A.objects.all())) + print(len(A2.objects.all())) """ @@ -809,11 +809,11 @@ class Comment(Model): print("Normal") for comment in Comment.objects(photo_id=u): - print comment.comment_id + print(comment.comment_id) print("Reversed") for comment in Comment.objects(photo_id=u).order_by("-comment_id"): - print comment.comment_id + print(comment.comment_id) """ if len(colnames) == 0: clone = copy.deepcopy(self) diff --git a/cassandra/cqltypes.py b/cassandra/cqltypes.py index c2c0d9f905..3c5cfe2d17 100644 --- a/cassandra/cqltypes.py +++ b/cassandra/cqltypes.py @@ -235,13 +235,15 @@ def parse_casstype_args(typestring): else: names.append(None) - ctype = lookup_casstype_simple(tok) + try: + ctype = int(tok) + except ValueError: + ctype = lookup_casstype_simple(tok) types.append(ctype) # return the first (outer) type, which will have all parameters applied return args[0][0][0] - def lookup_casstype(casstype): """ Given a Cassandra type as a string (possibly including parameters), hand @@ -259,6 +261,7 @@ def lookup_casstype(casstype): try: return parse_casstype_args(casstype) except (ValueError, AssertionError, IndexError) as e: + log.debug("Exception in parse_casstype_args: %s" % e) raise ValueError("Don't know how to parse type string %r: %s" % (casstype, e)) @@ -296,7 +299,7 @@ class _CassandraType(object): """ def __repr__(self): - return '<%s( %r )>' % (self.cql_parameterized_type(), self.val) + return '<%s>' % (self.cql_parameterized_type()) @classmethod def from_binary(cls, byts, protocol_version): @@ -1432,3 +1435,31 @@ def serialize(cls, v, protocol_version): buf.write(int8_pack(cls._encode_precision(bound.precision))) return buf.getvalue() + +class VectorType(_CassandraType): + typename = 'org.apache.cassandra.db.marshal.VectorType' + vector_size = 0 + subtype = None + + @classmethod + def apply_parameters(cls, params, names): + assert len(params) == 2 + subtype = lookup_casstype(params[0]) + vsize = params[1] + return type('%s(%s)' % (cls.cass_parameterized_type_with([]), vsize), (cls,), {'vector_size': vsize, 'subtype': subtype}) + + @classmethod + def deserialize(cls, byts, protocol_version): + indexes = (4 * x for x in range(0, cls.vector_size)) + return [cls.subtype.deserialize(byts[idx:idx + 4], protocol_version) for idx in indexes] + + @classmethod + def serialize(cls, v, protocol_version): + buf = io.BytesIO() + for item in v: + buf.write(cls.subtype.serialize(item, protocol_version)) + return buf.getvalue() + + @classmethod + def cql_parameterized_type(cls): + return "%s<%s, %s>" % (cls.typename, cls.subtype.typename, cls.vector_size) diff --git a/cassandra/datastax/graph/fluent/__init__.py b/cassandra/datastax/graph/fluent/__init__.py index 44a0d136e0..92f148721e 100644 --- a/cassandra/datastax/graph/fluent/__init__.py +++ b/cassandra/datastax/graph/fluent/__init__.py @@ -257,7 +257,7 @@ def traversal_source(session=None, graph_name=None, execution_profile=EXEC_PROFI session = c.connect() g = DseGraph.traversal_source(session, 'my_graph') - print g.V().valueMap().toList() + print(g.V().valueMap().toList()) """ diff --git a/cassandra/policies.py b/cassandra/policies.py index 7c26d804c6..0e9b915fde 100644 --- a/cassandra/policies.py +++ b/cassandra/policies.py @@ -13,21 +13,17 @@ # limitations under the License. import random from collections import namedtuple -from functools import lru_cache from itertools import islice, cycle, groupby, repeat import logging -import os from random import randint, shuffle from threading import Lock import socket import warnings -from cryptography.hazmat.primitives import padding -from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes +log = logging.getLogger(__name__) from cassandra import WriteType as WT from cassandra.connection import UnixSocketEndPoint -from cassandra.cqltypes import _cqltypes # This is done this way because WriteType was originally @@ -35,11 +31,8 @@ # It may removed in the next mayor. WriteType = WT - from cassandra import ConsistencyLevel, OperationTimedOut -log = logging.getLogger(__name__) - class HostDistance(object): """ @@ -893,7 +886,7 @@ def on_request_error(self, query, consistency, error, retry_num): `retry_num` counts how many times the operation has been retried, so the first time this method is called, `retry_num` will be 0. - The default, it triggers a retry on the next host in the query plan + By default, it triggers a retry on the next host in the query plan with the same consistency level. """ # TODO revisit this for the next major @@ -1263,7 +1256,6 @@ def _rethrow(self, *args, **kwargs): ColDesc = namedtuple('ColDesc', ['ks', 'table', 'col']) -ColData = namedtuple('ColData', ['key','type']) class ColumnEncryptionPolicy(object): """ @@ -1320,100 +1312,3 @@ def encode_and_encrypt(self, coldesc, obj): statements. """ raise NotImplementedError() - -AES256_BLOCK_SIZE = 128 -AES256_BLOCK_SIZE_BYTES = int(AES256_BLOCK_SIZE / 8) -AES256_KEY_SIZE = 256 -AES256_KEY_SIZE_BYTES = int(AES256_KEY_SIZE / 8) - -class AES256ColumnEncryptionPolicy(ColumnEncryptionPolicy): - - # CBC uses an IV that's the same size as the block size - # - # TODO: Need to find some way to expose mode options - # (CBC etc.) without leaking classes from the underlying - # impl here - def __init__(self, mode = modes.CBC, iv = os.urandom(AES256_BLOCK_SIZE_BYTES)): - - self.mode = mode - self.iv = iv - - # ColData for a given ColDesc is always preserved. We only create a Cipher - # when there's an actual need to for a given ColDesc - self.coldata = {} - self.ciphers = {} - - def encrypt(self, coldesc, obj_bytes): - - # AES256 has a 128-bit block size so if the input bytes don't align perfectly on - # those blocks we have to pad them. There's plenty of room for optimization here: - # - # * Instances of the PKCS7 padder should be managed in a bounded pool - # * It would be nice if we could get a flag from encrypted data to indicate - # whether it was padded or not - # * Might be able to make this happen with a leading block of flags in encrypted data - padder = padding.PKCS7(AES256_BLOCK_SIZE).padder() - padded_bytes = padder.update(obj_bytes) + padder.finalize() - - cipher = self._get_cipher(coldesc) - encryptor = cipher.encryptor() - return encryptor.update(padded_bytes) + encryptor.finalize() - - def decrypt(self, coldesc, encrypted_bytes): - - cipher = self._get_cipher(coldesc) - decryptor = cipher.decryptor() - padded_bytes = decryptor.update(encrypted_bytes) + decryptor.finalize() - - unpadder = padding.PKCS7(AES256_BLOCK_SIZE).unpadder() - return unpadder.update(padded_bytes) + unpadder.finalize() - - def add_column(self, coldesc, key, type): - - if not coldesc: - raise ValueError("ColDesc supplied to add_column cannot be None") - if not key: - raise ValueError("Key supplied to add_column cannot be None") - if not type: - raise ValueError("Type supplied to add_column cannot be None") - if type not in _cqltypes.keys(): - raise ValueError("Type %s is not a supported type".format(type)) - if not len(key) == AES256_KEY_SIZE_BYTES: - raise ValueError("AES256 column encryption policy expects a 256-bit encryption key") - self.coldata[coldesc] = ColData(key, _cqltypes[type]) - - def contains_column(self, coldesc): - return coldesc in self.coldata - - def encode_and_encrypt(self, coldesc, obj): - if not coldesc: - raise ValueError("ColDesc supplied to encode_and_encrypt cannot be None") - if not obj: - raise ValueError("Object supplied to encode_and_encrypt cannot be None") - coldata = self.coldata.get(coldesc) - if not coldata: - raise ValueError("Could not find ColData for ColDesc %s".format(coldesc)) - return self.encrypt(coldesc, coldata.type.serialize(obj, None)) - - def cache_info(self): - return AES256ColumnEncryptionPolicy._build_cipher.cache_info() - - def column_type(self, coldesc): - return self.coldata[coldesc].type - - def _get_cipher(self, coldesc): - """ - Access relevant state from this instance necessary to create a Cipher and then get one, - hopefully returning a cached instance if we've already done so (and it hasn't been evicted) - """ - - try: - coldata = self.coldata[coldesc] - return AES256ColumnEncryptionPolicy._build_cipher(coldata.key, self.mode, self.iv) - except KeyError: - raise ValueError("Could not find column {}".format(coldesc)) - - # Explicitly use a class method here to avoid caching self - @lru_cache(maxsize=128) - def _build_cipher(key, mode, iv): - return Cipher(algorithms.AES256(key), mode(iv)) diff --git a/cassandra/query.py b/cassandra/query.py index d34708a998..85f9175e14 100644 --- a/cassandra/query.py +++ b/cassandra/query.py @@ -77,7 +77,7 @@ def tuple_factory(colnames, rows): >>> session = cluster.connect('mykeyspace') >>> session.row_factory = tuple_factory >>> rows = session.execute("SELECT name, age FROM users LIMIT 1") - >>> print rows[0] + >>> print(rows[0]) ('Bob', 42) .. versionchanged:: 2.0.0 @@ -133,16 +133,16 @@ def named_tuple_factory(colnames, rows): >>> user = rows[0] >>> # you can access field by their name: - >>> print "name: %s, age: %d" % (user.name, user.age) + >>> print("name: %s, age: %d" % (user.name, user.age)) name: Bob, age: 42 >>> # or you can access fields by their position (like a tuple) >>> name, age = user - >>> print "name: %s, age: %d" % (name, age) + >>> print("name: %s, age: %d" % (name, age)) name: Bob, age: 42 >>> name = user[0] >>> age = user[1] - >>> print "name: %s, age: %d" % (name, age) + >>> print("name: %s, age: %d" % (name, age)) name: Bob, age: 42 .. versionchanged:: 2.0.0 @@ -188,7 +188,7 @@ def dict_factory(colnames, rows): >>> session = cluster.connect('mykeyspace') >>> session.row_factory = dict_factory >>> rows = session.execute("SELECT name, age FROM users LIMIT 1") - >>> print rows[0] + >>> print(rows[0]) {u'age': 42, u'name': u'Bob'} .. versionchanged:: 2.0.0 diff --git a/docs.yaml b/docs.yaml index d7226d5e69..eb94f74590 100644 --- a/docs.yaml +++ b/docs.yaml @@ -23,7 +23,7 @@ sections: CASS_DRIVER_NO_CYTHON=1 python setup.py build_ext --inplace --force versions: - name: '3.27' - ref: 78b1e2b + ref: 910f0282 - name: '3.26' ref: f1e9126 - name: '3.25' diff --git a/docs/api/cassandra/cqlengine/models.rst b/docs/api/cassandra/cqlengine/models.rst index 60b1471184..ee689a2b48 100644 --- a/docs/api/cassandra/cqlengine/models.rst +++ b/docs/api/cassandra/cqlengine/models.rst @@ -103,7 +103,7 @@ Model TestIfNotExistsModel.if_not_exists().create(id=id, count=9, text='111111111111') except LWTException as e: # handle failure case - print e.existing # dict containing LWT result fields + print(e.existing # dict containing LWT result fields) This method is supported on Cassandra 2.0 or later. @@ -144,7 +144,7 @@ Model t.iff(count=5).update('other text') except LWTException as e: # handle failure case - print e.existing # existing object + print(e.existing # existing object) .. automethod:: get diff --git a/docs/column_encryption.rst b/docs/column_encryption.rst index 289f9cd62b..e18b9286ed 100644 --- a/docs/column_encryption.rst +++ b/docs/column_encryption.rst @@ -14,6 +14,20 @@ also available, although in this case values must be manually encrypted and/or d Client-side encryption and decryption should work against all versions of Cassandra and DSE. It does not utilize any server-side functionality to do its work. +WARNING: Consider upgrading to 3.28.0 or later +------------------------------------------------ +There is a significant issue with the column encryption functionality in Python driver 3.27.0. +To be able to decrypt your data, you must preserve the cipher initialization vector (IV) used by +the :class:`~.AES256ColumnEncryptionPolicy` when your data was written. +To decrypt your data, you must supply this IV when creating a policy to read this data. +If you do not supply this IV in the policy to read this data, you will **NOT BE ABLE TO DECRYPT YOUR DATA**. +See +`PYTHON-1350 `_ for more detail. + +DataStax recommends upgrading to Python driver 3.28.0 or later to avoid this issue. 3.28.0 or later manages the IV automatically. +Because of this change in functionality, any encrypted data written in 3.27.0 will **NOT** be readable by 3.28.0 or later. +After upgrading to Python driver 3.28.0 or later, it is critical that you re-encrypt your data with the new driver version. + Configuration ------------- Client-side encryption is enabled by creating an instance of a subclass of :class:`~.ColumnEncryptionPolicy` @@ -21,9 +35,11 @@ and adding information about columns to be encrypted to it. This policy is then when it's created. .. code-block:: python + import os - from cassandra.policies import ColDesc, AES256ColumnEncryptionPolicy, AES256_KEY_SIZE_BYTES + from cassandra.policies import ColDesc + from cassandra.column_encryption.policies import AES256ColumnEncryptionPolicy, AES256_KEY_SIZE_BYTES key = os.urandom(AES256_KEY_SIZE_BYTES) cl_policy = AES256ColumnEncryptionPolicy() @@ -57,6 +73,7 @@ supplied parameters. For example, we can create a prepared statement to insert by executing the following code after creating a :class:`~.Cluster` in the manner described above: .. code-block:: python + session = cluster.connect() prepared = session.prepare("insert into ks1.table1 (column1) values (?)") session.execute(prepared, (1000,)) @@ -70,6 +87,7 @@ standard serialization methods employed by the driver. The result is then encry of the policy. Using this approach the example above could be implemented along the lines of the following: .. code-block:: python + session = cluster.connect() session.execute("insert into ks1.table1 (column1) values (%s)",(cl_policy.encode_and_encrypt(col_desc, 1000),)) @@ -88,4 +106,4 @@ of the cryptography package, although wheels exist for many common platforms. Client-side encryption has been implemented for both the default Cython and pure Python row processing logic. This functionality has not yet been ported to the NumPy Cython implementation. During testing, -the NumPy processing works on Python 3.7 but fails for Python 3.8. \ No newline at end of file +the NumPy processing works on Python 3.7 but fails for Python 3.8. diff --git a/docs/cqlengine/connections.rst b/docs/cqlengine/connections.rst index 03ade27521..fd44303514 100644 --- a/docs/cqlengine/connections.rst +++ b/docs/cqlengine/connections.rst @@ -99,7 +99,7 @@ You can specify a default connection per model: year = columns.Integer(primary_key=True) model = columns.Text(primary_key=True) - print len(Automobile.objects.all()) # executed on the connection 'cluster2' + print(len(Automobile.objects.all())) # executed on the connection 'cluster2' QuerySet and model instance --------------------------- diff --git a/docs/cqlengine/models.rst b/docs/cqlengine/models.rst index c0ba390119..719513f4a9 100644 --- a/docs/cqlengine/models.rst +++ b/docs/cqlengine/models.rst @@ -201,7 +201,7 @@ are only created, presisted, and queried via table Models. A short example to in users.create(name="Joe", addr=address(street="Easy St.", zipcode=99999)) user = users.objects(name="Joe")[0] - print user.name, user.addr + print(user.name, user.addr) # Joe address(street=u'Easy St.', zipcode=99999) UDTs are modeled by inheriting :class:`~.usertype.UserType`, and setting column type attributes. Types are then used in defining diff --git a/docs/execution-profiles.rst b/docs/execution-profiles.rst index 7be1a85e3f..0965d77f3d 100644 --- a/docs/execution-profiles.rst +++ b/docs/execution-profiles.rst @@ -43,7 +43,7 @@ Default session = cluster.connect() local_query = 'SELECT rpc_address FROM system.local' for _ in cluster.metadata.all_hosts(): - print session.execute(local_query)[0] + print(session.execute(local_query)[0]) .. parsed-literal:: @@ -69,7 +69,7 @@ Initializing cluster with profiles profiles = {'node1': node1_profile, 'node2': node2_profile} session = Cluster(execution_profiles=profiles).connect() for _ in cluster.metadata.all_hosts(): - print session.execute(local_query, execution_profile='node1')[0] + print(session.execute(local_query, execution_profile='node1')[0]) .. parsed-literal:: @@ -81,7 +81,7 @@ Initializing cluster with profiles .. code:: python for _ in cluster.metadata.all_hosts(): - print session.execute(local_query, execution_profile='node2')[0] + print(session.execute(local_query, execution_profile='node2')[0]) .. parsed-literal:: @@ -93,7 +93,7 @@ Initializing cluster with profiles .. code:: python for _ in cluster.metadata.all_hosts(): - print session.execute(local_query)[0] + print(session.execute(local_query)[0]) .. parsed-literal:: @@ -123,7 +123,7 @@ New profiles can be added constructing from scratch, or deriving from default: cluster.add_execution_profile(node1_profile, locked_execution) for _ in cluster.metadata.all_hosts(): - print session.execute(local_query, execution_profile=node1_profile)[0] + print(session.execute(local_query, execution_profile=node1_profile)[0]) .. parsed-literal:: @@ -144,8 +144,8 @@ We also have the ability to pass profile instances to be used for execution, but tmp = session.execution_profile_clone_update('node1', request_timeout=100, row_factory=tuple_factory) - print session.execute(local_query, execution_profile=tmp)[0] - print session.execute(local_query, execution_profile='node1')[0] + print(session.execute(local_query, execution_profile=tmp)[0]) + print(session.execute(local_query, execution_profile='node1')[0]) .. parsed-literal:: diff --git a/docs/faq.rst b/docs/faq.rst index 56cb648a24..194d5520e8 100644 --- a/docs/faq.rst +++ b/docs/faq.rst @@ -44,7 +44,7 @@ Since tracing is done asynchronously to the request, this method polls until the >>> result = future.result() >>> trace = future.get_query_trace() >>> for e in trace.events: - >>> print e.source_elapsed, e.description + >>> print(e.source_elapsed, e.description) 0:00:00.000077 Parsing select * from system.local 0:00:00.000153 Preparing statement @@ -67,7 +67,7 @@ With prepared statements, the replicas are obtained by ``routing_key``, based on >>> bound = prepared.bind((1,)) >>> replicas = cluster.metadata.get_replicas(bound.keyspace, bound.routing_key) >>> for h in replicas: - >>> print h.address + >>> print(h.address) 127.0.0.1 127.0.0.2 diff --git a/docs/getting-started.rst b/docs/getting-started.rst index 1969b503ba..76685c5fdf 100644 --- a/docs/getting-started.rst +++ b/docs/getting-started.rst @@ -119,7 +119,7 @@ way to execute a query is to use :meth:`~.Session.execute()`: rows = session.execute('SELECT name, age, email FROM users') for user_row in rows: - print user_row.name, user_row.age, user_row.email + print(user_row.name, user_row.age, user_row.email) This will transparently pick a Cassandra node to execute the query against and handle any retries that are necessary if the operation fails. @@ -135,19 +135,19 @@ examples are equivalent: rows = session.execute('SELECT name, age, email FROM users') for row in rows: - print row.name, row.age, row.email + print(row.name, row.age, row.email) .. code-block:: python rows = session.execute('SELECT name, age, email FROM users') for (name, age, email) in rows: - print name, age, email + print(name, age, email) .. code-block:: python rows = session.execute('SELECT name, age, email FROM users') for row in rows: - print row[0], row[1], row[2] + print(row[0], row[1], row[2]) If you prefer another result format, such as a ``dict`` per row, you can change the :attr:`~.Session.row_factory` attribute. @@ -335,7 +335,7 @@ For example: try: rows = future.result() user = rows[0] - print user.name, user.age + print(user.name, user.age) except ReadTimeout: log.exception("Query timed out:") @@ -352,7 +352,7 @@ This works well for executing many queries concurrently: # wait for them to complete and use the results for future in futures: rows = future.result() - print rows[0].name + print(rows[0].name) Alternatively, instead of calling :meth:`~.ResponseFuture.result()`, you can attach callback and errback functions through the diff --git a/docs/graph_fluent.rst b/docs/graph_fluent.rst index cada908f2f..ac4b4806d5 100644 --- a/docs/graph_fluent.rst +++ b/docs/graph_fluent.rst @@ -85,7 +85,7 @@ to accomplish this configuration: session = cluster.connect() g = DseGraph.traversal_source(session) # Build the GraphTraversalSource - print g.V().toList() # Traverse the Graph + print(g.V().toList()) # Traverse the Graph Note that the execution profile created with :meth:`DseGraph.create_execution_profile <.datastax.graph.fluent.DseGraph.create_execution_profile>` cannot be used for any groovy string queries. @@ -233,11 +233,11 @@ Batch Queries DSE Graph supports batch queries using a :class:`TraversalBatch <.datastax.graph.fluent.query.TraversalBatch>` object instantiated with :meth:`DseGraph.batch <.datastax.graph.fluent.DseGraph.batch>`. A :class:`TraversalBatch <.datastax.graph.fluent.query.TraversalBatch>` allows -you to execute multiple graph traversals in a single atomic transaction. A -traversal batch is executed with :meth:`.Session.execute_graph` or using -:meth:`TraversalBatch.execute <.datastax.graph.fluent.query.TraversalBatch.execute>` if bounded to a DSE session. +you to execute multiple graph traversals in a single atomic transaction. A +traversal batch is executed with :meth:`.Session.execute_graph` or using +:meth:`TraversalBatch.execute <.datastax.graph.fluent.query.TraversalBatch.execute>` if bounded to a DSE session. -Either way you choose to execute the traversal batch, you need to configure +Either way you choose to execute the traversal batch, you need to configure the execution profile accordingly. Here is a example:: from cassandra.cluster import Cluster diff --git a/docs/installation.rst b/docs/installation.rst index 4996a02c1b..204a6d4475 100644 --- a/docs/installation.rst +++ b/docs/installation.rst @@ -3,7 +3,7 @@ Installation Supported Platforms ------------------- -Python 2.7, 3.5, 3.6, 3.7 and 3.8 are supported. Both CPython (the standard Python +Python 3.7 and 3.8 are supported. Both CPython (the standard Python implementation) and `PyPy `_ are supported and tested. Linux, OSX, and Windows are supported. @@ -26,7 +26,7 @@ To check if the installation was successful, you can run:: python -c 'import cassandra; print cassandra.__version__' -It should print something like "3.22.0". +It should print something like "3.27.0". .. _installation-datastax-graph: @@ -34,7 +34,7 @@ It should print something like "3.22.0". --------------------------- The driver provides an optional fluent graph API that depends on Apache TinkerPop (gremlinpython). It is not installed by default. To be able to build Gremlin traversals, you need to install -the `graph` requirements:: +the `graph` extra:: pip install scylla-driver[graph] @@ -65,6 +65,27 @@ support this:: pip install scales +*Optional:* Column-Level Encryption (CLE) Support +-------------------------------------------------- +The driver has built-in support for client-side encryption and +decryption of data. For more, see :doc:`column_encryption`. + +CLE depends on the Python `cryptography `_ module. +When installing Python driver 3.27.0. the `cryptography` module is +also downloaded and installed. +If you are using Python driver 3.28.0 or later and want to use CLE, you must +install the `cryptography `_ module. + +You can install this module along with the driver by specifying the `cle` extra:: + + pip install cassandra-driver[cle] + +Alternatively, you can also install the module directly via `pip`:: + + pip install cryptography + +Any version of cryptography >= 35.0 will work for the CLE feature. You can find additional +details at `PYTHON-1351 `_ Speeding Up Installation ^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/docs/object-mapper.rst b/docs/object-mapper.rst index 421be246ac..5eb78f57b6 100644 --- a/docs/object-mapper.rst +++ b/docs/object-mapper.rst @@ -87,7 +87,7 @@ Getting Started >>> q.count() 4 >>> for instance in q: - >>> print instance.description + >>> print(instance.description) example5 example6 example7 @@ -101,5 +101,5 @@ Getting Started >>> q2.count() 1 >>> for instance in q2: - >>> print instance.description + >>> print(instance.description) example5 diff --git a/requirements.txt b/requirements.txt index 9d1a674cde..732bba1018 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,2 @@ -cryptography >= 35.0 geomet>=0.1,<0.3 six >=1.9 diff --git a/setup.py b/setup.py index 51ae17912b..bfe78530ef 100644 --- a/setup.py +++ b/setup.py @@ -403,11 +403,11 @@ def run_setup(extensions): dependencies = ['six >=1.9', 'geomet>=0.1,<0.3', - 'pyyaml > 5.0', - 'cryptography>=35.0'] + 'pyyaml > 5.0'] _EXTRAS_REQUIRE = { - 'graph': ['gremlinpython==3.4.6'] + 'graph': ['gremlinpython==3.4.6'], + 'cle': ['cryptography>=35.0'] } setup( @@ -426,7 +426,8 @@ def run_setup(extensions): packages=[ 'cassandra', 'cassandra.io', 'cassandra.cqlengine', 'cassandra.graph', 'cassandra.datastax', 'cassandra.datastax.insights', 'cassandra.datastax.graph', - 'cassandra.datastax.graph.fluent', 'cassandra.datastax.cloud', 'cassandra.scylla' + 'cassandra.datastax.graph.fluent', 'cassandra.datastax.cloud', 'cassandra.column_encryption', + 'cassandra.scylla', ], keywords='cassandra,cql,orm,dse,graph', include_package_data=True, diff --git a/test-datastax-requirements.txt b/test-datastax-requirements.txt index 3a47b8de16..038a8b571d 100644 --- a/test-datastax-requirements.txt +++ b/test-datastax-requirements.txt @@ -1,3 +1,4 @@ -r test-requirements.txt kerberos gremlinpython==3.4.6 +cryptography >= 35.0 diff --git a/tests/integration/standard/column_encryption/test_policies.py b/tests/integration/standard/column_encryption/test_policies.py new file mode 100644 index 0000000000..dea6b6d39e --- /dev/null +++ b/tests/integration/standard/column_encryption/test_policies.py @@ -0,0 +1,170 @@ +# Copyright DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import unittest + +from tests.integration import use_singledc, TestCluster + +from cassandra.policies import ColDesc + +from cassandra.column_encryption.policies import AES256ColumnEncryptionPolicy, \ + AES256_KEY_SIZE_BYTES, AES256_BLOCK_SIZE_BYTES + +def setup_module(): + use_singledc() + +class ColumnEncryptionPolicyTest(unittest.TestCase): + + def _recreate_keyspace(self, session): + session.execute("drop keyspace if exists foo") + session.execute("CREATE KEYSPACE foo WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}") + session.execute("CREATE TABLE foo.bar(encrypted blob, unencrypted int, primary key(unencrypted))") + + def _create_policy(self, key, iv = None): + cl_policy = AES256ColumnEncryptionPolicy() + col_desc = ColDesc('foo','bar','encrypted') + cl_policy.add_column(col_desc, key, "int") + return (col_desc, cl_policy) + + def test_end_to_end_prepared(self): + + # We only currently perform testing on a single type/expected value pair since CLE functionality is essentially + # independent of the underlying type. We intercept data after it's been encoded when it's going out and before it's + # encoded when coming back; the actual types of the data involved don't impact us. + expected = 0 + + key = os.urandom(AES256_KEY_SIZE_BYTES) + (_, cl_policy) = self._create_policy(key) + cluster = TestCluster(column_encryption_policy=cl_policy) + session = cluster.connect() + self._recreate_keyspace(session) + + prepared = session.prepare("insert into foo.bar (encrypted, unencrypted) values (?,?)") + for i in range(100): + session.execute(prepared, (i, i)) + + # A straight select from the database will now return the decrypted bits. We select both encrypted and unencrypted + # values here to confirm that we don't interfere with regular processing of unencrypted vals. + (encrypted,unencrypted) = session.execute("select encrypted, unencrypted from foo.bar where unencrypted = %s allow filtering", (expected,)).one() + self.assertEquals(expected, encrypted) + self.assertEquals(expected, unencrypted) + + # Confirm the same behaviour from a subsequent prepared statement as well + prepared = session.prepare("select encrypted, unencrypted from foo.bar where unencrypted = ? allow filtering") + (encrypted,unencrypted) = session.execute(prepared, [expected]).one() + self.assertEquals(expected, encrypted) + self.assertEquals(expected, unencrypted) + + def test_end_to_end_simple(self): + + expected = 1 + + key = os.urandom(AES256_KEY_SIZE_BYTES) + (col_desc, cl_policy) = self._create_policy(key) + cluster = TestCluster(column_encryption_policy=cl_policy) + session = cluster.connect() + self._recreate_keyspace(session) + + # Use encode_and_encrypt helper function to populate date + for i in range(1,100): + self.assertIsNotNone(i) + encrypted = cl_policy.encode_and_encrypt(col_desc, i) + session.execute("insert into foo.bar (encrypted, unencrypted) values (%s,%s)", (encrypted, i)) + + # A straight select from the database will now return the decrypted bits. We select both encrypted and unencrypted + # values here to confirm that we don't interfere with regular processing of unencrypted vals. + (encrypted,unencrypted) = session.execute("select encrypted, unencrypted from foo.bar where unencrypted = %s allow filtering", (expected,)).one() + self.assertEquals(expected, encrypted) + self.assertEquals(expected, unencrypted) + + # Confirm the same behaviour from a subsequent prepared statement as well + prepared = session.prepare("select encrypted, unencrypted from foo.bar where unencrypted = ? allow filtering") + (encrypted,unencrypted) = session.execute(prepared, [expected]).one() + self.assertEquals(expected, encrypted) + self.assertEquals(expected, unencrypted) + + def test_end_to_end_different_cle_contexts_different_ivs(self): + """ + Test to validate PYTHON-1350. We should be able to decode the data from two different contexts (with two different IVs) + since the IV used to decrypt the data is actually now stored with the data. + """ + + expected = 2 + + key = os.urandom(AES256_KEY_SIZE_BYTES) + + # Simulate the creation of two AES256 policies at two different times. Python caches + # default param args at function definition time so a single value will be used any time + # the default val is used. Upshot is that within the same test we'll always have the same + # IV if we rely on the default args, so manually introduce some variation here to simulate + # what actually happens if you have two distinct sessions created at two different times. + iv1 = os.urandom(AES256_BLOCK_SIZE_BYTES) + (col_desc1, cl_policy1) = self._create_policy(key, iv=iv1) + cluster1 = TestCluster(column_encryption_policy=cl_policy1) + session1 = cluster1.connect() + self._recreate_keyspace(session1) + + # Use encode_and_encrypt helper function to populate date + for i in range(1,100): + self.assertIsNotNone(i) + encrypted = cl_policy1.encode_and_encrypt(col_desc1, i) + session1.execute("insert into foo.bar (encrypted, unencrypted) values (%s,%s)", (encrypted, i)) + session1.shutdown() + cluster1.shutdown() + + # Explicitly clear the class-level cache here; we're trying to simulate a second connection from a completely new process and + # that would entail not re-using any cached ciphers + AES256ColumnEncryptionPolicy._build_cipher.cache_clear() + cache_info = cl_policy1.cache_info() + self.assertEqual(cache_info.currsize, 0) + + iv2 = os.urandom(AES256_BLOCK_SIZE_BYTES) + (_, cl_policy2) = self._create_policy(key, iv=iv2) + cluster2 = TestCluster(column_encryption_policy=cl_policy2) + session2 = cluster2.connect() + (encrypted,unencrypted) = session2.execute("select encrypted, unencrypted from foo.bar where unencrypted = %s allow filtering", (expected,)).one() + self.assertEquals(expected, encrypted) + self.assertEquals(expected, unencrypted) + + def test_end_to_end_different_cle_contexts_different_policies(self): + """ + Test to validate PYTHON-1356. Class variables used to pass CLE policy down to protocol handler shouldn't persist. + """ + + expected = 3 + + key = os.urandom(AES256_KEY_SIZE_BYTES) + (col_desc, cl_policy) = self._create_policy(key) + cluster = TestCluster(column_encryption_policy=cl_policy) + session = cluster.connect() + self._recreate_keyspace(session) + + # Use encode_and_encrypt helper function to populate date + session.execute("insert into foo.bar (encrypted, unencrypted) values (%s,%s)",(cl_policy.encode_and_encrypt(col_desc, expected), expected)) + + # We now open a new session _without_ the CLE policy specified. We should _not_ be able to read decrypted bits from this session. + cluster2 = TestCluster() + session2 = cluster2.connect() + + # A straight select from the database will now return the decrypted bits. We select both encrypted and unencrypted + # values here to confirm that we don't interfere with regular processing of unencrypted vals. + (encrypted,unencrypted) = session2.execute("select encrypted, unencrypted from foo.bar where unencrypted = %s allow filtering", (expected,)).one() + self.assertEquals(cl_policy.encode_and_encrypt(col_desc, expected), encrypted) + self.assertEquals(expected, unencrypted) + + # Confirm the same behaviour from a subsequent prepared statement as well + prepared = session2.prepare("select encrypted, unencrypted from foo.bar where unencrypted = ? allow filtering") + (encrypted,unencrypted) = session2.execute(prepared, [expected]).one() + self.assertEquals(cl_policy.encode_and_encrypt(col_desc, expected), encrypted) diff --git a/tests/integration/standard/test_policies.py b/tests/integration/standard/test_policies.py index d6ab586e4c..4bcce1c5bc 100644 --- a/tests/integration/standard/test_policies.py +++ b/tests/integration/standard/test_policies.py @@ -17,8 +17,7 @@ from cassandra.cluster import ExecutionProfile, EXEC_PROFILE_DEFAULT from cassandra.policies import HostFilterPolicy, RoundRobinPolicy, SimpleConvictionPolicy, \ - WhiteListRoundRobinPolicy, ExponentialBackoffRetryPolicy, ColDesc, AES256ColumnEncryptionPolicy, \ - AES256_KEY_SIZE_BYTES + WhiteListRoundRobinPolicy, ExponentialBackoffRetryPolicy, ColDesc from cassandra.pool import Host from cassandra.connection import DefaultEndPoint @@ -26,11 +25,9 @@ from concurrent.futures import wait as wait_futures - def setup_module(): use_singledc() - class HostFilterPolicyTests(unittest.TestCase): def test_predicate_changes(self): @@ -93,7 +90,6 @@ def test_only_connects_to_subset(self): queried_hosts = set(host.address for host in queried_hosts) self.assertEqual(queried_hosts, only_connect_hosts) - class ExponentialRetryPolicyTests(unittest.TestCase): def setUp(self): @@ -109,72 +105,3 @@ def test_exponential_retries(self): CREATE KEYSPACE preparedtests WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'} """) - - -class ColumnEncryptionPolicyTest(unittest.TestCase): - - def _recreate_keyspace(self, session): - session.execute("drop keyspace if exists foo") - session.execute("CREATE KEYSPACE foo WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}") - session.execute("CREATE TABLE foo.bar(encrypted blob, unencrypted int, primary key(unencrypted))") - - def test_end_to_end_prepared(self): - - # We only currently perform testing on a single type/expected value pair since CLE functionality is essentially - # independent of the underlying type. We intercept data after it's been encoded when it's going out and before it's - # encoded when coming back; the actual types of the data involved don't impact us. - expected = 12345 - expected_type = "int" - - key = os.urandom(AES256_KEY_SIZE_BYTES) - cl_policy = AES256ColumnEncryptionPolicy() - col_desc = ColDesc('foo','bar','encrypted') - cl_policy.add_column(col_desc, key, expected_type) - - cluster = TestCluster(column_encryption_policy=cl_policy) - session = cluster.connect() - self._recreate_keyspace(session) - - prepared = session.prepare("insert into foo.bar (encrypted, unencrypted) values (?,?)") - session.execute(prepared, (expected,expected)) - - # A straight select from the database will now return the decrypted bits. We select both encrypted and unencrypted - # values here to confirm that we don't interfere with regular processing of unencrypted vals. - (encrypted,unencrypted) = session.execute("select encrypted, unencrypted from foo.bar where unencrypted = %s allow filtering", (expected,)).one() - self.assertEquals(expected, encrypted) - self.assertEquals(expected, unencrypted) - - # Confirm the same behaviour from a subsequent prepared statement as well - prepared = session.prepare("select encrypted, unencrypted from foo.bar where unencrypted = ? allow filtering") - (encrypted,unencrypted) = session.execute(prepared, [expected]).one() - self.assertEquals(expected, encrypted) - self.assertEquals(expected, unencrypted) - - def test_end_to_end_simple(self): - - expected = 67890 - expected_type = "int" - - key = os.urandom(AES256_KEY_SIZE_BYTES) - cl_policy = AES256ColumnEncryptionPolicy() - col_desc = ColDesc('foo','bar','encrypted') - cl_policy.add_column(col_desc, key, expected_type) - - cluster = TestCluster(column_encryption_policy=cl_policy) - session = cluster.connect() - self._recreate_keyspace(session) - - # Use encode_and_encrypt helper function to populate date - session.execute("insert into foo.bar (encrypted, unencrypted) values (%s,%s)",(cl_policy.encode_and_encrypt(col_desc, expected), expected)) - - # A straight select from the database will now return the decrypted bits. We select both encrypted and unencrypted - # values here to confirm that we don't interfere with regular processing of unencrypted vals. - (encrypted,unencrypted) = session.execute("select encrypted, unencrypted from foo.bar where unencrypted = %s allow filtering", (expected,)).one() - self.assertEquals(expected, encrypted) - self.assertEquals(expected, unencrypted) - - # Confirm the same behaviour from a subsequent prepared statement as well - prepared = session.prepare("select encrypted, unencrypted from foo.bar where unencrypted = ? allow filtering") - (encrypted,unencrypted) = session.execute(prepared, [expected]).one() - self.assertEquals(expected, encrypted) - self.assertEquals(expected, unencrypted) diff --git a/tests/unit/column_encryption/test_policies.py b/tests/unit/column_encryption/test_policies.py new file mode 100644 index 0000000000..38136c69d4 --- /dev/null +++ b/tests/unit/column_encryption/test_policies.py @@ -0,0 +1,169 @@ +# Copyright DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import unittest + +from cassandra.policies import ColDesc +from cassandra.column_encryption.policies import AES256ColumnEncryptionPolicy, \ + AES256_BLOCK_SIZE_BYTES, AES256_KEY_SIZE_BYTES + +class AES256ColumnEncryptionPolicyTest(unittest.TestCase): + + def _random_block(self): + return os.urandom(AES256_BLOCK_SIZE_BYTES) + + def _random_key(self): + return os.urandom(AES256_KEY_SIZE_BYTES) + + def _test_round_trip(self, bytes): + coldesc = ColDesc('ks1','table1','col1') + policy = AES256ColumnEncryptionPolicy() + policy.add_column(coldesc, self._random_key(), "blob") + encrypted_bytes = policy.encrypt(coldesc, bytes) + self.assertEqual(bytes, policy.decrypt(coldesc, encrypted_bytes)) + + def test_no_padding_necessary(self): + self._test_round_trip(self._random_block()) + + def test_some_padding_required(self): + for byte_size in range(1,AES256_BLOCK_SIZE_BYTES - 1): + bytes = os.urandom(byte_size) + self._test_round_trip(bytes) + for byte_size in range(AES256_BLOCK_SIZE_BYTES + 1,(2 * AES256_BLOCK_SIZE_BYTES) - 1): + bytes = os.urandom(byte_size) + self._test_round_trip(bytes) + + def test_add_column_invalid_key_size_raises(self): + coldesc = ColDesc('ks1','table1','col1') + policy = AES256ColumnEncryptionPolicy() + for key_size in range(1,AES256_KEY_SIZE_BYTES - 1): + with self.assertRaises(ValueError): + policy.add_column(coldesc, os.urandom(key_size), "blob") + for key_size in range(AES256_KEY_SIZE_BYTES + 1,(2 * AES256_KEY_SIZE_BYTES) - 1): + with self.assertRaises(ValueError): + policy.add_column(coldesc, os.urandom(key_size), "blob") + + def test_add_column_invalid_iv_size_raises(self): + def test_iv_size(iv_size): + policy = AES256ColumnEncryptionPolicy(iv = os.urandom(iv_size)) + policy.add_column(coldesc, os.urandom(AES256_KEY_SIZE_BYTES), "blob") + policy.encrypt(coldesc, os.urandom(128)) + + coldesc = ColDesc('ks1','table1','col1') + for iv_size in range(1,AES256_BLOCK_SIZE_BYTES - 1): + with self.assertRaises(ValueError): + test_iv_size(iv_size) + for iv_size in range(AES256_BLOCK_SIZE_BYTES + 1,(2 * AES256_BLOCK_SIZE_BYTES) - 1): + with self.assertRaises(ValueError): + test_iv_size(iv_size) + + # Finally, confirm that the expected IV size has no issue + test_iv_size(AES256_BLOCK_SIZE_BYTES) + + def test_add_column_null_coldesc_raises(self): + with self.assertRaises(ValueError): + policy = AES256ColumnEncryptionPolicy() + policy.add_column(None, self._random_block(), "blob") + + def test_add_column_null_key_raises(self): + with self.assertRaises(ValueError): + policy = AES256ColumnEncryptionPolicy() + coldesc = ColDesc('ks1','table1','col1') + policy.add_column(coldesc, None, "blob") + + def test_add_column_null_type_raises(self): + with self.assertRaises(ValueError): + policy = AES256ColumnEncryptionPolicy() + coldesc = ColDesc('ks1','table1','col1') + policy.add_column(coldesc, self._random_block(), None) + + def test_add_column_unknown_type_raises(self): + with self.assertRaises(ValueError): + policy = AES256ColumnEncryptionPolicy() + coldesc = ColDesc('ks1','table1','col1') + policy.add_column(coldesc, self._random_block(), "foobar") + + def test_encode_and_encrypt_null_coldesc_raises(self): + with self.assertRaises(ValueError): + policy = AES256ColumnEncryptionPolicy() + coldesc = ColDesc('ks1','table1','col1') + policy.add_column(coldesc, self._random_key(), "blob") + policy.encode_and_encrypt(None, self._random_block()) + + def test_encode_and_encrypt_null_obj_raises(self): + with self.assertRaises(ValueError): + policy = AES256ColumnEncryptionPolicy() + coldesc = ColDesc('ks1','table1','col1') + policy.add_column(coldesc, self._random_key(), "blob") + policy.encode_and_encrypt(coldesc, None) + + def test_encode_and_encrypt_unknown_coldesc_raises(self): + with self.assertRaises(ValueError): + policy = AES256ColumnEncryptionPolicy() + coldesc = ColDesc('ks1','table1','col1') + policy.add_column(coldesc, self._random_key(), "blob") + policy.encode_and_encrypt(ColDesc('ks2','table2','col2'), self._random_block()) + + def test_contains_column(self): + coldesc = ColDesc('ks1','table1','col1') + policy = AES256ColumnEncryptionPolicy() + policy.add_column(coldesc, self._random_key(), "blob") + self.assertTrue(policy.contains_column(coldesc)) + self.assertFalse(policy.contains_column(ColDesc('ks2','table1','col1'))) + self.assertFalse(policy.contains_column(ColDesc('ks1','table2','col1'))) + self.assertFalse(policy.contains_column(ColDesc('ks1','table1','col2'))) + self.assertFalse(policy.contains_column(ColDesc('ks2','table2','col2'))) + + def test_encrypt_unknown_column(self): + with self.assertRaises(ValueError): + policy = AES256ColumnEncryptionPolicy() + coldesc = ColDesc('ks1','table1','col1') + policy.add_column(coldesc, self._random_key(), "blob") + policy.encrypt(ColDesc('ks2','table2','col2'), self._random_block()) + + def test_decrypt_unknown_column(self): + policy = AES256ColumnEncryptionPolicy() + coldesc = ColDesc('ks1','table1','col1') + policy.add_column(coldesc, self._random_key(), "blob") + encrypted_bytes = policy.encrypt(coldesc, self._random_block()) + with self.assertRaises(ValueError): + policy.decrypt(ColDesc('ks2','table2','col2'), encrypted_bytes) + + def test_cache_info(self): + # Exclude any interference from tests above + AES256ColumnEncryptionPolicy._build_cipher.cache_clear() + + coldesc1 = ColDesc('ks1','table1','col1') + coldesc2 = ColDesc('ks2','table2','col2') + coldesc3 = ColDesc('ks3','table3','col3') + policy = AES256ColumnEncryptionPolicy() + for coldesc in [coldesc1, coldesc2, coldesc3]: + policy.add_column(coldesc, self._random_key(), "blob") + + # First run for this coldesc should be a miss, everything else should be a cache hit + for _ in range(10): + policy.encrypt(coldesc1, self._random_block()) + cache_info = policy.cache_info() + self.assertEqual(cache_info.hits, 9) + self.assertEqual(cache_info.misses, 1) + self.assertEqual(cache_info.maxsize, 128) + + # Important note: we're measuring the size of the cache of ciphers, NOT stored + # keys. We won't have a cipher here until we actually encrypt something + self.assertEqual(cache_info.currsize, 1) + policy.encrypt(coldesc2, self._random_block()) + self.assertEqual(policy.cache_info().currsize, 2) + policy.encrypt(coldesc3, self._random_block()) + self.assertEqual(policy.cache_info().currsize, 3) diff --git a/tests/unit/test_policies.py b/tests/unit/test_policies.py index 37a8f8a523..d6595741aa 100644 --- a/tests/unit/test_policies.py +++ b/tests/unit/test_policies.py @@ -34,8 +34,8 @@ DowngradingConsistencyRetryPolicy, ConstantReconnectionPolicy, LoadBalancingPolicy, ConvictionPolicy, ReconnectionPolicy, FallthroughRetryPolicy, IdentityTranslator, EC2MultiRegionTranslator, HostFilterPolicy, ExponentialBackoffRetryPolicy, - AES256ColumnEncryptionPolicy, ColDesc, - AES256_BLOCK_SIZE_BYTES, AES256_KEY_SIZE_BYTES) + ColDesc) + from cassandra.pool import Host from cassandra.connection import DefaultEndPoint, UnixSocketEndPoint @@ -1538,133 +1538,3 @@ def test_create_whitelist(self): # Only the filtered replicas should be allowed self.assertEqual(set(query_plan), {Host(DefaultEndPoint("127.0.0.1"), SimpleConvictionPolicy), Host(DefaultEndPoint("127.0.0.4"), SimpleConvictionPolicy)}) - - -class AES256ColumnEncryptionPolicyTest(unittest.TestCase): - - def _random_block(self): - return os.urandom(AES256_BLOCK_SIZE_BYTES) - - def _random_key(self): - return os.urandom(AES256_KEY_SIZE_BYTES) - - def _test_round_trip(self, bytes): - coldesc = ColDesc('ks1','table1','col1') - policy = AES256ColumnEncryptionPolicy() - policy.add_column(coldesc, self._random_key(), "blob") - encrypted_bytes = policy.encrypt(coldesc, bytes) - self.assertEqual(bytes, policy.decrypt(coldesc, encrypted_bytes)) - - def test_no_padding_necessary(self): - self._test_round_trip(self._random_block()) - - def test_some_padding_required(self): - for byte_size in range(1,AES256_BLOCK_SIZE_BYTES - 1): - bytes = os.urandom(byte_size) - self._test_round_trip(bytes) - for byte_size in range(AES256_BLOCK_SIZE_BYTES + 1,(2 * AES256_BLOCK_SIZE_BYTES) - 1): - bytes = os.urandom(byte_size) - self._test_round_trip(bytes) - - def test_add_column_invalid_key_size_raises(self): - coldesc = ColDesc('ks1','table1','col1') - policy = AES256ColumnEncryptionPolicy() - for key_size in range(1,AES256_KEY_SIZE_BYTES - 1): - with self.assertRaises(ValueError): - policy.add_column(coldesc, os.urandom(key_size), "blob") - for key_size in range(AES256_KEY_SIZE_BYTES + 1,(2 * AES256_KEY_SIZE_BYTES) - 1): - with self.assertRaises(ValueError): - policy.add_column(coldesc, os.urandom(key_size), "blob") - - def test_add_column_null_coldesc_raises(self): - with self.assertRaises(ValueError): - policy = AES256ColumnEncryptionPolicy() - policy.add_column(None, self._random_block(), "blob") - - def test_add_column_null_key_raises(self): - with self.assertRaises(ValueError): - policy = AES256ColumnEncryptionPolicy() - coldesc = ColDesc('ks1','table1','col1') - policy.add_column(coldesc, None, "blob") - - def test_add_column_null_type_raises(self): - with self.assertRaises(ValueError): - policy = AES256ColumnEncryptionPolicy() - coldesc = ColDesc('ks1','table1','col1') - policy.add_column(coldesc, self._random_block(), None) - - def test_add_column_unknown_type_raises(self): - with self.assertRaises(ValueError): - policy = AES256ColumnEncryptionPolicy() - coldesc = ColDesc('ks1','table1','col1') - policy.add_column(coldesc, self._random_block(), "foobar") - - def test_encode_and_encrypt_null_coldesc_raises(self): - with self.assertRaises(ValueError): - policy = AES256ColumnEncryptionPolicy() - coldesc = ColDesc('ks1','table1','col1') - policy.add_column(coldesc, self._random_key(), "blob") - policy.encode_and_encrypt(None, self._random_block()) - - def test_encode_and_encrypt_null_obj_raises(self): - with self.assertRaises(ValueError): - policy = AES256ColumnEncryptionPolicy() - coldesc = ColDesc('ks1','table1','col1') - policy.add_column(coldesc, self._random_key(), "blob") - policy.encode_and_encrypt(coldesc, None) - - def test_encode_and_encrypt_unknown_coldesc_raises(self): - with self.assertRaises(ValueError): - policy = AES256ColumnEncryptionPolicy() - coldesc = ColDesc('ks1','table1','col1') - policy.add_column(coldesc, self._random_key(), "blob") - policy.encode_and_encrypt(ColDesc('ks2','table2','col2'), self._random_block()) - - def test_contains_column(self): - coldesc = ColDesc('ks1','table1','col1') - policy = AES256ColumnEncryptionPolicy() - policy.add_column(coldesc, self._random_key(), "blob") - self.assertTrue(policy.contains_column(coldesc)) - self.assertFalse(policy.contains_column(ColDesc('ks2','table1','col1'))) - self.assertFalse(policy.contains_column(ColDesc('ks1','table2','col1'))) - self.assertFalse(policy.contains_column(ColDesc('ks1','table1','col2'))) - self.assertFalse(policy.contains_column(ColDesc('ks2','table2','col2'))) - - def test_encrypt_unknown_column(self): - with self.assertRaises(ValueError): - policy = AES256ColumnEncryptionPolicy() - coldesc = ColDesc('ks1','table1','col1') - policy.add_column(coldesc, self._random_key(), "blob") - policy.encrypt(ColDesc('ks2','table2','col2'), self._random_block()) - - def test_decrypt_unknown_column(self): - policy = AES256ColumnEncryptionPolicy() - coldesc = ColDesc('ks1','table1','col1') - policy.add_column(coldesc, self._random_key(), "blob") - encrypted_bytes = policy.encrypt(coldesc, self._random_block()) - with self.assertRaises(ValueError): - policy.decrypt(ColDesc('ks2','table2','col2'), encrypted_bytes) - - def test_cache_info(self): - coldesc1 = ColDesc('ks1','table1','col1') - coldesc2 = ColDesc('ks2','table2','col2') - coldesc3 = ColDesc('ks3','table3','col3') - policy = AES256ColumnEncryptionPolicy() - for coldesc in [coldesc1, coldesc2, coldesc3]: - policy.add_column(coldesc, self._random_key(), "blob") - - # First run for this coldesc should be a miss, everything else should be a cache hit - for _ in range(10): - policy.encrypt(coldesc1, self._random_block()) - cache_info = policy.cache_info() - self.assertEqual(cache_info.hits, 9) - self.assertEqual(cache_info.misses, 1) - self.assertEqual(cache_info.maxsize, 128) - - # Important note: we're measuring the size of the cache of ciphers, NOT stored - # keys. We won't have a cipher here until we actually encrypt something - self.assertEqual(cache_info.currsize, 1) - policy.encrypt(coldesc2, self._random_block()) - self.assertEqual(policy.cache_info().currsize, 2) - policy.encrypt(coldesc3, self._random_block()) - self.assertEqual(policy.cache_info().currsize, 3) diff --git a/tests/unit/test_types.py b/tests/unit/test_types.py index af3b327ef8..e85f5dbe67 100644 --- a/tests/unit/test_types.py +++ b/tests/unit/test_types.py @@ -27,7 +27,8 @@ EmptyValue, LongType, SetType, UTF8Type, cql_typename, int8_pack, int64_pack, lookup_casstype, lookup_casstype_simple, parse_casstype_args, - int32_pack, Int32Type, ListType, MapType + int32_pack, Int32Type, ListType, MapType, VectorType, + FloatType ) from cassandra.encoder import cql_quote from cassandra.pool import Host @@ -190,6 +191,12 @@ class BarType(FooType): self.assertEqual(UTF8Type, ctype.subtypes[2]) self.assertEqual([b'city', None, b'zip'], ctype.names) + def test_parse_casstype_vector(self): + ctype = parse_casstype_args("org.apache.cassandra.db.marshal.VectorType(org.apache.cassandra.db.marshal.FloatType, 3)") + self.assertTrue(issubclass(ctype, VectorType)) + self.assertEqual(3, ctype.vector_size) + self.assertEqual(FloatType, ctype.subtype) + def test_empty_value(self): self.assertEqual(str(EmptyValue()), 'EMPTY') @@ -303,6 +310,19 @@ def test_cql_quote(self): self.assertEqual(cql_quote('test'), "'test'") self.assertEqual(cql_quote(0), '0') + def test_vector_round_trip(self): + base = [3.4, 2.9, 41.6, 12.0] + ctype = parse_casstype_args("org.apache.cassandra.db.marshal.VectorType(org.apache.cassandra.db.marshal.FloatType, 4)") + base_bytes = ctype.serialize(base, 0) + self.assertEqual(16, len(base_bytes)) + result = ctype.deserialize(base_bytes, 0) + self.assertEqual(len(base), len(result)) + for idx in range(0,len(base)): + self.assertAlmostEqual(base[idx], result[idx], places=5) + + def test_vector_cql_parameterized_type(self): + ctype = parse_casstype_args("org.apache.cassandra.db.marshal.VectorType(org.apache.cassandra.db.marshal.FloatType, 4)") + self.assertEqual(ctype.cql_parameterized_type(), "org.apache.cassandra.db.marshal.VectorType") ZERO = datetime.timedelta(0)