From c2eeb64cf5eccbe9eb1320916d2d9ce7c55f04a2 Mon Sep 17 00:00:00 2001 From: Katherine Heal Date: Thu, 7 Nov 2024 13:59:20 -0800 Subject: [PATCH 01/16] Start to refactor metaproteomics aggregation class --- README.md | 2 + generate_metap_agg.py | 508 +++++++++++++++++++++++++++++------------- 2 files changed, 356 insertions(+), 154 deletions(-) diff --git a/README.md b/README.md index c29dba9..75f7503 100644 --- a/README.md +++ b/README.md @@ -65,6 +65,8 @@ Now that the container image is hosted there, you can configure a Spin workload - `POLL_TIME`: Number of seconds to sleep between each run (Default: `14400`, which is 4 hours) - `NMDC_BASE_URL`: Base URL to access the data (Default: `https://data.microbiomedata.org/data`) - `NMDC_BASE_PATH`: Base path to the data on disk (Default: `/global/cfs/cdirs/m3408/results`) +- `NMDC_API_DEV_BEARER_TOKEN`: Bearer token for NMDC's runtime dev API for submitting json records (no default) +- `NMDC_API_BEARER_TOKEN`: Bearer token for NMDC's runtime API for submitting json records (no default) ## Release Notes diff --git a/generate_metap_agg.py b/generate_metap_agg.py index eaba0a8..965439f 100644 --- a/generate_metap_agg.py +++ b/generate_metap_agg.py @@ -1,104 +1,281 @@ -import os import requests -from pymongo import MongoClient - - -DEBUG_MODE = False +import csv +import io +import os +import time +# TODO KRH: Change metagmetagenome_anlaysis_id to was_generated_by throughout after https://github.com/microbiomedata/nmdc-schema/pull/2203 has been merged -class AnnotationLine(): +class MetaProtAgg: """ - Object representation built from a GFF annotation line + MetaP Aggregation class - The object countains a list of annotations for KEGG, - COG, EC, and product. Currently on the KEGG annotations - are used. - """ + Parameters + ---------- + dev : bool + Flag to indicate if production or development API should be used + Default is True, which uses the development API - def __init__(self, line, filter=None): - self.id = None - self.kegg = None - self.cogs = None - self.product = None - self.ec_numbers = None - - if line.find("ko=") > 0: - annotations = line.split("\t")[8].split(";") - self.id = annotations[0][3:] - if filter and self.id not in filter: - return - - for anno in annotations: - if anno.startswith("ko="): - kos = anno[3:].replace("KO:", "KEGG.ORTHOLOGY:") - self.kegg = kos.rstrip().split(',') - elif anno.startswith("cog="): - self.cogs = anno[4:].split(',') - elif anno.startswith("product="): - self.product = anno[8:] - elif anno.startswith("ec_number="): - self.ec_numbers = anno[10:].split(",") - - -class MetaProtAgg(): - """ - MetaP Aggregation class + Attributes + ---------- + base_url : str + Base URL for the API + nmdc_api_token : str + API token to access the API + + Notes + ----- + This class is used to aggregate functional annotations from metaproteomics activities in the NMDC database. + There must be an environment variable called NMDC_API_TOKEN that contains the API token to access the API. 
""" - _BASE_URL_ENV = "NMDC_BASE_URL" - _base_url = "https://data.microbiomedata.org/data" - _BASE_PATH_ENV = "NMDC_BASE_PATH" - _base_dir = "/global/cfs/cdirs/m3408/results" - - def __init__(self): - url = os.environ["MONGO_URL"] - client = MongoClient(url, directConnection=True) - self.db = client.nmdc - self.agg_col = self.db.metap_gene_function_aggregation - self.act_col = self.db.workflow_execution_set - self.do_col = self.db.data_object_set - self.base_url = os.environ.get(self._BASE_URL_ENV, self._base_url) - self.base_dir = os.environ.get(self._BASE_PATH_ENV, self._base_dir) - - def get_kegg_terms(self, url, gene_list): - kos = {} - fn = url.replace(self.base_url, self.base_dir) - - if os.path.exists(fn): - lines = open(fn) + + def __init__(self, dev=True): + if dev: + self.base_url = "https://api-dev.microbiomedata.org" + self.nmdc_api_token = os.getenv("NMDC_API_DEV_BEARER_TOKEN") else: - s = requests.Session() - resp = s.get(url, headers=None, stream=True) - if not resp.ok: - raise OSError(f"Failed to read {url}") - lines = resp.iter_lines() - - for line in lines: - if isinstance(line, bytes): - line = line.decode() - anno = AnnotationLine(line, gene_list) - if anno.kegg: - kos[anno.id] = anno.kegg - return kos - - def find_anno(self, dos): - """ - Find the GFF annotation URL - We use the protein file to get the base part of the URL. - input: list of data object IDs - returns: GFF functional annotation URL + self.base_url = "https://api.microbiomedata.org" + self.nmdc_api_token = os.getenv("NMDC_API_BEARER_TOKEN") + + def get_results( + self, collection: str, filter="", max_page_size=100, fields="" + ): + """General function to get results from the API using the collection endpoint with optional filter and fields + + Parameters + ---------- + collection : str + Collection name to query, e.g. "functional_annotation_agg" + filter : str, optional + Filter to apply to the query written in JSON format for MongoDB + e.g. '{"metagenome_annotation_id":{"$regex":"wfmp"}}' + Default is an empty string, which does not apply a filter + max_page_size : int, optional + Maximum number of records to return in a single page + Default is 100 + fields : str, optional + Fields to return in the query, separated by commas without spaces if multiple + e.g. 
"id,data_object_type,url" + Default is an empty string, which returns all fields + + """ + + # Get initial results (before next_page_token is given in the results) + result_list = [] + og_url = f"{self.base_url}/nmdcschema/{collection}?&filter={filter}&max_page_size={max_page_size}&projection={fields}" + resp = requests.get(og_url) + initial_data = resp.json() + results = initial_data.get("resources", []) + + if results == []: + # if no results are returned + return result_list + + # append first page of results to an empty list + for result in results: + result_list.append(result) + + # if there are multiple pages of results returned + if initial_data.get("next_page_token"): + next_page_token = initial_data["next_page_token"] + + while True: + url = f"{self.base_url}/nmdcschema/{collection}?&filter={filter}&max_page_size={max_page_size}&next_page_token={next_page_token}&projection={fields}" + response = requests.get(url) + data_next = response.json() + + results = data_next.get("resources", []) + result_list.extend(results) + next_page_token = data_next.get("next_page_token") + + if not next_page_token: + break + + return result_list + + def get_previously_aggregated_records(self): + """ + Function to return all ids of metaproteomics activity records that have already been aggregated + + Returns + ------- + list + List of metagenome_annotation_ids that have already been aggregated + """ + agg_col = self.get_results( + collection="functional_annotation_agg", + filter='{"metagenome_annotation_id":{"$regex":"wfmp"}}', + max_page_size=1000, + fields="metagenome_annotation_id", + ) + ids = list(set([x["metagenome_annotation_id"] for x in agg_col])) + return ids + + def get_activity_records(self): + """ + Function to return full metaproteomics activity records in the database + + Returns + ------- + list + List of metaproteomics activity records + """ + act_col = self.get_results( + collection="workflow_execution_set", + filter='{"type":"nmdc:MetaproteomicsAnalysis"}', + max_page_size=1000, + fields="" + ) + return act_col + + def submit_json_records(self, json_records): + """ + Function to submit aggregation records to the database + + Parameters + ---------- + json_records : list + List of dictionaries where each dictionary represents a record to be submitted to the database + """ + url = f"{self.base_url}/metadata/json:submit" + + headers = { + "accept": "application/json", + "Authorization": f"Bearer {self.nmdc_api_token}", + "Content-Type": "application/json" + } + + response = requests.post(url, headers=headers, json=json_records) + + return response.status_code + + def read_url_tsv(self, url): + """ + Function to read a URL that points to a TSV file + + Parameters + ---------- + url : str + URL to the TSV file + + Returns + ------- + list + List of dictionaries where each dictionary represents a row in the TSV file + """ + response = requests.get(url) + + # Read the TSV content + tsv_content = response.content.decode("utf-8") + + # Use csv.reader to parse the TSV content + tsv_reader = csv.reader(io.StringIO(tsv_content), delimiter="\t") + + # Convert the TSV content to a list of dictionaries + tsv_data = [] + headers = next(tsv_reader) # Get the headers from the first row + for row in tsv_reader: + tsv_data.append(dict(zip(headers, row))) + + return tsv_data + + def get_functional_terms(self, url): + """ + Function to get the functional terms from a URL of a Protein Report + + Parameters + ---------- + url : str + URL to the Protein Report + + Returns + ------- + dict + Dictionary of 
KO, COG, and pfam terms with their respective spectral counts + """ + fxns = {} + + content = self.read_url_tsv(url) + + # Parse the Protein Report content into KO, COG, and Pfam terms + for line in content: + # Add ko terms to the dictionary + ko = line.get("KO") + if ko != "" and ko is not None: + # Replace KO: with KEGG.ORTHOLOGY: + ko_clean = ko.replace("KO:", "KEGG.ORTHOLOGY:") + if ko_clean not in fxns.keys(): + fxns[ko_clean] = int(line.get("SummedSpectraCounts")) + else: + fxns[ko_clean] += int(line.get("SummedSpectraCounts")) + + # Add cog terms to the dictionary + cog = line.get("COG") + if cog != "" and cog is not None: + cog_clean = "COG:" + cog + if cog_clean not in fxns.keys(): + fxns[cog_clean] = int(line.get("SummedSpectraCounts")) + else: + fxns[cog_clean] += int(line.get("SummedSpectraCounts")) + + # Add pfam terms to the dictionary + pfam = line.get("pfam") + if pfam != "" and pfam is not None: + pfam_clean = "PFAM:" + pfam + if pfam_clean not in fxns.keys(): + fxns[pfam_clean] = int(line.get("SummedSpectraCounts")) + else: + fxns[pfam_clean] += int(line.get("SummedSpectraCounts")) + + + # For all, loop through keys and separate into multiple keys if there are multiple pfams + new_fxns = {} + for k, v in fxns.items(): + if "," in k: + for pfam in k.split(","): + # Check if pfam is already "PFAM:" prefixed + if not pfam.startswith("PFAM:"): + pfam = "PFAM:" + pfam + if pfam not in new_fxns.keys(): + new_fxns[pfam] = v + else: + new_fxns[pfam] += v + else: + new_fxns[k] = v + + return new_fxns + + def find_anno_url(self, dos): + """Find the URL for the protein report from a list of data object IDs + + Parameters + ---------- + dos : list + List of data object IDs + + Returns + ------- + str + URL for the Protein Report data object if found """ url = None - for doid in dos: - do = self.do_col.find_one({"id": doid}) - # skip over bad records - if not do or 'data_object_type' not in do: - continue - if do['data_object_type'] == 'Functional Annotation GFF': - url = do['url'] + + # Get all the data object records + id_filter = '{"id": {"$in": ["' + '","'.join(dos) + '"]}}' + do_recs = self.get_results( + collection="data_object_set", + filter=id_filter, + max_page_size=1000, + fields="id,data_object_type,url", + ) + + # Find the Protein Report data object and return the URL to access it + for do in do_recs: + if do.get("data_object_type") == "Protein Report": + url = do.get("url") return url - elif do['data_object_type'] == 'Annotation Amino Acid FASTA': - url = do['url'] - return url.replace("_proteins.faa", "_functional_annotation.gff") + + # If no Protein Report data object is found, return None return None def process_activity(self, act): @@ -108,86 +285,109 @@ def process_activity(self, act): Output: Dictonary of KEGG records This currently relies on the has_peptide_quantificiations - records in the activity record. This may change in the future. + records in the activity record. This may change in the future. 
""" # Get the URL and ID - url = self.find_anno(act['has_input']) + url = self.find_anno_url(act["has_output"]) if not url: raise ValueError(f"Missing url for {act['id']}") - url_id = url.split('/')[-1].replace("nmdc_", "nmdc:").split('_')[0] - id_list = set() - # Get the filter list - for pep in act['has_peptide_quantifications']: - # This check is because some activities have - # bogus peptides - if len(pep['all_proteins']) > 0: - mid = pep['all_proteins'][0].split('_')[0] - if not mid.startswith(url_id): - continue - id_list.update(pep['all_proteins']) - # get the KEGG protein terms from the annotation file - proteins = self.get_kegg_terms(url, id_list) - kegg_recs = {} - for pep in act['has_peptide_quantifications']: - # Loop through all proteins - for prot in pep['all_proteins']: - # ignore any proteins that don't have KEGG - # annotations - if prot not in proteins: - continue - kos = proteins[prot] - # Loop through the KEGG annotations for the protein - for ko in kos: - # Initialize the record - if ko not in kegg_recs: - new_rec = {"metaproteomic_analysis_id": act['id'], - "gene_function_id": ko, - "count": 0, - "best_protein": False} - kegg_recs[ko] = new_rec - # increment the count - kegg_recs[ko]["count"] += 1 - # If there is a best_protein then set the flag to true - if prot == pep['best_protein']: - kegg_recs[ko]["best_protein"] = True - return kegg_recs + + # Parse the KEGG, COG, and PFAM annotations + return self.get_functional_terms(url) def sweep(self): """ - This is the main action function. This goes through all - of the metaP activity records and finds any that are missing - aggregations. + This is the main action function. + + Steps: + 1. Get list of workflow IDs that have already been added to the functional_annotation_agg collection + 2. Get list of all metaproteomics activities in the database + 3. For each activity that is not in the list of previously aggregated records, process the activity: + a. Find the Protein Report URL + b. From the Protein Report URL, extract the KEGG, COG, and PFAM annotations and associated counts + c. Prepare a json record for the database with the annotations and counts + d. Validate the json record using the post /metadata/json:validate endpoint + e. 
If the json record is valid, submit it to the database using the post /metadata/json endpoint """ - print("Getting list of indexed objects") - done = self.agg_col.distinct("metaproteomic_analysis_id") + # Get list of workflow IDs that have already been processed + mp_wf_in_agg = self.get_previously_aggregated_records() + + # Get list of all metaproteomics activities + mp_wf_recs = self.get_activity_records() + + # Records to add to the aggregation + agg_records = {} # Iterate through all of the metaP activities - for actrec in self.act_col.find({"type":"nmdc:MetaproteomicsAnalysis"}): - # New annotations should have this - act_id = actrec['id'] - # Skip if already processed - if act_id in done: + for mp_wf_rec in mp_wf_recs: + if mp_wf_rec["id"] in mp_wf_in_agg: continue try: - kegg_term_counts = self.process_activity(actrec) - rows = kegg_term_counts.values() - - if DEBUG_MODE: - print(f"{type(rows)=}") # print type - print(f"{len(rows)=}") # print length - print(f"{rows=}") # print the value - + agg_records[mp_wf_rec["id"]] = self.process_activity(mp_wf_rec) except Exception as ex: # Continue on errors print(ex) continue - if len(rows) > 0: - print(f"Inserting: {len(rows)} rows for {act_id}") - self.agg_col.insert_many(rows) - else: - print(f' - No rows for {act_id}') + # Prepare a json record for the database + json_records = [] + for key, value in agg_records.items(): + for k, v in value.items(): + json_records.append( + {"metagenome_annotation_id": key, "gene_function_id": k, "count": v} + ) + json_record_full = {"functional_annotation_agg": json_records} + + # Validate the json record using the post /metadata/json:validate endpoint + url = f"{self.base_url}/metadata/json:validate" + response = requests.post(url, json=json_record_full) + + # If the json record is valid, submit it to the database using the post /metadata/json endpoint + if response.status_code == 200: + response = self.submit_json_records(json_records) + if response != 200: + print("Error submitting the aggregation records for the workflow: ", mp_wf_rec["id"]) + if response == 200: + print("Submitted aggregation records for the workflow: ", mp_wf_rec["id"]) + + + def sweep_success(self): + """Function to check the results of the sweep and ensure that the records were added to the database + + Returns + ------- + bool + True if all records were added to the functional_annotation_agg collection, False otherwise + """ + # Get list of workflow IDs that have already been processed + mp_wf_in_agg = self.get_previously_aggregated_records() + + # Get list of all metaproteomics activities + mp_wf_recs = self.get_activity_records() + + # If there are any records that were not processed, return FALSE + check = [x for x in mp_wf_recs if x["id"] in mp_wf_in_agg] + if all(check): + return True + else: + return False if __name__ == "__main__": - mp = MetaProtAgg() - mp.sweep() + mp_dev = MetaProtAgg() + mp_dev.sweep() + + # Wait for the records to be added to the database before running check (5 minutes) + time.sleep(300) + success_check = mp_dev.sweep_success() + +""" +# This is commented out until script is ready for production + if success_check: + # Reprocess in the production API + mp_prod = MetaProtAgg(dev=False) + mp_prod.sweep() + + # Wait for the records to be added to the database before running check (5 minutes) + time.sleep(300) + success_check = mp_prod.sweep_success() +""" \ No newline at end of file From 1981145ee825a9a1cc65810b25d386d88f17d933 Mon Sep 17 00:00:00 2001 From: Katherine Heal Date: Fri, 8 Nov 2024 
14:30:54 -0800 Subject: [PATCH 02/16] Subclass aggregation scripts --- generate_metap_agg.py | 271 ++++++++++++++++++++++++------------------ 1 file changed, 153 insertions(+), 118 deletions(-) diff --git a/generate_metap_agg.py b/generate_metap_agg.py index 965439f..6975f5f 100644 --- a/generate_metap_agg.py +++ b/generate_metap_agg.py @@ -3,32 +3,14 @@ import io import os import time +from abc import ABC, abstractmethod -# TODO KRH: Change metagmetagenome_anlaysis_id to was_generated_by throughout after https://github.com/microbiomedata/nmdc-schema/pull/2203 has been merged +# TODO KRH: Change metagmetagenome_anlaysis_id to was_generated_by throughout after https://github.com/microbiomedata/nmdc-schema/pull/2203 has been merged and data have been migrated -class MetaProtAgg: +class Aggregator(ABC): """ - MetaP Aggregation class - - Parameters - ---------- - dev : bool - Flag to indicate if production or development API should be used - Default is True, which uses the development API - - Attributes - ---------- - base_url : str - Base URL for the API - nmdc_api_token : str - API token to access the API - - Notes - ----- - This class is used to aggregate functional annotations from metaproteomics activities in the NMDC database. - There must be an environment variable called NMDC_API_TOKEN that contains the API token to access the API. + Abstract class for Aggregators """ - def __init__(self, dev=True): if dev: self.base_url = "https://api-dev.microbiomedata.org" @@ -37,6 +19,10 @@ def __init__(self, dev=True): self.base_url = "https://api.microbiomedata.org" self.nmdc_api_token = os.getenv("NMDC_API_BEARER_TOKEN") + # The following attributes are set in the subclasses + self.aggregation_filter = "" + self.workflow_filter = "" + def get_results( self, collection: str, filter="", max_page_size=100, fields="" ): @@ -57,7 +43,6 @@ def get_results( Fields to return in the query, separated by commas without spaces if multiple e.g. "id,data_object_type,url" Default is an empty string, which returns all fields - """ # Get initial results (before next_page_token is given in the results) @@ -92,28 +77,30 @@ def get_results( break return result_list - + def get_previously_aggregated_records(self): """ - Function to return all ids of metaproteomics activity records that have already been aggregated + Function to return all ids of activity records that have already been aggregated. + + Uses the aggregation_filter attribute to filter the results for subclasses. 
Returns ------- list - List of metagenome_annotation_ids that have already been aggregated + List of workflow ids that have already been aggregated """ agg_col = self.get_results( collection="functional_annotation_agg", - filter='{"metagenome_annotation_id":{"$regex":"wfmp"}}', + filter=self.aggregation_filter, max_page_size=1000, fields="metagenome_annotation_id", ) ids = list(set([x["metagenome_annotation_id"] for x in agg_col])) return ids - + def get_activity_records(self): """ - Function to return full metaproteomics activity records in the database + Function to return full workflow execution records in the database Returns ------- @@ -122,12 +109,12 @@ def get_activity_records(self): """ act_col = self.get_results( collection="workflow_execution_set", - filter='{"type":"nmdc:MetaproteomicsAnalysis"}', + filter=self.workflow_filter, max_page_size=1000, fields="" ) return act_col - + def submit_json_records(self, json_records): """ Function to submit aggregation records to the database @@ -151,7 +138,7 @@ def submit_json_records(self, json_records): def read_url_tsv(self, url): """ - Function to read a URL that points to a TSV file + Function to read a TSV file's content from a URL and convert it to a list of dictionaries Parameters ---------- @@ -179,7 +166,125 @@ def read_url_tsv(self, url): return tsv_data - def get_functional_terms(self, url): + def sweep(self): + """ + This is the main action function. + + Steps: + 1. Get list of workflow IDs that have already been added to the functional_annotation_agg collection + 2. Get list of all applicable workflow in the database, as defined by the workflow_filter attribute + 3. For each workflow that is not in the list of previously aggregated records, process the activity according to the process_activity method + a. Process the activity according to the process_activity method in the subclass + b. Prepare a json record for the database with the annotations and counts + c. 
Submit json it to the database using the post /metadata/json endpoint + """ + # Get list of workflow IDs that have already been processed + mp_wf_in_agg = self.get_previously_aggregated_records() + + # Get list of all metaproteomics activities + mp_wf_recs = self.get_activity_records() + + # Records to add to the aggregation + agg_records = {} + + # Iterate through all of the metaP activities + for mp_wf_rec in mp_wf_recs: + if mp_wf_rec["id"] in mp_wf_in_agg: + continue + try: + agg_records[mp_wf_rec["id"]] = self.process_activity(mp_wf_rec) + except Exception as ex: + # Continue on errors + print(ex) + continue + + # Prepare a json record for the database + json_records = [] + for key, value in agg_records.items(): + for k, v in value.items(): + json_records.append( + {"metagenome_annotation_id": key, "gene_function_id": k, "count": v} + ) + json_record_full = {"functional_annotation_agg": json_records} + + # Validate the json record using the post /metadata/json:validate endpoint + url = f"{self.base_url}/metadata/json:validate" + response = requests.post(url, json=json_record_full) + + # If the json record is valid, submit it to the database using the post /metadata/json endpoint + if response.status_code == 200: + response = self.submit_json_records(json_records) + if response != 200: + print("Error submitting the aggregation records for the workflow: ", mp_wf_rec["id"]) + if response == 200: + print("Submitted aggregation records for the workflow: ", mp_wf_rec["id"]) + + def sweep_success(self): + """Function to check the results of the sweep and ensure that the records were added to the database + + Returns + ------- + bool + True if all records were added to the functional_annotation_agg collection, False otherwise + """ + # Get list of workflow IDs that have already been processed + mp_wf_in_agg = self.get_previously_aggregated_records() + + # Get list of all metaproteomics activities + mp_wf_recs = self.get_activity_records() + + # If there are any records that were not processed, return FALSE + check = [x for x in mp_wf_recs if x["id"] in mp_wf_in_agg] + if all(check): + return True + else: + return False + + @abstractmethod + def process_activity(self, act): + """ + Abstract method to process an activity record. This method must be implemented in the subclass. + + Parameters + ---------- + act : dict + Activity record to process + + Returns + ------- + dict + Dictionary of functional annotations with their respective counts + """ + pass + +class MetaProtAgg(Aggregator): + """ + MetaP Aggregation class + + Parameters + ---------- + dev : bool + Flag to indicate if production or development API should be used + Default is True, which uses the development API + + Attributes + ---------- + base_url : str + Base URL for the API + nmdc_api_token : str + API token to access the API + + Notes + ----- + This class is used to aggregate functional annotations from metaproteomics activities in the NMDC database. + There must be an environment variable called NMDC_API_TOKEN that contains the API token to access the API. 
+ """ + def __init__(self, dev=True): + super().__init__(dev) + self.aggregation_filter = '{"metagenome_annotation_id":{"$regex":"wfmp"}}' + self.workflow_filter = '{"type":"nmdc:MetaproteomicsAnalysis"}' + + def get_functional_terms_from_protein_report(self, url): """ Function to get the functional terms from a URL of a Protein Report @@ -245,7 +350,7 @@ def get_functional_terms(self, url): return new_fxns - def find_anno_url(self, dos): + def find_protein_report_url(self, dos): """Find the URL for the protein report from a list of data object IDs Parameters @@ -280,97 +385,27 @@ def find_anno_url(self, dos): def process_activity(self, act): """ - Function to process an activity record. - Input: activity record - Output: Dictonary of KEGG records + Function to process a metaproteomics workflow record - This currently relies on the has_peptide_quantificiations - records in the activity record. This may change in the future. + Parameters + ---------- + act : dict + Metaproteomics workflow record to process + + Output + ------ + dict + Dictionary of functional annotations with their respective spectral counts + e.g. {"KEGG.ORTHOLOGY:K00001": 100, "COG:C00001": 50, "PFAM:PF00001": 25} """ # Get the URL and ID - url = self.find_anno_url(act["has_output"]) + url = self.find_protein_report_url(act["has_output"]) if not url: raise ValueError(f"Missing url for {act['id']}") # Parse the KEGG, COG, and PFAM annotations - return self.get_functional_terms(url) - - def sweep(self): - """ - This is the main action function. - - Steps: - 1. Get list of workflow IDs that have already been added to the functional_annotation_agg collection - 2. Get list of all metaproteomics activities in the database - 3. For each activity that is not in the list of previously aggregated records, process the activity: - a. Find the Protein Report URL - b. From the Protein Report URL, extract the KEGG, COG, and PFAM annotations and associated counts - c. Prepare a json record for the database with the annotations and counts - d. Validate the json record using the post /metadata/json:validate endpoint - e. 
If the json record is valid, submit it to the database using the post /metadata/json endpoint - """ - # Get list of workflow IDs that have already been processed - mp_wf_in_agg = self.get_previously_aggregated_records() - - # Get list of all metaproteomics activities - mp_wf_recs = self.get_activity_records() - - # Records to add to the aggregation - agg_records = {} - - # Iterate through all of the metaP activities - for mp_wf_rec in mp_wf_recs: - if mp_wf_rec["id"] in mp_wf_in_agg: - continue - try: - agg_records[mp_wf_rec["id"]] = self.process_activity(mp_wf_rec) - except Exception as ex: - # Continue on errors - print(ex) - continue - - # Prepare a json record for the database - json_records = [] - for key, value in agg_records.items(): - for k, v in value.items(): - json_records.append( - {"metagenome_annotation_id": key, "gene_function_id": k, "count": v} - ) - json_record_full = {"functional_annotation_agg": json_records} - - # Validate the json record using the post /metadata/json:validate endpoint - url = f"{self.base_url}/metadata/json:validate" - response = requests.post(url, json=json_record_full) - - # If the json record is valid, submit it to the database using the post /metadata/json endpoint - if response.status_code == 200: - response = self.submit_json_records(json_records) - if response != 200: - print("Error submitting the aggregation records for the workflow: ", mp_wf_rec["id"]) - if response == 200: - print("Submitted aggregation records for the workflow: ", mp_wf_rec["id"]) - - - def sweep_success(self): - """Function to check the results of the sweep and ensure that the records were added to the database - - Returns - ------- - bool - True if all records were added to the functional_annotation_agg collection, False otherwise - """ - # Get list of workflow IDs that have already been processed - mp_wf_in_agg = self.get_previously_aggregated_records() - - # Get list of all metaproteomics activities - mp_wf_recs = self.get_activity_records() - - # If there are any records that were not processed, return FALSE - check = [x for x in mp_wf_recs if x["id"] in mp_wf_in_agg] - if all(check): - return True - else: - return False + return self.get_functional_terms_from_protein_report(url) + if __name__ == "__main__": mp_dev = MetaProtAgg() From a828bb149141708cc058d34e2d013b0eafed5816 Mon Sep 17 00:00:00 2001 From: Katherine Heal Date: Fri, 8 Nov 2024 15:52:18 -0800 Subject: [PATCH 03/16] Restructure to fetch bearer token from username and pw --- README.md | 4 +- generate_metap_agg.py | 122 +++++++++++++++++++++++++++--------------- 2 files changed, 81 insertions(+), 45 deletions(-) diff --git a/README.md b/README.md index 75f7503..ca0c322 100644 --- a/README.md +++ b/README.md @@ -65,8 +65,8 @@ Now that the container image is hosted there, you can configure a Spin workload - `POLL_TIME`: Number of seconds to sleep between each run (Default: `14400`, which is 4 hours) - `NMDC_BASE_URL`: Base URL to access the data (Default: `https://data.microbiomedata.org/data`) - `NMDC_BASE_PATH`: Base path to the data on disk (Default: `/global/cfs/cdirs/m3408/results`) -- `NMDC_API_DEV_BEARER_TOKEN`: Bearer token for NMDC's runtime dev API for submitting json records (no default) -- `NMDC_API_BEARER_TOKEN`: Bearer token for NMDC's runtime API for submitting json records (no default) +- `NMDC_CLIENT_ID`: Client ID for interacting with NMDC's runtime API (used for both dev and production) +- `NMDC_CLIENT_PW`: Password for interacting with NMDC's runtime API, production ## Release Notes diff 
--git a/generate_metap_agg.py b/generate_metap_agg.py index 6975f5f..10b667e 100644 --- a/generate_metap_agg.py +++ b/generate_metap_agg.py @@ -10,19 +10,58 @@ class Aggregator(ABC): """ Abstract class for Aggregators + + Parameters + ---------- + dev : bool + Flag to indicate if production or development API should be used + Default is True, which uses the development API + + Attributes + ---------- + base_url : str + Base URL for the API, either production or development + "https://api.microbiomedata.org" or "https://api-dev.microbiomedata.org" + nmdc_api_token : str + API bearer token to access the API + aggregation_filter : str + Filter to apply to the aggregation collection endpoint to get applicable records (set in subclasses) + e.g. '{"metagenome_annotation_id":{"$regex":"wfmp"}}' + workflow_filter : str + Filter to apply to the workflow collection endpoint to get applicable records (set in subclasses) + e.g. '{"type":"nmdc:MetaproteomicsAnalysis"}' """ def __init__(self, dev=True): if dev: self.base_url = "https://api-dev.microbiomedata.org" - self.nmdc_api_token = os.getenv("NMDC_API_DEV_BEARER_TOKEN") else: self.base_url = "https://api.microbiomedata.org" - self.nmdc_api_token = os.getenv("NMDC_API_BEARER_TOKEN") - + self.get_bearer_token() + # The following attributes are set in the subclasses self.aggregation_filter = "" self.workflow_filter = "" + def get_bearer_token(self): + """Function to get the bearer token from the API using the /token endpoint + + Parameters + ---------- + None, but uses the NMDC_API_USERNAME and NMDC_API_PASSWORD environment variables to get the token and set the nmdc_api_token attribute + """ + token_request_body = { + "grant_type": "password", + "username": os.getenv("NMDC_API_USERNAME"), + "password": os.getenv("NMDC_API_PASSWORD"), + } + + rv = requests.post(self.base_url + "/token", data=token_request_body) + token_response = rv.json() + if "access_token" not in token_response: + raise Exception(f"Getting token failed: {token_response}") + + self.nmdc_api_token = token_response["access_token"] + def get_results( self, collection: str, filter="", max_page_size=100, fields="" ): @@ -78,9 +117,8 @@ def get_results( return result_list - def get_previously_aggregated_records(self): - """ - Function to return all ids of activity records that have already been aggregated. + def get_previously_aggregated_workflow_ids(self): + """Function to return all ids of workflow execution ids that have already been aggregated. Uses the aggregation_filter attribute to filter the results for subclasses. 
@@ -98,14 +136,13 @@ def get_previously_aggregated_records(self): ids = list(set([x["metagenome_annotation_id"] for x in agg_col])) return ids - def get_activity_records(self): - """ - Function to return full workflow execution records in the database + def get_workflow_records(self): + """Function to return full workflow execution records in the database Returns ------- - list - List of metaproteomics activity records + list of dict + List of workflow execution records, each represented as a dictionary """ act_col = self.get_results( collection="workflow_execution_set", @@ -116,13 +153,17 @@ def get_activity_records(self): return act_col def submit_json_records(self, json_records): - """ - Function to submit aggregation records to the database + """Function to submit records to the database using the post /metadata/json:submit endpoint Parameters ---------- json_records : list List of dictionaries where each dictionary represents a record to be submitted to the database + + Returns + ------- + int + HTTP status code of the response """ url = f"{self.base_url}/metadata/json:submit" @@ -137,8 +178,7 @@ def submit_json_records(self, json_records): return response.status_code def read_url_tsv(self, url): - """ - Function to read a TSV file's content from a URL and convert it to a list of dictionaries + """Function to read a TSV file's content from a URL and convert it to a list of dictionaries Parameters ---------- @@ -167,27 +207,30 @@ def read_url_tsv(self, url): return tsv_data def sweep(self): - """ - This is the main action function. + """This is the main action function for the Aggregator class. - Steps: + It performs the following steps: 1. Get list of workflow IDs that have already been added to the functional_annotation_agg collection 2. Get list of all applicable workflow in the database, as defined by the workflow_filter attribute 3. For each workflow that is not in the list of previously aggregated records, process the activity according to the process_activity method a. Process the activity according to the process_activity method in the subclass b. Prepare a json record for the database with the annotations and counts c. 
Submit json it to the database using the post /metadata/json endpoint + + Returns + ------- + None """ # Get list of workflow IDs that have already been processed - mp_wf_in_agg = self.get_previously_aggregated_records() + mp_wf_in_agg = self.get_previously_aggregated_workflow_ids() - # Get list of all metaproteomics activities - mp_wf_recs = self.get_activity_records() + # Get list of all workflow records + mp_wf_recs = self.get_workflow_records() # Records to add to the aggregation agg_records = {} - # Iterate through all of the metaP activities + # Iterate through all of the workflow records for mp_wf_rec in mp_wf_recs: if mp_wf_rec["id"] in mp_wf_in_agg: continue @@ -203,21 +246,16 @@ def sweep(self): for key, value in agg_records.items(): for k, v in value.items(): json_records.append( - {"metagenome_annotation_id": key, "gene_function_id": k, "count": v} + {"metagenome_annotation_id": key, "gene_function_id": k, "count": v, "type": "nmdc:FunctionalAnnotationAggMember"} ) json_record_full = {"functional_annotation_agg": json_records} - # Validate the json record using the post /metadata/json:validate endpoint - url = f"{self.base_url}/metadata/json:validate" - response = requests.post(url, json=json_record_full) - - # If the json record is valid, submit it to the database using the post /metadata/json endpoint - if response.status_code == 200: - response = self.submit_json_records(json_records) - if response != 200: - print("Error submitting the aggregation records for the workflow: ", mp_wf_rec["id"]) - if response == 200: - print("Submitted aggregation records for the workflow: ", mp_wf_rec["id"]) + response = self.submit_json_records(json_record_full) + if response != 200: + print("Error submitting the aggregation records for the workflow: ", mp_wf_rec["id"], + "Response code: ", response) + if response == 200: + print("Submitted aggregation records for the workflow: ", mp_wf_rec["id"]) def sweep_success(self): """Function to check the results of the sweep and ensure that the records were added to the database @@ -228,10 +266,10 @@ def sweep_success(self): True if all records were added to the functional_annotation_agg collection, False otherwise """ # Get list of workflow IDs that have already been processed - mp_wf_in_agg = self.get_previously_aggregated_records() + mp_wf_in_agg = self.get_previously_aggregated_workflow_ids() - # Get list of all metaproteomics activities - mp_wf_recs = self.get_activity_records() + # Get list of all workflow records + mp_wf_recs = self.get_workflow_records() # If there are any records that were not processed, return FALSE check = [x for x in mp_wf_recs if x["id"] in mp_wf_in_agg] @@ -259,7 +297,7 @@ def process_activity(self, act): class MetaProtAgg(Aggregator): """ - MetaP Aggregation class + Metaproteomics Aggregator class Parameters ---------- @@ -276,8 +314,7 @@ class MetaProtAgg(Aggregator): Notes ----- - This class is used to aggregate functional annotations from metaproteomics activities in the NMDC database. - There must be an environment variable called NMDC_API_TOKEN that contains the API token to access the API. + This class is used to aggregate functional annotations from metaproteomics workflows in the NMDC database. 
""" def __init__(self, dev=True): super().__init__(dev) @@ -285,8 +322,7 @@ def __init__(self, dev=True): self.workflow_filter = '{"type":"nmdc:MetaproteomicsAnalysis"}' def get_functional_terms_from_protein_report(self, url): - """ - Function to get the functional terms from a URL of a Protein Report + """Function to get the functional terms from a URL of a Protein Report Parameters ---------- @@ -296,7 +332,7 @@ def get_functional_terms_from_protein_report(self, url): Returns ------- dict - Dictionary of KO, COG, and pfam terms with their respective spectral counts + Dictionary of KEGG, COG, and PFAM terms with their respective spectral counts derived from the Protein Report """ fxns = {} From 226b816b705af876e19045b06bbd47ae3f623828 Mon Sep 17 00:00:00 2001 From: Katherine Heal Date: Mon, 11 Nov 2024 13:01:23 -0800 Subject: [PATCH 04/16] Fix sweep_success function --- generate_metap_agg.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/generate_metap_agg.py b/generate_metap_agg.py index 10b667e..2ab1541 100644 --- a/generate_metap_agg.py +++ b/generate_metap_agg.py @@ -272,7 +272,7 @@ def sweep_success(self): mp_wf_recs = self.get_workflow_records() # If there are any records that were not processed, return FALSE - check = [x for x in mp_wf_recs if x["id"] in mp_wf_in_agg] + check = [x["id"] in mp_wf_in_agg for x in mp_wf_recs] if all(check): return True else: @@ -445,6 +445,7 @@ def process_activity(self, act): if __name__ == "__main__": mp_dev = MetaProtAgg() + mp_dev.sweep_success() mp_dev.sweep() # Wait for the records to be added to the database before running check (5 minutes) From b02e3205cd72f4e28fb54a65e43b10c70c0eb0b3 Mon Sep 17 00:00:00 2001 From: Katherine Heal Date: Tue, 19 Nov 2024 18:28:09 -0800 Subject: [PATCH 05/16] Change to was_generated_by --- generate_metap_agg.py | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/generate_metap_agg.py b/generate_metap_agg.py index 2ab1541..55d0245 100644 --- a/generate_metap_agg.py +++ b/generate_metap_agg.py @@ -26,7 +26,7 @@ class Aggregator(ABC): API bearer token to access the API aggregation_filter : str Filter to apply to the aggregation collection endpoint to get applicable records (set in subclasses) - e.g. '{"metagenome_annotation_id":{"$regex":"wfmp"}}' + e.g. '{"was_generated_by":{"$regex":"wfmp"}}' workflow_filter : str Filter to apply to the workflow collection endpoint to get applicable records (set in subclasses) e.g. '{"type":"nmdc:MetaproteomicsAnalysis"}' @@ -73,7 +73,7 @@ def get_results( Collection name to query, e.g. "functional_annotation_agg" filter : str, optional Filter to apply to the query written in JSON format for MongoDB - e.g. '{"metagenome_annotation_id":{"$regex":"wfmp"}}' + e.g. '{"was_generated_by":{"$regex":"wfmp"}}' Default is an empty string, which does not apply a filter max_page_size : int, optional Maximum number of records to return in a single page @@ -131,9 +131,9 @@ def get_previously_aggregated_workflow_ids(self): collection="functional_annotation_agg", filter=self.aggregation_filter, max_page_size=1000, - fields="metagenome_annotation_id", + fields="was_generated_by", ) - ids = list(set([x["metagenome_annotation_id"] for x in agg_col])) + ids = list(set([x["was_generated_by"] for x in agg_col])) return ids def get_workflow_records(self): @@ -212,10 +212,10 @@ def sweep(self): It performs the following steps: 1. Get list of workflow IDs that have already been added to the functional_annotation_agg collection 2. 
Get list of all applicable workflow in the database, as defined by the workflow_filter attribute - 3. For each workflow that is not in the list of previously aggregated records, process the activity according to the process_activity method + 3. For each workflow that is not in the list of previously aggregated records: a. Process the activity according to the process_activity method in the subclass b. Prepare a json record for the database with the annotations and counts - c. Submit json it to the database using the post /metadata/json endpoint + c. Submit json to the database using the post /metadata/json endpoint Returns ------- @@ -246,7 +246,7 @@ def sweep(self): for key, value in agg_records.items(): for k, v in value.items(): json_records.append( - {"metagenome_annotation_id": key, "gene_function_id": k, "count": v, "type": "nmdc:FunctionalAnnotationAggMember"} + {"was_generated_by": key, "gene_function_id": k, "count": v, "type": "nmdc:FunctionalAnnotationAggMember"} ) json_record_full = {"functional_annotation_agg": json_records} @@ -318,7 +318,7 @@ class MetaProtAgg(Aggregator): """ def __init__(self, dev=True): super().__init__(dev) - self.aggregation_filter = '{"metagenome_annotation_id":{"$regex":"wfmp"}}' + self.aggregation_filter = '{"was_generated_by":{"$regex":"wfmp"}}' self.workflow_filter = '{"type":"nmdc:MetaproteomicsAnalysis"}' def get_functional_terms_from_protein_report(self, url): @@ -445,10 +445,9 @@ def process_activity(self, act): if __name__ == "__main__": mp_dev = MetaProtAgg() - mp_dev.sweep_success() mp_dev.sweep() - # Wait for the records to be added to the database before running check (5 minutes) + # Wait for the records to be added to the database before running check (5 minutes?) time.sleep(300) success_check = mp_dev.sweep_success() From 62b3e122fd9db2b96e179d92931abd64429b11be Mon Sep 17 00:00:00 2001 From: Katherine Heal Date: Tue, 19 Nov 2024 18:40:23 -0800 Subject: [PATCH 06/16] Update scripts to use single base url set as optional environmental variable --- README.md | 5 +++-- generate_metap_agg.py | 40 ++++++++-------------------------------- 2 files changed, 11 insertions(+), 34 deletions(-) diff --git a/README.md b/README.md index ca0c322..10df454 100644 --- a/README.md +++ b/README.md @@ -65,8 +65,9 @@ Now that the container image is hosted there, you can configure a Spin workload - `POLL_TIME`: Number of seconds to sleep between each run (Default: `14400`, which is 4 hours) - `NMDC_BASE_URL`: Base URL to access the data (Default: `https://data.microbiomedata.org/data`) - `NMDC_BASE_PATH`: Base path to the data on disk (Default: `/global/cfs/cdirs/m3408/results`) -- `NMDC_CLIENT_ID`: Client ID for interacting with NMDC's runtime API (used for both dev and production) -- `NMDC_CLIENT_PW`: Password for interacting with NMDC's runtime API, production +- `NMDC_CLIENT_ID`: Client ID for interacting with NMDC's runtime API +- `NMDC_CLIENT_PW`: Password for interacting with NMDC's runtime API +- `NMDC_API_URL`: Base url for NMCD runtime API (Default: `https://api-dev.microbiomedata.org`, which is the dev url) ## Release Notes diff --git a/generate_metap_agg.py b/generate_metap_agg.py index 55d0245..c6ad27a 100644 --- a/generate_metap_agg.py +++ b/generate_metap_agg.py @@ -2,20 +2,12 @@ import csv import io import os -import time from abc import ABC, abstractmethod -# TODO KRH: Change metagmetagenome_anlaysis_id to was_generated_by throughout after https://github.com/microbiomedata/nmdc-schema/pull/2203 has been merged and data have been migrated - 
class Aggregator(ABC): """ Abstract class for Aggregators - Parameters - ---------- - dev : bool - Flag to indicate if production or development API should be used - Default is True, which uses the development API Attributes ---------- @@ -31,11 +23,11 @@ class Aggregator(ABC): Filter to apply to the workflow collection endpoint to get applicable records (set in subclasses) e.g. '{"type":"nmdc:MetaproteomicsAnalysis"}' """ - def __init__(self, dev=True): - if dev: - self.base_url = "https://api-dev.microbiomedata.org" - else: - self.base_url = "https://api.microbiomedata.org" + # Set the base URL for the API + _NMDC_API_URL = "https://api-dev.microbiomedata.org" + + def __init__(self): + self.base_url = os.getenv("NMDC_API_URL") or self._NMDC_API_URL self.get_bearer_token() # The following attributes are set in the subclasses @@ -316,8 +308,8 @@ class MetaProtAgg(Aggregator): ----- This class is used to aggregate functional annotations from metaproteomics workflows in the NMDC database. """ - def __init__(self, dev=True): - super().__init__(dev) + def __init__(self): + super().__init__() self.aggregation_filter = '{"was_generated_by":{"$regex":"wfmp"}}' self.workflow_filter = '{"type":"nmdc:MetaproteomicsAnalysis"}' @@ -445,20 +437,4 @@ def process_activity(self, act): if __name__ == "__main__": mp_dev = MetaProtAgg() - mp_dev.sweep() - - # Wait for the records to be added to the database before running check (5 minutes?) - time.sleep(300) - success_check = mp_dev.sweep_success() - -""" -# This is commented out until script is ready for production - if success_check: - # Reprocess in the production API - mp_prod = MetaProtAgg(dev=False) - mp_prod.sweep() - - # Wait for the records to be added to the database before running check (5 minutes) - time.sleep(300) - success_check = mp_prod.sweep_success() -""" \ No newline at end of file + mp_dev.sweep() \ No newline at end of file From 7e726ad2a47d8a5ff9d0b67b12f52cba5a09291e Mon Sep 17 00:00:00 2001 From: Katherine Heal Date: Mon, 25 Nov 2024 10:11:11 -0800 Subject: [PATCH 07/16] Remove _dev from call --- generate_metap_agg.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/generate_metap_agg.py b/generate_metap_agg.py index c6ad27a..9618654 100644 --- a/generate_metap_agg.py +++ b/generate_metap_agg.py @@ -436,5 +436,5 @@ def process_activity(self, act): if __name__ == "__main__": - mp_dev = MetaProtAgg() - mp_dev.sweep() \ No newline at end of file + mp = MetaProtAgg() + mp.sweep() \ No newline at end of file From a9447770c86834099c854706f27f10363fcc3136 Mon Sep 17 00:00:00 2001 From: Katherine Heal Date: Tue, 26 Nov 2024 14:38:22 -0800 Subject: [PATCH 08/16] Adjust API call for functional_annotation_agg query This is a temporary fix --- generate_metap_agg.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/generate_metap_agg.py b/generate_metap_agg.py index 9618654..7b071c4 100644 --- a/generate_metap_agg.py +++ b/generate_metap_agg.py @@ -122,7 +122,9 @@ def get_previously_aggregated_workflow_ids(self): agg_col = self.get_results( collection="functional_annotation_agg", filter=self.aggregation_filter, - max_page_size=1000, + #FIXME: Using max_page_size of 0 may not work as we scale up the functional_annotation_agg collection - see issue: + # https://github.com/microbiomedata/nmdc-runtime/issues/797 + max_page_size=0, fields="was_generated_by", ) ids = list(set([x["was_generated_by"] for x in agg_col])) @@ -437,4 +439,4 @@ def process_activity(self, act): if __name__ == "__main__": mp = 
MetaProtAgg() - mp.sweep() \ No newline at end of file + mp.sweep_success() \ No newline at end of file From 143c60c75594bb5be1945706de462f471715cfd0 Mon Sep 17 00:00:00 2001 From: Katherine Heal Date: Tue, 26 Nov 2024 17:46:35 -0800 Subject: [PATCH 09/16] Fix json construction --- generate_metap_agg.py | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/generate_metap_agg.py b/generate_metap_agg.py index 7b071c4..8c7d31c 100644 --- a/generate_metap_agg.py +++ b/generate_metap_agg.py @@ -221,15 +221,13 @@ def sweep(self): # Get list of all workflow records mp_wf_recs = self.get_workflow_records() - # Records to add to the aggregation - agg_records = {} - # Iterate through all of the workflow records for mp_wf_rec in mp_wf_recs: + if mp_wf_rec["id"] in mp_wf_in_agg: continue try: - agg_records[mp_wf_rec["id"]] = self.process_activity(mp_wf_rec) + functional_agg_dict = self.process_activity(mp_wf_rec) except Exception as ex: # Continue on errors print(ex) @@ -237,11 +235,10 @@ def sweep(self): # Prepare a json record for the database json_records = [] - for key, value in agg_records.items(): - for k, v in value.items(): - json_records.append( - {"was_generated_by": key, "gene_function_id": k, "count": v, "type": "nmdc:FunctionalAnnotationAggMember"} - ) + for k, v in functional_agg_dict.items(): + json_records.append( + {"was_generated_by": mp_wf_rec["id"], "gene_function_id": k, "count": v, "type": "nmdc:FunctionalAnnotationAggMember"} + ) json_record_full = {"functional_annotation_agg": json_records} response = self.submit_json_records(json_record_full) @@ -439,4 +436,4 @@ def process_activity(self, act): if __name__ == "__main__": mp = MetaProtAgg() - mp.sweep_success() \ No newline at end of file + mp.sweep() \ No newline at end of file From a3d814cbf6b8661896dd7e42b93069e8cf5167c4 Mon Sep 17 00:00:00 2001 From: Katherine Heal Date: Wed, 27 Nov 2024 09:41:57 -0800 Subject: [PATCH 10/16] Add logging for exceptions --- generate_metap_agg.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/generate_metap_agg.py b/generate_metap_agg.py index 8c7d31c..43b65db 100644 --- a/generate_metap_agg.py +++ b/generate_metap_agg.py @@ -3,6 +3,12 @@ import io import os from abc import ABC, abstractmethod +import logging + +# Configure logging +logging.basicConfig(level=logging.ERROR) +logger = logging.getLogger(__name__) + class Aggregator(ABC): """ @@ -50,8 +56,8 @@ def get_bearer_token(self): rv = requests.post(self.base_url + "/token", data=token_request_body) token_response = rv.json() if "access_token" not in token_response: - raise Exception(f"Getting token failed: {token_response}") - + logger.error(f"Getting token failed: {token_response}, Status code: {rv.status_code}") + raise Exception(f"Getting token failed: {token_response}, Status code: {rv.status_code}") self.nmdc_api_token = token_response["access_token"] def get_results( @@ -229,8 +235,8 @@ def sweep(self): try: functional_agg_dict = self.process_activity(mp_wf_rec) except Exception as ex: - # Continue on errors - print(ex) + # Log the error and continue to the next record + logger.error(f"Error processing activity {mp_wf_rec['id']}: {ex}") continue # Prepare a json record for the database @@ -243,8 +249,7 @@ def sweep(self): response = self.submit_json_records(json_record_full) if response != 200: - print("Error submitting the aggregation records for the workflow: ", mp_wf_rec["id"], - "Response code: ", response) + logger.error(f"Error submitting the aggregation 
records for the workflow: {mp_wf_rec['id']}, Response code: {response}") if response == 200: print("Submitted aggregation records for the workflow: ", mp_wf_rec["id"]) From 5767452f9f98b1b1549e78fb88829205bbab540e Mon Sep 17 00:00:00 2001 From: Katherine Heal Date: Wed, 27 Nov 2024 09:42:58 -0800 Subject: [PATCH 11/16] Lint metap agg script --- generate_metap_agg.py | 80 +++++++++++++++++++++++++------------------ 1 file changed, 46 insertions(+), 34 deletions(-) diff --git a/generate_metap_agg.py b/generate_metap_agg.py index 43b65db..6d6f0f6 100644 --- a/generate_metap_agg.py +++ b/generate_metap_agg.py @@ -14,7 +14,7 @@ class Aggregator(ABC): """ Abstract class for Aggregators - + Attributes ---------- base_url : str @@ -29,6 +29,7 @@ class Aggregator(ABC): Filter to apply to the workflow collection endpoint to get applicable records (set in subclasses) e.g. '{"type":"nmdc:MetaproteomicsAnalysis"}' """ + # Set the base URL for the API _NMDC_API_URL = "https://api-dev.microbiomedata.org" @@ -56,15 +57,17 @@ def get_bearer_token(self): rv = requests.post(self.base_url + "/token", data=token_request_body) token_response = rv.json() if "access_token" not in token_response: - logger.error(f"Getting token failed: {token_response}, Status code: {rv.status_code}") - raise Exception(f"Getting token failed: {token_response}, Status code: {rv.status_code}") + logger.error( + f"Getting token failed: {token_response}, Status code: {rv.status_code}" + ) + raise Exception( + f"Getting token failed: {token_response}, Status code: {rv.status_code}" + ) self.nmdc_api_token = token_response["access_token"] - def get_results( - self, collection: str, filter="", max_page_size=100, fields="" - ): + def get_results(self, collection: str, filter="", max_page_size=100, fields=""): """General function to get results from the API using the collection endpoint with optional filter and fields - + Parameters ---------- collection : str @@ -114,7 +117,7 @@ def get_results( break return result_list - + def get_previously_aggregated_workflow_ids(self): """Function to return all ids of workflow execution ids that have already been aggregated. 
@@ -128,14 +131,14 @@ def get_previously_aggregated_workflow_ids(self):
         agg_col = self.get_results(
             collection="functional_annotation_agg",
             filter=self.aggregation_filter,
-            #FIXME: Using max_page_size of 0 may not work as we scale up the functional_annotation_agg collection - see issue:
+            # FIXME: Using max_page_size of 0 may not work as we scale up the functional_annotation_agg collection - see issue:
             # https://github.com/microbiomedata/nmdc-runtime/issues/797
             max_page_size=0,
             fields="was_generated_by",
         )
         ids = list(set([x["was_generated_by"] for x in agg_col]))
         return ids
-    
+
     def get_workflow_records(self):
         """Function to return full workflow execution records in the database

@@ -145,13 +148,13 @@ def get_workflow_records(self):
             List of workflow execution records, each represented as a dictionary
         """
         act_col = self.get_results(
-            collection="workflow_execution_set", 
-            filter=self.workflow_filter, 
-            max_page_size=1000, 
-            fields=""
+            collection="workflow_execution_set",
+            filter=self.workflow_filter,
+            max_page_size=1000,
+            fields="",
         )
         return act_col
-    
+
     def submit_json_records(self, json_records):
         """Function to submit records to the database using the post /metadata/json:submit endpoint

@@ -170,13 +173,13 @@ def submit_json_records(self, json_records):
         headers = {
             "accept": "application/json",
             "Authorization": f"Bearer {self.nmdc_api_token}",
-            "Content-Type": "application/json"
+            "Content-Type": "application/json",
         }

         response = requests.post(url, headers=headers, json=json_records)

         return response.status_code
-    
+
     def read_url_tsv(self, url):
         """Function to read a TSV file's content from a URL and convert it to a list of dictionaries

@@ -184,7 +187,7 @@ def read_url_tsv(self, url):
         ----------
         url : str
             URL to the TSV file
-        
+
         Returns
         -------
         list
@@ -207,8 +210,8 @@ def read_url_tsv(self, url):
         return tsv_data

     def sweep(self):
         """This is the main action function for the Aggregator class.
-        
+
         It performs the following steps:
         1. Get list of workflow IDs that have already been added to the functional_annotation_agg collection
         2. Get list of all applicable workflows in the database, as defined by the workflow_filter attribute
@@ -229,7 +232,6 @@ def sweep(self):

         # Iterate through all of the workflow records
         for mp_wf_rec in mp_wf_recs:
-
             if mp_wf_rec["id"] in mp_wf_in_agg:
                 continue
             try:
@@ -243,19 +245,28 @@ def sweep(self):
             json_records = []
             for k, v in functional_agg_dict.items():
                 json_records.append(
-                    {"was_generated_by": mp_wf_rec["id"], "gene_function_id": k, "count": v, "type": "nmdc:FunctionalAnnotationAggMember"}
+                    {
+                        "was_generated_by": mp_wf_rec["id"],
+                        "gene_function_id": k,
+                        "count": v,
+                        "type": "nmdc:FunctionalAnnotationAggMember",
+                    }
                 )
             json_record_full = {"functional_annotation_agg": json_records}
             response = self.submit_json_records(json_record_full)
             if response != 200:
-                logger.error(f"Error submitting the aggregation records for the workflow: {mp_wf_rec['id']}, Response code: {response}")
+                logger.error(
+                    f"Error submitting the aggregation records for the workflow: {mp_wf_rec['id']}, Response code: {response}"
+                )
             if response == 200:
-                print("Submitted aggregation records for the workflow: ", mp_wf_rec["id"])
+                print(
+                    "Submitted aggregation records for the workflow: ", mp_wf_rec["id"]
+                )

     def sweep_success(self):
         """Function to check the results of the sweep and ensure that the records were added to the database
-        
+
         Returns
         -------
         bool
@@ -272,7 +283,7 @@ def sweep_success(self):
         if all(check):
             return True
         else:
-            return False 
+            return False

     @abstractmethod
     def process_activity(self, act):
@@ -283,14 +294,15 @@ def process_activity(self, act):
         ----------
         act : dict
             Activity record to process
-        
+
         Returns
         -------
         dict
             Dictionary of functional annotations with their respective counts
         """
         pass
-    
+
+
 class MetaProtAgg(Aggregator):
     """
     Metaproteomics Aggregator class

     Parameters
     ----------
     dev : bool
         Flag to indicate if production or development API should be used
         Default is True, which uses the development API

     Attributes
     ----------
     base_url : str
         Base URL for the API
     nmdc_api_token : str
         API token to access the API
-    
+
     Notes
     -----
     This class is used to aggregate functional annotations from metaproteomics workflows in the NMDC database.
""" + def __init__(self): super().__init__() self.aggregation_filter = '{"was_generated_by":{"$regex":"wfmp"}}' @@ -345,7 +358,7 @@ def get_functional_terms_from_protein_report(self, url): fxns[ko_clean] = int(line.get("SummedSpectraCounts")) else: fxns[ko_clean] += int(line.get("SummedSpectraCounts")) - + # Add cog terms to the dictionary cog = line.get("COG") if cog != "" and cog is not None: @@ -354,7 +367,7 @@ def get_functional_terms_from_protein_report(self, url): fxns[cog_clean] = int(line.get("SummedSpectraCounts")) else: fxns[cog_clean] += int(line.get("SummedSpectraCounts")) - + # Add pfam terms to the dictionary pfam = line.get("pfam") if pfam != "" and pfam is not None: @@ -363,7 +376,6 @@ def get_functional_terms_from_protein_report(self, url): fxns[pfam_clean] = int(line.get("SummedSpectraCounts")) else: fxns[pfam_clean] += int(line.get("SummedSpectraCounts")) - # For all, loop through keys and separate into multiple keys if there are multiple pfams new_fxns = {} @@ -437,8 +449,8 @@ def process_activity(self, act): # Parse the KEGG, COG, and PFAM annotations return self.get_functional_terms_from_protein_report(url) - + if __name__ == "__main__": mp = MetaProtAgg() - mp.sweep() \ No newline at end of file + mp.sweep() From 4304e6ef35c93f4f6d9dcdb3f1928118f18c4b32 Mon Sep 17 00:00:00 2001 From: Katherine Heal Date: Wed, 27 Nov 2024 09:56:17 -0800 Subject: [PATCH 12/16] Modify shell to accept logging --- agg.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agg.sh b/agg.sh index dfb28e1..0050740 100644 --- a/agg.sh +++ b/agg.sh @@ -13,7 +13,7 @@ fi while true ; do date |tee -a $LOG_FILE python -m generate_functional_agg |tee -a $LOG_FILE - python -m generate_metap_agg |tee -a $LOG_FILE + python -m generate_metap_agg 2>&1 |tee -a $LOG_FILE echo "Sleeping $POLL_TIME" sleep $POLL_TIME done From 088fa4847f85dbe7a202d1c1359677e44658310a Mon Sep 17 00:00:00 2001 From: Donny Winston Date: Wed, 27 Nov 2024 22:06:17 +0100 Subject: [PATCH 13/16] feat: make regex a "prefix expression" From : > Further optimization can occur if the regular expression is a "prefix expression", which means that all potential matches start with the same string. This allows MongoDB to construct a "range" from that prefix and only match against those values from the index that fall within that range. > A regular expression is a "prefix expression" if it starts with a caret (^) or a left anchor (\A), followed by a string of simple symbols. For example, the regex /^abc.*/ will be optimized by matching only against the values from the index that start with abc. 
---
 generate_metap_agg.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/generate_metap_agg.py b/generate_metap_agg.py
index 6d6f0f6..1b728ce 100644
--- a/generate_metap_agg.py
+++ b/generate_metap_agg.py
@@ -327,7 +327,7 @@ class MetaProtAgg(Aggregator):

     def __init__(self):
         super().__init__()
-        self.aggregation_filter = '{"was_generated_by":{"$regex":"wfmp"}}'
+        self.aggregation_filter = '{"was_generated_by":{"$regex":"^nmdc:wfmp"}}'
         self.workflow_filter = '{"type":"nmdc:MetaproteomicsAnalysis"}'

     def get_functional_terms_from_protein_report(self, url):

From 4c8fcfa34522380b16423aeea02cc3e56b975b6b Mon Sep 17 00:00:00 2001
From: Katherine Heal
Date: Mon, 2 Dec 2024 08:37:38 -0800
Subject: [PATCH 14/16] Modify FIXME note to point to new bug

---
 generate_metap_agg.py | 17 +++++++----------
 1 file changed, 7 insertions(+), 10 deletions(-)

diff --git a/generate_metap_agg.py b/generate_metap_agg.py
index 1b728ce..ec70b53 100644
--- a/generate_metap_agg.py
+++ b/generate_metap_agg.py
@@ -24,7 +24,8 @@ class Aggregator(ABC):
         API bearer token to access the API
     aggregation_filter : str
         Filter to apply to the aggregation collection endpoint to get applicable records (set in subclasses)
-        e.g. '{"was_generated_by":{"$regex":"wfmp"}}'
+        Note the use of the ^ character to match the beginning of the string, which optimizes the query
+        e.g. '{"was_generated_by":{"$regex":"^nmdc:wfmp"}}'
     workflow_filter : str
         Filter to apply to the workflow collection endpoint to get applicable records (set in subclasses)
         e.g. '{"type":"nmdc:MetaproteomicsAnalysis"}'
@@ -91,6 +92,7 @@ def get_results(self, collection: str, filter="", max_page_size=100, fields=""):
         resp = requests.get(og_url)
         initial_data = resp.json()
         results = initial_data.get("resources", [])
+        i = 0

         if results == []:
             # if no results are returned
@@ -105,6 +107,8 @@ def get_results(self, collection: str, filter="", max_page_size=100, fields=""):
             next_page_token = initial_data["next_page_token"]

         while True:
+            i = i + max_page_size
+            print(str(i) + " records processed")
             url = f"{self.base_url}/nmdcschema/{collection}?&filter={filter}&max_page_size={max_page_size}&next_page_token={next_page_token}&projection={fields}"
             response = requests.get(url)
             data_next = response.json()
@@ -131,8 +135,8 @@ def get_previously_aggregated_workflow_ids(self):
         agg_col = self.get_results(
             collection="functional_annotation_agg",
             filter=self.aggregation_filter,
-            # FIXME: Using max_page_size of 0 may not work as we scale up the functional_annotation_agg collection - see issue:
-            # https://github.com/microbiomedata/nmdc-runtime/issues/797
+            # FIXME: Using max_page_size of 0 may not work as we scale up the functional_annotation_agg collection, but pagination gets stuck in an infinite loop
+            # see issue: https://github.com/microbiomedata/nmdc-runtime/issues/806
             max_page_size=0,
             fields="was_generated_by",
         )
@@ -313,13 +317,6 @@ class MetaProtAgg(Aggregator):
         Flag to indicate if production or development API should be used
         Default is True, which uses the development API

-    Attributes
-    ----------
-    base_url : str
-        Base URL for the API
-    nmdc_api_token : str
-        API token to access the API
-
     Notes
     -----
     This class is used to aggregate functional annotations from metaproteomics workflows in the NMDC database.
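As an aside on the FIXME this patch rewrites, the retrieval it worries about reduces to a single unpaginated request; a minimal sketch is below, with the endpoint, filter, and projection taken from the diffs above and the error handling added for illustration.

    # Sketch of the single-shot fetch implied by max_page_size=0; as the FIXME
    # warns, one giant response may not scale as the collection grows.
    import requests

    resp = requests.get(
        "https://api-dev.microbiomedata.org/nmdcschema/functional_annotation_agg",
        params={
            "filter": '{"was_generated_by":{"$regex":"^nmdc:wfmp"}}',
            "max_page_size": 0,
            "projection": "was_generated_by",
        },
    )
    resp.raise_for_status()
    ids = {r["was_generated_by"] for r in resp.json().get("resources", [])}
    print(len(ids), "metaproteomics workflows already aggregated")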
From 5e3e245b9af7063e74f14eb436a41ad0d759419b Mon Sep 17 00:00:00 2001
From: Katherine Heal
Date: Mon, 2 Dec 2024 09:04:37 -0800
Subject: [PATCH 15/16] Remove pagination for both collection endpoint calls

---
 generate_metap_agg.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/generate_metap_agg.py b/generate_metap_agg.py
index ec70b53..eb606fc 100644
--- a/generate_metap_agg.py
+++ b/generate_metap_agg.py
@@ -154,7 +154,9 @@ def get_workflow_records(self):
         act_col = self.get_results(
             collection="workflow_execution_set",
             filter=self.workflow_filter,
-            max_page_size=1000,
+            # FIXME: Using max_page_size of 0 may not work as we scale up the workflow_execution_set collection, but pagination gets stuck in an infinite loop
+            # see issue: https://github.com/microbiomedata/nmdc-runtime/issues/806
+            max_page_size=0,
             fields="",
         )
         return act_col

From a0c81dfdfaaf4ab1897eb252c9ecab048e30293e Mon Sep 17 00:00:00 2001
From: Katherine Heal
Date: Mon, 2 Dec 2024 13:13:28 -0800
Subject: [PATCH 16/16] Fix page_token parameter in collection endpoint

---
 generate_metap_agg.py | 11 +++--------
 1 file changed, 3 insertions(+), 8 deletions(-)

diff --git a/generate_metap_agg.py b/generate_metap_agg.py
index eb606fc..28e24f2 100644
--- a/generate_metap_agg.py
+++ b/generate_metap_agg.py
@@ -108,8 +108,7 @@ def get_results(self, collection: str, filter="", max_page_size=100, fields=""):
         while True:
             i = i + max_page_size
-            print(str(i) + " records processed")
-            url = f"{self.base_url}/nmdcschema/{collection}?&filter={filter}&max_page_size={max_page_size}&next_page_token={next_page_token}&projection={fields}"
+            url = f"{self.base_url}/nmdcschema/{collection}?&filter={filter}&max_page_size={max_page_size}&page_token={next_page_token}&projection={fields}"
             response = requests.get(url)
             data_next = response.json()
@@ -135,9 +134,7 @@ def get_previously_aggregated_workflow_ids(self):
         agg_col = self.get_results(
             collection="functional_annotation_agg",
             filter=self.aggregation_filter,
-            # FIXME: Using max_page_size of 0 may not work as we scale up the functional_annotation_agg collection, but pagination gets stuck in an infinite loop
-            # see issue: https://github.com/microbiomedata/nmdc-runtime/issues/806
-            max_page_size=0,
+            max_page_size=10000,
             fields="was_generated_by",
         )
         ids = list(set([x["was_generated_by"] for x in agg_col]))
         return ids
@@ -154,9 +151,7 @@ def get_workflow_records(self):
         act_col = self.get_results(
             collection="workflow_execution_set",
             filter=self.workflow_filter,
-            # FIXME: Using max_page_size of 0 may not work as we scale up the workflow_execution_set collection, but pagination gets stuck in an infinite loop
-            # see issue: https://github.com/microbiomedata/nmdc-runtime/issues/806
-            max_page_size=0,
+            max_page_size=500,
             fields="",
         )
         return act_col
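With the parameter name fixed, the retrieval pattern the series converges on can be condensed to the sketch below; the helper name and the bounded page sizes are illustrative, and the key detail is the asymmetry the final patch addresses: the response field is next_page_token while the request parameter must be page_token.

    # Condensed sketch of get_results-style pagination after patch 16/16.
    import requests

    def fetch_all(collection, filter="", max_page_size=500, fields=""):
        url = f"https://api-dev.microbiomedata.org/nmdcschema/{collection}"
        params = {"filter": filter, "max_page_size": max_page_size, "projection": fields}
        results = []
        while True:
            data = requests.get(url, params=params).json()
            results.extend(data.get("resources", []))
            token = data.get("next_page_token")
            if not token:
                return results
            params["page_token"] = token  # request side of the fix

    # e.g. the ids already present in the aggregation collection:
    already_aggregated = {
        rec["was_generated_by"]
        for rec in fetch_all(
            "functional_annotation_agg",
            filter='{"was_generated_by":{"$regex":"^nmdc:wfmp"}}',
            fields="was_generated_by",
        )
    }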