From eac8e6ab6858051d19d9619d5fed6724741b9527 Mon Sep 17 00:00:00 2001 From: braswent Date: Mon, 6 Nov 2023 11:35:36 -0500 Subject: [PATCH 01/71] annotator modules added by passing config val --- .gitignore | 3 +-- dags/roger/config/__init__.py | 2 ++ dags/roger/core/bulkload.py | 5 +++++ dags/roger/pipelines/base.py | 17 +++++++++++++---- requirements.txt | 3 ++- 5 files changed, 23 insertions(+), 7 deletions(-) diff --git a/.gitignore b/.gitignore index fa5c8ebb..4885049c 100644 --- a/.gitignore +++ b/.gitignore @@ -1,7 +1,6 @@ # Git ignore bioler plate from https://github.com/github/gitignore/blob/master/Python.gitignore .secret-env -Merge-helm/ -Merge-Dug-Architecture.md +.vscode/ # Byte-compiled / optimized / DLL files __pycache__/ diff --git a/dags/roger/config/__init__.py b/dags/roger/config/__init__.py index 060012ec..810e892e 100644 --- a/dags/roger/config/__init__.py +++ b/dags/roger/config/__init__.py @@ -88,6 +88,7 @@ class BulkLoaderConfig(DictLike): @dataclass class AnnotationConfig(DictLike): + annotator_type: str = "annotator_monarch" annotator: str = "https://api.monarchinitiative.org/api/nlp/annotate/entities?min_length=4&longest_only=false&include_abbreviation=false&include_acronym=false&include_numbers=false&content=" normalizer: str = "https://nodenormalization-sri.renci.org/get_normalized_nodes?curie=" synonym_service: str = "https://onto.renci.org/synonyms/" @@ -195,6 +196,7 @@ def to_dug_conf(self) -> DugConfig: redis_port=self.redisgraph.port, nboost_host=self.elasticsearch.nboost_host, preprocessor=self.annotation.preprocessor, + annotator_type=self.annotation.annotator_type, annotator={ 'url': self.annotation.annotator, }, diff --git a/dags/roger/core/bulkload.py b/dags/roger/core/bulkload.py index 1eca92db..5610690e 100644 --- a/dags/roger/core/bulkload.py +++ b/dags/roger/core/bulkload.py @@ -42,6 +42,11 @@ def tables_up_to_date (self): targets=glob.glob (storage.bulk_path ("nodes/**.csv")) + \ glob.glob (storage.bulk_path ("edges/**.csv"))) + def create (self): + """Used in the CLI on args.create_bulk""" + self.create_nodes_csv_file() + self.create_edges_csv_file() + def create_nodes_csv_file(self): if self.tables_up_to_date (): log.info ("up to date.") diff --git a/dags/roger/pipelines/base.py b/dags/roger/pipelines/base.py index 25cdca55..c4a28ad0 100644 --- a/dags/roger/pipelines/base.py +++ b/dags/roger/pipelines/base.py @@ -14,11 +14,12 @@ import requests -from dug.core import get_parser, get_plugin_manager, DugConcept -from dug.core.annotate import DugAnnotator, ConceptExpander +from dug.core import get_parser, get_annotator, get_plugin_manager, DugConcept +from dug.core.concept_expander import ConceptExpander from dug.core.crawler import Crawler from dug.core.factory import DugFactory from dug.core.parsers import Parser, DugElement +from dug.core.annotators import Annotator from dug.core.async_search import Search from dug.core.index import Index @@ -130,8 +131,9 @@ def __init__(self, config: RogerConfig, to_string=True): log.addHandler(self.string_handler) self.s3_utils = S3Utils(self.config.s3_config) - self.annotator: DugAnnotator = self.factory.build_annotator() - + self.annotator: Annotator = get_annotator( + dug_plugin_manager.hook, self.get_annotator_name(dug_conf) + ) self.tranqlizer: ConceptExpander = self.factory.build_tranqlizer() graph_name = self.config["redisgraph"]["graph"] @@ -195,6 +197,13 @@ def get_parser_name(self): can also be overriden. 
""" return getattr(self, 'parser_name', self.pipeline_name) + + def get_annotator_name(dug_conf): + """ + Access method for annotator_name + Defaults to annotator_monarch unless specified using annotation.annotator_type in the configuration file. + """ + return getattr(dug_conf, "annotator_type", "annotator_monarch") def annotate_files(self, parsable_files, output_data_path=None): """ diff --git a/requirements.txt b/requirements.txt index f553816e..ea6c1fe3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,7 +5,8 @@ flatten-dict redisgraph-bulk-loader==0.12.3 pytest PyYAML -git+https://github.com/helxplatform/dug@dug-merge +# git+https://github.com/helxplatform/dug@dug-merge # Version used by mbacon for dev +git+https://github.com/helxplatform/dug@329-annotator-modules # Version used for annotator modules orjson kg-utils==0.0.6 bmt==1.1.0 From 2480001d9c8dd2ddc5fd123d081c6f7ac642f02b Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Thu, 4 Jan 2024 13:47:52 -0500 Subject: [PATCH 02/71] fix merge conflict --- dags/roger/pipelines/base.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/dags/roger/pipelines/base.py b/dags/roger/pipelines/base.py index d71065c6..62d2efd7 100644 --- a/dags/roger/pipelines/base.py +++ b/dags/roger/pipelines/base.py @@ -19,13 +19,8 @@ from dug.core.concept_expander import ConceptExpander from dug.core.crawler import Crawler from dug.core.factory import DugFactory -<<<<<<< HEAD from dug.core.parsers import Parser, DugElement from dug.core.annotators import Annotator -======= -from dug.core.parsers import Parser, DugElement, DugConcept -from dug.core.annotate import Identifier ->>>>>>> pipeline_parameterize_restructure from dug.core.async_search import Search from dug.core.index import Index From b0028c508ebbe7e4d9c58c8871bc49248fdb2792 Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Thu, 4 Jan 2024 15:16:33 -0500 Subject: [PATCH 03/71] following same pattern as parsers , modify configs --- dags/roger/config/__init__.py | 14 ++++++++++++-- dags/roger/config/config.yaml | 9 ++++++++- dags/roger/pipelines/base.py | 15 +++++++++++---- 3 files changed, 31 insertions(+), 7 deletions(-) diff --git a/dags/roger/config/__init__.py b/dags/roger/config/__init__.py index 810e892e..118b807c 100644 --- a/dags/roger/config/__init__.py +++ b/dags/roger/config/__init__.py @@ -88,8 +88,18 @@ class BulkLoaderConfig(DictLike): @dataclass class AnnotationConfig(DictLike): - annotator_type: str = "annotator_monarch" - annotator: str = "https://api.monarchinitiative.org/api/nlp/annotate/entities?min_length=4&longest_only=false&include_abbreviation=false&include_acronym=false&include_numbers=false&content=" + annotator_type: str = "monarch" + annotator_args: dict = field( + default_factory=lambda: { + "monarch": { + "url": "https://api.monarchinitiative.org/api/nlp/annotate/entities?min_length=4&longest_only=false&include_abbreviation=false&include_acronym=false&include_numbers=false&content=" + }, + "sapbert": { + "classificationUrl": "https://med-nemo.apps.renci.org/annotate/", + "annotatorUrl": "https://babel-sapbert.apps.renci.org/annotate/", + }, + } + ) normalizer: str = "https://nodenormalization-sri.renci.org/get_normalized_nodes?curie=" synonym_service: str = "https://onto.renci.org/synonyms/" ontology_metadata: str = "https://api.monarchinitiative.org/api/bioentity/" diff --git a/dags/roger/config/config.yaml b/dags/roger/config/config.yaml index 5c27eb81..c51941ee 100644 --- a/dags/roger/config/config.yaml +++ b/dags/roger/config/config.yaml @@ -44,10 +44,17 @@ 
bulk_loader: annotation: clear_http_cache: false - annotator: "https://api.monarchinitiative.org/api/nlp/annotate/entities?min_length=4&longest_only=false&include_abbreviation=false&include_acronym=false&include_numbers=false&content=" + annotator_type: monarch + annotator_args: + monarch: + url: "https://api.monarchinitiative.org/api/nlp/annotate/entities?min_length=4&longest_only=false&include_abbreviation=false&include_acronym=false&include_numbers=false&content=" + sapbert: + classificationUrl": "https://med-nemo.apps.renci.org/annotate/" + annotatorUrl: "https://babel-sapbert.apps.renci.org/annotate/" normalizer: "https://nodenormalization-dev.apps.renci.org/get_normalized_nodes?conflate=false&description=true&curie=" synonym_service: "https://name-resolution-sri.renci.org/reverse_lookup" ontology_metadata: "https://api.monarchinitiative.org/api/bioentity/" + preprocessor: debreviator: BMI: "body mass index" diff --git a/dags/roger/pipelines/base.py b/dags/roger/pipelines/base.py index 62d2efd7..19b52a27 100644 --- a/dags/roger/pipelines/base.py +++ b/dags/roger/pipelines/base.py @@ -129,9 +129,6 @@ def __init__(self, config: RogerConfig, to_string=False): log.addHandler(self.string_handler) self.s3_utils = S3Utils(self.config.s3_config) - self.annotator: Annotator = get_annotator( - dug_plugin_manager.hook, self.get_annotator_name(dug_conf) - ) self.tranqlizer: ConceptExpander = self.factory.build_tranqlizer() graph_name = self.config["redisgraph"]["graph"] @@ -209,6 +206,15 @@ def get_parser(self): parser: Parser = get_parser(dug_plugin_manager.hook, self.get_parser_name()) return parser + + def get_annotator(self): + dug_plugin_manager = get_plugin_manager() + annotator: Annotator = get_annotator( + dug_plugin_manager.hook, + self.get_annotator_name(), + self.config.to_dug_conf() + ) + return annotator def annotate_files(self, parsable_files, output_data_path=None): """ @@ -224,10 +230,11 @@ def annotate_files(self, parsable_files, output_data_path=None): log.debug("Creating Dug Crawler object on parse_file %s at %d of %d", parse_file, _ , len(parsable_files)) parser = self.get_parser() + annotator = self.get_annotator() crawler = Crawler( crawl_file=parse_file, parser=parser, - annotator=self.annotator, + annotator=annotator, tranqlizer='', tranql_queries=[], http_session=self.cached_session From 51139536cc3dd60ca70fa2bed8e96d57a743710c Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Thu, 4 Jan 2024 16:05:23 -0500 Subject: [PATCH 04/71] fix to dug config method --- dags/roger/config/__init__.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/dags/roger/config/__init__.py b/dags/roger/config/__init__.py index 118b807c..60d0f6b2 100644 --- a/dags/roger/config/__init__.py +++ b/dags/roger/config/__init__.py @@ -207,9 +207,7 @@ def to_dug_conf(self) -> DugConfig: nboost_host=self.elasticsearch.nboost_host, preprocessor=self.annotation.preprocessor, annotator_type=self.annotation.annotator_type, - annotator={ - 'url': self.annotation.annotator, - }, + annotator_args=self.annotation.annotator_args, normalizer={ 'url': self.annotation.normalizer, }, From c59bfb05af8c7cbec62b299655d07d33e9a4a448 Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Thu, 4 Jan 2024 16:10:12 -0500 Subject: [PATCH 05/71] fix old dug pipeline for backward compatiblity --- dags/dug_helpers/dug_utils.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/dags/dug_helpers/dug_utils.py b/dags/dug_helpers/dug_utils.py index a7795450..52db9624 100644 --- 
a/dags/dug_helpers/dug_utils.py +++ b/dags/dug_helpers/dug_utils.py @@ -11,8 +11,9 @@ from typing import Union, List import requests -from dug.core import get_parser, get_plugin_manager, DugConcept -from dug.core.annotate import DugAnnotator, ConceptExpander +from dug.core import get_parser, get_annotator, get_plugin_manager, DugConcept +from dug.core.annotators._base import Annotator +from dug.core.concept_expander import ConceptExpander from dug.core.crawler import Crawler from dug.core.factory import DugFactory from dug.core.parsers import Parser, DugElement @@ -44,7 +45,7 @@ def __init__(self, config: RogerConfig, to_string=True): self.string_handler = logging.StreamHandler(self.log_stream) log.addHandler(self.string_handler) - self.annotator: DugAnnotator = self.factory.build_annotator() + self.annotator_name: str = config.annotation.annotator_type self.tranqlizer: ConceptExpander = self.factory.build_tranqlizer() @@ -95,6 +96,7 @@ def annotate_files(self, parser_name, parsable_files, """ dug_plugin_manager = get_plugin_manager() parser: Parser = get_parser(dug_plugin_manager.hook, parser_name) + annotator: Annotator = get_annotator(dug_plugin_manager.hook, annotator_name=self.annotator_name, config=self.config.to_dug_conf()) if not output_data_path: output_data_path = storage.dug_annotation_path('') log.info("Parsing files") @@ -103,7 +105,7 @@ def annotate_files(self, parser_name, parsable_files, crawler = Crawler( crawl_file=parse_file, parser=parser, - annotator=self.annotator, + annotator=annotator, tranqlizer='', tranql_queries=[], http_session=self.cached_session From e0dcd9316ff561ea939652705fb06652d0583f16 Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Thu, 4 Jan 2024 16:39:55 -0500 Subject: [PATCH 06/71] correct default annotator type --- dags/roger/pipelines/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dags/roger/pipelines/base.py b/dags/roger/pipelines/base.py index 19b52a27..3be8b3c0 100644 --- a/dags/roger/pipelines/base.py +++ b/dags/roger/pipelines/base.py @@ -198,7 +198,7 @@ def get_annotator_name(dug_conf): Access method for annotator_name Defaults to annotator_monarch unless specified using annotation.annotator_type in the configuration file. 
""" - return getattr(dug_conf, "annotator_type", "annotator-monarch") + return getattr(dug_conf, "annotator_type", "monarch") def get_parser(self): From ae48080e9560b954638e29ebd9a5da33acd40eb3 Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Thu, 4 Jan 2024 17:50:31 -0500 Subject: [PATCH 07/71] reflective changes --- dags/roger/config/__init__.py | 4 ++-- dags/roger/config/config.yaml | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/dags/roger/config/__init__.py b/dags/roger/config/__init__.py index 60d0f6b2..bc1376ba 100644 --- a/dags/roger/config/__init__.py +++ b/dags/roger/config/__init__.py @@ -95,8 +95,8 @@ class AnnotationConfig(DictLike): "url": "https://api.monarchinitiative.org/api/nlp/annotate/entities?min_length=4&longest_only=false&include_abbreviation=false&include_acronym=false&include_numbers=false&content=" }, "sapbert": { - "classificationUrl": "https://med-nemo.apps.renci.org/annotate/", - "annotatorUrl": "https://babel-sapbert.apps.renci.org/annotate/", + "classification_url": "https://med-nemo.apps.renci.org/annotate/", + "annotator_url": "https://babel-sapbert.apps.renci.org/annotate/", }, } ) diff --git a/dags/roger/config/config.yaml b/dags/roger/config/config.yaml index c51941ee..97917357 100644 --- a/dags/roger/config/config.yaml +++ b/dags/roger/config/config.yaml @@ -49,8 +49,8 @@ annotation: monarch: url: "https://api.monarchinitiative.org/api/nlp/annotate/entities?min_length=4&longest_only=false&include_abbreviation=false&include_acronym=false&include_numbers=false&content=" sapbert: - classificationUrl": "https://med-nemo.apps.renci.org/annotate/" - annotatorUrl: "https://babel-sapbert.apps.renci.org/annotate/" + classification_url": "https://med-nemo.apps.renci.org/annotate/" + annotator_url: "https://babel-sapbert.apps.renci.org/annotate/" normalizer: "https://nodenormalization-dev.apps.renci.org/get_normalized_nodes?conflate=false&description=true&curie=" synonym_service: "https://name-resolution-sri.renci.org/reverse_lookup" ontology_metadata: "https://api.monarchinitiative.org/api/bioentity/" From 5fd9168899fbda9efb5dd83ba22ecca5e73bacfe Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Thu, 4 Jan 2024 18:07:26 -0500 Subject: [PATCH 08/71] typo extra quotes --- dags/roger/config/config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dags/roger/config/config.yaml b/dags/roger/config/config.yaml index 97917357..a956f00e 100644 --- a/dags/roger/config/config.yaml +++ b/dags/roger/config/config.yaml @@ -49,7 +49,7 @@ annotation: monarch: url: "https://api.monarchinitiative.org/api/nlp/annotate/entities?min_length=4&longest_only=false&include_abbreviation=false&include_acronym=false&include_numbers=false&content=" sapbert: - classification_url": "https://med-nemo.apps.renci.org/annotate/" + classification_url: "https://med-nemo.apps.renci.org/annotate/" annotator_url: "https://babel-sapbert.apps.renci.org/annotate/" normalizer: "https://nodenormalization-dev.apps.renci.org/get_normalized_nodes?conflate=false&description=true&curie=" synonym_service: "https://name-resolution-sri.renci.org/reverse_lookup" From 5ae2d3a4fd42c4b9adbd0d3dbedcacab51bc07ea Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Fri, 5 Jan 2024 09:29:10 -0500 Subject: [PATCH 09/71] annotator type not being picked up from config --- dags/roger/pipelines/base.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dags/roger/pipelines/base.py b/dags/roger/pipelines/base.py index 3be8b3c0..df99e5d6 100644 --- a/dags/roger/pipelines/base.py 
+++ b/dags/roger/pipelines/base.py @@ -193,12 +193,12 @@ def get_parser_name(self): """ return getattr(self, 'parser_name', self.pipeline_name) - def get_annotator_name(dug_conf): + def get_annotator_name(self): """ Access method for annotator_name Defaults to annotator_monarch unless specified using annotation.annotator_type in the configuration file. """ - return getattr(dug_conf, "annotator_type", "monarch") + return self.config.annotation.annotator_type def get_parser(self): From 9ca1e38341afe019ee9ee573d49d230f26df2714 Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Tue, 16 Jan 2024 17:33:34 -0500 Subject: [PATCH 10/71] remove annotate simple , log env value for lakefs enabled --- dags/annotate_simple.py | 29 ----------------------------- dags/roger/tasks.py | 10 +++++++--- 2 files changed, 7 insertions(+), 32 deletions(-) delete mode 100755 dags/annotate_simple.py diff --git a/dags/annotate_simple.py b/dags/annotate_simple.py deleted file mode 100755 index 6ec69d96..00000000 --- a/dags/annotate_simple.py +++ /dev/null @@ -1,29 +0,0 @@ -import os - -from airflow.models import DAG -from airflow.operators.empty import EmptyOperator - -from dug_helpers.dug_utils import ( - DugUtil, - get_bacpac_files - ) -from roger.tasks import default_args, create_python_task - -DAG_ID = 'annotate_simple' - -""" Build the workflow's tasks and DAG. """ -with DAG( - dag_id=DAG_ID, - default_args=default_args, - schedule_interval=None -) as dag: - - """Build workflow tasks.""" - make_kg_tagged = create_python_task(dag, "make_tagged_kgx", - DugUtil.make_kg_tagged) - annotate_files = create_python_task(dag, "annotate_bacpac_files", - DugUtil.annotate_bacpac_files, - input_repo='bacpac', - input_branch='v1.0') - - annotate_files >> make_kg_tagged diff --git a/dags/roger/tasks.py b/dags/roger/tasks.py index 561608d1..4b16806d 100755 --- a/dags/roger/tasks.py +++ b/dags/roger/tasks.py @@ -222,12 +222,12 @@ def generate_dir_name_from_task_instance(task_instance: TaskInstance, f"{root_data_dir}/{dag_id}_{task_id}_{run_id}_{try_number}_{suffix}") def setup_input_data(context, exec_conf): - print(""" + logger.info(""" - Figures out the task name and id, - find its data dependencies - clean up and create in and out dir - put dependency data in input dir - - if for some reason data was not found raise an execption + - if for some reason data was not found raise an exception """) # Serves as a location where files the task will work on are placed. # computed as ROGER_DATA_DIR + /current task instance name_input_dir @@ -299,6 +299,10 @@ def create_python_task(dag, name, a_callable, func_kwargs=None, input_repo=None, "dag": dag, "provide_context" : True } + logger.info("Environ ROGER_LAKEFS__CONFIG_ENABLED ") + logger.info(os.environ.get('ROGER_LAKEFS__CONFIG_ENABLED ')) + logger.info('config lakefs enabled') + logger.info(config.lakefs_config.enabled) # if we have lakefs... if config.lakefs_config.enabled: @@ -310,7 +314,7 @@ def create_python_task(dag, name, a_callable, func_kwargs=None, input_repo=None, } if input_repo and input_branch: - # if the task is a root task , begining of the dag... + # if the task is a root task , beginning of the dag... # and we want to pull data from a different repo. 
pre_exec_conf = { 'input_repo': input_repo, From 348be818c18304d9544b4ce2dd22d5592429c77d Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Tue, 16 Jan 2024 18:08:01 -0500 Subject: [PATCH 11/71] testing lakefs off --- dags/roger/tasks.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/dags/roger/tasks.py b/dags/roger/tasks.py index 4b16806d..9be2b374 100755 --- a/dags/roger/tasks.py +++ b/dags/roger/tasks.py @@ -41,6 +41,7 @@ def task_wrapper(python_callable, **kwargs): pass_conf = kwargs.get('pass_conf', True) if config.lakefs_config.enabled: # get input path + logger.info("lakefs is enabled") input_data_path = generate_dir_name_from_task_instance(kwargs['ti'], roger_config=config, suffix='input') @@ -48,7 +49,8 @@ def task_wrapper(python_callable, **kwargs): output_data_path = generate_dir_name_from_task_instance(kwargs['ti'], roger_config=config, suffix='output') - else: + else: + logger.info("lakefs is disabled") input_data_path, output_data_path = None, None # cast it to a path object func_args = { @@ -59,9 +61,11 @@ def task_wrapper(python_callable, **kwargs): logger.info(f"Task function args: {func_args}") # overrides values config.dag_run = dag_run - if pass_conf: - return python_callable(config=config, **func_args) - return python_callable(**func_args) + def dummy_func(*args, **kwargs): + return "yay" + # if pass_conf: + # return python_callable(config=config, **func_args) + return dummy_func(**func_args) def get_executor_config(data_path='/opt/airflow/share/data'): """ Get an executor configuration. From 528768d3c56a43ccf77aace5c57d7b3d258b9fe8 Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Tue, 16 Jan 2024 18:19:11 -0500 Subject: [PATCH 12/71] add more logging --- dags/roger/tasks.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/dags/roger/tasks.py b/dags/roger/tasks.py index 9be2b374..8fca5077 100755 --- a/dags/roger/tasks.py +++ b/dags/roger/tasks.py @@ -39,6 +39,7 @@ def task_wrapper(python_callable, **kwargs): # get dag config provided dag_run = kwargs.get('dag_run') pass_conf = kwargs.get('pass_conf', True) + config = kwargs.get('config', RogerConfig()) if config.lakefs_config.enabled: # get input path logger.info("lakefs is enabled") @@ -287,7 +288,8 @@ def create_python_task(dag, name, a_callable, func_kwargs=None, input_repo=None, op_kwargs = { "python_callable": a_callable, "to_string": True, - "pass_conf": pass_conf + "pass_conf": pass_conf, + "config": config } # update / override some of the args passed to the task function by default if func_kwargs is None: @@ -304,7 +306,7 @@ def create_python_task(dag, name, a_callable, func_kwargs=None, input_repo=None, "provide_context" : True } logger.info("Environ ROGER_LAKEFS__CONFIG_ENABLED ") - logger.info(os.environ.get('ROGER_LAKEFS__CONFIG_ENABLED ')) + logger.info(os.environ.get('ROGER_LAKEFS__CONFIG_ENABLED')) logger.info('config lakefs enabled') logger.info(config.lakefs_config.enabled) From df89638171674a3826100d8fbf0160b9684165bb Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Tue, 16 Jan 2024 18:25:15 -0500 Subject: [PATCH 13/71] add more logging --- dags/roger/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dags/roger/tasks.py b/dags/roger/tasks.py index 8fca5077..7244114b 100755 --- a/dags/roger/tasks.py +++ b/dags/roger/tasks.py @@ -39,7 +39,7 @@ def task_wrapper(python_callable, **kwargs): # get dag config provided dag_run = kwargs.get('dag_run') pass_conf = kwargs.get('pass_conf', True) - config = kwargs.get('config', RogerConfig()) + 
config = kwargs.get('config') if config.lakefs_config.enabled: # get input path logger.info("lakefs is enabled") From 683e35b788825f962c0d83d68aa33816fd12690f Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Tue, 16 Jan 2024 19:36:29 -0500 Subject: [PATCH 14/71] post init for config to parse to boolean --- dags/roger/config/__init__.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/dags/roger/config/__init__.py b/dags/roger/config/__init__.py index bc1376ba..44c2f672 100644 --- a/dags/roger/config/__init__.py +++ b/dags/roger/config/__init__.py @@ -35,6 +35,10 @@ class LakefsConfig(DictLike): repo: str enabled: bool = False + def __post_init__(self): + if isinstance(self.enabled, str): + self.enabled = self.enabled.lower() == "true" + @dataclass From 6428075d947896087539a2fb679129045f31bd08 Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Wed, 17 Jan 2024 06:57:36 -0500 Subject: [PATCH 15/71] put back task calls --- dags/roger/tasks.py | 19 ++++--------------- 1 file changed, 4 insertions(+), 15 deletions(-) diff --git a/dags/roger/tasks.py b/dags/roger/tasks.py index 7244114b..bd6a3bfb 100755 --- a/dags/roger/tasks.py +++ b/dags/roger/tasks.py @@ -39,10 +39,8 @@ def task_wrapper(python_callable, **kwargs): # get dag config provided dag_run = kwargs.get('dag_run') pass_conf = kwargs.get('pass_conf', True) - config = kwargs.get('config') if config.lakefs_config.enabled: # get input path - logger.info("lakefs is enabled") input_data_path = generate_dir_name_from_task_instance(kwargs['ti'], roger_config=config, suffix='input') @@ -51,7 +49,6 @@ def task_wrapper(python_callable, **kwargs): roger_config=config, suffix='output') else: - logger.info("lakefs is disabled") input_data_path, output_data_path = None, None # cast it to a path object func_args = { @@ -62,11 +59,8 @@ def task_wrapper(python_callable, **kwargs): logger.info(f"Task function args: {func_args}") # overrides values config.dag_run = dag_run - def dummy_func(*args, **kwargs): - return "yay" - # if pass_conf: - # return python_callable(config=config, **func_args) - return dummy_func(**func_args) + if pass_conf: + return python_callable(config=config, **func_args) def get_executor_config(data_path='/opt/airflow/share/data'): """ Get an executor configuration. @@ -156,7 +150,7 @@ def avalon_commit_callback(context: DagContext, **kwargs): # right now lets stick to using one repo , # issue Vladmir pointed out if uploads to a single lakefs branch have not - # been finilized with commit, + # been finalized with commit, # this would cause dirty commits if parallel tasks target the same branch. # solution: Lakefs team suggested we commit to a different temp branch per @@ -288,8 +282,7 @@ def create_python_task(dag, name, a_callable, func_kwargs=None, input_repo=None, op_kwargs = { "python_callable": a_callable, "to_string": True, - "pass_conf": pass_conf, - "config": config + "pass_conf": pass_conf } # update / override some of the args passed to the task function by default if func_kwargs is None: @@ -305,10 +298,6 @@ def create_python_task(dag, name, a_callable, func_kwargs=None, input_repo=None, "dag": dag, "provide_context" : True } - logger.info("Environ ROGER_LAKEFS__CONFIG_ENABLED ") - logger.info(os.environ.get('ROGER_LAKEFS__CONFIG_ENABLED')) - logger.info('config lakefs enabled') - logger.info(config.lakefs_config.enabled) # if we have lakefs... 
if config.lakefs_config.enabled: From ec54cc82f3ad0cd71ac3193f4596715dd0805404 Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Wed, 17 Jan 2024 06:59:52 -0500 Subject: [PATCH 16/71] revert some changes --- dags/roger/tasks.py | 1 + 1 file changed, 1 insertion(+) diff --git a/dags/roger/tasks.py b/dags/roger/tasks.py index bd6a3bfb..e6ae60cb 100755 --- a/dags/roger/tasks.py +++ b/dags/roger/tasks.py @@ -61,6 +61,7 @@ def task_wrapper(python_callable, **kwargs): config.dag_run = dag_run if pass_conf: return python_callable(config=config, **func_args) + return python_callable(**func_args) def get_executor_config(data_path='/opt/airflow/share/data'): """ Get an executor configuration. From d557c2f924181deb442ae3e6cd020fc0f8b7df14 Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Fri, 19 Jan 2024 14:31:33 -0500 Subject: [PATCH 17/71] adding new pipeline --- dags/knowledge_graph_build.py | 51 +++++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) create mode 100644 dags/knowledge_graph_build.py diff --git a/dags/knowledge_graph_build.py b/dags/knowledge_graph_build.py new file mode 100644 index 00000000..b0a8e43d --- /dev/null +++ b/dags/knowledge_graph_build.py @@ -0,0 +1,51 @@ +# -*- coding: utf-8 -*- +# + +""" +An Airflow workflow for the Roger Translator KGX data pipeline. +""" + +from airflow.models import DAG +from airflow.operators.empty import EmptyOperator +import roger +from roger.tasks import get_executor_config, default_args, create_python_task + +""" Build the workflow's tasks and DAG. """ +with DAG( + dag_id='knowledge_graph_build', + default_args=default_args, + schedule_interval=None +) as dag: + + """ Build the workflow tasks. """ + intro = EmptyOperator(task_id='Intro') + + # Merge nodes needs inputs from two sources + # 1. baseline and/or CDE KGX files from LakeFS (External repo) + # 2. Infer which local kgx files are needed based on dug_inputs and grab them from the current repo + merge_nodes = create_python_task (dag, "MergeNodes", roger.merge_nodes) + + # The rest of these guys can just operate on the local lakefs repo/branch + # we need to add input dir and output dir similar to what we did for dug tasks + + create_nodes_schema = create_python_task(dag, "CreateNodesSchema", + roger.create_nodes_schema) + create_edges_schema = create_python_task(dag, "CreateEdgesSchema", + roger.create_edges_schema) + + + continue_task_bulk_load = EmptyOperator(task_id="continueBulkCreate") + create_bulk_load_nodes = create_python_task(dag, "CreateBulkLoadNodes", + roger.create_bulk_nodes) + create_bulk_load_edges = create_python_task(dag, "CreateBulkLoadEdges", + roger.create_bulk_edges) + bulk_load = create_python_task(dag, "BulkLoad", roger.bulk_load) + continue_task_validate = EmptyOperator(task_id="continueValidation") + check_tranql = create_python_task(dag, "CheckTranql", + roger.check_tranql) + validate = create_python_task(dag, "Validate", roger.validate) + finish = EmptyOperator(task_id='Finish') + + """ Build the DAG. 
""" + intro >> merge_nodes >> [create_nodes_schema, create_edges_schema ] >> continue_task_bulk_load >> \ + [create_bulk_load_nodes, create_bulk_load_edges] >> bulk_load >> continue_task_validate >>[validate, check_tranql ] >> finish From 83815ab472cfa3642524fcb90f502dd915d43f17 Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Fri, 19 Jan 2024 15:38:23 -0500 Subject: [PATCH 18/71] lakefs io support for merge task --- dags/knowledge_graph_build.py | 24 +++++++++++++++++++++++- dags/roger/core/base.py | 2 +- dags/roger/core/storage.py | 6 ++++-- dags/roger/models/kgx.py | 10 +++++++--- 4 files changed, 35 insertions(+), 7 deletions(-) diff --git a/dags/knowledge_graph_build.py b/dags/knowledge_graph_build.py index b0a8e43d..4590bb50 100644 --- a/dags/knowledge_graph_build.py +++ b/dags/knowledge_graph_build.py @@ -9,6 +9,7 @@ from airflow.operators.empty import EmptyOperator import roger from roger.tasks import get_executor_config, default_args, create_python_task +from roger.config import config """ Build the workflow's tasks and DAG. """ with DAG( @@ -23,7 +24,28 @@ # Merge nodes needs inputs from two sources # 1. baseline and/or CDE KGX files from LakeFS (External repo) # 2. Infer which local kgx files are needed based on dug_inputs and grab them from the current repo - merge_nodes = create_python_task (dag, "MergeNodes", roger.merge_nodes) + + # build the annotate and index pipeline output locations + #lakefs://yk-heal/main/annotate_and_index/crdc_dataset_pipeline_task_group.make_kgx_crdc/ + working_repo = config.lakefs_config.repo + branch = config.lakefs_config.branch + get_path_on_lakefs = lambda d: f"{working_repo}/{branch}/annotate_and_index/{d}_dataset_pipeline_task_group.make_kgx_{d}/" + kgx_files_to_grab = [] + for dataset in config.dug_inputs.data_sets: + dataset_name = dataset.split(":")[0] + kgx_files_to_grab.append(get_path_on_lakefs(dataset_name)) + + print("***************************") + print(kgx_files_to_grab) + + + merge_nodes = create_python_task (dag, name="MergeNodes", + a_callable=roger.merge_nodes, + input_repo="cde", + input_branch="v5.0") + + + # The rest of these guys can just operate on the local lakefs repo/branch # we need to add input dir and output dir similar to what we did for dug tasks diff --git a/dags/roger/core/base.py b/dags/roger/core/base.py index 6696f81d..5f9b6d41 100644 --- a/dags/roger/core/base.py +++ b/dags/roger/core/base.py @@ -89,7 +89,7 @@ def create_nodes_schema(to_string=False, config=None): output = roger.log_stream.getvalue() if to_string else None return output -def merge_nodes(to_string=False, config=None): +def merge_nodes(to_string=False, config=None, input_path="", output_path=""): "Run KGX merge" output = None with Roger (to_string, config=config) as roger: diff --git a/dags/roger/core/storage.py b/dags/roger/core/storage.py index f5f152dd..0574e4c1 100644 --- a/dags/roger/core/storage.py +++ b/dags/roger/core/storage.py @@ -152,10 +152,12 @@ def kgx_path(name): :path name: Name of the KGX object. """ return str(ROGER_DATA_DIR / "kgx" / name) -def kgx_objects(format_="json"): +def kgx_objects(format_="json", path=None): """ A list of KGX objects. """ kgx_pattern = kgx_path(f"**.{format_}") - return sorted(glob.glob (kgx_pattern)) + if path: + kgx_pattern = f"{path}/**/*.{format_}" + return sorted(glob.glob (kgx_pattern, recursive=True)) def merge_path(name): """ Form a merged KGX object path. 
diff --git a/dags/roger/models/kgx.py b/dags/roger/models/kgx.py index 62a57a61..6372b01a 100644 --- a/dags/roger/models/kgx.py +++ b/dags/roger/models/kgx.py @@ -414,13 +414,17 @@ def write_schema(self, schema, schema_type: SchemaType): dictionary = { k : v for k, v in schema.items () } storage.write_object (dictionary, file_name) - def merge(self): + def merge(self, input_path="", output_path=""): """ This version uses the disk merging from the kg_utils module """ data_set_version = self.config.get('kgx', {}).get('dataset_version') metrics = {} start = time.time() - json_format_files = storage.kgx_objects("json") - jsonl_format_files = storage.kgx_objects("jsonl") + if input_path: + json_format_file = storage.kgx_objects("json", input_path) + jsonl_format_files = storage.kgx_objects("jsonl", input_path) + else: + json_format_files = storage.kgx_objects("json") + jsonl_format_files = storage.kgx_objects("jsonl") # Create lists of the nodes and edges files in both json and jsonl # formats From 80ddf597359aa1b536ac6f15ff5721cb0a33ad5e Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Mon, 22 Jan 2024 14:13:25 -0500 Subject: [PATCH 19/71] fix name --- dags/knowledge_graph_build.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dags/knowledge_graph_build.py b/dags/knowledge_graph_build.py index 4590bb50..92e2a836 100644 --- a/dags/knowledge_graph_build.py +++ b/dags/knowledge_graph_build.py @@ -41,7 +41,7 @@ merge_nodes = create_python_task (dag, name="MergeNodes", a_callable=roger.merge_nodes, - input_repo="cde", + input_repo="cde-graph", input_branch="v5.0") From 93dcbe92036ba8002297011af0863c7f9e26d7d7 Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Mon, 22 Jan 2024 14:32:20 -0500 Subject: [PATCH 20/71] add io params for kg tasks --- dags/roger/core/base.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/dags/roger/core/base.py b/dags/roger/core/base.py index 5f9b6d41..ff35b8b0 100644 --- a/dags/roger/core/base.py +++ b/dags/roger/core/base.py @@ -89,7 +89,7 @@ def create_nodes_schema(to_string=False, config=None): output = roger.log_stream.getvalue() if to_string else None return output -def merge_nodes(to_string=False, config=None, input_path="", output_path=""): +def merge_nodes(to_string=False, config=None, input_data_path=None, output_data_path=None): "Run KGX merge" output = None with Roger (to_string, config=config) as roger: @@ -97,14 +97,14 @@ def merge_nodes(to_string=False, config=None, input_path="", output_path=""): output = roger.log_stream.getvalue () if to_string else None return output -def create_bulk_load(to_string=False, config=None): +def create_bulk_load(to_string=False, config=None, input_data_path=None, output_data_path=None): "Generate bulk load files" o1 = create_bulk_nodes(to_string=to_string, config=config) o2 = create_bulk_edges(to_string=to_string, config=config) output = (o1 + o2) if to_string else None return output -def create_bulk_nodes(to_string=False, config=None): +def create_bulk_nodes(to_string=False, config=None, input_data_path=None, output_data_path=None): "Generate bulk node CSV file" output = None with Roger(to_string, config=config) as roger: @@ -112,7 +112,7 @@ def create_bulk_nodes(to_string=False, config=None): output = roger.log_stream.getvalue() if to_string else None return output -def create_bulk_edges(to_string=False, config=None): +def create_bulk_edges(to_string=False, config=None, input_data_path=None, output_data_path=None): "Create bulk edges CSV file" output = None with Roger(to_string, 
config=config) as roger: @@ -120,7 +120,7 @@ def create_bulk_edges(to_string=False, config=None): output = roger.log_stream.getvalue() if to_string else None return output -def bulk_load(to_string=False, config=None): +def bulk_load(to_string=False, config=None, input_data_path=None, output_data_path=None): "Run bulk load insert process" output = None with Roger (to_string, config=config) as roger: @@ -128,7 +128,7 @@ def bulk_load(to_string=False, config=None): output = roger.log_stream.getvalue () if to_string else None return output -def validate (to_string=False, config=None): +def validate (to_string=False, config=None, input_data_path=None, output_data_path=None): "Run bulk validate process" output = None with Roger (to_string, config=config) as roger: @@ -136,7 +136,7 @@ def validate (to_string=False, config=None): output = roger.log_stream.getvalue () if to_string else None return output -def check_tranql(to_string=False, config=None): +def check_tranql(to_string=False, config=None, input_data_path=None, output_data_path=None): "Tranql server smoke check" output = None with Roger(to_string, config=config) as roger: From 3676cc9e9cb0cab36d75395972d8e18f65f1eb6f Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Tue, 23 Jan 2024 08:01:13 -0500 Subject: [PATCH 21/71] wire up i/o paths for merge --- dags/knowledge_graph_build.py | 1 + dags/roger/core/base.py | 2 +- dags/roger/core/storage.py | 6 ++++-- dags/roger/models/kgx.py | 4 ++-- 4 files changed, 8 insertions(+), 5 deletions(-) diff --git a/dags/knowledge_graph_build.py b/dags/knowledge_graph_build.py index 92e2a836..12666bde 100644 --- a/dags/knowledge_graph_build.py +++ b/dags/knowledge_graph_build.py @@ -29,6 +29,7 @@ #lakefs://yk-heal/main/annotate_and_index/crdc_dataset_pipeline_task_group.make_kgx_crdc/ working_repo = config.lakefs_config.repo branch = config.lakefs_config.branch + kgx_repos = config.kgx.data_sets get_path_on_lakefs = lambda d: f"{working_repo}/{branch}/annotate_and_index/{d}_dataset_pipeline_task_group.make_kgx_{d}/" kgx_files_to_grab = [] for dataset in config.dug_inputs.data_sets: diff --git a/dags/roger/core/base.py b/dags/roger/core/base.py index ff35b8b0..0fd06460 100644 --- a/dags/roger/core/base.py +++ b/dags/roger/core/base.py @@ -93,7 +93,7 @@ def merge_nodes(to_string=False, config=None, input_data_path=None, output_data_ "Run KGX merge" output = None with Roger (to_string, config=config) as roger: - roger.kgx.merge() + roger.kgx.merge(input_path=input_data_path, output_path=output_data_path) output = roger.log_stream.getvalue () if to_string else None return output diff --git a/dags/roger/core/storage.py b/dags/roger/core/storage.py index 0574e4c1..a0b3b983 100644 --- a/dags/roger/core/storage.py +++ b/dags/roger/core/storage.py @@ -159,10 +159,12 @@ def kgx_objects(format_="json", path=None): kgx_pattern = f"{path}/**/*.{format_}" return sorted(glob.glob (kgx_pattern, recursive=True)) -def merge_path(name): +def merge_path(name, path=None): """ Form a merged KGX object path. :path name: Name of the merged KGX object. """ - return str(ROGER_DATA_DIR / 'merge' / name) + if path is None: + return str(ROGER_DATA_DIR / 'merge' / name) + return path + os.path.sep + 'name' def merged_objects(): """ A list of merged KGX objects. 
""" diff --git a/dags/roger/models/kgx.py b/dags/roger/models/kgx.py index 6372b01a..591d710d 100644 --- a/dags/roger/models/kgx.py +++ b/dags/roger/models/kgx.py @@ -459,7 +459,7 @@ def merge(self, input_path="", output_path=""): write_merge_metric = {} t = time.time() start_nodes_jsonl = time.time() - nodes_file_path = storage.merge_path("nodes.jsonl") + nodes_file_path = storage.merge_path("nodes.jsonl", output_path) # stream out nodes to nodes.jsonl file with open(nodes_file_path, 'w') as stream: @@ -472,7 +472,7 @@ def merge(self, input_path="", output_path=""): start_edge_jsonl = time.time() # stream out edges to edges.jsonl file - edges_file_path = storage.merge_path("edges.jsonl") + edges_file_path = storage.merge_path("edges.jsonl", output_path) with open(edges_file_path, 'w') as stream: for edges in merged_edges: edges = json.loads(edges) From 0a8be3ed7ed9569d67dd6737f99eadd1f1bde51c Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Tue, 23 Jan 2024 08:04:19 -0500 Subject: [PATCH 22/71] fix variable name --- dags/roger/models/kgx.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dags/roger/models/kgx.py b/dags/roger/models/kgx.py index 591d710d..5a7cde63 100644 --- a/dags/roger/models/kgx.py +++ b/dags/roger/models/kgx.py @@ -420,7 +420,7 @@ def merge(self, input_path="", output_path=""): metrics = {} start = time.time() if input_path: - json_format_file = storage.kgx_objects("json", input_path) + json_format_files = storage.kgx_objects("json", input_path) jsonl_format_files = storage.kgx_objects("jsonl", input_path) else: json_format_files = storage.kgx_objects("json") From 5a010d85ec096436b7d59a82126cedc4087dfc30 Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Tue, 23 Jan 2024 08:39:46 -0500 Subject: [PATCH 23/71] print files --- dags/roger/models/kgx.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/dags/roger/models/kgx.py b/dags/roger/models/kgx.py index 5a7cde63..87b6709e 100644 --- a/dags/roger/models/kgx.py +++ b/dags/roger/models/kgx.py @@ -433,6 +433,9 @@ def merge(self, input_path="", output_path=""): jsonl_edge_files = {file for file in jsonl_format_files if "edge" in file} + log.info(f"JsonL files: {jsonl_format_files}") + log.info(f"Json files: {json_format_files}") + # Create all the needed iterators and sets thereof jsonl_node_iterators = [storage.jsonl_iter(file_name) for file_name in jsonl_node_files] From 1e6a9a232f3b40e157bbaef1f7a0f86d984f6cc5 Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Tue, 23 Jan 2024 09:09:26 -0500 Subject: [PATCH 24/71] few debug logs --- dags/roger/core/storage.py | 2 +- dags/roger/models/kgx.py | 21 +++++++++++++++------ 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/dags/roger/core/storage.py b/dags/roger/core/storage.py index a0b3b983..c75e0f36 100644 --- a/dags/roger/core/storage.py +++ b/dags/roger/core/storage.py @@ -164,7 +164,7 @@ def merge_path(name, path=None): :path name: Name of the merged KGX object. """ if path is None: return str(ROGER_DATA_DIR / 'merge' / name) - return path + os.path.sep + 'name' + return path + os.path.sep + name def merged_objects(): """ A list of merged KGX objects. 
""" diff --git a/dags/roger/models/kgx.py b/dags/roger/models/kgx.py index 87b6709e..9cd3b263 100644 --- a/dags/roger/models/kgx.py +++ b/dags/roger/models/kgx.py @@ -50,11 +50,11 @@ def __init__(self, biolink=None, config=None): self.biolink = BiolinkModel(self.biolink_version) else: self.biolink = biolink - self.redis_conn = redis.Redis( - host=self.config.redisgraph.host, - port=self.config.redisgraph.port, - password=self.config.redisgraph.password, - db=self.merge_db_id) + # self.redis_conn = redis.Redis( + # host=self.config.redisgraph.host, + # port=self.config.redisgraph.port, + # password=self.config.redisgraph.password, + # db=self.merge_db_id) self.enable_metrics = self.config.get('enable_metrics', False) def get_kgx_json_format(self, files: list, dataset_version: str): @@ -416,9 +416,12 @@ def write_schema(self, schema, schema_type: SchemaType): def merge(self, input_path="", output_path=""): """ This version uses the disk merging from the kg_utils module """ - data_set_version = self.config.get('kgx', {}).get('dataset_version') + metrics = {} start = time.time() + + log.info(f"Input path = {input_path}, Output path = {output_path}") + if input_path: json_format_files = storage.kgx_objects("json", input_path) jsonl_format_files = storage.kgx_objects("jsonl", input_path) @@ -496,3 +499,9 @@ def merge(self, input_path="", output_path=""): if self.enable_metrics: metricsfile_path = storage.metrics_path('merge_metrics.yaml') storage.write_object(metrics, metricsfile_path) + + + +if __name__ == '__main__': + kg_merger = KGXModel() + kg_merger.merge(input_path="/home/kebedey/projects/helx/roger/kgx_dir",output_path="/home/kebedey/projects/helx/roger/kgx_dir_out") From c1ae51a14e6fb10aa0b692b89035563b3a3f1039 Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Tue, 23 Jan 2024 09:30:54 -0500 Subject: [PATCH 25/71] few debug logs --- dags/roger/models/kgx.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/dags/roger/models/kgx.py b/dags/roger/models/kgx.py index 9cd3b263..39107998 100644 --- a/dags/roger/models/kgx.py +++ b/dags/roger/models/kgx.py @@ -459,6 +459,14 @@ def merge(self, input_path="", output_path=""): self.merger.merge_nodes(node_iterators) merged_nodes = self.merger.get_merged_nodes_jsonl() + ## test code + + for edge in edge_iterators: + log.info(edge) + + ## end test code + + self.merger.merge_edges(edge_iterators) merged_edges = self.merger.get_merged_edges_jsonl() From eb7fdc1d9757928f4b08ab3c5dbb507109d7381a Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Tue, 23 Jan 2024 09:39:36 -0500 Subject: [PATCH 26/71] treat path as path not str --- dags/roger/core/storage.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dags/roger/core/storage.py b/dags/roger/core/storage.py index c75e0f36..1dfbc7e9 100644 --- a/dags/roger/core/storage.py +++ b/dags/roger/core/storage.py @@ -164,7 +164,7 @@ def merge_path(name, path=None): :path name: Name of the merged KGX object. """ if path is None: return str(ROGER_DATA_DIR / 'merge' / name) - return path + os.path.sep + name + return str(path / name) def merged_objects(): """ A list of merged KGX objects. 
""" From 6c49a0c28bd30d66936a97173ca73ea9ec698bc1 Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Tue, 23 Jan 2024 09:43:49 -0500 Subject: [PATCH 27/71] few debug logs --- dags/roger/models/kgx.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dags/roger/models/kgx.py b/dags/roger/models/kgx.py index 39107998..f02eeeff 100644 --- a/dags/roger/models/kgx.py +++ b/dags/roger/models/kgx.py @@ -462,7 +462,8 @@ def merge(self, input_path="", output_path=""): ## test code for edge in edge_iterators: - log.info(edge) + if 'subject' not in edge: + log.info("error on edge") ## end test code From 7fdf08acbeb3b170490602725bc1086d65cab613 Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Tue, 23 Jan 2024 10:22:33 -0500 Subject: [PATCH 28/71] some fixes --- dags/roger/core/storage.py | 4 ++-- dags/roger/models/kgx.py | 25 ++++++++++++++++--------- 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/dags/roger/core/storage.py b/dags/roger/core/storage.py index 1dfbc7e9..17a5bd11 100644 --- a/dags/roger/core/storage.py +++ b/dags/roger/core/storage.py @@ -159,12 +159,12 @@ def kgx_objects(format_="json", path=None): kgx_pattern = f"{path}/**/*.{format_}" return sorted(glob.glob (kgx_pattern, recursive=True)) -def merge_path(name, path=None): +def merge_path(name, path: Path=None): """ Form a merged KGX object path. :path name: Name of the merged KGX object. """ if path is None: return str(ROGER_DATA_DIR / 'merge' / name) - return str(path / name) + return str(path.joinpath(name)) def merged_objects(): """ A list of merged KGX objects. """ diff --git a/dags/roger/models/kgx.py b/dags/roger/models/kgx.py index f02eeeff..ec5056bc 100644 --- a/dags/roger/models/kgx.py +++ b/dags/roger/models/kgx.py @@ -414,7 +414,7 @@ def write_schema(self, schema, schema_type: SchemaType): dictionary = { k : v for k, v in schema.items () } storage.write_object (dictionary, file_name) - def merge(self, input_path="", output_path=""): + def merge(self, input_path=None, output_path=None): """ This version uses the disk merging from the kg_utils module """ metrics = {} @@ -459,13 +459,13 @@ def merge(self, input_path="", output_path=""): self.merger.merge_nodes(node_iterators) merged_nodes = self.merger.get_merged_nodes_jsonl() - ## test code - - for edge in edge_iterators: - if 'subject' not in edge: - log.info("error on edge") - - ## end test code + # ## test code + # + # for edge in edge_iterators: + # if 'subject' not in edge: + # log.info("error on edge") + # + # ## end test code self.merger.merge_edges(edge_iterators) @@ -474,6 +474,11 @@ def merge(self, input_path="", output_path=""): write_merge_metric = {} t = time.time() start_nodes_jsonl = time.time() + + # create output dir + if not os.path.exists(output_path): + os.makedirs(output_path) + nodes_file_path = storage.merge_path("nodes.jsonl", output_path) # stream out nodes to nodes.jsonl file @@ -513,4 +518,6 @@ def merge(self, input_path="", output_path=""): if __name__ == '__main__': kg_merger = KGXModel() - kg_merger.merge(input_path="/home/kebedey/projects/helx/roger/kgx_dir",output_path="/home/kebedey/projects/helx/roger/kgx_dir_out") + import pathlib + kg_merger.merge(input_path=pathlib.Path("/home/kebedey/projects/helx/roger/kgx_dir"), + output_path=pathlib.Path("/home/kebedey/projects/helx/roger/kgx_dir_out")) From 07c5bd91b935b3f48e5930b7c708b5cf6926b531 Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Tue, 23 Jan 2024 10:25:04 -0500 Subject: [PATCH 29/71] logging edge files --- dags/roger/models/kgx.py | 13 ++----------- 1 file 
changed, 2 insertions(+), 11 deletions(-) diff --git a/dags/roger/models/kgx.py b/dags/roger/models/kgx.py index ec5056bc..29570153 100644 --- a/dags/roger/models/kgx.py +++ b/dags/roger/models/kgx.py @@ -435,9 +435,8 @@ def merge(self, input_path=None, output_path=None): if "node" in file} jsonl_edge_files = {file for file in jsonl_format_files if "edge" in file} - - log.info(f"JsonL files: {jsonl_format_files}") - log.info(f"Json files: {json_format_files}") + log.info(f"Jsonl edge files : {jsonl_edge_files}") + log.info(f"Jsonl node files : {jsonl_node_files}") # Create all the needed iterators and sets thereof jsonl_node_iterators = [storage.jsonl_iter(file_name) @@ -459,14 +458,6 @@ def merge(self, input_path=None, output_path=None): self.merger.merge_nodes(node_iterators) merged_nodes = self.merger.get_merged_nodes_jsonl() - # ## test code - # - # for edge in edge_iterators: - # if 'subject' not in edge: - # log.info("error on edge") - # - # ## end test code - self.merger.merge_edges(edge_iterators) merged_edges = self.merger.get_merged_edges_jsonl() From 999d4a60aa5b4ea6a4fcd98f0d31c30bc6501822 Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Tue, 23 Jan 2024 10:28:36 -0500 Subject: [PATCH 30/71] bug fix knowledge has edge --- dags/roger/models/kgx.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dags/roger/models/kgx.py b/dags/roger/models/kgx.py index 29570153..c1f44b9b 100644 --- a/dags/roger/models/kgx.py +++ b/dags/roger/models/kgx.py @@ -432,9 +432,9 @@ def merge(self, input_path=None, output_path=None): # Create lists of the nodes and edges files in both json and jsonl # formats jsonl_node_files = {file for file in jsonl_format_files - if "node" in file} + if "node" in file.split('/')[-1]} jsonl_edge_files = {file for file in jsonl_format_files - if "edge" in file} + if "edge" in file.split('/')[-1]} log.info(f"Jsonl edge files : {jsonl_edge_files}") log.info(f"Jsonl node files : {jsonl_node_files}") From 5a8f6290b29418d463837abea06c244e0247eba9 Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Tue, 23 Jan 2024 12:20:33 -0500 Subject: [PATCH 31/71] re-org graph structure --- dags/knowledge_graph_build.py | 14 ++++++++++++-- dags/roger/core/base.py | 4 ++-- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/dags/knowledge_graph_build.py b/dags/knowledge_graph_build.py index 12666bde..c3ebb403 100644 --- a/dags/knowledge_graph_build.py +++ b/dags/knowledge_graph_build.py @@ -70,5 +70,15 @@ finish = EmptyOperator(task_id='Finish') """ Build the DAG. 
""" - intro >> merge_nodes >> [create_nodes_schema, create_edges_schema ] >> continue_task_bulk_load >> \ - [create_bulk_load_nodes, create_bulk_load_edges] >> bulk_load >> continue_task_validate >>[validate, check_tranql ] >> finish + merge_nodes.set_upstream(intro) + create_nodes_schema.set_upstream(merge_nodes) + create_edges_schema.set_upstream(merge_nodes) + create_bulk_load_nodes.set_upstream(create_nodes_schema) + create_bulk_load_nodes.set_upstream(merge_nodes) + create_bulk_load_edges.set_upstream(create_edges_schema) + create_bulk_load_edges.set_upstream(merge_nodes) + bulk_load.set_upstream(create_bulk_load_nodes) + bulk_load.set_upstream(create_bulk_load_edges) + validate.set_upstream(bulk_load) + check_tranql.set_upstream(bulk_load) + diff --git a/dags/roger/core/base.py b/dags/roger/core/base.py index 0fd06460..319b1987 100644 --- a/dags/roger/core/base.py +++ b/dags/roger/core/base.py @@ -73,7 +73,7 @@ def create_schema(to_string=False, config=None): output = (o1 + o2 ) if to_string else None return output -def create_edges_schema(to_string=False, config=None): +def create_edges_schema(to_string=False, config=None, input_data_path=None, output_data_path=None): "Create edges schema on KGX object" output = None with Roger(to_string, config=config) as roger: @@ -81,7 +81,7 @@ def create_edges_schema(to_string=False, config=None): output = roger.log_stream.getvalue() if to_string else None return output -def create_nodes_schema(to_string=False, config=None): +def create_nodes_schema(to_string=False, config=None, input_data_path=None, output_data_path=None): "Create nodes schema on KGX object" output = None with Roger(to_string, config=config) as roger: From c5d2b0e0505f17cc628e41fb0eeec1ea30b725f6 Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Tue, 23 Jan 2024 12:40:36 -0500 Subject: [PATCH 32/71] adding pathing for other tasks --- dags/knowledge_graph_build.py | 11 +---------- dags/roger/core/base.py | 8 ++++++-- dags/roger/core/storage.py | 6 ++++-- dags/roger/models/kgx.py | 16 ++++++++-------- 4 files changed, 19 insertions(+), 22 deletions(-) diff --git a/dags/knowledge_graph_build.py b/dags/knowledge_graph_build.py index c3ebb403..406156f2 100644 --- a/dags/knowledge_graph_build.py +++ b/dags/knowledge_graph_build.py @@ -36,18 +36,12 @@ dataset_name = dataset.split(":")[0] kgx_files_to_grab.append(get_path_on_lakefs(dataset_name)) - print("***************************") - print(kgx_files_to_grab) - merge_nodes = create_python_task (dag, name="MergeNodes", a_callable=roger.merge_nodes, input_repo="cde-graph", input_branch="v5.0") - - - # The rest of these guys can just operate on the local lakefs repo/branch # we need to add input dir and output dir similar to what we did for dug tasks @@ -56,18 +50,15 @@ create_edges_schema = create_python_task(dag, "CreateEdgesSchema", roger.create_edges_schema) - - continue_task_bulk_load = EmptyOperator(task_id="continueBulkCreate") create_bulk_load_nodes = create_python_task(dag, "CreateBulkLoadNodes", roger.create_bulk_nodes) create_bulk_load_edges = create_python_task(dag, "CreateBulkLoadEdges", roger.create_bulk_edges) bulk_load = create_python_task(dag, "BulkLoad", roger.bulk_load) - continue_task_validate = EmptyOperator(task_id="continueValidation") check_tranql = create_python_task(dag, "CheckTranql", roger.check_tranql) validate = create_python_task(dag, "Validate", roger.validate) - finish = EmptyOperator(task_id='Finish') + """ Build the DAG. 
""" merge_nodes.set_upstream(intro) diff --git a/dags/roger/core/base.py b/dags/roger/core/base.py index 319b1987..8337de39 100644 --- a/dags/roger/core/base.py +++ b/dags/roger/core/base.py @@ -77,7 +77,10 @@ def create_edges_schema(to_string=False, config=None, input_data_path=None, outp "Create edges schema on KGX object" output = None with Roger(to_string, config=config) as roger: - roger.kgx.create_edges_schema() + roger.kgx.create_edges_schema( + input_data_path=input_data_path, + output_data_path=output_data_path + ) output = roger.log_stream.getvalue() if to_string else None return output @@ -85,7 +88,8 @@ def create_nodes_schema(to_string=False, config=None, input_data_path=None, outp "Create nodes schema on KGX object" output = None with Roger(to_string, config=config) as roger: - roger.kgx.create_nodes_schema() + roger.kgx.create_nodes_schema(input_data_path=input_data_path, + output_data_path=output_data_path) output = roger.log_stream.getvalue() if to_string else None return output diff --git a/dags/roger/core/storage.py b/dags/roger/core/storage.py index 17a5bd11..f4c359d5 100644 --- a/dags/roger/core/storage.py +++ b/dags/roger/core/storage.py @@ -171,10 +171,12 @@ def merged_objects(): merged_pattern = merge_path("**.json") return sorted(glob.glob (merged_pattern)) -def schema_path(name): +def schema_path(name, path=None): """ Path to a schema object. :param name: Name of the object to get a path for. """ - return str(ROGER_DATA_DIR / 'schema' / name) + if not path: + return str(ROGER_DATA_DIR / 'schema' / name) + return str (path / 'schema' / name) def bulk_path(name): """ Path to a bulk load object. diff --git a/dags/roger/models/kgx.py b/dags/roger/models/kgx.py index c1f44b9b..27b8eed6 100644 --- a/dags/roger/models/kgx.py +++ b/dags/roger/models/kgx.py @@ -301,7 +301,7 @@ def fetch_dug_kgx(self): log.info("Done copying dug KGX files.") return all_kgx_files - def create_nodes_schema(self): + def create_nodes_schema(self, input_data_path=None, output_data_path=None): """ Extracts schema for nodes based on biolink leaf types :return: @@ -309,7 +309,7 @@ def create_nodes_schema(self): category_schemas = defaultdict(lambda: None) category_error_nodes = set() - merged_nodes_file = storage.merge_path("nodes.jsonl") + merged_nodes_file = storage.merge_path("nodes.jsonl", input_data_path) log.info(f"Processing : {merged_nodes_file}") counter = 0 for node in storage.json_line_iter(merged_nodes_file): @@ -356,15 +356,15 @@ def create_nodes_schema(self): f"These will be treated as {BiolinkModel.root_type}.") # Write node schemas. - self.write_schema(category_schemas, SchemaType.CATEGORY) + self.write_schema(category_schemas, SchemaType.CATEGORY, output_path=output_data_path) - def create_edges_schema(self): + def create_edges_schema(self, input_data_path=None, output_data_path=None): """ Create unified schema for all edges in an edges jsonl file. :return: """ predicate_schemas = defaultdict(lambda: None) - merged_edges_file = storage.merge_path("edges.jsonl") + merged_edges_file = storage.merge_path("edges.jsonl", input_data_path) """ Infer predicate schemas. 
""" for edge in storage.json_line_iter(merged_edges_file): predicate = edge['predicate'] @@ -378,7 +378,7 @@ def create_edges_schema(self): previous_type = predicate_schemas[predicate][k] predicate_schemas[predicate][k] = compare_types( previous_type, current_type) - self.write_schema(predicate_schemas, SchemaType.PREDICATE) + self.write_schema(predicate_schemas, SchemaType.PREDICATE, output_path=output_data_path) def create_schema (self): """Determine the schema of each type of object. @@ -403,13 +403,13 @@ def schema_up_to_date (self): f"{SchemaType.PREDICATE.value}-schema.json") ]) - def write_schema(self, schema, schema_type: SchemaType): + def write_schema(self, schema, schema_type: SchemaType ,output_path=None): """ Output the schema file. :param schema: Schema to get keys from. :param schema_type: Type of schema to write. """ - file_name = storage.schema_path (f"{schema_type.value}-schema.json") + file_name = storage.schema_path (f"{schema_type.value}-schema.json", output_path) log.info("writing schema: %s", file_name) dictionary = { k : v for k, v in schema.items () } storage.write_object (dictionary, file_name) From 96c8ff5a20dcca8d42bd9cecb53d06cfc0c15396 Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Tue, 23 Jan 2024 17:54:22 -0500 Subject: [PATCH 33/71] pagenation logic fix for avalon --- dags/knowledge_graph_build.py | 6 ++++-- dags/roger/tasks.py | 4 +--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/dags/knowledge_graph_build.py b/dags/knowledge_graph_build.py index 406156f2..1d5a6b5f 100644 --- a/dags/knowledge_graph_build.py +++ b/dags/knowledge_graph_build.py @@ -45,8 +45,10 @@ # The rest of these guys can just operate on the local lakefs repo/branch # we need to add input dir and output dir similar to what we did for dug tasks - create_nodes_schema = create_python_task(dag, "CreateNodesSchema", - roger.create_nodes_schema) + create_nodes_schema = create_python_task(dag, + name="CreateNodesSchema", + a_callable=roger.create_nodes_schema + ) create_edges_schema = create_python_task(dag, "CreateEdgesSchema", roger.create_edges_schema) diff --git a/dags/roger/tasks.py b/dags/roger/tasks.py index e6ae60cb..bcd8fe94 100755 --- a/dags/roger/tasks.py +++ b/dags/roger/tasks.py @@ -263,14 +263,12 @@ def setup_input_data(context, exec_conf): local_path=input_dir, remote_path=remote_path, branch=input_branch, - task_name=context['task'].task_id, - pipeline_id=context['dag'].dag_id, repo=input_repo, changes_only=False, - metafilename=None, lake_fs_client=client ) + def create_python_task(dag, name, a_callable, func_kwargs=None, input_repo=None, input_branch=None, pass_conf=True, no_output_files=False): """ Create a python task. 
From 62e8a0ca3aae77ff4615e32c18d684f45ddeed93 Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Tue, 23 Jan 2024 18:07:49 -0500 Subject: [PATCH 34/71] update lakefs client code --- dags/roger/tasks.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dags/roger/tasks.py b/dags/roger/tasks.py index bcd8fe94..c1cccc54 100755 --- a/dags/roger/tasks.py +++ b/dags/roger/tasks.py @@ -18,7 +18,7 @@ from roger.logger import get_logger from roger.pipelines.base import DugPipeline from avalon.mainoperations import put_files, LakeFsWrapper, get_files -import lakefs_client +from lakefs_sdk.configuration import Configuration from functools import partial logger = get_logger() @@ -102,7 +102,7 @@ def get_executor_config(data_path='/opt/airflow/share/data'): return k8s_executor_config def init_lakefs_client(config: RogerConfig) -> LakeFsWrapper: - configuration = lakefs_client.Configuration() + configuration = Configuration() configuration.username = config.lakefs_config.access_key_id configuration.password = config.lakefs_config.secret_access_key configuration.host = config.lakefs_config.host From 9f032656cc3abff2f97db6c390137f3026e9fe71 Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Tue, 23 Jan 2024 18:16:23 -0500 Subject: [PATCH 35/71] fix glob for get kgx files --- dags/roger/core/base.py | 4 ++-- dags/roger/core/bulkload.py | 4 ++-- dags/roger/core/storage.py | 9 ++++++--- dags/roger/models/kgx.py | 2 +- 4 files changed, 11 insertions(+), 8 deletions(-) diff --git a/dags/roger/core/base.py b/dags/roger/core/base.py index 8337de39..8c50e770 100644 --- a/dags/roger/core/base.py +++ b/dags/roger/core/base.py @@ -112,7 +112,7 @@ def create_bulk_nodes(to_string=False, config=None, input_data_path=None, output "Generate bulk node CSV file" output = None with Roger(to_string, config=config) as roger: - roger.bulk.create_nodes_csv_file() + roger.bulk.create_nodes_csv_file(input_data_path, output_data_path) output = roger.log_stream.getvalue() if to_string else None return output @@ -120,7 +120,7 @@ def create_bulk_edges(to_string=False, config=None, input_data_path=None, output "Create bulk edges CSV file" output = None with Roger(to_string, config=config) as roger: - roger.bulk.create_edges_csv_file() + roger.bulk.create_edges_csv_file(input_data_path, output_data_path) output = roger.log_stream.getvalue() if to_string else None return output diff --git a/dags/roger/core/bulkload.py b/dags/roger/core/bulkload.py index 5610690e..05d4119c 100644 --- a/dags/roger/core/bulkload.py +++ b/dags/roger/core/bulkload.py @@ -47,7 +47,7 @@ def create (self): self.create_nodes_csv_file() self.create_edges_csv_file() - def create_nodes_csv_file(self): + def create_nodes_csv_file(self, input_data_path=None, output_data_path=None): if self.tables_up_to_date (): log.info ("up to date.") return @@ -92,7 +92,7 @@ def create_nodes_csv_file(self): categories, categories_schema, state=state, is_relation=False) - def create_edges_csv_file(self): + def create_edges_csv_file(self, input_data_path=None, output_data_path=None): """ Write predicate data for bulk load. 
""" if self.tables_up_to_date (): log.info ("up to date.") diff --git a/dags/roger/core/storage.py b/dags/roger/core/storage.py index f4c359d5..fa0b7f3f 100644 --- a/dags/roger/core/storage.py +++ b/dags/roger/core/storage.py @@ -166,10 +166,13 @@ def merge_path(name, path: Path=None): return str(ROGER_DATA_DIR / 'merge' / name) return str(path.joinpath(name)) -def merged_objects(): +def merged_objects(file_type, path=None): """ A list of merged KGX objects. """ - merged_pattern = merge_path("**.json") - return sorted(glob.glob (merged_pattern)) + if not path: + merged_pattern = merge_path(f"**/{file_type}.jsonl") + else: + merged_pattern = merge_path(f"**/{file_type}.jsonl", path=path) + return sorted(glob.glob (merged_pattern, recursive=True)) def schema_path(name, path=None): """ Path to a schema object. diff --git a/dags/roger/models/kgx.py b/dags/roger/models/kgx.py index 27b8eed6..0450f75e 100644 --- a/dags/roger/models/kgx.py +++ b/dags/roger/models/kgx.py @@ -309,7 +309,7 @@ def create_nodes_schema(self, input_data_path=None, output_data_path=None): category_schemas = defaultdict(lambda: None) category_error_nodes = set() - merged_nodes_file = storage.merge_path("nodes.jsonl", input_data_path) + merged_nodes_file = storage.merged_objects("nodes", input_data_path) log.info(f"Processing : {merged_nodes_file}") counter = 0 for node in storage.json_line_iter(merged_nodes_file): From 749deba3a1f9a00c58b4d4729044e467a82f189c Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Tue, 23 Jan 2024 18:32:30 -0500 Subject: [PATCH 36/71] fix up get merged objects --- dags/roger/core/storage.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/dags/roger/core/storage.py b/dags/roger/core/storage.py index fa0b7f3f..85c22fb6 100644 --- a/dags/roger/core/storage.py +++ b/dags/roger/core/storage.py @@ -172,7 +172,12 @@ def merged_objects(file_type, path=None): merged_pattern = merge_path(f"**/{file_type}.jsonl") else: merged_pattern = merge_path(f"**/{file_type}.jsonl", path=path) - return sorted(glob.glob (merged_pattern, recursive=True)) + # this thing should always return one edges or nodes file (based on file_type) + try: + return sorted(glob.glob(merged_pattern, recursive=True))[0] + except IndexError: + raise ValueError(f"Could not find merged KGX of type {file_type} in {merged_pattern}") + def schema_path(name, path=None): """ Path to a schema object. 
From f4adf0d981afaba3ccaefe98dc18b7aa37cf85f4 Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Wed, 24 Jan 2024 13:47:32 -0500 Subject: [PATCH 37/71] send down fake commit id for metadata --- dags/roger/tasks.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/dags/roger/tasks.py b/dags/roger/tasks.py index c1cccc54..b554bf3b 100755 --- a/dags/roger/tasks.py +++ b/dags/roger/tasks.py @@ -173,7 +173,9 @@ def avalon_commit_callback(context: DagContext, **kwargs): s3storage=False, lake_fs_client=client, branch=temp_branch_name, - repo=repo + repo=repo, + # @TODO figure out how to pass real commit id here + commit_id=branch ) # see what changes are going to be pushed from this branch to main branch From ce4c84f39b5c65d363e1c1d0df2c0249981a1058 Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Wed, 24 Jan 2024 14:22:53 -0500 Subject: [PATCH 38/71] working on edges schema --- dags/roger/models/kgx.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dags/roger/models/kgx.py b/dags/roger/models/kgx.py index 0450f75e..b7a1250e 100644 --- a/dags/roger/models/kgx.py +++ b/dags/roger/models/kgx.py @@ -364,7 +364,7 @@ def create_edges_schema(self, input_data_path=None, output_data_path=None): :return: """ predicate_schemas = defaultdict(lambda: None) - merged_edges_file = storage.merge_path("edges.jsonl", input_data_path) + merged_edges_file = storage.merged_objects("edges", input_data_path) """ Infer predicate schemas. """ for edge in storage.json_line_iter(merged_edges_file): predicate = edge['predicate'] From dcfd3db0fd8f132ee8b9e6c548466c363ee3cbb1 Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Wed, 24 Jan 2024 14:32:38 -0500 Subject: [PATCH 39/71] bulk create nodes I/O --- dags/roger/core/bulkload.py | 22 +++++----------------- dags/roger/core/storage.py | 25 +++++++++---------------- 2 files changed, 14 insertions(+), 33 deletions(-) diff --git a/dags/roger/core/bulkload.py b/dags/roger/core/bulkload.py index 05d4119c..fc9f12ab 100644 --- a/dags/roger/core/bulkload.py +++ b/dags/roger/core/bulkload.py @@ -33,36 +33,24 @@ def __init__(self, biolink, config=None): self.separator =(chr(separator) if isinstance(separator, int) else separator) - def tables_up_to_date (self): - return storage.is_up_to_date ( - source=[ - storage.schema_path (f"{SchemaType.PREDICATE.value}-schema.json"), - storage.schema_path (f"{SchemaType.PREDICATE.value}-schema.json") - ] + storage.merged_objects (), - targets=glob.glob (storage.bulk_path ("nodes/**.csv")) + \ - glob.glob (storage.bulk_path ("edges/**.csv"))) - def create (self): """Used in the CLI on args.create_bulk""" self.create_nodes_csv_file() self.create_edges_csv_file() def create_nodes_csv_file(self, input_data_path=None, output_data_path=None): - if self.tables_up_to_date (): - log.info ("up to date.") - return # clear out previous data - bulk_path = storage.bulk_path("nodes") + bulk_path = storage.bulk_path("nodes", output_data_path) if os.path.exists(bulk_path): shutil.rmtree(bulk_path) - categories_schema = storage.read_schema (SchemaType.CATEGORY) + categories_schema = storage.read_schema (SchemaType.CATEGORY, input_data_path) state = defaultdict(lambda: None) log.info(f"processing nodes") """ Write node data for bulk load. 
""" categories = defaultdict(lambda: []) category_error_nodes = set() - merged_nodes_file = storage.merge_path("nodes.jsonl") + merged_nodes_file = storage.merged_objects('nodes', input_data_path) counter = 1 for node in storage.json_line_iter(merged_nodes_file): if not node['category']: @@ -79,7 +67,7 @@ def create_nodes_csv_file(self, input_data_path=None, output_data_path=None): f"Showing first 10: {list(category_error_nodes)[:10]}.") # flush every 100K if counter % 100_000 == 0: - self.write_bulk(storage.bulk_path("nodes"), + self.write_bulk(storage.bulk_path("nodes", output_data_path), categories, categories_schema, state=state, is_relation=False) # reset variables. @@ -88,7 +76,7 @@ def create_nodes_csv_file(self, input_data_path=None, output_data_path=None): counter += 1 # write back if any thing left. if len(categories): - self.write_bulk(storage.bulk_path("nodes"), + self.write_bulk(storage.bulk_path("nodes", output_data_path), categories, categories_schema, state=state, is_relation=False) diff --git a/dags/roger/core/storage.py b/dags/roger/core/storage.py index 85c22fb6..5cd0c4a7 100644 --- a/dags/roger/core/storage.py +++ b/dags/roger/core/storage.py @@ -186,10 +186,13 @@ def schema_path(name, path=None): return str(ROGER_DATA_DIR / 'schema' / name) return str (path / 'schema' / name) -def bulk_path(name): +def bulk_path(name, path): """ Path to a bulk load object. :param name: Name of the object. """ - return str(ROGER_DATA_DIR / 'bulk' / name) + if not path: + return str(ROGER_DATA_DIR / 'bulk' / name) + else: + return str(path / name) def metrics_path(name): """ @@ -402,11 +405,11 @@ def dug_dd_xml_objects(input_data_path=None): def copy_file_to_dir(file_location, dir_name): return shutil.copy(file_location, dir_name) -def read_schema (schema_type: SchemaType): +def read_schema (schema_type: SchemaType, path=None): """ Read a schema object. :param schema_type: Schema type of the object to read. """ - path = schema_path (f"{schema_type.value}-schema.json") - return read_object (path) + location = schema_path (f"{schema_type.value}-schema.json", path=path) + return read_object (location) def get_uri (path, key): """ Build a URI. @@ -426,17 +429,7 @@ def read_relative_object (path): def trunc(text, limit): return ('..' + text[-limit-2:]) if len(text) > limit else text -def is_up_to_date (source, targets): - target_time_list = [ - os.stat (f).st_mtime for f in targets if os.path.exists(f)] - if len(target_time_list) == 0: - log.debug (f"no targets found") - return False - source = [ os.stat (f).st_mtime for f in source if os.path.exists (f) ] - if len(source) == 0: - log.debug ("no source found. up to date") - return True - return max(source) < min(target_time_list) + def json_line_iter(jsonl_file_path): f = open(file=jsonl_file_path, mode='r', encoding='utf-8') From e43b5f1e4ea9d8780c4913775d50fc444a93a701 Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Wed, 24 Jan 2024 14:58:39 -0500 Subject: [PATCH 40/71] find schema file --- dags/roger/core/storage.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dags/roger/core/storage.py b/dags/roger/core/storage.py index 5cd0c4a7..110f13f1 100644 --- a/dags/roger/core/storage.py +++ b/dags/roger/core/storage.py @@ -184,7 +184,7 @@ def schema_path(name, path=None): :param name: Name of the object to get a path for. 
""" if not path: return str(ROGER_DATA_DIR / 'schema' / name) - return str (path / 'schema' / name) + return glob.glob(str (path / '**' / 'schema' / name), recursive=True)[0] def bulk_path(name, path): """ Path to a bulk load object. From db5801874fe0f1cbc672862800f355b96cf42586 Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Wed, 24 Jan 2024 15:29:04 -0500 Subject: [PATCH 41/71] bulk create edges I/O --- dags/roger/core/bulkload.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/dags/roger/core/bulkload.py b/dags/roger/core/bulkload.py index fc9f12ab..d2d4427c 100644 --- a/dags/roger/core/bulkload.py +++ b/dags/roger/core/bulkload.py @@ -82,16 +82,13 @@ def create_nodes_csv_file(self, input_data_path=None, output_data_path=None): def create_edges_csv_file(self, input_data_path=None, output_data_path=None): """ Write predicate data for bulk load. """ - if self.tables_up_to_date (): - log.info ("up to date.") - return # Clear out previous data bulk_path = storage.bulk_path("edges") if os.path.exists(bulk_path): shutil.rmtree(bulk_path) - predicates_schema = storage.read_schema(SchemaType.PREDICATE) + predicates_schema = storage.read_schema(SchemaType.PREDICATE, input_data_path) predicates = defaultdict(lambda: []) - edges_file = storage.merge_path('edges.jsonl') + edges_file = storage.merged_objects('edges', input_data_path) counter = 1 state = {} for edge in storage.json_line_iter(edges_file): @@ -99,14 +96,14 @@ def create_edges_csv_file(self, input_data_path=None, output_data_path=None): # write out every 100K , to avoid large predicate dict. if counter % 100_000 == 0: self.write_bulk( - storage.bulk_path("edges"),predicates, predicates_schema, + storage.bulk_path("edges", output_data_path),predicates, predicates_schema, state=state, is_relation=True) predicates = defaultdict(lambda : []) counter += 1 # if there are some items left (if loop ended before counter reached the # specified value) if len(predicates): - self.write_bulk(storage.bulk_path("edges"), predicates, + self.write_bulk(storage.bulk_path("edges", output_data_path), predicates, predicates_schema,state=state, is_relation=True) @staticmethod From 44ab91a2ff93e2c2a202fd974f8083f9366a29da Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Wed, 24 Jan 2024 15:30:22 -0500 Subject: [PATCH 42/71] bulk create edges I/O --- dags/roger/core/bulkload.py | 2 +- dags/roger/core/storage.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dags/roger/core/bulkload.py b/dags/roger/core/bulkload.py index d2d4427c..8569b0d9 100644 --- a/dags/roger/core/bulkload.py +++ b/dags/roger/core/bulkload.py @@ -83,7 +83,7 @@ def create_nodes_csv_file(self, input_data_path=None, output_data_path=None): def create_edges_csv_file(self, input_data_path=None, output_data_path=None): """ Write predicate data for bulk load. """ # Clear out previous data - bulk_path = storage.bulk_path("edges") + bulk_path = storage.bulk_path("edges", output_data_path) if os.path.exists(bulk_path): shutil.rmtree(bulk_path) predicates_schema = storage.read_schema(SchemaType.PREDICATE, input_data_path) diff --git a/dags/roger/core/storage.py b/dags/roger/core/storage.py index 110f13f1..0f79aad4 100644 --- a/dags/roger/core/storage.py +++ b/dags/roger/core/storage.py @@ -186,7 +186,7 @@ def schema_path(name, path=None): return str(ROGER_DATA_DIR / 'schema' / name) return glob.glob(str (path / '**' / 'schema' / name), recursive=True)[0] -def bulk_path(name, path): +def bulk_path(name, path=None): """ Path to a bulk load object. 
:param name: Name of the object. """ if not path: From 27fc0e41880d3558195e3a4be27990ddfe4d7b40 Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Wed, 24 Jan 2024 15:37:53 -0500 Subject: [PATCH 43/71] bulk load io --- dags/roger/core/base.py | 2 +- dags/roger/core/bulkload.py | 13 +++---------- 2 files changed, 4 insertions(+), 11 deletions(-) diff --git a/dags/roger/core/base.py b/dags/roger/core/base.py index 8c50e770..0e84501f 100644 --- a/dags/roger/core/base.py +++ b/dags/roger/core/base.py @@ -128,7 +128,7 @@ def bulk_load(to_string=False, config=None, input_data_path=None, output_data_pa "Run bulk load insert process" output = None with Roger (to_string, config=config) as roger: - roger.bulk.insert() + roger.bulk.insert(input_data_path=input_data_path) output = roger.log_stream.getvalue () if to_string else None return output diff --git a/dags/roger/core/bulkload.py b/dags/roger/core/bulkload.py index 8569b0d9..afeeb9cc 100644 --- a/dags/roger/core/bulkload.py +++ b/dags/roger/core/bulkload.py @@ -306,17 +306,10 @@ def write_bulk(self, bulk_path, obj_map, schema, state={}, state['processed_id'] = processed_objects_id state['called_times'] = called_x_times - def insert (self): - - redisgraph = { - 'host': os.getenv('REDIS_HOST'), - 'port': os.getenv('REDIS_PORT', '6379'), - 'password': os.getenv('REDIS_PASSWORD'), - 'graph': os.getenv('REDIS_GRAPH'), - } + def insert (self, input_data_path=None): redisgraph = self.config.redisgraph - nodes = sorted(glob.glob (storage.bulk_path ("nodes/**.csv*"))) - edges = sorted(glob.glob (storage.bulk_path ("edges/**.csv*"))) + nodes = sorted(glob.glob (storage.bulk_path ("**/nodes/**.csv*", input_data_path))) + edges = sorted(glob.glob (storage.bulk_path ("**/edges/**.csv*", input_data_path))) graph = redisgraph['graph'] log.info(f"bulk loading \n nodes: {nodes} \n edges: {edges}") From 2319a46c0d06d536ca9831910c3abd04034aaf1d Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Wed, 24 Jan 2024 15:38:17 -0500 Subject: [PATCH 44/71] no outputs for final tasks --- dags/knowledge_graph_build.py | 33 ++++++++++++++++++++++----------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/dags/knowledge_graph_build.py b/dags/knowledge_graph_build.py index 1d5a6b5f..95f8a83d 100644 --- a/dags/knowledge_graph_build.py +++ b/dags/knowledge_graph_build.py @@ -8,7 +8,7 @@ from airflow.models import DAG from airflow.operators.empty import EmptyOperator import roger -from roger.tasks import get_executor_config, default_args, create_python_task +from roger.tasks import default_args, create_python_task from roger.config import config """ Build the workflow's tasks and DAG. 
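 Tasks that only load into Redis or validate the graph are created with
 no_output_files=True, so no LakeFS commit is attempted for them.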
""" @@ -49,17 +49,28 @@ name="CreateNodesSchema", a_callable=roger.create_nodes_schema ) - create_edges_schema = create_python_task(dag, "CreateEdgesSchema", - roger.create_edges_schema) + create_edges_schema = create_python_task(dag, + name="CreateEdgesSchema", + a_callable=roger.create_edges_schema) - create_bulk_load_nodes = create_python_task(dag, "CreateBulkLoadNodes", - roger.create_bulk_nodes) - create_bulk_load_edges = create_python_task(dag, "CreateBulkLoadEdges", - roger.create_bulk_edges) - bulk_load = create_python_task(dag, "BulkLoad", roger.bulk_load) - check_tranql = create_python_task(dag, "CheckTranql", - roger.check_tranql) - validate = create_python_task(dag, "Validate", roger.validate) + create_bulk_load_nodes = create_python_task(dag, + name="CreateBulkLoadNodes", + a_callable=roger.create_bulk_nodes) + create_bulk_load_edges = create_python_task(dag, + name="CreateBulkLoadEdges", + a_callable=roger.create_bulk_edges) + bulk_load = create_python_task(dag, + name="BulkLoad", + a_callable=roger.bulk_load, + no_output_files=True) + check_tranql = create_python_task(dag, + name="CheckTranql", + a_callable=roger.check_tranql, + no_output_files=True) + validate = create_python_task(dag, + name="Validate", + a_callable=roger.validate, + no_output_files=True) """ Build the DAG. """ From 6429caaf1d01c787e751507b089965e71883474e Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Wed, 24 Jan 2024 16:02:59 -0500 Subject: [PATCH 45/71] add recursive glob --- dags/roger/core/bulkload.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/dags/roger/core/bulkload.py b/dags/roger/core/bulkload.py index afeeb9cc..61accf17 100644 --- a/dags/roger/core/bulkload.py +++ b/dags/roger/core/bulkload.py @@ -308,8 +308,8 @@ def write_bulk(self, bulk_path, obj_map, schema, state={}, def insert (self, input_data_path=None): redisgraph = self.config.redisgraph - nodes = sorted(glob.glob (storage.bulk_path ("**/nodes/**.csv*", input_data_path))) - edges = sorted(glob.glob (storage.bulk_path ("**/edges/**.csv*", input_data_path))) + nodes = sorted(glob.glob (storage.bulk_path ("**/nodes/**.csv*", input_data_path), recursive=True)) + edges = sorted(glob.glob (storage.bulk_path ("**/edges/**.csv*", input_data_path), recursive=True)) graph = redisgraph['graph'] log.info(f"bulk loading \n nodes: {nodes} \n edges: {edges}") @@ -342,7 +342,6 @@ def insert (self, input_data_path=None): # Edge label now no longer has 'biolink:' args.extend(("-R " + " -R ".join(edges_with_type)).split()) args.extend([f"--separator={self.separator}"]) - log.debug(f"--redis-url=redis://:{redisgraph['password']}@{redisgraph['host']}:{redisgraph['port']}") args.extend([f"--redis-url=redis://:{redisgraph['password']}@{redisgraph['host']}:{redisgraph['port']}"]) args.extend(['--enforce-schema']) args.extend([f"{redisgraph['graph']}"]) From 3c84f6aa35023e26ffde125906b223e00b39b904 Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Wed, 24 Jan 2024 16:12:37 -0500 Subject: [PATCH 46/71] fix globbing --- dags/roger/core/bulkload.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dags/roger/core/bulkload.py b/dags/roger/core/bulkload.py index 61accf17..65f481db 100644 --- a/dags/roger/core/bulkload.py +++ b/dags/roger/core/bulkload.py @@ -323,7 +323,7 @@ def insert (self, input_data_path=None): log.info ("bulk loading graph: %s", str(graph)) args = [] if len(nodes) > 0: - bulk_path_root = storage.bulk_path('nodes') + os.path.sep + bulk_path_root = glob.glob(storage.bulk_path('**/nodes', 
path=input_data_path), recursive=True)[0] + os.path.sep nodes_with_type = [] for x in nodes: """ @@ -336,7 +336,7 @@ def insert (self, input_data_path=None): nodes_with_type.append(f"{all_labels} {x}") args.extend(("-N " + " -N ".join(nodes_with_type)).split()) if len(edges) > 0: - bulk_path_root = storage.bulk_path('edges') + os.path.sep + bulk_path_root = glob.glob(storage.bulk_path('**/edges', path=input_data_path), recursive=True) + os.path.sep edges_with_type = [f"biolink.{x.replace(bulk_path_root, '').strip(os.path.sep).split('.')[0].split('~')[1]} {x}" for x in edges] # Edge label now no longer has 'biolink:' From f8db982a0cc1ffcbfe6ee6fae0276a89a916cbb0 Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Wed, 24 Jan 2024 16:14:45 -0500 Subject: [PATCH 47/71] oops --- dags/roger/core/bulkload.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dags/roger/core/bulkload.py b/dags/roger/core/bulkload.py index 65f481db..2113f1b7 100644 --- a/dags/roger/core/bulkload.py +++ b/dags/roger/core/bulkload.py @@ -336,7 +336,7 @@ def insert (self, input_data_path=None): nodes_with_type.append(f"{all_labels} {x}") args.extend(("-N " + " -N ".join(nodes_with_type)).split()) if len(edges) > 0: - bulk_path_root = glob.glob(storage.bulk_path('**/edges', path=input_data_path), recursive=True) + os.path.sep + bulk_path_root = glob.glob(storage.bulk_path('**/edges', path=input_data_path), recursive=True)[0] + os.path.sep edges_with_type = [f"biolink.{x.replace(bulk_path_root, '').strip(os.path.sep).split('.')[0].split('~')[1]} {x}" for x in edges] # Edge label now no longer has 'biolink:' From 8ab1a5c488d706fef143c3084abc2f38a02fcc16 Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Wed, 24 Jan 2024 17:00:40 -0500 Subject: [PATCH 48/71] delete dags --- dags/annotate.py | 102 --------------------------------------- dags/index_dag.py | 28 ----------- dags/tranql_translate.py | 43 ----------------- 3 files changed, 173 deletions(-) delete mode 100755 dags/annotate.py delete mode 100755 dags/index_dag.py delete mode 100755 dags/tranql_translate.py diff --git a/dags/annotate.py b/dags/annotate.py deleted file mode 100755 index 53e5b0a7..00000000 --- a/dags/annotate.py +++ /dev/null @@ -1,102 +0,0 @@ -import os - -from airflow.models import DAG -from airflow.operators.empty import EmptyOperator - -from dug_helpers.dug_utils import ( - DugUtil, - get_topmed_files, - get_dbgap_files, - get_nida_files, - get_sparc_files, - get_anvil_files, - get_cancer_data_commons_files, - get_kids_first_files, - get_sprint_files, - get_bacpac_files, - get_heal_study_files, - get_heal_research_program_files - ) -from roger.tasks import default_args, create_python_task - -DAG_ID = 'annotate_dug' - -""" Build the workflow's tasks and DAG. """ -with DAG( - dag_id=DAG_ID, - default_args=default_args, - schedule_interval=None -) as dag: - - """Build workflow tasks.""" - intro = EmptyOperator(task_id='Intro', dag=dag) - - # Unzip and get files, avoid this because - # 1. it takes a bit of time making the dag itself, webserver hangs - # 2. Every task in this dag would still need to execute this part making it redundant - # 3. tasks like intro would fail because they don't have the data dir mounted. 
- - make_kg_tagged = create_python_task(dag, "make_tagged_kgx", DugUtil.make_kg_tagged) - - dummy_stepover = EmptyOperator(task_id="continue") - - #intro >> run_printlog - envspec = os.getenv("ROGER_DUG__INPUTS_DATA__SETS","topmed") - data_sets = envspec.split(",") - - clear_annotation_items = create_python_task(dag, "clear_annotation_files", DugUtil.clear_annotation_cached) - - for i, data_set in enumerate(data_sets): - annotate_files = None - if data_set.startswith("bdc"): - prepare_files = create_python_task(dag, "get_dbgap_data", get_dbgap_files) - annotate_files = create_python_task(dag, "annotate_db_gap_files", DugUtil.annotate_db_gap_files) - elif data_set.startswith("nida"): - prepare_files = create_python_task(dag, "get_nida_files", get_nida_files) - annotate_files = create_python_task(dag, "annotate_nida_files", DugUtil.annotate_nida_files) - elif data_set.startswith("sparc"): - prepare_files = create_python_task(dag, "get_sparc_files", get_sparc_files) - annotate_files = create_python_task(dag, "annotate_sparc_files", DugUtil.annotate_sparc_files) - elif data_set.startswith("topmed"): - prepare_files = create_python_task(dag, "get_topmed_data", get_topmed_files) - annotate_files = create_python_task(dag, "annotate_topmed_files", DugUtil.annotate_topmed_files) - elif data_set.startswith("anvil"): - prepare_files = create_python_task(dag, "get_anvil_data", get_anvil_files) - annotate_files = create_python_task(dag, "annotate_anvil_files", DugUtil.annotate_anvil_files) - elif data_set.startswith("crdc"): - prepare_files = create_python_task(dag, "get_cancer_commons_files", get_cancer_data_commons_files) - annotate_files = create_python_task(dag, "annotate_cancer_commons_files", - DugUtil.annotate_cancer_commons_files) - elif data_set.startswith("kfdrc"): - prepare_files = create_python_task(dag, "get_kids_first_files", get_kids_first_files) - annotate_files = create_python_task(dag, "annotate_kids_first_files", - DugUtil.annotate_kids_first_files) - elif data_set.startswith("sprint"): - prepare_files = create_python_task(dag, "get_sprint_files", get_sprint_files) - annotate_files = create_python_task(dag, "annotate_sprint_files", - DugUtil.annotate_sprint_files) - elif data_set.startswith("bacpac"): - prepare_files = create_python_task(dag, "get_bacpac_files", get_bacpac_files) - annotate_files = create_python_task(dag, "annotate_bacpac_files", - DugUtil.annotate_bacpac_files) - - elif data_set.startswith("heal-studies"): - prepare_files = create_python_task(dag, "get_heal_study_files", get_heal_study_files) - annotate_files = create_python_task(dag, "annotate_heal_study_files", - DugUtil.annotate_heal_study_files) - # elif data_set.startswith("heal-mds-imports"): - # prepare_files = create_python_task(dag, "get_heal_mds_imports", get_heal_study_files) - - elif data_set.startswith("heal-research-programs"): - prepare_files = create_python_task(dag, "get_heal_research_program_files", get_heal_research_program_files) - annotate_files = create_python_task(dag, "annotate_heal_research_program_files", - DugUtil.annotate_heal_research_program_files) - - intro >> prepare_files - prepare_files >> clear_annotation_items - - if annotate_files: - clear_annotation_items >> annotate_files - annotate_files >> dummy_stepover - - dummy_stepover >> make_kg_tagged diff --git a/dags/index_dag.py b/dags/index_dag.py deleted file mode 100755 index 6ddc4b02..00000000 --- a/dags/index_dag.py +++ /dev/null @@ -1,28 +0,0 @@ -from airflow.models import DAG -from airflow.operators.empty import EmptyOperator 
- -from roger.tasks import get_executor_config, default_args, create_python_task -from dug_helpers.dug_utils import DugUtil - -""" Build the workflow's tasks and DAG. """ -with DAG( - dag_id='index_dug', - default_args=default_args, - schedule_interval=None -) as dag: - - """ Build the workflow tasks. """ - intro = EmptyOperator(task_id='Intro') - index_variables = create_python_task (dag, "IndexVariables", DugUtil.index_variables) - validate_index_variables = create_python_task(dag,"ValidateIndexVariables", DugUtil.validate_indexed_variables) - crawl_tags = create_python_task(dag, "CrawlConcepts", DugUtil.crawl_tranql) - index_concepts = create_python_task(dag, "IndexConcepts", DugUtil.index_concepts) - dummy_stepover = EmptyOperator(task_id="continue") - index_extracted_dug_elements = create_python_task(dag, "IndexExtractedElements", DugUtil.index_extracted_elements) - validate_index_concepts = create_python_task(dag, "ValidateIndexConcepts", DugUtil.validate_indexed_concepts) - finish = EmptyOperator(task_id='Finish') - """ Build the DAG. """ - intro >> index_variables >> validate_index_variables >> finish - intro >> crawl_tags >> index_concepts >> dummy_stepover - intro >> crawl_tags >> index_extracted_dug_elements >> dummy_stepover - dummy_stepover >> validate_index_concepts >> finish \ No newline at end of file diff --git a/dags/tranql_translate.py b/dags/tranql_translate.py deleted file mode 100755 index 53394ba2..00000000 --- a/dags/tranql_translate.py +++ /dev/null @@ -1,43 +0,0 @@ -# -*- coding: utf-8 -*- -# - -""" -An Airflow workflow for the Roger Translator KGX data pipeline. -""" - -from airflow.models import DAG -from airflow.operators.empty import EmptyOperator -import roger -from roger.tasks import get_executor_config, default_args, create_python_task - -""" Build the workflow's tasks and DAG. """ -with DAG( - dag_id='tranql_translate', - default_args=default_args, - schedule_interval=None, - concurrency=16, -) as dag: - - """ Build the workflow tasks. """ - intro = EmptyOperator(task_id='Intro') - get_kgx = create_python_task (dag, "GetSource", roger.get_kgx) - create_nodes_schema = create_python_task(dag, "CreateNodesSchema", - roger.create_nodes_schema) - create_edges_schema = create_python_task(dag, "CreateEdgesSchema", - roger.create_edges_schema) - continue_task_bulk_load = EmptyOperator(task_id="continueBulkCreate") - continue_task_validate = EmptyOperator(task_id="continueValidation") - merge_nodes = create_python_task (dag, "MergeNodes", roger.merge_nodes) - create_bulk_load_nodes = create_python_task(dag, "CreateBulkLoadNodes", - roger.create_bulk_nodes) - create_bulk_load_edges = create_python_task(dag, "CreateBulkLoadEdges", - roger.create_bulk_edges) - bulk_load = create_python_task(dag, "BulkLoad", roger.bulk_load) - check_tranql = create_python_task(dag, "CheckTranql", - roger.check_tranql) - validate = create_python_task(dag, "Validate", roger.validate) - finish = EmptyOperator(task_id='Finish') - - """ Build the DAG. 
""" - intro >> get_kgx >> merge_nodes >> [create_nodes_schema, create_edges_schema ] >> continue_task_bulk_load >> \ - [create_bulk_load_nodes, create_bulk_load_edges] >> bulk_load >> continue_task_validate >>[validate, check_tranql ] >> finish From ce59d533d287159c219c02752f8cf7c9e089374b Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Thu, 25 Jan 2024 13:28:07 -0500 Subject: [PATCH 49/71] pin dug to latest release --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 9818afcc..3d438437 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,7 +6,7 @@ jsonpickle redisgraph-bulk-loader==0.12.3 pytest PyYAML -git+https://github.com/helxplatform/dug@patch-nrslv-resp +git+https://github.com/helxplatform/dug@2.13.0 orjson kg-utils==0.0.6 bmt==1.1.0 From 3434d1a60b2591f612adbc874a406b147c808de8 Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Thu, 25 Jan 2024 13:31:48 -0500 Subject: [PATCH 50/71] cruft cleanup --- dags/roger/models/kgx.py | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/dags/roger/models/kgx.py b/dags/roger/models/kgx.py index b7a1250e..bf51f193 100644 --- a/dags/roger/models/kgx.py +++ b/dags/roger/models/kgx.py @@ -8,9 +8,8 @@ from collections import defaultdict from xxhash import xxh64_hexdigest import orjson as json -import redis import ntpath -from kg_utils.merging import GraphMerger, MemoryGraphMerger, DiskGraphMerger +from kg_utils.merging import DiskGraphMerger from kg_utils.constants import * from roger.config import get_default_config @@ -50,11 +49,6 @@ def __init__(self, biolink=None, config=None): self.biolink = BiolinkModel(self.biolink_version) else: self.biolink = biolink - # self.redis_conn = redis.Redis( - # host=self.config.redisgraph.host, - # port=self.config.redisgraph.port, - # password=self.config.redisgraph.password, - # db=self.merge_db_id) self.enable_metrics = self.config.get('enable_metrics', False) def get_kgx_json_format(self, files: list, dataset_version: str): @@ -505,10 +499,3 @@ def merge(self, input_path=None, output_path=None): metricsfile_path = storage.metrics_path('merge_metrics.yaml') storage.write_object(metrics, metricsfile_path) - - -if __name__ == '__main__': - kg_merger = KGXModel() - import pathlib - kg_merger.merge(input_path=pathlib.Path("/home/kebedey/projects/helx/roger/kgx_dir"), - output_path=pathlib.Path("/home/kebedey/projects/helx/roger/kgx_dir_out")) From 60f90ba813e1a0589fd3658725f91e4a5bfed848 Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Thu, 25 Jan 2024 16:22:44 -0500 Subject: [PATCH 51/71] re-org kgx config --- dags/roger/config/__init__.py | 4 +--- dags/roger/config/config.yaml | 4 +--- dags/roger/models/kgx.py | 2 -- 3 files changed, 2 insertions(+), 8 deletions(-) diff --git a/dags/roger/config/__init__.py b/dags/roger/config/__init__.py index 44c2f672..ac9eb23a 100644 --- a/dags/roger/config/__init__.py +++ b/dags/roger/config/__init__.py @@ -50,10 +50,8 @@ class LoggingConfig(DictLike): @dataclass class KgxConfig(DictLike): biolink_model_version: str = "1.5.0" - dataset_version: str = "v1.0" - merge_db_id: int = 1 merge_db_temp_dir: str = "workspace" - data_sets: List = field(default_factory=lambda: ['baseline-graph']) + data_sets: List = field(default_factory=lambda: ['baseline-graph:v5.0']) def __post_init__(self): # Convert strings to list. 
In cases where this is passed as env variable with a single value diff --git a/dags/roger/config/config.yaml b/dags/roger/config/config.yaml index a956f00e..e9402ce4 100644 --- a/dags/roger/config/config.yaml +++ b/dags/roger/config/config.yaml @@ -16,11 +16,9 @@ annotation_base_data_uri: https://stars.renci.org/var/dug/ kgx: biolink_model_version: v3.1.2 - dataset_version: v5.0 - merge_db_id: 1 merge_db_temp_dir: workspace data_sets: - - baseline-graph + - baseline-graph:v5.0 dug_inputs: data_source: s3 diff --git a/dags/roger/models/kgx.py b/dags/roger/models/kgx.py index bf51f193..bc301262 100644 --- a/dags/roger/models/kgx.py +++ b/dags/roger/models/kgx.py @@ -42,8 +42,6 @@ def __init__(self, biolink=None, config=None): self.merger = DiskGraphMerger(temp_directory=self.temp_directory, chunk_size=5_000_000) self.biolink_version = self.config.kgx.biolink_model_version - self.merge_db_id = self.config.kgx.merge_db_id - self.merge_db_name = f'db{self.merge_db_id}' log.debug(f"Trying to get biolink version : {self.biolink_version}") if biolink is None: self.biolink = BiolinkModel(self.biolink_version) From 6f2c0cc3ff715db405f8e183b78af820ac004d69 Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Thu, 25 Jan 2024 17:37:01 -0500 Subject: [PATCH 52/71] add support for multiple initial repos --- dags/knowledge_graph_build.py | 26 +++++++++++---- dags/roger/tasks.py | 61 +++++++++++++++++++---------------- 2 files changed, 54 insertions(+), 33 deletions(-) diff --git a/dags/knowledge_graph_build.py b/dags/knowledge_graph_build.py index 95f8a83d..398a0a83 100644 --- a/dags/knowledge_graph_build.py +++ b/dags/knowledge_graph_build.py @@ -30,17 +30,31 @@ working_repo = config.lakefs_config.repo branch = config.lakefs_config.branch kgx_repos = config.kgx.data_sets - get_path_on_lakefs = lambda d: f"{working_repo}/{branch}/annotate_and_index/{d}_dataset_pipeline_task_group.make_kgx_{d}/" - kgx_files_to_grab = [] + input_repos = [{ + 'name': repo.split(':')[0] + 'branch': repo.split(':')[1], + 'path': '*' + } for repo in kgx_repos] + + # Figure out a way to extract paths + get_path_on_lakefs = lambda d: f"/annotate_and_index/{d}_dataset_pipeline_task_group.make_kgx_{d}/" + + for dataset in config.dug_inputs.data_sets: dataset_name = dataset.split(":")[0] - kgx_files_to_grab.append(get_path_on_lakefs(dataset_name)) - + # add datasets from the other pipeline + input_repos.append( + { + 'name': working_repo, + 'branch': branch, + 'path': get_path_on_lakefs(dataset_name) + } + ) merge_nodes = create_python_task (dag, name="MergeNodes", a_callable=roger.merge_nodes, - input_repo="cde-graph", - input_branch="v5.0") + external_repos=input_repos + ) # The rest of these guys can just operate on the local lakefs repo/branch # we need to add input dir and output dir similar to what we did for dug tasks diff --git a/dags/roger/tasks.py b/dags/roger/tasks.py index b554bf3b..c1cdc7f3 100755 --- a/dags/roger/tasks.py +++ b/dags/roger/tasks.py @@ -243,36 +243,45 @@ def setup_input_data(context, exec_conf): # Download files from lakefs and store them in this new input_path client = init_lakefs_client(config=config) - input_repo = exec_conf['input_repo'] - input_branch = exec_conf['input_branch'] - # If input repo is provided use that as source of files - if exec_conf.get('path') and exec_conf.get('path') == '*': - remote_paths = ['*'] # root path to get all sub dirs - # else figure out what to pull from the repo based on task name etc... 
- else: + repos = exec_conf['repos'] + # if no external repo is provided we assume to get the upstream task dataset. + if not repos or len(repos) == 0: + # merge destination branch + branch = config.lakefs_config.branch + repo = config.lakefs_config.repo task_instance: TaskInstance = context['ti'] # get upstream ids upstream_ids = task_instance.task.upstream_task_ids dag_id = task_instance.dag_id # calculate remote dirs using dag_id + upstreams - - remote_paths = [f'{dag_id}/{upstream_id}' - for upstream_id in upstream_ids] - for remote_path in remote_paths: + repos = [{ + 'repo': repo, + 'branch': branch, + 'path': f'{dag_id}/{upstream_id}' + } for upstream_id in upstream_ids] + + # input_repo = exec_conf['input_repo'] + # input_branch = exec_conf['input_branch'] + # If input repo is provided use that as source of files + for repo in repos: + if not repo.get('path'): + # get all if path is not specified + repo['path'] = '*' + logger.info(f"repos : {repos}") + for r in repos: logger.info("downloading %s from %s@%s to %s", - remote_path, input_repo, input_branch, input_dir) + r['path'], r['repo'], r['branch'], input_dir) get_files( - local_path=input_dir, - remote_path=remote_path, - branch=input_branch, - repo=input_repo, + local_path=input_dir + f'/{r["repo"]}', + remote_path=r['path'], + branch=r['repo'], + repo=r['branch'], changes_only=False, lake_fs_client=client ) -def create_python_task(dag, name, a_callable, func_kwargs=None, input_repo=None, - input_branch=None, pass_conf=True, no_output_files=False): +def create_python_task(dag, name, a_callable, func_kwargs=None, external_repos = {}, pass_conf=True, no_output_files=False): """ Create a python task. :param func_kwargs: additional arguments for callable. :param dag: dag to add task to. @@ -305,19 +314,17 @@ def create_python_task(dag, name, a_callable, func_kwargs=None, input_repo=None, # repo and branch for pre-execution , to download input objects pre_exec_conf = { - 'input_repo': config.lakefs_config.repo, - 'input_branch': config.lakefs_config.branch + 'repos': [] } - - if input_repo and input_branch: + if external_repos: # if the task is a root task , beginning of the dag... # and we want to pull data from a different repo. pre_exec_conf = { - 'input_repo': input_repo, - 'input_branch': input_branch, - # if path is not defined , we can use the context (dag context) to - # resolve the previous task dir in lakefs. 
- 'path': '*' + 'repos': [{ + 'repo': r['name'], + 'branch': r['branch'], + 'path': r.get('path', '*') + } for r in external_repos] } pre_exec = partial(setup_input_data, exec_conf=pre_exec_conf) From eef984efe8c3fb2f8dd92ebff8096f4077eaa9de Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Thu, 25 Jan 2024 17:38:21 -0500 Subject: [PATCH 53/71] fix comma --- dags/knowledge_graph_build.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dags/knowledge_graph_build.py b/dags/knowledge_graph_build.py index 398a0a83..65e763ea 100644 --- a/dags/knowledge_graph_build.py +++ b/dags/knowledge_graph_build.py @@ -31,7 +31,7 @@ branch = config.lakefs_config.branch kgx_repos = config.kgx.data_sets input_repos = [{ - 'name': repo.split(':')[0] + 'name': repo.split(':')[0], 'branch': repo.split(':')[1], 'path': '*' } for repo in kgx_repos] From f90f13ffc46a792ea48ef45ab136693ac8dd6361 Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Thu, 25 Jan 2024 17:55:23 -0500 Subject: [PATCH 54/71] create dir to download to --- dags/roger/tasks.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/dags/roger/tasks.py b/dags/roger/tasks.py index c1cdc7f3..df740b7a 100755 --- a/dags/roger/tasks.py +++ b/dags/roger/tasks.py @@ -271,6 +271,9 @@ def setup_input_data(context, exec_conf): for r in repos: logger.info("downloading %s from %s@%s to %s", r['path'], r['repo'], r['branch'], input_dir) + # create path to download to ... + if not os.path.exists(input_dir + f'/{r["repo"]}'): + os.mkdir(input_dir + f'/{r["repo"]}') get_files( local_path=input_dir + f'/{r["repo"]}', remote_path=r['path'], From 4fd8ae2886b240a4efeb12d1fc473cc3be42ee7c Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Thu, 25 Jan 2024 18:47:38 -0500 Subject: [PATCH 55/71] swap branch and repo --- dags/roger/tasks.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dags/roger/tasks.py b/dags/roger/tasks.py index df740b7a..f4c9b861 100755 --- a/dags/roger/tasks.py +++ b/dags/roger/tasks.py @@ -277,8 +277,8 @@ def setup_input_data(context, exec_conf): get_files( local_path=input_dir + f'/{r["repo"]}', remote_path=r['path'], - branch=r['repo'], - repo=r['branch'], + branch=r['branch'], + repo=r['repo'], changes_only=False, lake_fs_client=client ) From 0c057c1a9c5a9696b410714dccde65f2750960b9 Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Thu, 25 Jan 2024 19:51:40 -0500 Subject: [PATCH 56/71] clean up dirs --- dags/knowledge_graph_build.py | 2 +- dags/roger/tasks.py | 19 ++++++++++++++----- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/dags/knowledge_graph_build.py b/dags/knowledge_graph_build.py index 65e763ea..de28aa78 100644 --- a/dags/knowledge_graph_build.py +++ b/dags/knowledge_graph_build.py @@ -37,7 +37,7 @@ } for repo in kgx_repos] # Figure out a way to extract paths - get_path_on_lakefs = lambda d: f"/annotate_and_index/{d}_dataset_pipeline_task_group.make_kgx_{d}/" + get_path_on_lakefs = lambda d: f"annotate_and_index/{d}_dataset_pipeline_task_group.make_kgx_{d}/" for dataset in config.dug_inputs.data_sets: diff --git a/dags/roger/tasks.py b/dags/roger/tasks.py index f4c9b861..b13321d7 100755 --- a/dags/roger/tasks.py +++ b/dags/roger/tasks.py @@ -203,11 +203,20 @@ def avalon_commit_callback(context: DagContext, **kwargs): logger.info(f"deleted temp branch {temp_branch_name}") logger.info(f"deleting local dir {local_path}") files_to_clean = glob.glob(local_path + '**', recursive=True) + [local_path] + clean_up(context, **kwargs) + +def clean_up(context: DagContext, **kwargs): + input_dir = 
str(generate_dir_name_from_task_instance(context['ti'], + roger_config=config, + suffix='output')).rstrip('/') + '/' + output_dir = str(generate_dir_name_from_task_instance(context['ti'], + roger_config=config, + suffix='input')).rstrip('/') + '/' + files_to_clean = glob.glob(input_dir + '**', recursive=True) + [input_dir] + files_to_clean += glob.glob(output_dir + '**', recursive=True) + [output_dir] for f in files_to_clean: - shutil.rmtree(f) - - - + if os.path.exists(f): + shutil.rmtree(f) def generate_dir_name_from_task_instance(task_instance: TaskInstance, roger_config: RogerConfig, suffix:str): @@ -333,7 +342,7 @@ def create_python_task(dag, name, a_callable, func_kwargs=None, external_repos = pre_exec = partial(setup_input_data, exec_conf=pre_exec_conf) # add pre_exec partial function as an argument to python executor conf python_operator_args['pre_execute'] = pre_exec - + python_operator_args['on_failure_callback'] = partial(clean_up, kwargs=op_kwargs) # if the task has output files, we will add a commit callback if not no_output_files: python_operator_args['on_success_callback'] = partial(avalon_commit_callback, kwargs=op_kwargs) From 9ac704af0ad44077c5b86641e63703624b37237b Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Thu, 25 Jan 2024 20:02:49 -0500 Subject: [PATCH 57/71] =?UTF-8?q?fix=20up=20other=20pipeline=20?= =?UTF-8?q?=F0=9F=91=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dags/roger/tasks.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/dags/roger/tasks.py b/dags/roger/tasks.py index b13321d7..852cf4b7 100755 --- a/dags/roger/tasks.py +++ b/dags/roger/tasks.py @@ -371,8 +371,10 @@ def create_pipeline_taskgroup( dag, f"annotate_{name}_files", pipeline.annotate, - input_repo=getattr(pipeline_class, 'pipeline_name'), - input_branch=input_dataset_version, + external_repos=[{ + 'name': getattr(pipeline_class, 'pipeline_name'), + 'branch': input_dataset_version + }], pass_conf=False) index_variables_task = create_python_task( From b663308811f5dc600a333e68e51572e786815edf Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Mon, 29 Jan 2024 17:58:34 -0500 Subject: [PATCH 58/71] add remaining pipelines --- dags/roger/pipelines/bdc.py | 4 ++-- dags/roger/pipelines/crdc.py | 4 ++-- dags/roger/pipelines/heal_research_programs.py | 16 ++++++++++++++++ dags/roger/pipelines/heal_studies.py | 16 ++++++++++++++++ dags/roger/pipelines/kfdrc.py | 4 ++-- dags/roger/pipelines/sparc.py | 17 +++++++++++++++++ dags/roger/pipelines/topmed.py | 4 ++-- 7 files changed, 57 insertions(+), 8 deletions(-) create mode 100644 dags/roger/pipelines/heal_research_programs.py create mode 100644 dags/roger/pipelines/heal_studies.py create mode 100644 dags/roger/pipelines/sparc.py diff --git a/dags/roger/pipelines/bdc.py b/dags/roger/pipelines/bdc.py index 4b40690c..bc30cf44 100644 --- a/dags/roger/pipelines/bdc.py +++ b/dags/roger/pipelines/bdc.py @@ -1,10 +1,10 @@ -"Pipeline for BACPAC data" +"Pipeline for BDC-dbGap data" from roger.pipelines import DugPipeline from roger.core import storage class bdcPipeline(DugPipeline): - "Pipeline for BACPAC data set" + "Pipeline for BDC-dbGap data set" pipeline_name = "bdc" parser_name = "dbgap" diff --git a/dags/roger/pipelines/crdc.py b/dags/roger/pipelines/crdc.py index 521afe01..2143cf7b 100644 --- a/dags/roger/pipelines/crdc.py +++ b/dags/roger/pipelines/crdc.py @@ -1,10 +1,10 @@ -"Pipeline for BACPAC data" +"Pipeline for Cancer Commons data" from roger.pipelines import DugPipeline from 
roger.core import storage class CRDCPipeline(DugPipeline): - "Pipeline for BACPAC data set" + "Pipeline for Cancer Commons data set" pipeline_name = "crdc" parser_name = "crdc" diff --git a/dags/roger/pipelines/heal_research_programs.py b/dags/roger/pipelines/heal_research_programs.py new file mode 100644 index 00000000..d95a4497 --- /dev/null +++ b/dags/roger/pipelines/heal_research_programs.py @@ -0,0 +1,16 @@ +"Pipeline for Heal-studies data" + +from roger.pipelines import DugPipeline +from roger.core import storage + +class HealResearchProgramPipeline(DugPipeline): + "Pipeline for Heal-research-programs data set" + pipeline_name = "heal-research-programs" + parser_name = "heal-research" + + def get_objects(self, input_data_path=None): + if not input_data_path: + input_data_path = storage.dug_heal_research_program_path() + files = storage.get_files_recursive(lambda file_name: file_name.endswith('.xml'), + input_data_path) + return sorted([str(f) for f in files]) \ No newline at end of file diff --git a/dags/roger/pipelines/heal_studies.py b/dags/roger/pipelines/heal_studies.py new file mode 100644 index 00000000..cf78c042 --- /dev/null +++ b/dags/roger/pipelines/heal_studies.py @@ -0,0 +1,16 @@ +"Pipeline for Heal-studies data" + +from roger.pipelines import DugPipeline +from roger.core import storage + +class HealStudiesPipeline(DugPipeline): + "Pipeline for Heal-studies data set" + pipeline_name = "heal-studies" + parser_name = "heal-studies" + + def get_objects(self, input_data_path=None): + if not input_data_path: + input_data_path = storage.dug_heal_study_path() + files = storage.get_files_recursive(lambda file_name: file_name.endswith('.xml'), + input_data_path) + return sorted([str(f) for f in files]) diff --git a/dags/roger/pipelines/kfdrc.py b/dags/roger/pipelines/kfdrc.py index 3c5a012d..bcb0b7ac 100644 --- a/dags/roger/pipelines/kfdrc.py +++ b/dags/roger/pipelines/kfdrc.py @@ -1,10 +1,10 @@ -"Pipeline for BACPAC data" +"Pipeline for KDFRC data" from roger.pipelines import DugPipeline from roger.core import storage class kfdrcPipeline(DugPipeline): - "Pipeline for BACPAC data set" + "Pipeline for KDFRC data set" pipeline_name = "kfdrc" parser_name = "kfdrc" diff --git a/dags/roger/pipelines/sparc.py b/dags/roger/pipelines/sparc.py new file mode 100644 index 00000000..d1c9c950 --- /dev/null +++ b/dags/roger/pipelines/sparc.py @@ -0,0 +1,17 @@ +"Pipeline for Sparc data" + +from roger.pipelines import DugPipeline +from roger.core import storage + +class SparcPipeline(DugPipeline): + "Pipeline for Sparc data set" + pipeline_name = "sparc" + parser_name = "SciCrunch" + + def get_objects(self, input_data_path=None): + if not input_data_path: + input_data_path = storage.dug_heal_study_path() + files = storage.get_files_recursive( + lambda x: True, input_data_path + ) + return sorted([str(f) for f in files]) diff --git a/dags/roger/pipelines/topmed.py b/dags/roger/pipelines/topmed.py index a839898b..fa218359 100644 --- a/dags/roger/pipelines/topmed.py +++ b/dags/roger/pipelines/topmed.py @@ -1,10 +1,10 @@ -"Pipeline for BACPAC data" +"Pipeline for Topmed data" from roger.pipelines import DugPipeline from roger.core import storage class TopmedPipeline(DugPipeline): - "Pipeline for BACPAC data set" + "Pipeline for Topmed data set" pipeline_name = "topmed" parser_name = "TOPMedTag" From 0f3ae358862969361736e2268334b3ca22e1fbac Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Tue, 30 Jan 2024 12:25:04 -0500 Subject: [PATCH 59/71] adding ctn parser --- dags/annotate_and_index.py | 2 +- 
dags/roger/pipelines/ctn.py | 10 ++++++++++ requirements.txt | 2 +- 3 files changed, 12 insertions(+), 2 deletions(-) create mode 100644 dags/roger/pipelines/ctn.py diff --git a/dags/annotate_and_index.py b/dags/annotate_and_index.py index e08260b7..884cd149 100644 --- a/dags/annotate_and_index.py +++ b/dags/annotate_and_index.py @@ -25,7 +25,7 @@ from roger import pipelines from roger.config import config - envspec = os.getenv("ROGER_DUG__INPUTS_DATA__SETS","topmed") + envspec = os.getenv("ROGER_DUG__INPUTS_DATA__SETS","topmed:v2.0") data_sets = envspec.split(",") pipeline_names = {x.split(':')[0]: x.split(':')[1] for x in data_sets} for pipeline_class in pipelines.get_pipeline_classes(pipeline_names): diff --git a/dags/roger/pipelines/ctn.py b/dags/roger/pipelines/ctn.py new file mode 100644 index 00000000..25918062 --- /dev/null +++ b/dags/roger/pipelines/ctn.py @@ -0,0 +1,10 @@ +"Pipeline for Clinical trials network data" + +from roger.pipelines import DugPipeline + +class CTNPipeline(DugPipeline): + "Pipeline for Clinical trials nework data set" + pipeline_name = "ctn" + parser_name = "ctn" + + diff --git a/requirements.txt b/requirements.txt index 3d438437..52d4fdf3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,7 +6,7 @@ jsonpickle redisgraph-bulk-loader==0.12.3 pytest PyYAML -git+https://github.com/helxplatform/dug@2.13.0 +git+https://github.com/helxplatform/dug@ctn-parser orjson kg-utils==0.0.6 bmt==1.1.0 From aa5ce40e0be76b30b4aa4cb4c877b134b3877df2 Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Thu, 1 Feb 2024 08:36:03 -0500 Subject: [PATCH 60/71] change merge strategy --- dags/roger/tasks.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/dags/roger/tasks.py b/dags/roger/tasks.py index 852cf4b7..11d776c7 100755 --- a/dags/roger/tasks.py +++ b/dags/roger/tasks.py @@ -19,6 +19,7 @@ from roger.pipelines.base import DugPipeline from avalon.mainoperations import put_files, LakeFsWrapper, get_files from lakefs_sdk.configuration import Configuration +from lakefs_sdk.models.merge import Merge from functools import partial logger = get_logger() @@ -186,9 +187,13 @@ def avalon_commit_callback(context: DagContext, **kwargs): try: # merging temp branch to working branch + # the current working branch wins incase of conflicts + merge = Merge({"strategy": "source-wins"}) client._client.refs_api.merge_into_branch(repository=repo, source_ref=temp_branch_name, - destination_branch=branch) + destination_branch=branch, + merge=merge + ) logger.info(f"merged branch {temp_branch_name} into {branch}") except Exception as e: From 455c3f39cad8819bc944c4839ff77905bb2b854f Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Thu, 1 Feb 2024 12:06:55 -0500 Subject: [PATCH 61/71] merge init fix --- dags/roger/tasks.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dags/roger/tasks.py b/dags/roger/tasks.py index 11d776c7..0b0f922d 100755 --- a/dags/roger/tasks.py +++ b/dags/roger/tasks.py @@ -187,8 +187,8 @@ def avalon_commit_callback(context: DagContext, **kwargs): try: # merging temp branch to working branch - # the current working branch wins incase of conflicts - merge = Merge({"strategy": "source-wins"}) + # the current working branch wins incase of conflicts + merge = Merge(**{"strategy": "source-wins"}) client._client.refs_api.merge_into_branch(repository=repo, source_ref=temp_branch_name, destination_branch=branch, From f4f9d642d28199961cad0b0f258a47ed6957cb70 Mon Sep 17 00:00:00 2001 From: YaphetKG Date: Mon, 5 Feb 2024 11:30:13 -0500 Subject: 
From f4f9642d28199961cad0b0f258a47ed6957cb70 Mon Sep 17 00:00:00 2001
From: YaphetKG
Date: Mon, 5 Feb 2024 11:30:13 -0500
Subject: [PATCH 62/71] debug dir

---
 dags/roger/tasks.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/dags/roger/tasks.py b/dags/roger/tasks.py
index 0b0f922d..52f03792 100755
--- a/dags/roger/tasks.py
+++ b/dags/roger/tasks.py
@@ -208,7 +208,8 @@ def avalon_commit_callback(context: DagContext, **kwargs):
     logger.info(f"deleted temp branch {temp_branch_name}")
     logger.info(f"deleting local dir {local_path}")
     files_to_clean = glob.glob(local_path + '**', recursive=True) + [local_path]
-    clean_up(context, **kwargs)
+
+    # clean_up(context, **kwargs)

 def clean_up(context: DagContext, **kwargs):
     input_dir = str(generate_dir_name_from_task_instance(context['ti'],

From 6303ac00ddc0c51526eb55c54bad284457a1f067 Mon Sep 17 00:00:00 2001
From: YaphetKG
Date: Tue, 6 Feb 2024 09:01:41 -0500
Subject: [PATCH 63/71] fix topmed file read

---
 dags/roger/pipelines/topmed.py | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/dags/roger/pipelines/topmed.py b/dags/roger/pipelines/topmed.py
index fa218359..21d42c18 100644
--- a/dags/roger/pipelines/topmed.py
+++ b/dags/roger/pipelines/topmed.py
@@ -2,7 +2,7 @@

 from roger.pipelines import DugPipeline
 from roger.core import storage
-
+from roger.logger import logger
 class TopmedPipeline(DugPipeline):
     "Pipeline for Topmed data set"
     pipeline_name = "topmed"
@@ -11,6 +11,7 @@ class TopmedPipeline(DugPipeline):
     def get_objects(self, input_data_path=None):
         if not input_data_path:
             input_data_path = str(storage.dug_input_files_path('topmed'))
-        topmed_file_pattern = storage.os.path.join(input_data_path,
-                                                   "topmed_*.csv")
-        return sorted(storage.glob.glob(topmed_file_pattern))
+        files =storage.get_files_recursive(
+            lambda file_name: file_name.endswith('topmed_*.csv'),
+            input_data_path)
+        return sorted(files)

From a00c13c6e075bb0d41a0794543b677af5e247363 Mon Sep 17 00:00:00 2001
From: YaphetKG
Date: Tue, 6 Feb 2024 09:18:11 -0500
Subject: [PATCH 64/71] fix topmed file read

---
 dags/roger/pipelines/topmed.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dags/roger/pipelines/topmed.py b/dags/roger/pipelines/topmed.py
index 21d42c18..d7490abc 100644
--- a/dags/roger/pipelines/topmed.py
+++ b/dags/roger/pipelines/topmed.py
@@ -12,6 +12,6 @@ def get_objects(self, input_data_path=None):
         if not input_data_path:
             input_data_path = str(storage.dug_input_files_path('topmed'))
         files =storage.get_files_recursive(
-            lambda file_name: file_name.endswith('topmed_*.csv'),
+            lambda file_name: file_name.endswith('.csv'),
             input_data_path)
         return sorted(files)

From 8710326291aeb3257a4fe16d013f5c10ec08c093 Mon Sep 17 00:00:00 2001
From: YaphetKG
Date: Tue, 6 Feb 2024 09:22:36 -0500
Subject: [PATCH 65/71] return file names as strings

---
 dags/roger/pipelines/topmed.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dags/roger/pipelines/topmed.py b/dags/roger/pipelines/topmed.py
index d7490abc..14b8b197 100644
--- a/dags/roger/pipelines/topmed.py
+++ b/dags/roger/pipelines/topmed.py
@@ -14,4 +14,4 @@ def get_objects(self, input_data_path=None):
         files =storage.get_files_recursive(
             lambda file_name: file_name.endswith('.csv'),
             input_data_path)
-        return sorted(files)
+        return sorted([str(x) for x in files])

From 56437ab5b99ce915dd5d7c2ebb1358c8ed78b226 Mon Sep 17 00:00:00 2001
From: YaphetKG
Date: Thu, 8 Feb 2024 09:23:43 -0500
Subject: [PATCH 66/71] topmed kgx builder custom

---
 dags/roger/pipelines/topmed.py | 23 +++++++++++++++++++++++
 1 file changed, 23 insertions(+)

diff --git a/dags/roger/pipelines/topmed.py b/dags/roger/pipelines/topmed.py
index 14b8b197..e1804274 100644
--- a/dags/roger/pipelines/topmed.py
+++ b/dags/roger/pipelines/topmed.py
@@ -1,6 +1,8 @@
 "Pipeline for Topmed data"

 from roger.pipelines import DugPipeline
+from roger.pipelines.base import log, os
+import jsonpickle
 from roger.core import storage
 from roger.logger import logger
 class TopmedPipeline(DugPipeline):
@@ -15,3 +17,24 @@ def get_objects(self, input_data_path=None):
             lambda file_name: file_name.endswith('.csv'),
             input_data_path)
         return sorted([str(x) for x in files])
+
+    def make_kg_tagged(self, to_string=False, elements_files=None,
+                       input_data_path=None, output_data_path=None):
+        "Create tagged knowledge graphs from elements"
+        if not output_data_path:
+            output_data_path = storage.dug_kgx_path("")
+        storage.clear_dir(output_data_path)
+        if not elements_files:
+            elements_files = storage.dug_elements_objects(input_data_path, format='txt')
+        for file_ in elements_files:
+            elements = jsonpickle.decode(storage.read_object(file_))
+            kg = self.make_tagged_kg(elements)
+            dug_base_file_name = file_.split(os.path.sep)[-2]
+            output_file_path = os.path.join(output_data_path,
+                                            dug_base_file_name + '_kgx.json')
+            storage.write_object(kg, output_file_path)
+            log.info("Wrote %d and %d edges, to %s", len(kg['nodes']),
+                     len(kg['edges']), output_file_path)
+        output_log = self.log_stream.getvalue() if to_string else ''
+        return output_log
+

From c3c743ddaeb25bc88ac0aa059e61258ca411e972 Mon Sep 17 00:00:00 2001
From: YaphetKG
Date: Thu, 8 Feb 2024 09:26:18 -0500
Subject: [PATCH 67/71] topmed kgx builder custom

---
 dags/roger/pipelines/topmed.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/dags/roger/pipelines/topmed.py b/dags/roger/pipelines/topmed.py
index e1804274..90b3e515 100644
--- a/dags/roger/pipelines/topmed.py
+++ b/dags/roger/pipelines/topmed.py
@@ -21,6 +21,7 @@ def get_objects(self, input_data_path=None):
     def make_kg_tagged(self, to_string=False, elements_files=None,
                        input_data_path=None, output_data_path=None):
         "Create tagged knowledge graphs from elements"
+        log.info("Override base.make_kg_tagged called")
         if not output_data_path:
             output_data_path = storage.dug_kgx_path("")
         storage.clear_dir(output_data_path)

From 404f7cc5b2c7dad46ccfcc831f7ba077b9f09ca2 Mon Sep 17 00:00:00 2001
From: YaphetKG
Date: Thu, 8 Feb 2024 10:04:27 -0500
Subject: [PATCH 68/71] add skip

---
 dags/roger/pipelines/base.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/dags/roger/pipelines/base.py b/dags/roger/pipelines/base.py
index df99e5d6..026713da 100644
--- a/dags/roger/pipelines/base.py
+++ b/dags/roger/pipelines/base.py
@@ -304,7 +304,9 @@ def convert_to_kgx_json(self, elements, written_nodes=None):
         nodes = graph['nodes']

         for _, element in enumerate(elements):
-            # DugElement means a variable (Study variable...)
+            # DugElement means a variable (Study variable...)
+            if not isinstance(element, DugElement):
+                continue
             study_id = element.collection_id
             if study_id not in written_nodes:
                 nodes.append({

From 6262a5cc54f0db81836c1bd8867f4c43bac9c157 Mon Sep 17 00:00:00 2001
From: YaphetKG
Date: Thu, 8 Feb 2024 10:53:24 -0500
Subject: [PATCH 69/71] get files pattern recursive

---
 dags/roger/pipelines/nida.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/dags/roger/pipelines/nida.py b/dags/roger/pipelines/nida.py
index eb425f27..b2e841bd 100644
--- a/dags/roger/pipelines/nida.py
+++ b/dags/roger/pipelines/nida.py
@@ -14,5 +14,5 @@ def get_objects(self, input_data_path=None):
         if not input_data_path:
             input_data_path = storage.dug_input_files_path(
                 self.get_files_dir())
-        nida_file_pattern = storage.os.path.join(input_data_path, 'NIDA-*.xml')
-        return sorted(storage.glob.glob(nida_file_pattern))
+        files = sorted(storage.get_files_recursive(lambda x: 'NIDA-' in x , input_data_path))
+        return files

From f284a1ccc3e8ef95cf3f55c2c43369b853a7ec69 Mon Sep 17 00:00:00 2001
From: YaphetKG
Date: Thu, 8 Feb 2024 12:25:24 -0500
Subject: [PATCH 70/71] version pin avalon

---
 requirements.txt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/requirements.txt b/requirements.txt
index 064e1813..24931f6c 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -10,5 +10,5 @@ git+https://github.com/helxplatform/dug@ctn-parser
 orjson
 kg-utils==0.0.6
 bmt==1.1.0
-git+https://github.com/helxplatform/avalon.git@pydantic-upgrade
+git+https://github.com/helxplatform/avalon.git@v1.0.0
 linkml-runtime==1.6.0

From d5bfc5e85a1119cd9f11a244cea3251e4cdba9a4 Mon Sep 17 00:00:00 2001
From: YaphetKG
Date: Thu, 8 Feb 2024 12:59:31 -0500
Subject: [PATCH 71/71] pin dug

---
 requirements.txt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/requirements.txt b/requirements.txt
index 24931f6c..9693c5b1 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -6,7 +6,7 @@ jsonpickle
 redisgraph-bulk-loader==0.12.3
 pytest
 PyYAML
-git+https://github.com/helxplatform/dug@ctn-parser
+git+https://github.com/helxplatform/dug@2.13.1
 orjson
 kg-utils==0.0.6
 bmt==1.1.0
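Note: PATCH 59 changed the default data-set spec to "topmed:v2.0", i.e. each entry in ROGER_DUG__INPUTS_DATA__SETS is a "name:version" pair. A small sketch of how dags/annotate_and_index.py interprets that spec; the second entry and the resulting mapping are illustrative only, and the keys are presumably matched against the pipeline_name attributes of the DugPipeline subclasses added in this series.

    import os

    # e.g. ROGER_DUG__INPUTS_DATA__SETS="topmed:v2.0,ctn:v1.0"
    envspec = os.getenv("ROGER_DUG__INPUTS_DATA__SETS", "topmed:v2.0")
    data_sets = envspec.split(",")
    pipeline_names = {x.split(':')[0]: x.split(':')[1] for x in data_sets}
    # pipeline_names == {"topmed": "v2.0", "ctn": "v1.0"}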