Skip to content

Commit

Permalink
MongoDB: Add integration test for the CDC subsystem
Browse files Browse the repository at this point in the history
  • Loading branch information
amotl committed Oct 13, 2024
1 parent 48e4d77 commit 20904aa
Show file tree
Hide file tree
Showing 8 changed files with 380 additions and 32 deletions.
51 changes: 40 additions & 11 deletions cratedb_toolkit/io/mongodb/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,17 @@
import bson
import pymongo
import pymongo.collection
import pymongo.database
import yarl
from attrs import define, field
from boltons.urlutils import URL
from bson.raw_bson import RawBSONDocument
from undatum.common.iterable import IterableData

from cratedb_toolkit.io.mongodb.model import DocumentDict
from cratedb_toolkit.io.mongodb.util import batches
from cratedb_toolkit.model import DatabaseAddress
from cratedb_toolkit.util.data import asbool
from cratedb_toolkit.util.io import read_json

logger = logging.getLogger(__name__)
Expand All @@ -32,7 +35,11 @@ class MongoDBAdapterBase:
database_name: str
collection_name: str

_custom_query_parameters = ["batch-size", "filter", "limit", "offset"]
_custom_query_parameters = ["batch-size", "direct", "filter", "limit", "offset", "timeout"]
_default_timeout = 5000

direct: bool = False
timeout: int = _default_timeout

@classmethod
def from_url(cls, url: t.Union[str, boltons.urlutils.URL, yarl.URL]):
Expand All @@ -42,13 +49,17 @@ def from_url(cls, url: t.Union[str, boltons.urlutils.URL, yarl.URL]):
mongodb_uri, mongodb_collection_address = mongodb_address.decode()
mongodb_database = mongodb_collection_address.schema
mongodb_collection = mongodb_collection_address.table
direct = asbool(mongodb_uri.query_params.pop("direct", False))
timeout = mongodb_uri.query_params.pop("timeout", cls._default_timeout)
for custom_query_parameter in cls._custom_query_parameters:
mongodb_uri.query_params.pop(custom_query_parameter, None)
return cls(
address=mongodb_address,
effective_url=mongodb_uri,
database_name=mongodb_database,
collection_name=mongodb_collection,
direct=direct,
timeout=timeout,
)

def __attrs_post_init__(self):
Expand All @@ -75,7 +86,7 @@ def setup(self):
raise NotImplementedError()

@abstractmethod
def get_collections(self) -> t.List[str]:
def get_collection_names(self) -> t.List[str]:
raise NotImplementedError()

@abstractmethod
Expand All @@ -87,7 +98,7 @@ def query(self):
raise NotImplementedError()

@abstractmethod
def subscribe(self):
def subscribe_cdc(self, resume_after: t.Optional[DocumentDict] = None):
raise NotImplementedError()


Expand All @@ -98,7 +109,7 @@ class MongoDBFilesystemAdapter(MongoDBAdapterBase):
def setup(self):
self._path = Path(self.address.uri.path)

def get_collections(self) -> t.List[str]:
def get_collection_names(self) -> t.List[str]:
return sorted(glob.glob(str(self._path)))

def record_count(self, filter_=None) -> int:
Expand Down Expand Up @@ -126,7 +137,7 @@ def query(self):
raise ValueError(f"Unsupported file type: {self._path.suffix}")
return batches(data, self.batch_size)

def subscribe(self):
def subscribe_cdc(self, resume_after: t.Optional[DocumentDict] = None):
raise NotImplementedError("Subscribing to a change stream is not supported by filesystem adapter")


Expand All @@ -139,7 +150,7 @@ def setup(self):
if "+bson" in self._url.scheme:
self._url.scheme = self._url.scheme.replace("+bson", "")

def get_collections(self) -> t.List[str]:
def get_collection_names(self) -> t.List[str]:
raise NotImplementedError("HTTP+BSON loader does not support directory inquiry yet")

def record_count(self, filter_=None) -> int:
Expand All @@ -160,25 +171,36 @@ def query(self):
raise ValueError(f"Unsupported file type: {self._url}")
return batches(data, self.batch_size)

def subscribe(self):
def subscribe_cdc(self, resume_after: t.Optional[DocumentDict] = None):
raise NotImplementedError("HTTP+BSON loader does not support subscribing to a change stream")


@define
class MongoDBServerAdapter(MongoDBAdapterBase):
_mongodb_client: pymongo.MongoClient = field(init=False)
_mongodb_database: pymongo.database.Database = field(init=False)
_mongodb_collection: pymongo.collection.Collection = field(init=False)

