696 update dagster op for materialize alldocs #817

Merged · 6 commits · Dec 13, 2024

Changes from 4 commits
147 changes: 64 additions & 83 deletions nmdc_runtime/site/ops.py
@@ -7,6 +7,7 @@
 from collections import defaultdict
 from datetime import datetime, timezone
 from io import BytesIO, StringIO
+from toolz.dicttoolz import keyfilter
 from typing import Tuple
 from zipfile import ZipFile
 from itertools import chain
@@ -93,17 +94,20 @@
 from nmdc_runtime.site.util import run_and_log, schema_collection_has_index_on_id
 from nmdc_runtime.util import (
     drs_object_in_for,
+    get_names_of_classes_in_effective_range_of_slot,
     pluralize,
     put_object,
     validate_json,
     specialize_activity_set_docs,
+    collection_name_to_class_names,
     class_hierarchy_as_list,
+    nmdc_schema_view,
     populated_schema_collection_names_with_id_field,
 )
 from nmdc_schema import nmdc
 from nmdc_schema.nmdc import Database as NMDCDatabase
 from pydantic import BaseModel
 from pymongo import InsertOne
 from pymongo.database import Database as MongoDatabase
 from starlette import status
 from toolz import assoc, dissoc, get_in, valfilter, identity
@@ -1030,21 +1034,17 @@ def site_code_mapping() -> dict:

 @op(required_resource_keys={"mongo"})
 def materialize_alldocs(context) -> int:
     """
     This function re-creates the alldocs collection to reflect the current state of the Mongo database.
     See nmdc-runtime/docs/nb/bulk_validation_referential_integrity_check.ipynb for more details.
     """
     mdb = context.resources.mongo.db
-    collection_names = populated_schema_collection_names_with_id_field(mdb)
+    schema_view = nmdc_schema_view()

-    # Insert a no-op as an anchor point for this comment.
-    #
-    # Note: There used to be code here that `assert`-ed that each collection could only contain documents of a single
-    # type. With the legacy schema, that assertion was true. With the Berkeley schema, it is false. That code was
-    # in place because subsequent code (further below) used a single document in a collection as the source of the
-    # class ancestry information of _all_ documents in that collection; an optimization that spared us from
-    # having to do the same for every single document in that collection. With the Berkeley schema, we have
-    # eliminated that optimization (since it is inadequate; it would produce some incorrect class ancestries
-    # for descendants of `PlannedProcess`, for example).
-    #
-    pass
+    # batch size for writing documents to alldocs
+    BULK_WRITE_BATCH_SIZE = 2000

+    collection_names = populated_schema_collection_names_with_id_field(mdb)
+    context.log.info(f"{collection_names=}")

     # Drop any existing `alldocs` collection (e.g. from previous use of this op).
@@ -1058,87 +1058,68 @@ def materialize_alldocs(context) -> int:
     # Build alldocs
     context.log.info("constructing `alldocs` collection")

-    # For each collection, group its documents by their `type` value, transform them, and load them into `alldocs`.
-    for collection_name in collection_names:
-        context.log.info(
-            f"Found {mdb[collection_name].estimated_document_count()} estimated documents for {collection_name=}."
-        )
+    document_class_names = set(
+        chain.from_iterable(collection_name_to_class_names.values())
+    )

+    # Any ancestor of a document class is a document-referenceable range, i.e., a valid range of a document-reference-ranged slot.
+    document_referenceable_ranges = set(
+        chain.from_iterable(
+            schema_view.class_ancestors(cls_name) for cls_name in document_class_names
+        )
+    )

-        # Process all the distinct `type` values (i.e. value in the `type` field) of the documents in this collection.
-        #
-        # References:
-        # - https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.distinct
-        #
-        distinct_type_values = mdb[collection_name].distinct(key="type")
+    cls_slot_map = {
+        cls_name: {
+            slot.name: slot for slot in schema_view.class_induced_slots(cls_name)
+        }
+        for cls_name in document_class_names
+    }

+    document_reference_ranged_slots = defaultdict(list)
+    for cls_name, slot_map in cls_slot_map.items():
+        for slot_name, slot in slot_map.items():
+            if (
+                set(get_names_of_classes_in_effective_range_of_slot(schema_view, slot))
+                & document_referenceable_ranges
+            ):
+                document_reference_ranged_slots[cls_name].append(slot_name)

+    for coll_name in collection_names:
+        context.log.info(f"{coll_name=}")
+        requests = []
+        documents_processed_counter = 0
+        for doc in mdb[coll_name].find():
+            doc_type = doc["type"][5:]  # lop off "nmdc:" prefix
+            slots_to_include = ["id", "type"] + document_reference_ranged_slots[
+                doc_type
+            ]
+            new_doc = keyfilter(lambda slot: slot in slots_to_include, doc)
+            new_doc["_type_and_ancestors"] = schema_view.class_ancestors(doc_type)
+            requests.append(InsertOne(new_doc))
+            if len(requests) == BULK_WRITE_BATCH_SIZE:
+                result = mdb.alldocs.bulk_write(requests, ordered=False)
+                requests.clear()
+                documents_processed_counter += BULK_WRITE_BATCH_SIZE
+        if len(requests) > 0:
+            result = mdb.alldocs.bulk_write(requests, ordered=False)
+            documents_processed_counter += len(requests)
         context.log.info(
-            f"Found {len(distinct_type_values)} distinct `type` values in {collection_name=}: {distinct_type_values=}"
+            f"Inserted {documents_processed_counter} documents from {coll_name=} "
         )
-        for type_value in distinct_type_values:
-
-            # Process all the documents in this collection that have this value in their `type` field.
-            #
-            # References:
-            # - https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.count_documents
-            # - https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.find
-            #
-            filter_ = {"type": type_value}
-            num_docs_having_type = mdb[collection_name].count_documents(filter=filter_)
-            docs_having_type = mdb[collection_name].find(filter=filter_)
-            context.log.info(
-                f"Found {num_docs_having_type} documents having {type_value=} in {collection_name=}."
-            )
-
-            # Get a "representative" document from the result.
-            #
-            # Note: Since all of the documents in this batch have the same class ancestry, we will save time by
-            # determining the class ancestry of only _one_ of them (we call this the "representative") and then
-            # (later) attributing that class ancestry to all of them.
-            #
-            representative_doc = next(docs_having_type)
-
-            # Instantiate the Python class represented by the "representative" document.
-            db_dict = {
-                # Shed the `_id` attribute, since the constructor doesn't allow it.
-                collection_name: [dissoc(representative_doc, "_id")]
-            }
-            nmdc_db = NMDCDatabase(**db_dict)
-            representative_instance = getattr(nmdc_db, collection_name)[0]
-
-            # Get the class ancestry of that instance, as a list of class names (including its own class name).
-            ancestor_class_names = class_hierarchy_as_list(representative_instance)
-
-            # Store the documents belonging to this group, in the `alldocs` collection, setting their `type` field
-            # to the list of class names obtained from the "representative" document above.
-            #
-            # TODO: Document why clobbering the existing contents of the `type` field is OK.
-            #
-            # Note: The reason we `chain()` our "representative" document (in an iterable) with the `docs_having_type`
-            # iterator here is that, when we called `next(docs_having_type)` above, we "consumed" our
-            # "representative" document from that iterator. We use `chain()` here so that that document gets
-            # inserted alongside its cousins (i.e. the documents _still_ accessible via `docs_having_type`).
-            # Reference: https://docs.python.org/3/library/itertools.html#itertools.chain
-            #
-            inserted_many_result = mdb.alldocs.insert_many(
-                [
-                    assoc(dissoc(doc, "type", "_id"), "type", ancestor_class_names)
-                    for doc in chain([representative_doc], docs_having_type)
-                ]
-            )
-            context.log.info(
-                f"Inserted {len(inserted_many_result.inserted_ids)} documents from {collection_name=} "
-                f"originally having {type_value=}."
-            )
+
+    context.log.info(
+        f"refreshed {mdb.alldocs} collection with {mdb.alldocs.estimated_document_count()} docs."
+    )

     # Re-idx for `alldocs` collection
     mdb.alldocs.create_index("id", unique=True)
     # The indexes were added to improve the performance of the
     # /data_objects/study/{study_id} endpoint
-    mdb.alldocs.create_index("has_input")
-    mdb.alldocs.create_index("has_output")
-    mdb.alldocs.create_index("was_informed_by")
-    context.log.info(
-        f"refreshed {mdb.alldocs} collection with {mdb.alldocs.estimated_document_count()} docs."
-    )
+    slots_to_index = ["has_input", "has_output", "was_informed_by"]
+    [mdb.alldocs.create_index(slot) for slot in slots_to_index]

+    context.log.info(f"created indexes on id, {slots_to_index}.")
     return mdb.alldocs.estimated_document_count()
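
Note on the new design: because each `alldocs` document now carries `_type_and_ancestors` (the document's class name plus all of its ancestor class names), a consumer can match a class and every one of its subclasses with a single equality filter, instead of enumerating subclass names at query time. The sketch below illustrates that query pattern only; the connection string, database name, and document id are hypothetical and not part of this PR.

```python
from pymongo import MongoClient

# Hypothetical connection details; substitute your deployment's own values.
mdb = MongoClient("mongodb://localhost:27017")["nmdc"]

# Matches documents whose class is `PlannedProcess` or any descendant of it.
# Equality against an array field matches any element of the array.
planned_processes = list(mdb.alldocs.find({"_type_and_ancestors": "PlannedProcess"}))

# The single-slot indexes created by this op (`has_input`, `has_output`,
# `was_informed_by`) support traversals like this one (hypothetical id):
consumers = list(mdb.alldocs.find({"has_input": "nmdc:dobj-11-abc123"}))
```

Relatedly, `bulk_write(..., ordered=False)` lets the server keep processing a batch past any individual failed insert, and the fixed `BULK_WRITE_BATCH_SIZE` bounds client-side memory while still amortizing round trips.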


44 changes: 44 additions & 0 deletions nmdc_runtime/util.py
@@ -17,6 +17,8 @@
 import requests
 from frozendict import frozendict
 from jsonschema.validators import Draft7Validator
+from linkml_runtime import linkml_model
+from linkml_runtime.utils.schemaview import SchemaView
 from nmdc_schema.nmdc import Database as NMDCDatabase
 from nmdc_schema.get_nmdc_view import ViewGetter
 from pydantic import Field, BaseModel
@@ -29,6 +31,48 @@
 from typing_extensions import Annotated


+def get_names_of_classes_in_effective_range_of_slot(
+    schema_view: SchemaView, slot_definition: linkml_model.SlotDefinition
+) -> List[str]:
+    r"""
+    Determine the slot's "effective" range, by taking into account its `any_of` constraints (if defined).
+
+    Note: The `any_of` constraints constrain the slot's "effective" range beyond that described by the
+          induced slot definition's `range` attribute. `SchemaView` does not seem to provide the result
+          of applying those additional constraints, so we do it manually here (if any are defined).
+          Reference: https://github.com/orgs/linkml/discussions/2101#discussion-6625646
+
+    Reference: https://linkml.io/linkml-model/latest/docs/any_of/
+    """
+
+    # Initialize the list to be empty.
+    names_of_eligible_target_classes = []
+
+    # If the `any_of` constraint is defined on this slot, use that instead of the `range`.
+    if "any_of" in slot_definition and len(slot_definition.any_of) > 0:
+        for slot_expression in slot_definition.any_of:
+            # Use the slot expression's `range` to get the specified eligible class name
+            # and the names of all classes that inherit from that eligible class.
+            if slot_expression.range in schema_view.all_classes():
+                own_and_descendant_class_names = schema_view.class_descendants(
+                    slot_expression.range
+                )
+                names_of_eligible_target_classes.extend(own_and_descendant_class_names)
+    else:
+        # Use the slot's `range` to get the specified eligible class name
+        # and the names of all classes that inherit from that eligible class.
+        if slot_definition.range in schema_view.all_classes():
+            own_and_descendant_class_names = schema_view.class_descendants(
+                slot_definition.range
+            )
+            names_of_eligible_target_classes.extend(own_and_descendant_class_names)
+
+    # Remove duplicate class names.
+    names_of_eligible_target_classes = list(set(names_of_eligible_target_classes))
+
+    return names_of_eligible_target_classes
+
+
 def get_class_names_from_collection_spec(
     spec: dict, prefix: Optional[str] = None
 ) -> List[str]:
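
To make the `any_of` handling concrete: a slot may declare its range as a union of class expressions, in which case the induced slot's plain `range` attribute understates the set of eligible target classes. Below is a minimal usage sketch (not part of this PR) against a hypothetical toy schema, showing the helper return the union of each `any_of` range and its descendants.

```python
from linkml_runtime.utils.schemaview import SchemaView

from nmdc_runtime.util import get_names_of_classes_in_effective_range_of_slot

# A hypothetical schema, used here only to exercise the `any_of` branch.
DEMO_SCHEMA = """
id: https://example.org/demo
name: demo
prefixes:
  ex: https://example.org/demo/
  linkml: https://w3id.org/linkml/
default_prefix: ex
default_range: string
imports:
  - linkml:types
classes:
  NamedThing:
    attributes:
      id:
        identifier: true
  Biosample:
    is_a: NamedThing
  Study:
    is_a: NamedThing
  Annotation:
    attributes:
      subject:
        any_of:
          - range: Biosample
          - range: Study
"""

view = SchemaView(DEMO_SCHEMA)  # SchemaView accepts inline YAML text
slot = view.induced_slot("subject", "Annotation")

# The induced `range` falls back to the default (`string`), but the `any_of`
# expressions make Biosample, Study, and their descendants the effective range.
print(sorted(get_names_of_classes_in_effective_range_of_slot(view, slot)))
# -> ['Biosample', 'Study']
```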