diff --git a/pyproject.toml b/pyproject.toml index 0c655e8..24e3046 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,6 +16,7 @@ classifiers = [ dynamic = ["version"] requires-python = ">=3.9" dependencies = [ + "parsl", # The primary workflow orchestration tool ] [project.urls] diff --git a/src/kbmod_wf/__init__.py b/src/kbmod_wf/__init__.py index b564b85..7c88a59 100644 --- a/src/kbmod_wf/__init__.py +++ b/src/kbmod_wf/__init__.py @@ -1,3 +1,4 @@ -from .example_module import greetings, meaning - -__all__ = ["greetings", "meaning"] +from .configurations.dev_configuration import dev_config +from .workflow import workflow_runner +from .configurations.klone_configuration import klone_config +from .utilities.logger_utilities import configure_logger diff --git a/src/kbmod_wf/configurations/__init__.py b/src/kbmod_wf/configurations/__init__.py new file mode 100644 index 0000000..2426833 --- /dev/null +++ b/src/kbmod_wf/configurations/__init__.py @@ -0,0 +1,4 @@ +from .dev_configuration import dev_config +from .klone_configuration import klone_config + +__all__ = ["dev_config", "klone_config"] diff --git a/src/kbmod_wf/configurations/dev_configuration.py b/src/kbmod_wf/configurations/dev_configuration.py new file mode 100644 index 0000000..5b95629 --- /dev/null +++ b/src/kbmod_wf/configurations/dev_configuration.py @@ -0,0 +1,14 @@ +from parsl import Config +from parsl.executors import ThreadPoolExecutor + + +def dev_config(): + return Config( + # run_dir='runinfo', # do some introspection here so that we can place the runinfo directory somewhere above src. + initialize_logging=False, + executors=[ + ThreadPoolExecutor( + label="local_dev_testing", + ) + ], + ) diff --git a/src/kbmod_wf/configurations/klone_configuration.py b/src/kbmod_wf/configurations/klone_configuration.py new file mode 100644 index 0000000..5fdf340 --- /dev/null +++ b/src/kbmod_wf/configurations/klone_configuration.py @@ -0,0 +1,30 @@ +from parsl import Config +from parsl.executors import HighThroughputExecutor +from parsl.providers import SlurmProvider + +walltimes = { + "compute-bigmem": "01:00:00", # change this to be appropriate +} + + +def klone_config(): + return Config( + executors=[ + HighThroughputExecutor( + label="small_cpu", + provider=SlurmProvider( + partition="compute-bigmem", + account="astro", + min_blocks=0, + max_blocks=4, + init_blocks=1, + parallelism=1, + nodes_per_block=1, + cores_per_node=1, # perhaps should be 8??? + mem_per_node=64, # In GB + exclusive=False, + walltime=walltimes["compute-bigmem"], + ), + ), + ] + ) diff --git a/src/kbmod_wf/example_module.py b/src/kbmod_wf/example_module.py deleted file mode 100644 index f76e837..0000000 --- a/src/kbmod_wf/example_module.py +++ /dev/null @@ -1,23 +0,0 @@ -"""An example module containing simplistic functions.""" - - -def greetings() -> str: - """A friendly greeting for a future friend. - - Returns - ------- - str - A typical greeting from a software engineer. - """ - return "Hello from LINCC-Frameworks!" - - -def meaning() -> int: - """The meaning of life, the universe, and everything. - - Returns - ------- - int - The meaning of life. - """ - return 42 diff --git a/src/kbmod_wf/runner.py b/src/kbmod_wf/runner.py new file mode 100644 index 0000000..2619ab2 --- /dev/null +++ b/src/kbmod_wf/runner.py @@ -0,0 +1 @@ +# this file will be what we call to run the workflows diff --git a/src/kbmod_wf/utilities/__init__.py b/src/kbmod_wf/utilities/__init__.py new file mode 100644 index 0000000..65f4a77 --- /dev/null +++ b/src/kbmod_wf/utilities/__init__.py @@ -0,0 +1,3 @@ +from .logger_utilities import configure_logger + +__all__ = ["configure_logger"] diff --git a/src/kbmod_wf/utilities/logger_utilities.py b/src/kbmod_wf/utilities/logger_utilities.py new file mode 100644 index 0000000..d0287b0 --- /dev/null +++ b/src/kbmod_wf/utilities/logger_utilities.py @@ -0,0 +1,24 @@ +DEFAULT_FORMAT = ( + "%(created)f %(asctime)s %(processName)s-%(process)d " + "%(threadName)s-%(thread)d %(name)s:%(lineno)d %(funcName)s %(levelname)s: " + "%(message)s" +) + + +def configure_logger(name, file_path): + """ + Simple function that will create a logger object and configure it to write + to a file at the specified path. + Note: We import logging within the function because we expect this to be + called within a parsl app.""" + + import logging + + logger = logging.getLogger(name) + handler = logging.FileHandler(file_path) + formatter = logging.Formatter(DEFAULT_FORMAT, datefmt="%Y-%m-%d %H:%M:%S") + handler.setFormatter(formatter) + logger.addHandler(handler) + logger.setLevel(logging.DEBUG) + + return logger diff --git a/src/kbmod_wf/workflow.py b/src/kbmod_wf/workflow.py new file mode 100644 index 0000000..0e4f841 --- /dev/null +++ b/src/kbmod_wf/workflow.py @@ -0,0 +1,59 @@ +import os +import parsl +from datetime import datetime +from parsl import python_app, File +import parsl.executors + +from kbmod_wf.configurations import * + + +@python_app(executors=["local_dev_testing"]) +def uri_to_ic(inputs=[], outputs=[], logger=None): + from kbmod_wf.utilities.logger_utilities import configure_logger + + logger = configure_logger("task:uri_to_ic", inputs[-1].filepath) + + logger.info("Starting uri_to_ic") + logger.warning("You're the cool one.") + return 42 + + +@python_app(executors=["local_dev_testing"]) +def part2(inputs=[], outputs=[], logger=None): + from kbmod_wf.utilities.logger_utilities import configure_logger + + logger = configure_logger("task:part2", inputs[-1].filepath) + + logger.info("Starting part2") + return 43 + + +def workflow_runner(): + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + logging_file = File(os.path.join(os.getcwd(), f"kbmod_wf_{timestamp}.log")) + + logger = parsl.set_file_logger(logging_file.filepath) + + with parsl.load(dev_config()): + uri_list = File(os.path.join(os.getcwd(), "uri_list.txt")) + + uri_to_ic_future = uri_to_ic( + inputs=[uri_list, logging_file], + outputs=[File(os.path.join(os.getcwd(), "ic.ecsv")), logging_file], + ) + + part2_future = part2( + inputs=[logging_file], + outputs=[logging_file], + ) + + logger.warning("You are here") + + print(uri_to_ic_future.result()) + print(part2_future.result()) + + parsl.clear() + + +if __name__ == "__main__": + workflow_runner() diff --git a/tests/kbmod_wf/test_example_module.py b/tests/kbmod_wf/test_example_module.py deleted file mode 100644 index 9966e67..0000000 --- a/tests/kbmod_wf/test_example_module.py +++ /dev/null @@ -1,13 +0,0 @@ -from kbmod_wf import example_module - - -def test_greetings() -> None: - """Verify the output of the `greetings` function""" - output = example_module.greetings() - assert output == "Hello from LINCC-Frameworks!" - - -def test_meaning() -> None: - """Verify the output of the `meaning` function""" - output = example_module.meaning() - assert output == 42