Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parameterized annotate tasks with input_data_path and output_data_path #85

Merged
merged 1 commit into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions dags/annotate.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from airflow.operators.empty import EmptyOperator

from dug_helpers.dug_utils import (
DugUtil,
DugUtil,
get_topmed_files,
get_dbgap_files,
get_nida_files,
Expand Down Expand Up @@ -79,7 +79,7 @@
prepare_files = create_python_task(dag, "get_bacpac_files", get_bacpac_files)
annotate_files = create_python_task(dag, "annotate_bacpac_files",
DugUtil.annotate_bacpac_files)

elif data_set.startswith("heal-studies"):
prepare_files = create_python_task(dag, "get_heal_study_files", get_heal_study_files)
annotate_files = create_python_task(dag, "annotate_heal_study_files",
Expand All @@ -92,7 +92,6 @@
annotate_files = create_python_task(dag, "annotate_heal_research_program_files",
DugUtil.annotate_heal_research_program_files)


intro >> prepare_files
prepare_files >> clear_annotation_items

Expand Down
109 changes: 73 additions & 36 deletions dags/dug_helpers/dug_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ def __exit__(self, exc_type, exc_val, exc_tb):
log.error(f"{exc_val} {exc_val} {exc_tb}")
log.exception("Got an exception")

def annotate_files(self, parser_name, parsable_files):
def annotate_files(self, parser_name, parsable_files,
output_data_path=None):
"""
Annotates a Data element file using a Dug parser.
:param parser_name: Name of Dug parser to use.
Expand All @@ -94,7 +95,8 @@ def annotate_files(self, parser_name, parsable_files):
"""
dug_plugin_manager = get_plugin_manager()
parser: Parser = get_parser(dug_plugin_manager.hook, parser_name)
output_base_path = storage.dug_annotation_path('')
if not output_data_path:
output_data_path = storage.dug_annotation_path('')
log.info("Parsing files")
for parse_file in parsable_files:
log.debug("Creating Dug Crawler object")
Expand All @@ -109,7 +111,7 @@ def annotate_files(self, parser_name, parsable_files):

# configure output space.
current_file_name = '.'.join(os.path.basename(parse_file).split('.')[:-1])
elements_file_path = os.path.join(output_base_path, current_file_name)
elements_file_path = os.path.join(output_data_path, current_file_name)
elements_file_name = 'elements.pickle'
concepts_file_name = 'concepts.pickle'

Expand Down Expand Up @@ -549,131 +551,166 @@ def clear_annotation_cached(config=None, to_string=False):
dug.cached_session.cache.clear()

@staticmethod
def annotate_db_gap_files(config=None, to_string=False, files=None):
def annotate_db_gap_files(config=None, to_string=False, files=None,
input_data_path=None, output_data_path=None):
with Dug(config, to_string=to_string) as dug:
if files is None:
files = storage.dug_dd_xml_objects()
files = storage.dug_dd_xml_objects(
input_data_path=input_data_path)
parser_name = "DbGaP"
dug.annotate_files(parser_name=parser_name,
parsable_files=files)
parsable_files=files,
output_data_path=output_data_path)
output_log = dug.log_stream.getvalue() if to_string else ''
return output_log

@staticmethod
def annotate_anvil_files(config=None, to_string=False, files=None):
def annotate_anvil_files(config=None, to_string=False, files=None,
input_data_path=None, output_data_path=None):
with Dug(config, to_string=to_string) as dug:
if files is None:
files = storage.dug_anvil_objects()
files = storage.dug_anvil_objects(
input_data_path=input_data_path)
parser_name = "Anvil"
dug.annotate_files(parser_name=parser_name,
parsable_files=files)
parsable_files=files,
output_data_path=None)
output_log = dug.log_stream.getvalue() if to_string else ''
return output_log

@staticmethod
def annotate_cancer_commons_files(config=None, to_string=False, files=None):
def annotate_cancer_commons_files(config=None, to_string=False, files=None,
input_data_path=None,
output_data_path=None):
with Dug(config, to_string=to_string) as dug:
if files is None:
files = storage.dug_crdc_objects()
files = storage.dug_crdc_objects(
input_data_path=input_data_path)
parser_name = "crdc"
dug.annotate_files(parser_name=parser_name,
parsable_files=files)
parsable_files=files,
output_data_path=output_data_path)
output_log = dug.log_stream.getvalue() if to_string else ''
return output_log

