diff --git a/cratedb_toolkit/io/mongodb/adapter.py b/cratedb_toolkit/io/mongodb/adapter.py index acb8fc45..e7a40699 100644 --- a/cratedb_toolkit/io/mongodb/adapter.py +++ b/cratedb_toolkit/io/mongodb/adapter.py @@ -12,14 +12,17 @@ import bson import pymongo import pymongo.collection +import pymongo.database import yarl from attrs import define, field from boltons.urlutils import URL from bson.raw_bson import RawBSONDocument from undatum.common.iterable import IterableData +from cratedb_toolkit.io.mongodb.model import DocumentDict from cratedb_toolkit.io.mongodb.util import batches from cratedb_toolkit.model import DatabaseAddress +from cratedb_toolkit.util.data import asbool from cratedb_toolkit.util.io import read_json logger = logging.getLogger(__name__) @@ -32,7 +35,11 @@ class MongoDBAdapterBase: database_name: str collection_name: str - _custom_query_parameters = ["batch-size", "filter", "limit", "offset"] + _custom_query_parameters = ["batch-size", "direct", "filter", "limit", "offset", "timeout"] + _default_timeout = 5000 + + direct: bool = False + timeout: int = _default_timeout @classmethod def from_url(cls, url: t.Union[str, boltons.urlutils.URL, yarl.URL]): @@ -42,6 +49,8 @@ def from_url(cls, url: t.Union[str, boltons.urlutils.URL, yarl.URL]): mongodb_uri, mongodb_collection_address = mongodb_address.decode() mongodb_database = mongodb_collection_address.schema mongodb_collection = mongodb_collection_address.table + direct = asbool(mongodb_uri.query_params.pop("direct", False)) + timeout = mongodb_uri.query_params.pop("timeout", cls._default_timeout) for custom_query_parameter in cls._custom_query_parameters: mongodb_uri.query_params.pop(custom_query_parameter, None) return cls( @@ -49,6 +58,8 @@ def from_url(cls, url: t.Union[str, boltons.urlutils.URL, yarl.URL]): effective_url=mongodb_uri, database_name=mongodb_database, collection_name=mongodb_collection, + direct=direct, + timeout=timeout, ) def __attrs_post_init__(self): @@ -75,7 +86,7 @@ def setup(self): raise NotImplementedError() @abstractmethod - def get_collections(self) -> t.List[str]: + def get_collection_names(self) -> t.List[str]: raise NotImplementedError() @abstractmethod @@ -87,7 +98,7 @@ def query(self): raise NotImplementedError() @abstractmethod - def subscribe(self): + def subscribe_cdc(self, resume_after: t.Optional[DocumentDict] = None): raise NotImplementedError() @@ -98,7 +109,7 @@ class MongoDBFilesystemAdapter(MongoDBAdapterBase): def setup(self): self._path = Path(self.address.uri.path) - def get_collections(self) -> t.List[str]: + def get_collection_names(self) -> t.List[str]: return sorted(glob.glob(str(self._path))) def record_count(self, filter_=None) -> int: @@ -126,7 +137,7 @@ def query(self): raise ValueError(f"Unsupported file type: {self._path.suffix}") return batches(data, self.batch_size) - def subscribe(self): + def subscribe_cdc(self, resume_after: t.Optional[DocumentDict] = None): raise NotImplementedError("Subscribing to a change stream is not supported by filesystem adapter") @@ -139,7 +150,7 @@ def setup(self): if "+bson" in self._url.scheme: self._url.scheme = self._url.scheme.replace("+bson", "") - def get_collections(self) -> t.List[str]: + def get_collection_names(self) -> t.List[str]: raise NotImplementedError("HTTP+BSON loader does not support directory inquiry yet") def record_count(self, filter_=None) -> int: @@ -160,13 +171,14 @@ def query(self): raise ValueError(f"Unsupported file type: {self._url}") return batches(data, self.batch_size) - def subscribe(self): + def subscribe_cdc(self, resume_after: t.Optional[DocumentDict] = None): raise NotImplementedError("HTTP+BSON loader does not support subscribing to a change stream") @define class MongoDBServerAdapter(MongoDBAdapterBase): _mongodb_client: pymongo.MongoClient = field(init=False) + _mongodb_database: pymongo.database.Database = field(init=False) _mongodb_collection: pymongo.collection.Collection = field(init=False) def setup(self): @@ -174,11 +186,21 @@ def setup(self): str(self.effective_url), document_class=RawBSONDocument, datetime_conversion="DATETIME_AUTO", + directConnection=self.direct, + socketTimeoutMS=self.timeout, + connectTimeoutMS=self.timeout, + serverSelectionTimeoutMS=self.timeout, ) + if self.database_name: + self._mongodb_database = self._mongodb_client.get_database(self.database_name) if self.collection_name: - self._mongodb_collection = self._mongodb_client[self.database_name][self.collection_name] + self._mongodb_collection = self._mongodb_database.get_collection(self.collection_name) + + @property + def collection(self): + return self._mongodb_collection - def get_collections(self) -> t.List[str]: + def get_collection_names(self) -> t.List[str]: database = self._mongodb_client.get_database(self.database_name) return sorted(database.list_collection_names()) @@ -203,8 +225,15 @@ def query(self): ) return batches(data, self.batch_size) - def subscribe(self): - return self._mongodb_collection.watch(full_document="updateLookup") + def subscribe_cdc(self, resume_after: t.Optional[DocumentDict] = None): + return self._mongodb_collection.watch( + full_document="updateLookup", batch_size=self.batch_size, resume_after=resume_after + ) + + def create_collection(self): + self._mongodb_database.create_collection(self.collection_name) + self._mongodb_collection = self._mongodb_database.get_collection(self.collection_name) + return self._mongodb_collection def mongodb_adapter_factory(mongodb_uri: URL) -> MongoDBAdapterBase: diff --git a/cratedb_toolkit/io/mongodb/api.py b/cratedb_toolkit/io/mongodb/api.py index dd68abfa..32815277 100644 --- a/cratedb_toolkit/io/mongodb/api.py +++ b/cratedb_toolkit/io/mongodb/api.py @@ -145,7 +145,7 @@ def mongodb_copy( address_pair_root = AddressPair(source_url=source_url, target_url=target_url) mongodb_adapter = mongodb_adapter_factory(address_pair_root.source_url) - collections = mongodb_adapter.get_collections() + collections = mongodb_adapter.get_collection_names() logger.info(f"Discovered collections: {len(collections)}") logger.debug(f"Processing collections: {collections}") diff --git a/cratedb_toolkit/io/mongodb/cdc.py b/cratedb_toolkit/io/mongodb/cdc.py index 9b802c75..92b7ecc4 100644 --- a/cratedb_toolkit/io/mongodb/cdc.py +++ b/cratedb_toolkit/io/mongodb/cdc.py @@ -1,24 +1,29 @@ """ -Basic relaying of a MongoDB Change Stream into CrateDB table. +Relay a MongoDB Change Stream into a CrateDB table. Documentation: -- https://github.com/daq-tools/commons-codec/blob/main/doc/mongodb.md - https://www.mongodb.com/docs/manual/changeStreams/ - https://www.mongodb.com/developer/languages/python/python-change-streams/ +- https://github.com/daq-tools/commons-codec/blob/main/doc/mongodb.md """ import logging import typing as t +import pymongo +import pymongo.errors import sqlalchemy as sa from boltons.urlutils import URL from commons_codec.transform.mongodb import MongoDBCDCTranslator, MongoDBCrateDBConverter +from pymongo.change_stream import CollectionChangeStream from zyp.model.collection import CollectionAddress from cratedb_toolkit.io.mongodb.adapter import mongodb_adapter_factory +from cratedb_toolkit.io.mongodb.model import DocumentDict from cratedb_toolkit.io.mongodb.transform import TransformationManager from cratedb_toolkit.model import DatabaseAddress from cratedb_toolkit.util import DatabaseAdapter +from cratedb_toolkit.util.process import FixedBackoff logger = logging.getLogger(__name__) @@ -39,6 +44,8 @@ def __init__( self.mongodb_uri = URL(mongodb_url) self.cratedb_uri = URL(cratedb_url) + logger.info(f"Initializing MongoDB CDC Relay. mongodb={mongodb_url}, cratedb={cratedb_url}") + # Decode database URL: MongoDB. self.mongodb_adapter = mongodb_adapter_factory(self.mongodb_uri) @@ -68,9 +75,11 @@ def __init__( ) self.cdc = MongoDBCDCTranslator(table_name=self.cratedb_table, converter=self.converter) + self.ccs: CollectionChangeStream self.on_error = on_error self.debug = debug + self.stopping: bool = False def start(self): """ @@ -79,18 +88,43 @@ def start(self): # FIXME: Note that the function does not perform any sensible error handling yet. with self.cratedb_adapter.engine.connect() as connection: connection.execute(sa.text(self.cdc.sql_ddl)) - for operation in self.cdc_to_sql(): + for event in self.consume(): + operation = self.cdc.to_sql(event) if operation: connection.execute(sa.text(operation.statement), operation.parameters) - def cdc_to_sql(self): + def stop(self): + self.stopping = True + self.ccs._closed = True + + def consume(self, resume_after: t.Optional[DocumentDict] = None): """ - Subscribe to change stream events, and emit corresponding SQL statements. + Subscribe to change stream events, and emit change events. """ - # Note that `.subscribe()` (calling `.watch()`) will block until events are ready - # for consumption, so this is not a busy loop. - # FIXME: Note that the function does not perform any sensible error handling yet. - while True: - with self.mongodb_adapter.subscribe() as change_stream: - for change in change_stream: - yield self.cdc.to_sql(change) + self.ccs = self.mongodb_adapter.subscribe_cdc(resume_after=resume_after) + + if self.stopping: + return + + backoff = FixedBackoff(sequence=[2, 4, 6, 8, 10]) + resume_token = None + try: + with self.ccs as stream: + for event in stream: + yield event + resume_token = stream.resume_token + backoff.reset() + except pymongo.errors.PyMongoError: + # The ChangeStream encountered an unrecoverable error or the + # resume attempt failed to recreate the cursor. + if resume_token is None: + # There is no usable resume token because there was a + # failure during ChangeStream initialization. + logger.exception("Initializing change stream failed") + else: + # Use the interrupted ChangeStream's resume token to create + # a new ChangeStream. The new stream will continue from the + # last seen insert change without missing any events. + backoff.next() + logger.info("Resuming change stream") + self.consume(resume_after=resume_after) diff --git a/cratedb_toolkit/testing/testcontainers/mongodb.py b/cratedb_toolkit/testing/testcontainers/mongodb.py index 5250980a..b8627dce 100644 --- a/cratedb_toolkit/testing/testcontainers/mongodb.py +++ b/cratedb_toolkit/testing/testcontainers/mongodb.py @@ -11,7 +11,12 @@ # License for the specific language governing permissions and limitations # under the License. import os +import time +import typing as t +import pymongo.errors +from pymongo import MongoClient +from testcontainers.core.exceptions import ContainerStartException from testcontainers.mongodb import MongoDbContainer from cratedb_toolkit.testing.testcontainers.util import KeepaliveContainer @@ -30,7 +35,10 @@ class MongoDbContainerWithKeepalive(KeepaliveContainer, MongoDbContainer): useful when used within a test matrix. Its default value is `latest`. """ + NAME = "testcontainers-mongodb-vanilla" MONGODB_VERSION = os.environ.get("MONGODB_VERSION", "latest") + TIMEOUT = 5000 + DIRECT_CONNECTION = False def __init__( self, @@ -38,4 +46,96 @@ def __init__( **kwargs, ) -> None: super().__init__(image=image, **kwargs) - self.with_name("testcontainers-mongodb") + self.with_name(self.NAME) + + def get_connection_client(self) -> MongoClient: + return MongoClient( + self.get_connection_url(), + directConnection=self.DIRECT_CONNECTION, + socketTimeoutMS=self.TIMEOUT, + connectTimeoutMS=self.TIMEOUT, + serverSelectionTimeoutMS=self.TIMEOUT, + ) + + +class MongoDbReplicasetContainer(MongoDbContainerWithKeepalive): + """ + A Testcontainer for MongoDB with transparent replica set configuration. + + Overwritten to nullify MONGO_INITDB_ROOT_USERNAME + _PASSWORD, + and username + password, because replicaset + authentication + is more complicated to configure. + """ + + NAME = "testcontainers-mongodb-replicaset" + DIRECT_CONNECTION = True + + def _configure(self) -> None: + self.with_command("mongod --replSet testcontainers-rs") + self.with_env("MONGO_DB", self.dbname) + + def _create_connection_url( + self, + dialect: str, + host: t.Optional[str] = None, + port: t.Optional[int] = None, + dbname: t.Optional[str] = None, + **kwargs, + ) -> str: + from testcontainers.core.utils import raise_for_deprecated_parameter + + if raise_for_deprecated_parameter(kwargs, "db_name", "dbname"): + raise ValueError(f"Unexpected arguments: {','.join(kwargs)}") + if self._container is None: + raise ContainerStartException("container has not been started") + host = host or self.get_container_host_ip() + port = self.get_exposed_port(port) + url = f"{dialect}://{host}:{port}" + if dbname: + url = f"{url}/{dbname}" + return url + + def get_connection_url(self) -> str: + return self._create_connection_url( + dialect="mongodb", + port=self.port, + ) + + def _connect(self) -> None: + """ + Connect to MongoDB, and establish replica set. + + https://www.mongodb.com/docs/v5.0/reference/command/replSetInitiate/ + https://www.mongodb.com/docs/v5.0/reference/method/rs.initiate/#mongodb-method-rs.initiate + + https://www.mongodb.com/docs/v5.0/reference/command/replSetGetStatus/ + https://www.mongodb.com/docs/v5.0/reference/method/rs.status/#mongodb-method-rs.status + """ + super()._connect() + + rs_config = {"_id": "testcontainers-rs", "members": [{"_id": 0, "host": "localhost:27017"}]} + + client = self.get_connection_client() + db = client.get_database("admin") + for _ in range(10): + response = db.command("ping") + if response["ok"]: + break + time.sleep(0.5) + + try: + db.command({"replSetInitiate": rs_config}) + except pymongo.errors.OperationFailure as ex: + if ex.details is None or ex.details["codeName"] != "AlreadyInitialized": + raise + + response = db.command({"replSetGetStatus": 1}) + if not response["myState"]: + raise IOError("MongoDB replica set failed") + + for _ in range(10): + if client.is_primary: + return + time.sleep(0.5) + + raise IOError("Unable to spin up MongoDB with replica set") diff --git a/cratedb_toolkit/util/process.py b/cratedb_toolkit/util/process.py new file mode 100644 index 00000000..c5a6507f --- /dev/null +++ b/cratedb_toolkit/util/process.py @@ -0,0 +1,23 @@ +import logging +import time +import typing as t + +logger = logging.getLogger(__name__) + + +class FixedBackoff: + def __init__(self, sequence: t.List[int]): + self.sequence = sequence + self.attempt = 0 + + def next(self) -> None: + self.attempt += 1 + if self.attempt < len(self.sequence): + delay = self.sequence[self.attempt] + else: + delay = self.sequence[-1] + logger.info(f"Retry attempt #{self.attempt} in {delay} seconds") + time.sleep(delay) + + def reset(self) -> None: + self.attempt = 0 diff --git a/tests/io/mongodb/conftest.py b/tests/io/mongodb/conftest.py index 0e38c79f..2981ca57 100644 --- a/tests/io/mongodb/conftest.py +++ b/tests/io/mongodb/conftest.py @@ -1,4 +1,5 @@ import logging +import os import pytest @@ -25,18 +26,23 @@ class MongoDBFixture: A little helper wrapping Testcontainer's `MongoDbContainer`. """ - def __init__(self): + def __init__(self, container_class): from pymongo import MongoClient + self.container_class = container_class self.container = None self.client: MongoClient = None self.setup() def setup(self): # TODO: Make image name configurable. - from cratedb_toolkit.testing.testcontainers.mongodb import MongoDbContainerWithKeepalive - self.container = MongoDbContainerWithKeepalive() + mongodb_version = os.environ.get("MONGODB_VERSION", "7") + mongodb_image = f"mongo:{mongodb_version}" + + self.container = self.container_class( + image=mongodb_image, + ) self.container.start() self.client = self.container.get_connection_client() @@ -56,6 +62,21 @@ def get_connection_url(self): def get_connection_client(self): return self.container.get_connection_client() + def get_connection_client_replicaset(self): + return self.container.get_connection_client_replicaset() + + +class MongoDBFixtureFactory: + def __init__(self, container): + self.db = MongoDBFixture(container) + + def __enter__(self): + self.db.reset() + return self.db + + def __exit__(self, exc_type, exc_val, exc_tb): + self.db.finalize() + @pytest.fixture(scope="session") def mongodb_service(): @@ -63,10 +84,10 @@ def mongodb_service(): Provide an MongoDB service instance to the test suite. """ check_sqlalchemy2() - db = MongoDBFixture() - db.reset() - yield db - db.finalize() + from cratedb_toolkit.testing.testcontainers.mongodb import MongoDbContainerWithKeepalive + + with MongoDBFixtureFactory(container=MongoDbContainerWithKeepalive) as mongo: + yield mongo @pytest.fixture(scope="function") @@ -76,3 +97,24 @@ def mongodb(mongodb_service): """ mongodb_service.reset() yield mongodb_service + + +@pytest.fixture(scope="session") +def mongodb_replicaset_service(): + """ + Provide an MongoDB service instance to the test suite. + """ + check_sqlalchemy2() + from cratedb_toolkit.testing.testcontainers.mongodb import MongoDbReplicasetContainer + + with MongoDBFixtureFactory(container=MongoDbReplicasetContainer) as mongo: + yield mongo + + +@pytest.fixture(scope="function") +def mongodb_replicaset(mongodb_replicaset_service): + """ + Provide a fresh canvas to each test case invocation, by resetting database content. + """ + mongodb_replicaset_service.reset() + yield mongodb_replicaset_service diff --git a/tests/io/mongodb/test_cdc.py b/tests/io/mongodb/test_cdc.py new file mode 100644 index 00000000..32e6bb07 --- /dev/null +++ b/tests/io/mongodb/test_cdc.py @@ -0,0 +1,82 @@ +import datetime as dt +from pathlib import Path + +from bson import ObjectId + +from cratedb_toolkit.io.mongodb.cdc import MongoDBCDCRelayCrateDB +from cratedb_toolkit.io.mongodb.transform import TransformationManager +from tests.util.processor import BackgroundProcessor + +DOCUMENT_INSERT = { + "_id": ObjectId("669683c2b0750b2c84893f3e"), + "id": "5F9E", + "data": {"temperature": 42.42, "humidity": 84.84}, + "tombstone": "foo", +} + + +DOCUMENT_UPDATE = { + "_id": ObjectId("669683c2b0750b2c84893f3e"), + "id": "5F9E", + "data": {"temperature": 42.5}, + "new": "foo", + "some_date": dt.datetime(2024, 7, 11, 23, 17, 42), +} + + +def test_mongodb_cdc_insert_update(caplog, mongodb_replicaset, cratedb): + """ + Roughly verify that the MongoDB CDC processing works as expected. + """ + + # Define source and target URLs. + mongodb_url = f"{mongodb_replicaset.get_connection_url()}/testdrive/demo?direct=true" + cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo" + + # Optionally configure transformations. + tm = TransformationManager.from_any(Path("examples/zyp/zyp-transformation.yaml")) + + # Initialize table loader. + table_loader = MongoDBCDCRelayCrateDB( + mongodb_url=mongodb_url, + cratedb_url=cratedb_url, + tm=tm, + ) + + # Define target table name. + table_name = table_loader.cratedb_table + + # Create source collection. + table_loader.mongodb_adapter.create_collection() + + # Create target table. + table_loader.cratedb_adapter.run_sql(table_loader.cdc.sql_ddl) + + # Start event processor / stream consumer in separate thread, consuming forever. + with BackgroundProcessor(loader=table_loader, cratedb=cratedb) as processor: + # Populate source database with data. + processor.loader.mongodb_adapter.collection.insert_one(DOCUMENT_INSERT) + next(processor) + + processor.loader.mongodb_adapter.collection.replace_one( + filter={"_id": DOCUMENT_UPDATE["_id"]}, replacement=DOCUMENT_UPDATE + ) + next(processor) + + # Verify data in target database, more specifically that both events have been processed well. + assert cratedb.database.refresh_table(table_name) is True + assert cratedb.database.count_records(table_name) == 1 + results = cratedb.database.run_sql(f"SELECT * FROM {table_name}", records=True) # noqa: S608 + record = results[0]["data"] + + # Container content amendment. + assert record["data"] == {"temperature": 42.5} + + # New attribute added. + assert record["new"] == "foo" + + # Attribute deleted. + assert "tombstone" not in record + + # Zyp transformation from dt.datetime. + assert record["some_date"] == 1720739862000 diff --git a/tests/util/processor.py b/tests/util/processor.py new file mode 100644 index 00000000..cba5e72a --- /dev/null +++ b/tests/util/processor.py @@ -0,0 +1,38 @@ +import threading +import time + +from cratedb_toolkit.testing.testcontainers.cratedb import CrateDBTestAdapter + + +class BackgroundProcessor: + """ + Manage event processor / stream consumer in separate thread, consuming forever. + """ + + delay_start = 0.25 + delay_step = 0.25 + + def __init__(self, loader, cratedb: CrateDBTestAdapter): + self.loader = loader + self.cratedb = cratedb + self.table_name = self.loader.cratedb_table + self.thread = threading.Thread(target=self.loader.start) + + def __enter__(self): + """ + Start stream consumer. + """ + self.thread.start() + time.sleep(self.delay_start) + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """ + Stop stream consumer. + """ + self.loader.stop() + self.thread.join() + + def __next__(self): + time.sleep(self.delay_step) + self.cratedb.database.refresh_table(self.table_name)