Skip to content

Commit

Permalink
annotator modules added by passing config val (#90)
Browse files Browse the repository at this point in the history
* annotator modules added by passing config val

* fix merge conflict

* following same pattern as parsers, modify configs

* fix to dug config method

* fix old dug pipeline for backward compatibility

* correct default annotator type

* reflective changes

* typo extra quotes

* annotator type not being picked up from config

* remove annotate simple , log env value for lakefs enabled

* testing lakefs off

* add more logging

* add more logging

* post init for config to parse to boolean

* put back task calls

* revert some changes

* adding new pipeline

* lakefs io support for merge task

* fix name

* add io params for kg tasks

* wire up i/o paths for merge

* fix variable name

* print files

* few debug logs

* few debug logs

* treat path as path not str

* few debug logs

* some fixes

* logging edge files

* bug fix knowledge has edge

* re-org graph structure

* adding pathing for other tasks

* pagination logic fix for avalon

* update lakefs client code

* fix glob for get kgx files

* fix up get merged objects

* send down fake commit id for metadata

* working on edges schema

* bulk create nodes I/O

* find schema file

* bulk create edges  I/O

* bulk create edges  I/O

* bulk load io

* no outputs for final tasks

* add recursive glob

* fix globbing

* oops

* delete dags

* pin dug to latest release

* cruft cleanup

* re-org kgx config

* add support for multiple initial repos

* fix comma

* create dir to download to

* swap branch and repo

* clean up dirs

* fix up other pipeline 👌

---------

Co-authored-by: YaphetKG <[email protected]>
  • Loading branch information
braswent and YaphetKG authored Jan 29, 2024
1 parent f79350a commit 21e4271
Show file tree
Hide file tree
Showing 16 changed files with 333 additions and 377 deletions.
3 changes: 1 addition & 2 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# Git ignore bioler plate from https://github.com/github/gitignore/blob/master/Python.gitignore
.secret-env
Merge-helm/
Merge-Dug-Architecture.md
.vscode/

# Byte-compiled / optimized / DLL files
__pycache__/
Expand Down
102 changes: 0 additions & 102 deletions dags/annotate.py

This file was deleted.

29 changes: 0 additions & 29 deletions dags/annotate_simple.py

This file was deleted.

10 changes: 6 additions & 4 deletions dags/dug_helpers/dug_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@
from typing import Union, List

import requests
from dug.core import get_parser, get_plugin_manager, DugConcept
from dug.core.annotate import DugAnnotator, ConceptExpander
from dug.core import get_parser, get_annotator, get_plugin_manager, DugConcept
from dug.core.annotators._base import Annotator
from dug.core.concept_expander import ConceptExpander
from dug.core.crawler import Crawler
from dug.core.factory import DugFactory
from dug.core.parsers import Parser, DugElement
Expand Down Expand Up @@ -44,7 +45,7 @@ def __init__(self, config: RogerConfig, to_string=True):
self.string_handler = logging.StreamHandler(self.log_stream)
log.addHandler(self.string_handler)

self.annotator: DugAnnotator = self.factory.build_annotator()
self.annotator_name: str = config.annotation.annotator_type

self.tranqlizer: ConceptExpander = self.factory.build_tranqlizer()

Expand Down Expand Up @@ -95,6 +96,7 @@ def annotate_files(self, parser_name, parsable_files,
"""
dug_plugin_manager = get_plugin_manager()
parser: Parser = get_parser(dug_plugin_manager.hook, parser_name)
annotator: Annotator = get_annotator(dug_plugin_manager.hook, annotator_name=self.annotator_name, config=self.config.to_dug_conf())
if not output_data_path:
output_data_path = storage.dug_annotation_path('')
log.info("Parsing files")
Expand All @@ -103,7 +105,7 @@ def annotate_files(self, parser_name, parsable_files,
crawler = Crawler(
crawl_file=parse_file,
parser=parser,
annotator=self.annotator,
annotator=annotator,
tranqlizer='',
tranql_queries=[],
http_session=self.cached_session
Expand Down
28 changes: 0 additions & 28 deletions dags/index_dag.py

This file was deleted.

102 changes: 102 additions & 0 deletions dags/knowledge_graph_build.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# -*- coding: utf-8 -*-
#

"""
An Airflow workflow for the Roger Translator KGX data pipeline.
"""

from airflow.models import DAG
from airflow.operators.empty import EmptyOperator
import roger
from roger.tasks import default_args, create_python_task
from roger.config import config

""" Build the workflow's tasks and DAG. """
with DAG(
    dag_id='knowledge_graph_build',
    default_args=default_args,
    schedule_interval=None
) as dag:

    """ Build the workflow tasks. """
    intro = EmptyOperator(task_id='Intro')

    # MergeNodes consumes KGX files from two sources:
    #   1. baseline and/or CDE KGX files published in external LakeFS repos
    #   2. per-dataset KGX files produced by the annotate-and-index pipeline
    #      in the current working repo (inferred from dug_inputs)
    # e.g. lakefs://yk-heal/main/annotate_and_index/crdc_dataset_pipeline_task_group.make_kgx_crdc/
    local_repo = config.lakefs_config.repo
    local_branch = config.lakefs_config.branch

    # External repos are configured as "<repo>:<branch>" strings; pull
    # everything ('*') from each of them.
    input_repos = []
    for entry in config.kgx.data_sets:
        fields = entry.split(':')
        input_repos.append({
            'name': fields[0],
            'branch': fields[1],
            'path': '*'
        })

    def get_path_on_lakefs(d):
        """Return the working-repo path holding the KGX output for dataset *d*."""
        return f"annotate_and_index/{d}_dataset_pipeline_task_group.make_kgx_{d}/"

    # Add the per-dataset outputs produced by the other (annotate-and-index)
    # pipeline in the working repo.
    for dataset in config.dug_inputs.data_sets:
        dataset_name = dataset.split(":")[0]
        input_repos.append({
            'name': local_repo,
            'branch': local_branch,
            'path': get_path_on_lakefs(dataset_name)
        })

    merge_nodes = create_python_task(
        dag,
        name="MergeNodes",
        a_callable=roger.merge_nodes,
        external_repos=input_repos
    )

    # The remaining tasks operate on the local lakefs repo/branch; their
    # input and output dirs are wired up the same way as the dug tasks.
    create_nodes_schema = create_python_task(
        dag,
        name="CreateNodesSchema",
        a_callable=roger.create_nodes_schema
    )
    create_edges_schema = create_python_task(
        dag,
        name="CreateEdgesSchema",
        a_callable=roger.create_edges_schema
    )
    create_bulk_load_nodes = create_python_task(
        dag,
        name="CreateBulkLoadNodes",
        a_callable=roger.create_bulk_nodes
    )
    create_bulk_load_edges = create_python_task(
        dag,
        name="CreateBulkLoadEdges",
        a_callable=roger.create_bulk_edges
    )
    bulk_load = create_python_task(
        dag,
        name="BulkLoad",
        a_callable=roger.bulk_load,
        no_output_files=True
    )
    check_tranql = create_python_task(
        dag,
        name="CheckTranql",
        a_callable=roger.check_tranql,
        no_output_files=True
    )
    validate = create_python_task(
        dag,
        name="Validate",
        a_callable=roger.validate,
        no_output_files=True
    )

    """ Build the DAG. """
    # Schema generation and bulk-file creation both depend on the merged
    # nodes; bulk load waits on both bulk-file tasks; validation and the
    # TranQL check run after the load completes.
    intro >> merge_nodes
    merge_nodes >> [create_nodes_schema, create_edges_schema,
                    create_bulk_load_nodes, create_bulk_load_edges]
    create_nodes_schema >> create_bulk_load_nodes
    create_edges_schema >> create_bulk_load_edges
    [create_bulk_load_nodes, create_bulk_load_edges] >> bulk_load
    bulk_load >> [validate, check_tranql]
26 changes: 19 additions & 7 deletions dags/roger/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ class LakefsConfig(DictLike):
repo: str
enabled: bool = False

def __post_init__(self):
if isinstance(self.enabled, str):
self.enabled = self.enabled.lower() == "true"



@dataclass
Expand All @@ -46,10 +50,8 @@ class LoggingConfig(DictLike):
@dataclass
class KgxConfig(DictLike):
biolink_model_version: str = "1.5.0"
dataset_version: str = "v1.0"
merge_db_id: int = 1
merge_db_temp_dir: str = "workspace"
data_sets: List = field(default_factory=lambda: ['baseline-graph'])
data_sets: List = field(default_factory=lambda: ['baseline-graph:v5.0'])

def __post_init__(self):
# Convert strings to list. In cases where this is passed as env variable with a single value
Expand Down Expand Up @@ -88,7 +90,18 @@ class BulkLoaderConfig(DictLike):

@dataclass
class AnnotationConfig(DictLike):
annotator: str = "https://api.monarchinitiative.org/api/nlp/annotate/entities?min_length=4&longest_only=false&include_abbreviation=false&include_acronym=false&include_numbers=false&content="
annotator_type: str = "monarch"
annotator_args: dict = field(
default_factory=lambda: {
"monarch": {
"url": "https://api.monarchinitiative.org/api/nlp/annotate/entities?min_length=4&longest_only=false&include_abbreviation=false&include_acronym=false&include_numbers=false&content="
},
"sapbert": {
"classification_url": "https://med-nemo.apps.renci.org/annotate/",
"annotator_url": "https://babel-sapbert.apps.renci.org/annotate/",
},
}
)
normalizer: str = "https://nodenormalization-sri.renci.org/get_normalized_nodes?curie="
synonym_service: str = "https://onto.renci.org/synonyms/"
ontology_metadata: str = "https://api.monarchinitiative.org/api/bioentity/"
Expand Down Expand Up @@ -195,9 +208,8 @@ def to_dug_conf(self) -> DugConfig:
redis_port=self.redisgraph.port,
nboost_host=self.elasticsearch.nboost_host,
preprocessor=self.annotation.preprocessor,
annotator={
'url': self.annotation.annotator,
},
annotator_type=self.annotation.annotator_type,
annotator_args=self.annotation.annotator_args,
normalizer={
'url': self.annotation.normalizer,
},
Expand Down
13 changes: 9 additions & 4 deletions dags/roger/config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@ annotation_base_data_uri: https://stars.renci.org/var/dug/

kgx:
biolink_model_version: v3.1.2
dataset_version: v5.0
merge_db_id: 1
merge_db_temp_dir: workspace
data_sets:
- baseline-graph
- baseline-graph:v5.0

dug_inputs:
data_source: s3
Expand All @@ -44,10 +42,17 @@ bulk_loader:

annotation:
clear_http_cache: false
annotator: "https://api.monarchinitiative.org/api/nlp/annotate/entities?min_length=4&longest_only=false&include_abbreviation=false&include_acronym=false&include_numbers=false&content="
annotator_type: monarch
annotator_args:
monarch:
url: "https://api.monarchinitiative.org/api/nlp/annotate/entities?min_length=4&longest_only=false&include_abbreviation=false&include_acronym=false&include_numbers=false&content="
sapbert:
classification_url: "https://med-nemo.apps.renci.org/annotate/"
annotator_url: "https://babel-sapbert.apps.renci.org/annotate/"
normalizer: "https://nodenormalization-dev.apps.renci.org/get_normalized_nodes?conflate=false&description=true&curie="
synonym_service: "https://name-resolution-sri.renci.org/reverse_lookup"
ontology_metadata: "https://api.monarchinitiative.org/api/bioentity/"

preprocessor:
debreviator:
BMI: "body mass index"
Expand Down
Loading

0 comments on commit 21e4271

Please sign in to comment.