Merge pull request #518 from microbiomedata/issue-503
Code for NMDC to NCBI export
sujaypatil96 authored Jun 25, 2024
2 parents 636b968 + b9c6d38 commit dca5a70
Showing 12 changed files with 1,553 additions and 106 deletions.
433 changes: 433 additions & 0 deletions nmdc_runtime/site/export/ncbi_xml.py

Large diffs are not rendered by default.

206 changes: 206 additions & 0 deletions nmdc_runtime/site/export/ncbi_xml_utils.py
@@ -0,0 +1,206 @@
from io import BytesIO, StringIO
from nmdc_runtime.minter.config import typecodes
from lxml import etree

import csv
import requests


def _build_class_map(class_map_data):
    # Map each typecode name (e.g., "bsm") to the bare schema class name,
    # dropping the CURIE prefix from entries like "nmdc:Biosample".
    return {
        entry["name"]: entry["schema_class"].split(":")[1] for entry in class_map_data
    }


def get_classname_from_typecode(doc_id):
    # An NMDC id has the form "nmdc:<typecode>-<shoulder>-<suffix>"; the
    # typecode between ":" and the first "-" identifies the schema class.
    class_map_data = typecodes()
    class_map = _build_class_map(class_map_data)

    typecode = doc_id.split(":")[1].split("-")[0]
    return class_map.get(typecode)
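# --- Editor's illustration (not part of the committed file): assuming the
# minter's typecode table maps "bsm" to Biosample, a hypothetical id
# resolves like so:
#
#   get_classname_from_typecode("nmdc:bsm-11-abc123")  # -> "Biosample"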


def fetch_data_objects_from_biosamples(all_docs_collection, biosamples_list):
    # Starting from each biosample, walk the has_input -> has_output chain
    # through the alldocs collection until DataObject records are reached,
    # and collect those records.
    biosample_data_objects = []

    for biosample in biosamples_list:
        current_ids = [biosample["id"]]
        collected_data_objects = []

        while current_ids:
            new_current_ids = []
            for current_id in current_ids:
                query = {"has_input": current_id}
                document = all_docs_collection.find_one(query)

                if not document:
                    continue

                has_output = document.get("has_output")
                if not has_output:
                    continue

                for output_id in has_output:
                    if get_classname_from_typecode(output_id) == "DataObject":
                        data_object_doc = all_docs_collection.find_one(
                            {"id": output_id}
                        )
                        if data_object_doc:
                            collected_data_objects.append(data_object_doc)
                    else:
                        # Not a DataObject yet; keep walking the chain.
                        new_current_ids.append(output_id)

            current_ids = new_current_ids

        if collected_data_objects:
            biosample_data_objects.append({biosample["id"]: collected_data_objects})

    return biosample_data_objects
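# --- Editor's illustration (not part of the committed file): given a
# hypothetical chain Biosample -> OmicsProcessing -> DataObject in alldocs,
# the function returns one dict per biosample, keyed by biosample id:
#
#   [{"nmdc:bsm-11-abc123": [{"id": "nmdc:dobj-11-xyz789", ...}, ...]}]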


def fetch_omics_processing_from_biosamples(all_docs_collection, biosamples_list):
    # Same traversal as above, but when an output resolves to a DataObject,
    # collect the document that *produced* it (e.g., an OmicsProcessing
    # record, re-fetched by its own id) rather than the DataObject itself.
    biosample_data_objects = []

    for biosample in biosamples_list:
        current_ids = [biosample["id"]]
        collected_data_objects = []

        while current_ids:
            new_current_ids = []
            for current_id in current_ids:
                query = {"has_input": current_id}
                document = all_docs_collection.find_one(query)

                if not document:
                    continue

                has_output = document.get("has_output")
                if not has_output:
                    continue

                for output_id in has_output:
                    if get_classname_from_typecode(output_id) == "DataObject":
                        omics_processing_doc = all_docs_collection.find_one(
                            {"id": document["id"]}
                        )
                        if omics_processing_doc:
                            collected_data_objects.append(omics_processing_doc)
                    else:
                        new_current_ids.append(output_id)

            current_ids = new_current_ids

        if collected_data_objects:
            biosample_data_objects.append({biosample["id"]: collected_data_objects})

    return biosample_data_objects


def handle_quantity_value(slot_value):
    if "has_numeric_value" in slot_value and "has_unit" in slot_value:
        return f"{slot_value['has_numeric_value']} {slot_value['has_unit']}"
    elif (
        "has_maximum_numeric_value" in slot_value
        and "has_minimum_numeric_value" in slot_value
        and "has_unit" in slot_value
    ):
        # Renders the width of the range (max - min), not the endpoints.
        range_value = (
            slot_value["has_maximum_numeric_value"]
            - slot_value["has_minimum_numeric_value"]
        )
        return f"{range_value} {slot_value['has_unit']}"
    elif "has_raw_value" in slot_value:
        return slot_value["has_raw_value"]
    return "Unknown format"


def handle_text_value(slot_value):
    return slot_value.get("has_raw_value", "Unknown format")


def handle_timestamp_value(slot_value):
    return slot_value.get("has_raw_value", "Unknown format")


def handle_controlled_term_value(slot_value):
    if "term" in slot_value:
        term = slot_value["term"]
        if "name" in term and "id" in term:
            return f"{term['name']} [{term['id']}]"
        elif "id" in term:
            return term["id"]
        elif "name" in term:
            return term["name"]
    elif "has_raw_value" in slot_value:
        return slot_value["has_raw_value"]
    return "Unknown format"


def handle_controlled_identified_term_value(slot_value):
    if "term" in slot_value:
        term = slot_value["term"]
        if "name" in term and "id" in term:
            return f"{term['name']} [{term['id']}]"
        elif "id" in term:
            return term["id"]
    elif "has_raw_value" in slot_value:
        return slot_value["has_raw_value"]
    return "Unknown format"


def handle_geolocation_value(slot_value):
    if "latitude" in slot_value and "longitude" in slot_value:
        return f"{slot_value['latitude']} {slot_value['longitude']}"
    elif "has_raw_value" in slot_value:
        return slot_value["has_raw_value"]
    return "Unknown format"


def handle_float_value(slot_value):
    return f"{slot_value:.2f}"


def handle_string_value(slot_value):
    return f"{slot_value}"


def load_mappings(url):
    response = requests.get(url)
    response.raise_for_status()
    file_content = response.text

    attribute_mappings = {}
    slot_range_mappings = {}
    reader = csv.DictReader(StringIO(file_content), delimiter="\t")
    for row in reader:
        if row["ignore"].strip():
            continue

        json_key = row["nmdc_schema_slot"]
        # attribute mappings
        xml_attribute_name = row["ncbi_biosample_attribute_name"]
        attribute_mappings[json_key] = (
            xml_attribute_name if xml_attribute_name else json_key
        )

        # slot range mappings
        data_type = row["nmdc_schema_slot_range"]
        slot_range_mappings[json_key] = data_type if data_type else "default"

    return attribute_mappings, slot_range_mappings
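# --- Editor's illustration (not part of the committed file): the file fetched
# from `url` must carry at least the four columns read above (tab-separated in
# the real file); the rows here are hypothetical.
#
#   nmdc_schema_slot   ncbi_biosample_attribute_name   nmdc_schema_slot_range   ignore
#   samp_name          sample_name                     TextValue
#   temp               temperature                     QuantityValue
#
# which would yield:
#   attribute_mappings  == {"samp_name": "sample_name", "temp": "temperature"}
#   slot_range_mappings == {"samp_name": "TextValue", "temp": "QuantityValue"}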


def validate_xml(xml, xsd_url):
    response = requests.get(xsd_url)
    response.raise_for_status()
    xsd_content = response.text

    xml_schema_doc = etree.parse(BytesIO(xsd_content.encode("utf-8")))
    xml_schema = etree.XMLSchema(xml_schema_doc)

    xml_doc = etree.parse(BytesIO(xml.encode("utf-8")))

    if not xml_schema.validate(xml_doc):
        raise ValueError(f"There were errors while validating against: {xsd_url}")

    return True
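A sketch of how validate_xml might be exercised; the XSD URL is illustrative (NCBI publishes its submission schemas, but the exact URL should be confirmed), and the XML string is a placeholder:

    from nmdc_runtime.site.export.ncbi_xml_utils import validate_xml

    xml_string = "<BioSampleSet></BioSampleSet>"  # placeholder submission XML
    xsd_url = "https://www.ncbi.nlm.nih.gov/path/to/biosample.xsd"  # illustrative
    validate_xml(xml_string, xsd_url)  # True, or ValueError on schema violations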
28 changes: 24 additions & 4 deletions nmdc_runtime/site/export/study_metadata.py
@@ -5,7 +5,6 @@
 import csv
 from io import StringIO

-import requests
 from dagster import (
     op,
     get_dagster_logger,
@@ -26,13 +25,27 @@ def get_all_docs(client, collection, filter_):
     per_page = 200
     url_base = f"/{collection}?filter={filter_}&per_page={per_page}"
     results = []
-    rv = client.request("GET", url_base).json()
+    response = client.request("GET", url_base)
+    if response.status_code != 200:
+        raise Exception(
+            f"Runtime API request failed with status {response.status_code}."
+            f" Check URL: {url_base}"
+        )
+    rv = response.json()
     results.extend(rv.get("results", []))
     page, count = rv["meta"]["page"], rv["meta"]["count"]
     assert count <= 10_000
     while page * per_page < count:
-        rv = requests.get(url_base + f"&page={page + 1}").json()
-        results.extend(rv["results"])
         page += 1
+        url = f"{url_base}&page={page}"
+        response = client.request("GET", url)
+        if response.status_code != 200:
+            raise Exception(
+                f"Runtime API request failed with status {response.status_code}."
+                f" Check URL: {url}"
+            )
+        rv = response.json()
+        results.extend(rv.get("results", []))
     return results


@@ -115,3 +128,10 @@ def export_study_biosamples_as_csv(context: OpExecutionContext, study_export_inf
 def export_study_biosamples_metadata():
     outputs = export_study_biosamples_as_csv(get_study_biosamples_metadata())
     add_output_run_event(outputs)
+
+
+@op(required_resource_keys={"runtime_api_site_client"})
+def get_biosamples_by_study_id(context: OpExecutionContext, nmdc_study: dict):
+    client: RuntimeApiSiteClient = context.resources.runtime_api_site_client
+    biosamples = get_all_docs(client, "biosamples", f"part_of:{nmdc_study['id']}")
+    return biosamples
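For orientation, the new op simply delegates to the hardened get_all_docs helper. Outside of a Dagster run, the equivalent call would look roughly like this (the study id is hypothetical; client construction is elided):

    # assuming an already-configured RuntimeApiSiteClient instance `client`
    biosamples = get_all_docs(client, "biosamples", "part_of:nmdc:sty-11-abc123")
    print(len(biosamples))  # records accumulated from every page, 200 per page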
24 changes: 24 additions & 0 deletions nmdc_runtime/site/graphs.py
@@ -48,7 +48,14 @@
     get_neon_pipeline_inputs,
     get_df_from_url,
     site_code_mapping,
+    get_ncbi_export_pipeline_study,
+    get_data_objects_from_biosamples,
+    get_omics_processing_from_biosamples,
+    get_ncbi_export_pipeline_inputs,
+    ncbi_submission_xml_from_nmdc_study,
+    ncbi_submission_xml_asset,
 )
+from nmdc_runtime.site.export.study_metadata import get_biosamples_by_study_id


@graph
@@ -369,3 +376,20 @@ def ingest_neon_surface_water_metadata():
     )
     run_id = submit_metadata_to_db(database)
     poll_for_run_completion(run_id)
+
+
+@graph
+def nmdc_study_to_ncbi_submission_export():
+    nmdc_study = get_ncbi_export_pipeline_study()
+    ncbi_submission_metadata = get_ncbi_export_pipeline_inputs()
+    biosamples = get_biosamples_by_study_id(nmdc_study)
+    omics_processing_records = get_omics_processing_from_biosamples(biosamples)
+    data_objects = get_data_objects_from_biosamples(biosamples)
+    xml_data = ncbi_submission_xml_from_nmdc_study(
+        nmdc_study,
+        ncbi_submission_metadata,
+        biosamples,
+        omics_processing_records,
+        data_objects,
+    )
+    ncbi_submission_xml_asset(xml_data)
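To execute this graph, it would typically be wrapped as a Dagster job and registered in the repository definition; a minimal sketch (the job name is illustrative, and resource definitions are elided):

    ncbi_submission_export_job = nmdc_study_to_ncbi_submission_export.to_job(
        name="export_study_to_ncbi_submission_xml",  # illustrative name
        # resource_defs must supply runtime_api_site_client and the other
        # resources these ops require; elided here.
    )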