Skip to content

Commit

Permalink
Merge pull request #2 from dirac-institute/awo/parsl-poc
Browse files Browse the repository at this point in the history
Cleaning up logging, adding new tasks directory where the task logic will live
  • Loading branch information
drewoldag authored Jul 8, 2024
2 parents a6ae65d + 10a432d commit 3c667f1
Show file tree
Hide file tree
Showing 11 changed files with 50 additions and 32 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pre-commit-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.10'
python-version: '3.12'
- name: Install dependencies
run: |
sudo apt-get update
Expand Down
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -148,3 +148,6 @@ _html/

# Project initialization script
.initialize_new_project.sh

# Parsl log files
runinfo/
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ repos:
# supported by your project here, or alternatively use
# pre-commit's default_language_version, see
# https://pre-commit.com/#top_level-default_language_version
language_version: python3.10
language_version: python3.12
# Make sure Sphinx can build the documentation while explicitly omitting
# notebooks from the docs, so users don't have to wait through the execution
# of each notebook or each commit. By default, these will be checked in the
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,15 @@ testpaths = [

[tool.black]
line-length = 110
target-version = ["py39"]
target-version = ["py312"]

[tool.isort]
profile = "black"
line_length = 110

[tool.ruff]
line-length = 110
target-version = "py39"
target-version = "py312"

[tool.ruff.lint]
select = [
Expand Down
3 changes: 0 additions & 3 deletions src/kbmod_wf/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1 @@
from .configurations.dev_configuration import dev_config
from .workflow import workflow_runner
from .configurations.klone_configuration import klone_config
from .utilities.logger_utilities import configure_logger
8 changes: 6 additions & 2 deletions src/kbmod_wf/configurations/dev_configuration.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import os
from parsl import Config
from parsl.executors import ThreadPoolExecutor

this_dir = os.path.dirname(os.path.abspath(__file__))
project_dir = os.path.abspath(os.path.join(this_dir, "../../../"))


def dev_config():
return Config(
# run_dir='runinfo', # do some introspection here so that we can place the runinfo directory somewhere above src.
initialize_logging=False,
# put the log files in in the top level folder, "run_logs".
run_dir=os.path.join(project_dir, "run_logs"),
executors=[
ThreadPoolExecutor(
label="local_dev_testing",
Expand Down
3 changes: 2 additions & 1 deletion src/kbmod_wf/configurations/klone_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

def klone_config():
return Config(
run_dir="/gscratch/dirac/kbmod/workflow/",
executors=[
HighThroughputExecutor(
label="small_cpu",
Expand All @@ -26,5 +27,5 @@ def klone_config():
walltime=walltimes["compute-bigmem"],
),
),
]
],
)
7 changes: 7 additions & 0 deletions src/kbmod_wf/tasks/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from .reproject_ic import reproject_ic
from .uri_to_ic import uri_to_ic

__all__ = [
"reproject_ic",
"uri_to_ic",
]
3 changes: 3 additions & 0 deletions src/kbmod_wf/tasks/reproject_ic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
def reproject_ic(logger=None):
logger.info("In the reproject_ic task_impl")
return 20
3 changes: 3 additions & 0 deletions src/kbmod_wf/tasks/uri_to_ic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
def uri_to_ic(logger=None):
logger.info("In the uri_to_ic task_impl")
return 10
44 changes: 22 additions & 22 deletions src/kbmod_wf/workflow.py
Original file line number Diff line number Diff line change
@@ -1,56 +1,56 @@
import os
import parsl
from datetime import datetime
from parsl import python_app, File
import parsl.executors

from kbmod_wf.configurations import *


@python_app(executors=["local_dev_testing"])
def uri_to_ic(inputs=[], outputs=[], logger=None):
def uri_to_ic(inputs=[], outputs=[], logging_file=None):
from kbmod_wf.utilities.logger_utilities import configure_logger
from kbmod_wf.tasks.uri_to_ic import uri_to_ic

logger = configure_logger("task:uri_to_ic", inputs[-1].filepath)
logger = configure_logger("task.uri_to_ic", logging_file.filepath)

logger.info("Starting uri_to_ic")
output = uri_to_ic(logger=logger)
logger.warning("You're the cool one.")
return 42
return output


@python_app(executors=["local_dev_testing"])
def part2(inputs=[], outputs=[], logger=None):
def reproject_ic(inputs=[], outputs=[], logging_file=None):
from kbmod_wf.utilities.logger_utilities import configure_logger
from kbmod_wf.tasks.reproject_ic import reproject_ic

logger = configure_logger("task:part2", inputs[-1].filepath)
logger = configure_logger("task.reproject_ic", logging_file.filepath)

logger.info("Starting part2")
return 43
logger.info("Starting reproject_ic")
output = reproject_ic(logger=logger)
logger.warning("This is a slow step.")
return output


def workflow_runner():
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
logging_file = File(os.path.join(os.getcwd(), f"kbmod_wf_{timestamp}.log"))
with parsl.load(dev_config()) as dfk:
logging_file = File(os.path.join(dfk.run_dir, "parsl.log"))

logger = parsl.set_file_logger(logging_file.filepath)

with parsl.load(dev_config()):
uri_list = File(os.path.join(os.getcwd(), "uri_list.txt"))

uri_to_ic_future = uri_to_ic(
inputs=[uri_list, logging_file],
outputs=[File(os.path.join(os.getcwd(), "ic.ecsv")), logging_file],
inputs=[uri_list],
outputs=[File(os.path.join(os.getcwd(), "ic.ecsv"))],
logging_file=logging_file,
)

part2_future = part2(
inputs=[logging_file],
outputs=[logging_file],
reproject_ic_future = reproject_ic(
inputs=[],
outputs=[],
logging_file=logging_file,
)

logger.warning("You are here")

print(uri_to_ic_future.result())
print(part2_future.result())
print(reproject_ic_future.result())

parsl.clear()

Expand Down

0 comments on commit 3c667f1

Please sign in to comment.