@staticmethod
def annotate_kids_first_files(config=None, to_string=False, files=None):
def annotate_kids_first_files(config=None, to_string=False, files=None,
input_data_path=None, output_data_path=None):
with Dug(config, to_string=to_string) as dug:
if files is None:
files = storage.dug_kfdrc_objects()
files = storage.dug_kfdrc_objects(
input_data_path=input_data_path)
parser_name = "kfdrc"
dug.annotate_files(parser_name=parser_name,
parsable_files=files)
parsable_files=files,
output_data_path=output_data_path)
output_log = dug.log_stream.getvalue() if to_string else ''
return output_log

@staticmethod
def annotate_nida_files(config=None, to_string=False, files=None):
def annotate_nida_files(config=None, to_string=False, files=None,
input_data_path=None, output_data_path=None):
with Dug(config, to_string=to_string) as dug:
if files is None:
files = storage.dug_nida_objects()
files = storage.dug_nida_objects(
input_data_path=input_data_path)
parser_name = "NIDA"
dug.annotate_files(parser_name=parser_name,
parsable_files=files)
parsable_files=files,
output_data_path=output_data_path)
output_log = dug.log_stream.getvalue() if to_string else ''
return output_log

@staticmethod
def annotate_sparc_files(config=None, to_string=False, files=None):
def annotate_sparc_files(config=None, to_string=False, files=None,
input_data_path=None, output_data_path=None):
with Dug(config, to_string=to_string) as dug:
if files is None:
files = storage.dug_sparc_objects()
files = storage.dug_sparc_objects(
input_data_path=input_data_path)
parser_name = "SciCrunch"
dug.annotate_files(parser_name=parser_name,
parsable_files=files)
parsable_files=files,
output_data_path=output_data_path)
output_log = dug.log_stream.getvalue() if to_string else ''
return output_log

@staticmethod
def annotate_sprint_files(config=None, to_string=False, files=None):
def annotate_sprint_files(config=None, to_string=False, files=None,
input_data_path=None, output_data_path=None):
with Dug(config, to_string=to_string) as dug:
if files is None:
files = storage.dug_sprint_objects()
files = storage.dug_sprint_objects(
input_data_path=input_data_path)
parser_name = "SPRINT"
dug.annotate_files(parser_name=parser_name,
parsable_files=files)
parsable_files=files,
output_data_path=output_data_path)
output_log = dug.log_stream.getvalue() if to_string else ''
return output_log

@staticmethod
def annotate_topmed_files(config=None, to_string=False, files=None):
def annotate_topmed_files(config=None, to_string=False, files=None,
input_data_path=None, output_data_path=None):
with Dug(config, to_string=to_string) as dug:
if files is None:
files = storage.dug_topmed_objects()
files = storage.dug_topmed_objects(
input_data_path=None)
parser_name = "TOPMedTag"
log.info(files)
dug.annotate_files(parser_name=parser_name,
parsable_files=files)
parsable_files=files,
output_data_path=output_data_path)
output_log = dug.log_stream.getvalue() if to_string else ''
return output_log

@staticmethod
def annotate_bacpac_files(config=None, to_string=False, files=None):
def annotate_bacpac_files(config=None, to_string=False, files=None,
input_data_path=None, output_data_path=None):
with Dug(config, to_string=to_string) as dug:
if files is None:
files = storage.dug_bacpac_objects()
files = storage.dug_bacpac_objects(
input_data_path=None)
parser_name = "BACPAC"
log.info(files)
dug.annotate_files(parser_name=parser_name,
parsable_files=files)
parsable_files=files,
output_data_path=output_data_path)
output_log = dug.log_stream.getvalue() if to_string else ''
return output_log


@staticmethod
def annotate_heal_study_files(config=None, to_string=False, files=None):
def annotate_heal_study_files(config=None, to_string=False, files=None,
input_data_path=None, output_data_path=None):
with Dug(config, to_string=to_string) as dug:
if files is None:
files = storage.dug_heal_study_objects()
files = storage.dug_heal_study_objects(
input_data_path=None)

parser_name = "heal-studies"
log.info(files)
dug.annotate_files(parser_name=parser_name,
parsable_files=files)
parsable_files=files,
output_data_path=output_data_path)
output_log = dug.log_stream.getvalue() if to_string else ''
return output_log


@staticmethod
def annotate_heal_research_program_files(config=None, to_string=False, files=None):
def annotate_heal_research_program_files(config=None, to_string=False,
files=None, input_data_path=None,
output_data_path=None):
with Dug(config, to_string=to_string) as dug:
if files is None:
files = storage.dug_heal_research_program_objects()
files = storage.dug_heal_research_program_objects(
input_data_path=None)

parser_name = "heal-research"
log.info(files)
dug.annotate_files(parser_name=parser_name,
parsable_files=files)
parsable_files=files,
output_data_path=output_data_path)
output_log = dug.log_stream.getvalue() if to_string else ''
return output_log

Expand Down
Loading