def setup(self):
self._mongodb_client: pymongo.MongoClient = pymongo.MongoClient(
str(self.effective_url),
document_class=RawBSONDocument,
datetime_conversion="DATETIME_AUTO",
directConnection=self.direct,
socketTimeoutMS=self.timeout,
connectTimeoutMS=self.timeout,
serverSelectionTimeoutMS=self.timeout,
)
if self.database_name:
self._mongodb_database = self._mongodb_client.get_database(self.database_name)
if self.collection_name:
self._mongodb_collection = self._mongodb_client[self.database_name][self.collection_name]
self._mongodb_collection = self._mongodb_database.get_collection(self.collection_name)

@property
def collection(self):
return self._mongodb_collection

def get_collections(self) -> t.List[str]:
def get_collection_names(self) -> t.List[str]:
database = self._mongodb_client.get_database(self.database_name)
return sorted(database.list_collection_names())

Expand All @@ -203,8 +225,15 @@ def query(self):
)
return batches(data, self.batch_size)

def subscribe(self):
return self._mongodb_collection.watch(full_document="updateLookup")
def subscribe_cdc(self, resume_after: t.Optional[DocumentDict] = None):
return self._mongodb_collection.watch(
full_document="updateLookup", batch_size=self.batch_size, resume_after=resume_after
)

def create_collection(self):
self._mongodb_database.create_collection(self.collection_name)
self._mongodb_collection = self._mongodb_database.get_collection(self.collection_name)
return self._mongodb_collection


