Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

239 update import logic to update omicsprocessing record #274

Merged
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
7777dcf
add test fixtures
mbthornton-lbl Oct 15, 2024
80c6bdc
add test for DataObject construction w invalid D.O Type
mbthornton-lbl Oct 16, 2024
52ea807
add test case for data object with invalid data category
mbthornton-lbl Oct 16, 2024
1b3bd9a
Refactor to move model classes
mbthornton-lbl Oct 16, 2024
d611c2c
incorporate linkml.validate into workflow_process_factory
mbthornton-lbl Oct 17, 2024
00b335a
implement map_sequencing_data with test
mbthornton-lbl Oct 17, 2024
cba6264
add map_data method and unit tests
mbthornton-lbl Oct 17, 2024
6aa29aa
update DataObject and serialization
mbthornton-lbl Oct 18, 2024
cf0b4cc
delete legacy warning
mbthornton-lbl Oct 18, 2024
3235112
update click script
mbthornton-lbl Oct 18, 2024
6ba615e
get validation working in run_import
mbthornton-lbl Oct 18, 2024
42f8dca
add update and post objects api calls
mbthornton-lbl Oct 18, 2024
0e1e3e5
clean up unused methods in activity mapper
mbthornton-lbl Oct 18, 2024
5a58e57
Do not create URL for imported sequencing data objects
mbthornton-lbl Oct 25, 2024
2a5965b
remove Metatranscriptome Raw Reads from sequencing types
mbthornton-lbl Oct 25, 2024
2907597
Remove profiler output that was mistakenly checked in
mbthornton-lbl Oct 25, 2024
7df1837
add prof to .gitignore
mbthornton-lbl Oct 25, 2024
8da0ceb
do not strip out "null" as an empty value
mbthornton-lbl Oct 25, 2024
05507fe
Handle gene_count = "null" case for mag bin
mbthornton-lbl Oct 25, 2024
e4b6b63
delete unused schema file
mbthornton-lbl Oct 25, 2024
b93775c
update test description
mbthornton-lbl Oct 25, 2024
a2a558f
modify the generated update query for DataGen to not clobber any has_…
mbthornton-lbl Oct 25, 2024
b19ef6c
ensure we are not adding dupes to has_output
mbthornton-lbl Oct 28, 2024
12db033
update unit test
mbthornton-lbl Oct 28, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ nmdc_automation/workflow_automation/_state/*.json

# Ignore `coverage.xml` file in this directory.
/coverage.xml
prof/
1 change: 1 addition & 0 deletions nmdc_automation/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@
from .config import siteconfig
from .import_automation import activity_mapper
from .workflow_automation import watch_nmdc, wfutils, workflows, workflow_process
from .models import workflow, nmdc
4 changes: 4 additions & 0 deletions nmdc_automation/api/nmdcapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ def post_objects(self, obj_data):
url = self._base_url + "workflows/workflow_executions"

resp = requests.post(url, headers=self.header, data=json.dumps(obj_data))
if not resp.ok:
resp.raise_for_status()
return resp.json()

@refresh_token
Expand Down Expand Up @@ -320,6 +322,8 @@ def update_op(self, opid, done=None, results=None, meta=None):
def run_query(self, query):
url = "%squeries:run" % self._base_url
resp = requests.post(url, headers=self.header, data=json.dumps(query))
if not resp.ok:
resp.raise_for_status()
return resp.json()


Expand Down
351 changes: 180 additions & 171 deletions nmdc_automation/import_automation/activity_mapper.py

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion nmdc_automation/import_automation/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ def file_link(
linked_path = os.path.join(destination_dir, updated_file)

try:
os.link(original_path, linked_path)
os.link(import_file, linked_path)
except FileExistsError:
logger.info(f"{linked_path} already exists")

Expand Down
1 change: 1 addition & 0 deletions nmdc_automation/models/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
""" Data classes for NMDC automation. """
119 changes: 119 additions & 0 deletions nmdc_automation/models/nmdc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
""" Factory methods for NMDC models. """
import importlib.resources
from typing import Any, Dict, Union
import linkml_runtime
import linkml.validator
import importlib.resources
from functools import lru_cache
from linkml_runtime.dumpers import yaml_dumper
import yaml


from nmdc_schema.nmdc import DataGeneration, FileTypeEnum, MagsAnalysis, MetagenomeAnnotation, MetagenomeAssembly, \
MetatranscriptomeAnnotation, MetatranscriptomeAssembly, MetatranscriptomeExpressionAnalysis, NucleotideSequencing, \
ReadBasedTaxonomyAnalysis, ReadQcAnalysis, WorkflowExecution
import nmdc_schema.nmdc as nmdc


@lru_cache(maxsize=None)
def get_nmdc_materialized() -> Dict[str, Any]:
    """
    Load and cache the materialized NMDC schema bundled with the nmdc_schema package.

    The YAML is parsed once and memoized for the life of the process, since it is
    needed on every validated call to workflow_process_factory.

    :return: the parsed schema as a dictionary suitable for linkml.validator.validate
    """
    # importlib.resources.files() is the modern replacement for the legacy
    # open_text() helper (deprecated in 3.11) and is available since 3.9.
    schema_resource = importlib.resources.files("nmdc_schema") / "nmdc_materialized_patterns.yaml"
    with schema_resource.open("r", encoding="utf-8") as f:
        return yaml.safe_load(f)

def workflow_process_factory(record: Dict[str, Any], validate: bool = False) -> Union[DataGeneration,
WorkflowExecution]:
    """
    Factory function to create a PlannedProcess subclass object from a record.
    Subclasses are determined by the "type" field in the record, and can be
    either a WorkflowExecution or DataGeneration object.

    :param record: mapping with at least a "type" key (e.g. "nmdc:MagsAnalysis");
        a MongoDB "_id" field, if present, is removed during normalization.
    :param validate: when True, validate the normalized record against the
        materialized NMDC schema before instantiating the class.
    :raises ValueError: if the record's type is not a known process type, or if
        schema validation fails.
    """
    process_types = {
        "nmdc:MagsAnalysis": MagsAnalysis,
        "nmdc:MetagenomeAnnotation": MetagenomeAnnotation,
        "nmdc:MetagenomeAssembly": MetagenomeAssembly,
        "nmdc:MetatranscriptomeAnnotation": MetatranscriptomeAnnotation,
        "nmdc:MetatranscriptomeAssembly": MetatranscriptomeAssembly,
        "nmdc:MetatranscriptomeExpressionAnalysis": MetatranscriptomeExpressionAnalysis,
        "nmdc:NucleotideSequencing": NucleotideSequencing,
        "nmdc:ReadBasedTaxonomyAnalysis": ReadBasedTaxonomyAnalysis,
        "nmdc:ReadQcAnalysis": ReadQcAnalysis,
    }
    record = _normalize_record(record)
    record_type = record["type"]
    # Resolve the target class before validating so an unknown type is always
    # reported as "Invalid workflow execution type", regardless of `validate`.
    try:
        cls = process_types[record_type]
    except KeyError:
        raise ValueError(f"Invalid workflow execution type: {record_type}")
    if validate:
        nmdc_materialized = get_nmdc_materialized()
        # linkml expects the bare class name, without the "nmdc:" prefix.
        target_class = record_type.split(":")[1]
        validation_report = linkml.validator.validate(record, nmdc_materialized, target_class)
        if validation_report.results:
            raise ValueError(f"Validation error: {validation_report.results[0].message}")
    wfe = cls(**record)
    return wfe


def _normalize_record(record: Dict[str, Any]) -> Dict[str, Any]:
    """ Normalize the record by removing the _id field and converting the type field to a string """
    # MongoDB's _id field is not part of the schema; drop it if present.
    record.pop("_id", None)
    # for backwards compatibility strip Activity from the end of the type
    record["type"] = record["type"].replace("Activity", "")
    cleaned = _strip_empty_values(record)

    # type-specific normalization
    if cleaned["type"] == "nmdc:MagsAnalysis":
        cleaned = _normalize_mags_record(cleaned)

    return cleaned


def _normalize_mags_record(record: Dict[str, Any]) -> Dict[str, Any]:
""" Normalize the record for a MagsAnalysis object """
for i, mag in enumerate(record.get("mags_list", [])):
if not mag.get("type"):
# Update the original dictionary in the list
record["mags_list"][i]["type"] = "nmdc:MagBin"
# for backwards compatibility normalize num_tRNA to num_t_rna
if "num_tRNA" in mag:
record["mags_list"][i]["num_t_rna"] = mag.pop("num_tRNA")
# add type to eukaryotic_evaluation if it exists
if "eukaryotic_evaluation" in mag:
record["mags_list"][i]["eukaryotic_evaluation"]["type"] = "nmdc:EukEval"
# gene count should be a positive integer - remove if 'null'
if "gene_count" in mag and mag["gene_count"] == "null":
mag.pop("gene_count")
return record


def _strip_empty_values(d: Dict[str, Any]) -> Dict[str, Any]:
""" Strip empty values from a record """
empty_values = [None, "", []]
def clean_dict(d):
if isinstance(d, dict):
return {k: clean_dict(v) for k, v in d.items() if v not in empty_values}
elif isinstance(d, list):
return [clean_dict(v) for v in d if v not in empty_values]
return d
return clean_dict(d)


class DataObject(nmdc.DataObject):
    """
    Extends the NMDC DataObject dataclass with additional methods for serialization.
    """
    def __init__(self, **record):
        """ Initialize the object from a dictionary """
        # _id is a MongoDB field that makes the parent class fail to initialize
        record.pop("_id", None)
        # Default the schema type designator when the record omits it.
        record.setdefault("type", "nmdc:DataObject")
        super().__init__(**record)

    def as_dict(self) -> Dict[str, Any]:
        """ Convert the object to a dictionary """
        # Round-trip through the linkml YAML dumper so slot values are
        # serialized exactly as the schema tooling renders them.
        serialized = yaml_dumper.dumps(self)
        return yaml.safe_load(serialized)
Original file line number Diff line number Diff line change
@@ -1,90 +1,11 @@
""" Model classes for the workflow automation app. """
""" Data classed for NMDC workflow automation. """
from dataclasses import dataclass, field
from dateutil import parser
from datetime import datetime
from typing import List, Dict, Any, Optional, Set, Union

from nmdc_schema.nmdc import (
DataGeneration,
FileTypeEnum,
NucleotideSequencing,
MagsAnalysis,
MetagenomeAssembly,
MetagenomeAnnotation,
MetatranscriptomeAssembly,
MetatranscriptomeAnnotation,
MetatranscriptomeExpressionAnalysis,
ReadBasedTaxonomyAnalysis,
ReadQcAnalysis,
WorkflowExecution
)
from nmdc_schema import nmdc


def workflow_process_factory(record: Dict[str, Any]) -> Union[DataGeneration, WorkflowExecution]:
    """
    Factory function to create a PlannedProcess subclass object from a record.
    Subclasses are determined by the "type" field in the record, and can be
    either a WorkflowExecution or DataGeneration object.

    :raises ValueError: if the normalized "type" is not a known process type.
    """
    # Map of schema "type" designators to their nmdc_schema dataclasses.
    process_types = {
        "nmdc:MagsAnalysis": MagsAnalysis,
        "nmdc:MetagenomeAnnotation": MetagenomeAnnotation,
        "nmdc:MetagenomeAssembly": MetagenomeAssembly,
        "nmdc:MetatranscriptomeAnnotation": MetatranscriptomeAnnotation,
        "nmdc:MetatranscriptomeAssembly": MetatranscriptomeAssembly,
        "nmdc:MetatranscriptomeExpressionAnalysis": MetatranscriptomeExpressionAnalysis,
        "nmdc:NucleotideSequencing": NucleotideSequencing,
        "nmdc:ReadBasedTaxonomyAnalysis": ReadBasedTaxonomyAnalysis,
        "nmdc:ReadQcAnalysis": ReadQcAnalysis,
    }
    record = _normalize_record(record)

    try:
        cls = process_types[record["type"]]
    except KeyError:
        # Surface an unknown type as a ValueError rather than a bare KeyError.
        raise ValueError(f"Invalid workflow execution type: {record['type']}")
    wfe = cls(**record)
    return wfe

def _normalize_record(record: Dict[str, Any]) -> Dict[str, Any]:
    """ Normalize the record by removing the _id field and converting the type field to a string """
    # MongoDB's _id field is not part of the schema; drop it if present.
    record.pop("_id", None)
    # for backwards compatibility strip Activity from the end of the type
    record["type"] = record["type"].replace("Activity", "")
    normalized_record = _strip_empty_values(record)

    # type-specific normalization
    if normalized_record["type"] == "nmdc:MagsAnalysis":
        normalized_record = _normalize_mags_record(normalized_record)

    return normalized_record

def _normalize_mags_record(record: Dict[str, Any]) -> Dict[str, Any]:
    """ Normalize the record for a MagsAnalysis object """
    # `mag` aliases record["mags_list"][i], so either reference mutates the record.
    for i, mag in enumerate(record.get("mags_list", [])):
        if not mag.get("type"):
            # Update the original dictionary in the list
            record["mags_list"][i]["type"] = "nmdc:MagBin"
        # for backwards compatibility normalize num_tRNA to num_t_rna
        if "num_tRNA" in mag:
            record["mags_list"][i]["num_t_rna"] = mag.pop("num_tRNA")
        # add type to eukaryotic_evaluation if it exists
        if "eukaryotic_evaluation" in mag:
            record["mags_list"][i]["eukaryotic_evaluation"]["type"] = "nmdc:EukEval"
    return record


def _strip_empty_values(d: Dict[str, Any]) -> Dict[str, Any]:
    """ Strip empty values from a record """
    # NOTE(review): this list also strips 0 and the literal string "null",
    # which discards legitimate zero-valued counts — confirm that is intended.
    empty_values = [None, "", [], "null", 0]
    def clean_dict(d):
        if isinstance(d, dict):
            return {k: clean_dict(v) for k, v in d.items() if v not in empty_values}
        elif isinstance(d, list):
            return [clean_dict(v) for v in d if v not in empty_values]
        return d
    return clean_dict(d)
from typing import Any, Dict, List, Optional, Set

from dateutil import parser

from nmdc_automation.models.nmdc import DataObject, workflow_process_factory


class WorkflowProcessNode(object):
Expand All @@ -108,7 +29,7 @@ def __eq__(self, other):
return self.id == other.id and self.type == other.type

def add_data_object(self, data_object):
self.data_objects_by_type[data_object.data_object_type] = data_object
self.data_objects_by_type[data_object.data_object_type.code.text] = data_object

@property
def id(self):
Expand Down Expand Up @@ -151,42 +72,6 @@ def was_informed_by(self):
return getattr(self.process, "was_informed_by", self.id)


class DataObject(nmdc.DataObject):
    """
    Extends the NMDC DataObject dataclass with additional methods for serialization.
    """
    def __init__(self, **record):
        """ Initialize the object from a dictionary """
        # _id is a MongoDB field that makes the parent class fail to initialize
        record.pop("_id", None)
        if "type" not in record:
            record["type"] = "nmdc:DataObject"
        super().__init__(**record)

    def as_dict(self):
        """ Return the object as a dictionary, excluding None values, empty lists, and data_object_type as a string """
        # Falsy values (None, "", [], 0) and private attributes are dropped;
        # data_object_type is re-added via the property so it serializes as a string.
        return {
            key: value
            for key, value in self.__dict__.items()
            if not key.startswith("_") and value
        } | {"data_object_type": self.data_object_type}

    @property
    def data_object_type(self):
        """ Return the data object type as a string """
        # The setter stores the value in _data_object_type, so __dict__ holds
        # only the private name and the public view is always a string.
        if isinstance(self._data_object_type, FileTypeEnum):
            return self._data_object_type.code.text
        return str(self._data_object_type)

    @data_object_type.setter
    def data_object_type(self, value):
        """ Set the data object type from a string or FileTypeEnum """
        if isinstance(value, FileTypeEnum):
            self._data_object_type = value
        else:
            self._data_object_type = FileTypeEnum(value)


@dataclass
class WorkflowConfig:
""" Configuration for a workflow execution. Defined by .yaml files in nmdc_automation/config/workflows """
Expand Down Expand Up @@ -248,6 +133,7 @@ def add_parent(self, parent: "WorkflowConfig"):
class JobWorkflow:
id: str


@dataclass
class JobConfig:
""" Represents a job configuration from the NMDC API jobs endpoint / MongoDB jobs collection """
Expand All @@ -271,6 +157,7 @@ class JobClaim:
op_id: str
site_id: str


@dataclass
class JobOutput:
""" Represents a job output specification. """
Expand All @@ -292,6 +179,7 @@ def __post_init__(self):
description=self.description,
)


@dataclass
class Job:
""" Represents a job from the NMDC API jobs endpoint / MongoDB jobs collection """
Expand Down
Loading
Loading