Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

annotator modules added by passing config val #90

Merged
merged 58 commits into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
eac8e6a
annotator modules added by passing config val
braswent Nov 6, 2023
3880dbc
Merge branch 'pipeline_parameterize_restructure' into rebased-annotat…
YaphetKG Jan 4, 2024
2480001
fix merge conflict
YaphetKG Jan 4, 2024
b0028c5
following same pattern as parsers, modify configs
YaphetKG Jan 4, 2024
5113953
fix to dug config method
YaphetKG Jan 4, 2024
c59bfb0
fix old dug pipeline for backward compatibility
YaphetKG Jan 4, 2024
e0dcd93
correct default annotator type
YaphetKG Jan 4, 2024
ae48080
reflective changes
YaphetKG Jan 4, 2024
5fd9168
typo extra quotes
YaphetKG Jan 4, 2024
5ae2d3a
annotator type not being picked up from config
YaphetKG Jan 5, 2024
9ca1e38
remove annotate simple , log env value for lakefs enabled
YaphetKG Jan 16, 2024
348be81
testing lakefs off
YaphetKG Jan 16, 2024
528768d
add more logging
YaphetKG Jan 16, 2024
df89638
add more logging
YaphetKG Jan 16, 2024
683e35b
post init for config to parse to boolean
YaphetKG Jan 17, 2024
6428075
put back task calls
YaphetKG Jan 17, 2024
ec54cc8
revert some changes
YaphetKG Jan 17, 2024
d557c2f
adding new pipeline
YaphetKG Jan 19, 2024
83815ab
lakefs io support for merge task
YaphetKG Jan 19, 2024
80ddf59
fix name
YaphetKG Jan 22, 2024
93dcbe9
add io params for kg tasks
YaphetKG Jan 22, 2024
3676cc9
wire up i/o paths for merge
YaphetKG Jan 23, 2024
0a8be3e
fix variable name
YaphetKG Jan 23, 2024
5a010d8
print files
YaphetKG Jan 23, 2024
1e6a9a2
few debug logs
YaphetKG Jan 23, 2024
c1ae51a
few debug logs
YaphetKG Jan 23, 2024
eb7fdc1
treat path as path not str
YaphetKG Jan 23, 2024
6c49a0c
few debug logs
YaphetKG Jan 23, 2024
7fdf08a
some fixes
YaphetKG Jan 23, 2024
07c5bd9
logging edge files
YaphetKG Jan 23, 2024
999d4a6
bug fix knowledge has edge
YaphetKG Jan 23, 2024
5a8f629
re-org graph structure
YaphetKG Jan 23, 2024
c5d2b0e
adding pathing for other tasks
YaphetKG Jan 23, 2024
96c8ff5
pagination logic fix for avalon
YaphetKG Jan 23, 2024
62e8a0c
update lakefs client code
YaphetKG Jan 23, 2024
9f03265
fix glob for get kgx files
YaphetKG Jan 23, 2024
749deba
fix up get merged objects
YaphetKG Jan 23, 2024
f4adf0d
send down fake commit id for metadata
YaphetKG Jan 24, 2024
ce4c84f
working on edges schema
YaphetKG Jan 24, 2024
dcfd3db
bulk create nodes I/O
YaphetKG Jan 24, 2024
e43b5f1
find schema file
YaphetKG Jan 24, 2024
db58018
bulk create edges I/O
YaphetKG Jan 24, 2024
44ab91a
bulk create edges I/O
YaphetKG Jan 24, 2024
27fc0e4
bulk load io
YaphetKG Jan 24, 2024
2319a46
no outputs for final tasks
YaphetKG Jan 24, 2024
6429caa
add recursive glob
YaphetKG Jan 24, 2024
3c84f6a
fix globbing
YaphetKG Jan 24, 2024
f8db982
oops
YaphetKG Jan 24, 2024
8ab1a5c
delete dags
YaphetKG Jan 24, 2024
ce59d53
pin dug to latest release
YaphetKG Jan 25, 2024
3434d1a
cruft cleanup
YaphetKG Jan 25, 2024
60f90ba
re-org kgx config
YaphetKG Jan 25, 2024
6f2c0cc
add support for multiple initial repos
YaphetKG Jan 25, 2024
eef984e
fix comma
YaphetKG Jan 25, 2024
f90f13f
create dir to download to
YaphetKG Jan 25, 2024
4fd8ae2
swap branch and repo
YaphetKG Jan 25, 2024
0c057c1
clean up dirs
YaphetKG Jan 26, 2024
9ac704a
fix up other pipeline 👌
YaphetKG Jan 26, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# Git ignore bioler plate from https://github.com/github/gitignore/blob/master/Python.gitignore
.secret-env
Merge-helm/
Merge-Dug-Architecture.md
.vscode/

