Skip to content

Commit

Permalink
MongoDB: Make migr8 export accept the --limit option
Browse files Browse the repository at this point in the history
  • Loading branch information
amotl committed Aug 8, 2024
1 parent c0656db commit 50b46b6
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
- MongoDB: Improve user interface messages
- MongoDB: Strip single leading underscore character from all top-level fields
- MongoDB: Map OID types to CrateDB TEXT columns
- MongoDB: Make `migr8 export` accept the `--limit` option

## 2024/07/25 v0.0.16
- `ctk load table`: Added support for MongoDB Change Streams
Expand Down
6 changes: 4 additions & 2 deletions cratedb_toolkit/io/mongodb/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
logger = logging.getLogger(__name__)


def mongodb_copy(source_url, target_url, progress: bool = False):
def mongodb_copy(source_url, target_url, limit: int = 0, progress: bool = False):
"""
Synopsis
--------
Expand Down Expand Up @@ -64,7 +64,9 @@ def mongodb_copy(source_url, target_url, progress: bool = False):
f"Transferring data from MongoDB to CrateDB: "
f"source={mongodb_collection_address.fullname}, target={cratedb_table_address.fullname}"
)
export_args = argparse.Namespace(url=str(mongodb_uri), database=mongodb_database, collection=mongodb_collection)
export_args = argparse.Namespace(
url=str(mongodb_uri), database=mongodb_database, collection=mongodb_collection, limit=limit
)
buffer = export(export_args)
cr8_insert_json(infile=buffer, hosts=cratedb_address.httpuri, table=cratedb_table_address.fullname)

Expand Down
1 change: 1 addition & 0 deletions cratedb_toolkit/io/mongodb/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def export_parser(subargs):
parser.add_argument("--host", default="localhost", help="MongoDB host")
parser.add_argument("--port", default=27017, help="MongoDB port")
parser.add_argument("--database", required=True, help="MongoDB database")
parser.add_argument("--limit", type=int, default=0, required=False, help="Limit export to N documents")


def get_args():
Expand Down
2 changes: 1 addition & 1 deletion cratedb_toolkit/io/mongodb/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,6 @@ def export(args) -> t.IO[bytes]:
"""
buffer = io.BytesIO()
client, db = get_mongodb_client_database(args, document_class=RawBSONDocument)
collection_to_json(db[args.collection], file=buffer)
collection_to_json(db[args.collection], fp=buffer, limit=args.limit)
buffer.seek(0)
return buffer
21 changes: 14 additions & 7 deletions cratedb_toolkit/io/mongodb/export.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
"""

import calendar
import sys
import typing as t

import bsonjs
Expand Down Expand Up @@ -57,6 +56,12 @@ def timestamp_converter(value):


def extract_value(value, parent_type=None):
"""
Decode MongoDB Extended JSON.
- https://www.mongodb.com/docs/manual/reference/mongodb-extended-json-v1/
- https://www.mongodb.com/docs/manual/reference/mongodb-extended-json/
"""
if isinstance(value, dict):
if len(value) == 1:
for k, v in value.items():
Expand All @@ -73,13 +78,16 @@ def extract_value(value, parent_type=None):


def convert(d):
    """
    Translate a MongoDB Extended JSON document into a plain dictionary,
    taking CrateDB specifics into account.

    Field names are first passed through `sanitize_field_names`, then each
    value is decoded individually by `extract_value`.
    """
    sanitized = sanitize_field_names(d)
    return {name: extract_value(item) for name, item in sanitized.items()}


def collection_to_json(collection: pymongo.collection.Collection, file: t.IO[t.Any] = None):
def collection_to_json(collection: pymongo.collection.Collection, fp: t.IO[t.Any], limit: int = 0):
"""
Export a MongoDB collection's documents to standard JSON.
The output is suitable to be consumed by the `cr8` program.
Expand All @@ -88,11 +96,10 @@ def collection_to_json(collection: pymongo.collection.Collection, file: t.IO[t.A
a Pymongo collection object.
file
a file-like object (stream); defaults to the current sys.stdout.
a file-like object (stream).
"""
file = file or sys.stdout.buffer
for document in collection.find():
for document in collection.find().limit(limit):
bson_json = bsonjs.dumps(document.raw)
json_object = json.loads(bson_json)
file.write(json.dumps(convert(json_object)))
file.write(b"\n")
fp.write(json.dumps(convert(json_object)))
fp.write(b"\n")
3 changes: 3 additions & 0 deletions doc/io/mongodb/migr8.md
Original file line number Diff line number Diff line change
Expand Up @@ -216,5 +216,8 @@ Alternatively, use [cr8] to directly write the MongoDB collection into a CrateDB
migr8 export --host localhost --port 27017 --database test_db --collection test | \
cr8 insert-json --hosts localhost:4200 --table test

To work on a smaller set of data for preview purposes, the `migr8 export`
subcommand also accepts the `--limit N` command-line option, which limits the export to N documents.


[cr8]: https://github.com/mfussenegger/cr8

0 comments on commit 50b46b6

Please sign in to comment.