Skip to content

Commit

Permalink
Merge pull request #274 from microbiomedata/239-update-import-logic-t…
Browse files Browse the repository at this point in the history
…o-update-omicsprocessing-record

239 update import logic to update omicsprocessing record
  • Loading branch information
mbthornton-lbl authored Oct 28, 2024
2 parents 495142f + 12db033 commit c87b294
Show file tree
Hide file tree
Showing 33 changed files with 1,204 additions and 790 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ nmdc_automation/workflow_automation/_state/*.json

# Ignore `coverage.xml` file in this directory.
/coverage.xml
prof/
1 change: 1 addition & 0 deletions nmdc_automation/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@
from .config import siteconfig
from .import_automation import activity_mapper
from .workflow_automation import watch_nmdc, wfutils, workflows, workflow_process
from .models import workflow, nmdc
4 changes: 4 additions & 0 deletions nmdc_automation/api/nmdcapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ def post_objects(self, obj_data):
url = self._base_url + "workflows/workflow_executions"

resp = requests.post(url, headers=self.header, data=json.dumps(obj_data))
if not resp.ok:
resp.raise_for_status()
return resp.json()

@refresh_token
Expand Down Expand Up @@ -320,6 +322,8 @@ def update_op(self, opid, done=None, results=None, meta=None):
def run_query(self, query):
    """
    POST a query document to the NMDC runtime ``queries:run`` endpoint.

    :param query: JSON-serializable query document
    :return: decoded JSON response body
    :raises requests.HTTPError: on a 4xx/5xx response
    """
    url = f"{self._base_url}queries:run"
    resp = requests.post(url, headers=self.header, data=json.dumps(query))
    # raise_for_status() is already a no-op for successful responses,
    # so the former `if not resp.ok` guard was redundant.
    resp.raise_for_status()
    return resp.json()


Expand Down
351 changes: 180 additions & 171 deletions nmdc_automation/import_automation/activity_mapper.py

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion nmdc_automation/import_automation/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ def file_link(
linked_path = os.path.join(destination_dir, updated_file)

try:
os.link(original_path, linked_path)
os.link(import_file, linked_path)
except FileExistsError:
logger.info(f"{linked_path} already exists")

Expand Down
1 change: 1 addition & 0 deletions nmdc_automation/models/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
""" Data classes for NMDC automation. """
119 changes: 119 additions & 0 deletions nmdc_automation/models/nmdc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
""" Factory methods for NMDC models. """
import importlib.resources
from typing import Any, Dict, Union
import linkml_runtime
import linkml.validator
import importlib.resources
from functools import lru_cache
from linkml_runtime.dumpers import yaml_dumper
import yaml


from nmdc_schema.nmdc import DataGeneration, FileTypeEnum, MagsAnalysis, MetagenomeAnnotation, MetagenomeAssembly, \
MetatranscriptomeAnnotation, MetatranscriptomeAssembly, MetatranscriptomeExpressionAnalysis, NucleotideSequencing, \
ReadBasedTaxonomyAnalysis, ReadQcAnalysis, WorkflowExecution
import nmdc_schema.nmdc as nmdc


@lru_cache(maxsize=None)
def get_nmdc_materialized():
    """Load the materialized NMDC schema bundled with the nmdc_schema
    package, parsed from YAML. Cached so the file is read only once."""
    with importlib.resources.open_text("nmdc_schema", "nmdc_materialized_patterns.yaml") as schema_file:
        return yaml.safe_load(schema_file)

def workflow_process_factory(record: Dict[str, Any], validate: bool = False) -> Union[DataGeneration,
        WorkflowExecution]:
    """
    Factory function to create a PlannedProcess subclass object from a record.
    Subclasses are determined by the "type" field in the record, and can be
    either a WorkflowExecution or DataGeneration object.

    :param record: raw process record (e.g. from MongoDB); normalized first
    :param validate: when True, validate the normalized record against the
        materialized NMDC schema before instantiation
    :raises ValueError: if the record type is unknown, or if validation
        was requested and failed
    """
    process_types = {
        "nmdc:MagsAnalysis": MagsAnalysis,
        "nmdc:MetagenomeAnnotation": MetagenomeAnnotation,
        "nmdc:MetagenomeAssembly": MetagenomeAssembly,
        "nmdc:MetatranscriptomeAnnotation": MetatranscriptomeAnnotation,
        "nmdc:MetatranscriptomeAssembly": MetatranscriptomeAssembly,
        "nmdc:MetatranscriptomeExpressionAnalysis": MetatranscriptomeExpressionAnalysis,
        "nmdc:NucleotideSequencing": NucleotideSequencing,
        "nmdc:ReadBasedTaxonomyAnalysis": ReadBasedTaxonomyAnalysis,
        "nmdc:ReadQcAnalysis": ReadQcAnalysis,
    }
    record = _normalize_record(record)
    record_type = record["type"]
    # Resolve the class first so an unknown type fails fast with a clear
    # error instead of a confusing schema-validation failure.
    try:
        cls = process_types[record_type]
    except KeyError:
        raise ValueError(f"Invalid workflow execution type: {record_type}")
    if validate:
        # Load the (cached) materialized schema only when validation is
        # actually requested; it is a large YAML document.
        nmdc_materialized = get_nmdc_materialized()
        target_class = record_type.split(":")[1]
        validation_report = linkml.validator.validate(record, nmdc_materialized, target_class)
        if validation_report.results:
            raise ValueError(f"Validation error: {validation_report.results[0].message}")
    return cls(**record)


def _normalize_record(record: Dict[str, Any]) -> Dict[str, Any]:
    """
    Normalize a raw record for dataclass construction.

    - drops the Mongo-internal ``_id`` field
    - strips a legacy "Activity" suffix from the type tag
    - removes empty values (None, "", [])
    - applies MagsAnalysis-specific fix-ups where needed
    """
    record.pop("_id", None)
    # For backwards compatibility strip "Activity" only when it is a suffix;
    # str.replace would also mangle a mid-string occurrence.
    record_type = record["type"]
    if record_type.endswith("Activity"):
        record["type"] = record_type[: -len("Activity")]
    normalized_record = _strip_empty_values(record)

    # type-specific normalization
    if normalized_record["type"] == "nmdc:MagsAnalysis":
        normalized_record = _normalize_mags_record(normalized_record)

    return normalized_record


def _normalize_mags_record(record: Dict[str, Any]) -> Dict[str, Any]:
""" Normalize the record for a MagsAnalysis object """
for i, mag in enumerate(record.get("mags_list", [])):
if not mag.get("type"):
# Update the original dictionary in the list
record["mags_list"][i]["type"] = "nmdc:MagBin"
# for backwards compatibility normalize num_tRNA to num_t_rna
if "num_tRNA" in mag:
record["mags_list"][i]["num_t_rna"] = mag.pop("num_tRNA")
# add type to eukaryotic_evaluation if it exists
if "eukaryotic_evaluation" in mag:
record["mags_list"][i]["eukaryotic_evaluation"]["type"] = "nmdc:EukEval"
# gene count should be a positive integer - remove if 'null'
if "gene_count" in mag and mag["gene_count"] == "null":
mag.pop("gene_count")
return record


def _strip_empty_values(d: Dict[str, Any]) -> Dict[str, Any]:
""" Strip empty values from a record """
empty_values = [None, "", []]
def clean_dict(d):
if isinstance(d, dict):
return {k: clean_dict(v) for k, v in d.items() if v not in empty_values}
elif isinstance(d, list):
return [clean_dict(v) for v in d if v not in empty_values]
return d
return clean_dict(d)


class DataObject(nmdc.DataObject):
    """
    Extends the NMDC DataObject dataclass with additional methods for serialization.
    """
    def __init__(self, **record):
        """ Initialize the object from a record dictionary """
        # _id is a MongoDB field that makes the parent class fail to initialize
        record.pop("_id", None)
        # default the schema type tag when the record omits it
        record.setdefault("type", "nmdc:DataObject")
        super().__init__(**record)

    def as_dict(self) -> Dict[str, Any]:
        """ Convert the object to a plain dictionary by round-tripping
        through the LinkML YAML dumper. """
        serialized = yaml_dumper.dumps(self)
        return yaml.safe_load(serialized)
Original file line number Diff line number Diff line change
@@ -1,90 +1,11 @@
""" Model classes for the workflow automation app. """
""" Data classed for NMDC workflow automation. """
from dataclasses import dataclass, field
from dateutil import parser
from datetime import datetime
from typing import List, Dict, Any, Optional, Set, Union

from nmdc_schema.nmdc import (
DataGeneration,
FileTypeEnum,
NucleotideSequencing,
MagsAnalysis,
MetagenomeAssembly,
MetagenomeAnnotation,
MetatranscriptomeAssembly,
MetatranscriptomeAnnotation,
MetatranscriptomeExpressionAnalysis,
ReadBasedTaxonomyAnalysis,
ReadQcAnalysis,
WorkflowExecution
)
from nmdc_schema import nmdc


def workflow_process_factory(record: Dict[str, Any]) -> Union[DataGeneration, WorkflowExecution]:
    """
    Factory function to create a PlannedProcess subclass object from a record.
    Subclasses are determined by the "type" field in the record, and can be
    either a WorkflowExecution or DataGeneration object.
    """
    # Map each schema type tag to the nmdc_schema dataclass it instantiates.
    type_to_class = {
        "nmdc:MagsAnalysis": MagsAnalysis,
        "nmdc:MetagenomeAnnotation": MetagenomeAnnotation,
        "nmdc:MetagenomeAssembly": MetagenomeAssembly,
        "nmdc:MetatranscriptomeAnnotation": MetatranscriptomeAnnotation,
        "nmdc:MetatranscriptomeAssembly": MetatranscriptomeAssembly,
        "nmdc:MetatranscriptomeExpressionAnalysis": MetatranscriptomeExpressionAnalysis,
        "nmdc:NucleotideSequencing": NucleotideSequencing,
        "nmdc:ReadBasedTaxonomyAnalysis": ReadBasedTaxonomyAnalysis,
        "nmdc:ReadQcAnalysis": ReadQcAnalysis,
    }
    record = _normalize_record(record)
    try:
        wfe_class = type_to_class[record["type"]]
    except KeyError:
        raise ValueError(f"Invalid workflow execution type: {record['type']}")
    return wfe_class(**record)

def _normalize_record(record: Dict[str, Any]) -> Dict[str, Any]:
    """ Normalize the record by removing the _id field and converting the type field to a string """
    # _id is Mongo-internal and breaks dataclass construction
    record.pop("_id", None)
    # for backwards compatibility strip Activity from the end of the type
    record["type"] = record["type"].replace("Activity", "")
    cleaned = _strip_empty_values(record)

    # MagsAnalysis records need additional, type-specific fix-ups
    if cleaned["type"] == "nmdc:MagsAnalysis":
        cleaned = _normalize_mags_record(cleaned)

    return cleaned

def _normalize_mags_record(record: Dict[str, Any]) -> Dict[str, Any]:
""" Normalize the record for a MagsAnalysis object """
for i, mag in enumerate(record.get("mags_list", [])):
if not mag.get("type"):
# Update the original dictionary in the list
record["mags_list"][i]["type"] = "nmdc:MagBin"
# for backwards compatibility normalize num_tRNA to num_t_rna
if "num_tRNA" in mag:
record["mags_list"][i]["num_t_rna"] = mag.pop("num_tRNA")
# add type to eukaryotic_evaluation if it exists
if "eukaryotic_evaluation" in mag:
record["mags_list"][i]["eukaryotic_evaluation"]["type"] = "nmdc:EukEval"
return record


def _strip_empty_values(d: Dict[str, Any]) -> Dict[str, Any]:
""" Strip empty values from a record """
empty_values = [None, "", [], "null", 0]
def clean_dict(d):
if isinstance(d, dict):
return {k: clean_dict(v) for k, v in d.items() if v not in empty_values}
elif isinstance(d, list):
return [clean_dict(v) for v in d if v not in empty_values]
return d
return clean_dict(d)
from typing import Any, Dict, List, Optional, Set

from dateutil import parser

from nmdc_automation.models.nmdc import DataObject, workflow_process_factory


class WorkflowProcessNode(object):
Expand All @@ -108,7 +29,7 @@ def __eq__(self, other):
return self.id == other.id and self.type == other.type

def add_data_object(self, data_object):
self.data_objects_by_type[data_object.data_object_type] = data_object
self.data_objects_by_type[data_object.data_object_type.code.text] = data_object

@property
def id(self):
Expand Down Expand Up @@ -151,42 +72,6 @@ def was_informed_by(self):
return getattr(self.process, "was_informed_by", self.id)


class DataObject(nmdc.DataObject):
    """
    Extends the NMDC DataObject dataclass with additional methods for serialization.
    """
    def __init__(self, **record):
        """ Initialize the object from a record dictionary """
        # _id is a MongoDB field that makes the parent class fail to initialize
        record.pop("_id", None)
        # default the schema type tag when the record omits it
        record.setdefault("type", "nmdc:DataObject")
        super().__init__(**record)

    def as_dict(self):
        """ Return the object as a dictionary, excluding None values, empty lists, and data_object_type as a string """
        serializable = {}
        for key, value in self.__dict__.items():
            # skip private attributes and falsy values
            if key.startswith("_") or not value:
                continue
            serializable[key] = value
        # always emit data_object_type via the property (string form)
        serializable["data_object_type"] = self.data_object_type
        return serializable

    @property
    def data_object_type(self):
        """ Return the data object type as a string """
        dot = self._data_object_type
        return dot.code.text if isinstance(dot, FileTypeEnum) else str(dot)

    @data_object_type.setter
    def data_object_type(self, value):
        """ Set the data object type from a string or FileTypeEnum """
        self._data_object_type = value if isinstance(value, FileTypeEnum) else FileTypeEnum(value)


@dataclass
class WorkflowConfig:
""" Configuration for a workflow execution. Defined by .yaml files in nmdc_automation/config/workflows """
Expand Down Expand Up @@ -248,6 +133,7 @@ def add_parent(self, parent: "WorkflowConfig"):
class JobWorkflow:
id: str


@dataclass
class JobConfig:
""" Represents a job configuration from the NMDC API jobs endpoint / MongoDB jobs collection """
Expand All @@ -271,6 +157,7 @@ class JobClaim:
op_id: str
site_id: str


@dataclass
class JobOutput:
""" Represents a job output specification. """
Expand All @@ -292,6 +179,7 @@ def __post_init__(self):
description=self.description,
)


@dataclass
class Job:
""" Represents a job from the NMDC API jobs endpoint / MongoDB jobs collection """
Expand Down
Loading

0 comments on commit c87b294

Please sign in to comment.