diff --git a/dags/annotate_and_index.py b/dags/annotate_and_index.py
index e08260b..884cd14 100644
--- a/dags/annotate_and_index.py
+++ b/dags/annotate_and_index.py
@@ -25,7 +25,7 @@
 from roger import pipelines
 from roger.config import config
 
-envspec = os.getenv("ROGER_DUG__INPUTS_DATA__SETS","topmed")
+envspec = os.getenv("ROGER_DUG__INPUTS_DATA__SETS","topmed:v2.0")
 data_sets = envspec.split(",")
 pipeline_names = {x.split(':')[0]: x.split(':')[1] for x in data_sets}
 for pipeline_class in pipelines.get_pipeline_classes(pipeline_names):
diff --git a/dags/roger/pipelines/base.py b/dags/roger/pipelines/base.py
index df99e5d..026713d 100644
--- a/dags/roger/pipelines/base.py
+++ b/dags/roger/pipelines/base.py
@@ -304,7 +304,9 @@
     def convert_to_kgx_json(self, elements, written_nodes=None):
         nodes = graph['nodes']
         for _, element in enumerate(elements):
-            # DugElement means a variable (Study variable...)
+            # DugElement means a variable (Study variable...)
+            if not isinstance(element, DugElement):
+                continue
             study_id = element.collection_id
             if study_id not in written_nodes:
                 nodes.append({
diff --git a/dags/roger/pipelines/bdc.py b/dags/roger/pipelines/bdc.py
index 4b40690..bc30cf4 100644
--- a/dags/roger/pipelines/bdc.py
+++ b/dags/roger/pipelines/bdc.py
@@ -1,10 +1,10 @@
-"Pipeline for BACPAC data"
+"Pipeline for BDC-dbGap data"
 
 from roger.pipelines import DugPipeline
 from roger.core import storage
 
 class bdcPipeline(DugPipeline):
-    "Pipeline for BACPAC data set"
+    "Pipeline for BDC-dbGap data set"
     pipeline_name = "bdc"
     parser_name = "dbgap"
 
diff --git a/dags/roger/pipelines/crdc.py b/dags/roger/pipelines/crdc.py
index 521afe0..2143cf7 100644
--- a/dags/roger/pipelines/crdc.py
+++ b/dags/roger/pipelines/crdc.py
@@ -1,10 +1,10 @@
-"Pipeline for BACPAC data"
+"Pipeline for Cancer Commons data"
 
 from roger.pipelines import DugPipeline
 from roger.core import storage
 
 class CRDCPipeline(DugPipeline):
-    "Pipeline for BACPAC data set"
+    "Pipeline for Cancer Commons data set"
     pipeline_name = "crdc"
     parser_name = "crdc"
 
diff --git a/dags/roger/pipelines/ctn.py b/dags/roger/pipelines/ctn.py
new file mode 100644
index 0000000..2591806
--- /dev/null
+++ b/dags/roger/pipelines/ctn.py
@@ -0,0 +1,10 @@
+"Pipeline for Clinical trials network data"
+
+from roger.pipelines import DugPipeline
+
+class CTNPipeline(DugPipeline):
+    "Pipeline for Clinical trials network data set"
+    pipeline_name = "ctn"
+    parser_name = "ctn"
+
+
diff --git a/dags/roger/pipelines/heal_research_programs.py b/dags/roger/pipelines/heal_research_programs.py
new file mode 100644
index 0000000..d95a449
--- /dev/null
+++ b/dags/roger/pipelines/heal_research_programs.py
@@ -0,0 +1,16 @@
+"Pipeline for Heal-research-programs data"
+
+from roger.pipelines import DugPipeline
+from roger.core import storage
+
+class HealResearchProgramPipeline(DugPipeline):
+    "Pipeline for Heal-research-programs data set"
+    pipeline_name = "heal-research-programs"
+    parser_name = "heal-research"
+
+    def get_objects(self, input_data_path=None):
+        if not input_data_path:
+            input_data_path = storage.dug_heal_research_program_path()
+        files = storage.get_files_recursive(lambda file_name: file_name.endswith('.xml'),
+                                            input_data_path)
+        return sorted([str(f) for f in files])
\ No newline at end of file
diff --git a/dags/roger/pipelines/heal_studies.py b/dags/roger/pipelines/heal_studies.py
new file mode 100644
index 0000000..cf78c04
--- /dev/null
+++ b/dags/roger/pipelines/heal_studies.py
@@ -0,0 +1,16 @@
+"Pipeline for Heal-studies data"
+
+from roger.pipelines import DugPipeline
+from roger.core import storage
+
+class HealStudiesPipeline(DugPipeline):
+    "Pipeline for Heal-studies data set"
+    pipeline_name = "heal-studies"
+    parser_name = "heal-studies"
+
+    def get_objects(self, input_data_path=None):
+        if not input_data_path:
+            input_data_path = storage.dug_heal_study_path()
+        files = storage.get_files_recursive(lambda file_name: file_name.endswith('.xml'),
+                                            input_data_path)
+        return sorted([str(f) for f in files])
diff --git a/dags/roger/pipelines/kfdrc.py b/dags/roger/pipelines/kfdrc.py
index 3c5a012..bcb0b7a 100644
--- a/dags/roger/pipelines/kfdrc.py
+++ b/dags/roger/pipelines/kfdrc.py
@@ -1,10 +1,10 @@
-"Pipeline for BACPAC data"
+"Pipeline for KFDRC data"
 
 from roger.pipelines import DugPipeline
 from roger.core import storage
 
 class kfdrcPipeline(DugPipeline):
-    "Pipeline for BACPAC data set"
+    "Pipeline for KFDRC data set"
     pipeline_name = "kfdrc"
     parser_name = "kfdrc"
 
diff --git a/dags/roger/pipelines/nida.py b/dags/roger/pipelines/nida.py
index eb425f2..b2e841b 100644
--- a/dags/roger/pipelines/nida.py
+++ b/dags/roger/pipelines/nida.py
@@ -14,5 +14,5 @@ def get_objects(self, input_data_path=None):
         if not input_data_path:
             input_data_path = storage.dug_input_files_path(
                 self.get_files_dir())
-        nida_file_pattern = storage.os.path.join(input_data_path, 'NIDA-*.xml')
-        return sorted(storage.glob.glob(nida_file_pattern))
+        files = sorted(storage.get_files_recursive(lambda x: 'NIDA-' in x, input_data_path))
+        return files
diff --git a/dags/roger/pipelines/sparc.py b/dags/roger/pipelines/sparc.py
new file mode 100644
index 0000000..d1c9c95
--- /dev/null
+++ b/dags/roger/pipelines/sparc.py
@@ -0,0 +1,17 @@
+"Pipeline for Sparc data"
+
+from roger.pipelines import DugPipeline
+from roger.core import storage
+
+class SparcPipeline(DugPipeline):
+    "Pipeline for Sparc data set"
+    pipeline_name = "sparc"
+    parser_name = "SciCrunch"
+
+    def get_objects(self, input_data_path=None):
+        if not input_data_path:
+            input_data_path = storage.dug_heal_study_path()
+        files = storage.get_files_recursive(
+            lambda x: True, input_data_path
+        )
+        return sorted([str(f) for f in files])
diff --git a/dags/roger/pipelines/topmed.py b/dags/roger/pipelines/topmed.py
index a839898..90b3e51 100644
--- a/dags/roger/pipelines/topmed.py
+++ b/dags/roger/pipelines/topmed.py
@@ -1,16 +1,41 @@
-"Pipeline for BACPAC data"
+"Pipeline for Topmed data"
 
 from roger.pipelines import DugPipeline
+from roger.pipelines.base import log, os
+import jsonpickle
 from roger.core import storage
-
+from roger.logger import logger
 class TopmedPipeline(DugPipeline):
-    "Pipeline for BACPAC data set"
+    "Pipeline for Topmed data set"
     pipeline_name = "topmed"
     parser_name = "TOPMedTag"
 
     def get_objects(self, input_data_path=None):
         if not input_data_path:
             input_data_path = str(storage.dug_input_files_path('topmed'))
-        topmed_file_pattern = storage.os.path.join(input_data_path,
-                                                   "topmed_*.csv")
-        return sorted(storage.glob.glob(topmed_file_pattern))
+        files = storage.get_files_recursive(
+            lambda file_name: file_name.endswith('.csv'),
+            input_data_path)
+        return sorted([str(x) for x in files])
+
+    def make_kg_tagged(self, to_string=False, elements_files=None,
+                       input_data_path=None, output_data_path=None):
+        "Create tagged knowledge graphs from elements"
+        log.info("Override base.make_kg_tagged called")
+        if not output_data_path:
+            output_data_path = storage.dug_kgx_path("")
+        storage.clear_dir(output_data_path)
+        if not elements_files:
+            elements_files = storage.dug_elements_objects(input_data_path, format='txt')
+        for file_ in elements_files:
+            elements = jsonpickle.decode(storage.read_object(file_))
+            kg = self.make_tagged_kg(elements)
+            dug_base_file_name = file_.split(os.path.sep)[-2]
+            output_file_path = os.path.join(output_data_path,
+                                            dug_base_file_name + '_kgx.json')
+            storage.write_object(kg, output_file_path)
+            log.info("Wrote %d nodes and %d edges, to %s", len(kg['nodes']),
+                     len(kg['edges']), output_file_path)
+        output_log = self.log_stream.getvalue() if to_string else ''
+        return output_log
+
diff --git a/dags/roger/tasks.py b/dags/roger/tasks.py
index 852cf4b..f545114 100755
--- a/dags/roger/tasks.py
+++ b/dags/roger/tasks.py
@@ -19,6 +19,7 @@
 from roger.pipelines.base import DugPipeline
 from avalon.mainoperations import put_files, LakeFsWrapper, get_files
 from lakefs_sdk.configuration import Configuration
+from lakefs_sdk.models.merge import Merge
 from functools import partial
 
 logger = get_logger()
@@ -186,9 +187,13 @@ def avalon_commit_callback(context: DagContext, **kwargs):
 
     try:
         # merging temp branch to working branch
+        # the current working branch wins in case of conflicts
+        merge = Merge(**{"strategy": "source-wins"})
         client._client.refs_api.merge_into_branch(repository=repo,
                                                   source_ref=temp_branch_name,
-                                                  destination_branch=branch)
+                                                  destination_branch=branch,
+                                                  merge=merge
+                                                  )
         logger.info(f"merged branch {temp_branch_name} into {branch}")
 
     except Exception as e:
@@ -203,6 +208,7 @@
     logger.info(f"deleted temp branch {temp_branch_name}")
     logger.info(f"deleting local dir {local_path}")
     files_to_clean = glob.glob(local_path + '**', recursive=True) + [local_path]
+    clean_up(context, **kwargs)
 
 
 def clean_up(context: DagContext, **kwargs):
diff --git a/requirements.txt b/requirements.txt
index 8f211b5..9693c5b 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -6,9 +6,9 @@ jsonpickle
 redisgraph-bulk-loader==0.12.3
 pytest
 PyYAML
-git+https://github.com/helxplatform/dug@2.13.0
+git+https://github.com/helxplatform/dug@2.13.1
 orjson
 kg-utils==0.0.6
 bmt==1.1.0
-git+https://github.com/helxplatform/avalon.git@pydantic-upgrade
+git+https://github.com/helxplatform/avalon.git@v1.0.0
 linkml-runtime==1.6.0
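
Note on the dataset-spec change in dags/annotate_and_index.py: the dict comprehension splits every entry on ':' and indexes both halves, so each entry must have the form name:version. The old default "topmed" would raise an IndexError on x.split(':')[1], which is why the default becomes "topmed:v2.0". A minimal sketch of the parsing contract; the tolerant variant at the end is a hypothetical extension, not part of this change:

    import os

    # Comma-separated "name:version" entries, e.g. "topmed:v2.0,bdc:v1.0".
    envspec = os.getenv("ROGER_DUG__INPUTS_DATA__SETS", "topmed:v2.0")
    data_sets = envspec.split(",")

    # As in the diff: strict, fails fast on a bare "topmed".
    pipeline_names = {x.split(':')[0]: x.split(':')[1] for x in data_sets}

    # Hypothetical tolerant variant that defaults a missing version:
    pipeline_names = {
        name: version or "latest"
        for name, _, version in (x.partition(':') for x in data_sets)
    }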
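
Several hunks (nida.py, topmed.py, and the new HEAL and SPARC pipelines) swap glob patterns such as storage.glob.glob(storage.os.path.join(path, 'topmed_*.csv')) for storage.get_files_recursive(predicate, path), which also picks up files in nested directories. The helper's body is not shown in this diff; the pathlib-based sketch below is a plausible reading of its contract, assuming the predicate receives the file name — a reconstruction, not Roger's actual implementation:

    from pathlib import Path
    from typing import Callable, Iterator

    def get_files_recursive(predicate: Callable[[str], bool],
                            input_data_path: str) -> Iterator[Path]:
        # Walk the tree; yield files whose name satisfies the predicate.
        for path in Path(input_data_path).rglob('*'):
            if path.is_file() and predicate(path.name):
                yield path

    # Call-site pattern from the diff: filter, stringify, sort.
    files = sorted(str(f) for f in get_files_recursive(
        lambda file_name: file_name.endswith('.csv'), 'data/topmed'))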