239 update import logic to update omicsprocessing record #274

Merged
Changes from 7 commits
Commits (24)
7777dcf
add test fixtures
mbthornton-lbl Oct 15, 2024
80c6bdc
add test for DataObject construction w invalid D.O Type
mbthornton-lbl Oct 16, 2024
52ea807
add test case for data object with invalid data category
mbthornton-lbl Oct 16, 2024
1b3bd9a
Refactor to move model classes
mbthornton-lbl Oct 16, 2024
d611c2c
incorporate linkml.validate into workflow_process_factory
mbthornton-lbl Oct 17, 2024
00b335a
implement map_sequencing_data with test
mbthornton-lbl Oct 17, 2024
cba6264
add map_data method and unit tests
mbthornton-lbl Oct 17, 2024
6aa29aa
update DataObject and serialization
mbthornton-lbl Oct 18, 2024
cf0b4cc
delete legacy warning
mbthornton-lbl Oct 18, 2024
3235112
update click script
mbthornton-lbl Oct 18, 2024
6ba615e
get validation working in run_import
mbthornton-lbl Oct 18, 2024
42f8dca
add update and post objects api calls
mbthornton-lbl Oct 18, 2024
0e1e3e5
clean up unused methods in activity mapper
mbthornton-lbl Oct 18, 2024
5a58e57
Do not create URL for imported sequencing data objects
mbthornton-lbl Oct 25, 2024
2a5965b
remove Metatranscriptome Raw Reads from sequencing types
mbthornton-lbl Oct 25, 2024
2907597
Remove profiler output that was mistakenly checked in
mbthornton-lbl Oct 25, 2024
7df1837
add prof to .gitignore
mbthornton-lbl Oct 25, 2024
8da0ceb
do not strip out "null" as an empty value
mbthornton-lbl Oct 25, 2024
05507fe
Handle gene_count = "null" case for mag bin
mbthornton-lbl Oct 25, 2024
e4b6b63
delete unused schema file
mbthornton-lbl Oct 25, 2024
b93775c
update test description
mbthornton-lbl Oct 25, 2024
a2a558f
modify the generated update query for DataGen to not clobber any has_…
mbthornton-lbl Oct 25, 2024
b19ef6c
ensure we are not adding dupes to has_output
mbthornton-lbl Oct 28, 2024
12db033
update unit test
mbthornton-lbl Oct 28, 2024
1 change: 1 addition & 0 deletions nmdc_automation/__init__.py
@@ -2,3 +2,4 @@
from .config import siteconfig
from .import_automation import activity_mapper
from .workflow_automation import watch_nmdc, wfutils, workflows, workflow_process
from .models import workflow, nmdc
161 changes: 149 additions & 12 deletions nmdc_automation/import_automation/activity_mapper.py
@@ -13,6 +13,7 @@

from linkml_runtime.dumpers import json_dumper
from nmdc_automation.api import NmdcRuntimeApi
from nmdc_automation.models.nmdc import DataObject
from .utils import object_action, file_link, get_md5, filter_import_by_type

logger = logging.getLogger(__name__)
@@ -39,9 +40,7 @@ def __init__(
project_directory: Project directory path.
"""

with open(yaml_file, "r") as file:
self.import_data = yaml.safe_load(file)

self.import_data = self.load_yaml_file(yaml_file)
self.nmdc_db = nmdc.Database()
self.iteration = iteration
self.file_list = file_list
@@ -52,20 +51,158 @@ def __init__(
self.project_dir = project_directory
self.url = self.import_data["Workflow Metadata"]["Source URL"]
self.data_object_type = "nmdc:DataObject"
self.objects = {}
self.data_object_map = {}
self.workflow_execution_ids = {}
self.workflows_by_type = {}

self.workflows_by_type = self.build_workflows_by_type()
self.runtime = NmdcRuntimeApi(site_config_file)

for wf in self.import_data["Workflows"]:
self.workflows_by_type[wf["Type"]] = wf

def load_yaml_file(self, yaml_file: Union[str, Path]) -> Dict:
"""Utility function to load YAML file."""
with open(yaml_file, "r") as file:
return yaml.safe_load(file)

def build_workflows_by_type(self) -> Dict:
"""Builds a dictionary of workflows by their type."""
return {wf["Type"]: wf for wf in self.import_data["Workflows"]}

def map_sequencing_data(self) -> Tuple[nmdc.Database, Dict]:
"""
Map sequencing data to an NMDC data object and create an update to be applied to the has_output
list of the sequencing data generation.
"""
sequencing_types = ["Metagenome Raw Reads", "Metatranscriptome Raw Reads"]
db = nmdc.Database()

# get the sequencing (raw reads) data object specs from the import data
sequencing_import_data = [
d for d in self.import_data["Data Objects"]["Unique"] if d["data_object_type"] in sequencing_types
]
has_output = []
for data_object_dict in sequencing_import_data:
# get the file(s) that match the import suffix
for file in self.file_list:
file = str(file)
if re.search(data_object_dict["import_suffix"], file):
logging.debug(f"Processing {data_object_dict['data_object_type']}")
file_destination_name = object_action(
file,
data_object_dict["action"],
self.nucelotide_sequencing_id,
data_object_dict["nmdc_suffix"],
)
sequencing_dir = os.path.join(self.root_dir, self.nucelotide_sequencing_id)
updated_file = file_link(
self.project_dir, file, sequencing_dir, file_destination_name
)
filemeta = os.stat(updated_file)
md5 = get_md5(updated_file)
data_object_id = self.runtime.minter(self.data_object_type)
do_record = {
"id": data_object_id,
"type": self.data_object_type,
"name": file_destination_name,
"url": f"{self.url}/{self.nucelotide_sequencing_id}/{file_destination_name}",
"file_size_bytes": filemeta.st_size,
"md5_checksum": md5,
"data_object_type": data_object_dict["data_object_type"],
"description": data_object_dict["description"].replace(
"{id}", self.nucelotide_sequencing_id
)
}
db.data_object_set.append(DataObject(**do_record))
has_output.append(data_object_id)
update = {
"collection": "data_generation_set",
"filter": {"id": self.nucelotide_sequencing_id},
"update": {"has_output": has_output}
}
return db, update

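For orientation, a usage sketch follows; the `mapper` variable, the example ids, and the queries:run request shape are assumptions for illustration (the PR itself applies the update through NmdcRuntimeApi helpers added in later commits).

# Usage sketch (assumed names; ids are placeholders, not minted identifiers).
db, update = mapper.map_sequencing_data()

# `update` is a Mongo-style patch for the sequencing DataGeneration record, e.g.:
#   {"collection": "data_generation_set",
#    "filter": {"id": "nmdc:omprc-11-example"},
#    "update": {"has_output": ["nmdc:dobj-11-example"]}}
# One hedged way to express it as a queries:run command body for the runtime API:
run_query_body = {
    "update": update["collection"],
    "updates": [{"q": update["filter"], "u": {"$set": update["update"]}}],
}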

def map_data(self, db: nmdc.Database, unique: bool = True) -> Tuple[nmdc.Database, Dict]:
"""
Map data objects to the NMDC database.
"""

def process_files(files: Union[str, List[str]], data_object_dict: Dict, workflow_execution_id: str,
multiple: bool = False) -> DataObject:
"""
Process import file(s) and return a DataObject instance. Map data object ids to input_to and
output_of workflow execution types.
"""
file_destination_name = object_action(
files,
data_object_dict["action"],
workflow_execution_id,
data_object_dict["nmdc_suffix"],
workflow_execution_dir=os.path.join(self.root_dir, workflow_execution_id),
multiple=multiple,
)
updated_file = file_link(
self.project_dir,
files,
os.path.join(self.root_dir, workflow_execution_id),
file_destination_name,
)
filemeta = os.stat(updated_file)
md5 = get_md5(updated_file)
data_object_id = self.runtime.minter(self.data_object_type)
do_record = {
"id": data_object_id,
"type": self.data_object_type,
"name": file_destination_name,
"url": f"{self.url}/{self.nucelotide_sequencing_id}/{workflow_execution_id}/{file_destination_name}",
"file_size_bytes": filemeta.st_size,
"md5_checksum": md5,
"data_object_type": data_object_dict["data_object_type"],
"description": data_object_dict["description"].replace(
"{id}", self.nucelotide_sequencing_id
)
}
# record the data object in self.data_object_map
self.data_object_map[data_object_dict["data_object_type"]] = (
data_object_dict["input_to"],
[data_object_dict["output_of"]],
data_object_id,
)
return DataObject(**do_record)

# Select the correct data source (unique or multiple)
data_objects_key = "Unique" if unique else "Multiples"
data_object_specs = self.import_data["Data Objects"][data_objects_key]
for data_object_spec in data_object_specs:
if not filter_import_by_type(self.import_data["Workflows"], data_object_spec["output_of"]):
continue
if not "import_suffix" in data_object_spec:
logging.warning("Missing suffix")
continue

# Process unique data objects
if unique:
for file in map(str, self.file_list):
if re.search(data_object_spec["import_suffix"], file):
workflow_execution_id = self.get_workflow_execution_id(data_object_spec["output_of"])
db.data_object_set.append(process_files(file, data_object_spec, workflow_execution_id))

# Process multiple data files into a single data object
else:
multiple_files = []
for file in map(str, self.file_list):
if re.search(data_object_spec["import_suffix"], file):
multiple_files.append(file)
if multiple_files:
workflow_execution_id = self.get_workflow_execution_id(data_object_spec["output_of"])
db.data_object_set.append(process_files(multiple_files, data_object_spec, workflow_execution_id, multiple=True))

return db, self.data_object_map

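A short usage sketch of the two mapping passes, again with an assumed `mapper` instance:

# Usage sketch: sequencing data first, then unique and multi-file data objects.
db, update = mapper.map_sequencing_data()
db, data_object_map = mapper.map_data(db)                # "Unique" specs, one file per object
db, data_object_map = mapper.map_data(db, unique=False)  # "Multiples" specs, many files -> one object
# data_object_map maps data_object_type -> (input_to, [output_of], data_object_id)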

def unique_object_mapper(self) -> None:
"""
Map unique data objects from the file list based on unique matching import suffix.
The method relates each object to a workflow execution ID and applies the object action to the file.
It updates the nmdc database with the DataObject and stores the information in the objects dictionary.
It updates the nmdc database with the DataObject and stores the information in self.data_object_map.
"""

for data_object_dict in self.import_data["Data Objects"]["Unique"]:
@@ -117,7 +254,7 @@ def unique_object_mapper(self) -> None:
),
)
)
self.objects[data_object_dict["data_object_type"]] = (
self.data_object_map[data_object_dict["data_object_type"]] = (
data_object_dict["input_to"],
[data_object_dict["output_of"]],
dobj,
@@ -179,7 +316,7 @@ def multiple_objects_mapper(self) -> None:
)
)

self.objects[data_object_dict["data_object_type"]] = (
self.data_object_map[data_object_dict["data_object_type"]] = (
data_object_dict["input_to"],
[data_object_dict["output_of"]],
dobj,
@@ -273,7 +410,7 @@ def attach_objects_to_workflow_execution(

data_object_inputs_to_list = []

for _, data_object_items in self.objects.items():
for _, data_object_items in self.data_object_map.items():
if workflow_execution_type in data_object_items[1]:
data_object_outputs_of_list.append(data_object_items[2])
elif workflow_execution_type in data_object_items[0]:
1 change: 1 addition & 0 deletions nmdc_automation/models/__init__.py
@@ -0,0 +1 @@
""" Data classes for NMDC automation. """
136 changes: 136 additions & 0 deletions nmdc_automation/models/nmdc.py
@@ -0,0 +1,136 @@
""" Factory methods for NMDC models. """
import importlib.resources
from typing import Any, Dict, Union
import linkml_runtime
import linkml.validator

import yaml

with importlib.resources.open_text("nmdc_schema", "nmdc_materialized_patterns.yaml") as f:
nmdc_materialized = yaml.safe_load(f)


from nmdc_schema.nmdc import DataGeneration, FileTypeEnum, MagsAnalysis, MetagenomeAnnotation, MetagenomeAssembly, \
MetatranscriptomeAnnotation, MetatranscriptomeAssembly, MetatranscriptomeExpressionAnalysis, NucleotideSequencing, \
ReadBasedTaxonomyAnalysis, ReadQcAnalysis, WorkflowExecution
import nmdc_schema.nmdc as nmdc


def workflow_process_factory(record: Dict[str, Any]) -> Union[DataGeneration, WorkflowExecution]:
"""
Factory function to create a PlannedProcess subclass object from a record.
Subclasses are determined by the "type" field in the record, and can be
either a WorkflowExecution or DataGeneration object.
"""
process_types = {
"nmdc:MagsAnalysis": MagsAnalysis,
"nmdc:MetagenomeAnnotation": MetagenomeAnnotation,
"nmdc:MetagenomeAssembly": MetagenomeAssembly,
"nmdc:MetatranscriptomeAnnotation": MetatranscriptomeAnnotation,
"nmdc:MetatranscriptomeAssembly": MetatranscriptomeAssembly,
"nmdc:MetatranscriptomeExpressionAnalysis": MetatranscriptomeExpressionAnalysis,
"nmdc:NucleotideSequencing": NucleotideSequencing,
"nmdc:ReadBasedTaxonomyAnalysis": ReadBasedTaxonomyAnalysis,
"nmdc:ReadQcAnalysis": ReadQcAnalysis,
}
record = _normalize_record(record)
target_class = record["type"].split(":")[1]
validation_report = linkml.validator.validate(record, nmdc_materialized, target_class)
if validation_report.results:
for result in validation_report.results:
# TODO: remove this once the schema is fixed
# ignore the members_id error for MagsAnalysis
if result.instantiates == 'MagsAnalysis' and "members_id" in result.message:
pass
else:
raise ValueError(f"Validation error: {result.message}")



try:
cls = process_types[record["type"]]
except KeyError:
raise ValueError(f"Invalid workflow execution type: {record['type']}")
wfe = cls(**record)
return wfe

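An illustrative call follows; the record is deliberately minimal, so schema validation fails and the ValueError path is exercised (real records must carry all slots required by the NMDC schema):

# Illustrative only: a legacy-style record with a Mongo _id and "Activity" type suffix.
legacy_record = {
    "_id": "mongo-object-id",                   # dropped by _normalize_record
    "type": "nmdc:MetagenomeAssemblyActivity",  # normalized to "nmdc:MetagenomeAssembly"
}
try:
    wfe = workflow_process_factory(legacy_record)
except ValueError as err:
    # Raised when linkml validation reports a problem or the type is unrecognized.
    print(err)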

def _normalize_record(record: Dict[str, Any]) -> Dict[str, Any]:
""" Normalize the record by removing the _id field and converting the type field to a string """
record.pop("_id", None)
# for backwards compatibility strip Activity from the end of the type
record["type"] = record["type"].replace("Activity", "")
normalized_record = _strip_empty_values(record)

# type-specific normalization
if normalized_record["type"] == "nmdc:MagsAnalysis":
normalized_record = _normalize_mags_record(normalized_record)

return normalized_record


def _normalize_mags_record(record: Dict[str, Any]) -> Dict[str, Any]:
""" Normalize the record for a MagsAnalysis object """
for i, mag in enumerate(record.get("mags_list", [])):
if not mag.get("type"):
# Update the original dictionary in the list
record["mags_list"][i]["type"] = "nmdc:MagBin"
# for backwards compatibility normalize num_tRNA to num_t_rna
if "num_tRNA" in mag:
record["mags_list"][i]["num_t_rna"] = mag.pop("num_tRNA")
# add type to eukaryotic_evaluation if it exists
if "eukaryotic_evaluation" in mag:
record["mags_list"][i]["eukaryotic_evaluation"]["type"] = "nmdc:EukEval"
return record

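For example (the MAG bin field values are illustrative), the normalizer back-fills type slots and renames the legacy num_tRNA key:

# Illustrative MAG bin; only fields touched by the normalizer are shown.
mags_record = {
    "type": "nmdc:MagsAnalysis",
    "mags_list": [{"num_tRNA": 46, "eukaryotic_evaluation": {"completeness": 12.5}}],
}
mags_record = _normalize_mags_record(mags_record)
assert mags_record["mags_list"][0]["type"] == "nmdc:MagBin"
assert mags_record["mags_list"][0]["num_t_rna"] == 46
assert mags_record["mags_list"][0]["eukaryotic_evaluation"]["type"] == "nmdc:EukEval"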

def _strip_empty_values(d: Dict[str, Any]) -> Dict[str, Any]:
""" Strip empty values from a record """
empty_values = [None, "", [], "null",]
def clean_dict(d):
if isinstance(d, dict):
return {k: clean_dict(v) for k, v in d.items() if v not in empty_values}
elif isinstance(d, list):
return [clean_dict(v) for v in d if v not in empty_values]
return d
return clean_dict(d)

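For example, at this point in the PR the helper strips None, empty strings, empty lists, and the literal string "null" (later commits in this PR revisit the "null" handling):

# Illustrative record; nested empty values are removed recursively.
raw = {"id": "example-id", "has_input": [], "version": "", "gene_count": "null", "ok": 0}
cleaned = _strip_empty_values(raw)
# cleaned == {"id": "example-id", "ok": 0}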

class DataObject(nmdc.DataObject):
"""
Extends the NMDC DataObject dataclass with additional methods for serialization.
"""
def __init__(self, **record):
""" Initialize the object from a dictionary """
# _id is a MongoDB field that makes the parent class fail to initialize
record.pop("_id", None)
if "type" not in record:
record["type"] = "nmdc:DataObject"
validation_report = linkml.validator.validate(record, nmdc_materialized, "DataObject")
if validation_report.results:
for result in validation_report.results:
raise ValueError(f"Validation error: {result.message}")
super().__init__(**record)

def as_dict(self):
""" Return the object as a dictionary, excluding None values, empty lists, and data_object_type as a string """
return {
key: value
for key, value in self.__dict__.items()
if not key.startswith("_") and value
} | {"data_object_type": self.data_object_type}

@property
def data_object_type(self):
""" Return the data object type as a string """
if isinstance(self._data_object_type, FileTypeEnum):
return self._data_object_type.code.text
return str(self._data_object_type)

@data_object_type.setter
def data_object_type(self, value):
""" Set the data object type from a string or FileTypeEnum """
if isinstance(value, FileTypeEnum):
self._data_object_type = value
else:
self._data_object_type = FileTypeEnum(value)
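
A usage sketch of the extended class; the id, checksum, and URL below are placeholders rather than real NMDC values:

# Usage sketch with placeholder values; the record is still validated against the schema.
do = DataObject(
    id="nmdc:dobj-11-abcd1234",
    name="example_R1.fastq.gz",
    description="Raw reads for nmdc:omprc-11-example",
    file_size_bytes=1024,
    md5_checksum="0123456789abcdef0123456789abcdef",
    data_object_type="Metagenome Raw Reads",
    url="https://example.org/data/example_R1.fastq.gz",
)
do.as_dict()  # None/empty slots dropped; data_object_type rendered as a string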