From 21e427105038a39f920f4fc9588a10b6c187b255 Mon Sep 17 00:00:00 2001 From: Nathaniel Braswell Date: Mon, 29 Jan 2024 16:14:25 -0500 Subject: [PATCH] annotator modules added by passing config val (#90) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * annotator modules added by passing config val * fix merge conflict * following same pattern as parsers , modify configs * fix to dug config method * fix old dug pipeline for backward compatiblity * correct default annotator type * reflective changes * typo extra quotes * annotator type not being picked up from config * remove annotate simple , log env value for lakefs enabled * testing lakefs off * add more logging * add more logging * post init for config to parse to boolean * put back task calls * revert some changes * adding new pipeline * lakefs io support for merge task * fix name * add io params for kg tasks * wire up i/o paths for merge * fix variable name * print files * few debug logs * few debug logs * treat path as path not str * few debug logs * some fixes * logging edge files * bug fix knowledge has edge * re-org graph structure * adding pathing for other tasks * pagenation logic fix for avalon * update lakefs client code * fix glob for get kgx files * fix up get merged objects * send down fake commit id for metadata * working on edges schema * bulk create nodes I/O * find schema file * bulk create edges I/O * bulk create edges I/O * bulk load io * no outputs for final tasks * add recursive glob * fix globbing * oops * delete dags * pin dug to latest release * cruft cleanup * re-org kgx config * add support for multiple initial repos * fix comma * create dir to download to * swap branch and repo * clean up dirs * fix up other pipeline 👌 --------- Co-authored-by: YaphetKG --- .gitignore | 3 +- dags/annotate.py | 102 ------------------------------- dags/annotate_simple.py | 29 --------- dags/dug_helpers/dug_utils.py | 10 +-- dags/index_dag.py | 28 --------- dags/knowledge_graph_build.py | 102 +++++++++++++++++++++++++++++++ dags/roger/config/__init__.py | 26 +++++--- dags/roger/config/config.yaml | 13 ++-- dags/roger/core/base.py | 34 ++++++----- dags/roger/core/bulkload.py | 62 +++++++------------ dags/roger/core/storage.py | 57 +++++++++-------- dags/roger/models/kgx.py | 58 ++++++++++-------- dags/roger/pipelines/base.py | 30 ++++++--- dags/roger/tasks.py | 111 ++++++++++++++++++++-------------- dags/tranql_translate.py | 43 ------------- requirements.txt | 2 +- 16 files changed, 333 insertions(+), 377 deletions(-) delete mode 100755 dags/annotate.py delete mode 100755 dags/annotate_simple.py delete mode 100755 dags/index_dag.py create mode 100644 dags/knowledge_graph_build.py delete mode 100755 dags/tranql_translate.py diff --git a/.gitignore b/.gitignore index 8943498d..6c46fe7f 100644 --- a/.gitignore +++ b/.gitignore @@ -1,7 +1,6 @@ # Git ignore bioler plate from https://github.com/github/gitignore/blob/master/Python.gitignore .secret-env -Merge-helm/ -Merge-Dug-Architecture.md +.vscode/ # Byte-compiled / optimized / DLL files __pycache__/ diff --git a/dags/annotate.py b/dags/annotate.py deleted file mode 100755 index 53e5b0a7..00000000 --- a/dags/annotate.py +++ /dev/null @@ -1,102 +0,0 @@ -import os - -from airflow.models import DAG -from airflow.operators.empty import EmptyOperator - -from dug_helpers.dug_utils import ( - DugUtil, - get_topmed_files, - get_dbgap_files, - get_nida_files, - get_sparc_files, - get_anvil_files, - get_cancer_data_commons_files, - get_kids_first_files, 
- get_sprint_files, - get_bacpac_files, - get_heal_study_files, - get_heal_research_program_files - ) -from roger.tasks import default_args, create_python_task - -DAG_ID = 'annotate_dug' - -""" Build the workflow's tasks and DAG. """ -with DAG( - dag_id=DAG_ID, - default_args=default_args, - schedule_interval=None -) as dag: - - """Build workflow tasks.""" - intro = EmptyOperator(task_id='Intro', dag=dag) - - # Unzip and get files, avoid this because - # 1. it takes a bit of time making the dag itself, webserver hangs - # 2. Every task in this dag would still need to execute this part making it redundant - # 3. tasks like intro would fail because they don't have the data dir mounted. - - make_kg_tagged = create_python_task(dag, "make_tagged_kgx", DugUtil.make_kg_tagged) - - dummy_stepover = EmptyOperator(task_id="continue") - - #intro >> run_printlog - envspec = os.getenv("ROGER_DUG__INPUTS_DATA__SETS","topmed") - data_sets = envspec.split(",") - - clear_annotation_items = create_python_task(dag, "clear_annotation_files", DugUtil.clear_annotation_cached) - - for i, data_set in enumerate(data_sets): - annotate_files = None - if data_set.startswith("bdc"): - prepare_files = create_python_task(dag, "get_dbgap_data", get_dbgap_files) - annotate_files = create_python_task(dag, "annotate_db_gap_files", DugUtil.annotate_db_gap_files) - elif data_set.startswith("nida"): - prepare_files = create_python_task(dag, "get_nida_files", get_nida_files) - annotate_files = create_python_task(dag, "annotate_nida_files", DugUtil.annotate_nida_files) - elif data_set.startswith("sparc"): - prepare_files = create_python_task(dag, "get_sparc_files", get_sparc_files) - annotate_files = create_python_task(dag, "annotate_sparc_files", DugUtil.annotate_sparc_files) - elif data_set.startswith("topmed"): - prepare_files = create_python_task(dag, "get_topmed_data", get_topmed_files) - annotate_files = create_python_task(dag, "annotate_topmed_files", DugUtil.annotate_topmed_files) - elif data_set.startswith("anvil"): - prepare_files = create_python_task(dag, "get_anvil_data", get_anvil_files) - annotate_files = create_python_task(dag, "annotate_anvil_files", DugUtil.annotate_anvil_files) - elif data_set.startswith("crdc"): - prepare_files = create_python_task(dag, "get_cancer_commons_files", get_cancer_data_commons_files) - annotate_files = create_python_task(dag, "annotate_cancer_commons_files", - DugUtil.annotate_cancer_commons_files) - elif data_set.startswith("kfdrc"): - prepare_files = create_python_task(dag, "get_kids_first_files", get_kids_first_files) - annotate_files = create_python_task(dag, "annotate_kids_first_files", - DugUtil.annotate_kids_first_files) - elif data_set.startswith("sprint"): - prepare_files = create_python_task(dag, "get_sprint_files", get_sprint_files) - annotate_files = create_python_task(dag, "annotate_sprint_files", - DugUtil.annotate_sprint_files) - elif data_set.startswith("bacpac"): - prepare_files = create_python_task(dag, "get_bacpac_files", get_bacpac_files) - annotate_files = create_python_task(dag, "annotate_bacpac_files", - DugUtil.annotate_bacpac_files) - - elif data_set.startswith("heal-studies"): - prepare_files = create_python_task(dag, "get_heal_study_files", get_heal_study_files) - annotate_files = create_python_task(dag, "annotate_heal_study_files", - DugUtil.annotate_heal_study_files) - # elif data_set.startswith("heal-mds-imports"): - # prepare_files = create_python_task(dag, "get_heal_mds_imports", get_heal_study_files) - - elif 
data_set.startswith("heal-research-programs"): - prepare_files = create_python_task(dag, "get_heal_research_program_files", get_heal_research_program_files) - annotate_files = create_python_task(dag, "annotate_heal_research_program_files", - DugUtil.annotate_heal_research_program_files) - - intro >> prepare_files - prepare_files >> clear_annotation_items - - if annotate_files: - clear_annotation_items >> annotate_files - annotate_files >> dummy_stepover - - dummy_stepover >> make_kg_tagged diff --git a/dags/annotate_simple.py b/dags/annotate_simple.py deleted file mode 100755 index 6ec69d96..00000000 --- a/dags/annotate_simple.py +++ /dev/null @@ -1,29 +0,0 @@ -import os - -from airflow.models import DAG -from airflow.operators.empty import EmptyOperator - -from dug_helpers.dug_utils import ( - DugUtil, - get_bacpac_files - ) -from roger.tasks import default_args, create_python_task - -DAG_ID = 'annotate_simple' - -""" Build the workflow's tasks and DAG. """ -with DAG( - dag_id=DAG_ID, - default_args=default_args, - schedule_interval=None -) as dag: - - """Build workflow tasks.""" - make_kg_tagged = create_python_task(dag, "make_tagged_kgx", - DugUtil.make_kg_tagged) - annotate_files = create_python_task(dag, "annotate_bacpac_files", - DugUtil.annotate_bacpac_files, - input_repo='bacpac', - input_branch='v1.0') - - annotate_files >> make_kg_tagged diff --git a/dags/dug_helpers/dug_utils.py b/dags/dug_helpers/dug_utils.py index a7795450..52db9624 100644 --- a/dags/dug_helpers/dug_utils.py +++ b/dags/dug_helpers/dug_utils.py @@ -11,8 +11,9 @@ from typing import Union, List import requests -from dug.core import get_parser, get_plugin_manager, DugConcept -from dug.core.annotate import DugAnnotator, ConceptExpander +from dug.core import get_parser, get_annotator, get_plugin_manager, DugConcept +from dug.core.annotators._base import Annotator +from dug.core.concept_expander import ConceptExpander from dug.core.crawler import Crawler from dug.core.factory import DugFactory from dug.core.parsers import Parser, DugElement @@ -44,7 +45,7 @@ def __init__(self, config: RogerConfig, to_string=True): self.string_handler = logging.StreamHandler(self.log_stream) log.addHandler(self.string_handler) - self.annotator: DugAnnotator = self.factory.build_annotator() + self.annotator_name: str = config.annotation.annotator_type self.tranqlizer: ConceptExpander = self.factory.build_tranqlizer() @@ -95,6 +96,7 @@ def annotate_files(self, parser_name, parsable_files, """ dug_plugin_manager = get_plugin_manager() parser: Parser = get_parser(dug_plugin_manager.hook, parser_name) + annotator: Annotator = get_annotator(dug_plugin_manager.hook, annotator_name=self.annotator_name, config=self.config.to_dug_conf()) if not output_data_path: output_data_path = storage.dug_annotation_path('') log.info("Parsing files") @@ -103,7 +105,7 @@ def annotate_files(self, parser_name, parsable_files, crawler = Crawler( crawl_file=parse_file, parser=parser, - annotator=self.annotator, + annotator=annotator, tranqlizer='', tranql_queries=[], http_session=self.cached_session diff --git a/dags/index_dag.py b/dags/index_dag.py deleted file mode 100755 index 6ddc4b02..00000000 --- a/dags/index_dag.py +++ /dev/null @@ -1,28 +0,0 @@ -from airflow.models import DAG -from airflow.operators.empty import EmptyOperator - -from roger.tasks import get_executor_config, default_args, create_python_task -from dug_helpers.dug_utils import DugUtil - -""" Build the workflow's tasks and DAG. 
""" -with DAG( - dag_id='index_dug', - default_args=default_args, - schedule_interval=None -) as dag: - - """ Build the workflow tasks. """ - intro = EmptyOperator(task_id='Intro') - index_variables = create_python_task (dag, "IndexVariables", DugUtil.index_variables) - validate_index_variables = create_python_task(dag,"ValidateIndexVariables", DugUtil.validate_indexed_variables) - crawl_tags = create_python_task(dag, "CrawlConcepts", DugUtil.crawl_tranql) - index_concepts = create_python_task(dag, "IndexConcepts", DugUtil.index_concepts) - dummy_stepover = EmptyOperator(task_id="continue") - index_extracted_dug_elements = create_python_task(dag, "IndexExtractedElements", DugUtil.index_extracted_elements) - validate_index_concepts = create_python_task(dag, "ValidateIndexConcepts", DugUtil.validate_indexed_concepts) - finish = EmptyOperator(task_id='Finish') - """ Build the DAG. """ - intro >> index_variables >> validate_index_variables >> finish - intro >> crawl_tags >> index_concepts >> dummy_stepover - intro >> crawl_tags >> index_extracted_dug_elements >> dummy_stepover - dummy_stepover >> validate_index_concepts >> finish \ No newline at end of file diff --git a/dags/knowledge_graph_build.py b/dags/knowledge_graph_build.py new file mode 100644 index 00000000..de28aa78 --- /dev/null +++ b/dags/knowledge_graph_build.py @@ -0,0 +1,102 @@ +# -*- coding: utf-8 -*- +# + +""" +An Airflow workflow for the Roger Translator KGX data pipeline. +""" + +from airflow.models import DAG +from airflow.operators.empty import EmptyOperator +import roger +from roger.tasks import default_args, create_python_task +from roger.config import config + +""" Build the workflow's tasks and DAG. """ +with DAG( + dag_id='knowledge_graph_build', + default_args=default_args, + schedule_interval=None +) as dag: + + """ Build the workflow tasks. """ + intro = EmptyOperator(task_id='Intro') + + # Merge nodes needs inputs from two sources + # 1. baseline and/or CDE KGX files from LakeFS (External repo) + # 2. 
Infer which local kgx files are needed based on dug_inputs and grab them from the current repo + + # build the annotate and index pipeline output locations + #lakefs://yk-heal/main/annotate_and_index/crdc_dataset_pipeline_task_group.make_kgx_crdc/ + working_repo = config.lakefs_config.repo + branch = config.lakefs_config.branch + kgx_repos = config.kgx.data_sets + input_repos = [{ + 'name': repo.split(':')[0], + 'branch': repo.split(':')[1], + 'path': '*' + } for repo in kgx_repos] + + # Figure out a way to extract paths + get_path_on_lakefs = lambda d: f"annotate_and_index/{d}_dataset_pipeline_task_group.make_kgx_{d}/" + + + for dataset in config.dug_inputs.data_sets: + dataset_name = dataset.split(":")[0] + # add datasets from the other pipeline + input_repos.append( + { + 'name': working_repo, + 'branch': branch, + 'path': get_path_on_lakefs(dataset_name) + } + ) + + merge_nodes = create_python_task (dag, name="MergeNodes", + a_callable=roger.merge_nodes, + external_repos=input_repos + ) + + # The rest of these guys can just operate on the local lakefs repo/branch + # we need to add input dir and output dir similar to what we did for dug tasks + + create_nodes_schema = create_python_task(dag, + name="CreateNodesSchema", + a_callable=roger.create_nodes_schema + ) + create_edges_schema = create_python_task(dag, + name="CreateEdgesSchema", + a_callable=roger.create_edges_schema) + + create_bulk_load_nodes = create_python_task(dag, + name="CreateBulkLoadNodes", + a_callable=roger.create_bulk_nodes) + create_bulk_load_edges = create_python_task(dag, + name="CreateBulkLoadEdges", + a_callable=roger.create_bulk_edges) + bulk_load = create_python_task(dag, + name="BulkLoad", + a_callable=roger.bulk_load, + no_output_files=True) + check_tranql = create_python_task(dag, + name="CheckTranql", + a_callable=roger.check_tranql, + no_output_files=True) + validate = create_python_task(dag, + name="Validate", + a_callable=roger.validate, + no_output_files=True) + + + """ Build the DAG. """ + merge_nodes.set_upstream(intro) + create_nodes_schema.set_upstream(merge_nodes) + create_edges_schema.set_upstream(merge_nodes) + create_bulk_load_nodes.set_upstream(create_nodes_schema) + create_bulk_load_nodes.set_upstream(merge_nodes) + create_bulk_load_edges.set_upstream(create_edges_schema) + create_bulk_load_edges.set_upstream(merge_nodes) + bulk_load.set_upstream(create_bulk_load_nodes) + bulk_load.set_upstream(create_bulk_load_edges) + validate.set_upstream(bulk_load) + check_tranql.set_upstream(bulk_load) + diff --git a/dags/roger/config/__init__.py b/dags/roger/config/__init__.py index 060012ec..ac9eb23a 100644 --- a/dags/roger/config/__init__.py +++ b/dags/roger/config/__init__.py @@ -35,6 +35,10 @@ class LakefsConfig(DictLike): repo: str enabled: bool = False + def __post_init__(self): + if isinstance(self.enabled, str): + self.enabled = self.enabled.lower() == "true" + @dataclass @@ -46,10 +50,8 @@ class LoggingConfig(DictLike): @dataclass class KgxConfig(DictLike): biolink_model_version: str = "1.5.0" - dataset_version: str = "v1.0" - merge_db_id: int = 1 merge_db_temp_dir: str = "workspace" - data_sets: List = field(default_factory=lambda: ['baseline-graph']) + data_sets: List = field(default_factory=lambda: ['baseline-graph:v5.0']) def __post_init__(self): # Convert strings to list. 
In cases where this is passed as env variable with a single value @@ -88,7 +90,18 @@ class BulkLoaderConfig(DictLike): @dataclass class AnnotationConfig(DictLike): - annotator: str = "https://api.monarchinitiative.org/api/nlp/annotate/entities?min_length=4&longest_only=false&include_abbreviation=false&include_acronym=false&include_numbers=false&content=" + annotator_type: str = "monarch" + annotator_args: dict = field( + default_factory=lambda: { + "monarch": { + "url": "https://api.monarchinitiative.org/api/nlp/annotate/entities?min_length=4&longest_only=false&include_abbreviation=false&include_acronym=false&include_numbers=false&content=" + }, + "sapbert": { + "classification_url": "https://med-nemo.apps.renci.org/annotate/", + "annotator_url": "https://babel-sapbert.apps.renci.org/annotate/", + }, + } + ) normalizer: str = "https://nodenormalization-sri.renci.org/get_normalized_nodes?curie=" synonym_service: str = "https://onto.renci.org/synonyms/" ontology_metadata: str = "https://api.monarchinitiative.org/api/bioentity/" @@ -195,9 +208,8 @@ def to_dug_conf(self) -> DugConfig: redis_port=self.redisgraph.port, nboost_host=self.elasticsearch.nboost_host, preprocessor=self.annotation.preprocessor, - annotator={ - 'url': self.annotation.annotator, - }, + annotator_type=self.annotation.annotator_type, + annotator_args=self.annotation.annotator_args, normalizer={ 'url': self.annotation.normalizer, }, diff --git a/dags/roger/config/config.yaml b/dags/roger/config/config.yaml index 5c27eb81..e9402ce4 100644 --- a/dags/roger/config/config.yaml +++ b/dags/roger/config/config.yaml @@ -16,11 +16,9 @@ annotation_base_data_uri: https://stars.renci.org/var/dug/ kgx: biolink_model_version: v3.1.2 - dataset_version: v5.0 - merge_db_id: 1 merge_db_temp_dir: workspace data_sets: - - baseline-graph + - baseline-graph:v5.0 dug_inputs: data_source: s3 @@ -44,10 +42,17 @@ bulk_loader: annotation: clear_http_cache: false - annotator: "https://api.monarchinitiative.org/api/nlp/annotate/entities?min_length=4&longest_only=false&include_abbreviation=false&include_acronym=false&include_numbers=false&content=" + annotator_type: monarch + annotator_args: + monarch: + url: "https://api.monarchinitiative.org/api/nlp/annotate/entities?min_length=4&longest_only=false&include_abbreviation=false&include_acronym=false&include_numbers=false&content=" + sapbert: + classification_url: "https://med-nemo.apps.renci.org/annotate/" + annotator_url: "https://babel-sapbert.apps.renci.org/annotate/" normalizer: "https://nodenormalization-dev.apps.renci.org/get_normalized_nodes?conflate=false&description=true&curie=" synonym_service: "https://name-resolution-sri.renci.org/reverse_lookup" ontology_metadata: "https://api.monarchinitiative.org/api/bioentity/" + preprocessor: debreviator: BMI: "body mass index" diff --git a/dags/roger/core/base.py b/dags/roger/core/base.py index 6696f81d..0e84501f 100644 --- a/dags/roger/core/base.py +++ b/dags/roger/core/base.py @@ -73,62 +73,66 @@ def create_schema(to_string=False, config=None): output = (o1 + o2 ) if to_string else None return output -def create_edges_schema(to_string=False, config=None): +def create_edges_schema(to_string=False, config=None, input_data_path=None, output_data_path=None): "Create edges schema on KGX object" output = None with Roger(to_string, config=config) as roger: - roger.kgx.create_edges_schema() + roger.kgx.create_edges_schema( + input_data_path=input_data_path, + output_data_path=output_data_path + ) output = roger.log_stream.getvalue() if to_string else None 
return output -def create_nodes_schema(to_string=False, config=None): +def create_nodes_schema(to_string=False, config=None, input_data_path=None, output_data_path=None): "Create nodes schema on KGX object" output = None with Roger(to_string, config=config) as roger: - roger.kgx.create_nodes_schema() + roger.kgx.create_nodes_schema(input_data_path=input_data_path, + output_data_path=output_data_path) output = roger.log_stream.getvalue() if to_string else None return output -def merge_nodes(to_string=False, config=None): +def merge_nodes(to_string=False, config=None, input_data_path=None, output_data_path=None): "Run KGX merge" output = None with Roger (to_string, config=config) as roger: - roger.kgx.merge() + roger.kgx.merge(input_path=input_data_path, output_path=output_data_path) output = roger.log_stream.getvalue () if to_string else None return output -def create_bulk_load(to_string=False, config=None): +def create_bulk_load(to_string=False, config=None, input_data_path=None, output_data_path=None): "Generate bulk load files" o1 = create_bulk_nodes(to_string=to_string, config=config) o2 = create_bulk_edges(to_string=to_string, config=config) output = (o1 + o2) if to_string else None return output -def create_bulk_nodes(to_string=False, config=None): +def create_bulk_nodes(to_string=False, config=None, input_data_path=None, output_data_path=None): "Generate bulk node CSV file" output = None with Roger(to_string, config=config) as roger: - roger.bulk.create_nodes_csv_file() + roger.bulk.create_nodes_csv_file(input_data_path, output_data_path) output = roger.log_stream.getvalue() if to_string else None return output -def create_bulk_edges(to_string=False, config=None): +def create_bulk_edges(to_string=False, config=None, input_data_path=None, output_data_path=None): "Create bulk edges CSV file" output = None with Roger(to_string, config=config) as roger: - roger.bulk.create_edges_csv_file() + roger.bulk.create_edges_csv_file(input_data_path, output_data_path) output = roger.log_stream.getvalue() if to_string else None return output -def bulk_load(to_string=False, config=None): +def bulk_load(to_string=False, config=None, input_data_path=None, output_data_path=None): "Run bulk load insert process" output = None with Roger (to_string, config=config) as roger: - roger.bulk.insert() + roger.bulk.insert(input_data_path=input_data_path) output = roger.log_stream.getvalue () if to_string else None return output -def validate (to_string=False, config=None): +def validate (to_string=False, config=None, input_data_path=None, output_data_path=None): "Run bulk validate process" output = None with Roger (to_string, config=config) as roger: @@ -136,7 +140,7 @@ def validate (to_string=False, config=None): output = roger.log_stream.getvalue () if to_string else None return output -def check_tranql(to_string=False, config=None): +def check_tranql(to_string=False, config=None, input_data_path=None, output_data_path=None): "Tranql server smoke check" output = None with Roger(to_string, config=config) as roger: diff --git a/dags/roger/core/bulkload.py b/dags/roger/core/bulkload.py index 1eca92db..2113f1b7 100644 --- a/dags/roger/core/bulkload.py +++ b/dags/roger/core/bulkload.py @@ -33,31 +33,24 @@ def __init__(self, biolink, config=None): self.separator =(chr(separator) if isinstance(separator, int) else separator) - def tables_up_to_date (self): - return storage.is_up_to_date ( - source=[ - storage.schema_path (f"{SchemaType.PREDICATE.value}-schema.json"), - storage.schema_path 
(f"{SchemaType.PREDICATE.value}-schema.json") - ] + storage.merged_objects (), - targets=glob.glob (storage.bulk_path ("nodes/**.csv")) + \ - glob.glob (storage.bulk_path ("edges/**.csv"))) - - def create_nodes_csv_file(self): - if self.tables_up_to_date (): - log.info ("up to date.") - return + def create (self): + """Used in the CLI on args.create_bulk""" + self.create_nodes_csv_file() + self.create_edges_csv_file() + + def create_nodes_csv_file(self, input_data_path=None, output_data_path=None): # clear out previous data - bulk_path = storage.bulk_path("nodes") + bulk_path = storage.bulk_path("nodes", output_data_path) if os.path.exists(bulk_path): shutil.rmtree(bulk_path) - categories_schema = storage.read_schema (SchemaType.CATEGORY) + categories_schema = storage.read_schema (SchemaType.CATEGORY, input_data_path) state = defaultdict(lambda: None) log.info(f"processing nodes") """ Write node data for bulk load. """ categories = defaultdict(lambda: []) category_error_nodes = set() - merged_nodes_file = storage.merge_path("nodes.jsonl") + merged_nodes_file = storage.merged_objects('nodes', input_data_path) counter = 1 for node in storage.json_line_iter(merged_nodes_file): if not node['category']: @@ -74,7 +67,7 @@ def create_nodes_csv_file(self): f"Showing first 10: {list(category_error_nodes)[:10]}.") # flush every 100K if counter % 100_000 == 0: - self.write_bulk(storage.bulk_path("nodes"), + self.write_bulk(storage.bulk_path("nodes", output_data_path), categories, categories_schema, state=state, is_relation=False) # reset variables. @@ -83,22 +76,19 @@ def create_nodes_csv_file(self): counter += 1 # write back if any thing left. if len(categories): - self.write_bulk(storage.bulk_path("nodes"), + self.write_bulk(storage.bulk_path("nodes", output_data_path), categories, categories_schema, state=state, is_relation=False) - def create_edges_csv_file(self): + def create_edges_csv_file(self, input_data_path=None, output_data_path=None): """ Write predicate data for bulk load. """ - if self.tables_up_to_date (): - log.info ("up to date.") - return # Clear out previous data - bulk_path = storage.bulk_path("edges") + bulk_path = storage.bulk_path("edges", output_data_path) if os.path.exists(bulk_path): shutil.rmtree(bulk_path) - predicates_schema = storage.read_schema(SchemaType.PREDICATE) + predicates_schema = storage.read_schema(SchemaType.PREDICATE, input_data_path) predicates = defaultdict(lambda: []) - edges_file = storage.merge_path('edges.jsonl') + edges_file = storage.merged_objects('edges', input_data_path) counter = 1 state = {} for edge in storage.json_line_iter(edges_file): @@ -106,14 +96,14 @@ def create_edges_csv_file(self): # write out every 100K , to avoid large predicate dict. 
if counter % 100_000 == 0: self.write_bulk( - storage.bulk_path("edges"),predicates, predicates_schema, + storage.bulk_path("edges", output_data_path),predicates, predicates_schema, state=state, is_relation=True) predicates = defaultdict(lambda : []) counter += 1 # if there are some items left (if loop ended before counter reached the # specified value) if len(predicates): - self.write_bulk(storage.bulk_path("edges"), predicates, + self.write_bulk(storage.bulk_path("edges", output_data_path), predicates, predicates_schema,state=state, is_relation=True) @staticmethod @@ -316,17 +306,10 @@ def write_bulk(self, bulk_path, obj_map, schema, state={}, state['processed_id'] = processed_objects_id state['called_times'] = called_x_times - def insert (self): - - redisgraph = { - 'host': os.getenv('REDIS_HOST'), - 'port': os.getenv('REDIS_PORT', '6379'), - 'password': os.getenv('REDIS_PASSWORD'), - 'graph': os.getenv('REDIS_GRAPH'), - } + def insert (self, input_data_path=None): redisgraph = self.config.redisgraph - nodes = sorted(glob.glob (storage.bulk_path ("nodes/**.csv*"))) - edges = sorted(glob.glob (storage.bulk_path ("edges/**.csv*"))) + nodes = sorted(glob.glob (storage.bulk_path ("**/nodes/**.csv*", input_data_path), recursive=True)) + edges = sorted(glob.glob (storage.bulk_path ("**/edges/**.csv*", input_data_path), recursive=True)) graph = redisgraph['graph'] log.info(f"bulk loading \n nodes: {nodes} \n edges: {edges}") @@ -340,7 +323,7 @@ def insert (self): log.info ("bulk loading graph: %s", str(graph)) args = [] if len(nodes) > 0: - bulk_path_root = storage.bulk_path('nodes') + os.path.sep + bulk_path_root = glob.glob(storage.bulk_path('**/nodes', path=input_data_path), recursive=True)[0] + os.path.sep nodes_with_type = [] for x in nodes: """ @@ -353,13 +336,12 @@ def insert (self): nodes_with_type.append(f"{all_labels} {x}") args.extend(("-N " + " -N ".join(nodes_with_type)).split()) if len(edges) > 0: - bulk_path_root = storage.bulk_path('edges') + os.path.sep + bulk_path_root = glob.glob(storage.bulk_path('**/edges', path=input_data_path), recursive=True)[0] + os.path.sep edges_with_type = [f"biolink.{x.replace(bulk_path_root, '').strip(os.path.sep).split('.')[0].split('~')[1]} {x}" for x in edges] # Edge label now no longer has 'biolink:' args.extend(("-R " + " -R ".join(edges_with_type)).split()) args.extend([f"--separator={self.separator}"]) - log.debug(f"--redis-url=redis://:{redisgraph['password']}@{redisgraph['host']}:{redisgraph['port']}") args.extend([f"--redis-url=redis://:{redisgraph['password']}@{redisgraph['host']}:{redisgraph['port']}"]) args.extend(['--enforce-schema']) args.extend([f"{redisgraph['graph']}"]) diff --git a/dags/roger/core/storage.py b/dags/roger/core/storage.py index f5f152dd..0f79aad4 100644 --- a/dags/roger/core/storage.py +++ b/dags/roger/core/storage.py @@ -152,30 +152,47 @@ def kgx_path(name): :path name: Name of the KGX object. """ return str(ROGER_DATA_DIR / "kgx" / name) -def kgx_objects(format_="json"): +def kgx_objects(format_="json", path=None): """ A list of KGX objects. """ kgx_pattern = kgx_path(f"**.{format_}") - return sorted(glob.glob (kgx_pattern)) + if path: + kgx_pattern = f"{path}/**/*.{format_}" + return sorted(glob.glob (kgx_pattern, recursive=True)) -def merge_path(name): +def merge_path(name, path: Path=None): """ Form a merged KGX object path. :path name: Name of the merged KGX object. 
""" - return str(ROGER_DATA_DIR / 'merge' / name) + if path is None: + return str(ROGER_DATA_DIR / 'merge' / name) + return str(path.joinpath(name)) -def merged_objects(): +def merged_objects(file_type, path=None): """ A list of merged KGX objects. """ - merged_pattern = merge_path("**.json") - return sorted(glob.glob (merged_pattern)) + if not path: + merged_pattern = merge_path(f"**/{file_type}.jsonl") + else: + merged_pattern = merge_path(f"**/{file_type}.jsonl", path=path) + # this thing should always return one edges or nodes file (based on file_type) + try: + return sorted(glob.glob(merged_pattern, recursive=True))[0] + except IndexError: + raise ValueError(f"Could not find merged KGX of type {file_type} in {merged_pattern}") + -def schema_path(name): +def schema_path(name, path=None): """ Path to a schema object. :param name: Name of the object to get a path for. """ - return str(ROGER_DATA_DIR / 'schema' / name) + if not path: + return str(ROGER_DATA_DIR / 'schema' / name) + return glob.glob(str (path / '**' / 'schema' / name), recursive=True)[0] -def bulk_path(name): +def bulk_path(name, path=None): """ Path to a bulk load object. :param name: Name of the object. """ - return str(ROGER_DATA_DIR / 'bulk' / name) + if not path: + return str(ROGER_DATA_DIR / 'bulk' / name) + else: + return str(path / name) def metrics_path(name): """ @@ -388,11 +405,11 @@ def dug_dd_xml_objects(input_data_path=None): def copy_file_to_dir(file_location, dir_name): return shutil.copy(file_location, dir_name) -def read_schema (schema_type: SchemaType): +def read_schema (schema_type: SchemaType, path=None): """ Read a schema object. :param schema_type: Schema type of the object to read. """ - path = schema_path (f"{schema_type.value}-schema.json") - return read_object (path) + location = schema_path (f"{schema_type.value}-schema.json", path=path) + return read_object (location) def get_uri (path, key): """ Build a URI. @@ -412,17 +429,7 @@ def read_relative_object (path): def trunc(text, limit): return ('..' + text[-limit-2:]) if len(text) > limit else text -def is_up_to_date (source, targets): - target_time_list = [ - os.stat (f).st_mtime for f in targets if os.path.exists(f)] - if len(target_time_list) == 0: - log.debug (f"no targets found") - return False - source = [ os.stat (f).st_mtime for f in source if os.path.exists (f) ] - if len(source) == 0: - log.debug ("no source found. 
up to date") - return True - return max(source) < min(target_time_list) + def json_line_iter(jsonl_file_path): f = open(file=jsonl_file_path, mode='r', encoding='utf-8') diff --git a/dags/roger/models/kgx.py b/dags/roger/models/kgx.py index 62a57a61..bc301262 100644 --- a/dags/roger/models/kgx.py +++ b/dags/roger/models/kgx.py @@ -8,9 +8,8 @@ from collections import defaultdict from xxhash import xxh64_hexdigest import orjson as json -import redis import ntpath -from kg_utils.merging import GraphMerger, MemoryGraphMerger, DiskGraphMerger +from kg_utils.merging import DiskGraphMerger from kg_utils.constants import * from roger.config import get_default_config @@ -43,18 +42,11 @@ def __init__(self, biolink=None, config=None): self.merger = DiskGraphMerger(temp_directory=self.temp_directory, chunk_size=5_000_000) self.biolink_version = self.config.kgx.biolink_model_version - self.merge_db_id = self.config.kgx.merge_db_id - self.merge_db_name = f'db{self.merge_db_id}' log.debug(f"Trying to get biolink version : {self.biolink_version}") if biolink is None: self.biolink = BiolinkModel(self.biolink_version) else: self.biolink = biolink - self.redis_conn = redis.Redis( - host=self.config.redisgraph.host, - port=self.config.redisgraph.port, - password=self.config.redisgraph.password, - db=self.merge_db_id) self.enable_metrics = self.config.get('enable_metrics', False) def get_kgx_json_format(self, files: list, dataset_version: str): @@ -301,7 +293,7 @@ def fetch_dug_kgx(self): log.info("Done copying dug KGX files.") return all_kgx_files - def create_nodes_schema(self): + def create_nodes_schema(self, input_data_path=None, output_data_path=None): """ Extracts schema for nodes based on biolink leaf types :return: @@ -309,7 +301,7 @@ def create_nodes_schema(self): category_schemas = defaultdict(lambda: None) category_error_nodes = set() - merged_nodes_file = storage.merge_path("nodes.jsonl") + merged_nodes_file = storage.merged_objects("nodes", input_data_path) log.info(f"Processing : {merged_nodes_file}") counter = 0 for node in storage.json_line_iter(merged_nodes_file): @@ -356,15 +348,15 @@ def create_nodes_schema(self): f"These will be treated as {BiolinkModel.root_type}.") # Write node schemas. - self.write_schema(category_schemas, SchemaType.CATEGORY) + self.write_schema(category_schemas, SchemaType.CATEGORY, output_path=output_data_path) - def create_edges_schema(self): + def create_edges_schema(self, input_data_path=None, output_data_path=None): """ Create unified schema for all edges in an edges jsonl file. :return: """ predicate_schemas = defaultdict(lambda: None) - merged_edges_file = storage.merge_path("edges.jsonl") + merged_edges_file = storage.merged_objects("edges", input_data_path) """ Infer predicate schemas. """ for edge in storage.json_line_iter(merged_edges_file): predicate = edge['predicate'] @@ -378,7 +370,7 @@ def create_edges_schema(self): previous_type = predicate_schemas[predicate][k] predicate_schemas[predicate][k] = compare_types( previous_type, current_type) - self.write_schema(predicate_schemas, SchemaType.PREDICATE) + self.write_schema(predicate_schemas, SchemaType.PREDICATE, output_path=output_data_path) def create_schema (self): """Determine the schema of each type of object. @@ -403,31 +395,40 @@ def schema_up_to_date (self): f"{SchemaType.PREDICATE.value}-schema.json") ]) - def write_schema(self, schema, schema_type: SchemaType): + def write_schema(self, schema, schema_type: SchemaType ,output_path=None): """ Output the schema file. 
:param schema: Schema to get keys from. :param schema_type: Type of schema to write. """ - file_name = storage.schema_path (f"{schema_type.value}-schema.json") + file_name = storage.schema_path (f"{schema_type.value}-schema.json", output_path) log.info("writing schema: %s", file_name) dictionary = { k : v for k, v in schema.items () } storage.write_object (dictionary, file_name) - def merge(self): + def merge(self, input_path=None, output_path=None): """ This version uses the disk merging from the kg_utils module """ - data_set_version = self.config.get('kgx', {}).get('dataset_version') + metrics = {} start = time.time() - json_format_files = storage.kgx_objects("json") - jsonl_format_files = storage.kgx_objects("jsonl") + + log.info(f"Input path = {input_path}, Output path = {output_path}") + + if input_path: + json_format_files = storage.kgx_objects("json", input_path) + jsonl_format_files = storage.kgx_objects("jsonl", input_path) + else: + json_format_files = storage.kgx_objects("json") + jsonl_format_files = storage.kgx_objects("jsonl") # Create lists of the nodes and edges files in both json and jsonl # formats jsonl_node_files = {file for file in jsonl_format_files - if "node" in file} + if "node" in file.split('/')[-1]} jsonl_edge_files = {file for file in jsonl_format_files - if "edge" in file} + if "edge" in file.split('/')[-1]} + log.info(f"Jsonl edge files : {jsonl_edge_files}") + log.info(f"Jsonl node files : {jsonl_node_files}") # Create all the needed iterators and sets thereof jsonl_node_iterators = [storage.jsonl_iter(file_name) @@ -449,13 +450,19 @@ def merge(self): self.merger.merge_nodes(node_iterators) merged_nodes = self.merger.get_merged_nodes_jsonl() + self.merger.merge_edges(edge_iterators) merged_edges = self.merger.get_merged_edges_jsonl() write_merge_metric = {} t = time.time() start_nodes_jsonl = time.time() - nodes_file_path = storage.merge_path("nodes.jsonl") + + # create output dir + if not os.path.exists(output_path): + os.makedirs(output_path) + + nodes_file_path = storage.merge_path("nodes.jsonl", output_path) # stream out nodes to nodes.jsonl file with open(nodes_file_path, 'w') as stream: @@ -468,7 +475,7 @@ def merge(self): start_edge_jsonl = time.time() # stream out edges to edges.jsonl file - edges_file_path = storage.merge_path("edges.jsonl") + edges_file_path = storage.merge_path("edges.jsonl", output_path) with open(edges_file_path, 'w') as stream: for edges in merged_edges: edges = json.loads(edges) @@ -489,3 +496,4 @@ def merge(self): if self.enable_metrics: metricsfile_path = storage.metrics_path('merge_metrics.yaml') storage.write_object(metrics, metricsfile_path) + diff --git a/dags/roger/pipelines/base.py b/dags/roger/pipelines/base.py index 988db2d0..df99e5d6 100644 --- a/dags/roger/pipelines/base.py +++ b/dags/roger/pipelines/base.py @@ -15,12 +15,12 @@ import requests -from dug.core import get_parser, get_plugin_manager, DugConcept -from dug.core.annotate import DugAnnotator, ConceptExpander +from dug.core import get_parser, get_annotator, get_plugin_manager, DugConcept +from dug.core.concept_expander import ConceptExpander from dug.core.crawler import Crawler from dug.core.factory import DugFactory -from dug.core.parsers import Parser, DugElement, DugConcept -from dug.core.annotate import Identifier +from dug.core.parsers import Parser, DugElement +from dug.core.annotators import Annotator from dug.core.async_search import Search from dug.core.index import Index @@ -129,8 +129,6 @@ def __init__(self, config: RogerConfig, 
to_string=False): log.addHandler(self.string_handler) self.s3_utils = S3Utils(self.config.s3_config) - self.annotator: DugAnnotator = self.factory.build_annotator() - self.tranqlizer: ConceptExpander = self.factory.build_tranqlizer() graph_name = self.config["redisgraph"]["graph"] @@ -195,11 +193,28 @@ def get_parser_name(self): """ return getattr(self, 'parser_name', self.pipeline_name) + def get_annotator_name(self): + """ + Access method for annotator_name + Defaults to annotator_monarch unless specified using annotation.annotator_type in the configuration file. + """ + return self.config.annotation.annotator_type + + def get_parser(self): dug_plugin_manager = get_plugin_manager() parser: Parser = get_parser(dug_plugin_manager.hook, self.get_parser_name()) return parser + + def get_annotator(self): + dug_plugin_manager = get_plugin_manager() + annotator: Annotator = get_annotator( + dug_plugin_manager.hook, + self.get_annotator_name(), + self.config.to_dug_conf() + ) + return annotator def annotate_files(self, parsable_files, output_data_path=None): """ @@ -215,10 +230,11 @@ def annotate_files(self, parsable_files, output_data_path=None): log.debug("Creating Dug Crawler object on parse_file %s at %d of %d", parse_file, _ , len(parsable_files)) parser = self.get_parser() + annotator = self.get_annotator() crawler = Crawler( crawl_file=parse_file, parser=parser, - annotator=self.annotator, + annotator=annotator, tranqlizer='', tranql_queries=[], http_session=self.cached_session diff --git a/dags/roger/tasks.py b/dags/roger/tasks.py index 561608d1..852cf4b7 100755 --- a/dags/roger/tasks.py +++ b/dags/roger/tasks.py @@ -18,7 +18,7 @@ from roger.logger import get_logger from roger.pipelines.base import DugPipeline from avalon.mainoperations import put_files, LakeFsWrapper, get_files -import lakefs_client +from lakefs_sdk.configuration import Configuration from functools import partial logger = get_logger() @@ -48,7 +48,7 @@ def task_wrapper(python_callable, **kwargs): output_data_path = generate_dir_name_from_task_instance(kwargs['ti'], roger_config=config, suffix='output') - else: + else: input_data_path, output_data_path = None, None # cast it to a path object func_args = { @@ -102,7 +102,7 @@ def get_executor_config(data_path='/opt/airflow/share/data'): return k8s_executor_config def init_lakefs_client(config: RogerConfig) -> LakeFsWrapper: - configuration = lakefs_client.Configuration() + configuration = Configuration() configuration.username = config.lakefs_config.access_key_id configuration.password = config.lakefs_config.secret_access_key configuration.host = config.lakefs_config.host @@ -151,7 +151,7 @@ def avalon_commit_callback(context: DagContext, **kwargs): # right now lets stick to using one repo , # issue Vladmir pointed out if uploads to a single lakefs branch have not - # been finilized with commit, + # been finalized with commit, # this would cause dirty commits if parallel tasks target the same branch. 
# solution: Lakefs team suggested we commit to a different temp branch per @@ -173,7 +173,9 @@ def avalon_commit_callback(context: DagContext, **kwargs): s3storage=False, lake_fs_client=client, branch=temp_branch_name, - repo=repo + repo=repo, + # @TODO figure out how to pass real commit id here + commit_id=branch ) # see what changes are going to be pushed from this branch to main branch @@ -201,11 +203,20 @@ def avalon_commit_callback(context: DagContext, **kwargs): logger.info(f"deleted temp branch {temp_branch_name}") logger.info(f"deleting local dir {local_path}") files_to_clean = glob.glob(local_path + '**', recursive=True) + [local_path] + clean_up(context, **kwargs) + +def clean_up(context: DagContext, **kwargs): + input_dir = str(generate_dir_name_from_task_instance(context['ti'], + roger_config=config, + suffix='output')).rstrip('/') + '/' + output_dir = str(generate_dir_name_from_task_instance(context['ti'], + roger_config=config, + suffix='input')).rstrip('/') + '/' + files_to_clean = glob.glob(input_dir + '**', recursive=True) + [input_dir] + files_to_clean += glob.glob(output_dir + '**', recursive=True) + [output_dir] for f in files_to_clean: - shutil.rmtree(f) - - - + if os.path.exists(f): + shutil.rmtree(f) def generate_dir_name_from_task_instance(task_instance: TaskInstance, roger_config: RogerConfig, suffix:str): @@ -222,12 +233,12 @@ def generate_dir_name_from_task_instance(task_instance: TaskInstance, f"{root_data_dir}/{dag_id}_{task_id}_{run_id}_{try_number}_{suffix}") def setup_input_data(context, exec_conf): - print(""" + logger.info(""" - Figures out the task name and id, - find its data dependencies - clean up and create in and out dir - put dependency data in input dir - - if for some reason data was not found raise an execption + - if for some reason data was not found raise an exception """) # Serves as a location where files the task will work on are placed. # computed as ROGER_DATA_DIR + /current task instance name_input_dir @@ -241,38 +252,48 @@ def setup_input_data(context, exec_conf): # Download files from lakefs and store them in this new input_path client = init_lakefs_client(config=config) - input_repo = exec_conf['input_repo'] - input_branch = exec_conf['input_branch'] - # If input repo is provided use that as source of files - if exec_conf.get('path') and exec_conf.get('path') == '*': - remote_paths = ['*'] # root path to get all sub dirs - # else figure out what to pull from the repo based on task name etc... - else: + repos = exec_conf['repos'] + # if no external repo is provided we assume to get the upstream task dataset. 
+ if not repos or len(repos) == 0: + # merge destination branch + branch = config.lakefs_config.branch + repo = config.lakefs_config.repo task_instance: TaskInstance = context['ti'] # get upstream ids upstream_ids = task_instance.task.upstream_task_ids dag_id = task_instance.dag_id # calculate remote dirs using dag_id + upstreams - - remote_paths = [f'{dag_id}/{upstream_id}' - for upstream_id in upstream_ids] - for remote_path in remote_paths: + repos = [{ + 'repo': repo, + 'branch': branch, + 'path': f'{dag_id}/{upstream_id}' + } for upstream_id in upstream_ids] + + # input_repo = exec_conf['input_repo'] + # input_branch = exec_conf['input_branch'] + # If input repo is provided use that as source of files + for repo in repos: + if not repo.get('path'): + # get all if path is not specified + repo['path'] = '*' + logger.info(f"repos : {repos}") + for r in repos: logger.info("downloading %s from %s@%s to %s", - remote_path, input_repo, input_branch, input_dir) + r['path'], r['repo'], r['branch'], input_dir) + # create path to download to ... + if not os.path.exists(input_dir + f'/{r["repo"]}'): + os.mkdir(input_dir + f'/{r["repo"]}') get_files( - local_path=input_dir, - remote_path=remote_path, - branch=input_branch, - task_name=context['task'].task_id, - pipeline_id=context['dag'].dag_id, - repo=input_repo, + local_path=input_dir + f'/{r["repo"]}', + remote_path=r['path'], + branch=r['branch'], + repo=r['repo'], changes_only=False, - metafilename=None, lake_fs_client=client ) -def create_python_task(dag, name, a_callable, func_kwargs=None, input_repo=None, - input_branch=None, pass_conf=True, no_output_files=False): + +def create_python_task(dag, name, a_callable, func_kwargs=None, external_repos = {}, pass_conf=True, no_output_files=False): """ Create a python task. :param func_kwargs: additional arguments for callable. :param dag: dag to add task to. @@ -305,25 +326,23 @@ def create_python_task(dag, name, a_callable, func_kwargs=None, input_repo=None, # repo and branch for pre-execution , to download input objects pre_exec_conf = { - 'input_repo': config.lakefs_config.repo, - 'input_branch': config.lakefs_config.branch + 'repos': [] } - - if input_repo and input_branch: - # if the task is a root task , begining of the dag... + if external_repos: + # if the task is a root task , beginning of the dag... # and we want to pull data from a different repo. pre_exec_conf = { - 'input_repo': input_repo, - 'input_branch': input_branch, - # if path is not defined , we can use the context (dag context) to - # resolve the previous task dir in lakefs. 
- 'path': '*' + 'repos': [{ + 'repo': r['name'], + 'branch': r['branch'], + 'path': r.get('path', '*') + } for r in external_repos] } pre_exec = partial(setup_input_data, exec_conf=pre_exec_conf) # add pre_exec partial function as an argument to python executor conf python_operator_args['pre_execute'] = pre_exec - + python_operator_args['on_failure_callback'] = partial(clean_up, kwargs=op_kwargs) # if the task has output files, we will add a commit callback if not no_output_files: python_operator_args['on_success_callback'] = partial(avalon_commit_callback, kwargs=op_kwargs) @@ -352,8 +371,10 @@ def create_pipeline_taskgroup( dag, f"annotate_{name}_files", pipeline.annotate, - input_repo=getattr(pipeline_class, 'pipeline_name'), - input_branch=input_dataset_version, + external_repos=[{ + 'name': getattr(pipeline_class, 'pipeline_name'), + 'branch': input_dataset_version + }], pass_conf=False) index_variables_task = create_python_task( diff --git a/dags/tranql_translate.py b/dags/tranql_translate.py deleted file mode 100755 index 53394ba2..00000000 --- a/dags/tranql_translate.py +++ /dev/null @@ -1,43 +0,0 @@ -# -*- coding: utf-8 -*- -# - -""" -An Airflow workflow for the Roger Translator KGX data pipeline. -""" - -from airflow.models import DAG -from airflow.operators.empty import EmptyOperator -import roger -from roger.tasks import get_executor_config, default_args, create_python_task - -""" Build the workflow's tasks and DAG. """ -with DAG( - dag_id='tranql_translate', - default_args=default_args, - schedule_interval=None, - concurrency=16, -) as dag: - - """ Build the workflow tasks. """ - intro = EmptyOperator(task_id='Intro') - get_kgx = create_python_task (dag, "GetSource", roger.get_kgx) - create_nodes_schema = create_python_task(dag, "CreateNodesSchema", - roger.create_nodes_schema) - create_edges_schema = create_python_task(dag, "CreateEdgesSchema", - roger.create_edges_schema) - continue_task_bulk_load = EmptyOperator(task_id="continueBulkCreate") - continue_task_validate = EmptyOperator(task_id="continueValidation") - merge_nodes = create_python_task (dag, "MergeNodes", roger.merge_nodes) - create_bulk_load_nodes = create_python_task(dag, "CreateBulkLoadNodes", - roger.create_bulk_nodes) - create_bulk_load_edges = create_python_task(dag, "CreateBulkLoadEdges", - roger.create_bulk_edges) - bulk_load = create_python_task(dag, "BulkLoad", roger.bulk_load) - check_tranql = create_python_task(dag, "CheckTranql", - roger.check_tranql) - validate = create_python_task(dag, "Validate", roger.validate) - finish = EmptyOperator(task_id='Finish') - - """ Build the DAG. """ - intro >> get_kgx >> merge_nodes >> [create_nodes_schema, create_edges_schema ] >> continue_task_bulk_load >> \ - [create_bulk_load_nodes, create_bulk_load_edges] >> bulk_load >> continue_task_validate >>[validate, check_tranql ] >> finish diff --git a/requirements.txt b/requirements.txt index cbdcfefd..8f211b51 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,7 +6,7 @@ jsonpickle redisgraph-bulk-loader==0.12.3 pytest PyYAML -git+https://github.com/helxplatform/dug@patch-nrslv-resp +git+https://github.com/helxplatform/dug@2.13.0 orjson kg-utils==0.0.6 bmt==1.1.0
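
Reviewer note: a minimal, illustrative sketch (not part of the patch) of how the new annotation.annotator_type / annotation.annotator_args settings are intended to flow from Roger's config into dug, following the changes in dags/roger/config/__init__.py, dags/dug_helpers/dug_utils.py and dags/roger/pipelines/base.py. The helper name build_annotator_from_config is hypothetical; the dug entry points (get_plugin_manager, get_annotator) are called the same way the patch calls them.

# Illustrative sketch only; mirrors the wiring introduced in this patch.
# build_annotator_from_config is a hypothetical helper, not code in the repo.
from dug.core import get_plugin_manager, get_annotator
from dug.core.annotators import Annotator


def build_annotator_from_config(config) -> Annotator:
    """Resolve the annotator plugin named by annotation.annotator_type.

    `config` is assumed to be a RogerConfig. Annotator-specific settings
    (the monarch URL, or the sapbert classification/annotator URLs) ride
    along in annotation.annotator_args and reach dug via to_dug_conf().
    """
    dug_plugin_manager = get_plugin_manager()
    return get_annotator(
        dug_plugin_manager.hook,
        annotator_name=config.annotation.annotator_type,  # "monarch" by default
        config=config.to_dug_conf(),
    )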
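
In the same spirit, a small sketch (hypothetical helper name, assuming the same "repo:branch" convention) of how kgx.data_sets entries such as baseline-graph:v5.0 are expanded into the repo/branch/path dicts that knowledge_graph_build.py passes to create_python_task as external_repos, and that setup_input_data later downloads from LakeFS.

# Illustrative only: expand "repo:branch" strings into the input repo spec
# used by the knowledge_graph_build DAG; expand_kgx_repos is hypothetical.
def expand_kgx_repos(kgx_data_sets, working_repo, working_branch, dug_data_sets):
    # external KGX repos (baseline and/or CDE graphs) are pulled wholesale
    input_repos = [{
        'name': repo.split(':')[0],    # e.g. "baseline-graph"
        'branch': repo.split(':')[1],  # e.g. "v5.0"
        'path': '*',
    } for repo in kgx_data_sets]

    # annotate-and-index outputs for each dug dataset live in the working repo
    for dataset in dug_data_sets:
        name = dataset.split(":")[0]
        input_repos.append({
            'name': working_repo,
            'branch': working_branch,
            'path': f"annotate_and_index/{name}_dataset_pipeline_task_group.make_kgx_{name}/",
        })
    return input_repos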