Pipeline parameterize restructure (#95)
* roger cli prepped for Merge Deploy

* Update Makefile to work with python env

* Update redisgraph-bulk-loader to fix issue with loading MODULE LIST

* Revert "Update redisgraph-bulk-loader to fix issue with loading MODULE LIST"

This reverts commit 7baf7ef.

* Finalized dev deployment of dug inside Catapult Merge, deployment yamls, code changes and configurations

* updated to reflect the Dug-Api updates to FastAPI

* adding multi-label redis by removing 'biolink:' on nodes; edges cannot be fixed after the update, so they need to be solved either by changing TranQl AND Plater or by forking bulk-redisgraph to allow colons in the edges

* Working multi label redis nodes w/ no biolink label

* Latest code changes to deploy working Roger in Merge

* biolink data move to '.' separator

* updates to include new dug fixes, upgraded redis-bulk-loader and made changes for biolink variables to specify their domain with a 'biolink.' prefix
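
The 'biolink:' → 'biolink.' separator change described in these commits can be sketched as a small helper. The function name is hypothetical, not Roger's actual code:

```python
def dot_separate_curie(label: str) -> str:
    """Rewrite a CURIE-style label such as 'biolink:Disease' to 'biolink.Disease'.

    RedisGraph node labels cannot contain colons, so the prefix separator
    is swapped for a dot before bulk loading.
    """
    prefix, sep, name = label.partition(":")
    return f"{prefix}.{name}" if sep else label
```

Labels without a colon pass through unchanged, so already-converted data is safe to re-process.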

* adding test roger code

* removed helm deployments

* change docker owner

* remove core.py

* remove dup dev config

* redis graph is not directly used removing cruft

* remove print statement

* remove logging files

* update requirements

* update requirements

* add redis graph.py

* fix import error for logger

* adding es scheme and ca_path config

* adding es scheme and ca_path config

* Parameterized annotate tasks with input_data_path and output_data_path

* adding debug code

* removing debug

* adding nodes args

* adding biolink.

* adding biolink.

* Parameterized annotate tasks with input_data_path and output_data_path (#85)

* adding lakefs changes to roger-2.0

* point avalon to vg1 branch

* change avalon dep

* update airflow

* fix avalon tag typo

* update jenkins to tag version on main branch only

* update jenkins to tag version

* update jenkins to tag version

* psycopg2 installation

* add cncf k8s req

* use airflow non-slim

* simplified for testing

* simplified for testing

* change dag name

* Erroneous parameter passed, should not be None

* adding pre-exec

* adding pre-exec

* adding pre-exec

* typo preexec

* typo preexec

* fix context

* get files from repo

* get files from repo

* get files from repo

* get files from repo

* First shot at moving pipeline into base class and implementing. Anvil pipeline not complete

* Syntax fix, docker image version bump to airflow 2.7.2-python3.11

* update storage dir

* update remove dir code

* update remove dir code

* remote path to *

* fix input dir for annotators

* fix input dir for annotators

* fix input dir for annotators

* kwargs to task

* kwargs to task

* kwargs to task

* kwargs to task

* kwargs to task

* kwargs to task

* kwargs to task

* kwargs to task

* kwargs to task

* kwargs to task

* kwargs to task

* adding branch info on lakefs config

* callback push to branch

* back to relative import

* reformat temp branch name based on unique task id

* add logging

* add logging

* convert posix path to str for avalon

* add extra / to root path

* New dag created using DugPipeline subclasses

* EmptyOperator imported from wrong place

* import and syntax fixes

* utterly silly syntax error

* Added anvil to default input data sets for testing purposes

* adding / to local path

* commit meta task args empty string

* add merge logic

* add merge logic

* upstream task dir pull for downstream task

* Switched from subdag to taskgroup because latest Airflow deprecated subdags

* Added BACPAC pipeline object

* Temporarily ignoring configuration variable for enabled datasets for testing

* Passed dag in to create task group to see if it helps dag errors

* Fixed silly syntax error

* adding input / output dir params for make kgx

* Trying different syntax to make taskgroups work.

* adding input / output dir params for make kgx

* Parsing, syntax, pylint fixes

* adding input / output dir params for make kgx

* Added pipeline name to task group name to ensure uniqueness

* oops, moved something out of scope. Fixed

* Filled out pipeline with methods from dug_utils. Needs data path changes

* Finished implementing input_data_path and output_data_path handling, pylint cleanup

* Update requirements.txt

* adding toggle to avoid sending config obj

* adding toggle to avoid sending config obj

* disable to string for test

* control pipelines for testing

* add self to anvil get files

* add log stream to make it available

* typo fix

* correcting branch id

* adding source repo

* adding source repo

* patch name-resolver response

* do not pass input repo and branch to pre-exec if not overridden

* do not pass input repo and branch to pre-exec if not overridden

* do not pass input repo and branch to pre-exec if not overridden

* dug pipeline edit

* recursively find files

* recursively find files

* setup output path for crawling

* all task functions should have input and output params

* adding annotation as upstream for validate index

* revamp create task , and task wrapper

* add validate concepts index task

* adding concept validation

* add index_variables task as dependency for validate concepts

* add index_variables task as dependency for validate concepts

* await client exist

* await client exist

* concepts not getting picked up for indexing

* concepts not getting picked up for indexing

* fix search elements

* converting annotation output to json

* json format annotation outputs

* adding support for json format elements and concepts read

* json back to dug objects
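
The round-trip these commits describe — writing annotation output as JSON and rebuilding objects for indexing — can be sketched with the standard library. The `Element` class and helper names below are illustrative stand-ins, not Dug's real classes (the history later switches to jsonpickle for this):

```python
import json
from dataclasses import dataclass, asdict

@dataclass
class Element:
    """Stand-in for a Dug annotation element; the real classes live in Dug."""
    id: str
    name: str
    concepts: list

def elements_to_json(elements) -> str:
    # indent=2 writes one field per line, which makes diff-based change
    # detection (mentioned in a later commit) much cleaner
    return json.dumps([asdict(e) for e in elements], indent=2)

def elements_from_json(text: str):
    # Rebuild plain objects from the dict representation
    return [Element(**d) for d in json.loads(text)]
```

Indexing tasks can then read the JSON back into objects instead of unpickling Python blobs from disk.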

* fixing index variables with json objects

* indentation and newline for better change detection

* indentation and newline for better change detection

* treat dictionary concepts as dictionary

* read concepts json as a dict

* concepts files are actually file paths

* debug message

* make output jsonable

* clear up dir after commit , and delete unmerged branch even if no changes

* don't clear indexes, parallel dataset processing will be taxed

* memory leak?

* memory leak?

* memory leak?

* dumping pickles to debug locally

* find out why concepts are being added to every other element

* find out why concepts are being added to every other element

* pointless shuffle 🤷‍♂️

* revert back in time

* back to sanitize dug

* output just json for annotation

* adding jsonpickle

* jsonpickle 🥒

* unpickle for index

* unpickle for validate index

* crawling fixes

* crawling fixes

* crawling validation fixes

* fix index concepts

* fix makekgx

* adding other bdc pipelines

* adding pipeline parameters to be able to configure per instance

* fix

* add input dataset for pipelines

* Adding README to document how to create data set-specific pipelines

* catchup on base.py

* Added dbgap and nida pipelines

* fix import errors

* annotator modules added by passing config val (#90)

* annotator modules added by passing config val

* fix merge conflict

* following same pattern as parsers , modify configs

* fix to dug config method

* fix old dug pipeline for backward compatibility

* correct default annotator type

* reflective changes

* typo extra quotes

* annotator type not being picked up from config

* remove annotate simple , log env value for lakefs enabled

* testing lakefs off

* add more logging

* add more logging

* post init for config to parse to boolean
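
"post init for config to parse to boolean" refers to coercing environment-variable strings like "true"/"false" into real booleans. A minimal sketch; the class and field names are assumptions, not Roger's actual config:

```python
from dataclasses import dataclass

@dataclass
class LakefsConfig:
    """Illustrative config fragment with a flag sourced from the environment."""
    enabled: bool = False

    def __post_init__(self):
        # Values from os.environ arrive as strings, and bool("false") is True,
        # so coerce explicitly after dataclass init.
        if isinstance(self.enabled, str):
            self.enabled = self.enabled.strip().lower() in ("true", "1", "yes")
```

Without this, a config built from `ROGER_LAKEFS_ENABLED=false` would behave as enabled — the bug the surrounding "testing lakefs off" commits are chasing.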

* put back task calls

* revert some changes

* adding new pipeline

* lakefs io support for merge task

* fix name

* add io params for kg tasks

* wire up i/o paths for merge

* fix variable name

* print files

* few debug logs

* few debug logs

* treat path as path not str

* few debug logs

* some fixes

* logging edge files

* bug fix knowledge has edge

* re-org graph structure

* adding pathing for other tasks

* pagination logic fix for avalon

* update lakefs client code

* fix glob for get kgx files

* fix up get merged objects

* send down fake commit id for metadata

* working on edges schema

* bulk create nodes I/O

* find schema file

* bulk create edges  I/O

* bulk create edges  I/O

* bulk load io

* no outputs for final tasks

* add recursive glob

* fix globbing
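
A non-recursive glob like `dir/*.jsonl` silently misses files in subdirectories, which is the kind of bug the "add recursive glob / fix globbing" pair addresses. A sketch of the recursive form (helper name assumed):

```python
import tempfile
from pathlib import Path

def find_kgx_files(root, pattern="*.jsonl"):
    # Path.rglob matches the pattern at any depth, equivalent to '**/<pattern>';
    # a plain Path.glob would only see files directly under root
    return sorted(str(p) for p in Path(root).rglob(pattern))

# Quick demonstration in a throwaway directory
with tempfile.TemporaryDirectory() as d:
    (Path(d) / "sub").mkdir()
    (Path(d) / "top.jsonl").touch()
    (Path(d) / "sub" / "nested.jsonl").touch()
    found = find_kgx_files(d)  # picks up both the top-level and nested file
```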

* oops

* delete dags

* pin dug to latest release

* cruft cleanup

* re-org kgx config

* add support for multiple initial repos

* fix comma

* create dir to download to

* swap branch and repo

* clean up dirs

* fix up other pipeline 👌

---------

Co-authored-by: YaphetKG <[email protected]>

* Add heal parsers (#96)

* annotator modules added by passing config val

* fix merge conflict

* following same pattern as parsers , modify configs

* fix to dug config method

* fix old dug pipeline for backward compatibility

* correct default annotator type

* reflective changes

* typo extra quotes

* annotator type not being picked up from config

* remove annotate simple , log env value for lakefs enabled

* testing lakefs off

* add more logging

* add more logging

* post init for config to parse to boolean

* put back task calls

* revert some changes

* adding new pipeline

* lakefs io support for merge task

* fix name

* add io params for kg tasks

* wire up i/o paths for merge

* fix variable name

* print files

* few debug logs

* few debug logs

* treat path as path not str

* few debug logs

* some fixes

* logging edge files

* bug fix knowledge has edge

* re-org graph structure

* adding pathing for other tasks

* pagination logic fix for avalon

* update lakefs client code

* fix glob for get kgx files

* fix up get merged objects

* send down fake commit id for metadata

* working on edges schema

* bulk create nodes I/O

* find schema file

* bulk create edges  I/O

* bulk create edges  I/O

* bulk load io

* no outputs for final tasks

* add recursive glob

* fix globbing

* oops

* delete dags

* pin dug to latest release

* cruft cleanup

* re-org kgx config

* add support for multiple initial repos

* fix comma

* create dir to download to

* swap branch and repo

* clean up dirs

* fix up other pipeline 👌

* add remaining pipelines

* adding ctn parser

* change merge strategy

* merge init fix

* debug dir

* fix topmed file read

* fix topmed file read

* return file names as strings

* topmed kgx builder custom

* topmed kgx builder custom

* add skip

* get files pattern recursive

* version pin avalon

* pin dug

---------

Co-authored-by: braswent <[email protected]>

* Add heal parsers (#97)

* annotator modules added by passing config val

* fix merge conflict

* following same pattern as parsers , modify configs

* fix to dug config method

* fix old dug pipeline for backward compatibility

* correct default annotator type

* reflective changes

* typo extra quotes

* annotator type not being picked up from config

* remove annotate simple , log env value for lakefs enabled

* testing lakefs off

* add more logging

* add more logging

* post init for config to parse to boolean

* put back task calls

* revert some changes

* adding new pipeline

* lakefs io support for merge task

* fix name

* add io params for kg tasks

* wire up i/o paths for merge

* fix variable name

* print files

* few debug logs

* few debug logs

* treat path as path not str

* few debug logs

* some fixes

* logging edge files

* bug fix knowledge has edge

* re-org graph structure

* adding pathing for other tasks

* pagination logic fix for avalon

* update lakefs client code

* fix glob for get kgx files

* fix up get merged objects

* send down fake commit id for metadata

* working on edges schema

* bulk create nodes I/O

* find schema file

* bulk create edges  I/O

* bulk create edges  I/O

* bulk load io

* no outputs for final tasks

* add recursive glob

* fix globbing

* oops

* delete dags

* pin dug to latest release

* cruft cleanup

* re-org kgx config

* add support for multiple initial repos

* fix comma

* create dir to download to

* swap branch and repo

* clean up dirs

* fix up other pipeline 👌

* add remaining pipelines

* adding ctn parser

* change merge strategy

* merge init fix

* debug dir

* fix topmed file read

* fix topmed file read

* return file names as strings

* topmed kgx builder custom

* topmed kgx builder custom

* add skip

* get files pattern recursive

* version pin avalon

* pin dug

---------

Co-authored-by: braswent <[email protected]>

* Radx pipeline (#99)

* point to large download

* fix schema path

* debug bulk input dir

* fix schema read

* fix schema read

* fix schema read

* commenting out setup dir for test

* adding logs

* fix path stuff

* add commented stuff back in

* testing radx parser

* adding parser

* skip indexing vars with no id

* adding indexes as part of bulk loader paramters

* fix id index cli arg

* fix local cli

* dug latest

---------

Co-authored-by: Nathan Braswell <[email protected]>
Co-authored-by: esurface <[email protected]>
Co-authored-by: braswent <[email protected]>
Co-authored-by: Michael T. Bacon <[email protected]>
Co-authored-by: Michael T Bacon <[email protected]>
6 people authored Apr 18, 2024
1 parent 85cbead commit 2eb136a
Showing 35 changed files with 2,238 additions and 470 deletions.
6 changes: 3 additions & 3 deletions .env
@@ -9,9 +9,9 @@ DATA_DIR=./local_storage

DUG_LOG_LEVEL=INFO

-ELASTIC_PASSWORD=12345
-ELASTIC_API_HOST=elasticsearch
-ELASTIC_USERNAME=elastic
+ELASTICSEARCH_PASSWORD=12345
+ELASTICSEARCH_HOST=elasticsearch
+ELASTICSEARCH_USERNAME=elastic

NBOOST_API_HOST=nboost

2 changes: 2 additions & 0 deletions .gitignore
@@ -1,4 +1,6 @@
 # Git ignore boilerplate from https://github.com/github/gitignore/blob/master/Python.gitignore
+.secret-env
+.vscode/

 # Byte-compiled / optimized / DLL files
 __pycache__/
10 changes: 4 additions & 6 deletions Dockerfile
@@ -1,11 +1,9 @@
-FROM apache/airflow:2.5.0-python3.10
+FROM apache/airflow:2.7.2-python3.11

 USER root
 RUN apt-get update && \
-    apt-get install -y git gcc python3-dev nano vim
+    apt-get install -y git nano vim
 COPY requirements.txt requirements.txt
 USER airflow
-# dependency resolution taking hours eventually failing,
-# @TODO fix click lib dependency
-RUN pip install -r requirements.txt && \
-    pip uninstall -y elasticsearch-dsl
+RUN pip install -r requirements.txt
+RUN rm -f requirements.txt
32 changes: 8 additions & 24 deletions Jenkinsfile
@@ -70,31 +70,15 @@ spec:
       steps {
         script {
           container(name: 'kaniko', shell: '/busybox/sh') {
-            kaniko.buildAndPush("./Dockerfile", ["$IMAGE_NAME:$TAG1", "$IMAGE_NAME:$TAG2", "$IMAGE_NAME:$TAG3", "$IMAGE_NAME:$TAG4"])
+            if (env.BRANCH_NAME == "main") {
+              // Tag with latest and version iff when pushed to master
+              kaniko.buildAndPush("./Dockerfile", ["$IMAGE_NAME:$TAG1", "$IMAGE_NAME:$TAG2", "$IMAGE_NAME:$TAG3", "$IMAGE_NAME:$TAG4"])
+            } else {
+              kaniko.buildAndPush("./Dockerfile", ["$IMAGE_NAME:$TAG1", "$IMAGE_NAME:$TAG2"])
+            }
           }
         }
       }
-      // post {
-      //   always {
-      //     archiveArtifacts artifacts: 'image.tar', onlyIfSuccessful: true
-      //   }
-      // }
     }
-    // stage('Publish') {
-    //   steps {
-    //     script {
-    //       container(name: 'crane', shell: '/busybox/sh') {
-    //         def imageTagsPushAlways = ["$IMAGE_NAME:$TAG1", "$IMAGE_NAME:$TAG2"]
-    //         def imageTagsPushForDevelopBranch = ["$IMAGE_NAME:$TAG3"]
-    //         def imageTagsPushForMasterBranch = ["$IMAGE_NAME:$TAG3", "$IMAGE_NAME:$TAG4"]
-    //         image.publish(
-    //           imageTagsPushAlways,
-    //           imageTagsPushForDevelopBranch,
-    //           imageTagsPushForMasterBranch
-    //         )
-    //       }
-    //     }
-    //   }
-    // }
   }
  }
 }
}
103 changes: 0 additions & 103 deletions dags/annotate.py

This file was deleted.

44 changes: 44 additions & 0 deletions dags/annotate_and_index.py
@@ -0,0 +1,44 @@
"""DAG which performs Dug annotate and index operations
This DAG differes slightly from prior versions of the same functionality in
Roger not only in that the annotation and indexing happen in the same DAG, but
also those tasks are broken out into sub-DAGs organized by dataset. Each dataset
has a subdag for all tasks.
"""

import os

from airflow.models import DAG
from airflow.operators.empty import EmptyOperator
from roger.tasks import default_args, create_pipeline_taskgroup

env_enabled_datasets = os.getenv(
"ROGER_DUG__INPUTS_DATA__SETS", "topmed,anvil").split(",")

with DAG(
dag_id='annotate_and_index',
default_args=default_args,
schedule_interval=None
) as dag:
init = EmptyOperator(task_id="init", dag=dag)
finish = EmptyOperator(task_id="finish", dag=dag)

from roger import pipelines
from roger.config import config
envspec = os.getenv("ROGER_DUG__INPUTS_DATA__SETS","topmed:v2.0")
data_sets = envspec.split(",")
pipeline_names = {x.split(':')[0]: x.split(':')[1] for x in data_sets}
for pipeline_class in pipelines.get_pipeline_classes(pipeline_names):
# Only use pipeline classes that are in the enabled datasets list and
# that have a properly defined pipeline_name attribute

# TODO
# Overriding environment variable just to see if this is working.
# name = getattr(pipeline_class, 'pipeline_name', '*not defined*')
# if not name in env_enabled_datasets:
# continue

# Do the thing to add the pipeline's subdag to the dag in the right way
# . . .

init >> create_pipeline_taskgroup(dag, pipeline_class, config) >> finish
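
One caveat in the new DAG file: the dict comprehension `{x.split(':')[0]: x.split(':')[1] for x in data_sets}` raises `IndexError` for any spec without a version, such as the `"topmed,anvil"` default used for `env_enabled_datasets`. A more defensive parse might look like this — a sketch under the assumption that versionless entries should fall back to a default, not the shipped Roger code:

```python
def parse_dataset_specs(envspec: str, default_version: str = "latest") -> dict:
    """Parse 'name[:version]' entries, e.g. 'topmed:v2.0,anvil' ->
    {'topmed': 'v2.0', 'anvil': 'latest'}. Helper name and default
    version are illustrative assumptions."""
    pipeline_names = {}
    for spec in envspec.split(","):
        # partition never raises: version is '' when no ':' is present
        name, _, version = spec.strip().partition(":")
        if name:
            pipeline_names[name] = version or default_version
    return pipeline_names
```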