def mongodb_adapter_factory(mongodb_uri: URL) -> MongoDBAdapterBase:
Expand Down
2 changes: 1 addition & 1 deletion cratedb_toolkit/io/mongodb/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ def mongodb_copy(
address_pair_root = AddressPair(source_url=source_url, target_url=target_url)

mongodb_adapter = mongodb_adapter_factory(address_pair_root.source_url)
collections = mongodb_adapter.get_collections()
collections = mongodb_adapter.get_collection_names()
logger.info(f"Discovered collections: {len(collections)}")
logger.debug(f"Processing collections: {collections}")

Expand Down
58 changes: 46 additions & 12 deletions cratedb_toolkit/io/mongodb/cdc.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,29 @@
"""
Basic relaying of a MongoDB Change Stream into CrateDB table.
Relay a MongoDB Change Stream into a CrateDB table.
Documentation:
- https://github.com/daq-tools/commons-codec/blob/main/doc/mongodb.md
- https://www.mongodb.com/docs/manual/changeStreams/
- https://www.mongodb.com/developer/languages/python/python-change-streams/
- https://github.com/daq-tools/commons-codec/blob/main/doc/mongodb.md
"""

import logging
import typing as t

import pymongo
import pymongo.errors
import sqlalchemy as sa
from boltons.urlutils import URL
from commons_codec.transform.mongodb import MongoDBCDCTranslator, MongoDBCrateDBConverter
from pymongo.change_stream import CollectionChangeStream
from zyp.model.collection import CollectionAddress

from cratedb_toolkit.io.mongodb.adapter import mongodb_adapter_factory
from cratedb_toolkit.io.mongodb.model import DocumentDict
from cratedb_toolkit.io.mongodb.transform import TransformationManager
from cratedb_toolkit.model import DatabaseAddress
from cratedb_toolkit.util import DatabaseAdapter
from cratedb_toolkit.util.process import FixedBackoff

logger = logging.getLogger(__name__)

Expand All @@ -39,6 +44,8 @@ def __init__(
self.mongodb_uri = URL(mongodb_url)
self.cratedb_uri = URL(cratedb_url)

logger.info(f"Initializing MongoDB CDC Relay. mongodb={mongodb_url}, cratedb={cratedb_url}")

# Decode database URL: MongoDB.
self.mongodb_adapter = mongodb_adapter_factory(self.mongodb_uri)

Expand Down Expand Up @@ -68,9 +75,11 @@ def __init__(
)

self.cdc = MongoDBCDCTranslator(table_name=self.cratedb_table, converter=self.converter)
self.ccs: CollectionChangeStream

self.on_error = on_error
self.debug = debug
self.stopping: bool = False

def start(self):
"""
Expand All @@ -79,18 +88,43 @@ def start(self):
# FIXME: Note that the function does not perform any sensible error handling yet.
with self.cratedb_adapter.engine.connect() as connection:
connection.execute(sa.text(self.cdc.sql_ddl))
for operation in self.cdc_to_sql():
for event in self.consume():
operation = self.cdc.to_sql(event)
if operation:
connection.execute(sa.text(operation.statement), operation.parameters)

def cdc_to_sql(self):
def stop(self):
self.stopping = True
self.ccs._closed = True

def consume(self, resume_after: t.Optional[DocumentDict] = None):
"""
Subscribe to change stream events, and emit corresponding SQL statements.
Subscribe to change stream events, and emit change events.
"""
# Note that `.subscribe()` (calling `.watch()`) will block until events are ready
# for consumption, so this is not a busy loop.
# FIXME: Note that the function does not perform any sensible error handling yet.
while True:
with self.mongodb_adapter.subscribe() as change_stream:
for change in change_stream:
yield self.cdc.to_sql(change)
self.ccs = self.mongodb_adapter.subscribe_cdc(resume_after=resume_after)

if self.stopping:
return

backoff = FixedBackoff(sequence=[2, 4, 6, 8, 10])
resume_token = None
try:
with self.ccs as stream:
for event in stream:
yield event
resume_token = stream.resume_token
backoff.reset()
except pymongo.errors.PyMongoError:
# The ChangeStream encountered an unrecoverable error or the
# resume attempt failed to recreate the cursor.
if resume_token is None:
# There is no usable resume token because there was a
# failure during ChangeStream initialization.
logger.exception("Initializing change stream failed")
else:
# Use the interrupted ChangeStream's resume token to create
# a new ChangeStream. The new stream will continue from the
# last seen insert change without missing any events.
backoff.next()
logger.info("Resuming change stream")
self.consume(resume_after=resume_after)
102 changes: 101 additions & 1 deletion cratedb_toolkit/testing/testcontainers/mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,12 @@
# License for the specific language governing permissions and limitations
# under the License.
import os
import time
import typing as t

import pymongo.errors
from pymongo import MongoClient
from testcontainers.core.exceptions import ContainerStartException
from testcontainers.mongodb import MongoDbContainer

from cratedb_toolkit.testing.testcontainers.util import KeepaliveContainer
Expand All @@ -30,12 +35,107 @@ class MongoDbContainerWithKeepalive(KeepaliveContainer, MongoDbContainer):
useful when used within a test matrix. Its default value is `latest`.
"""

NAME = "testcontainers-mongodb-vanilla"
MONGODB_VERSION = os.environ.get("MONGODB_VERSION", "latest")
TIMEOUT = 5000
DIRECT_CONNECTION = False

def __init__(
self,
image: str = f"mongo:{MONGODB_VERSION}",
**kwargs,
) -> None:
super().__init__(image=image, **kwargs)
self.with_name("testcontainers-mongodb")
self.with_name(self.NAME)

def get_connection_client(self) -> MongoClient:
return MongoClient(
self.get_connection_url(),
directConnection=self.DIRECT_CONNECTION,
socketTimeoutMS=self.TIMEOUT,
connectTimeoutMS=self.TIMEOUT,
serverSelectionTimeoutMS=self.TIMEOUT,
)


class MongoDbReplicasetContainer(MongoDbContainerWithKeepalive):
"""
A Testcontainer for MongoDB with transparent replica set configuration.
Overwritten to nullify MONGO_INITDB_ROOT_USERNAME + _PASSWORD,
and username + password, because replicaset + authentication
is more complicated to configure.
"""

NAME = "testcontainers-mongodb-replicaset"
DIRECT_CONNECTION = True

def _configure(self) -> None:
self.with_command("mongod --replSet testcontainers-rs")
self.with_env("MONGO_DB", self.dbname)

def _create_connection_url(
self,
dialect: str,
host: t.Optional[str] = None,
port: t.Optional[int] = None,
dbname: t.Optional[str] = None,
**kwargs,
) -> str:
from testcontainers.core.utils import raise_for_deprecated_parameter

if raise_for_deprecated_parameter(kwargs, "db_name", "dbname"):
raise ValueError(f"Unexpected arguments: {','.join(kwargs)}")
if self._container is None:
raise ContainerStartException("container has not been started")
host = host or self.get_container_host_ip()
port = self.get_exposed_port(port)
url = f"{dialect}://{host}:{port}"
if dbname:
url = f"{url}/{dbname}"
return url

def get_connection_url(self) -> str:
return self._create_connection_url(
dialect="mongodb",
port=self.port,
)

def _connect(self) -> None:
"""
Connect to MongoDB, and establish replica set.
https://www.mongodb.com/docs/v5.0/reference/command/replSetInitiate/
https://www.mongodb.com/docs/v5.0/reference/method/rs.initiate/#mongodb-method-rs.initiate
https://www.mongodb.com/docs/v5.0/reference/command/replSetGetStatus/
https://www.mongodb.com/docs/v5.0/reference/method/rs.status/#mongodb-method-rs.status
"""
super()._connect()

rs_config = {"_id": "testcontainers-rs", "members": [{"_id": 0, "host": "localhost:27017"}]}

client = self.get_connection_client()
db = client.get_database("admin")
for _ in range(10):
response = db.command("ping")
if response["ok"]:
break
time.sleep(0.5)

try:
db.command({"replSetInitiate": rs_config})
except pymongo.errors.OperationFailure as ex:
if ex.details is None or ex.details["codeName"] != "AlreadyInitialized":
raise

response = db.command({"replSetGetStatus": 1})
if not response["myState"]:
raise IOError("MongoDB replica set failed")

for _ in range(10):
if client.is_primary:
return
time.sleep(0.5)

raise IOError("Unable to spin up MongoDB with replica set")
Loading

0 comments on commit 20904aa

Please sign in to comment.