
Add heal parsers #97

Merged Feb 13, 2024

Commits (73)
eac8e6a
annotator modules added by passing config val
braswent Nov 6, 2023
3880dbc
Merge branch 'pipeline_parameterize_restructure' into rebased-annotat…
YaphetKG Jan 4, 2024
2480001
fix merge conflict
YaphetKG Jan 4, 2024
b0028c5
following same pattern as parsers , modify configs
YaphetKG Jan 4, 2024
5113953
fix to dug config method
YaphetKG Jan 4, 2024
c59bfb0
fix old dug pipeline for backward compatiblity
YaphetKG Jan 4, 2024
e0dcd93
correct default annotator type
YaphetKG Jan 4, 2024
ae48080
reflective changes
YaphetKG Jan 4, 2024
5fd9168
typo extra quotes
YaphetKG Jan 4, 2024
5ae2d3a
annotator type not being picked up from config
YaphetKG Jan 5, 2024
9ca1e38
remove annotate simple , log env value for lakefs enabled
YaphetKG Jan 16, 2024
348be81
testing lakefs off
YaphetKG Jan 16, 2024
528768d
add more logging
YaphetKG Jan 16, 2024
df89638
add more logging
YaphetKG Jan 16, 2024
683e35b
post init for config to parse to boolean
YaphetKG Jan 17, 2024
6428075
put back task calls
YaphetKG Jan 17, 2024
ec54cc8
revert some changes
YaphetKG Jan 17, 2024
d557c2f
adding new pipeline
YaphetKG Jan 19, 2024
83815ab
lakefs io support for merge task
YaphetKG Jan 19, 2024
80ddf59
fix name
YaphetKG Jan 22, 2024
93dcbe9
add io params for kg tasks
YaphetKG Jan 22, 2024
3676cc9
wire up i/o paths for merge
YaphetKG Jan 23, 2024
0a8be3e
fix variable name
YaphetKG Jan 23, 2024
5a010d8
print files
YaphetKG Jan 23, 2024
1e6a9a2
few debug logs
YaphetKG Jan 23, 2024
c1ae51a
few debug logs
YaphetKG Jan 23, 2024
eb7fdc1
treat path as path not str
YaphetKG Jan 23, 2024
6c49a0c
few debug logs
YaphetKG Jan 23, 2024
7fdf08a
some fixes
YaphetKG Jan 23, 2024
07c5bd9
logging edge files
YaphetKG Jan 23, 2024
999d4a6
bug fix knowledge has edge
YaphetKG Jan 23, 2024
5a8f629
re-org graph structure
YaphetKG Jan 23, 2024
c5d2b0e
adding pathing for other tasks
YaphetKG Jan 23, 2024
96c8ff5
pagenation logic fix for avalon
YaphetKG Jan 23, 2024
62e8a0c
update lakefs client code
YaphetKG Jan 23, 2024
9f03265
fix glob for get kgx files
YaphetKG Jan 23, 2024
749deba
fix up get merged objects
YaphetKG Jan 23, 2024
f4adf0d
send down fake commit id for metadata
YaphetKG Jan 24, 2024
ce4c84f
working on edges schema
YaphetKG Jan 24, 2024
dcfd3db
bulk create nodes I/O
YaphetKG Jan 24, 2024
e43b5f1
find schema file
YaphetKG Jan 24, 2024
db58018
bulk create edges I/O
YaphetKG Jan 24, 2024
44ab91a
bulk create edges I/O
YaphetKG Jan 24, 2024
27fc0e4
bulk load io
YaphetKG Jan 24, 2024
2319a46
no outputs for final tasks
YaphetKG Jan 24, 2024
6429caa
add recursive glob
YaphetKG Jan 24, 2024
3c84f6a
fix globbing
YaphetKG Jan 24, 2024
f8db982
oops
YaphetKG Jan 24, 2024
8ab1a5c
delete dags
YaphetKG Jan 24, 2024
ce59d53
pin dug to latest release
YaphetKG Jan 25, 2024
3434d1a
cruft cleanup
YaphetKG Jan 25, 2024
60f90ba
re-org kgx config
YaphetKG Jan 25, 2024
6f2c0cc
add support for multiple initial repos
YaphetKG Jan 25, 2024
eef984e
fix comma
YaphetKG Jan 25, 2024
f90f13f
create dir to download to
YaphetKG Jan 25, 2024
4fd8ae2
swap branch and repo
YaphetKG Jan 25, 2024
0c057c1
clean up dirs
YaphetKG Jan 26, 2024
9ac704a
fix up other pipeline 👌
YaphetKG Jan 26, 2024
b663308
add remaining pipelines
YaphetKG Jan 29, 2024
0f3ae35
adding ctn parser
YaphetKG Jan 30, 2024
aa5ce40
change merge strategy
YaphetKG Feb 1, 2024
455c3f3
merge init fix
YaphetKG Feb 1, 2024
f4f9d64
debug dir
YaphetKG Feb 5, 2024
6303ac0
fix topmed file read
YaphetKG Feb 6, 2024
a00c13c
fix topmed file read
YaphetKG Feb 6, 2024
8710326
return file names as strings
YaphetKG Feb 6, 2024
56437ab
topmed kgx builder custom
YaphetKG Feb 8, 2024
c3c743d
topmed kgx builder custom
YaphetKG Feb 8, 2024
404f7cc
add skip
YaphetKG Feb 8, 2024
6262a5c
get files pattern recursive
YaphetKG Feb 8, 2024
77ddb9f
Merge branch 'pipeline_parameterize_restructure' into add-heal-parsers
YaphetKG Feb 8, 2024
f284a1c
version pin avalon
YaphetKG Feb 8, 2024
d5bfc5e
pin dug
YaphetKG Feb 8, 2024
2 changes: 1 addition & 1 deletion dags/annotate_and_index.py
@@ -25,7 +25,7 @@
 
 from roger import pipelines
 from roger.config import config
-envspec = os.getenv("ROGER_DUG__INPUTS_DATA__SETS","topmed")
+envspec = os.getenv("ROGER_DUG__INPUTS_DATA__SETS","topmed:v2.0")
 data_sets = envspec.split(",")
 pipeline_names = {x.split(':')[0]: x.split(':')[1] for x in data_sets}
 for pipeline_class in pipelines.get_pipeline_classes(pipeline_names):
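The one-line default change above also tightens the contract of the spec string: every comma-separated entry must now be `name:version`. A minimal sketch of the parsing (the function name is illustrative, not from the repo):

```python
# Sketch of the data-set spec parsing in annotate_and_index.py.
# The dict comprehension assumes every entry carries a version:
# a bare "topmed" (the old default) would raise IndexError here,
# which is why the default moved to "topmed:v2.0".
def parse_pipeline_spec(envspec: str) -> dict:
    """Split 'name:version,name:version' into {name: version}."""
    data_sets = envspec.split(",")
    return {x.split(':')[0]: x.split(':')[1] for x in data_sets}
```

For example, `parse_pipeline_spec("topmed:v2.0,bdc:v1.0")` yields `{'topmed': 'v2.0', 'bdc': 'v1.0'}`.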
4 changes: 3 additions & 1 deletion dags/roger/pipelines/base.py
@@ -304,7 +304,9 @@ def convert_to_kgx_json(self, elements, written_nodes=None):
         nodes = graph['nodes']
 
         for _, element in enumerate(elements):
-            # DugElement means a variable (Study variable...)
+            # DugElement means a variable (Study variable...)
+            if not isinstance(element, DugElement):
+                continue
             study_id = element.collection_id
             if study_id not in written_nodes:
                 nodes.append({
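The new guard matters because the elements list can mix study variables (`DugElement`) with other annotation objects that have no `collection_id`. A toy illustration with stand-in classes (these are not the real dug types):

```python
# Minimal model of the type guard added in convert_to_kgx_json.
# DugElement/DugConcept here are stand-ins, not the real dug classes.
class DugElement:
    def __init__(self, collection_id):
        self.collection_id = collection_id

class DugConcept:
    pass  # no collection_id attribute

def study_ids(elements):
    """Collect collection_ids, skipping anything that is not a DugElement."""
    ids = []
    for element in elements:
        if not isinstance(element, DugElement):
            continue  # concepts would raise AttributeError below; skip them
        ids.append(element.collection_id)
    return ids

mixed = [DugElement("phs000001"), DugConcept(), DugElement("phs000002")]
```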
4 changes: 2 additions & 2 deletions dags/roger/pipelines/bdc.py
@@ -1,10 +1,10 @@
-"Pipeline for BACPAC data"
+"Pipeline for BDC-dbGap data"
 
 from roger.pipelines import DugPipeline
 from roger.core import storage
 
 class bdcPipeline(DugPipeline):
-    "Pipeline for BACPAC data set"
+    "Pipeline for BDC-dbGap data set"
     pipeline_name = "bdc"
     parser_name = "dbgap"
4 changes: 2 additions & 2 deletions dags/roger/pipelines/crdc.py
@@ -1,10 +1,10 @@
-"Pipeline for BACPAC data"
+"Pipeline for Cancer Commons data"
 
 from roger.pipelines import DugPipeline
 from roger.core import storage
 
 class CRDCPipeline(DugPipeline):
-    "Pipeline for BACPAC data set"
+    "Pipeline for Cancer Commons data set"
     pipeline_name = "crdc"
     parser_name = "crdc"
10 changes: 10 additions & 0 deletions dags/roger/pipelines/ctn.py
@@ -0,0 +1,10 @@
+"Pipeline for Clinical trials network data"
+
+from roger.pipelines import DugPipeline
+
+class CTNPipeline(DugPipeline):
+    "Pipeline for Clinical trials nework data set"
+    pipeline_name = "ctn"
+    parser_name = "ctn"
16 changes: 16 additions & 0 deletions dags/roger/pipelines/heal_research_programs.py
@@ -0,0 +1,16 @@
+"Pipeline for Heal-studies data"
+
+from roger.pipelines import DugPipeline
+from roger.core import storage
+
+class HealResearchProgramPipeline(DugPipeline):
+    "Pipeline for Heal-research-programs data set"
+    pipeline_name = "heal-research-programs"
+    parser_name = "heal-research"
+
+    def get_objects(self, input_data_path=None):
+        if not input_data_path:
+            input_data_path = storage.dug_heal_research_program_path()
+        files = storage.get_files_recursive(lambda file_name: file_name.endswith('.xml'),
+                                            input_data_path)
+        return sorted([str(f) for f in files])
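Both HEAL pipelines delegate file discovery to `storage.get_files_recursive` with a file-name predicate. A hypothetical `pathlib`-based stand-in showing the intended behavior (the real helper lives in `roger.core.storage` and may differ):

```python
from pathlib import Path
import tempfile

def get_files_recursive(predicate, input_data_path):
    """Hypothetical stand-in for roger.core.storage.get_files_recursive:
    walk the tree under input_data_path, keeping files whose name
    satisfies the predicate."""
    root = Path(input_data_path)
    return [p for p in root.rglob('*') if p.is_file() and predicate(p.name)]

# Usage mirroring HealResearchProgramPipeline.get_objects:
with tempfile.TemporaryDirectory() as d:
    sub = Path(d, "heal-research-programs")
    sub.mkdir()
    (sub / "study.xml").write_text("<root/>")
    Path(d, "notes.txt").write_text("not xml")
    files = get_files_recursive(lambda name: name.endswith('.xml'), d)
    names = sorted(str(f) for f in files)
```

Only `study.xml` survives the filter; the non-XML file is ignored even though it sits higher in the tree.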
16 changes: 16 additions & 0 deletions dags/roger/pipelines/heal_studies.py
@@ -0,0 +1,16 @@
+"Pipeline for Heal-studies data"
+
+from roger.pipelines import DugPipeline
+from roger.core import storage
+
+class HealStudiesPipeline(DugPipeline):
+    "Pipeline for Heal-studies data set"
+    pipeline_name = "heal-studies"
+    parser_name = "heal-studies"
+
+    def get_objects(self, input_data_path=None):
+        if not input_data_path:
+            input_data_path = storage.dug_heal_study_path()
+        files = storage.get_files_recursive(lambda file_name: file_name.endswith('.xml'),
+                                            input_data_path)
+        return sorted([str(f) for f in files])
4 changes: 2 additions & 2 deletions dags/roger/pipelines/kfdrc.py
@@ -1,10 +1,10 @@
-"Pipeline for BACPAC data"
+"Pipeline for KDFRC data"
 
 from roger.pipelines import DugPipeline
 from roger.core import storage
 
 class kfdrcPipeline(DugPipeline):
-    "Pipeline for BACPAC data set"
+    "Pipeline for KDFRC data set"
     pipeline_name = "kfdrc"
     parser_name = "kfdrc"
4 changes: 2 additions & 2 deletions dags/roger/pipelines/nida.py
@@ -14,5 +14,5 @@ def get_objects(self, input_data_path=None):
         if not input_data_path:
             input_data_path = storage.dug_input_files_path(
                 self.get_files_dir())
-        nida_file_pattern = storage.os.path.join(input_data_path, 'NIDA-*.xml')
-        return sorted(storage.glob.glob(nida_file_pattern))
+        files = sorted(storage.get_files_recursive(lambda x: 'NIDA-' in x, input_data_path))
+        return files
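The switch from `glob` to `get_files_recursive` is behavioral, not cosmetic: the old pattern only matched `NIDA-*.xml` at the top level of the input directory, while a recursive walk also picks up files in sub-directories. A self-contained comparison (the directory layout is invented for illustration):

```python
# The old top-level glob vs. the new recursive name filter, side by side.
import glob
import os
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as d:
    Path(d, "NIDA-CTN-0001.xml").write_text("<root/>")
    nested = Path(d, "v1.0")
    nested.mkdir()
    (nested / "NIDA-CTN-0002.xml").write_text("<root/>")

    # Old behavior: only the top-level file matches the pattern.
    old_files = sorted(glob.glob(os.path.join(d, 'NIDA-*.xml')))
    # New behavior: the nested file is found as well.
    new_files = sorted(str(p) for p in Path(d).rglob('*')
                       if p.is_file() and 'NIDA-' in p.name)
```

Note the new predicate also drops the `.xml` suffix requirement, so any file with `NIDA-` in its name now qualifies.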
17 changes: 17 additions & 0 deletions dags/roger/pipelines/sparc.py
@@ -0,0 +1,17 @@
+"Pipeline for Sparc data"
+
+from roger.pipelines import DugPipeline
+from roger.core import storage
+
+class SparcPipeline(DugPipeline):
+    "Pipeline for Sparc data set"
+    pipeline_name = "sparc"
+    parser_name = "SciCrunch"
+
+    def get_objects(self, input_data_path=None):
+        if not input_data_path:
+            input_data_path = storage.dug_heal_study_path()
+        files = storage.get_files_recursive(
+            lambda x: True, input_data_path
+        )
+        return sorted([str(f) for f in files])
37 changes: 31 additions & 6 deletions dags/roger/pipelines/topmed.py
@@ -1,16 +1,41 @@
-"Pipeline for BACPAC data"
+"Pipeline for Topmed data"
 
 from roger.pipelines import DugPipeline
+from roger.pipelines.base import log, os
+import jsonpickle
 from roger.core import storage
 
-from roger.logger import logger
 class TopmedPipeline(DugPipeline):
-    "Pipeline for BACPAC data set"
+    "Pipeline for Topmed data set"
     pipeline_name = "topmed"
     parser_name = "TOPMedTag"
 
     def get_objects(self, input_data_path=None):
         if not input_data_path:
             input_data_path = str(storage.dug_input_files_path('topmed'))
-        topmed_file_pattern = storage.os.path.join(input_data_path,
-                                                   "topmed_*.csv")
-        return sorted(storage.glob.glob(topmed_file_pattern))
+        files = storage.get_files_recursive(
+            lambda file_name: file_name.endswith('.csv'),
+            input_data_path)
+        return sorted([str(x) for x in files])
+
+    def make_kg_tagged(self, to_string=False, elements_files=None,
+                       input_data_path=None, output_data_path=None):
+        "Create tagged knowledge graphs from elements"
+        log.info("Override base.make_kg_tagged called")
+        if not output_data_path:
+            output_data_path = storage.dug_kgx_path("")
+        storage.clear_dir(output_data_path)
+        if not elements_files:
+            elements_files = storage.dug_elements_objects(input_data_path, format='txt')
+        for file_ in elements_files:
+            elements = jsonpickle.decode(storage.read_object(file_))
+            kg = self.make_tagged_kg(elements)
+            dug_base_file_name = file_.split(os.path.sep)[-2]
+            output_file_path = os.path.join(output_data_path,
+                                            dug_base_file_name + '_kgx.json')
+            storage.write_object(kg, output_file_path)
+            log.info("Wrote %d and %d edges, to %s", len(kg['nodes']),
+                     len(kg['edges']), output_file_path)
+        output_log = self.log_stream.getvalue() if to_string else ''
+        return output_log
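A subtle point in the override: the KGX output name is derived from the parent directory of each elements file, not from the file name itself. A sketch of just that naming step (the helper name is illustrative, not from the repo):

```python
import os

def kgx_output_path(elements_file: str, output_data_path: str) -> str:
    """Mirror the naming logic in TopmedPipeline.make_kg_tagged:
    take the parent directory name of the elements file and append
    '_kgx.json'."""
    dug_base_file_name = elements_file.split(os.path.sep)[-2]
    return os.path.join(output_data_path, dug_base_file_name + '_kgx.json')

# e.g. data/topmed_v2.0/elements.txt -> kgx/topmed_v2.0_kgx.json
path = os.path.sep.join(["data", "topmed_v2.0", "elements.txt"])
```

This means every elements file is expected to sit at least one directory deep; a bare file name with no separator would make the `[-2]` index fail.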

8 changes: 7 additions & 1 deletion dags/roger/tasks.py
@@ -19,6 +19,7 @@
 from roger.pipelines.base import DugPipeline
 from avalon.mainoperations import put_files, LakeFsWrapper, get_files
 from lakefs_sdk.configuration import Configuration
+from lakefs_sdk.models.merge import Merge
 from functools import partial
 
 logger = get_logger()
@@ -186,9 +187,13 @@ def avalon_commit_callback(context: DagContext, **kwargs):
 
     try:
         # merging temp branch to working branch
+        # the current working branch wins incase of conflicts
+        merge = Merge(**{"strategy": "source-wins"})
         client._client.refs_api.merge_into_branch(repository=repo,
                                                   source_ref=temp_branch_name,
-                                                  destination_branch=branch)
+                                                  destination_branch=branch,
+                                                  merge=merge)
 
         logger.info(f"merged branch {temp_branch_name} into {branch}")
     except Exception as e:
@@ -203,6 +208,7 @@ def avalon_commit_callback(context: DagContext, **kwargs):
     logger.info(f"deleted temp branch {temp_branch_name}")
     logger.info(f"deleting local dir {local_path}")
     files_to_clean = glob.glob(local_path + '**', recursive=True) + [local_path]
+
     clean_up(context, **kwargs)
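The merge now passes an explicit strategy. Note a possible mismatch: the diff comment says the working branch wins, but lakeFS documents `source-wins` as resolving conflicts in favor of the source ref, which here is the temp branch. A toy model of source-wins semantics over path-to-content maps (not the lakeFS implementation):

```python
# Illustration of "source-wins" merge semantics: on conflicting paths,
# the source branch (the temp branch being merged in) overrides the
# destination. This is a stand-in model, not lakeFS code.
def merge_source_wins(destination: dict, source: dict) -> dict:
    merged = dict(destination)
    merged.update(source)  # every conflicting path takes the source value
    return merged

dest = {"kgx/nodes.json": "old", "kgx/edges.json": "edges"}
src = {"kgx/nodes.json": "new", "kgx/meta.json": "meta"}
merged = merge_source_wins(dest, src)
```

If the intent really is for the working branch to win, `dest-wins` would be the matching lakeFS strategy.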
4 changes: 2 additions & 2 deletions requirements.txt
@@ -6,9 +6,9 @@ jsonpickle
 redisgraph-bulk-loader==0.12.3
 pytest
 PyYAML
-git+https://github.com/helxplatform/[email protected].0
+git+https://github.com/helxplatform/[email protected].1
 orjson
 kg-utils==0.0.6
 bmt==1.1.0
-git+https://github.com/helxplatform/avalon.git@pydantic-upgrade
+git+https://github.com/helxplatform/avalon.git@v1.0.0
 linkml-runtime==1.6.0