From 5629bacf780cf79c3af4cda3de50986c82e18c0d Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Wed, 11 Aug 2021 00:36:11 -0400 Subject: [PATCH] tests: move systests into separate modules, refactor using pytest (#474) * tests: move instance API systests to own module Refactor to use pytest fixtures / idioms, rather than old 'Config' setup / teardown. Toward #472. * tests: move database API systests to own module Refactor to use pytest fixtures / idioms, rather than old 'Config' setup / teardown. Toward #472. * tests: move table API systests to own module Refactor to use pytest fixtures / idioms, rather than old 'Config' setup / teardown. Toward #472. * tests: move backup API systests to own module [WIP] Refactor to use pytest fixtures / idioms, rather than old 'Config' setup / teardown. Toward #472. * tests: move streaming/chunnking systests to own module Refactor to use pytest fixtures / idioms, rather than old 'Config' setup / teardown. Toward #472. * tests: move session API systests to own module Refactor to use pytest fixtures / idioms, rather than old 'Config' setup/ teardown. Toward #472. * tests: move dbapi systests to owwn module Refactor to use pytest fixtures / idioms, rather than old 'Confog' setup / teardown. Toward #472. * tests: remove legacy systest setup / teardown code Closes #472. * tests: don't pre-create datbase before restore attempt * tests: fix instance config fixtures under emulator * tests: clean up alt instnce at module scope Avoids clash with 'test_list_instances' expectatons. * tests: work around MethodNotImplemented Raised from 'ListBackups' API on the CI emulator, but not locally. * chore: drop use of pytz in systests See #479 for rationale. * chore: fix fossil in comment * chore: move '_check_batch_status' to only calling module Likewise the 'FauxCall' helper class it uses. * chore: improve testcase name * tests: replicate dbapi systest changes from #412 into new module --- tests/system/_helpers.py | 110 + tests/system/_sample_data.py | 87 + tests/system/conftest.py | 153 ++ tests/system/test_backup_api.py | 466 ++++ tests/system/test_database_api.py | 360 +++ tests/system/test_dbapi.py | 352 +++ tests/system/test_instance_api.py | 139 + tests/system/test_session_api.py | 2159 +++++++++++++++ tests/system/test_streaming_chunking.py | 75 + tests/system/test_system.py | 3200 ----------------------- tests/system/test_system_dbapi.py | 432 --- tests/system/test_table_api.py | 69 + 12 files changed, 3970 insertions(+), 3632 deletions(-) create mode 100644 tests/system/_helpers.py create mode 100644 tests/system/_sample_data.py create mode 100644 tests/system/conftest.py create mode 100644 tests/system/test_backup_api.py create mode 100644 tests/system/test_database_api.py create mode 100644 tests/system/test_dbapi.py create mode 100644 tests/system/test_instance_api.py create mode 100644 tests/system/test_session_api.py create mode 100644 tests/system/test_streaming_chunking.py delete mode 100644 tests/system/test_system.py delete mode 100644 tests/system/test_system_dbapi.py create mode 100644 tests/system/test_table_api.py diff --git a/tests/system/_helpers.py b/tests/system/_helpers.py new file mode 100644 index 0000000000..75c4bb7f43 --- /dev/null +++ b/tests/system/_helpers.py @@ -0,0 +1,110 @@ +# Copyright 2021 Google LLC All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import operator +import os +import time + +from google.api_core import exceptions +from google.cloud.spanner_v1 import instance as instance_mod +from tests import _fixtures +from test_utils import retry +from test_utils import system + + +CREATE_INSTANCE_ENVVAR = "GOOGLE_CLOUD_TESTS_CREATE_SPANNER_INSTANCE" +CREATE_INSTANCE = os.getenv(CREATE_INSTANCE_ENVVAR) is not None + +INSTANCE_ID_ENVVAR = "GOOGLE_CLOUD_TESTS_SPANNER_INSTANCE" +INSTANCE_ID_DEFAULT = "google-cloud-python-systest" +INSTANCE_ID = os.environ.get(INSTANCE_ID_ENVVAR, INSTANCE_ID_DEFAULT) + +SKIP_BACKUP_TESTS_ENVVAR = "SKIP_BACKUP_TESTS" +SKIP_BACKUP_TESTS = os.getenv(SKIP_BACKUP_TESTS_ENVVAR) is not None + +SPANNER_OPERATION_TIMEOUT_IN_SECONDS = int( + os.getenv("SPANNER_OPERATION_TIMEOUT_IN_SECONDS", 60) +) + +USE_EMULATOR_ENVVAR = "SPANNER_EMULATOR_HOST" +USE_EMULATOR = os.getenv(USE_EMULATOR_ENVVAR) is not None + +EMULATOR_PROJECT_ENVVAR = "GCLOUD_PROJECT" +EMULATOR_PROJECT_DEFAULT = "emulator-test-project" +EMULATOR_PROJECT = os.getenv(EMULATOR_PROJECT_ENVVAR, EMULATOR_PROJECT_DEFAULT) + + +DDL_STATEMENTS = ( + _fixtures.EMULATOR_DDL_STATEMENTS if USE_EMULATOR else _fixtures.DDL_STATEMENTS +) + +retry_true = retry.RetryResult(operator.truth) +retry_false = retry.RetryResult(operator.not_) + +retry_503 = retry.RetryErrors(exceptions.ServiceUnavailable) +retry_429_503 = retry.RetryErrors( + exceptions.TooManyRequests, exceptions.ServiceUnavailable, +) +retry_mabye_aborted_txn = retry.RetryErrors(exceptions.ServerError, exceptions.Aborted) +retry_mabye_conflict = retry.RetryErrors(exceptions.ServerError, exceptions.Conflict) + + +def _has_all_ddl(database): + # Predicate to test for EC completion. + return len(database.ddl_statements) == len(DDL_STATEMENTS) + + +retry_has_all_dll = retry.RetryInstanceState(_has_all_ddl) + + +def scrub_instance_backups(to_scrub): + try: + for backup_pb in to_scrub.list_backups(): + bkp = instance_mod.Backup.from_pb(backup_pb, to_scrub) + try: + # Instance cannot be deleted while backups exist. + retry_429_503(bkp.delete)() + except exceptions.NotFound: # lost the race + pass + except exceptions.MethodNotImplemented: + # The CI emulator raises 501: local versions seem fine. 
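+        # (google.api_core.exceptions.MethodNotImplemented maps gRPC UNIMPLEMENTED /
+        # HTTP 501, so under the emulator we simply treat it as "no backups to scrub".)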
+ pass + + +def scrub_instance_ignore_not_found(to_scrub): + """Helper for func:`cleanup_old_instances`""" + scrub_instance_backups(to_scrub) + + try: + retry_429_503(to_scrub.delete)() + except exceptions.NotFound: # lost the race + pass + + +def cleanup_old_instances(spanner_client): + cutoff = int(time.time()) - 1 * 60 * 60 # one hour ago + instance_filter = "labels.python-spanner-systests:true" + + for instance_pb in spanner_client.list_instances(filter_=instance_filter): + instance = instance_mod.Instance.from_pb(instance_pb, spanner_client) + + if "created" in instance.labels: + create_time = int(instance.labels["created"]) + + if create_time <= cutoff: + scrub_instance_ignore_not_found(instance) + + +def unique_id(prefix, separator="-"): + return f"{prefix}{system.unique_resource_id(separator)}" diff --git a/tests/system/_sample_data.py b/tests/system/_sample_data.py new file mode 100644 index 0000000000..65f6e23ad3 --- /dev/null +++ b/tests/system/_sample_data.py @@ -0,0 +1,87 @@ +# Copyright 2021 Google LLC All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime +import math + +from google.api_core import datetime_helpers +from google.cloud._helpers import UTC +from google.cloud import spanner_v1 + + +TABLE = "contacts" +COLUMNS = ("contact_id", "first_name", "last_name", "email") +ROW_DATA = ( + (1, u"Phred", u"Phlyntstone", u"phred@example.com"), + (2, u"Bharney", u"Rhubble", u"bharney@example.com"), + (3, u"Wylma", u"Phlyntstone", u"wylma@example.com"), +) +ALL = spanner_v1.KeySet(all_=True) +SQL = "SELECT * FROM contacts ORDER BY contact_id" + +COUNTERS_TABLE = "counters" +COUNTERS_COLUMNS = ("name", "value") + + +def _assert_timestamp(value, nano_value): + assert isinstance(value, datetime.datetime) + assert value.tzinfo is None + assert nano_value.tzinfo is UTC + + assert value.year == nano_value.year + assert value.month == nano_value.month + assert value.day == nano_value.day + assert value.hour == nano_value.hour + assert value.minute == nano_value.minute + assert value.second == nano_value.second + assert value.microsecond == nano_value.microsecond + + if isinstance(value, datetime_helpers.DatetimeWithNanoseconds): + assert value.nanosecond == nano_value.nanosecond + else: + assert value.microsecond * 1000 == nano_value.nanosecond + + +def _check_rows_data(rows_data, expected=ROW_DATA, recurse_into_lists=True): + assert len(rows_data) == len(expected) + + for row, expected in zip(rows_data, expected): + _check_row_data(row, expected, recurse_into_lists=recurse_into_lists) + + +def _check_row_data(row_data, expected, recurse_into_lists=True): + assert len(row_data) == len(expected) + + for found_cell, expected_cell in zip(row_data, expected): + _check_cell_data( + found_cell, expected_cell, recurse_into_lists=recurse_into_lists + ) + + +def _check_cell_data(found_cell, expected_cell, recurse_into_lists=True): + + if isinstance(found_cell, datetime_helpers.DatetimeWithNanoseconds): + _assert_timestamp(expected_cell, found_cell) + + elif 
isinstance(found_cell, float) and math.isnan(found_cell): + assert math.isnan(expected_cell) + + elif isinstance(found_cell, list) and recurse_into_lists: + assert len(found_cell) == len(expected_cell) + + for found_item, expected_item in zip(found_cell, expected_cell): + _check_cell_data(found_item, expected_item) + + else: + assert found_cell == expected_cell diff --git a/tests/system/conftest.py b/tests/system/conftest.py new file mode 100644 index 0000000000..cd3728525b --- /dev/null +++ b/tests/system/conftest.py @@ -0,0 +1,153 @@ +# Copyright 2021 Google LLC All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time + +import pytest + +from google.cloud import spanner_v1 +from . import _helpers + + +@pytest.fixture(scope="function") +def if_create_instance(): + if not _helpers.CREATE_INSTANCE: + pytest.skip(f"{_helpers.CREATE_INSTANCE_ENVVAR} not set in environment.") + + +@pytest.fixture(scope="function") +def no_create_instance(): + if _helpers.CREATE_INSTANCE: + pytest.skip(f"{_helpers.CREATE_INSTANCE_ENVVAR} set in environment.") + + +@pytest.fixture(scope="function") +def if_backup_tests(): + if _helpers.SKIP_BACKUP_TESTS: + pytest.skip(f"{_helpers.SKIP_BACKUP_TESTS_ENVVAR} set in environment.") + + +@pytest.fixture(scope="function") +def not_emulator(): + if _helpers.USE_EMULATOR: + pytest.skip(f"{_helpers.USE_EMULATOR_ENVVAR} set in environment.") + + +@pytest.fixture(scope="session") +def spanner_client(): + if _helpers.USE_EMULATOR: + from google.auth.credentials import AnonymousCredentials + + credentials = AnonymousCredentials() + return spanner_v1.Client( + project=_helpers.EMULATOR_PROJECT, credentials=credentials, + ) + else: + return spanner_v1.Client() # use google.auth.default credentials + + +@pytest.fixture(scope="session") +def operation_timeout(): + return _helpers.SPANNER_OPERATION_TIMEOUT_IN_SECONDS + + +@pytest.fixture(scope="session") +def shared_instance_id(): + if _helpers.CREATE_INSTANCE: + return f"{_helpers.unique_id('google-cloud')}" + + return _helpers.INSTANCE_ID + + +@pytest.fixture(scope="session") +def instance_configs(spanner_client): + configs = list(_helpers.retry_503(spanner_client.list_instance_configs)()) + + if not _helpers.USE_EMULATOR: + + # Defend against back-end returning configs for regions we aren't + # actually allowed to use. 
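+        # (Heuristic: configs whose names contain "-us-" are the US regional
+        # configs this test project is expected to be allowed to use.)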
+ configs = [config for config in configs if "-us-" in config.name] + + yield configs + + +@pytest.fixture(scope="session") +def instance_config(instance_configs): + if not instance_configs: + raise ValueError("No instance configs found.") + + yield instance_configs[0] + + +@pytest.fixture(scope="session") +def existing_instances(spanner_client): + instances = list(_helpers.retry_503(spanner_client.list_instances)()) + + yield instances + + +@pytest.fixture(scope="session") +def shared_instance( + spanner_client, + operation_timeout, + shared_instance_id, + instance_config, + existing_instances, # evalutate before creating one +): + _helpers.cleanup_old_instances(spanner_client) + + if _helpers.CREATE_INSTANCE: + create_time = str(int(time.time())) + labels = {"python-spanner-systests": "true", "created": create_time} + + instance = spanner_client.instance( + shared_instance_id, instance_config.name, labels=labels + ) + created_op = _helpers.retry_429_503(instance.create)() + created_op.result(operation_timeout) # block until completion + + else: # reuse existing instance + instance = spanner_client.instance(shared_instance_id) + instance.reload() + + yield instance + + if _helpers.CREATE_INSTANCE: + _helpers.retry_429_503(instance.delete)() + + +@pytest.fixture(scope="session") +def shared_database(shared_instance, operation_timeout): + database_name = _helpers.unique_id("test_database") + pool = spanner_v1.BurstyPool(labels={"testcase": "database_api"}) + database = shared_instance.database( + database_name, ddl_statements=_helpers.DDL_STATEMENTS, pool=pool + ) + operation = database.create() + operation.result(operation_timeout) # raises on failure / timeout. + + yield database + + database.drop() + + +@pytest.fixture(scope="function") +def databases_to_delete(): + to_delete = [] + + yield to_delete + + for database in to_delete: + database.drop() diff --git a/tests/system/test_backup_api.py b/tests/system/test_backup_api.py new file mode 100644 index 0000000000..b3a9642f4c --- /dev/null +++ b/tests/system/test_backup_api.py @@ -0,0 +1,466 @@ +# Copyright 2021 Google LLC All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime +import time + +import pytest + +from google.api_core import exceptions +from google.cloud import spanner_v1 +from . import _helpers + +skip_env_reason = f"""\ +Remove {_helpers.SKIP_BACKUP_TESTS_ENVVAR} from environment to run these tests.\ +""" +skip_emulator_reason = "Backup operations not supported by emulator." 
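+# The module-level 'pytestmark' list below applies both skip conditions to every
+# test in this file: backup tests can be disabled via the environment, and the
+# emulator does not implement the backup APIs at all.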
+ +pytestmark = [ + pytest.mark.skipif(_helpers.SKIP_BACKUP_TESTS, reason=skip_env_reason), + pytest.mark.skipif(_helpers.USE_EMULATOR, reason=skip_emulator_reason), +] + + +@pytest.fixture(scope="session") +def same_config_instance(spanner_client, shared_instance, operation_timeout): + current_config = shared_instance.configuration_name + same_config_instance_id = _helpers.unique_id("same-config") + create_time = str(int(time.time())) + labels = {"python-spanner-systests": "true", "created": create_time} + same_config_instance = spanner_client.instance( + same_config_instance_id, current_config, labels=labels + ) + op = same_config_instance.create() + op.result(operation_timeout) + + yield same_config_instance + + _helpers.scrub_instance_ignore_not_found(same_config_instance) + + +@pytest.fixture(scope="session") +def diff_config(shared_instance, instance_configs): + current_config = shared_instance.configuration_name + for config in instance_configs: + if "-us-" in config.name and config.name != current_config: + return config.name + return None + + +@pytest.fixture(scope="session") +def diff_config_instance( + spanner_client, shared_instance, operation_timeout, diff_config, +): + if diff_config is None: + return None + + diff_config_instance_id = _helpers.unique_id("diff-config") + create_time = str(int(time.time())) + labels = {"python-spanner-systests": "true", "created": create_time} + diff_config_instance = spanner_client.instance( + diff_config_instance_id, diff_config, labels=labels + ) + op = diff_config_instance.create() + op.result(operation_timeout) + + yield diff_config_instance + + _helpers.scrub_instance_ignore_not_found(diff_config_instance) + + +@pytest.fixture(scope="session") +def database_version_time(): + return datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc) + + +@pytest.fixture(scope="session") +def second_database(shared_instance, operation_timeout): + database_name = _helpers.unique_id("test_database2") + pool = spanner_v1.BurstyPool(labels={"testcase": "database_api"}) + database = shared_instance.database( + database_name, ddl_statements=_helpers.DDL_STATEMENTS, pool=pool + ) + operation = database.create() + operation.result(operation_timeout) # raises on failure / timeout. + + yield database + + database.drop() + + +@pytest.fixture(scope="function") +def backups_to_delete(): + to_delete = [] + + yield to_delete + + for backup in to_delete: + _helpers.retry_429_503(backup.delete)() + + +def test_backup_workflow( + shared_instance, + shared_database, + database_version_time, + backups_to_delete, + databases_to_delete, +): + from google.cloud.spanner_admin_database_v1 import ( + CreateBackupEncryptionConfig, + EncryptionConfig, + EncryptionInfo, + RestoreDatabaseEncryptionConfig, + ) + + backup_id = _helpers.unique_id("backup_id", separator="_") + expire_time = datetime.datetime.utcnow() + datetime.timedelta(days=3) + expire_time = expire_time.replace(tzinfo=datetime.timezone.utc) + encryption_enum = CreateBackupEncryptionConfig.EncryptionType + encryption_config = CreateBackupEncryptionConfig( + encryption_type=encryption_enum.GOOGLE_DEFAULT_ENCRYPTION, + ) + + # Create backup. + backup = shared_instance.backup( + backup_id, + database=shared_database, + expire_time=expire_time, + version_time=database_version_time, + encryption_config=encryption_config, + ) + operation = backup.create() + backups_to_delete.append(backup) + + # Check metadata. 
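+    # The create operation's metadata should echo back the backup name and the
+    # source database before the operation itself completes.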
+ metadata = operation.metadata + assert backup.name == metadata.name + assert shared_database.name == metadata.database + operation.result() # blocks indefinitely + + # Check backup object. + backup.reload() + assert shared_database.name == backup._database + assert expire_time == backup.expire_time + assert backup.create_time is not None + assert database_version_time == backup.version_time + assert backup.size_bytes is not None + assert backup.state is not None + assert ( + EncryptionInfo.Type.GOOGLE_DEFAULT_ENCRYPTION + == backup.encryption_info.encryption_type + ) + + # Update with valid argument. + valid_expire_time = datetime.datetime.utcnow() + datetime.timedelta(days=7) + valid_expire_time = valid_expire_time.replace(tzinfo=datetime.timezone.utc) + backup.update_expire_time(valid_expire_time) + assert valid_expire_time == backup.expire_time + + # Restore database to same instance. + restored_id = _helpers.unique_id("restored_db", separator="_") + encryption_config = RestoreDatabaseEncryptionConfig( + encryption_type=RestoreDatabaseEncryptionConfig.EncryptionType.GOOGLE_DEFAULT_ENCRYPTION, + ) + database = shared_instance.database( + restored_id, encryption_config=encryption_config, + ) + databases_to_delete.append(database) + operation = database.restore(source=backup) + restored_db = operation.result() # blocks indefinitely + assert database_version_time == restored_db.restore_info.backup_info.version_time + + metadata = operation.metadata + assert database_version_time == metadata.backup_info.version_time + + database.reload() + expected_encryption_config = EncryptionConfig() + assert expected_encryption_config == database.encryption_config + + database.drop() + backup.delete() + assert not backup.exists() + + +def test_backup_create_w_version_time_dflt_to_create_time( + shared_instance, + shared_database, + database_version_time, + backups_to_delete, + databases_to_delete, +): + backup_id = _helpers.unique_id("backup_id", separator="_") + expire_time = datetime.datetime.utcnow() + datetime.timedelta(days=3) + expire_time = expire_time.replace(tzinfo=datetime.timezone.utc) + + # Create backup. + backup = shared_instance.backup( + backup_id, database=shared_database, expire_time=expire_time, + ) + operation = backup.create() + backups_to_delete.append(backup) + + # Check metadata. + metadata = operation.metadata + assert backup.name == metadata.name + assert shared_database.name == metadata.database + operation.result() # blocks indefinitely + + # Check backup object. 
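+    # With no explicit 'version_time', the backend defaults it to the backup's
+    # 'create_time', which is what this test asserts below.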
+ backup.reload() + assert shared_database.name == backup._database + assert backup.create_time is not None + assert backup.create_time == backup.version_time + + backup.delete() + assert not backup.exists() + + +def test_backup_create_w_invalid_expire_time(shared_instance, shared_database): + backup_id = _helpers.unique_id("backup_id", separator="_") + expire_time = datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc) + + backup = shared_instance.backup( + backup_id, database=shared_database, expire_time=expire_time + ) + + with pytest.raises(exceptions.InvalidArgument): + op = backup.create() + op.result() # blocks indefinitely + + +def test_backup_create_w_invalid_version_time_past( + shared_instance, shared_database, +): + backup_id = _helpers.unique_id("backup_id", separator="_") + expire_time = datetime.datetime.utcnow() + datetime.timedelta(days=3) + expire_time = expire_time.replace(tzinfo=datetime.timezone.utc) + version_time = datetime.datetime.utcnow() - datetime.timedelta(days=10) + version_time = version_time.replace(tzinfo=datetime.timezone.utc) + + backup = shared_instance.backup( + backup_id, + database=shared_database, + expire_time=expire_time, + version_time=version_time, + ) + + with pytest.raises(exceptions.InvalidArgument): + op = backup.create() + op.result() # blocks indefinitely + + +def test_backup_create_w_invalid_version_time_future( + shared_instance, shared_database, +): + backup_id = _helpers.unique_id("backup_id", separator="_") + expire_time = datetime.datetime.utcnow() + datetime.timedelta(days=3) + expire_time = expire_time.replace(tzinfo=datetime.timezone.utc) + version_time = datetime.datetime.utcnow() + datetime.timedelta(days=2) + version_time = version_time.replace(tzinfo=datetime.timezone.utc) + + backup = shared_instance.backup( + backup_id, + database=shared_database, + expire_time=expire_time, + version_time=version_time, + ) + + with pytest.raises(exceptions.InvalidArgument): + op = backup.create() + op.result() # blocks indefinitely + + +def test_database_restore_to_diff_instance( + shared_instance, + shared_database, + backups_to_delete, + same_config_instance, + databases_to_delete, +): + backup_id = _helpers.unique_id("backup_id", separator="_") + expire_time = datetime.datetime.utcnow() + datetime.timedelta(days=3) + expire_time = expire_time.replace(tzinfo=datetime.timezone.utc) + + # Create backup. + backup = shared_instance.backup( + backup_id, database=shared_database, expire_time=expire_time, + ) + op = backup.create() + backups_to_delete.append(backup) + op.result() + + # Restore database to different instance with same config. 
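+    # Restores must target an instance whose instance configuration matches the
+    # backup's source instance; a different config is rejected (tested further down).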
+ restored_id = _helpers.unique_id("restored_db") + database = same_config_instance.database(restored_id) + databases_to_delete.append(database) + operation = database.restore(source=backup) + operation.result() # blocks indefinitely + + database.drop() + backup.delete() + assert not backup.exists() + + +def test_multi_create_cancel_update_error_restore_errors( + shared_instance, + shared_database, + second_database, + diff_config_instance, + backups_to_delete, + databases_to_delete, + operation_timeout, +): + backup_id_1 = _helpers.unique_id("backup_id1", separator="_") + backup_id_2 = _helpers.unique_id("backup_id2", separator="_") + expire_time = datetime.datetime.utcnow() + datetime.timedelta(days=3) + expire_time = expire_time.replace(tzinfo=datetime.timezone.utc) + + backup1 = shared_instance.backup( + backup_id_1, database=shared_database, expire_time=expire_time + ) + backup2 = shared_instance.backup( + backup_id_2, database=second_database, expire_time=expire_time + ) + + # Create two backups. + op1 = backup1.create() + backups_to_delete.append(backup1) + op2 = backup2.create() + backups_to_delete.append(backup2) + + backup1.reload() + assert not backup1.is_ready() + + backup2.reload() + assert not backup2.is_ready() + + # Cancel a create operation. + op2.cancel() + assert op2.cancelled() + + op1.result() # blocks indefinitely + backup1.reload() + assert backup1.is_ready() + + # Update expire time to invalid value. + max_expire_days = 366 # documented maximum + invalid_expire_time = datetime.datetime.now().replace( + tzinfo=datetime.timezone.utc + ) + datetime.timedelta(days=max_expire_days + 1) + with pytest.raises(exceptions.InvalidArgument): + backup1.update_expire_time(invalid_expire_time) + + # Restore to existing database. + with pytest.raises(exceptions.AlreadyExists): + shared_database.restore(source=backup1) + + # Restore to instance with different config. + if diff_config_instance is not None: + new_db = diff_config_instance.database("diff_config") + + with pytest.raises(exceptions.InvalidArgument): + new_db.restore(source=backup1) + + +def test_instance_list_backups( + shared_instance, + shared_database, + second_database, + database_version_time, + backups_to_delete, +): + # Remove un-scrubbed backups FBO count below. + _helpers.scrub_instance_backups(shared_instance) + + backup_id_1 = _helpers.unique_id("backup_id1", separator="_") + backup_id_2 = _helpers.unique_id("backup_id2", separator="_") + + expire_time_1 = datetime.datetime.utcnow() + datetime.timedelta(days=21) + expire_time_1 = expire_time_1.replace(tzinfo=datetime.timezone.utc) + expire_time_1_stamp = expire_time_1.strftime("%Y-%m-%dT%H:%M:%S.%fZ") + + backup1 = shared_instance.backup( + backup_id_1, + database=shared_database, + expire_time=expire_time_1, + version_time=database_version_time, + ) + + expire_time_2 = datetime.datetime.utcnow() + datetime.timedelta(days=1) + expire_time_2 = expire_time_2.replace(tzinfo=datetime.timezone.utc) + backup2 = shared_instance.backup( + backup_id_2, database=second_database, expire_time=expire_time_2 + ) + + # Create two backups. 
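+    # Only 'backup1' is awaited here; 'backup2' is intentionally left in the
+    # CREATING state so the 'state:CREATING' filter below has something to match.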
+ op1 = backup1.create() + backups_to_delete.append(backup1) + op1.result() # blocks indefinitely + backup1.reload() + + create_time_compare = datetime.datetime.utcnow().replace( + tzinfo=datetime.timezone.utc + ) + create_time_stamp = create_time_compare.strftime("%Y-%m-%dT%H:%M:%S.%fZ") + + backup2.create() + # This test doesn't block for the result of the 'backup2.create()' call + # because it wants to find `backup2` in the upcoming search for + # backups matching 'state;CREATING`: inherently racy, but probably + # safe, given how long it takes to create a backup (on the order of + # minutes, not seconds). + backups_to_delete.append(backup2) + + # List backups filtered by state. + filter_ = "state:CREATING" + for backup in shared_instance.list_backups(filter_=filter_): + assert backup.name == backup2.name + + # List backups filtered by backup name. + filter_ = f"name:{backup_id_1}" + for backup in shared_instance.list_backups(filter_=filter_): + assert backup.name == backup1.name + + # List backups filtered by database name. + filter_ = f"database:{shared_database.name}" + for backup in shared_instance.list_backups(filter_=filter_): + assert backup.name == backup1.name + + # List backups filtered by create time. + filter_ = f'create_time > "{create_time_stamp}"' + for backup in shared_instance.list_backups(filter_=filter_): + assert backup.name == backup2.name + + # List backups filtered by version time. + filter_ = f'version_time > "{create_time_stamp}"' + for backup in shared_instance.list_backups(filter_=filter_): + assert backup.name == backup2.name + + # List backups filtered by expire time. + filter_ = f'expire_time > "{expire_time_1_stamp}"' + for backup in shared_instance.list_backups(filter_=filter_): + assert backup.name == backup1.name + + # List backups filtered by size bytes. + # XXX: this one may only pass if other tests have run first, + # munging 'shared_database' so that its backup will be bigger? + filter_ = f"size_bytes < {backup1.size_bytes}" + for backup in shared_instance.list_backups(filter_=filter_): + assert backup.name == backup2.name + + # List backups using pagination. + count = 0 + for page in shared_instance.list_backups(page_size=1): + count += 1 + assert count == 2 diff --git a/tests/system/test_database_api.py b/tests/system/test_database_api.py new file mode 100644 index 0000000000..3f2831cec0 --- /dev/null +++ b/tests/system/test_database_api.py @@ -0,0 +1,360 @@ +# Copyright 2021 Google LLC All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time +import uuid + +import pytest + +from google.api_core import exceptions +from google.cloud import spanner_v1 +from . import _helpers +from . 
import _sample_data + + +DBAPI_OPERATION_TIMEOUT = 240 # seconds + + +@pytest.fixture(scope="module") +def multiregion_instance(spanner_client, operation_timeout): + multi_region_instance_id = _helpers.unique_id("multi-region") + multi_region_config = "nam3" + config_name = "{}/instanceConfigs/{}".format( + spanner_client.project_name, multi_region_config + ) + create_time = str(int(time.time())) + labels = {"python-spanner-systests": "true", "created": create_time} + multiregion_instance = spanner_client.instance( + instance_id=multi_region_instance_id, + configuration_name=config_name, + labels=labels, + ) + operation = _helpers.retry_429_503(multiregion_instance.create)() + operation.result(operation_timeout) + + yield multiregion_instance + + _helpers.retry_429_503(multiregion_instance.delete)() + + +def test_list_databases(shared_instance, shared_database): + # Since `shared_instance` is newly created in `setUpModule`, the + # database created in `setUpClass` here will be the only one. + database_names = [database.name for database in shared_instance.list_databases()] + assert shared_database.name in database_names + + +def test_create_database(shared_instance, databases_to_delete): + pool = spanner_v1.BurstyPool(labels={"testcase": "create_database"}) + temp_db_id = _helpers.unique_id("temp_db") + temp_db = shared_instance.database(temp_db_id, pool=pool) + operation = temp_db.create() + databases_to_delete.append(temp_db) + + # We want to make sure the operation completes. + operation.result(DBAPI_OPERATION_TIMEOUT) # raises on failure / timeout. + + database_ids = [database.name for database in shared_instance.list_databases()] + assert temp_db.name in database_ids + + +def test_create_database_pitr_invalid_retention_period( + not_emulator, # PITR-lite features are not supported by the emulator + shared_instance, +): + pool = spanner_v1.BurstyPool(labels={"testcase": "create_database_pitr"}) + temp_db_id = _helpers.unique_id("pitr_inv_db", separator="_") + retention_period = "0d" + ddl_statements = [ + f"ALTER DATABASE {temp_db_id}" + f" SET OPTIONS (version_retention_period = '{retention_period}')" + ] + temp_db = shared_instance.database( + temp_db_id, pool=pool, ddl_statements=ddl_statements + ) + with pytest.raises(exceptions.InvalidArgument): + temp_db.create() + + +def test_create_database_pitr_success( + not_emulator, # PITR-lite features are not supported by the emulator + shared_instance, + databases_to_delete, +): + pool = spanner_v1.BurstyPool(labels={"testcase": "create_database_pitr"}) + temp_db_id = _helpers.unique_id("pitr_db", separator="_") + retention_period = "7d" + ddl_statements = [ + f"ALTER DATABASE {temp_db_id}" + f" SET OPTIONS (version_retention_period = '{retention_period}')" + ] + temp_db = shared_instance.database( + temp_db_id, pool=pool, ddl_statements=ddl_statements + ) + operation = temp_db.create() + databases_to_delete.append(temp_db) + operation.result(DBAPI_OPERATION_TIMEOUT) # raises on failure / timeout. 
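+    # The new database should now appear in the instance's database listing.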
+ + database_ids = [database.name for database in shared_instance.list_databases()] + assert temp_db.name in database_ids + + temp_db.reload() + temp_db.version_retention_period == retention_period + + with temp_db.snapshot() as snapshot: + results = snapshot.execute_sql( + "SELECT OPTION_VALUE AS version_retention_period " + "FROM INFORMATION_SCHEMA.DATABASE_OPTIONS " + "WHERE SCHEMA_NAME = '' " + "AND OPTION_NAME = 'version_retention_period'" + ) + for result in results: + assert result[0] == retention_period + + +def test_create_database_with_default_leader_success( + not_emulator, # Default leader setting not supported by the emulator + multiregion_instance, + databases_to_delete, +): + pool = spanner_v1.BurstyPool(labels={"testcase": "create_database_default_leader"}) + + temp_db_id = _helpers.unique_id("dflt_ldr_db", separator="_") + default_leader = "us-east4" + ddl_statements = [ + f"ALTER DATABASE {temp_db_id}" + f" SET OPTIONS (default_leader = '{default_leader}')" + ] + temp_db = multiregion_instance.database( + temp_db_id, pool=pool, ddl_statements=ddl_statements + ) + operation = temp_db.create() + databases_to_delete.append(temp_db) + operation.result(30) # raises on failure / timeout. + + database_ids = [database.name for database in multiregion_instance.list_databases()] + assert temp_db.name in database_ids + + temp_db.reload() + assert temp_db.default_leader == default_leader + + with temp_db.snapshot() as snapshot: + results = snapshot.execute_sql( + "SELECT OPTION_VALUE AS default_leader " + "FROM INFORMATION_SCHEMA.DATABASE_OPTIONS " + "WHERE SCHEMA_NAME = '' AND OPTION_NAME = 'default_leader'" + ) + for result in results: + assert result[0] == default_leader + + +def test_table_not_found(shared_instance): + temp_db_id = _helpers.unique_id("tbl_not_found", separator="_") + + correct_table = "MyTable" + incorrect_table = "NotMyTable" + + create_table = ( + f"CREATE TABLE {correct_table} (\n" + f" Id STRING(36) NOT NULL,\n" + f" Field1 STRING(36) NOT NULL\n" + f") PRIMARY KEY (Id)" + ) + create_index = f"CREATE INDEX IDX ON {incorrect_table} (Field1)" + + temp_db = shared_instance.database( + temp_db_id, ddl_statements=[create_table, create_index] + ) + with pytest.raises(exceptions.NotFound): + temp_db.create() + + +def test_update_ddl_w_operation_id(shared_instance, databases_to_delete): + # We used to have: + # @pytest.mark.skip( + # reason="'Database.update_ddl' has a flaky timeout. See: " + # https://github.com/GoogleCloudPlatform/google-cloud-python/issues/5629 + # ) + pool = spanner_v1.BurstyPool(labels={"testcase": "update_database_ddl"}) + temp_db_id = _helpers.unique_id("update_ddl", separator="_") + temp_db = shared_instance.database(temp_db_id, pool=pool) + create_op = temp_db.create() + databases_to_delete.append(temp_db) + create_op.result(DBAPI_OPERATION_TIMEOUT) # raises on failure / timeout. + + # random but shortish always start with letter + operation_id = f"a{str(uuid.uuid4())[:8]}" + operation = temp_db.update_ddl(_helpers.DDL_STATEMENTS, operation_id=operation_id) + + assert operation_id == operation.operation.name.split("/")[-1] + + operation.result(DBAPI_OPERATION_TIMEOUT) # raises on failure / timeout. 
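+    # Reload so the DDL applied by the operation is reflected on the local handle.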
+ + temp_db.reload() + + assert len(temp_db.ddl_statements) == len(_helpers.DDL_STATEMENTS) + + +def test_update_ddl_w_pitr_invalid( + not_emulator, shared_instance, databases_to_delete, +): + pool = spanner_v1.BurstyPool(labels={"testcase": "update_database_ddl_pitr"}) + temp_db_id = _helpers.unique_id("pitr_upd_ddl_inv", separator="_") + retention_period = "0d" + temp_db = shared_instance.database(temp_db_id, pool=pool) + + create_op = temp_db.create() + databases_to_delete.append(temp_db) + create_op.result(DBAPI_OPERATION_TIMEOUT) # raises on failure / timeout. + + assert temp_db.version_retention_period is None + + ddl_statements = _helpers.DDL_STATEMENTS + [ + f"ALTER DATABASE {temp_db_id}" + f" SET OPTIONS (version_retention_period = '{retention_period}')" + ] + with pytest.raises(exceptions.InvalidArgument): + temp_db.update_ddl(ddl_statements) + + +def test_update_ddl_w_pitr_success( + not_emulator, shared_instance, databases_to_delete, +): + pool = spanner_v1.BurstyPool(labels={"testcase": "update_database_ddl_pitr"}) + temp_db_id = _helpers.unique_id("pitr_upd_ddl_inv", separator="_") + retention_period = "7d" + temp_db = shared_instance.database(temp_db_id, pool=pool) + + create_op = temp_db.create() + databases_to_delete.append(temp_db) + create_op.result(DBAPI_OPERATION_TIMEOUT) # raises on failure / timeout. + + assert temp_db.version_retention_period is None + + ddl_statements = _helpers.DDL_STATEMENTS + [ + f"ALTER DATABASE {temp_db_id}" + f" SET OPTIONS (version_retention_period = '{retention_period}')" + ] + operation = temp_db.update_ddl(ddl_statements) + operation.result(DBAPI_OPERATION_TIMEOUT) # raises on failure / timeout. + + temp_db.reload() + assert temp_db.version_retention_period == retention_period + assert len(temp_db.ddl_statements) == len(ddl_statements) + + +def test_update_ddl_w_default_leader_success( + not_emulator, multiregion_instance, databases_to_delete, +): + pool = spanner_v1.BurstyPool( + labels={"testcase": "update_database_ddl_default_leader"}, + ) + + temp_db_id = _helpers.unique_id("dfl_ldrr_upd_ddl", separator="_") + default_leader = "us-east4" + temp_db = multiregion_instance.database(temp_db_id, pool=pool) + + create_op = temp_db.create() + databases_to_delete.append(temp_db) + create_op.result(DBAPI_OPERATION_TIMEOUT) # raises on failure / timeout. + + assert temp_db.default_leader is None + + ddl_statements = _helpers.DDL_STATEMENTS + [ + f"ALTER DATABASE {temp_db_id}" + f" SET OPTIONS (default_leader = '{default_leader}')" + ] + operation = temp_db.update_ddl(ddl_statements) + operation.result(DBAPI_OPERATION_TIMEOUT) # raises on failure / timeout. 
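+    # Reload to pick up the newly set 'default_leader' option before asserting on it.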
+ + temp_db.reload() + assert temp_db.default_leader == default_leader + assert len(temp_db.ddl_statements) == len(ddl_statements) + + +def test_db_batch_insert_then_db_snapshot_read(shared_database): + _helpers.retry_has_all_dll(shared_database.reload)() + sd = _sample_data + + with shared_database.batch() as batch: + batch.delete(sd.TABLE, sd.ALL) + batch.insert(sd.TABLE, sd.COLUMNS, sd.ROW_DATA) + + with shared_database.snapshot(read_timestamp=batch.committed) as snapshot: + from_snap = list(snapshot.read(sd.TABLE, sd.COLUMNS, sd.ALL)) + + sd._check_rows_data(from_snap) + + +def test_db_run_in_transaction_then_snapshot_execute_sql(shared_database): + _helpers.retry_has_all_dll(shared_database.reload)() + sd = _sample_data + + with shared_database.batch() as batch: + batch.delete(sd.TABLE, sd.ALL) + + def _unit_of_work(transaction, test): + rows = list(transaction.read(test.TABLE, test.COLUMNS, sd.ALL)) + assert rows == [] + + transaction.insert_or_update(test.TABLE, test.COLUMNS, test.ROW_DATA) + + shared_database.run_in_transaction(_unit_of_work, test=sd) + + with shared_database.snapshot() as after: + rows = list(after.execute_sql(sd.SQL)) + + sd._check_rows_data(rows) + + +def test_db_run_in_transaction_twice(shared_database): + _helpers.retry_has_all_dll(shared_database.reload)() + sd = _sample_data + + with shared_database.batch() as batch: + batch.delete(sd.TABLE, sd.ALL) + + def _unit_of_work(transaction, test): + transaction.insert_or_update(test.TABLE, test.COLUMNS, test.ROW_DATA) + + shared_database.run_in_transaction(_unit_of_work, test=sd) + shared_database.run_in_transaction(_unit_of_work, test=sd) + + with shared_database.snapshot() as after: + rows = list(after.execute_sql(sd.SQL)) + sd._check_rows_data(rows) + + +def test_db_run_in_transaction_twice_4181(shared_database): + # See https://github.com/googleapis/google-cloud-python/issues/4181 + _helpers.retry_has_all_dll(shared_database.reload)() + sd = _sample_data + + with shared_database.batch() as batch: + batch.delete(sd.COUNTERS_TABLE, sd.ALL) + + def _unit_of_work(transaction, name): + transaction.insert(sd.COUNTERS_TABLE, sd.COUNTERS_COLUMNS, [[name, 0]]) + + shared_database.run_in_transaction(_unit_of_work, name="id_1") + + with pytest.raises(exceptions.AlreadyExists): + shared_database.run_in_transaction(_unit_of_work, name="id_1") + + shared_database.run_in_transaction(_unit_of_work, name="id_2") + + with shared_database.snapshot() as after: + rows = list(after.read(sd.COUNTERS_TABLE, sd.COUNTERS_COLUMNS, sd.ALL)) + + assert len(rows) == 2 diff --git a/tests/system/test_dbapi.py b/tests/system/test_dbapi.py new file mode 100644 index 0000000000..17aed8465f --- /dev/null +++ b/tests/system/test_dbapi.py @@ -0,0 +1,352 @@ +# Copyright 2021 Google LLC All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import hashlib +import pickle + +import pytest + +from google.cloud import spanner_v1 +from google.cloud.spanner_dbapi.connection import Connection +from . 
import _helpers + +DATABASE_NAME = "dbapi-txn" + +DDL_STATEMENTS = ( + """CREATE TABLE contacts ( + contact_id INT64, + first_name STRING(1024), + last_name STRING(1024), + email STRING(1024) + ) + PRIMARY KEY (contact_id)""", +) + + +@pytest.fixture(scope="session") +def raw_database(shared_instance, operation_timeout): + databse_id = _helpers.unique_id("dbapi-txn") + pool = spanner_v1.BurstyPool(labels={"testcase": "database_api"}) + database = shared_instance.database( + databse_id, ddl_statements=DDL_STATEMENTS, pool=pool, + ) + op = database.create() + op.result(operation_timeout) # raises on failure / timeout. + + yield database + + database.drop() + + +def clear_table(transaction): + transaction.execute_update("DELETE FROM contacts WHERE true") + + +@pytest.fixture(scope="function") +def dbapi_database(raw_database): + + raw_database.run_in_transaction(clear_table) + + yield raw_database + + raw_database.run_in_transaction(clear_table) + + +def test_commit(shared_instance, dbapi_database): + """Test committing a transaction with several statements.""" + want_row = ( + 1, + "updated-first-name", + "last-name", + "test.email_updated@domen.ru", + ) + # connect to the test database + conn = Connection(shared_instance, dbapi_database) + cursor = conn.cursor() + + # execute several DML statements within one transaction + cursor.execute( + """ +INSERT INTO contacts (contact_id, first_name, last_name, email) +VALUES (1, 'first-name', 'last-name', 'test.email@domen.ru') + """ + ) + cursor.execute( + """ +UPDATE contacts +SET first_name = 'updated-first-name' +WHERE first_name = 'first-name' +""" + ) + cursor.execute( + """ +UPDATE contacts +SET email = 'test.email_updated@domen.ru' +WHERE email = 'test.email@domen.ru' +""" + ) + conn.commit() + + # read the resulting data from the database + cursor.execute("SELECT * FROM contacts") + got_rows = cursor.fetchall() + conn.commit() + + assert got_rows == [want_row] + + cursor.close() + conn.close() + + +def test_rollback(shared_instance, dbapi_database): + """Test rollbacking a transaction with several statements.""" + want_row = (2, "first-name", "last-name", "test.email@domen.ru") + # connect to the test database + conn = Connection(shared_instance, dbapi_database) + cursor = conn.cursor() + + cursor.execute( + """ +INSERT INTO contacts (contact_id, first_name, last_name, email) +VALUES (2, 'first-name', 'last-name', 'test.email@domen.ru') + """ + ) + conn.commit() + + # execute several DMLs with one transaction + cursor.execute( + """ +UPDATE contacts +SET first_name = 'updated-first-name' +WHERE first_name = 'first-name' +""" + ) + cursor.execute( + """ +UPDATE contacts +SET email = 'test.email_updated@domen.ru' +WHERE email = 'test.email@domen.ru' +""" + ) + conn.rollback() + + # read the resulting data from the database + cursor.execute("SELECT * FROM contacts") + got_rows = cursor.fetchall() + conn.commit() + + assert got_rows == [want_row] + + cursor.close() + conn.close() + + +def test_autocommit_mode_change(shared_instance, dbapi_database): + """Test auto committing a transaction on `autocommit` mode change.""" + want_row = ( + 2, + "updated-first-name", + "last-name", + "test.email@domen.ru", + ) + # connect to the test database + conn = Connection(shared_instance, dbapi_database) + cursor = conn.cursor() + + cursor.execute( + """ +INSERT INTO contacts (contact_id, first_name, last_name, email) +VALUES (2, 'first-name', 'last-name', 'test.email@domen.ru') + """ + ) + cursor.execute( + """ +UPDATE contacts +SET first_name = 
'updated-first-name' +WHERE first_name = 'first-name' +""" + ) + conn.autocommit = True + + # read the resulting data from the database + cursor.execute("SELECT * FROM contacts") + got_rows = cursor.fetchall() + + assert got_rows == [want_row] + + cursor.close() + conn.close() + + +def test_rollback_on_connection_closing(shared_instance, dbapi_database): + """ + When closing a connection all the pending transactions + must be rollbacked. Testing if it's working this way. + """ + want_row = (1, "first-name", "last-name", "test.email@domen.ru") + # connect to the test database + conn = Connection(shared_instance, dbapi_database) + cursor = conn.cursor() + + cursor.execute( + """ +INSERT INTO contacts (contact_id, first_name, last_name, email) +VALUES (1, 'first-name', 'last-name', 'test.email@domen.ru') + """ + ) + conn.commit() + + cursor.execute( + """ +UPDATE contacts +SET first_name = 'updated-first-name' +WHERE first_name = 'first-name' +""" + ) + conn.close() + + # connect again, as the previous connection is no-op after closing + conn = Connection(shared_instance, dbapi_database) + cursor = conn.cursor() + + # read the resulting data from the database + cursor.execute("SELECT * FROM contacts") + got_rows = cursor.fetchall() + conn.commit() + + assert got_rows == [want_row] + + cursor.close() + conn.close() + + +def test_results_checksum(shared_instance, dbapi_database): + """Test that results checksum is calculated properly.""" + conn = Connection(shared_instance, dbapi_database) + cursor = conn.cursor() + + cursor.execute( + """ +INSERT INTO contacts (contact_id, first_name, last_name, email) +VALUES +(1, 'first-name', 'last-name', 'test.email@domen.ru'), +(2, 'first-name2', 'last-name2', 'test.email2@domen.ru') + """ + ) + assert len(conn._statements) == 1 + conn.commit() + + cursor.execute("SELECT * FROM contacts") + got_rows = cursor.fetchall() + + assert len(conn._statements) == 1 + conn.commit() + + checksum = hashlib.sha256() + checksum.update(pickle.dumps(got_rows[0])) + checksum.update(pickle.dumps(got_rows[1])) + + assert cursor._checksum.checksum.digest() == checksum.digest() + + +def test_execute_many(shared_instance, dbapi_database): + # connect to the test database + conn = Connection(shared_instance, dbapi_database) + cursor = conn.cursor() + + row_data = [ + (1, "first-name", "last-name", "test.email@example.com"), + (2, "first-name2", "last-name2", "test.email2@example.com"), + ] + cursor.executemany( + """ +INSERT INTO contacts (contact_id, first_name, last_name, email) +VALUES (%s, %s, %s, %s) + """, + row_data, + ) + conn.commit() + + cursor.executemany( + """SELECT * FROM contacts WHERE contact_id = @a1""", ({"a1": 1}, {"a1": 2}), + ) + res = cursor.fetchall() + conn.commit() + + assert len(res) == len(row_data) + for found, expected in zip(res, row_data): + assert found[0] == expected[0] + + # checking that execute() and executemany() + # results are not mixed together + cursor.execute( + """ +SELECT * FROM contacts WHERE contact_id = 1 +""", + ) + res = cursor.fetchone() + conn.commit() + + assert res[0] == 1 + conn.close() + + +def test_DDL_autocommit(shared_instance, dbapi_database): + """Check that DDLs in autocommit mode are immediately executed.""" + conn = Connection(shared_instance, dbapi_database) + conn.autocommit = True + + cur = conn.cursor() + cur.execute( + """ + CREATE TABLE Singers ( + SingerId INT64 NOT NULL, + Name STRING(1024), + ) PRIMARY KEY (SingerId) + """ + ) + conn.close() + + # if previous DDL wasn't committed, the next DROP TABLE + # 
statement will fail with a ProgrammingError + conn = Connection(shared_instance, dbapi_database) + cur = conn.cursor() + + cur.execute("DROP TABLE Singers") + conn.commit() + + +def test_DDL_commit(shared_instance, dbapi_database): + """Check that DDLs in commit mode are executed on calling `commit()`.""" + conn = Connection(shared_instance, dbapi_database) + cur = conn.cursor() + + cur.execute( + """ + CREATE TABLE Singers ( + SingerId INT64 NOT NULL, + Name STRING(1024), + ) PRIMARY KEY (SingerId) + """ + ) + conn.commit() + conn.close() + + # if previous DDL wasn't committed, the next DROP TABLE + # statement will fail with a ProgrammingError + conn = Connection(shared_instance, dbapi_database) + cur = conn.cursor() + + cur.execute("DROP TABLE Singers") + conn.commit() diff --git a/tests/system/test_instance_api.py b/tests/system/test_instance_api.py new file mode 100644 index 0000000000..1c9e0d71f0 --- /dev/null +++ b/tests/system/test_instance_api.py @@ -0,0 +1,139 @@ +# Copyright 2021 Google LLC All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from test_utils import retry + +from . import _helpers + + +@pytest.fixture(scope="function") +def instances_to_delete(): + to_delete = [] + + yield to_delete + + for instance in to_delete: + _helpers.scrub_instance_ignore_not_found(instance) + + +def test_list_instances( + no_create_instance, spanner_client, existing_instances, shared_instance, +): + instances = list(spanner_client.list_instances()) + + for instance in instances: + assert instance in existing_instances or instance is shared_instance + + +def test_reload_instance(spanner_client, shared_instance_id, shared_instance): + # Use same arguments as shared_instance_id so we can use 'reload()' + # on a fresh instance. + instance = spanner_client.instance(shared_instance_id) + + # Unset metadata before reloading. + instance.display_name = None + + def _expected_display_name(instance): + return instance.display_name == shared_instance.display_name + + retry_until = retry.RetryInstanceState(_expected_display_name) + + retry_until(instance.reload)() + + assert instance.display_name == shared_instance.display_name + + +def test_create_instance( + if_create_instance, + spanner_client, + instance_config, + instances_to_delete, + operation_timeout, +): + alt_instance_id = _helpers.unique_id("new") + instance = spanner_client.instance(alt_instance_id, instance_config.name) + operation = instance.create() + # Make sure this instance gets deleted after the test case. + instances_to_delete.append(instance) + + # We want to make sure the operation completes. + operation.result(operation_timeout) # raises on failure / timeout. + + # Create a new instance instance and make sure it is the same. 
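+    # (A second client-side handle to the same instance ID should reload with
+    # metadata matching the instance just created.)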
+ instance_alt = spanner_client.instance(alt_instance_id, instance_config.name) + instance_alt.reload() + + assert instance == instance_alt + instance.display_name == instance_alt.display_name + + +def test_create_instance_with_processing_units( + not_emulator, + if_create_instance, + spanner_client, + instance_config, + instances_to_delete, + operation_timeout, +): + alt_instance_id = _helpers.unique_id("wpn") + processing_units = 5000 + instance = spanner_client.instance( + instance_id=alt_instance_id, + configuration_name=instance_config.name, + processing_units=processing_units, + ) + operation = instance.create() + # Make sure this instance gets deleted after the test case. + instances_to_delete.append(instance) + + # We want to make sure the operation completes. + operation.result(operation_timeout) # raises on failure / timeout. + + # Create a new instance instance and make sure it is the same. + instance_alt = spanner_client.instance(alt_instance_id, instance_config.name) + instance_alt.reload() + + assert instance == instance_alt + assert instance.display_name == instance_alt.display_name + assert instance.processing_units == instance_alt.processing_units + + +def test_update_instance( + not_emulator, + spanner_client, + shared_instance, + shared_instance_id, + operation_timeout, +): + old_display_name = shared_instance.display_name + new_display_name = "Foo Bar Baz" + shared_instance.display_name = new_display_name + operation = shared_instance.update() + + # We want to make sure the operation completes. + operation.result(operation_timeout) # raises on failure / timeout. + + # Create a new instance instance and reload it. + instance_alt = spanner_client.instance(shared_instance_id, None) + assert instance_alt.display_name != new_display_name + + instance_alt.reload() + assert instance_alt.display_name == new_display_name + + # Make sure to put the instance back the way it was for the + # other test cases. + shared_instance.display_name = old_display_name + shared_instance.update() diff --git a/tests/system/test_session_api.py b/tests/system/test_session_api.py new file mode 100644 index 0000000000..665c98e578 --- /dev/null +++ b/tests/system/test_session_api.py @@ -0,0 +1,2159 @@ +# Copyright 2021 Google LLC All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import collections +import datetime +import decimal +import math +import struct +import threading +import time + +import pytest + +import grpc +from google.rpc import code_pb2 +from google.api_core import datetime_helpers +from google.api_core import exceptions +from google.cloud import spanner_v1 +from google.cloud._helpers import UTC +from tests import _helpers as ot_helpers +from . import _helpers +from . 
import _sample_data + + +SOME_DATE = datetime.date(2011, 1, 17) +SOME_TIME = datetime.datetime(1989, 1, 17, 17, 59, 12, 345612) +NANO_TIME = datetime_helpers.DatetimeWithNanoseconds(1995, 8, 31, nanosecond=987654321) +POS_INF = float("+inf") +NEG_INF = float("-inf") +(OTHER_NAN,) = struct.unpack(" + _check_sql_results( + database, + sql="SELECT @v", + params={"v": single_value}, + param_types={"v": spanner_v1.Type(code=type_name)}, + expected=[(single_value,)], + order=False, + recurse_into_lists=recurse_into_lists, + ) + + # Bind a null + _check_sql_results( + database, + sql="SELECT @v", + params={"v": None}, + param_types={"v": spanner_v1.Type(code=type_name)}, + expected=[(None,)], + order=False, + recurse_into_lists=recurse_into_lists, + ) + + # Bind an array of + array_element_type = spanner_v1.Type(code=type_name) + array_type = spanner_v1.Type( + code=spanner_v1.TypeCode.ARRAY, array_element_type=array_element_type + ) + + if expected_array_value is None: + expected_array_value = array_value + + _check_sql_results( + database, + sql="SELECT @v", + params={"v": array_value}, + param_types={"v": array_type}, + expected=[(expected_array_value,)], + order=False, + recurse_into_lists=recurse_into_lists, + ) + + # Bind an empty array of + _check_sql_results( + database, + sql="SELECT @v", + params={"v": []}, + param_types={"v": array_type}, + expected=[([],)], + order=False, + recurse_into_lists=recurse_into_lists, + ) + + # Bind a null array of + _check_sql_results( + database, + sql="SELECT @v", + params={"v": None}, + param_types={"v": array_type}, + expected=[(None,)], + order=False, + recurse_into_lists=recurse_into_lists, + ) + + +def test_execute_sql_w_string_bindings(sessions_database): + _bind_test_helper( + sessions_database, spanner_v1.TypeCode.STRING, "Phred", ["Phred", "Bharney"] + ) + + +def test_execute_sql_w_bool_bindings(sessions_database): + _bind_test_helper( + sessions_database, spanner_v1.TypeCode.BOOL, True, [True, False, True] + ) + + +def test_execute_sql_w_int64_bindings(sessions_database): + _bind_test_helper(sessions_database, spanner_v1.TypeCode.INT64, 42, [123, 456, 789]) + + +def test_execute_sql_w_float64_bindings(sessions_database): + _bind_test_helper( + sessions_database, spanner_v1.TypeCode.FLOAT64, 42.3, [12.3, 456.0, 7.89] + ) + + +def test_execute_sql_w_float_bindings_transfinite(sessions_database): + + # Find -inf + _check_sql_results( + sessions_database, + sql="SELECT @neg_inf", + params={"neg_inf": NEG_INF}, + param_types={"neg_inf": spanner_v1.param_types.FLOAT64}, + expected=[(NEG_INF,)], + order=False, + ) + + # Find +inf + _check_sql_results( + sessions_database, + sql="SELECT @pos_inf", + params={"pos_inf": POS_INF}, + param_types={"pos_inf": spanner_v1.param_types.FLOAT64}, + expected=[(POS_INF,)], + order=False, + ) + + +def test_execute_sql_w_bytes_bindings(sessions_database): + _bind_test_helper( + sessions_database, + spanner_v1.TypeCode.BYTES, + b"DEADBEEF", + [b"FACEDACE", b"DEADBEEF"], + ) + + +def test_execute_sql_w_timestamp_bindings(sessions_database): + + timestamp_1 = datetime_helpers.DatetimeWithNanoseconds( + 1989, 1, 17, 17, 59, 12, nanosecond=345612789 + ) + + timestamp_2 = datetime_helpers.DatetimeWithNanoseconds( + 1989, 1, 17, 17, 59, 13, nanosecond=456127893 + ) + + timestamps = [timestamp_1, timestamp_2] + + # In round-trip, timestamps acquire a timezone value. 
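+    # (Cloud Spanner stores and returns TIMESTAMP values in UTC, so naive inputs
+    # come back as timezone-aware datetimes.)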
+ expected_timestamps = [timestamp.replace(tzinfo=UTC) for timestamp in timestamps] + + _bind_test_helper( + sessions_database, + spanner_v1.TypeCode.TIMESTAMP, + timestamp_1, + timestamps, + expected_timestamps, + recurse_into_lists=False, + ) + + +def test_execute_sql_w_date_bindings(sessions_database): + dates = [SOME_DATE, SOME_DATE + datetime.timedelta(days=1)] + _bind_test_helper(sessions_database, spanner_v1.TypeCode.DATE, SOME_DATE, dates) + + +def test_execute_sql_w_numeric_bindings(not_emulator, sessions_database): + _bind_test_helper( + sessions_database, + spanner_v1.TypeCode.NUMERIC, + NUMERIC_1, + [NUMERIC_1, NUMERIC_2], + ) + + +def test_execute_sql_w_query_param_struct(sessions_database): + name = "Phred" + count = 123 + size = 23.456 + height = 188.0 + weight = 97.6 + param_types = spanner_v1.param_types + + record_type = param_types.Struct( + [ + param_types.StructField("name", param_types.STRING), + param_types.StructField("count", param_types.INT64), + param_types.StructField("size", param_types.FLOAT64), + param_types.StructField( + "nested", + param_types.Struct( + [ + param_types.StructField("height", param_types.FLOAT64), + param_types.StructField("weight", param_types.FLOAT64), + ] + ), + ), + ] + ) + + # Query with null struct, explicit type + _check_sql_results( + sessions_database, + sql="SELECT @r.name, @r.count, @r.size, @r.nested.weight", + params={"r": None}, + param_types={"r": record_type}, + expected=[(None, None, None, None)], + order=False, + ) + + # Query with non-null struct, explicit type, NULL values + _check_sql_results( + sessions_database, + sql="SELECT @r.name, @r.count, @r.size, @r.nested.weight", + params={"r": (None, None, None, None)}, + param_types={"r": record_type}, + expected=[(None, None, None, None)], + order=False, + ) + + # Query with non-null struct, explicit type, nested NULL values + _check_sql_results( + sessions_database, + sql="SELECT @r.nested.weight", + params={"r": (None, None, None, (None, None))}, + param_types={"r": record_type}, + expected=[(None,)], + order=False, + ) + + # Query with non-null struct, explicit type + _check_sql_results( + sessions_database, + sql="SELECT @r.name, @r.count, @r.size, @r.nested.weight", + params={"r": (name, count, size, (height, weight))}, + param_types={"r": record_type}, + expected=[(name, count, size, weight)], + order=False, + ) + + # Query with empty struct, explicitly empty type + empty_type = param_types.Struct([]) + _check_sql_results( + sessions_database, + sql="SELECT @r IS NULL", + params={"r": ()}, + param_types={"r": empty_type}, + expected=[(False,)], + order=False, + ) + + # Query with null struct, explicitly empty type + _check_sql_results( + sessions_database, + sql="SELECT @r IS NULL", + params={"r": None}, + param_types={"r": empty_type}, + expected=[(True,)], + order=False, + ) + + # Query with equality check for struct value + struct_equality_query = ( + "SELECT " '@struct_param=STRUCT(1,"bob")' + ) + struct_type = param_types.Struct( + [ + param_types.StructField("threadf", param_types.INT64), + param_types.StructField("userf", param_types.STRING), + ] + ) + _check_sql_results( + sessions_database, + sql=struct_equality_query, + params={"struct_param": (1, "bob")}, + param_types={"struct_param": struct_type}, + expected=[(True,)], + order=False, + ) + + # Query with nullness test for struct + _check_sql_results( + sessions_database, + sql="SELECT @struct_param IS NULL", + params={"struct_param": None}, + param_types={"struct_param": struct_type}, + 
expected=[(True,)], + order=False, + ) + + # Query with null array-of-struct + array_elem_type = param_types.Struct( + [param_types.StructField("threadid", param_types.INT64)] + ) + array_type = param_types.Array(array_elem_type) + _check_sql_results( + sessions_database, + sql="SELECT a.threadid FROM UNNEST(@struct_arr_param) a", + params={"struct_arr_param": None}, + param_types={"struct_arr_param": array_type}, + expected=[], + order=False, + ) + + # Query with non-null array-of-struct + _check_sql_results( + sessions_database, + sql="SELECT a.threadid FROM UNNEST(@struct_arr_param) a", + params={"struct_arr_param": [(123,), (456,)]}, + param_types={"struct_arr_param": array_type}, + expected=[(123,), (456,)], + order=False, + ) + + # Query with null array-of-struct field + struct_type_with_array_field = param_types.Struct( + [ + param_types.StructField("intf", param_types.INT64), + param_types.StructField("arraysf", array_type), + ] + ) + _check_sql_results( + sessions_database, + sql="SELECT a.threadid FROM UNNEST(@struct_param.arraysf) a", + params={"struct_param": (123, None)}, + param_types={"struct_param": struct_type_with_array_field}, + expected=[], + order=False, + ) + + # Query with non-null array-of-struct field + _check_sql_results( + sessions_database, + sql="SELECT a.threadid FROM UNNEST(@struct_param.arraysf) a", + params={"struct_param": (123, ((456,), (789,)))}, + param_types={"struct_param": struct_type_with_array_field}, + expected=[(456,), (789,)], + order=False, + ) + + # Query with anonymous / repeated-name fields + anon_repeated_array_elem_type = param_types.Struct( + [ + param_types.StructField("", param_types.INT64), + param_types.StructField("", param_types.STRING), + ] + ) + anon_repeated_array_type = param_types.Array(anon_repeated_array_elem_type) + _check_sql_results( + sessions_database, + sql="SELECT CAST(t as STRUCT).* " + "FROM UNNEST(@struct_param) t", + params={"struct_param": [(123, "abcdef")]}, + param_types={"struct_param": anon_repeated_array_type}, + expected=[(123, "abcdef")], + order=False, + ) + + # Query and return a struct parameter + value_type = param_types.Struct( + [ + param_types.StructField("message", param_types.STRING), + param_types.StructField("repeat", param_types.INT64), + ] + ) + value_query = ( + "SELECT ARRAY(SELECT AS STRUCT message, repeat " + "FROM (SELECT @value.message AS message, " + "@value.repeat AS repeat)) AS value" + ) + _check_sql_results( + sessions_database, + sql=value_query, + params={"value": ("hello", 1)}, + param_types={"value": value_type}, + expected=[([["hello", 1]],)], + order=False, + ) + + +def test_execute_sql_returning_transfinite_floats(sessions_database): + + with sessions_database.snapshot(multi_use=True) as snapshot: + # Query returning -inf, +inf, NaN as column values + rows = list( + snapshot.execute_sql( + "SELECT " + 'CAST("-inf" AS FLOAT64), ' + 'CAST("+inf" AS FLOAT64), ' + 'CAST("NaN" AS FLOAT64)' + ) + ) + assert len(rows) == 1 + assert rows[0][0] == float("-inf") + assert rows[0][1] == float("+inf") + # NaNs cannot be compared by equality. + assert math.isnan(rows[0][2]) + + # Query returning array of -inf, +inf, NaN as one column + rows = list( + snapshot.execute_sql( + "SELECT" + ' [CAST("-inf" AS FLOAT64),' + ' CAST("+inf" AS FLOAT64),' + ' CAST("NaN" AS FLOAT64)]' + ) + ) + assert len(rows) == 1 + + float_array = rows[0][0] + assert float_array[0] == float("-inf") + assert float_array[1] == float("+inf") + + # NaNs cannot be searched for by equality. 
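+        # (IEEE 754 defines NaN != NaN, so 'math.isnan' is used instead.)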
+        assert math.isnan(float_array[2])
+
+
+def test_partition_query(sessions_database):
+    row_count = 40
+    sql = f"SELECT * FROM {_sample_data.TABLE}"
+    committed = _set_up_table(sessions_database, row_count)
+
+    # Partitioned queries do not support ORDER BY.
+    all_data_rows = set(_row_data(row_count))
+    union = set()
+    batch_txn = sessions_database.batch_snapshot(read_timestamp=committed)
+    for batch in batch_txn.generate_query_batches(sql):
+        p_results_iter = batch_txn.process(batch)
+        # Lists aren't hashable, so the result rows need to be converted to tuples.
+        rows = [tuple(result) for result in p_results_iter]
+        union.update(set(rows))
+
+    assert union == all_data_rows
+    batch_txn.close()
+
+
+class FauxCall:
+    # Minimal stand-in for a 'grpc.Call', passed as the 'errors' payload below.
+    def __init__(self, code, details="FauxCall"):
+        self._code = code
+        self._details = details
+
+    def initial_metadata(self):
+        return {}
+
+    def trailing_metadata(self):
+        return {}
+
+    def code(self):
+        return self._code
+
+    def details(self):
+        return self._details
+
+
+def _check_batch_status(status_code, expected=code_pb2.OK):
+    # Map a non-OK numeric status code from 'batch_update' onto the matching
+    # 'grpc.StatusCode' member and raise the corresponding exception.
+    if status_code != expected:
+
+        _status_code_to_grpc_status_code = {
+            member.value[0]: member for member in grpc.StatusCode
+        }
+        grpc_status_code = _status_code_to_grpc_status_code[status_code]
+        call = FauxCall(status_code)
+        raise exceptions.from_grpc_status(
+            grpc_status_code, "batch_update failed", errors=[call]
+        )
diff --git a/tests/system/test_streaming_chunking.py b/tests/system/test_streaming_chunking.py
new file mode 100644
index 0000000000..5dded09d64
--- /dev/null
+++ b/tests/system/test_streaming_chunking.py
@@ -0,0 +1,75 @@
+# Copyright 2021 Google LLC All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
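+
+# System tests for streaming result-set chunking.  They require the instance
+# and database created by 'tests/system/utils/populate_streaming.py'.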
+ +import pytest + +from tests.system.utils import streaming_utils + +_RUN_POPULATE_STREAMING = """\ +Run 'tests/system/utils/populate_streaming.py' to enable these tests.""" + + +@pytest.fixture(scope="session") +def streaming_instance(spanner_client): + instance = spanner_client.instance(streaming_utils.INSTANCE_NAME) + if not instance.exists(): + pytest.skip(_RUN_POPULATE_STREAMING) + + yield instance + + +@pytest.fixture(scope="session") +def streaming_database(streaming_instance): + database = streaming_instance.database(streaming_utils.DATABASE_NAME) + if not database.exists(): + pytest.skip(_RUN_POPULATE_STREAMING) + + yield database + + +def _verify_one_column(db, table_desc): + sql = f"SELECT chunk_me FROM {table_desc.table}" + with db.snapshot() as snapshot: + rows = list(snapshot.execute_sql(sql)) + assert len(rows) == table_desc.row_count + expected = table_desc.value() + for row in rows: + assert row[0] == expected + + +def _verify_two_columns(db, table_desc): + sql = f"SELECT chunk_me, chunk_me_2 FROM {table_desc.table}" + with db.snapshot() as snapshot: + rows = list(snapshot.execute_sql(sql)) + assert len(rows) == table_desc.row_count + expected = table_desc.value() + for row in rows: + assert row[0] == expected + assert row[1] == expected + + +def test_four_kay(streaming_database): + _verify_one_column(streaming_database, streaming_utils.FOUR_KAY) + + +def test_forty_kay(streaming_database): + _verify_one_column(streaming_database, streaming_utils.FORTY_KAY) + + +def test_four_hundred_kay(streaming_database): + _verify_one_column(streaming_database, streaming_utils.FOUR_HUNDRED_KAY) + + +def test_four_meg(streaming_database): + _verify_two_columns(streaming_database, streaming_utils.FOUR_MEG) diff --git a/tests/system/test_system.py b/tests/system/test_system.py deleted file mode 100644 index 845e79f805..0000000000 --- a/tests/system/test_system.py +++ /dev/null @@ -1,3200 +0,0 @@ -# Copyright 2016 Google LLC All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -import collections -import datetime -import decimal -import math -import operator -import os -import struct -import threading -import time -import unittest -import uuid - -import grpc -from google.rpc import code_pb2 - -from google.api_core import exceptions -from google.api_core.datetime_helpers import DatetimeWithNanoseconds - -from google.cloud.spanner_v1 import param_types -from google.cloud.spanner_v1 import TypeCode -from google.cloud.spanner_v1 import Type - -from google.cloud._helpers import UTC -from google.cloud.spanner_v1 import BurstyPool -from google.cloud.spanner_v1 import COMMIT_TIMESTAMP -from google.cloud.spanner_v1 import Client -from google.cloud.spanner_v1 import KeyRange -from google.cloud.spanner_v1 import KeySet -from google.cloud.spanner_v1.instance import Backup -from google.cloud.spanner_v1.instance import Instance -from google.cloud.spanner_v1.table import Table -from google.cloud.spanner_v1 import RequestOptions - -from test_utils.retry import RetryErrors -from test_utils.retry import RetryInstanceState -from test_utils.retry import RetryResult -from test_utils.system import unique_resource_id - -from tests._fixtures import DDL_STATEMENTS -from tests._fixtures import EMULATOR_DDL_STATEMENTS -from tests._helpers import OpenTelemetryBase, HAS_OPENTELEMETRY_INSTALLED - - -CREATE_INSTANCE = os.getenv("GOOGLE_CLOUD_TESTS_CREATE_SPANNER_INSTANCE") is not None -USE_EMULATOR = os.getenv("SPANNER_EMULATOR_HOST") is not None -SKIP_BACKUP_TESTS = os.getenv("SKIP_BACKUP_TESTS") is not None -SPANNER_OPERATION_TIMEOUT_IN_SECONDS = int( - os.getenv("SPANNER_OPERATION_TIMEOUT_IN_SECONDS", 60) -) - -if CREATE_INSTANCE: - INSTANCE_ID = "google-cloud" + unique_resource_id("-") -else: - INSTANCE_ID = os.environ.get( - "GOOGLE_CLOUD_TESTS_SPANNER_INSTANCE", "google-cloud-python-systest" - ) -MULTI_REGION_INSTANCE_ID = "multi-region" + unique_resource_id("-") -EXISTING_INSTANCES = [] -COUNTERS_TABLE = "counters" -COUNTERS_COLUMNS = ("name", "value") - -BASE_ATTRIBUTES = { - "db.type": "spanner", - "db.url": "spanner.googleapis.com", - "net.host.name": "spanner.googleapis.com", -} - -_STATUS_CODE_TO_GRPC_STATUS_CODE = { - member.value[0]: member for member in grpc.StatusCode -} - - -class Config(object): - """Run-time configuration to be modified at set-up. - - This is a mutable stand-in to allow test set-up to modify - global state. - """ - - CLIENT = None - INSTANCE_CONFIG = None - INSTANCE = None - - -def _has_all_ddl(database): - ddl_statements = EMULATOR_DDL_STATEMENTS if USE_EMULATOR else DDL_STATEMENTS - return len(database.ddl_statements) == len(ddl_statements) - - -def _list_instances(): - return list(Config.CLIENT.list_instances()) - - -def setUpModule(): - if USE_EMULATOR: - from google.auth.credentials import AnonymousCredentials - - emulator_project = os.getenv("GCLOUD_PROJECT", "emulator-test-project") - Config.CLIENT = Client( - project=emulator_project, credentials=AnonymousCredentials() - ) - else: - Config.CLIENT = Client() - retry = RetryErrors(exceptions.ServiceUnavailable) - - configs = list(retry(Config.CLIENT.list_instance_configs)()) - - instances = retry(_list_instances)() - EXISTING_INSTANCES[:] = instances - - # Delete test instances that are older than an hour. 
- cutoff = int(time.time()) - 1 * 60 * 60 - instance_pbs = Config.CLIENT.list_instances("labels.python-spanner-systests:true") - for instance_pb in instance_pbs: - instance = Instance.from_pb(instance_pb, Config.CLIENT) - if "created" not in instance.labels: - continue - create_time = int(instance.labels["created"]) - if create_time > cutoff: - continue - # Instance cannot be deleted while backups exist. - for backup_pb in instance.list_backups(): - backup = Backup.from_pb(backup_pb, instance) - backup.delete() - instance.delete() - - if CREATE_INSTANCE: - if not USE_EMULATOR: - # Defend against back-end returning configs for regions we aren't - # actually allowed to use. - configs = [config for config in configs if "-us-" in config.name] - - if not configs: - raise ValueError("List instance configs failed in module set up.") - - Config.INSTANCE_CONFIG = configs[0] - config_name = configs[0].name - create_time = str(int(time.time())) - labels = {"python-spanner-systests": "true", "created": create_time} - - Config.INSTANCE = Config.CLIENT.instance( - INSTANCE_ID, config_name, labels=labels - ) - created_op = Config.INSTANCE.create() - created_op.result( - SPANNER_OPERATION_TIMEOUT_IN_SECONDS - ) # block until completion - - else: - Config.INSTANCE = Config.CLIENT.instance(INSTANCE_ID) - Config.INSTANCE.reload() - - -def tearDownModule(): - if CREATE_INSTANCE: - Config.INSTANCE.delete() - - -class TestInstanceAdminAPI(unittest.TestCase): - def setUp(self): - self.instances_to_delete = [] - - def tearDown(self): - for instance in self.instances_to_delete: - instance.delete() - - @unittest.skipIf( - CREATE_INSTANCE, "This test fails when system tests are run in parallel." - ) - def test_list_instances(self): - instances = list(Config.CLIENT.list_instances()) - # We have added one new instance in `setUpModule`. - if CREATE_INSTANCE: - self.assertEqual(len(instances), len(EXISTING_INSTANCES) + 1) - for instance in instances: - instance_existence = ( - instance in EXISTING_INSTANCES or instance == Config.INSTANCE - ) - self.assertTrue(instance_existence) - - def test_reload_instance(self): - # Use same arguments as Config.INSTANCE (created in `setUpModule`) - # so we can use reload() on a fresh instance. - instance = Config.CLIENT.instance(INSTANCE_ID) - # Make sure metadata unset before reloading. - instance.display_name = None - - def _expected_display_name(instance): - return instance.display_name == Config.INSTANCE.display_name - - retry = RetryInstanceState(_expected_display_name) - - retry(instance.reload)() - - self.assertEqual(instance.display_name, Config.INSTANCE.display_name) - - @unittest.skipUnless(CREATE_INSTANCE, "Skipping instance creation") - def test_create_instance(self): - ALT_INSTANCE_ID = "new" + unique_resource_id("-") - instance = Config.CLIENT.instance(ALT_INSTANCE_ID, Config.INSTANCE_CONFIG.name) - operation = instance.create() - # Make sure this instance gets deleted after the test case. - self.instances_to_delete.append(instance) - - # We want to make sure the operation completes. - operation.result( - SPANNER_OPERATION_TIMEOUT_IN_SECONDS - ) # raises on failure / timeout. - - # Create a new instance instance and make sure it is the same. 
- instance_alt = Config.CLIENT.instance( - ALT_INSTANCE_ID, Config.INSTANCE_CONFIG.name - ) - instance_alt.reload() - - self.assertEqual(instance, instance_alt) - self.assertEqual(instance.display_name, instance_alt.display_name) - - @unittest.skipIf(USE_EMULATOR, "Skipping LCI tests") - @unittest.skipUnless(CREATE_INSTANCE, "Skipping instance creation") - def test_create_instance_with_processing_nodes(self): - ALT_INSTANCE_ID = "new" + unique_resource_id("-") - PROCESSING_UNITS = 5000 - instance = Config.CLIENT.instance( - instance_id=ALT_INSTANCE_ID, - configuration_name=Config.INSTANCE_CONFIG.name, - processing_units=PROCESSING_UNITS, - ) - operation = instance.create() - # Make sure this instance gets deleted after the test case. - self.instances_to_delete.append(instance) - - # We want to make sure the operation completes. - operation.result( - SPANNER_OPERATION_TIMEOUT_IN_SECONDS - ) # raises on failure / timeout. - - # Create a new instance instance and make sure it is the same. - instance_alt = Config.CLIENT.instance( - ALT_INSTANCE_ID, Config.INSTANCE_CONFIG.name - ) - instance_alt.reload() - - self.assertEqual(instance, instance_alt) - self.assertEqual(instance.display_name, instance_alt.display_name) - self.assertEqual(instance.processing_units, instance_alt.processing_units) - - @unittest.skipIf(USE_EMULATOR, "Skipping updating instance") - def test_update_instance(self): - OLD_DISPLAY_NAME = Config.INSTANCE.display_name - NEW_DISPLAY_NAME = "Foo Bar Baz" - Config.INSTANCE.display_name = NEW_DISPLAY_NAME - operation = Config.INSTANCE.update() - - # We want to make sure the operation completes. - operation.result( - SPANNER_OPERATION_TIMEOUT_IN_SECONDS - ) # raises on failure / timeout. - - # Create a new instance instance and reload it. - instance_alt = Config.CLIENT.instance(INSTANCE_ID, None) - self.assertNotEqual(instance_alt.display_name, NEW_DISPLAY_NAME) - instance_alt.reload() - self.assertEqual(instance_alt.display_name, NEW_DISPLAY_NAME) - - # Make sure to put the instance back the way it was for the - # other test cases. 
- Config.INSTANCE.display_name = OLD_DISPLAY_NAME - Config.INSTANCE.update() - - -class _TestData(object): - TABLE = "contacts" - COLUMNS = ("contact_id", "first_name", "last_name", "email") - ROW_DATA = ( - (1, u"Phred", u"Phlyntstone", u"phred@example.com"), - (2, u"Bharney", u"Rhubble", u"bharney@example.com"), - (3, u"Wylma", u"Phlyntstone", u"wylma@example.com"), - ) - ALL = KeySet(all_=True) - SQL = "SELECT * FROM contacts ORDER BY contact_id" - - _recurse_into_lists = True - - def _assert_timestamp(self, value, nano_value): - self.assertIsInstance(value, datetime.datetime) - self.assertIsNone(value.tzinfo) - self.assertIs(nano_value.tzinfo, UTC) - - self.assertEqual(value.year, nano_value.year) - self.assertEqual(value.month, nano_value.month) - self.assertEqual(value.day, nano_value.day) - self.assertEqual(value.hour, nano_value.hour) - self.assertEqual(value.minute, nano_value.minute) - self.assertEqual(value.second, nano_value.second) - self.assertEqual(value.microsecond, nano_value.microsecond) - if isinstance(value, DatetimeWithNanoseconds): - self.assertEqual(value.nanosecond, nano_value.nanosecond) - else: - self.assertEqual(value.microsecond * 1000, nano_value.nanosecond) - - def _check_rows_data(self, rows_data, expected=None): - if expected is None: - expected = self.ROW_DATA - - self.assertEqual(len(rows_data), len(expected)) - for row, expected in zip(rows_data, expected): - self._check_row_data(row, expected) - - def _check_row_data(self, row_data, expected): - self.assertEqual(len(row_data), len(expected)) - for found_cell, expected_cell in zip(row_data, expected): - self._check_cell_data(found_cell, expected_cell) - - def _check_cell_data(self, found_cell, expected_cell): - if isinstance(found_cell, DatetimeWithNanoseconds): - self._assert_timestamp(expected_cell, found_cell) - elif isinstance(found_cell, float) and math.isnan(found_cell): - self.assertTrue(math.isnan(expected_cell)) - elif isinstance(found_cell, list) and self._recurse_into_lists: - self.assertEqual(len(found_cell), len(expected_cell)) - for found_item, expected_item in zip(found_cell, expected_cell): - self._check_cell_data(found_item, expected_item) - else: - self.assertEqual(found_cell, expected_cell) - - -class TestDatabaseAPI(unittest.TestCase, _TestData): - DATABASE_NAME = "test_database" + unique_resource_id("_") - - @classmethod - def setUpClass(cls): - pool = BurstyPool(labels={"testcase": "database_api"}) - ddl_statements = EMULATOR_DDL_STATEMENTS if USE_EMULATOR else DDL_STATEMENTS - cls._db = Config.INSTANCE.database( - cls.DATABASE_NAME, ddl_statements=ddl_statements, pool=pool - ) - operation = cls._db.create() - operation.result( - SPANNER_OPERATION_TIMEOUT_IN_SECONDS - ) # raises on failure / timeout. 
- - # Create a multi-region instance - multi_region_config = "nam3" - config_name = "{}/instanceConfigs/{}".format( - Config.CLIENT.project_name, multi_region_config - ) - create_time = str(int(time.time())) - labels = {"python-spanner-systests": "true", "created": create_time} - cls._instance = Config.CLIENT.instance( - instance_id=MULTI_REGION_INSTANCE_ID, - configuration_name=config_name, - labels=labels, - ) - operation = cls._instance.create() - operation.result(SPANNER_OPERATION_TIMEOUT_IN_SECONDS) - - @classmethod - def tearDownClass(cls): - cls._db.drop() - cls._instance.delete() - - def setUp(self): - self.to_delete = [] - - def tearDown(self): - for doomed in self.to_delete: - doomed.drop() - - def test_list_databases(self): - # Since `Config.INSTANCE` is newly created in `setUpModule`, the - # database created in `setUpClass` here will be the only one. - database_names = [ - database.name for database in Config.INSTANCE.list_databases() - ] - self.assertTrue(self._db.name in database_names) - - def test_create_database(self): - pool = BurstyPool(labels={"testcase": "create_database"}) - temp_db_id = "temp_db" + unique_resource_id("_") - temp_db = Config.INSTANCE.database(temp_db_id, pool=pool) - operation = temp_db.create() - self.to_delete.append(temp_db) - - # We want to make sure the operation completes. - operation.result( - SPANNER_OPERATION_TIMEOUT_IN_SECONDS - ) # raises on failure / timeout. - - database_ids = [database.name for database in Config.INSTANCE.list_databases()] - self.assertIn(temp_db.name, database_ids) - - @unittest.skipIf( - USE_EMULATOR, "PITR-lite features are not supported by the emulator" - ) - def test_create_database_pitr_invalid_retention_period(self): - pool = BurstyPool(labels={"testcase": "create_database_pitr"}) - temp_db_id = "temp_db" + unique_resource_id("_") - retention_period = "0d" - ddl_statements = [ - "ALTER DATABASE {}" - " SET OPTIONS (version_retention_period = '{}')".format( - temp_db_id, retention_period - ) - ] - temp_db = Config.INSTANCE.database( - temp_db_id, pool=pool, ddl_statements=ddl_statements - ) - with self.assertRaises(exceptions.InvalidArgument): - temp_db.create() - - @unittest.skipIf( - USE_EMULATOR, "PITR-lite features are not supported by the emulator" - ) - def test_create_database_pitr_success(self): - pool = BurstyPool(labels={"testcase": "create_database_pitr"}) - temp_db_id = "temp_db" + unique_resource_id("_") - retention_period = "7d" - ddl_statements = [ - "ALTER DATABASE {}" - " SET OPTIONS (version_retention_period = '{}')".format( - temp_db_id, retention_period - ) - ] - temp_db = Config.INSTANCE.database( - temp_db_id, pool=pool, ddl_statements=ddl_statements - ) - operation = temp_db.create() - self.to_delete.append(temp_db) - - # We want to make sure the operation completes. - operation.result(30) # raises on failure / timeout. 
- - database_ids = [database.name for database in Config.INSTANCE.list_databases()] - self.assertIn(temp_db.name, database_ids) - - temp_db.reload() - self.assertEqual(temp_db.version_retention_period, retention_period) - - with temp_db.snapshot() as snapshot: - results = snapshot.execute_sql( - "SELECT OPTION_VALUE AS version_retention_period " - "FROM INFORMATION_SCHEMA.DATABASE_OPTIONS " - "WHERE SCHEMA_NAME = '' AND OPTION_NAME = 'version_retention_period'" - ) - for result in results: - self.assertEqual(result[0], retention_period) - - @unittest.skipIf( - USE_EMULATOR, "Default leader setting is not supported by the emulator" - ) - def test_create_database_with_default_leader_success(self): - pool = BurstyPool(labels={"testcase": "create_database_default_leader"}) - - temp_db_id = "temp_db" + unique_resource_id("_") - default_leader = "us-east4" - ddl_statements = [ - "ALTER DATABASE {}" - " SET OPTIONS (default_leader = '{}')".format(temp_db_id, default_leader) - ] - temp_db = self._instance.database( - temp_db_id, pool=pool, ddl_statements=ddl_statements - ) - operation = temp_db.create() - self.to_delete.append(temp_db) - - # We want to make sure the operation completes. - operation.result(30) # raises on failure / timeout. - - database_ids = [database.name for database in self._instance.list_databases()] - self.assertIn(temp_db.name, database_ids) - - temp_db.reload() - self.assertEqual(temp_db.default_leader, default_leader) - - with temp_db.snapshot() as snapshot: - results = snapshot.execute_sql( - "SELECT OPTION_VALUE AS default_leader " - "FROM INFORMATION_SCHEMA.DATABASE_OPTIONS " - "WHERE SCHEMA_NAME = '' AND OPTION_NAME = 'default_leader'" - ) - for result in results: - self.assertEqual(result[0], default_leader) - - def test_table_not_found(self): - temp_db_id = "temp_db" + unique_resource_id("_") - - correct_table = "MyTable" - incorrect_table = "NotMyTable" - self.assertNotEqual(correct_table, incorrect_table) - - create_table = ( - "CREATE TABLE {} (\n" - " Id STRING(36) NOT NULL,\n" - " Field1 STRING(36) NOT NULL\n" - ") PRIMARY KEY (Id)" - ).format(correct_table) - index = "CREATE INDEX IDX ON {} (Field1)".format(incorrect_table) - - temp_db = Config.INSTANCE.database( - temp_db_id, ddl_statements=[create_table, index] - ) - self.to_delete.append(temp_db) - with self.assertRaises(exceptions.NotFound): - temp_db.create() - - @unittest.skip( - ( - "update_dataset_ddl() has a flaky timeout" - "https://github.com/GoogleCloudPlatform/google-cloud-python/issues/" - "5629" - ) - ) - def test_update_database_ddl_with_operation_id(self): - pool = BurstyPool(labels={"testcase": "update_database_ddl"}) - temp_db_id = "temp_db" + unique_resource_id("_") - temp_db = Config.INSTANCE.database(temp_db_id, pool=pool) - create_op = temp_db.create() - self.to_delete.append(temp_db) - ddl_statements = EMULATOR_DDL_STATEMENTS if USE_EMULATOR else DDL_STATEMENTS - - # We want to make sure the operation completes. - create_op.result(240) # raises on failure / timeout. - # random but shortish always start with letter - operation_id = "a" + str(uuid.uuid4())[:8] - operation = temp_db.update_ddl(ddl_statements, operation_id=operation_id) - - self.assertEqual(operation_id, operation.operation.name.split("/")[-1]) - - # We want to make sure the operation completes. - operation.result(240) # raises on failure / timeout. 
- - temp_db.reload() - - self.assertEqual(len(temp_db.ddl_statements), len(ddl_statements)) - - @unittest.skipIf( - USE_EMULATOR, "PITR-lite features are not supported by the emulator" - ) - def test_update_database_ddl_pitr_invalid(self): - pool = BurstyPool(labels={"testcase": "update_database_ddl_pitr"}) - temp_db_id = "temp_db" + unique_resource_id("_") - retention_period = "0d" - temp_db = Config.INSTANCE.database(temp_db_id, pool=pool) - create_op = temp_db.create() - self.to_delete.append(temp_db) - - # We want to make sure the operation completes. - create_op.result(240) # raises on failure / timeout. - - self.assertIsNone(temp_db.version_retention_period) - - ddl_statements = DDL_STATEMENTS + [ - "ALTER DATABASE {}" - " SET OPTIONS (version_retention_period = '{}')".format( - temp_db_id, retention_period - ) - ] - with self.assertRaises(exceptions.InvalidArgument): - temp_db.update_ddl(ddl_statements) - - @unittest.skipIf( - USE_EMULATOR, "PITR-lite features are not supported by the emulator" - ) - def test_update_database_ddl_pitr_success(self): - pool = BurstyPool(labels={"testcase": "update_database_ddl_pitr"}) - temp_db_id = "temp_db" + unique_resource_id("_") - retention_period = "7d" - temp_db = Config.INSTANCE.database(temp_db_id, pool=pool) - create_op = temp_db.create() - self.to_delete.append(temp_db) - - # We want to make sure the operation completes. - create_op.result(240) # raises on failure / timeout. - - self.assertIsNone(temp_db.version_retention_period) - - ddl_statements = DDL_STATEMENTS + [ - "ALTER DATABASE {}" - " SET OPTIONS (version_retention_period = '{}')".format( - temp_db_id, retention_period - ) - ] - operation = temp_db.update_ddl(ddl_statements) - - # We want to make sure the operation completes. - operation.result(240) # raises on failure / timeout. - - temp_db.reload() - self.assertEqual(temp_db.version_retention_period, retention_period) - self.assertEqual(len(temp_db.ddl_statements), len(ddl_statements)) - - @unittest.skipIf( - USE_EMULATOR, "Default leader update is not supported by the emulator" - ) - def test_update_database_ddl_default_leader_success(self): - pool = BurstyPool(labels={"testcase": "update_database_ddl_default_leader"}) - - temp_db_id = "temp_db" + unique_resource_id("_") - default_leader = "us-east4" - temp_db = self._instance.database(temp_db_id, pool=pool) - create_op = temp_db.create() - self.to_delete.append(temp_db) - - # We want to make sure the operation completes. - create_op.result(240) # raises on failure / timeout. - - self.assertIsNone(temp_db.default_leader) - - ddl_statements = DDL_STATEMENTS + [ - "ALTER DATABASE {}" - " SET OPTIONS (default_leader = '{}')".format(temp_db_id, default_leader) - ] - operation = temp_db.update_ddl(ddl_statements) - - # We want to make sure the operation completes. - operation.result(240) # raises on failure / timeout. 
- - temp_db.reload() - self.assertEqual(temp_db.default_leader, default_leader) - self.assertEqual(len(temp_db.ddl_statements), len(ddl_statements)) - - def test_db_batch_insert_then_db_snapshot_read(self): - retry = RetryInstanceState(_has_all_ddl) - retry(self._db.reload)() - - with self._db.batch() as batch: - batch.delete(self.TABLE, self.ALL) - batch.insert(self.TABLE, self.COLUMNS, self.ROW_DATA) - - with self._db.snapshot(read_timestamp=batch.committed) as snapshot: - from_snap = list(snapshot.read(self.TABLE, self.COLUMNS, self.ALL)) - - self._check_rows_data(from_snap) - - def test_db_run_in_transaction_then_snapshot_execute_sql(self): - retry = RetryInstanceState(_has_all_ddl) - retry(self._db.reload)() - - with self._db.batch() as batch: - batch.delete(self.TABLE, self.ALL) - - def _unit_of_work(transaction, test): - rows = list(transaction.read(test.TABLE, test.COLUMNS, self.ALL)) - test.assertEqual(rows, []) - - transaction.insert_or_update(test.TABLE, test.COLUMNS, test.ROW_DATA) - - self._db.run_in_transaction(_unit_of_work, test=self) - - with self._db.snapshot() as after: - rows = list(after.execute_sql(self.SQL)) - self._check_rows_data(rows) - - def test_db_run_in_transaction_twice(self): - retry = RetryInstanceState(_has_all_ddl) - retry(self._db.reload)() - - with self._db.batch() as batch: - batch.delete(self.TABLE, self.ALL) - - def _unit_of_work(transaction, test): - transaction.insert_or_update(test.TABLE, test.COLUMNS, test.ROW_DATA) - - self._db.run_in_transaction(_unit_of_work, test=self) - self._db.run_in_transaction(_unit_of_work, test=self) - - with self._db.snapshot() as after: - rows = list(after.execute_sql(self.SQL)) - self._check_rows_data(rows) - - def test_db_run_in_transaction_twice_4181(self): - retry = RetryInstanceState(_has_all_ddl) - retry(self._db.reload)() - - with self._db.batch() as batch: - batch.delete(COUNTERS_TABLE, self.ALL) - - def _unit_of_work(transaction, name): - transaction.insert(COUNTERS_TABLE, COUNTERS_COLUMNS, [[name, 0]]) - - self._db.run_in_transaction(_unit_of_work, name="id_1") - - with self.assertRaises(exceptions.AlreadyExists): - self._db.run_in_transaction(_unit_of_work, name="id_1") - - self._db.run_in_transaction(_unit_of_work, name="id_2") - - with self._db.snapshot() as after: - rows = list(after.read(COUNTERS_TABLE, COUNTERS_COLUMNS, self.ALL)) - self.assertEqual(len(rows), 2) - - -class TestTableAPI(unittest.TestCase, _TestData): - DATABASE_NAME = "test_database" + unique_resource_id("_") - - @classmethod - def setUpClass(cls): - pool = BurstyPool(labels={"testcase": "database_api"}) - ddl_statements = EMULATOR_DDL_STATEMENTS if USE_EMULATOR else DDL_STATEMENTS - cls._db = Config.INSTANCE.database( - cls.DATABASE_NAME, ddl_statements=ddl_statements, pool=pool - ) - operation = cls._db.create() - operation.result(30) # raises on failure / timeout. 
- - @classmethod - def tearDownClass(cls): - cls._db.drop() - - def test_exists(self): - table = Table("all_types", self._db) - self.assertTrue(table.exists()) - - def test_exists_not_found(self): - table = Table("table_does_not_exist", self._db) - self.assertFalse(table.exists()) - - def test_list_tables(self): - tables = self._db.list_tables() - table_ids = set(table.table_id for table in tables) - self.assertIn("contacts", table_ids) - self.assertIn("contact_phones", table_ids) - self.assertIn("all_types", table_ids) - - def test_list_tables_reload(self): - tables = self._db.list_tables() - for table in tables: - self.assertTrue(table.exists()) - schema = table.schema - self.assertIsInstance(schema, list) - - def test_reload_not_found(self): - table = Table("table_does_not_exist", self._db) - with self.assertRaises(exceptions.NotFound): - table.reload() - - def test_schema(self): - table = Table("all_types", self._db) - schema = table.schema - names_and_types = set((field.name, field.type_.code) for field in schema) - self.assertIn(("pkey", TypeCode.INT64), names_and_types) - self.assertIn(("int_value", TypeCode.INT64), names_and_types) - self.assertIn(("int_array", TypeCode.ARRAY), names_and_types) - self.assertIn(("bool_value", TypeCode.BOOL), names_and_types) - self.assertIn(("bytes_value", TypeCode.BYTES), names_and_types) - self.assertIn(("date_value", TypeCode.DATE), names_and_types) - self.assertIn(("float_value", TypeCode.FLOAT64), names_and_types) - self.assertIn(("string_value", TypeCode.STRING), names_and_types) - self.assertIn(("timestamp_value", TypeCode.TIMESTAMP), names_and_types) - - -@unittest.skipIf(USE_EMULATOR, "Skipping backup tests") -@unittest.skipIf(SKIP_BACKUP_TESTS, "Skipping backup tests") -class TestBackupAPI(unittest.TestCase, _TestData): - DATABASE_NAME = "test_database" + unique_resource_id("_") - DATABASE_NAME_2 = "test_database2" + unique_resource_id("_") - - @classmethod - def setUpClass(cls): - from datetime import datetime - - pool = BurstyPool(labels={"testcase": "database_api"}) - ddl_statements = EMULATOR_DDL_STATEMENTS if USE_EMULATOR else DDL_STATEMENTS - db1 = Config.INSTANCE.database( - cls.DATABASE_NAME, ddl_statements=ddl_statements, pool=pool - ) - db2 = Config.INSTANCE.database(cls.DATABASE_NAME_2, pool=pool) - cls._db = db1 - cls._dbs = [db1, db2] - op1 = db1.create() - op2 = db2.create() - op1.result(SPANNER_OPERATION_TIMEOUT_IN_SECONDS) # raises on failure / timeout. - op2.result(SPANNER_OPERATION_TIMEOUT_IN_SECONDS) # raises on failure / timeout. 
- cls.database_version_time = datetime.utcnow().replace(tzinfo=UTC) - - current_config = Config.INSTANCE.configuration_name - same_config_instance_id = "same-config" + unique_resource_id("-") - create_time = str(int(time.time())) - labels = {"python-spanner-systests": "true", "created": create_time} - cls._same_config_instance = Config.CLIENT.instance( - same_config_instance_id, current_config, labels=labels - ) - op = cls._same_config_instance.create() - op.result(SPANNER_OPERATION_TIMEOUT_IN_SECONDS) - cls._instances = [cls._same_config_instance] - - retry = RetryErrors(exceptions.ServiceUnavailable) - configs = list(retry(Config.CLIENT.list_instance_configs)()) - diff_configs = [ - config.name - for config in configs - if "-us-" in config.name and config.name is not current_config - ] - cls._diff_config_instance = None - if len(diff_configs) > 0: - diff_config_instance_id = "diff-config" + unique_resource_id("-") - create_time = str(int(time.time())) - labels = {"python-spanner-systests": "true", "created": create_time} - cls._diff_config_instance = Config.CLIENT.instance( - diff_config_instance_id, diff_configs[0], labels=labels - ) - op = cls._diff_config_instance.create() - op.result(SPANNER_OPERATION_TIMEOUT_IN_SECONDS) - cls._instances.append(cls._diff_config_instance) - - @classmethod - def tearDownClass(cls): - for db in cls._dbs: - db.drop() - for instance in cls._instances: - instance.delete() - - def setUp(self): - self.to_delete = [] - self.to_drop = [] - - def tearDown(self): - for doomed in self.to_delete: - doomed.delete() - for doomed in self.to_drop: - doomed.drop() - - def test_create_invalid(self): - from datetime import datetime - from pytz import UTC - - backup_id = "backup_id" + unique_resource_id("_") - expire_time = datetime.utcnow() - expire_time = expire_time.replace(tzinfo=UTC) - - backup = Config.INSTANCE.backup( - backup_id, database=self._db, expire_time=expire_time - ) - - with self.assertRaises(exceptions.InvalidArgument): - op = backup.create() - op.result() - - def test_backup_workflow(self): - from google.cloud.spanner_admin_database_v1 import ( - CreateBackupEncryptionConfig, - EncryptionConfig, - EncryptionInfo, - RestoreDatabaseEncryptionConfig, - ) - from datetime import datetime - from datetime import timedelta - from pytz import UTC - - instance = Config.INSTANCE - backup_id = "backup_id" + unique_resource_id("_") - expire_time = datetime.utcnow() + timedelta(days=3) - expire_time = expire_time.replace(tzinfo=UTC) - encryption_config = CreateBackupEncryptionConfig( - encryption_type=CreateBackupEncryptionConfig.EncryptionType.GOOGLE_DEFAULT_ENCRYPTION, - ) - - # Create backup. - backup = instance.backup( - backup_id, - database=self._db, - expire_time=expire_time, - version_time=self.database_version_time, - encryption_config=encryption_config, - ) - operation = backup.create() - self.to_delete.append(backup) - - # Check metadata. - metadata = operation.metadata - self.assertEqual(backup.name, metadata.name) - self.assertEqual(self._db.name, metadata.database) - operation.result() - - # Check backup object. 
- backup.reload() - self.assertEqual(self._db.name, backup._database) - self.assertEqual(expire_time, backup.expire_time) - self.assertIsNotNone(backup.create_time) - self.assertEqual(self.database_version_time, backup.version_time) - self.assertIsNotNone(backup.size_bytes) - self.assertIsNotNone(backup.state) - self.assertEqual( - EncryptionInfo.Type.GOOGLE_DEFAULT_ENCRYPTION, - backup.encryption_info.encryption_type, - ) - - # Update with valid argument. - valid_expire_time = datetime.utcnow() + timedelta(days=7) - valid_expire_time = valid_expire_time.replace(tzinfo=UTC) - backup.update_expire_time(valid_expire_time) - self.assertEqual(valid_expire_time, backup.expire_time) - - # Restore database to same instance. - restored_id = "restored_db" + unique_resource_id("_") - encryption_config = RestoreDatabaseEncryptionConfig( - encryption_type=RestoreDatabaseEncryptionConfig.EncryptionType.GOOGLE_DEFAULT_ENCRYPTION, - ) - database = instance.database(restored_id, encryption_config=encryption_config) - self.to_drop.append(database) - operation = database.restore(source=backup) - restored_db = operation.result() - self.assertEqual( - self.database_version_time, - restored_db.restore_info.backup_info.version_time, - ) - - metadata = operation.metadata - self.assertEqual(self.database_version_time, metadata.backup_info.version_time) - database.reload() - expected_encryption_config = EncryptionConfig() - self.assertEqual(expected_encryption_config, database.encryption_config) - - database.drop() - backup.delete() - self.assertFalse(backup.exists()) - - def test_backup_version_time_defaults_to_create_time(self): - from datetime import datetime - from datetime import timedelta - from pytz import UTC - - instance = Config.INSTANCE - backup_id = "backup_id" + unique_resource_id("_") - expire_time = datetime.utcnow() + timedelta(days=3) - expire_time = expire_time.replace(tzinfo=UTC) - - # Create backup. - backup = instance.backup(backup_id, database=self._db, expire_time=expire_time,) - operation = backup.create() - self.to_delete.append(backup) - - # Check metadata. - metadata = operation.metadata - self.assertEqual(backup.name, metadata.name) - self.assertEqual(self._db.name, metadata.database) - operation.result() - - # Check backup object. 
- backup.reload() - self.assertEqual(self._db.name, backup._database) - self.assertIsNotNone(backup.create_time) - self.assertEqual(backup.create_time, backup.version_time) - - backup.delete() - self.assertFalse(backup.exists()) - - def test_create_backup_invalid_version_time_past(self): - from datetime import datetime - from datetime import timedelta - from pytz import UTC - - backup_id = "backup_id" + unique_resource_id("_") - expire_time = datetime.utcnow() + timedelta(days=3) - expire_time = expire_time.replace(tzinfo=UTC) - version_time = datetime.utcnow() - timedelta(days=10) - version_time = version_time.replace(tzinfo=UTC) - - backup = Config.INSTANCE.backup( - backup_id, - database=self._db, - expire_time=expire_time, - version_time=version_time, - ) - - with self.assertRaises(exceptions.InvalidArgument): - op = backup.create() - op.result() - - def test_create_backup_invalid_version_time_future(self): - from datetime import datetime - from datetime import timedelta - from pytz import UTC - - backup_id = "backup_id" + unique_resource_id("_") - expire_time = datetime.utcnow() + timedelta(days=3) - expire_time = expire_time.replace(tzinfo=UTC) - version_time = datetime.utcnow() + timedelta(days=2) - version_time = version_time.replace(tzinfo=UTC) - - backup = Config.INSTANCE.backup( - backup_id, - database=self._db, - expire_time=expire_time, - version_time=version_time, - ) - - with self.assertRaises(exceptions.InvalidArgument): - op = backup.create() - op.result() - - def test_restore_to_diff_instance(self): - from datetime import datetime - from datetime import timedelta - from pytz import UTC - - backup_id = "backup_id" + unique_resource_id("_") - expire_time = datetime.utcnow() + timedelta(days=3) - expire_time = expire_time.replace(tzinfo=UTC) - - # Create backup. - backup = Config.INSTANCE.backup( - backup_id, database=self._db, expire_time=expire_time - ) - op = backup.create() - self.to_delete.append(backup) - op.result() - - # Restore database to different instance with same config. - restored_id = "restored_db" + unique_resource_id("_") - database = self._same_config_instance.database(restored_id) - self.to_drop.append(database) - operation = database.restore(source=backup) - operation.result() - - database.drop() - backup.delete() - self.assertFalse(backup.exists()) - - def test_multi_create_cancel_update_error_restore_errors(self): - from datetime import datetime - from datetime import timedelta - from pytz import UTC - - backup_id_1 = "backup_id1" + unique_resource_id("_") - backup_id_2 = "backup_id2" + unique_resource_id("_") - - instance = Config.INSTANCE - expire_time = datetime.utcnow() + timedelta(days=3) - expire_time = expire_time.replace(tzinfo=UTC) - - backup1 = instance.backup( - backup_id_1, database=self._dbs[0], expire_time=expire_time - ) - backup2 = instance.backup( - backup_id_2, database=self._dbs[1], expire_time=expire_time - ) - - # Create two backups. - op1 = backup1.create() - op2 = backup2.create() - self.to_delete.extend([backup1, backup2]) - - backup1.reload() - self.assertFalse(backup1.is_ready()) - backup2.reload() - self.assertFalse(backup2.is_ready()) - - # Cancel a create operation. - op2.cancel() - self.assertTrue(op2.cancelled()) - - op1.result() - backup1.reload() - self.assertTrue(backup1.is_ready()) - - # Update expire time to invalid value. 
- invalid_expire_time = datetime.now() + timedelta(days=366) - invalid_expire_time = invalid_expire_time.replace(tzinfo=UTC) - with self.assertRaises(exceptions.InvalidArgument): - backup1.update_expire_time(invalid_expire_time) - - # Restore to existing database. - with self.assertRaises(exceptions.AlreadyExists): - self._db.restore(source=backup1) - - # Restore to instance with different config. - if self._diff_config_instance is not None: - return - new_db = self._diff_config_instance.database("diff_config") - op = new_db.create() - op.result(SPANNER_OPERATION_TIMEOUT_IN_SECONDS) - self.to_drop.append(new_db) - with self.assertRaises(exceptions.InvalidArgument): - new_db.restore(source=backup1) - - def test_list_backups(self): - from datetime import datetime - from datetime import timedelta - from pytz import UTC - - backup_id_1 = "backup_id1" + unique_resource_id("_") - backup_id_2 = "backup_id2" + unique_resource_id("_") - - instance = Config.INSTANCE - expire_time_1 = datetime.utcnow() + timedelta(days=21) - expire_time_1 = expire_time_1.replace(tzinfo=UTC) - - backup1 = Config.INSTANCE.backup( - backup_id_1, - database=self._dbs[0], - expire_time=expire_time_1, - version_time=self.database_version_time, - ) - - expire_time_2 = datetime.utcnow() + timedelta(days=1) - expire_time_2 = expire_time_2.replace(tzinfo=UTC) - backup2 = Config.INSTANCE.backup( - backup_id_2, database=self._dbs[1], expire_time=expire_time_2 - ) - - # Create two backups. - op1 = backup1.create() - op1.result() - backup1.reload() - create_time_compare = datetime.utcnow().replace(tzinfo=UTC) - - backup2.create() - self.to_delete.extend([backup1, backup2]) - - # List backups filtered by state. - filter_ = "state:CREATING" - for backup in instance.list_backups(filter_=filter_): - self.assertEqual(backup.name, backup2.name) - - # List backups filtered by backup name. - filter_ = "name:{0}".format(backup_id_1) - for backup in instance.list_backups(filter_=filter_): - self.assertEqual(backup.name, backup1.name) - - # List backups filtered by database name. - filter_ = "database:{0}".format(self._dbs[0].name) - for backup in instance.list_backups(filter_=filter_): - self.assertEqual(backup.name, backup1.name) - - # List backups filtered by create time. - filter_ = 'create_time > "{0}"'.format( - create_time_compare.strftime("%Y-%m-%dT%H:%M:%S.%fZ") - ) - for backup in instance.list_backups(filter_=filter_): - self.assertEqual(backup.name, backup2.name) - - # List backups filtered by version time. - filter_ = 'version_time > "{0}"'.format( - create_time_compare.strftime("%Y-%m-%dT%H:%M:%S.%fZ") - ) - for backup in instance.list_backups(filter_=filter_): - self.assertEqual(backup.name, backup2.name) - - # List backups filtered by expire time. - filter_ = 'expire_time > "{0}"'.format( - expire_time_1.strftime("%Y-%m-%dT%H:%M:%S.%fZ") - ) - for backup in instance.list_backups(filter_=filter_): - self.assertEqual(backup.name, backup1.name) - - # List backups filtered by size bytes. - filter_ = "size_bytes < {0}".format(backup1.size_bytes) - for backup in instance.list_backups(filter_=filter_): - self.assertEqual(backup.name, backup2.name) - - # List backups using pagination. 
- count = 0 - for page in instance.list_backups(page_size=1): - count += 1 - self.assertEqual(count, 2) - - -SOME_DATE = datetime.date(2011, 1, 17) -SOME_TIME = datetime.datetime(1989, 1, 17, 17, 59, 12, 345612) -NANO_TIME = DatetimeWithNanoseconds(1995, 8, 31, nanosecond=987654321) -POS_INF = float("+inf") -NEG_INF = float("-inf") -(OTHER_NAN,) = struct.unpack(" - self._check_sql_results( - self._db, - sql="SELECT @v", - params={"v": single_value}, - param_types={"v": Type(code=type_name)}, - expected=[(single_value,)], - order=False, - ) - - # Bind a null - self._check_sql_results( - self._db, - sql="SELECT @v", - params={"v": None}, - param_types={"v": Type(code=type_name)}, - expected=[(None,)], - order=False, - ) - - # Bind an array of - array_type = Type(code=TypeCode.ARRAY, array_element_type=Type(code=type_name)) - - if expected_array_value is None: - expected_array_value = array_value - - self._check_sql_results( - self._db, - sql="SELECT @v", - params={"v": array_value}, - param_types={"v": array_type}, - expected=[(expected_array_value,)], - order=False, - ) - - # Bind an empty array of - self._check_sql_results( - self._db, - sql="SELECT @v", - params={"v": []}, - param_types={"v": array_type}, - expected=[([],)], - order=False, - ) - - # Bind a null array of - self._check_sql_results( - self._db, - sql="SELECT @v", - params={"v": None}, - param_types={"v": array_type}, - expected=[(None,)], - order=False, - ) - - def test_execute_sql_w_string_bindings(self): - self._bind_test_helper(TypeCode.STRING, "Phred", ["Phred", "Bharney"]) - - def test_execute_sql_w_bool_bindings(self): - self._bind_test_helper(TypeCode.BOOL, True, [True, False, True]) - - def test_execute_sql_w_int64_bindings(self): - self._bind_test_helper(TypeCode.INT64, 42, [123, 456, 789]) - - def test_execute_sql_w_float64_bindings(self): - self._bind_test_helper(TypeCode.FLOAT64, 42.3, [12.3, 456.0, 7.89]) - - def test_execute_sql_w_float_bindings_transfinite(self): - - # Find -inf - self._check_sql_results( - self._db, - sql="SELECT @neg_inf", - params={"neg_inf": NEG_INF}, - param_types={"neg_inf": param_types.FLOAT64}, - expected=[(NEG_INF,)], - order=False, - ) - - # Find +inf - self._check_sql_results( - self._db, - sql="SELECT @pos_inf", - params={"pos_inf": POS_INF}, - param_types={"pos_inf": param_types.FLOAT64}, - expected=[(POS_INF,)], - order=False, - ) - - def test_execute_sql_w_bytes_bindings(self): - self._bind_test_helper(TypeCode.BYTES, b"DEADBEEF", [b"FACEDACE", b"DEADBEEF"]) - - def test_execute_sql_w_timestamp_bindings(self): - import pytz - from google.api_core.datetime_helpers import DatetimeWithNanoseconds - - timestamp_1 = DatetimeWithNanoseconds( - 1989, 1, 17, 17, 59, 12, nanosecond=345612789 - ) - - timestamp_2 = DatetimeWithNanoseconds( - 1989, 1, 17, 17, 59, 13, nanosecond=456127893 - ) - - timestamps = [timestamp_1, timestamp_2] - - # In round-trip, timestamps acquire a timezone value. 
- expected_timestamps = [ - timestamp.replace(tzinfo=pytz.UTC) for timestamp in timestamps - ] - - self._recurse_into_lists = False - self._bind_test_helper( - TypeCode.TIMESTAMP, timestamp_1, timestamps, expected_timestamps - ) - - def test_execute_sql_w_date_bindings(self): - import datetime - - dates = [SOME_DATE, SOME_DATE + datetime.timedelta(days=1)] - self._bind_test_helper(TypeCode.DATE, SOME_DATE, dates) - - @unittest.skipIf(USE_EMULATOR, "Skipping NUMERIC") - def test_execute_sql_w_numeric_bindings(self): - self._bind_test_helper(TypeCode.NUMERIC, NUMERIC_1, [NUMERIC_1, NUMERIC_2]) - - def test_execute_sql_w_query_param_struct(self): - name = "Phred" - count = 123 - size = 23.456 - height = 188.0 - weight = 97.6 - - record_type = param_types.Struct( - [ - param_types.StructField("name", param_types.STRING), - param_types.StructField("count", param_types.INT64), - param_types.StructField("size", param_types.FLOAT64), - param_types.StructField( - "nested", - param_types.Struct( - [ - param_types.StructField("height", param_types.FLOAT64), - param_types.StructField("weight", param_types.FLOAT64), - ] - ), - ), - ] - ) - - # Query with null struct, explicit type - self._check_sql_results( - self._db, - sql="SELECT @r.name, @r.count, @r.size, @r.nested.weight", - params={"r": None}, - param_types={"r": record_type}, - expected=[(None, None, None, None)], - order=False, - ) - - # Query with non-null struct, explicit type, NULL values - self._check_sql_results( - self._db, - sql="SELECT @r.name, @r.count, @r.size, @r.nested.weight", - params={"r": (None, None, None, None)}, - param_types={"r": record_type}, - expected=[(None, None, None, None)], - order=False, - ) - - # Query with non-null struct, explicit type, nested NULL values - self._check_sql_results( - self._db, - sql="SELECT @r.nested.weight", - params={"r": (None, None, None, (None, None))}, - param_types={"r": record_type}, - expected=[(None,)], - order=False, - ) - - # Query with non-null struct, explicit type - self._check_sql_results( - self._db, - sql="SELECT @r.name, @r.count, @r.size, @r.nested.weight", - params={"r": (name, count, size, (height, weight))}, - param_types={"r": record_type}, - expected=[(name, count, size, weight)], - order=False, - ) - - # Query with empty struct, explicitly empty type - empty_type = param_types.Struct([]) - self._check_sql_results( - self._db, - sql="SELECT @r IS NULL", - params={"r": ()}, - param_types={"r": empty_type}, - expected=[(False,)], - order=False, - ) - - # Query with null struct, explicitly empty type - self._check_sql_results( - self._db, - sql="SELECT @r IS NULL", - params={"r": None}, - param_types={"r": empty_type}, - expected=[(True,)], - order=False, - ) - - # Query with equality check for struct value - struct_equality_query = ( - "SELECT " '@struct_param=STRUCT(1,"bob")' - ) - struct_type = param_types.Struct( - [ - param_types.StructField("threadf", param_types.INT64), - param_types.StructField("userf", param_types.STRING), - ] - ) - self._check_sql_results( - self._db, - sql=struct_equality_query, - params={"struct_param": (1, "bob")}, - param_types={"struct_param": struct_type}, - expected=[(True,)], - order=False, - ) - - # Query with nullness test for struct - self._check_sql_results( - self._db, - sql="SELECT @struct_param IS NULL", - params={"struct_param": None}, - param_types={"struct_param": struct_type}, - expected=[(True,)], - order=False, - ) - - # Query with null array-of-struct - array_elem_type = param_types.Struct( - 
[param_types.StructField("threadid", param_types.INT64)] - ) - array_type = param_types.Array(array_elem_type) - self._check_sql_results( - self._db, - sql="SELECT a.threadid FROM UNNEST(@struct_arr_param) a", - params={"struct_arr_param": None}, - param_types={"struct_arr_param": array_type}, - expected=[], - order=False, - ) - - # Query with non-null array-of-struct - self._check_sql_results( - self._db, - sql="SELECT a.threadid FROM UNNEST(@struct_arr_param) a", - params={"struct_arr_param": [(123,), (456,)]}, - param_types={"struct_arr_param": array_type}, - expected=[(123,), (456,)], - order=False, - ) - - # Query with null array-of-struct field - struct_type_with_array_field = param_types.Struct( - [ - param_types.StructField("intf", param_types.INT64), - param_types.StructField("arraysf", array_type), - ] - ) - self._check_sql_results( - self._db, - sql="SELECT a.threadid FROM UNNEST(@struct_param.arraysf) a", - params={"struct_param": (123, None)}, - param_types={"struct_param": struct_type_with_array_field}, - expected=[], - order=False, - ) - - # Query with non-null array-of-struct field - self._check_sql_results( - self._db, - sql="SELECT a.threadid FROM UNNEST(@struct_param.arraysf) a", - params={"struct_param": (123, ((456,), (789,)))}, - param_types={"struct_param": struct_type_with_array_field}, - expected=[(456,), (789,)], - order=False, - ) - - # Query with anonymous / repeated-name fields - anon_repeated_array_elem_type = param_types.Struct( - [ - param_types.StructField("", param_types.INT64), - param_types.StructField("", param_types.STRING), - ] - ) - anon_repeated_array_type = param_types.Array(anon_repeated_array_elem_type) - self._check_sql_results( - self._db, - sql="SELECT CAST(t as STRUCT).* " - "FROM UNNEST(@struct_param) t", - params={"struct_param": [(123, "abcdef")]}, - param_types={"struct_param": anon_repeated_array_type}, - expected=[(123, "abcdef")], - order=False, - ) - - # Query and return a struct parameter - value_type = param_types.Struct( - [ - param_types.StructField("message", param_types.STRING), - param_types.StructField("repeat", param_types.INT64), - ] - ) - value_query = ( - "SELECT ARRAY(SELECT AS STRUCT message, repeat " - "FROM (SELECT @value.message AS message, " - "@value.repeat AS repeat)) AS value" - ) - self._check_sql_results( - self._db, - sql=value_query, - params={"value": ("hello", 1)}, - param_types={"value": value_type}, - expected=[([["hello", 1]],)], - order=False, - ) - - def test_execute_sql_returning_transfinite_floats(self): - - with self._db.snapshot(multi_use=True) as snapshot: - # Query returning -inf, +inf, NaN as column values - rows = list( - snapshot.execute_sql( - "SELECT " - 'CAST("-inf" AS FLOAT64), ' - 'CAST("+inf" AS FLOAT64), ' - 'CAST("NaN" AS FLOAT64)' - ) - ) - self.assertEqual(len(rows), 1) - self.assertEqual(rows[0][0], float("-inf")) - self.assertEqual(rows[0][1], float("+inf")) - # NaNs cannot be compared by equality. - self.assertTrue(math.isnan(rows[0][2])) - - # Query returning array of -inf, +inf, NaN as one column - rows = list( - snapshot.execute_sql( - "SELECT" - ' [CAST("-inf" AS FLOAT64),' - ' CAST("+inf" AS FLOAT64),' - ' CAST("NaN" AS FLOAT64)]' - ) - ) - self.assertEqual(len(rows), 1) - float_array = rows[0][0] - self.assertEqual(float_array[0], float("-inf")) - self.assertEqual(float_array[1], float("+inf")) - # NaNs cannot be searched for by equality. 
- self.assertTrue(math.isnan(float_array[2])) - - def test_partition_query(self): - row_count = 40 - sql = "SELECT * FROM {}".format(self.TABLE) - committed = self._set_up_table(row_count) - - # Paritioned query does not support ORDER BY - all_data_rows = set(self._row_data(row_count)) - union = set() - batch_txn = self._db.batch_snapshot(read_timestamp=committed) - for batch in batch_txn.generate_query_batches(sql): - p_results_iter = batch_txn.process(batch) - # Lists aren't hashable so the results need to be converted - rows = [tuple(result) for result in p_results_iter] - union.update(set(rows)) - - self.assertEqual(union, all_data_rows) - batch_txn.close() - - -class TestStreamingChunking(unittest.TestCase, _TestData): - @classmethod - def setUpClass(cls): - from tests.system.utils.streaming_utils import INSTANCE_NAME - from tests.system.utils.streaming_utils import DATABASE_NAME - - instance = Config.CLIENT.instance(INSTANCE_NAME) - if not instance.exists(): - raise unittest.SkipTest( - "Run 'tests/system/utils/populate_streaming.py' to enable." - ) - - database = instance.database(DATABASE_NAME) - if not instance.exists(): - raise unittest.SkipTest( - "Run 'tests/system/utils/populate_streaming.py' to enable." - ) - - cls._db = database - - def _verify_one_column(self, table_desc): - sql = "SELECT chunk_me FROM {}".format(table_desc.table) - with self._db.snapshot() as snapshot: - rows = list(snapshot.execute_sql(sql)) - self.assertEqual(len(rows), table_desc.row_count) - expected = table_desc.value() - for row in rows: - self.assertEqual(row[0], expected) - - def _verify_two_columns(self, table_desc): - sql = "SELECT chunk_me, chunk_me_2 FROM {}".format(table_desc.table) - with self._db.snapshot() as snapshot: - rows = list(snapshot.execute_sql(sql)) - self.assertEqual(len(rows), table_desc.row_count) - expected = table_desc.value() - for row in rows: - self.assertEqual(row[0], expected) - self.assertEqual(row[1], expected) - - def test_four_kay(self): - from tests.system.utils.streaming_utils import FOUR_KAY - - self._verify_one_column(FOUR_KAY) - - def test_forty_kay(self): - from tests.system.utils.streaming_utils import FORTY_KAY - - self._verify_one_column(FORTY_KAY) - - def test_four_hundred_kay(self): - from tests.system.utils.streaming_utils import FOUR_HUNDRED_KAY - - self._verify_one_column(FOUR_HUNDRED_KAY) - - def test_four_meg(self): - from tests.system.utils.streaming_utils import FOUR_MEG - - self._verify_two_columns(FOUR_MEG) - - -class CustomException(Exception): - """Placeholder for any user-defined exception.""" - - -class _DatabaseDropper(object): - """Helper for cleaning up databases created on-the-fly.""" - - def __init__(self, db): - self._db = db - - def delete(self): - self._db.drop() - - -class _ReadAbortTrigger(object): - """Helper for tests provoking abort-during-read.""" - - KEY1 = "key1" - KEY2 = "key2" - - def __init__(self): - self.provoker_started = threading.Event() - self.provoker_done = threading.Event() - self.handler_running = threading.Event() - self.handler_done = threading.Event() - - def _provoke_abort_unit_of_work(self, transaction): - keyset = KeySet(keys=[(self.KEY1,)]) - rows = list(transaction.read(COUNTERS_TABLE, COUNTERS_COLUMNS, keyset)) - - assert len(rows) == 1 - row = rows[0] - value = row[1] - - self.provoker_started.set() - - self.handler_running.wait() - - transaction.update(COUNTERS_TABLE, COUNTERS_COLUMNS, [[self.KEY1, value + 1]]) - - def provoke_abort(self, database): - 
database.run_in_transaction(self._provoke_abort_unit_of_work) - self.provoker_done.set() - - def _handle_abort_unit_of_work(self, transaction): - keyset_1 = KeySet(keys=[(self.KEY1,)]) - rows_1 = list(transaction.read(COUNTERS_TABLE, COUNTERS_COLUMNS, keyset_1)) - - assert len(rows_1) == 1 - row_1 = rows_1[0] - value_1 = row_1[1] - - self.handler_running.set() - - self.provoker_done.wait() - - keyset_2 = KeySet(keys=[(self.KEY2,)]) - rows_2 = list(transaction.read(COUNTERS_TABLE, COUNTERS_COLUMNS, keyset_2)) - - assert len(rows_2) == 1 - row_2 = rows_2[0] - value_2 = row_2[1] - - transaction.update( - COUNTERS_TABLE, COUNTERS_COLUMNS, [[self.KEY2, value_1 + value_2]] - ) - - def handle_abort(self, database): - database.run_in_transaction(self._handle_abort_unit_of_work) - self.handler_done.set() - - -class FauxCall(object): - def __init__(self, code, details="FauxCall"): - self._code = code - self._details = details - - def initial_metadata(self): - return {} - - def trailing_metadata(self): - return {} - - def code(self): - return self._code - - def details(self): - return self._details diff --git a/tests/system/test_system_dbapi.py b/tests/system/test_system_dbapi.py deleted file mode 100644 index 28636a561c..0000000000 --- a/tests/system/test_system_dbapi.py +++ /dev/null @@ -1,432 +0,0 @@ -# Copyright 2016 Google LLC All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import hashlib -import os -import pickle -import time -import unittest - -from google.api_core import exceptions - -from google.cloud.spanner_v1 import BurstyPool -from google.cloud.spanner_v1 import Client -from google.cloud.spanner_v1.instance import Backup -from google.cloud.spanner_v1.instance import Instance - -from google.cloud.spanner_dbapi.connection import Connection - -from test_utils.retry import RetryErrors - -from .test_system import ( - CREATE_INSTANCE, - EXISTING_INSTANCES, - INSTANCE_ID, - USE_EMULATOR, - _list_instances, - Config, -) - - -SPANNER_OPERATION_TIMEOUT_IN_SECONDS = int( - os.getenv("SPANNER_OPERATION_TIMEOUT_IN_SECONDS", 60) -) - - -def setUpModule(): - if USE_EMULATOR: - from google.auth.credentials import AnonymousCredentials - - emulator_project = os.getenv("GCLOUD_PROJECT", "emulator-test-project") - Config.CLIENT = Client( - project=emulator_project, credentials=AnonymousCredentials() - ) - else: - Config.CLIENT = Client() - retry = RetryErrors(exceptions.ServiceUnavailable) - - configs = list(retry(Config.CLIENT.list_instance_configs)()) - - instances = retry(_list_instances)() - EXISTING_INSTANCES[:] = instances - - # Delete test instances that are older than an hour. - cutoff = int(time.time()) - 1 * 60 * 60 - for instance_pb in Config.CLIENT.list_instances( - "labels.python-spanner-dbapi-systests:true" - ): - instance = Instance.from_pb(instance_pb, Config.CLIENT) - if "created" not in instance.labels: - continue - create_time = int(instance.labels["created"]) - if create_time > cutoff: - continue - # Instance cannot be deleted while backups exist. 
- for backup_pb in instance.list_backups(): - backup = Backup.from_pb(backup_pb, instance) - backup.delete() - instance.delete() - - if CREATE_INSTANCE: - if not USE_EMULATOR: - # Defend against back-end returning configs for regions we aren't - # actually allowed to use. - configs = [config for config in configs if "-us-" in config.name] - - if not configs: - raise ValueError("List instance configs failed in module set up.") - - Config.INSTANCE_CONFIG = configs[0] - config_name = configs[0].name - create_time = str(int(time.time())) - labels = {"python-spanner-dbapi-systests": "true", "created": create_time} - - Config.INSTANCE = Config.CLIENT.instance( - INSTANCE_ID, config_name, labels=labels - ) - created_op = Config.INSTANCE.create() - created_op.result( - SPANNER_OPERATION_TIMEOUT_IN_SECONDS - ) # block until completion - - else: - Config.INSTANCE = Config.CLIENT.instance(INSTANCE_ID) - Config.INSTANCE.reload() - - -def tearDownModule(): - if CREATE_INSTANCE: - Config.INSTANCE.delete() - - -class TestTransactionsManagement(unittest.TestCase): - """Transactions management support tests.""" - - DATABASE_NAME = "db-api-transactions-management" - - DDL_STATEMENTS = ( - """CREATE TABLE contacts ( - contact_id INT64, - first_name STRING(1024), - last_name STRING(1024), - email STRING(1024) - ) - PRIMARY KEY (contact_id)""", - ) - - @classmethod - def setUpClass(cls): - """Create a test database.""" - cls._db = Config.INSTANCE.database( - cls.DATABASE_NAME, - ddl_statements=cls.DDL_STATEMENTS, - pool=BurstyPool(labels={"testcase": "database_api"}), - ) - cls._db.create().result( - SPANNER_OPERATION_TIMEOUT_IN_SECONDS - ) # raises on failure / timeout. - - @classmethod - def tearDownClass(cls): - """Delete the test database.""" - cls._db.drop() - - def tearDown(self): - """Clear the test table after every test.""" - self._db.run_in_transaction(clear_table) - - def test_commit(self): - """Test committing a transaction with several statements.""" - want_row = ( - 1, - "updated-first-name", - "last-name", - "test.email_updated@domen.ru", - ) - # connect to the test database - conn = Connection(Config.INSTANCE, self._db) - cursor = conn.cursor() - - # execute several DML statements within one transaction - cursor.execute( - """ -INSERT INTO contacts (contact_id, first_name, last_name, email) -VALUES (1, 'first-name', 'last-name', 'test.email@domen.ru') - """ - ) - cursor.execute( - """ -UPDATE contacts -SET first_name = 'updated-first-name' -WHERE first_name = 'first-name' -""" - ) - cursor.execute( - """ -UPDATE contacts -SET email = 'test.email_updated@domen.ru' -WHERE email = 'test.email@domen.ru' -""" - ) - conn.commit() - - # read the resulting data from the database - cursor.execute("SELECT * FROM contacts") - got_rows = cursor.fetchall() - conn.commit() - - self.assertEqual(got_rows, [want_row]) - - cursor.close() - conn.close() - - def test_rollback(self): - """Test rollbacking a transaction with several statements.""" - want_row = (2, "first-name", "last-name", "test.email@domen.ru") - # connect to the test database - conn = Connection(Config.INSTANCE, self._db) - cursor = conn.cursor() - - cursor.execute( - """ -INSERT INTO contacts (contact_id, first_name, last_name, email) -VALUES (2, 'first-name', 'last-name', 'test.email@domen.ru') - """ - ) - conn.commit() - - # execute several DMLs with one transaction - cursor.execute( - """ -UPDATE contacts -SET first_name = 'updated-first-name' -WHERE first_name = 'first-name' -""" - ) - cursor.execute( - """ -UPDATE contacts -SET email = 
'test.email_updated@domen.ru' -WHERE email = 'test.email@domen.ru' -""" - ) - conn.rollback() - - # read the resulting data from the database - cursor.execute("SELECT * FROM contacts") - got_rows = cursor.fetchall() - conn.commit() - - self.assertEqual(got_rows, [want_row]) - - cursor.close() - conn.close() - - def test_autocommit_mode_change(self): - """Test auto committing a transaction on `autocommit` mode change.""" - want_row = ( - 2, - "updated-first-name", - "last-name", - "test.email@domen.ru", - ) - # connect to the test database - conn = Connection(Config.INSTANCE, self._db) - cursor = conn.cursor() - - cursor.execute( - """ -INSERT INTO contacts (contact_id, first_name, last_name, email) -VALUES (2, 'first-name', 'last-name', 'test.email@domen.ru') - """ - ) - cursor.execute( - """ -UPDATE contacts -SET first_name = 'updated-first-name' -WHERE first_name = 'first-name' -""" - ) - conn.autocommit = True - - # read the resulting data from the database - cursor.execute("SELECT * FROM contacts") - got_rows = cursor.fetchall() - - self.assertEqual(got_rows, [want_row]) - - cursor.close() - conn.close() - - def test_rollback_on_connection_closing(self): - """ - When closing a connection all the pending transactions - must be rollbacked. Testing if it's working this way. - """ - want_row = (1, "first-name", "last-name", "test.email@domen.ru") - # connect to the test database - conn = Connection(Config.INSTANCE, self._db) - cursor = conn.cursor() - - cursor.execute( - """ -INSERT INTO contacts (contact_id, first_name, last_name, email) -VALUES (1, 'first-name', 'last-name', 'test.email@domen.ru') - """ - ) - conn.commit() - - cursor.execute( - """ -UPDATE contacts -SET first_name = 'updated-first-name' -WHERE first_name = 'first-name' -""" - ) - conn.close() - - # connect again, as the previous connection is no-op after closing - conn = Connection(Config.INSTANCE, self._db) - cursor = conn.cursor() - - # read the resulting data from the database - cursor.execute("SELECT * FROM contacts") - got_rows = cursor.fetchall() - conn.commit() - - self.assertEqual(got_rows, [want_row]) - - cursor.close() - conn.close() - - def test_results_checksum(self): - """Test that results checksum is calculated properly.""" - conn = Connection(Config.INSTANCE, self._db) - cursor = conn.cursor() - - cursor.execute( - """ -INSERT INTO contacts (contact_id, first_name, last_name, email) -VALUES - (1, 'first-name', 'last-name', 'test.email@domen.ru'), - (2, 'first-name2', 'last-name2', 'test.email2@domen.ru') - """ - ) - self.assertEqual(len(conn._statements), 1) - conn.commit() - - cursor.execute("SELECT * FROM contacts") - got_rows = cursor.fetchall() - - self.assertEqual(len(conn._statements), 1) - conn.commit() - - checksum = hashlib.sha256() - checksum.update(pickle.dumps(got_rows[0])) - checksum.update(pickle.dumps(got_rows[1])) - - self.assertEqual(cursor._checksum.checksum.digest(), checksum.digest()) - - def test_execute_many(self): - # connect to the test database - conn = Connection(Config.INSTANCE, self._db) - cursor = conn.cursor() - - cursor.executemany( - """ -INSERT INTO contacts (contact_id, first_name, last_name, email) -VALUES (%s, %s, %s, %s) - """, - [ - (1, "first-name", "last-name", "test.email@example.com"), - (2, "first-name2", "last-name2", "test.email2@example.com"), - ], - ) - conn.commit() - - cursor.executemany( - """SELECT * FROM contacts WHERE contact_id = @a1""", ({"a1": 1}, {"a1": 2}), - ) - res = cursor.fetchall() - conn.commit() - - self.assertEqual(len(res), 2) - 
self.assertEqual(res[0][0], 1) - self.assertEqual(res[1][0], 2) - - # checking that execute() and executemany() - # results are not mixed together - cursor.execute( - """ -SELECT * FROM contacts WHERE contact_id = 1 -""", - ) - res = cursor.fetchone() - conn.commit() - - self.assertEqual(res[0], 1) - conn.close() - - def test_DDL_autocommit(self): - """Check that DDLs in autocommit mode are immediately executed.""" - conn = Connection(Config.INSTANCE, self._db) - conn.autocommit = True - - cur = conn.cursor() - cur.execute( - """ - CREATE TABLE Singers ( - SingerId INT64 NOT NULL, - Name STRING(1024), - ) PRIMARY KEY (SingerId) - """ - ) - conn.close() - - # if previous DDL wasn't committed, the next DROP TABLE - # statement will fail with a ProgrammingError - conn = Connection(Config.INSTANCE, self._db) - cur = conn.cursor() - - cur.execute("DROP TABLE Singers") - conn.commit() - - def test_DDL_commit(self): - """Check that DDLs in commit mode are executed on calling `commit()`.""" - conn = Connection(Config.INSTANCE, self._db) - cur = conn.cursor() - - cur.execute( - """ - CREATE TABLE Singers ( - SingerId INT64 NOT NULL, - Name STRING(1024), - ) PRIMARY KEY (SingerId) - """ - ) - conn.commit() - conn.close() - - # if previous DDL wasn't committed, the next DROP TABLE - # statement will fail with a ProgrammingError - conn = Connection(Config.INSTANCE, self._db) - cur = conn.cursor() - - cur.execute("DROP TABLE Singers") - conn.commit() - - -def clear_table(transaction): - """Clear the test table.""" - transaction.execute_update("DELETE FROM contacts WHERE true") diff --git a/tests/system/test_table_api.py b/tests/system/test_table_api.py new file mode 100644 index 0000000000..73de78d7df --- /dev/null +++ b/tests/system/test_table_api.py @@ -0,0 +1,69 @@ +# Copyright 2021 Google LLC All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+import pytest
+
+from google.api_core import exceptions
+from google.cloud import spanner_v1
+
+
+def test_table_exists(shared_database):
+    table = shared_database.table("all_types")
+    assert table.exists()
+
+
+def test_table_exists_not_found(shared_database):
+    table = shared_database.table("table_does_not_exist")
+    assert not table.exists()
+
+
+def test_db_list_tables(shared_database):
+    tables = shared_database.list_tables()
+    table_ids = set(table.table_id for table in tables)
+    assert "contacts" in table_ids
+    assert "contact_phones" in table_ids
+    assert "all_types" in table_ids
+
+
+def test_db_list_tables_reload(shared_database):
+    for table in shared_database.list_tables():
+        assert table.exists()
+        schema = table.schema
+        assert isinstance(schema, list)
+
+
+def test_table_reload_miss(shared_database):
+    table = shared_database.table("table_does_not_exist")
+    with pytest.raises(exceptions.NotFound):
+        table.reload()
+
+
+def test_table_schema(shared_database):
+    table = shared_database.table("all_types")
+    schema = table.schema
+    expected = [
+        ("pkey", spanner_v1.TypeCode.INT64),
+        ("int_value", spanner_v1.TypeCode.INT64),
+        ("int_array", spanner_v1.TypeCode.ARRAY),
+        ("bool_value", spanner_v1.TypeCode.BOOL),
+        ("bytes_value", spanner_v1.TypeCode.BYTES),
+        ("date_value", spanner_v1.TypeCode.DATE),
+        ("float_value", spanner_v1.TypeCode.FLOAT64),
+        ("string_value", spanner_v1.TypeCode.STRING),
+        ("timestamp_value", spanner_v1.TypeCode.TIMESTAMP),
+    ]
+    found = {field.name: field.type_.code for field in schema}
+
+    for field_name, type_code in expected:
+        assert found[field_name] == type_code
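
The table-API tests above all receive a `shared_database` pytest fixture, which is supplied by the new tests/system/conftest.py created elsewhere in this patch. Because that file's body is not reproduced in this hunk, the following is only a rough sketch of what such a session-scoped fixture could look like; the `shared_instance` fixture name, the database-ID scheme, and the use of `_helpers.DDL_STATEMENTS` / `_helpers.SPANNER_OPERATION_TIMEOUT_IN_SECONDS` are illustrative assumptions, not the patch's actual conftest content.

# Illustrative sketch only -- not the conftest.py added by this patch.
import uuid

import pytest

from google.cloud.spanner_v1 import BurstyPool

from . import _helpers  # assumed relative import within tests/system


@pytest.fixture(scope="session")
def shared_database(shared_instance):
    # One database per test session; the table-API tests only inspect schema.
    database_name = "test-db-" + uuid.uuid4().hex[:10]
    pool = BurstyPool(labels={"testcase": "table_api"})
    database = shared_instance.database(
        database_name, ddl_statements=_helpers.DDL_STATEMENTS, pool=pool
    )
    operation = database.create()
    operation.result(_helpers.SPANNER_OPERATION_TIMEOUT_IN_SECONDS)

    yield database

    database.drop()

A fixture along these lines hands every test that requests `shared_database` the same pre-created database, which is why test_table_api.py can assume that tables such as `contacts`, `contact_phones`, and `all_types` already exist.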