# Byte-compiled / optimized / DLL files
__pycache__/
Expand Down
102 changes: 0 additions & 102 deletions dags/annotate.py

This file was deleted.

29 changes: 0 additions & 29 deletions dags/annotate_simple.py

This file was deleted.

10 changes: 6 additions & 4 deletions dags/dug_helpers/dug_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@
from typing import Union, List

import requests
from dug.core import get_parser, get_plugin_manager, DugConcept
from dug.core.annotate import DugAnnotator, ConceptExpander
from dug.core import get_parser, get_annotator, get_plugin_manager, DugConcept
from dug.core.annotators._base import Annotator
from dug.core.concept_expander import ConceptExpander
from dug.core.crawler import Crawler
from dug.core.factory import DugFactory
from dug.core.parsers import Parser, DugElement
Expand Down Expand Up @@ -44,7 +45,7 @@ def __init__(self, config: RogerConfig, to_string=True):
self.string_handler = logging.StreamHandler(self.log_stream)
log.addHandler(self.string_handler)

self.annotator: DugAnnotator = self.factory.build_annotator()
self.annotator_name: str = config.annotation.annotator_type

self.tranqlizer: ConceptExpander = self.factory.build_tranqlizer()

Expand Down Expand Up @@ -95,6 +96,7 @@ def annotate_files(self, parser_name, parsable_files,
"""
dug_plugin_manager = get_plugin_manager()
parser: Parser = get_parser(dug_plugin_manager.hook, parser_name)
annotator: Annotator = get_annotator(dug_plugin_manager.hook, annotator_name=self.annotator_name, config=self.config.to_dug_conf())
if not output_data_path:
output_data_path = storage.dug_annotation_path('')
log.info("Parsing files")
Expand All @@ -103,7 +105,7 @@ def annotate_files(self, parser_name, parsable_files,
crawler = Crawler(
crawl_file=parse_file,
parser=parser,
annotator=self.annotator,
annotator=annotator,
tranqlizer='',
tranql_queries=[],
http_session=self.cached_session
Expand Down
28 changes: 0 additions & 28 deletions dags/index_dag.py

This file was deleted.

102 changes: 102 additions & 0 deletions dags/knowledge_graph_build.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# -*- coding: utf-8 -*-
#

"""
An Airflow workflow for the Roger Translator KGX data pipeline.

Declares the ``knowledge_graph_build`` DAG: merge KGX node files gathered
from one or more LakeFS repositories, derive node/edge schemas, produce
bulk-load artifacts, load them, then validate and smoke-test TranQL.
"""

from airflow.models import DAG
from airflow.operators.empty import EmptyOperator
import roger
from roger.tasks import default_args, create_python_task
from roger.config import config

""" Build the workflow's tasks and DAG. """
with DAG(
    dag_id='knowledge_graph_build',
    default_args=default_args,
    schedule_interval=None  # no periodic schedule; DAG runs only on manual/external trigger
) as dag:

    """ Build the workflow tasks. """
    # No-op entry node; gives the graph a single root to hang tasks from.
    intro = EmptyOperator(task_id='Intro')

    # Merge nodes needs inputs from two sources
    # 1. baseline and/or CDE KGX files from LakeFS (External repo)
    # 2. Infer which local kgx files are needed based on dug_inputs and grab them from the current repo

    # build the annotate and index pipeline output locations
    # lakefs://yk-heal/main/annotate_and_index/crdc_dataset_pipeline_task_group.make_kgx_crdc/
    working_repo = config.lakefs_config.repo
    branch = config.lakefs_config.branch
    kgx_repos = config.kgx.data_sets
    # Each configured data set is a "<repo>:<branch>" string; expand it into
    # the {'name', 'branch', 'path'} dict shape consumed by create_python_task.
    # NOTE(review): assumes every entry contains exactly one ':' — an entry
    # without a branch suffix would raise IndexError here; confirm upstream
    # config validation guarantees the format.
    input_repos = [{
        'name': repo.split(':')[0],
        'branch': repo.split(':')[1],
        'path': '*'
    } for repo in kgx_repos]

    # Figure out a way to extract paths
    # Maps a dataset name to the LakeFS path where the annotate-and-index
    # pipeline wrote that dataset's KGX output (see example URI above).
    get_path_on_lakefs = lambda d: f"annotate_and_index/{d}_dataset_pipeline_task_group.make_kgx_{d}/"


    for dataset in config.dug_inputs.data_sets:
        # Dataset entries are also "<name>:<version>"; only the name is needed.
        dataset_name = dataset.split(":")[0]
        # add datasets from the other pipeline
        input_repos.append(
            {
                'name': working_repo,
                'branch': branch,
                'path': get_path_on_lakefs(dataset_name)
            }
        )

    # MergeNodes is the only task that pulls from external repos; all inputs
    # assembled above are passed through external_repos.
    merge_nodes = create_python_task (dag, name="MergeNodes",
                                      a_callable=roger.merge_nodes,
                                      external_repos=input_repos
                                      )

    # The rest of these guys can just operate on the local lakefs repo/branch
    # we need to add input dir and output dir similar to what we did for dug tasks

    create_nodes_schema = create_python_task(dag,
                                             name="CreateNodesSchema",
                                             a_callable=roger.create_nodes_schema
                                             )
    create_edges_schema = create_python_task(dag,
                                             name="CreateEdgesSchema",
                                             a_callable=roger.create_edges_schema)

    create_bulk_load_nodes = create_python_task(dag,
                                                name="CreateBulkLoadNodes",
                                                a_callable=roger.create_bulk_nodes)
    create_bulk_load_edges = create_python_task(dag,
                                                name="CreateBulkLoadEdges",
                                                a_callable=roger.create_bulk_edges)
    # Terminal tasks declare no_output_files=True: they publish nothing back
    # to LakeFS, so no output path wiring is generated for them.
    bulk_load = create_python_task(dag,
                                   name="BulkLoad",
                                   a_callable=roger.bulk_load,
                                   no_output_files=True)
    check_tranql = create_python_task(dag,
                                      name="CheckTranql",
                                      a_callable=roger.check_tranql,
                                      no_output_files=True)
    validate = create_python_task(dag,
                                  name="Validate",
                                  a_callable=roger.validate,
                                  no_output_files=True)


    """ Build the DAG. """
    # Wiring: Intro -> MergeNodes -> (schemas) -> (bulk file creation) ->
    # BulkLoad -> {Validate, CheckTranql}.  Bulk-load-file tasks depend on
    # both their schema task and MergeNodes directly.
    merge_nodes.set_upstream(intro)
    create_nodes_schema.set_upstream(merge_nodes)
    create_edges_schema.set_upstream(merge_nodes)
    create_bulk_load_nodes.set_upstream(create_nodes_schema)
    create_bulk_load_nodes.set_upstream(merge_nodes)
    create_bulk_load_edges.set_upstream(create_edges_schema)
    create_bulk_load_edges.set_upstream(merge_nodes)
    bulk_load.set_upstream(create_bulk_load_nodes)
    bulk_load.set_upstream(create_bulk_load_edges)
    validate.set_upstream(bulk_load)
    check_tranql.set_upstream(bulk_load)

26 changes: 19 additions & 7 deletions dags/roger/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ class LakefsConfig(DictLike):
repo: str
enabled: bool = False

def __post_init__(self):
if isinstance(self.enabled, str):
self.enabled = self.enabled.lower() == "true"



@dataclass
Expand All @@ -46,10 +50,8 @@ class LoggingConfig(DictLike):
@dataclass
class KgxConfig(DictLike):
biolink_model_version: str = "1.5.0"
dataset_version: str = "v1.0"
merge_db_id: int = 1
merge_db_temp_dir: str = "workspace"
data_sets: List = field(default_factory=lambda: ['baseline-graph'])
data_sets: List = field(default_factory=lambda: ['baseline-graph:v5.0'])

def __post_init__(self):
# Convert strings to list. In cases where this is passed as env variable with a single value
Expand Down Expand Up @@ -88,7 +90,18 @@ class BulkLoaderConfig(DictLike):

@dataclass
class AnnotationConfig(DictLike):
annotator: str = "https://api.monarchinitiative.org/api/nlp/annotate/entities?min_length=4&longest_only=false&include_abbreviation=false&include_acronym=false&include_numbers=false&content="
annotator_type: str = "monarch"
annotator_args: dict = field(
default_factory=lambda: {
"monarch": {
"url": "https://api.monarchinitiative.org/api/nlp/annotate/entities?min_length=4&longest_only=false&include_abbreviation=false&include_acronym=false&include_numbers=false&content="
},
"sapbert": {
"classification_url": "https://med-nemo.apps.renci.org/annotate/",
"annotator_url": "https://babel-sapbert.apps.renci.org/annotate/",
},
}
)
normalizer: str = "https://nodenormalization-sri.renci.org/get_normalized_nodes?curie="
synonym_service: str = "https://onto.renci.org/synonyms/"
ontology_metadata: str = "https://api.monarchinitiative.org/api/bioentity/"
Expand Down Expand Up @@ -195,9 +208,8 @@ def to_dug_conf(self) -> DugConfig:
redis_port=self.redisgraph.port,
nboost_host=self.elasticsearch.nboost_host,
preprocessor=self.annotation.preprocessor,
annotator={
'url': self.annotation.annotator,
},
annotator_type=self.annotation.annotator_type,
annotator_args=self.annotation.annotator_args,
normalizer={
'url': self.annotation.normalizer,
},
Expand Down
13 changes: 9 additions & 4 deletions dags/roger/config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@ annotation_base_data_uri: https://stars.renci.org/var/dug/

kgx:
biolink_model_version: v3.1.2
dataset_version: v5.0
merge_db_id: 1
merge_db_temp_dir: workspace
data_sets:
- baseline-graph
- baseline-graph:v5.0

dug_inputs:
data_source: s3
Expand All @@ -44,10 +42,17 @@ bulk_loader:

annotation:
clear_http_cache: false
annotator: "https://api.monarchinitiative.org/api/nlp/annotate/entities?min_length=4&longest_only=false&include_abbreviation=false&include_acronym=false&include_numbers=false&content="
annotator_type: monarch
annotator_args:
monarch:
url: "https://api.monarchinitiative.org/api/nlp/annotate/entities?min_length=4&longest_only=false&include_abbreviation=false&include_acronym=false&include_numbers=false&content="
sapbert:
classification_url: "https://med-nemo.apps.renci.org/annotate/"
annotator_url: "https://babel-sapbert.apps.renci.org/annotate/"
normalizer: "https://nodenormalization-dev.apps.renci.org/get_normalized_nodes?conflate=false&description=true&curie="
synonym_service: "https://name-resolution-sri.renci.org/reverse_lookup"
ontology_metadata: "https://api.monarchinitiative.org/api/bioentity/"

preprocessor:
debreviator:
BMI: "body mass index"
Expand Down
Loading