From d3922fd09446d6460a52707873f23229fa20dc05 Mon Sep 17 00:00:00 2001 From: drewoldag <47493171+drewoldag@users.noreply.github.com> Date: Tue, 13 Aug 2024 14:30:06 -0700 Subject: [PATCH 1/7] Initial commit to create a new IC-based workflow and introduce the butler. --- example_runtime_config.toml | 15 ++ src/kbmod_wf/ic_workflow.py | 261 ++++++++++++++++++++++ src/kbmod_wf/task_impls/ic_to_wu.py | 36 +-- src/kbmod_wf/task_impls/reproject_wu.py | 8 +- src/kbmod_wf/task_impls/uri_to_ic.py | 9 +- src/kbmod_wf/tno_workflow.py | 277 ++++++++++++++++++++++++ 6 files changed, 586 insertions(+), 20 deletions(-) create mode 100644 src/kbmod_wf/ic_workflow.py create mode 100644 src/kbmod_wf/tno_workflow.py diff --git a/example_runtime_config.toml b/example_runtime_config.toml index 4f8922d..e7e2383 100644 --- a/example_runtime_config.toml +++ b/example_runtime_config.toml @@ -13,17 +13,32 @@ checkpoint_mode = 'task_exit' # e.g. "/gscratch/dirac/kbmod/workflow/staging" staging_directory = "/home/drew/code/kbmod-wf/dev_staging" + +[apps.uri_to_ic] +# The path to the butler config file that instantiates a butler to retrieve images +butler_config_filepath = "/gscratch/dirac/DEEP/repo/butler.yaml" + + [apps.ic_to_wu] # The path to the KBMOD search config file # e.g. "/gscratch/dirac/kbmod/workflow/kbmod_search_config.yaml" search_config_filepath = "/home/drew/code/kbmod-wf/dev_staging/search_config.yaml" +# The path to the butler config file that instantiates a butler to retrieve images +butler_config_filepath = "/gscratch/dirac/DEEP/repo/butler.yaml" + +# Remove a previously created WU file if it exists +overwrite = false + + [apps.reproject_wu] # Number of processors to use for parallelizing the reprojection n_workers = 32 + # The name of the observation site to use for reflex correction observation_site = "ctio" + [apps.kbmod_search] # The path to the KBMOD search config file # e.g.
"/gscratch/dirac/kbmod/workflow/kbmod_search_config.yaml" diff --git a/src/kbmod_wf/ic_workflow.py b/src/kbmod_wf/ic_workflow.py new file mode 100644 index 0000000..b0291b1 --- /dev/null +++ b/src/kbmod_wf/ic_workflow.py @@ -0,0 +1,261 @@ +import argparse +import os +import toml +import parsl +from parsl import python_app, File +import parsl.executors + +from kbmod_wf.utilities.configuration_utilities import apply_runtime_updates, get_resource_config +from kbmod_wf.utilities.executor_utilities import get_executors +from kbmod_wf.utilities.logger_utilities import configure_logger + + +@python_app( + cache=True, + executors=get_executors(["local_dev_testing", "local_thread"]), + ignore_for_cache=["logging_file"], +) +def create_uri_manifest(inputs=[], outputs=[], runtime_config={}, logging_file=None): + """This app will go to a given directory, find all of the *.collection files there, + and copy the paths to a manifest file.""" + import glob + import os + from kbmod_wf.utilities.logger_utilities import configure_logger + + logger = configure_logger("task.create_manifest", logging_file.filepath) + + directory_path = runtime_config.get("staging_directory") + if directory_path is None: + raise ValueError("No staging_directory provided in the configuration.") + + logger.info(f"Looking for staged files in {directory_path}") + + # Gather all the *.lst entries in the directory + pattern = os.path.join(directory_path, "*.collection") + entries = glob.glob(pattern) + + # Filter out directories, keep only files + files = [] + for f in entries: + if os.path.isfile(os.path.join(directory_path, f)): + files.append(os.path.join(os.path.abspath(directory_path), f)) + + logger.info(f"Found {len(files)} files in {directory_path}") + + # Write the filenames to the manifest file + with open(outputs[0].filename, "w") as manifest_file: + for file in files: + manifest_file.write(file + "\n") + + return outputs[0] + + +@python_app( + cache=True, executors=get_executors(["local_dev_testing", "small_cpu"]), ignore_for_cache=["logging_file"] +) +def uri_to_ic(inputs=[], outputs=[], runtime_config={}, logging_file=None): + import traceback + from kbmod_wf.utilities.logger_utilities import configure_logger + from kbmod_wf.task_impls.uri_to_ic import uri_to_ic + + logger = configure_logger("task.uri_to_ic", logging_file.filepath) + + logger.info("Starting uri_to_ic") + try: + uri_to_ic( + uris_filepath=inputs[0].filepath, + uris_base_dir=None, # determine what, if any, value should be used. 
+ ic_filepath=outputs[0].filepath, + runtime_config=runtime_config, + logger=logger, + ) + except Exception as e: + logger.error(f"Error running uri_to_ic: {e}") + logger.error(traceback.format_exc()) + raise e + logger.warning("Completed uri_to_ic") + + return outputs[0] + + +@python_app( + cache=True, executors=get_executors(["local_dev_testing", "large_mem"]), ignore_for_cache=["logging_file"] +) +def ic_to_wu(inputs=[], outputs=[], runtime_config={}, logging_file=None): + import traceback + from kbmod_wf.utilities.logger_utilities import configure_logger + from kbmod_wf.task_impls.ic_to_wu import ic_to_wu + + logger = configure_logger("task.ic_to_wu", logging_file.filepath) + + logger.info("Starting ic_to_wu") + try: + ic_to_wu( + ic_filepath=inputs[0].filepath, + wu_filepath=outputs[0].filepath, + runtime_config=runtime_config, + logger=logger, + ) + except Exception as e: + logger.error(f"Error running ic_to_wu: {e}") + logger.error(traceback.format_exc()) + raise e + logger.warning("Completed ic_to_wu") + + return outputs[0] + + +@python_app( + cache=True, + executors=get_executors(["local_dev_testing", "sharded_reproject"]), + ignore_for_cache=["logging_file"], +) +def reproject_wu(inputs=[], outputs=[], runtime_config={}, logging_file=None): + import traceback + from kbmod_wf.utilities.logger_utilities import configure_logger + from kbmod_wf.task_impls.reproject_wu import reproject_wu + + logger = configure_logger("task.reproject_wu", logging_file.filepath) + + logger.info("Starting reproject_ic") + try: + reproject_wu( + original_wu_filepath=inputs[0].filepath, + uri_filepath=inputs[1].filepath, + reprojected_wu_filepath=outputs[0].filepath, + runtime_config=runtime_config, + logger=logger, + ) + except Exception as e: + logger.error(f"Error running reproject_ic: {e}") + logger.error(traceback.format_exc()) + raise e + logger.warning("Completed reproject_ic") + + return outputs[0] + + +@python_app( + cache=True, executors=get_executors(["local_dev_testing", "gpu"]), ignore_for_cache=["logging_file"] +) +def kbmod_search(inputs=[], outputs=[], runtime_config={}, logging_file=None): + import traceback + from kbmod_wf.utilities.logger_utilities import configure_logger + from kbmod_wf.task_impls.kbmod_search import kbmod_search + + logger = configure_logger("task.kbmod_search", logging_file.filepath) + + logger.info("Starting kbmod_search") + try: + kbmod_search( + wu_filepath=inputs[0].filepath, + result_filepath=outputs[0].filepath, + runtime_config=runtime_config, + logger=logger, + ) + except Exception as e: + logger.error(f"Error running kbmod_search: {e}") + logger.error(traceback.format_exc()) + raise e + logger.warning("Completed kbmod_search") + + return outputs[0] + + +def workflow_runner(env: str = None, runtime_config: dict = {}) -> None: + resource_config = get_resource_config(env=env) + resource_config = apply_runtime_updates(resource_config, runtime_config) + + app_configs = runtime_config.get("apps", {}) + + with parsl.load(resource_config) as dfk: + logging_file = File(os.path.join(dfk.run_dir, "parsl.log")) + logger = configure_logger("workflow.workflow_runner", logging_file.filepath) + + if runtime_config is not None: + logger.info(f"Using runtime configuration definition:\n{toml.dumps(runtime_config)}") + + logger.info("Starting workflow") + + # gather all the *.collection files that are staged for processing + manifest_file = File(os.path.join(os.getcwd(), "manifest.txt")) + create_manifest_future = create_uri_manifest( + inputs=[], + outputs=[manifest_file], + 
runtime_config=app_configs.get("create_uri_manifest", {}), + logging_file=logging_file, + ) + + with open(create_manifest_future.result(), "r") as f: + # process each .collection file in the manifest into a .wu file + original_work_unit_futures = [] + collection_files = [] + for line in f: + collection_file = File(line.strip()) + collection_files.append(collection_file) + original_work_unit_futures.append( + ic_to_wu( + inputs=[collection_file], + outputs=[File(line + ".wu")], + runtime_config=app_configs.get("ic_to_wu", {}), + logging_file=logging_file, + ) + ) + + # reproject each WorkUnit + # For chip-by-chip, this isn't really necessary, so hardcoding to 0. + reproject_futures = [] + for f, collection_file in zip(original_work_unit_futures, collection_files): + distance = 0 + reproject_futures.append( + reproject_wu( + inputs=[f.result(), collection_file], + outputs=[File(f.result().filepath + f".{distance}.repro")], + runtime_config=app_configs.get("reproject_wu", {}), + logging_file=logging_file, + ) + ) + + # run kbmod search on each reprojected WorkUnit + search_futures = [] + for f in reproject_futures: + search_futures.append( + kbmod_search( + inputs=[f.result()], + outputs=[File(f.result().filepath + ".search.ecsv")], + runtime_config=app_configs.get("kbmod_search", {}), + logging_file=logging_file, + ) + ) + + [f.result() for f in search_futures] + + logger.info("Workflow complete") + + parsl.clear() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument( + "--env", + type=str, + choices=["dev", "klone"], + help="The environment to run the workflow in.", + ) + + parser.add_argument( + "--runtime-config", + type=str, + help="The complete runtime configuration filepath to use for the workflow.", + ) + + args = parser.parse_args() + + # if a runtime_config file was provided and exists, load the toml as a dict. + runtime_config = {} + if args.runtime_config is not None and os.path.exists(args.runtime_config): + with open(args.runtime_config, "r") as toml_runtime_config: + runtime_config = toml.load(toml_runtime_config) + + workflow_runner(env=args.env, runtime_config=runtime_config) diff --git a/src/kbmod_wf/task_impls/ic_to_wu.py b/src/kbmod_wf/task_impls/ic_to_wu.py index 861e518..66559da 100644 --- a/src/kbmod_wf/task_impls/ic_to_wu.py +++ b/src/kbmod_wf/task_impls/ic_to_wu.py @@ -1,5 +1,6 @@ from kbmod import ImageCollection from kbmod.configuration import SearchConfiguration +from lsst.daf.butler import Butler import os import glob @@ -52,7 +53,6 @@ def __init__( self.search_config_filepath = self.runtime_config.get("search_config_filepath", None) def create_work_unit(self): - make_wu = True if len(glob.glob(self.wu_filepath)): if self.overwrite: self.logger.info(f"Overwrite was {self.overwrite}. Deleting existing {self.wu_filepath}.") @@ -60,21 +60,27 @@ def create_work_unit(self): else: make_wu = False - if make_wu: - ic = ImageCollection.read(self.ic_filepath, format="ascii.ecsv") - self.logger.info(f"ImageCollection read from {self.ic_filepath}, creating work unit next.") + ic = ImageCollection.read(self.ic_filepath, format="ascii.ecsv") + self.logger.info(f"ImageCollection read from {self.ic_filepath}, creating work unit next.") - last_time = time.time() - #! This needs the butler. 
- orig_wu = ic.toWorkUnit(search_config=SearchConfiguration.from_file(self.search_config_filepath)) - elapsed = round(time.time() - last_time, 1) - self.logger.debug(f"Required {elapsed}[s] to create WorkUnit.") + last_time = time.time() + self.logger.info("Creating butler instance") + this_butler = Butler(self.runtime_config.get("butler_config_filepath", None)) + elapsed = round(time.time() - last_time, 1) + self.logger.debug(f"Required {elapsed}[s] to instantiate butler.") - self.logger.info(f"Saving sharded work unit to: {self.wu_filepath}") - last_time = time.time() - directory_containing_shards, wu_filename = os.path.split(self.wu_filepath) - orig_wu.to_sharded_fits(wu_filename, directory_containing_shards, overwrite=True) - elapsed = round(time.time() - last_time, 1) - self.logger.debug(f"Required {elapsed}[s] to write WorkUnit to disk: {self.wu_filepath}") + last_time = time.time() + orig_wu = ic.toWorkUnit( + search_config=SearchConfiguration.from_file(self.search_config_filepath), butler=this_butler + ) + elapsed = round(time.time() - last_time, 1) + self.logger.debug(f"Required {elapsed}[s] to create WorkUnit.") + + self.logger.info(f"Saving sharded work unit to: {self.wu_filepath}") + last_time = time.time() + directory_containing_shards, wu_filename = os.path.split(self.wu_filepath) + orig_wu.to_sharded_fits(wu_filename, directory_containing_shards, overwrite=True) + elapsed = round(time.time() - last_time, 1) + self.logger.debug(f"Required {elapsed}[s] to write WorkUnit to disk: {self.wu_filepath}") return self.wu_filepath diff --git a/src/kbmod_wf/task_impls/reproject_wu.py b/src/kbmod_wf/task_impls/reproject_wu.py index 7e0c072..97c28b5 100644 --- a/src/kbmod_wf/task_impls/reproject_wu.py +++ b/src/kbmod_wf/task_impls/reproject_wu.py @@ -74,13 +74,13 @@ def __init__( # Default to 8 workers if not in the config. 
Value must be 0 None: + resource_config = get_resource_config(env=env) + resource_config = apply_runtime_updates(resource_config, runtime_config) + + app_configs = runtime_config.get("apps", {}) + + with parsl.load(resource_config) as dfk: + logging_file = File(os.path.join(dfk.run_dir, "parsl.log")) + logger = configure_logger("workflow.workflow_runner", logging_file.filepath) + + if runtime_config is not None: + logger.info(f"Using runtime configuration definition:\n{toml.dumps(runtime_config)}") + + logger.info("Starting workflow") + + # gather all the .lst files that are staged for processing + manifest_file = File(os.path.join(os.getcwd(), "manifest.txt")) + create_uri_manifest_future = create_uri_manifest( + inputs=[], + outputs=[manifest_file], + runtime_config=app_configs.get("create_uri_manifest", {}), + logging_file=logging_file, + ) + + with open(create_uri_manifest_future.result(), "r") as f: + # process each .lst file in the manifest into a .ecvs file + uri_to_ic_futures = [] + uri_files = [] + for line in f: + uri_file = File(line.strip()) + uri_files.append(uri_file) + uri_to_ic_futures.append( + uri_to_ic( + inputs=[uri_file], + outputs=[File(line + ".ecsv")], + runtime_config=app_configs.get("uri_to_ic", {}), + logging_file=logging_file, + ) + ) + + # create an original WorkUnit for each .ecsv file + original_work_unit_futures = [] + for f in uri_to_ic_futures: + original_work_unit_futures.append( + ic_to_wu( + inputs=[f.result()], + outputs=[File(f.result().filepath + ".wu")], + runtime_config=app_configs.get("ic_to_wu", {}), + logging_file=logging_file, + ) + ) + + # reproject each WorkUnit for a range of distances + reproject_futures = [] + for f, uri_file in zip(original_work_unit_futures, uri_files): + for distance in range(40, 60, 10): + reproject_futures.append( + reproject_wu( + inputs=[f.result(), uri_file], + outputs=[File(f.result().filepath + f".{distance}.repro")], + runtime_config=app_configs.get("reproject_wu", {}), + logging_file=logging_file, + ) + ) + + # run kbmod search on each reprojected WorkUnit + search_futures = [] + for f in reproject_futures: + search_futures.append( + kbmod_search( + inputs=[f.result()], + outputs=[File(f.result().filepath + ".search.ecsv")], + runtime_config=app_configs.get("kbmod_search", {}), + logging_file=logging_file, + ) + ) + + [f.result() for f in search_futures] + + logger.info("Workflow complete") + + parsl.clear() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument( + "--env", + type=str, + choices=["dev", "klone"], + help="The environment to run the workflow in.", + ) + + parser.add_argument( + "--runtime-config", + type=str, + help="The complete runtime configuration filepath to use for the workflow.", + ) + + args = parser.parse_args() + + # if a runtime_config file was provided and exists, load the toml as a dict. + runtime_config = {} + if args.runtime_config is not None and os.path.exists(args.runtime_config): + with open(args.runtime_config, "r") as toml_runtime_config: + runtime_config = toml.load(toml_runtime_config) + + workflow_runner(env=args.env, runtime_config=runtime_config) From dfe1280226b481d80b6275e38dd6e757d6a7031e Mon Sep 17 00:00:00 2001 From: Drew Oldag Date: Wed, 14 Aug 2024 15:35:32 -0700 Subject: [PATCH 2/7] Making separate method to handle single chip/single night data that uses find_optimal_celestial_wcs. 
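In rough outline, the new reproject_single_chip_single_night_wu implementation fits one optimal WCS across all of the per-image WCSes in a WorkUnit and then reprojects every shard onto it in parallel. A minimal sketch of that flow, for reference only: the wrapper name and the `wu`/output arguments below are illustrative placeholders, and `wu` is assumed to be an already-loaded WorkUnit.

    import kbmod.reprojection as reprojection
    # The new module imports this helper via astropy.reproject.mosaicking;
    # upstream it is provided by the reproject package.
    from reproject.mosaicking import find_optimal_celestial_wcs

    def reproject_single_night(wu, shard_dir, shard_filename, n_workers=8):
        # Fit a single optimal celestial WCS covering all per-image WCSes.
        opt_wcs, _ = find_optimal_celestial_wcs(list(wu._per_image_wcs))
        # Reproject the lazily-loaded shards onto that WCS in parallel,
        # writing the reprojected shards to shard_dir/shard_filename.
        reprojection.reproject_lazy_work_unit(
            wu, opt_wcs, shard_dir, shard_filename, max_parallel_processes=n_workers
        )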
--- src/kbmod_wf/task_impls/__init__.py | 1 - ...=> reproject_multi_chip_multi_night_wu.py} | 0 .../reproject_single_chip_single_night_wu.py | 96 +++++++++++++++++++ src/kbmod_wf/tno_workflow.py | 2 +- src/kbmod_wf/workflow.py | 3 +- 5 files changed, 98 insertions(+), 4 deletions(-) rename src/kbmod_wf/task_impls/{reproject_wu.py => reproject_multi_chip_multi_night_wu.py} (100%) create mode 100644 src/kbmod_wf/task_impls/reproject_single_chip_single_night_wu.py diff --git a/src/kbmod_wf/task_impls/__init__.py b/src/kbmod_wf/task_impls/__init__.py index b48e4f3..eef1ad2 100644 --- a/src/kbmod_wf/task_impls/__init__.py +++ b/src/kbmod_wf/task_impls/__init__.py @@ -1,6 +1,5 @@ from .ic_to_wu import ic_to_wu from .kbmod_search import kbmod_search -from .reproject_wu import reproject_wu from .uri_to_ic import uri_to_ic __all__ = [ diff --git a/src/kbmod_wf/task_impls/reproject_wu.py b/src/kbmod_wf/task_impls/reproject_multi_chip_multi_night_wu.py similarity index 100% rename from src/kbmod_wf/task_impls/reproject_wu.py rename to src/kbmod_wf/task_impls/reproject_multi_chip_multi_night_wu.py diff --git a/src/kbmod_wf/task_impls/reproject_single_chip_single_night_wu.py b/src/kbmod_wf/task_impls/reproject_single_chip_single_night_wu.py new file mode 100644 index 0000000..a159268 --- /dev/null +++ b/src/kbmod_wf/task_impls/reproject_single_chip_single_night_wu.py @@ -0,0 +1,96 @@ +from kbmod.work_unit import WorkUnit + +import kbmod.reprojection as reprojection + +from astropy.reproject.mosaicking import find_optimal_celestial_wcs +import os +import time +from logging import Logger + + +def reproject_wu( + original_wu_filepath: str = None, + reprojected_wu_filepath: str = None, + runtime_config: dict = {}, + logger: Logger = None, +): + """This task will perform reflex correction and reproject a WorkUnit to a common WCS. + + Parameters + ---------- + original_wu_filepath : str, optional + The fully resolved filepath to the input WorkUnit file, by default None + uri_filepath : str, optional + The fully resolved filepath to the original uri file. This is used + exclusively for the header contents, by default None + reprojected_wu_filepath : str, optional + The fully resolved filepath to the resulting WorkUnit file after reflex + and reprojection, by default None + runtime_config : dict, optional + Additional configuration parameters to be used at runtime, by default {} + logger : Logger, optional + Primary logger for the workflow, by default None + + Returns + ------- + str + The fully resolved filepath of the resulting WorkUnit file after reflex + and reprojection. + """ + wu_reprojector = WUReprojector( + original_wu_filepath=original_wu_filepath, + reprojected_wu_filepath=reprojected_wu_filepath, + runtime_config=runtime_config, + logger=logger, + ) + + return wu_reprojector.reproject_workunit() + + +class WUReprojector: + def __init__( + self, + original_wu_filepath: str = None, + reprojected_wu_filepath: str = None, + runtime_config: dict = {}, + logger: Logger = None, + ): + self.original_wu_filepath = original_wu_filepath + self.reprojected_wu_filepath = reprojected_wu_filepath + self.runtime_config = runtime_config + self.logger = logger + + # Default to 8 workers if not in the config. Value must be 0 Date: Wed, 14 Aug 2024 15:38:31 -0700 Subject: [PATCH 3/7] Updated the wrong workflow in the previous commit. Applying them to the correct one here. 
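For reference, the imports as they stand after this fix (taken from the diff below):

    # src/kbmod_wf/ic_workflow.py -- single chip / single night
    from kbmod_wf.task_impls.reproject_single_chip_single_night_wu import reproject_wu

    # src/kbmod_wf/workflow.py -- multi chip / multi night
    from kbmod_wf.task_impls.reproject_multi_chip_multi_night_wu import reproject_wu

The uri_filepath input is dropped from the single-night call and restored in the multi-night call, which is the only implementation that uses it.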
--- src/kbmod_wf/ic_workflow.py | 3 +-- src/kbmod_wf/workflow.py | 3 ++- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/kbmod_wf/ic_workflow.py b/src/kbmod_wf/ic_workflow.py index b0291b1..ad229b0 100644 --- a/src/kbmod_wf/ic_workflow.py +++ b/src/kbmod_wf/ic_workflow.py @@ -113,7 +113,7 @@ def ic_to_wu(inputs=[], outputs=[], runtime_config={}, logging_file=None): def reproject_wu(inputs=[], outputs=[], runtime_config={}, logging_file=None): import traceback from kbmod_wf.utilities.logger_utilities import configure_logger - from kbmod_wf.task_impls.reproject_wu import reproject_wu + from kbmod_wf.task_impls.reproject_single_chip_single_night_wu import reproject_wu logger = configure_logger("task.reproject_wu", logging_file.filepath) @@ -121,7 +121,6 @@ def reproject_wu(inputs=[], outputs=[], runtime_config={}, logging_file=None): try: reproject_wu( original_wu_filepath=inputs[0].filepath, - uri_filepath=inputs[1].filepath, reprojected_wu_filepath=outputs[0].filepath, runtime_config=runtime_config, logger=logger, diff --git a/src/kbmod_wf/workflow.py b/src/kbmod_wf/workflow.py index fd4bca6..1ee09fa 100644 --- a/src/kbmod_wf/workflow.py +++ b/src/kbmod_wf/workflow.py @@ -113,7 +113,7 @@ def ic_to_wu(inputs=[], outputs=[], runtime_config={}, logging_file=None): def reproject_wu(inputs=[], outputs=[], runtime_config={}, logging_file=None): import traceback from kbmod_wf.utilities.logger_utilities import configure_logger - from kbmod_wf.task_impls.reproject_single_chip_single_night_wu import reproject_wu + from kbmod_wf.task_impls.reproject_multi_chip_multi_night_wu import reproject_wu logger = configure_logger("task.reproject_wu", logging_file.filepath) @@ -121,6 +121,7 @@ def reproject_wu(inputs=[], outputs=[], runtime_config={}, logging_file=None): try: reproject_wu( original_wu_filepath=inputs[0].filepath, + uri_filepath=inputs[1].filepath, reprojected_wu_filepath=outputs[0].filepath, runtime_config=runtime_config, logger=logger, From 0a3db4d9f42a3f8aacfcf350041452c276ba9267 Mon Sep 17 00:00:00 2001 From: drewoldag <47493171+drewoldag@users.noreply.github.com> Date: Thu, 15 Aug 2024 14:26:34 -0700 Subject: [PATCH 4/7] PR feedback. Remove bulter for uri_to_ic, []->(), clean up comments and function names. --- example_runtime_config.toml | 2 +- src/kbmod_wf/ic_workflow.py | 54 +++++++------------ .../reproject_single_chip_single_night_wu.py | 19 ++++--- src/kbmod_wf/task_impls/uri_to_ic.py | 7 +-- src/kbmod_wf/tno_workflow.py | 30 +++++++---- src/kbmod_wf/workflow.py | 30 +++++++---- 6 files changed, 70 insertions(+), 72 deletions(-) diff --git a/example_runtime_config.toml b/example_runtime_config.toml index e7e2383..32daff7 100644 --- a/example_runtime_config.toml +++ b/example_runtime_config.toml @@ -8,7 +8,7 @@ checkpoint_mode = 'task_exit' # Values in the apps.XXX section will be passed as a dictionary to the corresponding # app. e.g. apps.create_uri_manifest will be passed to the create_uri_manifest app. -[apps.create_uri_manifest] +[apps.create_manifest] # The path to the staging directory # e.g. 
"/gscratch/dirac/kbmod/workflow/staging" staging_directory = "/home/drew/code/kbmod-wf/dev_staging" diff --git a/src/kbmod_wf/ic_workflow.py b/src/kbmod_wf/ic_workflow.py index ad229b0..4da2c83 100644 --- a/src/kbmod_wf/ic_workflow.py +++ b/src/kbmod_wf/ic_workflow.py @@ -15,7 +15,7 @@ executors=get_executors(["local_dev_testing", "local_thread"]), ignore_for_cache=["logging_file"], ) -def create_uri_manifest(inputs=[], outputs=[], runtime_config={}, logging_file=None): +def create_manifest(inputs=(), outputs=(), runtime_config={}, logging_file=None): """This app will go to a given directory, find all of the *.collection files there, and copy the paths to a manifest file.""" import glob @@ -30,7 +30,7 @@ def create_uri_manifest(inputs=[], outputs=[], runtime_config={}, logging_file=N logger.info(f"Looking for staged files in {directory_path}") - # Gather all the *.lst entries in the directory + # Gather all the *.collection entries in the directory pattern = os.path.join(directory_path, "*.collection") entries = glob.glob(pattern) @@ -50,38 +50,10 @@ def create_uri_manifest(inputs=[], outputs=[], runtime_config={}, logging_file=N return outputs[0] -@python_app( - cache=True, executors=get_executors(["local_dev_testing", "small_cpu"]), ignore_for_cache=["logging_file"] -) -def uri_to_ic(inputs=[], outputs=[], runtime_config={}, logging_file=None): - import traceback - from kbmod_wf.utilities.logger_utilities import configure_logger - from kbmod_wf.task_impls.uri_to_ic import uri_to_ic - - logger = configure_logger("task.uri_to_ic", logging_file.filepath) - - logger.info("Starting uri_to_ic") - try: - uri_to_ic( - uris_filepath=inputs[0].filepath, - uris_base_dir=None, # determine what, if any, value should be used. - ic_filepath=outputs[0].filepath, - runtime_config=runtime_config, - logger=logger, - ) - except Exception as e: - logger.error(f"Error running uri_to_ic: {e}") - logger.error(traceback.format_exc()) - raise e - logger.warning("Completed uri_to_ic") - - return outputs[0] - - @python_app( cache=True, executors=get_executors(["local_dev_testing", "large_mem"]), ignore_for_cache=["logging_file"] ) -def ic_to_wu(inputs=[], outputs=[], runtime_config={}, logging_file=None): +def ic_to_wu(inputs=(), outputs=(), runtime_config={}, logging_file=None): import traceback from kbmod_wf.utilities.logger_utilities import configure_logger from kbmod_wf.task_impls.ic_to_wu import ic_to_wu @@ -110,7 +82,7 @@ def ic_to_wu(inputs=[], outputs=[], runtime_config={}, logging_file=None): executors=get_executors(["local_dev_testing", "sharded_reproject"]), ignore_for_cache=["logging_file"], ) -def reproject_wu(inputs=[], outputs=[], runtime_config={}, logging_file=None): +def reproject_wu(inputs=(), outputs=(), runtime_config={}, logging_file=None): import traceback from kbmod_wf.utilities.logger_utilities import configure_logger from kbmod_wf.task_impls.reproject_single_chip_single_night_wu import reproject_wu @@ -137,7 +109,7 @@ def reproject_wu(inputs=[], outputs=[], runtime_config={}, logging_file=None): @python_app( cache=True, executors=get_executors(["local_dev_testing", "gpu"]), ignore_for_cache=["logging_file"] ) -def kbmod_search(inputs=[], outputs=[], runtime_config={}, logging_file=None): +def kbmod_search(inputs=(), outputs=(), runtime_config={}, logging_file=None): import traceback from kbmod_wf.utilities.logger_utilities import configure_logger from kbmod_wf.task_impls.kbmod_search import kbmod_search @@ -161,7 +133,17 @@ def kbmod_search(inputs=[], outputs=[], runtime_config={}, 
logging_file=None): return outputs[0] -def workflow_runner(env: str = None, runtime_config: dict = {}) -> None: +def workflow_runner(env=None, runtime_config={}): + """This function will load and configure Parsl, and run the workflow. + + Parameters + ---------- + env : str, optional + Environment string used to define which resource configuration to use, + by default None + runtime_config : dict, optional + Dictionary of assorted runtime configuration parameters, by default {} + """ resource_config = get_resource_config(env=env) resource_config = apply_runtime_updates(resource_config, runtime_config) @@ -178,10 +160,10 @@ def workflow_runner(env: str = None, runtime_config: dict = {}) -> None: # gather all the *.collection files that are staged for processing manifest_file = File(os.path.join(os.getcwd(), "manifest.txt")) - create_manifest_future = create_uri_manifest( + create_manifest_future = create_manifest( inputs=[], outputs=[manifest_file], - runtime_config=app_configs.get("create_uri_manifest", {}), + runtime_config=app_configs.get("create_manifest", {}), logging_file=logging_file, ) diff --git a/src/kbmod_wf/task_impls/reproject_single_chip_single_night_wu.py b/src/kbmod_wf/task_impls/reproject_single_chip_single_night_wu.py index a159268..96604ad 100644 --- a/src/kbmod_wf/task_impls/reproject_single_chip_single_night_wu.py +++ b/src/kbmod_wf/task_impls/reproject_single_chip_single_night_wu.py @@ -14,18 +14,15 @@ def reproject_wu( runtime_config: dict = {}, logger: Logger = None, ): - """This task will perform reflex correction and reproject a WorkUnit to a common WCS. + """This task will reproject a WorkUnit to a common WCS. Parameters ---------- original_wu_filepath : str, optional The fully resolved filepath to the input WorkUnit file, by default None - uri_filepath : str, optional - The fully resolved filepath to the original uri file. 
This is used - exclusively for the header contents, by default None reprojected_wu_filepath : str, optional - The fully resolved filepath to the resulting WorkUnit file after reflex - and reprojection, by default None + The fully resolved filepath to the resulting WorkUnit file after + reprojection, by default None runtime_config : dict, optional Additional configuration parameters to be used at runtime, by default {} logger : Logger, optional @@ -81,13 +78,15 @@ def reproject_workunit(self): self.logger.debug(f"Reprojecting WorkUnit with {self.n_workers} workers...") last_time = time.time() - opt_wcs, _ = find_optimal_celestial_wcs(list(wu._per_image_wcs)) - reprojection.reproject_lazy_work_unit( + opt_wcs, shape = find_optimal_celestial_wcs(list(wu._per_image_wcs)) + opt_wcs.array_shape = shape + reprojection.reproject_work_unit( wu, opt_wcs, - directory_containing_reprojected_shards, - reprojected_wu_filename, max_parallel_processes=self.n_workers, + write_output=True, + directory=directory_containing_reprojected_shards, + filename=reprojected_wu_filename, ) elapsed = round(time.time() - last_time, 1) diff --git a/src/kbmod_wf/task_impls/uri_to_ic.py b/src/kbmod_wf/task_impls/uri_to_ic.py index cb7faef..c1a3caf 100644 --- a/src/kbmod_wf/task_impls/uri_to_ic.py +++ b/src/kbmod_wf/task_impls/uri_to_ic.py @@ -84,12 +84,9 @@ def uri_to_ic( logger.info("Creating ImageCollection") # Create an ImageCollection object from the list of URIs last_time = time.time() - logger.info("Creating butler instance") - this_butler = Butler(runtime_config.get("butler_config_filepath", None)) + ic = ImageCollection.fromTargets(uris) elapsed = round(time.time() - last_time, 1) - logger.debug(f"Required {elapsed}[s] to instantiate butler.") - - ic = ImageCollection.fromTargets(uris, butler=this_butler) + logger.debug(f"Required {elapsed}[s] to create ImageCollection.") logger.info(f"Writing ImageCollection to file {ic_filepath}") ic.write(ic_filepath, format="ascii.ecsv") diff --git a/src/kbmod_wf/tno_workflow.py b/src/kbmod_wf/tno_workflow.py index a3acef6..71947a4 100644 --- a/src/kbmod_wf/tno_workflow.py +++ b/src/kbmod_wf/tno_workflow.py @@ -20,14 +20,14 @@ executors=get_executors(["local_dev_testing", "local_thread"]), ignore_for_cache=["logging_file"], ) -def create_uri_manifest(inputs=[], outputs=[], runtime_config={}, logging_file=None): +def create_manifest(inputs=(), outputs=(), runtime_config={}, logging_file=None): """This app will go to a given directory, find all of the uri.lst files there, and copy the paths to the manifest file.""" import glob import os from kbmod_wf.utilities.logger_utilities import configure_logger - logger = configure_logger("task.create_uri_manifest", logging_file.filepath) + logger = configure_logger("task.create_manifest", logging_file.filepath) directory_path = runtime_config.get("staging_directory") if directory_path is None: @@ -58,7 +58,7 @@ def create_uri_manifest(inputs=[], outputs=[], runtime_config={}, logging_file=N @python_app( cache=True, executors=get_executors(["local_dev_testing", "small_cpu"]), ignore_for_cache=["logging_file"] ) -def uri_to_ic(inputs=[], outputs=[], runtime_config={}, logging_file=None): +def uri_to_ic(inputs=(), outputs=(), runtime_config={}, logging_file=None): import traceback from kbmod_wf.utilities.logger_utilities import configure_logger from kbmod_wf.task_impls.uri_to_ic import uri_to_ic @@ -86,7 +86,7 @@ def uri_to_ic(inputs=[], outputs=[], runtime_config={}, logging_file=None): @python_app( cache=True, 
executors=get_executors(["local_dev_testing", "large_mem"]), ignore_for_cache=["logging_file"] ) -def ic_to_wu(inputs=[], outputs=[], runtime_config={}, logging_file=None): +def ic_to_wu(inputs=(), outputs=(), runtime_config={}, logging_file=None): import traceback from kbmod_wf.utilities.logger_utilities import configure_logger from kbmod_wf.task_impls.ic_to_wu import ic_to_wu @@ -115,7 +115,7 @@ def ic_to_wu(inputs=[], outputs=[], runtime_config={}, logging_file=None): executors=get_executors(["local_dev_testing", "sharded_reproject"]), ignore_for_cache=["logging_file"], ) -def reproject_wu(inputs=[], outputs=[], runtime_config={}, logging_file=None): +def reproject_wu(inputs=(), outputs=(), runtime_config={}, logging_file=None): import traceback from kbmod_wf.utilities.logger_utilities import configure_logger from kbmod_wf.task_impls.reproject_multi_chip_multi_night_wu import reproject_wu @@ -143,7 +143,7 @@ def reproject_wu(inputs=[], outputs=[], runtime_config={}, logging_file=None): @python_app( cache=True, executors=get_executors(["local_dev_testing", "gpu"]), ignore_for_cache=["logging_file"] ) -def kbmod_search(inputs=[], outputs=[], runtime_config={}, logging_file=None): +def kbmod_search(inputs=(), outputs=(), runtime_config={}, logging_file=None): import traceback from kbmod_wf.utilities.logger_utilities import configure_logger from kbmod_wf.task_impls.kbmod_search import kbmod_search @@ -167,7 +167,17 @@ def kbmod_search(inputs=[], outputs=[], runtime_config={}, logging_file=None): return outputs[0] -def workflow_runner(env: str = None, runtime_config: dict = {}) -> None: +def workflow_runner(env=None, runtime_config={}): + """This function will load and configure Parsl, and run the workflow. + + Parameters + ---------- + env : str, optional + Environment string used to define which resource configuration to use, + by default None + runtime_config : dict, optional + Dictionary of assorted runtime configuration parameters, by default {} + """ resource_config = get_resource_config(env=env) resource_config = apply_runtime_updates(resource_config, runtime_config) @@ -184,14 +194,14 @@ def workflow_runner(env: str = None, runtime_config: dict = {}) -> None: # gather all the .lst files that are staged for processing manifest_file = File(os.path.join(os.getcwd(), "manifest.txt")) - create_uri_manifest_future = create_uri_manifest( + create_manifest_future = create_manifest( inputs=[], outputs=[manifest_file], - runtime_config=app_configs.get("create_uri_manifest", {}), + runtime_config=app_configs.get("create_manifest", {}), logging_file=logging_file, ) - with open(create_uri_manifest_future.result(), "r") as f: + with open(create_manifest_future.result(), "r") as f: # process each .lst file in the manifest into a .ecvs file uri_to_ic_futures = [] uri_files = [] diff --git a/src/kbmod_wf/workflow.py b/src/kbmod_wf/workflow.py index 1ee09fa..88cdd84 100644 --- a/src/kbmod_wf/workflow.py +++ b/src/kbmod_wf/workflow.py @@ -15,14 +15,14 @@ executors=get_executors(["local_dev_testing", "local_thread"]), ignore_for_cache=["logging_file"], ) -def create_uri_manifest(inputs=[], outputs=[], runtime_config={}, logging_file=None): +def create_manifest(inputs=(), outputs=(), runtime_config={}, logging_file=None): """This app will go to a given directory, find all of the uri.lst files there, and copy the paths to the manifest file.""" import glob import os from kbmod_wf.utilities.logger_utilities import configure_logger - logger = configure_logger("task.create_uri_manifest", 
logging_file.filepath) + logger = configure_logger("task.create_manifest", logging_file.filepath) directory_path = runtime_config.get("staging_directory") if directory_path is None: @@ -53,7 +53,7 @@ def create_uri_manifest(inputs=[], outputs=[], runtime_config={}, logging_file=N @python_app( cache=True, executors=get_executors(["local_dev_testing", "small_cpu"]), ignore_for_cache=["logging_file"] ) -def uri_to_ic(inputs=[], outputs=[], runtime_config={}, logging_file=None): +def uri_to_ic(inputs=(), outputs=(), runtime_config={}, logging_file=None): import traceback from kbmod_wf.utilities.logger_utilities import configure_logger from kbmod_wf.task_impls.uri_to_ic import uri_to_ic @@ -81,7 +81,7 @@ def uri_to_ic(inputs=[], outputs=[], runtime_config={}, logging_file=None): @python_app( cache=True, executors=get_executors(["local_dev_testing", "large_mem"]), ignore_for_cache=["logging_file"] ) -def ic_to_wu(inputs=[], outputs=[], runtime_config={}, logging_file=None): +def ic_to_wu(inputs=(), outputs=(), runtime_config={}, logging_file=None): import traceback from kbmod_wf.utilities.logger_utilities import configure_logger from kbmod_wf.task_impls.ic_to_wu import ic_to_wu @@ -110,7 +110,7 @@ def ic_to_wu(inputs=[], outputs=[], runtime_config={}, logging_file=None): executors=get_executors(["local_dev_testing", "sharded_reproject"]), ignore_for_cache=["logging_file"], ) -def reproject_wu(inputs=[], outputs=[], runtime_config={}, logging_file=None): +def reproject_wu(inputs=(), outputs=(), runtime_config={}, logging_file=None): import traceback from kbmod_wf.utilities.logger_utilities import configure_logger from kbmod_wf.task_impls.reproject_multi_chip_multi_night_wu import reproject_wu @@ -138,7 +138,7 @@ def reproject_wu(inputs=[], outputs=[], runtime_config={}, logging_file=None): @python_app( cache=True, executors=get_executors(["local_dev_testing", "gpu"]), ignore_for_cache=["logging_file"] ) -def kbmod_search(inputs=[], outputs=[], runtime_config={}, logging_file=None): +def kbmod_search(inputs=(), outputs=(), runtime_config={}, logging_file=None): import traceback from kbmod_wf.utilities.logger_utilities import configure_logger from kbmod_wf.task_impls.kbmod_search import kbmod_search @@ -162,7 +162,17 @@ def kbmod_search(inputs=[], outputs=[], runtime_config={}, logging_file=None): return outputs[0] -def workflow_runner(env: str = None, runtime_config: dict = {}) -> None: +def workflow_runner(env=None, runtime_config={}): + """This function will load and configure Parsl, and run the workflow. 
+ + Parameters + ---------- + env : str, optional + Environment string used to define which resource configuration to use, + by default None + runtime_config : dict, optional + Dictionary of assorted runtime configuration parameters, by default {} + """ resource_config = get_resource_config(env=env) resource_config = apply_runtime_updates(resource_config, runtime_config) @@ -179,14 +189,14 @@ def workflow_runner(env: str = None, runtime_config: dict = {}) -> None: # gather all the .lst files that are staged for processing manifest_file = File(os.path.join(os.getcwd(), "manifest.txt")) - create_uri_manifest_future = create_uri_manifest( + create_manifest_future = create_manifest( inputs=[], outputs=[manifest_file], - runtime_config=app_configs.get("create_uri_manifest", {}), + runtime_config=app_configs.get("create_manifest", {}), logging_file=logging_file, ) - with open(create_uri_manifest_future.result(), "r") as f: + with open(create_manifest_future.result(), "r") as f: # process each .lst file in the manifest into a .ecvs file uri_to_ic_futures = [] uri_files = [] From 3e9cfe38df55b5464d657c366d2195d66923df43 Mon Sep 17 00:00:00 2001 From: drewoldag <47493171+drewoldag@users.noreply.github.com> Date: Thu, 15 Aug 2024 15:58:14 -0700 Subject: [PATCH 5/7] Moved workflow task definitions into separate module for reuse. --- example_runtime_config.toml | 1 + ...ic_workflow.py => single_chip_workflow.py} | 102 +---------- src/kbmod_wf/task_impls/uri_to_ic.py | 1 - src/kbmod_wf/tno_workflow.py | 160 +----------------- src/kbmod_wf/workflow.py | 160 +----------------- src/kbmod_wf/workflow_tasks/__init__.py | 7 + .../workflow_tasks/create_manifest.py | 81 +++++++++ src/kbmod_wf/workflow_tasks/ic_to_wu.py | 29 ++++ src/kbmod_wf/workflow_tasks/kbmod_search.py | 29 ++++ src/kbmod_wf/workflow_tasks/reproject_wu.py | 31 ++++ src/kbmod_wf/workflow_tasks/uri_to_ic.py | 30 ++++ 11 files changed, 228 insertions(+), 403 deletions(-) rename src/kbmod_wf/{ic_workflow.py => single_chip_workflow.py} (61%) create mode 100644 src/kbmod_wf/workflow_tasks/__init__.py create mode 100644 src/kbmod_wf/workflow_tasks/create_manifest.py create mode 100644 src/kbmod_wf/workflow_tasks/ic_to_wu.py create mode 100644 src/kbmod_wf/workflow_tasks/kbmod_search.py create mode 100644 src/kbmod_wf/workflow_tasks/reproject_wu.py create mode 100644 src/kbmod_wf/workflow_tasks/uri_to_ic.py diff --git a/example_runtime_config.toml b/example_runtime_config.toml index 32daff7..b2aae7e 100644 --- a/example_runtime_config.toml +++ b/example_runtime_config.toml @@ -12,6 +12,7 @@ checkpoint_mode = 'task_exit' # The path to the staging directory # e.g. 
"/gscratch/dirac/kbmod/workflow/staging" staging_directory = "/home/drew/code/kbmod-wf/dev_staging" +output_directory = "/home/drew/code/kbmod-wf/dev_staging/single_chip_workflow" [apps.uri_to_ic] diff --git a/src/kbmod_wf/ic_workflow.py b/src/kbmod_wf/single_chip_workflow.py similarity index 61% rename from src/kbmod_wf/ic_workflow.py rename to src/kbmod_wf/single_chip_workflow.py index 4da2c83..2a1e08b 100644 --- a/src/kbmod_wf/ic_workflow.py +++ b/src/kbmod_wf/single_chip_workflow.py @@ -9,74 +9,12 @@ from kbmod_wf.utilities.executor_utilities import get_executors from kbmod_wf.utilities.logger_utilities import configure_logger - -@python_app( - cache=True, - executors=get_executors(["local_dev_testing", "local_thread"]), - ignore_for_cache=["logging_file"], -) -def create_manifest(inputs=(), outputs=(), runtime_config={}, logging_file=None): - """This app will go to a given directory, find all of the *.collection files there, - and copy the paths to a manifest file.""" - import glob - import os - from kbmod_wf.utilities.logger_utilities import configure_logger - - logger = configure_logger("task.create_manifest", logging_file.filepath) - - directory_path = runtime_config.get("staging_directory") - if directory_path is None: - raise ValueError("No staging_directory provided in the configuration.") - - logger.info(f"Looking for staged files in {directory_path}") - - # Gather all the *.collection entries in the directory - pattern = os.path.join(directory_path, "*.collection") - entries = glob.glob(pattern) - - # Filter out directories, keep only files - files = [] - for f in entries: - if os.path.isfile(os.path.join(directory_path, f)): - files.append(os.path.join(os.path.abspath(directory_path), f)) - - logger.info(f"Found {len(files)} files in {directory_path}") - - # Write the filenames to the manifest file - with open(outputs[0].filename, "w") as manifest_file: - for file in files: - manifest_file.write(file + "\n") - - return outputs[0] - - -@python_app( - cache=True, executors=get_executors(["local_dev_testing", "large_mem"]), ignore_for_cache=["logging_file"] -) -def ic_to_wu(inputs=(), outputs=(), runtime_config={}, logging_file=None): - import traceback - from kbmod_wf.utilities.logger_utilities import configure_logger - from kbmod_wf.task_impls.ic_to_wu import ic_to_wu - - logger = configure_logger("task.ic_to_wu", logging_file.filepath) - - logger.info("Starting ic_to_wu") - try: - ic_to_wu( - ic_filepath=inputs[0].filepath, - wu_filepath=outputs[0].filepath, - runtime_config=runtime_config, - logger=logger, - ) - except Exception as e: - logger.error(f"Error running ic_to_wu: {e}") - logger.error(traceback.format_exc()) - raise e - logger.warning("Completed ic_to_wu") - - return outputs[0] +from kbmod_wf.workflow_tasks import create_manifest, ic_to_wu, kbmod_search +# There's still a ton of duplicated code here and in kbmod_wf.workflow_tasks.reproject_wu +# that should be refactored. +# The only difference is the import of reproject_single_chip_single_night_wu here. 
@python_app( cache=True, executors=get_executors(["local_dev_testing", "sharded_reproject"]), @@ -106,33 +44,6 @@ def reproject_wu(inputs=(), outputs=(), runtime_config={}, logging_file=None): return outputs[0] -@python_app( - cache=True, executors=get_executors(["local_dev_testing", "gpu"]), ignore_for_cache=["logging_file"] -) -def kbmod_search(inputs=(), outputs=(), runtime_config={}, logging_file=None): - import traceback - from kbmod_wf.utilities.logger_utilities import configure_logger - from kbmod_wf.task_impls.kbmod_search import kbmod_search - - logger = configure_logger("task.kbmod_search", logging_file.filepath) - - logger.info("Starting kbmod_search") - try: - kbmod_search( - wu_filepath=inputs[0].filepath, - result_filepath=outputs[0].filepath, - runtime_config=runtime_config, - logger=logger, - ) - except Exception as e: - logger.error(f"Error running kbmod_search: {e}") - logger.error(traceback.format_exc()) - raise e - logger.warning("Completed kbmod_search") - - return outputs[0] - - def workflow_runner(env=None, runtime_config={}): """This function will load and configure Parsl, and run the workflow. @@ -159,7 +70,10 @@ def workflow_runner(env=None, runtime_config={}): logger.info("Starting workflow") # gather all the *.collection files that are staged for processing - manifest_file = File(os.path.join(os.getcwd(), "manifest.txt")) + create_manifest_config = app_configs.get("create_manifest", {}) + manifest_file = File( + os.path.join(create_manifest_config.get("output_directory", os.getcwd()), "manifest.txt") + ) create_manifest_future = create_manifest( inputs=[], outputs=[manifest_file], diff --git a/src/kbmod_wf/task_impls/uri_to_ic.py b/src/kbmod_wf/task_impls/uri_to_ic.py index c1a3caf..4e5e692 100644 --- a/src/kbmod_wf/task_impls/uri_to_ic.py +++ b/src/kbmod_wf/task_impls/uri_to_ic.py @@ -3,7 +3,6 @@ import time from logging import Logger from kbmod import ImageCollection -from lsst.daf.butler import Butler #! 
I believe that we can remove the `uris_base_dir` parameter from the function diff --git a/src/kbmod_wf/tno_workflow.py b/src/kbmod_wf/tno_workflow.py index 71947a4..87f13b1 100644 --- a/src/kbmod_wf/tno_workflow.py +++ b/src/kbmod_wf/tno_workflow.py @@ -7,164 +7,13 @@ import os import toml import parsl -from parsl import python_app, File +from parsl import File import parsl.executors from kbmod_wf.utilities.configuration_utilities import apply_runtime_updates, get_resource_config -from kbmod_wf.utilities.executor_utilities import get_executors from kbmod_wf.utilities.logger_utilities import configure_logger - -@python_app( - cache=True, - executors=get_executors(["local_dev_testing", "local_thread"]), - ignore_for_cache=["logging_file"], -) -def create_manifest(inputs=(), outputs=(), runtime_config={}, logging_file=None): - """This app will go to a given directory, find all of the uri.lst files there, - and copy the paths to the manifest file.""" - import glob - import os - from kbmod_wf.utilities.logger_utilities import configure_logger - - logger = configure_logger("task.create_manifest", logging_file.filepath) - - directory_path = runtime_config.get("staging_directory") - if directory_path is None: - raise ValueError("No staging_directory provided in the configuration.") - - logger.info(f"Looking for staged files in {directory_path}") - - # Gather all the *.lst entries in the directory - pattern = os.path.join(directory_path, "*.lst") - entries = glob.glob(pattern) - - # Filter out directories, keep only files - files = [] - for f in entries: - if os.path.isfile(os.path.join(directory_path, f)): - files.append(os.path.join(os.path.abspath(directory_path), f)) - - logger.info(f"Found {len(files)} files in {directory_path}") - - # Write the filenames to the manifest file - with open(outputs[0].filename, "w") as manifest_file: - for file in files: - manifest_file.write(file + "\n") - - return outputs[0] - - -@python_app( - cache=True, executors=get_executors(["local_dev_testing", "small_cpu"]), ignore_for_cache=["logging_file"] -) -def uri_to_ic(inputs=(), outputs=(), runtime_config={}, logging_file=None): - import traceback - from kbmod_wf.utilities.logger_utilities import configure_logger - from kbmod_wf.task_impls.uri_to_ic import uri_to_ic - - logger = configure_logger("task.uri_to_ic", logging_file.filepath) - - logger.info("Starting uri_to_ic") - try: - uri_to_ic( - uris_filepath=inputs[0].filepath, - uris_base_dir=None, # determine what, if any, value should be used. 
- ic_filepath=outputs[0].filepath, - runtime_config=runtime_config, - logger=logger, - ) - except Exception as e: - logger.error(f"Error running uri_to_ic: {e}") - logger.error(traceback.format_exc()) - raise e - logger.warning("Completed uri_to_ic") - - return outputs[0] - - -@python_app( - cache=True, executors=get_executors(["local_dev_testing", "large_mem"]), ignore_for_cache=["logging_file"] -) -def ic_to_wu(inputs=(), outputs=(), runtime_config={}, logging_file=None): - import traceback - from kbmod_wf.utilities.logger_utilities import configure_logger - from kbmod_wf.task_impls.ic_to_wu import ic_to_wu - - logger = configure_logger("task.ic_to_wu", logging_file.filepath) - - logger.info("Starting ic_to_wu") - try: - ic_to_wu( - ic_filepath=inputs[0].filepath, - wu_filepath=outputs[0].filepath, - runtime_config=runtime_config, - logger=logger, - ) - except Exception as e: - logger.error(f"Error running ic_to_wu: {e}") - logger.error(traceback.format_exc()) - raise e - logger.warning("Completed ic_to_wu") - - return outputs[0] - - -@python_app( - cache=True, - executors=get_executors(["local_dev_testing", "sharded_reproject"]), - ignore_for_cache=["logging_file"], -) -def reproject_wu(inputs=(), outputs=(), runtime_config={}, logging_file=None): - import traceback - from kbmod_wf.utilities.logger_utilities import configure_logger - from kbmod_wf.task_impls.reproject_multi_chip_multi_night_wu import reproject_wu - - logger = configure_logger("task.reproject_wu", logging_file.filepath) - - logger.info("Starting reproject_ic") - try: - reproject_wu( - original_wu_filepath=inputs[0].filepath, - uri_filepath=inputs[1].filepath, - reprojected_wu_filepath=outputs[0].filepath, - runtime_config=runtime_config, - logger=logger, - ) - except Exception as e: - logger.error(f"Error running reproject_ic: {e}") - logger.error(traceback.format_exc()) - raise e - logger.warning("Completed reproject_ic") - - return outputs[0] - - -@python_app( - cache=True, executors=get_executors(["local_dev_testing", "gpu"]), ignore_for_cache=["logging_file"] -) -def kbmod_search(inputs=(), outputs=(), runtime_config={}, logging_file=None): - import traceback - from kbmod_wf.utilities.logger_utilities import configure_logger - from kbmod_wf.task_impls.kbmod_search import kbmod_search - - logger = configure_logger("task.kbmod_search", logging_file.filepath) - - logger.info("Starting kbmod_search") - try: - kbmod_search( - wu_filepath=inputs[0].filepath, - result_filepath=outputs[0].filepath, - runtime_config=runtime_config, - logger=logger, - ) - except Exception as e: - logger.error(f"Error running kbmod_search: {e}") - logger.error(traceback.format_exc()) - raise e - logger.warning("Completed kbmod_search") - - return outputs[0] +from kbmod_wf.workflow_tasks import create_manifest, ic_to_wu, kbmod_search, reproject_wu, uri_to_ic def workflow_runner(env=None, runtime_config={}): @@ -193,7 +42,10 @@ def workflow_runner(env=None, runtime_config={}): logger.info("Starting workflow") # gather all the .lst files that are staged for processing - manifest_file = File(os.path.join(os.getcwd(), "manifest.txt")) + create_manifest_config = app_configs.get("create_manifest", {}) + manifest_file = File( + os.path.join(create_manifest_config.get("output_directory", os.getcwd()), "manifest.txt") + ) create_manifest_future = create_manifest( inputs=[], outputs=[manifest_file], diff --git a/src/kbmod_wf/workflow.py b/src/kbmod_wf/workflow.py index 88cdd84..b258e2c 100644 --- a/src/kbmod_wf/workflow.py +++ b/src/kbmod_wf/workflow.py 
@@ -2,164 +2,13 @@ import os import toml import parsl -from parsl import python_app, File +from parsl import File import parsl.executors from kbmod_wf.utilities.configuration_utilities import apply_runtime_updates, get_resource_config -from kbmod_wf.utilities.executor_utilities import get_executors from kbmod_wf.utilities.logger_utilities import configure_logger - -@python_app( - cache=True, - executors=get_executors(["local_dev_testing", "local_thread"]), - ignore_for_cache=["logging_file"], -) -def create_manifest(inputs=(), outputs=(), runtime_config={}, logging_file=None): - """This app will go to a given directory, find all of the uri.lst files there, - and copy the paths to the manifest file.""" - import glob - import os - from kbmod_wf.utilities.logger_utilities import configure_logger - - logger = configure_logger("task.create_manifest", logging_file.filepath) - - directory_path = runtime_config.get("staging_directory") - if directory_path is None: - raise ValueError("No staging_directory provided in the configuration.") - - logger.info(f"Looking for staged files in {directory_path}") - - # Gather all the *.lst entries in the directory - pattern = os.path.join(directory_path, "*.lst") - entries = glob.glob(pattern) - - # Filter out directories, keep only files - files = [] - for f in entries: - if os.path.isfile(os.path.join(directory_path, f)): - files.append(os.path.join(os.path.abspath(directory_path), f)) - - logger.info(f"Found {len(files)} files in {directory_path}") - - # Write the filenames to the manifest file - with open(outputs[0].filename, "w") as manifest_file: - for file in files: - manifest_file.write(file + "\n") - - return outputs[0] - - -@python_app( - cache=True, executors=get_executors(["local_dev_testing", "small_cpu"]), ignore_for_cache=["logging_file"] -) -def uri_to_ic(inputs=(), outputs=(), runtime_config={}, logging_file=None): - import traceback - from kbmod_wf.utilities.logger_utilities import configure_logger - from kbmod_wf.task_impls.uri_to_ic import uri_to_ic - - logger = configure_logger("task.uri_to_ic", logging_file.filepath) - - logger.info("Starting uri_to_ic") - try: - uri_to_ic( - uris_filepath=inputs[0].filepath, - uris_base_dir=None, # determine what, if any, value should be used. 
- ic_filepath=outputs[0].filepath, - runtime_config=runtime_config, - logger=logger, - ) - except Exception as e: - logger.error(f"Error running uri_to_ic: {e}") - logger.error(traceback.format_exc()) - raise e - logger.warning("Completed uri_to_ic") - - return outputs[0] - - -@python_app( - cache=True, executors=get_executors(["local_dev_testing", "large_mem"]), ignore_for_cache=["logging_file"] -) -def ic_to_wu(inputs=(), outputs=(), runtime_config={}, logging_file=None): - import traceback - from kbmod_wf.utilities.logger_utilities import configure_logger - from kbmod_wf.task_impls.ic_to_wu import ic_to_wu - - logger = configure_logger("task.ic_to_wu", logging_file.filepath) - - logger.info("Starting ic_to_wu") - try: - ic_to_wu( - ic_filepath=inputs[0].filepath, - wu_filepath=outputs[0].filepath, - runtime_config=runtime_config, - logger=logger, - ) - except Exception as e: - logger.error(f"Error running ic_to_wu: {e}") - logger.error(traceback.format_exc()) - raise e - logger.warning("Completed ic_to_wu") - - return outputs[0] - - -@python_app( - cache=True, - executors=get_executors(["local_dev_testing", "sharded_reproject"]), - ignore_for_cache=["logging_file"], -) -def reproject_wu(inputs=(), outputs=(), runtime_config={}, logging_file=None): - import traceback - from kbmod_wf.utilities.logger_utilities import configure_logger - from kbmod_wf.task_impls.reproject_multi_chip_multi_night_wu import reproject_wu - - logger = configure_logger("task.reproject_wu", logging_file.filepath) - - logger.info("Starting reproject_ic") - try: - reproject_wu( - original_wu_filepath=inputs[0].filepath, - uri_filepath=inputs[1].filepath, - reprojected_wu_filepath=outputs[0].filepath, - runtime_config=runtime_config, - logger=logger, - ) - except Exception as e: - logger.error(f"Error running reproject_ic: {e}") - logger.error(traceback.format_exc()) - raise e - logger.warning("Completed reproject_ic") - - return outputs[0] - - -@python_app( - cache=True, executors=get_executors(["local_dev_testing", "gpu"]), ignore_for_cache=["logging_file"] -) -def kbmod_search(inputs=(), outputs=(), runtime_config={}, logging_file=None): - import traceback - from kbmod_wf.utilities.logger_utilities import configure_logger - from kbmod_wf.task_impls.kbmod_search import kbmod_search - - logger = configure_logger("task.kbmod_search", logging_file.filepath) - - logger.info("Starting kbmod_search") - try: - kbmod_search( - wu_filepath=inputs[0].filepath, - result_filepath=outputs[0].filepath, - runtime_config=runtime_config, - logger=logger, - ) - except Exception as e: - logger.error(f"Error running kbmod_search: {e}") - logger.error(traceback.format_exc()) - raise e - logger.warning("Completed kbmod_search") - - return outputs[0] +from kbmod_wf.workflow_tasks import create_manifest, ic_to_wu, kbmod_search, reproject_wu, uri_to_ic def workflow_runner(env=None, runtime_config={}): @@ -188,7 +37,10 @@ def workflow_runner(env=None, runtime_config={}): logger.info("Starting workflow") # gather all the .lst files that are staged for processing - manifest_file = File(os.path.join(os.getcwd(), "manifest.txt")) + create_manifest_config = app_configs.get("create_manifest", {}) + manifest_file = File( + os.path.join(create_manifest_config.get("output_directory", os.getcwd()), "manifest.txt") + ) create_manifest_future = create_manifest( inputs=[], outputs=[manifest_file], diff --git a/src/kbmod_wf/workflow_tasks/__init__.py b/src/kbmod_wf/workflow_tasks/__init__.py new file mode 100644 index 0000000..c5058ad --- /dev/null +++ 
b/src/kbmod_wf/workflow_tasks/__init__.py
@@ -0,0 +1,7 @@
+from .create_manifest import create_manifest
+from .ic_to_wu import ic_to_wu
+from .kbmod_search import kbmod_search
+from .reproject_wu import reproject_wu
+from .uri_to_ic import uri_to_ic
+
+__all__ = ["create_manifest", "ic_to_wu", "kbmod_search", "reproject_wu", "uri_to_ic"]
diff --git a/src/kbmod_wf/workflow_tasks/create_manifest.py b/src/kbmod_wf/workflow_tasks/create_manifest.py
new file mode 100644
index 0000000..89e6755
--- /dev/null
+++ b/src/kbmod_wf/workflow_tasks/create_manifest.py
@@ -0,0 +1,81 @@
+from parsl import python_app
+from kbmod_wf.utilities.executor_utilities import get_executors
+
+
+@python_app(
+    cache=True,
+    executors=get_executors(["local_dev_testing", "local_thread"]),
+    ignore_for_cache=["logging_file"],
+)
+def create_manifest(inputs=(), outputs=(), runtime_config={}, logging_file=None):
+    """This app will go to a given directory, find all of the *.collection files there,
+    and copy the paths to a manifest file.
+
+    Parameters
+    ----------
+    inputs : tuple, optional
+        No inputs required, by default ()
+    outputs : tuple, optional
+        Currently expects an iterable with 1 element - a parsl.File object that
+        specifies where the manifest file will be written, by default ()
+    runtime_config : dict, optional
+        A dictionary of configuration settings specific to this task, by default {}
+    logging_file : parsl.File, optional
+        The parsl.File object that defines where the logs are written, by default None
+
+    Returns
+    -------
+    parsl.File
+        The file object that points to the manifest file that was created.
+
+    Raises
+    ------
+    ValueError
+        If the staging_directory is not provided in the runtime_config.
+    """
+
+    import glob
+    import os
+    import shutil
+    from kbmod_wf.utilities.logger_utilities import configure_logger
+
+    logger = configure_logger("task.create_manifest", logging_file.filepath)
+
+    directory_path = runtime_config.get("staging_directory")
+    output_path = runtime_config.get("output_directory")
+
+    if directory_path is None:
+        logger.error("No staging_directory provided in the configuration.")
+        raise ValueError("No staging_directory provided in the configuration.")
+
+    if output_path is None:
+        logger.info(
+            f"No output_directory provided in the configuration. 
Using staging directory: {directory_path}" + ) + output_path = directory_path + + if not os.path.exists(output_path): + logger.info(f"Creating output directory: {output_path}") + os.makedirs(output_path) + + logger.info(f"Looking for staged files in {directory_path}") + + # Gather all the *.collection entries in the directory + pattern = os.path.join(directory_path, "*.collection") + entries = glob.glob(pattern) + + # Filter out directories, keep only files + # Copy files to the output directory, and adds them to the list of files + files = [] + for f in entries: + if os.path.isfile(os.path.join(directory_path, f)): + files.append(shutil.copy2(f, output_path)) + + logger.info(f"Found {len(files)} files in {directory_path}") + + # Write the filenames to the manifest file + with open(outputs[0].filename, "w") as manifest_file: + for file in files: + manifest_file.write(file + "\n") + + return outputs[0] diff --git a/src/kbmod_wf/workflow_tasks/ic_to_wu.py b/src/kbmod_wf/workflow_tasks/ic_to_wu.py new file mode 100644 index 0000000..76536c3 --- /dev/null +++ b/src/kbmod_wf/workflow_tasks/ic_to_wu.py @@ -0,0 +1,29 @@ +from parsl import python_app +from kbmod_wf.utilities.executor_utilities import get_executors + + +@python_app( + cache=True, executors=get_executors(["local_dev_testing", "large_mem"]), ignore_for_cache=["logging_file"] +) +def ic_to_wu(inputs=(), outputs=(), runtime_config={}, logging_file=None): + import traceback + from kbmod_wf.utilities.logger_utilities import configure_logger + from kbmod_wf.task_impls.ic_to_wu import ic_to_wu + + logger = configure_logger("task.ic_to_wu", logging_file.filepath) + + logger.info("Starting ic_to_wu") + try: + ic_to_wu( + ic_filepath=inputs[0].filepath, + wu_filepath=outputs[0].filepath, + runtime_config=runtime_config, + logger=logger, + ) + except Exception as e: + logger.error(f"Error running ic_to_wu: {e}") + logger.error(traceback.format_exc()) + raise e + logger.warning("Completed ic_to_wu") + + return outputs[0] diff --git a/src/kbmod_wf/workflow_tasks/kbmod_search.py b/src/kbmod_wf/workflow_tasks/kbmod_search.py new file mode 100644 index 0000000..e01b772 --- /dev/null +++ b/src/kbmod_wf/workflow_tasks/kbmod_search.py @@ -0,0 +1,29 @@ +from parsl import python_app +from kbmod_wf.utilities.executor_utilities import get_executors + + +@python_app( + cache=True, executors=get_executors(["local_dev_testing", "gpu"]), ignore_for_cache=["logging_file"] +) +def kbmod_search(inputs=(), outputs=(), runtime_config={}, logging_file=None): + import traceback + from kbmod_wf.utilities.logger_utilities import configure_logger + from kbmod_wf.task_impls.kbmod_search import kbmod_search + + logger = configure_logger("task.kbmod_search", logging_file.filepath) + + logger.info("Starting kbmod_search") + try: + kbmod_search( + wu_filepath=inputs[0].filepath, + result_filepath=outputs[0].filepath, + runtime_config=runtime_config, + logger=logger, + ) + except Exception as e: + logger.error(f"Error running kbmod_search: {e}") + logger.error(traceback.format_exc()) + raise e + logger.warning("Completed kbmod_search") + + return outputs[0] diff --git a/src/kbmod_wf/workflow_tasks/reproject_wu.py b/src/kbmod_wf/workflow_tasks/reproject_wu.py new file mode 100644 index 0000000..bdf374b --- /dev/null +++ b/src/kbmod_wf/workflow_tasks/reproject_wu.py @@ -0,0 +1,31 @@ +from parsl import python_app +from kbmod_wf.utilities.executor_utilities import get_executors + + +@python_app( + cache=True, + executors=get_executors(["local_dev_testing", "sharded_reproject"]), 
+ ignore_for_cache=["logging_file"], +) +def reproject_wu(inputs=(), outputs=(), runtime_config={}, logging_file=None): + import traceback + from kbmod_wf.utilities.logger_utilities import configure_logger + from kbmod_wf.task_impls.reproject_multi_chip_multi_night_wu import reproject_wu + + logger = configure_logger("task.reproject_wu", logging_file.filepath) + + logger.info("Starting reproject_ic") + try: + reproject_wu( + original_wu_filepath=inputs[0].filepath, + reprojected_wu_filepath=outputs[0].filepath, + runtime_config=runtime_config, + logger=logger, + ) + except Exception as e: + logger.error(f"Error running reproject_ic: {e}") + logger.error(traceback.format_exc()) + raise e + logger.warning("Completed reproject_ic") + + return outputs[0] diff --git a/src/kbmod_wf/workflow_tasks/uri_to_ic.py b/src/kbmod_wf/workflow_tasks/uri_to_ic.py new file mode 100644 index 0000000..a58ef3b --- /dev/null +++ b/src/kbmod_wf/workflow_tasks/uri_to_ic.py @@ -0,0 +1,30 @@ +from parsl import python_app +from kbmod_wf.utilities.executor_utilities import get_executors + + +@python_app( + cache=True, executors=get_executors(["local_dev_testing", "small_cpu"]), ignore_for_cache=["logging_file"] +) +def uri_to_ic(inputs=(), outputs=(), runtime_config={}, logging_file=None): + import traceback + from kbmod_wf.utilities.logger_utilities import configure_logger + from kbmod_wf.task_impls.uri_to_ic import uri_to_ic + + logger = configure_logger("task.uri_to_ic", logging_file.filepath) + + logger.info("Starting uri_to_ic") + try: + uri_to_ic( + uris_filepath=inputs[0].filepath, + uris_base_dir=None, # determine what, if any, value should be used. + ic_filepath=outputs[0].filepath, + runtime_config=runtime_config, + logger=logger, + ) + except Exception as e: + logger.error(f"Error running uri_to_ic: {e}") + logger.error(traceback.format_exc()) + raise e + logger.warning("Completed uri_to_ic") + + return outputs[0] From c2af2aa3b436f3b352a5dc088862c9b4bfe27661 Mon Sep 17 00:00:00 2001 From: drewoldag <47493171+drewoldag@users.noreply.github.com> Date: Fri, 16 Aug 2024 09:10:49 -0700 Subject: [PATCH 6/7] Adding docstrings to the remaining workflow_tasks. --- src/kbmod_wf/workflow_tasks/ic_to_wu.py | 27 +++++++++++++++++++++ src/kbmod_wf/workflow_tasks/kbmod_search.py | 26 ++++++++++++++++++++ src/kbmod_wf/workflow_tasks/reproject_wu.py | 27 +++++++++++++++++++++ src/kbmod_wf/workflow_tasks/uri_to_ic.py | 27 +++++++++++++++++++++ 4 files changed, 107 insertions(+) diff --git a/src/kbmod_wf/workflow_tasks/ic_to_wu.py b/src/kbmod_wf/workflow_tasks/ic_to_wu.py index 76536c3..cc62b0c 100644 --- a/src/kbmod_wf/workflow_tasks/ic_to_wu.py +++ b/src/kbmod_wf/workflow_tasks/ic_to_wu.py @@ -6,6 +6,33 @@ cache=True, executors=get_executors(["local_dev_testing", "large_mem"]), ignore_for_cache=["logging_file"] ) def ic_to_wu(inputs=(), outputs=(), runtime_config={}, logging_file=None): + """This app will call the ic_to_wu function to convert a given ImageCollection + file into a WorkUnit file. 
+
+    Parameters
+    ----------
+    inputs : tuple, optional
+        A tuple with a single parsl.File object that references the ImageCollection
+        file, by default ()
+    outputs : tuple, optional
+        A tuple with a single parsl.File object that references the output WorkUnit
+        file, by default ()
+    runtime_config : dict, optional
+        A dictionary of configuration settings specific to this task, by default {}
+    logging_file : parsl.File, optional
+        The parsl.File object that defines where the logs are written, by default None
+
+    Returns
+    -------
+    parsl.File
+        The file object that points to the WorkUnit file that was created.
+
+    Raises
+    ------
+    Exception
+        Reraises any exceptions that occur during the execution of the ic_to_wu
+        function.
+    """
     import traceback
     from kbmod_wf.utilities.logger_utilities import configure_logger
     from kbmod_wf.task_impls.ic_to_wu import ic_to_wu
diff --git a/src/kbmod_wf/workflow_tasks/kbmod_search.py b/src/kbmod_wf/workflow_tasks/kbmod_search.py
index e01b772..f0d6106 100644
--- a/src/kbmod_wf/workflow_tasks/kbmod_search.py
+++ b/src/kbmod_wf/workflow_tasks/kbmod_search.py
@@ -6,6 +6,32 @@
     cache=True, executors=get_executors(["local_dev_testing", "gpu"]), ignore_for_cache=["logging_file"]
 )
 def kbmod_search(inputs=(), outputs=(), runtime_config={}, logging_file=None):
+    """This app will call the kbmod_search function for a given WorkUnit file.
+
+    Parameters
+    ----------
+    inputs : tuple, optional
+        A tuple with a single parsl.File object that references the WorkUnit file
+        to be searched, by default ()
+    outputs : tuple, optional
+        A tuple with a single parsl.File object that references the results output
+        file, by default ()
+    runtime_config : dict, optional
+        A dictionary of configuration settings specific to this task, by default {}
+    logging_file : parsl.File, optional
+        The parsl.File object that defines where the logs are written, by default None
+
+    Returns
+    -------
+    parsl.File
+        The file object that points to the search results file that was created.
+
+    Raises
+    ------
+    Exception
+        Reraises any exceptions that occur during the execution of the kbmod_search
+        function.
+    """
     import traceback
     from kbmod_wf.utilities.logger_utilities import configure_logger
     from kbmod_wf.task_impls.kbmod_search import kbmod_search
diff --git a/src/kbmod_wf/workflow_tasks/reproject_wu.py b/src/kbmod_wf/workflow_tasks/reproject_wu.py
index bdf374b..9b75141 100644
--- a/src/kbmod_wf/workflow_tasks/reproject_wu.py
+++ b/src/kbmod_wf/workflow_tasks/reproject_wu.py
@@ -8,6 +8,33 @@
     ignore_for_cache=["logging_file"],
 )
 def reproject_wu(inputs=(), outputs=(), runtime_config={}, logging_file=None):
+    """This app will call the reproject_wu function to reproject and reflex-correct
+    a given WorkUnit file.
+
+    Parameters
+    ----------
+    inputs : tuple, optional
+        A tuple with a single parsl.File object that references the original WorkUnit
+        file, by default ()
+    outputs : tuple, optional
+        A tuple with a single parsl.File object that references the reprojected
+        WorkUnit file, by default ()
+    runtime_config : dict, optional
+        A dictionary of configuration settings specific to this task, by default {}
+    logging_file : parsl.File, optional
+        The parsl.File object that defines where the logs are written, by default None
+
+    Returns
+    -------
+    parsl.File
+        The file object that points to the resulting WorkUnit file that was created.
+
+    Raises
+    ------
+    Exception
+        Reraises any exceptions that occur during the execution of the reproject_wu
+        function.
+    """
     import traceback
     from kbmod_wf.utilities.logger_utilities import configure_logger
     from kbmod_wf.task_impls.reproject_multi_chip_multi_night_wu import reproject_wu
diff --git a/src/kbmod_wf/workflow_tasks/uri_to_ic.py b/src/kbmod_wf/workflow_tasks/uri_to_ic.py
index a58ef3b..f2cfa94 100644
--- a/src/kbmod_wf/workflow_tasks/uri_to_ic.py
+++ b/src/kbmod_wf/workflow_tasks/uri_to_ic.py
@@ -6,6 +6,33 @@
     cache=True, executors=get_executors(["local_dev_testing", "small_cpu"]), ignore_for_cache=["logging_file"]
 )
 def uri_to_ic(inputs=(), outputs=(), runtime_config={}, logging_file=None):
+    """This app will call the uri_to_ic function to convert a given URI list
+    file into an ImageCollection file.
+
+    Parameters
+    ----------
+    inputs : tuple, optional
+        A tuple with a single parsl.File object that references the URI list file,
+        by default ()
+    outputs : tuple, optional
+        A tuple with a single parsl.File object that references the ImageCollection
+        file, by default ()
+    runtime_config : dict, optional
+        A dictionary of configuration settings specific to this task, by default {}
+    logging_file : parsl.File, optional
+        The parsl.File object that defines where the logs are written, by default None
+
+    Returns
+    -------
+    parsl.File
+        The file object that points to the ImageCollection file that was created.
+
+    Raises
+    ------
+    Exception
+        Reraises any exceptions that occur during the execution of the uri_to_ic
+        function.
+    """
     import traceback
     from kbmod_wf.utilities.logger_utilities import configure_logger
     from kbmod_wf.task_impls.uri_to_ic import uri_to_ic

From d2c94c2be017729631d105707271ad50d1dc828c Mon Sep 17 00:00:00 2001
From: drewoldag <47493171+drewoldag@users.noreply.github.com>
Date: Fri, 16 Aug 2024 14:18:21 -0700
Subject: [PATCH 7/7] Small changes after testing on Klone using older version of parsl.

---
 example_runtime_config.toml                          | 6 +-----
 src/kbmod_wf/resource_configs/klone_configuration.py | 8 ++++----
 src/kbmod_wf/single_chip_workflow.py                 | 3 ++-
 src/kbmod_wf/tno_workflow.py                         | 3 ++-
 src/kbmod_wf/workflow.py                             | 3 ++-
 src/kbmod_wf/workflow_tasks/create_manifest.py       | 6 ++++--
 6 files changed, 15 insertions(+), 14 deletions(-)

diff --git a/example_runtime_config.toml b/example_runtime_config.toml
index b2aae7e..a33daf1 100644
--- a/example_runtime_config.toml
+++ b/example_runtime_config.toml
@@ -13,11 +13,7 @@ checkpoint_mode = 'task_exit'
 # e.g. 
"/gscratch/dirac/kbmod/workflow/staging" staging_directory = "/home/drew/code/kbmod-wf/dev_staging" output_directory = "/home/drew/code/kbmod-wf/dev_staging/single_chip_workflow" - - -[apps.uri_to_ic] -# The path to the butler config file that instantiate a butler to retrieve images -butler_config_filepath = "/gscratch/dirac/DEEP/repo/butler.yaml" +file_pattern = "*.collection" [apps.ic_to_wu] diff --git a/src/kbmod_wf/resource_configs/klone_configuration.py b/src/kbmod_wf/resource_configs/klone_configuration.py index 8150a8d..953622b 100644 --- a/src/kbmod_wf/resource_configs/klone_configuration.py +++ b/src/kbmod_wf/resource_configs/klone_configuration.py @@ -25,7 +25,7 @@ def klone_resource_config(): executors=[ HighThroughputExecutor( label="small_cpu", - max_workers_per_node=1, + max_workers=1, provider=SlurmProvider( partition="ckpt-g2", account="astro", @@ -44,7 +44,7 @@ def klone_resource_config(): ), HighThroughputExecutor( label="large_mem", - max_workers_per_node=1, + max_workers=1, provider=SlurmProvider( partition="ckpt-g2", account="astro", @@ -63,7 +63,7 @@ def klone_resource_config(): ), HighThroughputExecutor( label="sharded_reproject", - max_workers_per_node=1, + max_workers=1, provider=SlurmProvider( partition="ckpt-g2", account="astro", @@ -82,7 +82,7 @@ def klone_resource_config(): ), HighThroughputExecutor( label="gpu", - max_workers_per_node=1, + max_workers=1, provider=SlurmProvider( partition="ckpt-g2", account="escience", diff --git a/src/kbmod_wf/single_chip_workflow.py b/src/kbmod_wf/single_chip_workflow.py index 2a1e08b..fb1386b 100644 --- a/src/kbmod_wf/single_chip_workflow.py +++ b/src/kbmod_wf/single_chip_workflow.py @@ -60,7 +60,8 @@ def workflow_runner(env=None, runtime_config={}): app_configs = runtime_config.get("apps", {}) - with parsl.load(resource_config) as dfk: + dfk = parsl.load(resource_config) + if dfk: logging_file = File(os.path.join(dfk.run_dir, "parsl.log")) logger = configure_logger("workflow.workflow_runner", logging_file.filepath) diff --git a/src/kbmod_wf/tno_workflow.py b/src/kbmod_wf/tno_workflow.py index 87f13b1..91ed76d 100644 --- a/src/kbmod_wf/tno_workflow.py +++ b/src/kbmod_wf/tno_workflow.py @@ -32,7 +32,8 @@ def workflow_runner(env=None, runtime_config={}): app_configs = runtime_config.get("apps", {}) - with parsl.load(resource_config) as dfk: + dfk = parsl.load(resource_config) + if dfk: logging_file = File(os.path.join(dfk.run_dir, "parsl.log")) logger = configure_logger("workflow.workflow_runner", logging_file.filepath) diff --git a/src/kbmod_wf/workflow.py b/src/kbmod_wf/workflow.py index b258e2c..9ac4652 100644 --- a/src/kbmod_wf/workflow.py +++ b/src/kbmod_wf/workflow.py @@ -27,7 +27,8 @@ def workflow_runner(env=None, runtime_config={}): app_configs = runtime_config.get("apps", {}) - with parsl.load(resource_config) as dfk: + dfk = parsl.load(resource_config) + if dfk: logging_file = File(os.path.join(dfk.run_dir, "parsl.log")) logger = configure_logger("workflow.workflow_runner", logging_file.filepath) diff --git a/src/kbmod_wf/workflow_tasks/create_manifest.py b/src/kbmod_wf/workflow_tasks/create_manifest.py index 89e6755..51207fb 100644 --- a/src/kbmod_wf/workflow_tasks/create_manifest.py +++ b/src/kbmod_wf/workflow_tasks/create_manifest.py @@ -61,7 +61,8 @@ def create_manifest(inputs=(), outputs=(), runtime_config={}, logging_file=None) logger.info(f"Looking for staged files in {directory_path}") # Gather all the *.collection entries in the directory - pattern = os.path.join(directory_path, "*.collection") + 
file_pattern = runtime_config.get("file_pattern", "*.collection") + pattern = os.path.join(directory_path, file_pattern) entries = glob.glob(pattern) # Filter out directories, keep only files @@ -74,7 +75,8 @@ def create_manifest(inputs=(), outputs=(), runtime_config={}, logging_file=None) logger.info(f"Found {len(files)} files in {directory_path}") # Write the filenames to the manifest file - with open(outputs[0].filename, "w") as manifest_file: + logger.info(f"Writing manifest file: {outputs[0].filepath}") + with open(outputs[0].filepath, "w") as manifest_file: for file in files: manifest_file.write(file + "\n")
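
Taken together, these patches set up a simple chain: create_manifest gathers the staged *.collection files into a manifest, and each downstream app consumes the previous app's output File. The snippet below is a minimal sketch of driving the first two stages with the refactored kbmod_wf.workflow_tasks module and the dfk = parsl.load(...) pattern from the last patch. It is illustrative only: the runtime_config values and output file names are placeholders, the klone_resource_config import path is assumed from src/kbmod_wf/resource_configs/klone_configuration.py, and it assumes the executor labels requested by the apps exist in the loaded resource configuration.

import os

import parsl
from parsl import File

# Assumed import path, based on src/kbmod_wf/resource_configs/klone_configuration.py.
from kbmod_wf.resource_configs.klone_configuration import klone_resource_config
from kbmod_wf.workflow_tasks import create_manifest, uri_to_ic

# Placeholder app configuration; in practice these values come from a TOML file
# such as example_runtime_config.toml.
app_configs = {
    "create_manifest": {
        "staging_directory": "/path/to/staging",           # placeholder
        "output_directory": "/path/to/staging/manifests",  # placeholder
        "file_pattern": "*.collection",
    },
    "uri_to_ic": {},  # placeholder
}

dfk = parsl.load(klone_resource_config())
if dfk:
    logging_file = File(os.path.join(dfk.run_dir, "parsl.log"))

    # Stage 1: gather the staged *.collection files into a manifest file.
    manifest_file = File(os.path.join(os.getcwd(), "manifest.txt"))
    manifest_future = create_manifest(
        inputs=[],
        outputs=[manifest_file],
        runtime_config=app_configs.get("create_manifest", {}),
        logging_file=logging_file,
    )
    manifest_future.result()  # block until the manifest has been written

    # Stage 2: launch one uri_to_ic app per staged collection file. Blocking on
    # the manifest keeps the sketch simple; a full runner would let parsl
    # resolve the dependencies between futures instead.
    futures = []
    with open(manifest_file.filepath) as manifest:
        for line in manifest:
            collection = line.strip()
            if not collection:
                continue
            collection_file = File(collection)
            ic_file = File(collection + ".ecsv")  # placeholder naming
            futures.append(
                uri_to_ic(
                    inputs=[collection_file],
                    outputs=[ic_file],
                    runtime_config=app_configs.get("uri_to_ic", {}),
                    logging_file=logging_file,
                )
            )

    # Wait for all launched apps before shutting down.
    dfk.wait_for_current_tasks()

The remaining stages (ic_to_wu, reproject_wu, kbmod_search) follow the same pattern, each taking the previous stage's output File as its single input.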