Skip to content

Commit

Permalink
Merge pull request #262 from microbiomedata/251-missing-keys-in-workf…
Browse files Browse the repository at this point in the history
…low-records

251 missing keys in workflow records
  • Loading branch information
mbthornton-lbl authored Oct 10, 2024
2 parents eb56382 + 9230913 commit 1ac043e
Show file tree
Hide file tree
Showing 48 changed files with 5,292 additions and 727 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ attic
private
configs/.local_*
attic/data/dryrun_data/
nmdc_automation/workflow_automation/_state/*.state
nmdc_automation/workflow_automation/_state/*.json

# Ignore `coverage.xml` file in this directory.
/coverage.xml
2 changes: 1 addition & 1 deletion configs/import.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ Workflows:
Import: false
Type: nmdc:MagsAnalysis
Git_repo: https://github.com/microbiomedata/metaMAGs
Version: v1.0.6
Version: v1.3.11
Collection: workflow_execution_set
WorkflowExecutionRange: MagsAnalysis
Inputs:
Expand Down
2 changes: 1 addition & 1 deletion nmdc_automation/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .api import nmdcapi
from .config import config
from .config import siteconfig
from .import_automation import activity_mapper
from .workflow_automation import watch_nmdc, wfutils, workflows, workflow_process
4 changes: 2 additions & 2 deletions nmdc_automation/api/jawsapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import requests
import uuid
from nmdc_automation.config import Config
from nmdc_automation.config import SiteConfig

_base_url = "http://jaws.lbl.gov:5003/api/v2"
_base_in = "/pscratch/sd/n/nmjaws/nmdc-prod/inputs"
Expand All @@ -14,7 +14,7 @@
class JawsApi:

def __init__(self, site_configuration):
self.config = Config(site_configuration)
self.config = SiteConfig(site_configuration)
self._base_url = self.config.api_url
self.client_id = self.config.client_id
self.client_secret = self.config.client_secret
Expand Down
7 changes: 4 additions & 3 deletions nmdc_automation/api/nmdcapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from time import time
from typing import Union, List
from datetime import datetime, timedelta, timezone
from nmdc_automation.config import Config, UserConfig
from nmdc_automation.config import SiteConfig, UserConfig
import logging


Expand Down Expand Up @@ -50,9 +50,9 @@ class NmdcRuntimeApi:
client_id = None
client_secret = None

def __init__(self, site_configuration: Union[str, Path, Config]):
def __init__(self, site_configuration: Union[str, Path, SiteConfig]):
if isinstance(site_configuration, str) or isinstance(site_configuration, Path):
site_configuration = Config(site_configuration)
site_configuration = SiteConfig(site_configuration)
self.config = site_configuration
self._base_url = self.config.api_url
self.client_id = self.config.client_id
Expand Down Expand Up @@ -211,6 +211,7 @@ def bump_time(self, obj):
resp = requests.patch(url, headers=self.header, data=json.dumps(d))
return resp.json()

# TODO test that this concatenates multi-page results
@refresh_token
def list_jobs(self, filt=None, max=100) -> List[dict]:
url = "%sjobs?max_page_size=%s" % (self._base_url, max)
Expand Down
2 changes: 1 addition & 1 deletion nmdc_automation/config/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
from .config import Config, UserConfig
from .siteconfig import SiteConfig, UserConfig
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
from pathlib import Path
import tomli
from typing import Union
import yaml
import os
from pathlib import Path
import warnings

WORKFLOWS_DIR = Path(__file__).parent / "workflows"

class UserConfig:
def __init__(self, path):
warnings.warn(
"UserConfig is deprecated and will be removed in a future release. Use SiteConfig instead.",
DeprecationWarning,
stacklevel=2,
)
with open(path, "rb") as file:
self.config_data = tomli.load(file)

Expand All @@ -24,7 +28,7 @@ def username(self):
def password(self):
return self.config_data["api"]["password"]

class Config:
class SiteConfig:
def __init__(self, path: Union[str, Path]):
with open(path, "rb") as file:
self.config_data = tomli.load(file)
Expand Down Expand Up @@ -75,7 +79,7 @@ def watch_state(self):

@property
def agent_state(self):
return self.config_data["state"]["agent_state"]
return self.config_data.get("state", {}).get("agent_state", None)

@property
def activity_id_state(self):
Expand Down
2 changes: 1 addition & 1 deletion nmdc_automation/config/workflows/workflows.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ Workflows:
Enabled: True
Analyte Category: Metagenome
Git_repo: https://github.com/microbiomedata/metaMAGs
Version: v1.3.10
Version: v1.3.11
WDL: mbin_nmdc.wdl
Collection: workflow_execution_set
Predecessors:
Expand Down
2 changes: 1 addition & 1 deletion nmdc_automation/run_process/run_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def resubmit(ctx, activity_ids):
else:
key = "activity_id"
for found_job in watcher.jobs:
job_record = found_job.get_state()
job_record = found_job.state()
if job_record[key] == act_id:
job = found_job
break
Expand Down
1 change: 0 additions & 1 deletion nmdc_automation/workflow_automation/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
from .watch_nmdc import Watcher
from .workflows import load_workflow_configs
from .wfutils import WorkflowJob, NmdcSchema
50 changes: 45 additions & 5 deletions nmdc_automation/workflow_automation/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from typing import List, Dict, Any, Optional, Set, Union

from nmdc_schema.nmdc import (
DataGeneration,
FileTypeEnum,
NucleotideSequencing,
MagsAnalysis,
Expand All @@ -13,15 +14,14 @@
MetatranscriptomeAssembly,
MetatranscriptomeAnnotation,
MetatranscriptomeExpressionAnalysis,
PlannedProcess,
ReadBasedTaxonomyAnalysis,
ReadQcAnalysis,
WorkflowExecution
)
from nmdc_schema import nmdc


def workflow_process_factory(record: Dict[str, Any]) -> PlannedProcess:
def workflow_process_factory(record: Dict[str, Any]) -> Union[DataGeneration, WorkflowExecution]:
"""
Factory function to create a PlannedProcess subclass object from a record.
Subclasses are determined by the "type" field in the record, and can be
Expand All @@ -38,12 +38,53 @@ def workflow_process_factory(record: Dict[str, Any]) -> PlannedProcess:
"nmdc:ReadBasedTaxonomyAnalysis": ReadBasedTaxonomyAnalysis,
"nmdc:ReadQcAnalysis": ReadQcAnalysis,
}
record.pop("_id", None)
record = _normalize_record(record)

try:
cls = process_types[record["type"]]
except KeyError:
raise ValueError(f"Invalid workflow execution type: {record['type']}")
return cls(**record)
wfe = cls(**record)
return wfe

def _normalize_record(record: Dict[str, Any]) -> Dict[str, Any]:
""" Normalize the record by removing the _id field and converting the type field to a string """
record.pop("_id", None)
# for backwards compatibility strip Activity from the end of the type
record["type"] = record["type"].replace("Activity", "")
normalized_record = _strip_empty_values(record)

# type-specific normalization
if normalized_record["type"] == "nmdc:MagsAnalysis":
normalized_record = _normalize_mags_record(normalized_record)

return normalized_record

def _normalize_mags_record(record: Dict[str, Any]) -> Dict[str, Any]:
""" Normalize the record for a MagsAnalysis object """
for i, mag in enumerate(record.get("mags_list", [])):
if not mag.get("type"):
# Update the original dictionary in the list
record["mags_list"][i]["type"] = "nmdc:MagBin"
# for backwards compatibility normalize num_tRNA to num_t_rna
if "num_tRNA" in mag:
record["mags_list"][i]["num_t_rna"] = mag.pop("num_tRNA")
# add type to eukaryotic_evaluation if it exists
if "eukaryotic_evaluation" in mag:
record["mags_list"][i]["eukaryotic_evaluation"]["type"] = "nmdc:EukEval"
return record


def _strip_empty_values(d: Dict[str, Any]) -> Dict[str, Any]:
""" Strip empty values from a record """
empty_values = [None, "", [], "null", 0]
def clean_dict(d):
if isinstance(d, dict):
return {k: clean_dict(v) for k, v in d.items() if v not in empty_values}
elif isinstance(d, list):
return [clean_dict(v) for v in d if v not in empty_values]
return d
return clean_dict(d)


class WorkflowProcessNode(object):
Expand Down Expand Up @@ -203,7 +244,6 @@ def add_parent(self, parent: "WorkflowConfig"):
self.parents.add(parent)



@dataclass
class JobWorkflow:
id: str
Expand Down
Loading

0 comments on commit 1ac043e

Please sign in to comment.