Skip to content

Commit

Permalink
update mapper
Browse files Browse the repository at this point in the history
  • Loading branch information
mbthornton-lbl committed Jan 6, 2025
1 parent 5c9a04e commit 3694683
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 16 deletions.
16 changes: 15 additions & 1 deletion nmdc_automation/import_automation/import_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import re
from typing import Dict, List, Union, Optional
import yaml
import json

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
Expand All @@ -21,6 +22,7 @@ class ImportMapper:
"""
METAGENOME_RAW_READS = "Metagenome Raw Reads"
NMDC_DATA_OBJECT_TYPE = "nmdc:DataObject"
MAPPING_FILE = "id_mapping.json"

def __init__(self, nucleotide_sequencing_id: str, import_project_dir: str, import_yaml: str):
self.nucleotide_sequencing_id = nucleotide_sequencing_id
Expand All @@ -30,6 +32,11 @@ def __init__(self, nucleotide_sequencing_id: str, import_project_dir: str, impor
self._import_files = [
f for f in os.listdir(self.import_project_dir) if os.path.isfile(os.path.join(self.import_project_dir, f))
]
# Mapping of legacy (pre-import) IDs to minted NMDC IDs, persisted in
# MAPPING_FILE between runs so re-imports reuse already-minted IDs.
self.id_mappings = {}  # old ID -> NMDC ID mapping
if os.path.exists(self.MAPPING_FILE):
    with open(self.MAPPING_FILE, 'r') as f:
        # Bug fix: this previously assigned to `self.id_mapping`
        # (singular), leaving `self.id_mappings` permanently empty
        # and creating a stray, unused attribute.
        self.id_mappings = json.load(f)


@property
def import_specifications(self) -> Dict:
Expand Down Expand Up @@ -72,11 +79,18 @@ def file_mappings(self) -> List:
def file_mappings_by_data_object_type(self) -> Dict:
    """Return the file mappings keyed by their data object type.

    Builds a dict from ``fm.data_object_type`` to the FileMapping object.
    If several mappings share a data object type, the last one seen wins.

    Note: the pasted diff overlay left both the old subscript-style line
    (``fm['data_object_type']``) and the new attribute-style line in the
    comprehension, which is invalid syntax; this keeps only the committed
    attribute-access form.
    """
    return {fm.data_object_type: fm for fm in self._file_mappings}


def update_file_mappings(self, data_object_type: str, workflow_execution_id: str) -> None:
    """Set ``workflow_execution_id`` on every file mapping whose data
    object type equals *data_object_type*, compared case-insensitively.
    """
    wanted = data_object_type.upper()
    for do_type, mapping in self.file_mappings_by_data_object_type.items():
        if do_type.upper() == wanted:
            mapping.workflow_execution_id = workflow_execution_id



def _init_file_mappings(self) -> List:
"""Create the initial list of File Mapping based on the import files."""
file_mappings = []
Expand Down
38 changes: 23 additions & 15 deletions nmdc_automation/run_process/run_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from nmdc_automation.import_automation.import_mapper import ImportMapper
from nmdc_schema.nmdc import Database

MAPPING_FILE = "id_mapping.json"



@click.group()
Expand Down Expand Up @@ -44,12 +44,7 @@ def import_projects(ctx, import_file, import_yaml, site_configuration):

runtime = NmdcRuntimeApi(site_configuration)
nmdc_materialized = _get_nmdc_materialized()
# load existing ID mappings
if os.path.exists(MAPPING_FILE):
with open(MAPPING_FILE, 'r') as f:
id_mapping = json.load(f)
else:
id_mapping = {}


data_imports = _parse_tsv(import_file)
for data_import in data_imports:
Expand All @@ -62,9 +57,9 @@ def import_projects(ctx, import_file, import_yaml, site_configuration):
logger.info(f"Project has {len(import_mapper._import_files)} files")
file_mappings = import_mapper.file_mappings # This will create and cache the file mappings
logger.info(f"Mapped: {len(file_mappings)} files")
for fm in file_mappings:
logger.info(f"Mapping: {fm}")



# Data Generation Object
# Retrieve it from the Database. Check that there is only 1
logger.info(f"Searching for {nucleotide_sequencing_id} in the database")
Expand All @@ -78,6 +73,10 @@ def import_projects(ctx, import_file, import_yaml, site_configuration):
dg = dg_objs[0]
logger.info(f"Found {nucleotide_sequencing_id} in the database - checking output")

# init a db to hold workflow executions and their data objects, one per Data Generation
db = Database()
has_output_update = {}

# Sequencing Output - check for NMDC data object in Data Generation has_output
# Mint a new Data Object and Update Data Generation if has_output is empty or has a non-NMDC ID
dg_output = dg.get('has_output', [])
Expand All @@ -87,13 +86,22 @@ def import_projects(ctx, import_file, import_yaml, site_configuration):

if len(dg_output) == 0:
logger.info(f"{nucleotide_sequencing_id} has no output")
logger.info(f"Importing sequencing data for {nucleotide_sequencing_id}")
logger.info(f"Importing sequencing data and creating update for {nucleotide_sequencing_id}")
import_mapper.update_file_mappings(import_mapper.METAGENOME_RAW_READS, nucleotide_sequencing_id)

elif dg_output and dg_output[0].startswith('nmdc:dobj'):
logger.info(f"Found a non-NMDC data object as sequencing output: {dg_output[0]}")
logger.info(f"Importing sequencing data and replacing {dg_output[0]} with NMDC ID")
else:
logger.info(f"{nucleotide_sequencing_id} has output - skipping sequencing data import")
logger.info(f"{nucleotide_sequencing_id} has output: {dg_output[0]} - skipping sequencing data import")
pass
else: # shouldn't really happen
logger.info(f"{nucleotide_sequencing_id} has non-NMDC output: {dg_output[0]}")
logger.info(f"Importing sequencing data and creating update for {dg_output[0]}")
import_mapper.update_file_mappings(import_mapper.METAGENOME_RAW_READS, nucleotide_sequencing_id)



for fm in file_mappings:
logger.info(f"Mapping: {fm}")




Expand Down

0 comments on commit 3694683

Please sign in to comment.