From fb2b9a4b7a7ad89cffe7419aa94c1676d1639104 Mon Sep 17 00:00:00 2001 From: drewoldag <47493171+drewoldag@users.noreply.github.com> Date: Thu, 5 Sep 2024 13:12:03 -0700 Subject: [PATCH] WIP - Expect several bugs, but need to test on Klone now. --- example_runtime_config.toml | 4 +- src/kbmod_wf/parallel_repro_single_chip_wf.py | 79 ++++------ .../resource_configs/klone_configuration.py | 10 +- src/kbmod_wf/task_impls/ic_to_wu.py | 4 +- ...oject_single_chip_single_night_wu_shard.py | 140 ++++++++++-------- src/kbmod_wf/test_workflow.py | 2 +- src/kbmod_wf/workflow_tasks/__init__.py | 1 + .../workflow_tasks/ic_to_wu_return_shards.py | 53 +++++++ 8 files changed, 169 insertions(+), 124 deletions(-) create mode 100644 src/kbmod_wf/workflow_tasks/ic_to_wu_return_shards.py diff --git a/example_runtime_config.toml b/example_runtime_config.toml index 2956ffb..671a93a 100644 --- a/example_runtime_config.toml +++ b/example_runtime_config.toml @@ -11,8 +11,8 @@ checkpoint_mode = 'task_exit' [apps.create_manifest] # The path to the staging directory # e.g. 
"/gscratch/dirac/kbmod/workflow/staging" -staging_directory = "/home/drew/code/kbmod-wf/dev_staging" -output_directory = "/home/drew/code/kbmod-wf/dev_staging/processing" +staging_directory = "/Users/drew/code/kbmod-wf/dev_staging" +output_directory = "/Users/drew/code/kbmod-wf/dev_staging/processing" file_pattern = "*.collection" diff --git a/src/kbmod_wf/parallel_repro_single_chip_wf.py b/src/kbmod_wf/parallel_repro_single_chip_wf.py index 85b9489..3edd27f 100644 --- a/src/kbmod_wf/parallel_repro_single_chip_wf.py +++ b/src/kbmod_wf/parallel_repro_single_chip_wf.py @@ -4,7 +4,7 @@ import toml import parsl -from parsl import join_app, python_app, File +from parsl import python_app, File import parsl.executors from kbmod_wf.utilities import ( @@ -14,50 +14,26 @@ get_configured_logger, ) -from kbmod_wf.workflow_tasks import create_manifest, ic_to_wu, kbmod_search - - -# There's still a ton of duplicated code here and in kbmod_wf.workflow_tasks.reproject_wu -# that should be refactored. -# The only difference is the import of reproject_single_chip_single_night_wu here. 
-@join_app( - cache=True, - executors=get_executors(["local_dev_testing", "sharded_reproject"]), - ignore_for_cache=["logging_file"], -) -def reproject_wu(inputs=(), outputs=(), runtime_config={}, logging_file=None): - from kbmod_wf.utilities.logger_utilities import get_configured_logger, ErrorLogger - - logger = get_configured_logger("task.reproject_wu", logging_file.filepath) - - logger.info("Starting reproject_ic") - with ErrorLogger(logger): - future = sharded_reproject( - original_wu_filepath=inputs[0].filepath, - reprojected_wu_filepath=outputs[0].filepath, - runtime_config=runtime_config, - logger=logger, - ) - logger.info("Completed reproject_ic") - return future +from kbmod_wf.workflow_tasks import create_manifest, ic_to_wu_return_shards, kbmod_search @python_app( cache=True, - executors=get_executors(["local_dev_testing", "sharded_reproject"]), + executors=get_executors(["local_dev_testing", "reproject_single_shard"]), ignore_for_cache=["logging_file"], ) -def sharded_reproject(inputs=(), outputs=(), runtime_config={}, logging_file=None): +def reproject_shard(inputs=(), outputs=(), runtime_config={}, logging_file=None): from kbmod_wf.utilities.logger_utilities import get_configured_logger, ErrorLogger logger = get_configured_logger("task.sharded_reproject", logging_file.filepath) - from kbmod_wf.task_impls.reproject_single_chip_single_night_wu_shard import reproject_wu_shard + from kbmod_wf.task_impls.reproject_single_chip_single_night_wu_shard import reproject_shard logger.info("Starting reproject_ic") with ErrorLogger(logger): - reproject_wu_shard( + reproject_shard( original_wu_filepath=inputs[0].filepath, + original_wcs=inputs[1], reprojected_wu_filepath=outputs[0].filepath, runtime_config=runtime_config, logger=logger, @@ -94,33 +70,32 @@ def workflow_runner(env=None, runtime_config={}): # gather all the *.collection files that are staged for processing create_manifest_config = app_configs.get("create_manifest", {}) - manifest_file = File( - 
os.path.join(create_manifest_config.get("output_directory", os.getcwd()), "manifest.txt") - ) + manifest_file_path = Path(create_manifest_config.get("output_directory", os.getcwd()), "manifest.txt") + create_manifest_future = create_manifest( inputs=[], - outputs=[manifest_file], + outputs=[File(manifest_file_path)], runtime_config=app_configs.get("create_manifest", {}), logging_file=logging_file, ) - with open(create_manifest_future.result(), "r") as f: - # process each .collection file in the manifest into a .wu file + with open(create_manifest_future.result(), "r") as manifest: + # process each .collection file in the manifest original_work_unit_futures = [] - for line in f: + for line in manifest: # Create path object for the line in the manifest input_file = Path(line.strip()) - # Create a directory for the sharded work unit files + # Create a directory to contain each work unit's shards sharded_directory = Path(input_file.parent, input_file.stem) sharded_directory.mkdir(exist_ok=True) - # Create the work unit filepath + # Construct the work unit filepath output_workunit_filepath = Path(sharded_directory, input_file.stem + ".wu") # Create the work unit future original_work_unit_futures.append( - ic_to_wu( + ic_to_wu_return_shards( inputs=[input_file], outputs=[File(output_workunit_filepath)], runtime_config=app_configs.get("ic_to_wu", {}), @@ -128,30 +103,28 @@ def workflow_runner(env=None, runtime_config={}): ) ) - # reproject each WorkUnit + # reproject each WorkUnit shard individually # For chip-by-chip, this isn't really necessary, so hardcoding to 0. 
reproject_futures = [] for f in original_work_unit_futures: - distance = 0 - - unique_obstimes, unique_obstimes_indices = work_unit.get_unique_obstimes_and_indices() - - reproject_futures.append( - reproject_wu( - inputs=[f.result()], - outputs=[File(f.result().filepath + f".{distance}.repro")], + shard_futures = [] + for i in f.result(): + shard_future = reproject_shard( + inputs=[i], + outputs=[File(i.parent / (i.stem + ".repro"))], runtime_config=app_configs.get("reproject_wu", {}), logging_file=logging_file, ) - ) + shard_futures.append(shard_future) + reproject_futures.append(shard_futures) # run kbmod search on each reprojected WorkUnit search_futures = [] for f in reproject_futures: search_futures.append( kbmod_search( - inputs=[f.result()], - outputs=[File(f.result().filepath + ".search.ecsv")], + inputs=[i.result() for i in f], + outputs=[], runtime_config=app_configs.get("kbmod_search", {}), logging_file=logging_file, ) diff --git a/src/kbmod_wf/resource_configs/klone_configuration.py b/src/kbmod_wf/resource_configs/klone_configuration.py index f76a978..256f12d 100644 --- a/src/kbmod_wf/resource_configs/klone_configuration.py +++ b/src/kbmod_wf/resource_configs/klone_configuration.py @@ -9,7 +9,7 @@ "compute_bigmem": "01:00:00", "large_mem": "04:00:00", "sharded_reproject": "04:00:00", - "parallel_reproject": "00:30:00", + "reproject_single_shard": "00:30:00", "gpu_max": "08:00:00", } @@ -82,20 +82,20 @@ def klone_resource_config(): ), ), HighThroughputExecutor( - label="parallel_reproject", + label="reproject_single_shard", max_workers=1, provider=SlurmProvider( partition="ckpt-g2", account="astro", min_blocks=0, - max_blocks=2, + max_blocks=256, init_blocks=0, parallelism=1, nodes_per_block=1, cores_per_node=1, - mem_per_node=2, # ~2-4 GB per core + mem_per_node=1, # only working on 1 image, so <1 GB should be required exclusive=False, - walltime=walltimes["parallel_reproject"], + walltime=walltimes["reproject_single_shard"], # Command to run before 
starting worker - i.e. conda activate worker_init="", ), diff --git a/src/kbmod_wf/task_impls/ic_to_wu.py b/src/kbmod_wf/task_impls/ic_to_wu.py index 66559da..5ef9b06 100644 --- a/src/kbmod_wf/task_impls/ic_to_wu.py +++ b/src/kbmod_wf/task_impls/ic_to_wu.py @@ -83,4 +83,6 @@ def create_work_unit(self): elapsed = round(time.time() - last_time, 1) self.logger.debug(f"Required {elapsed}[s] to write WorkUnit to disk: {self.wu_filepath}") - return self.wu_filepath + wcs = list(orig_wu._per_image_wcs) + + return self.wu_filepath, wcs diff --git a/src/kbmod_wf/task_impls/reproject_single_chip_single_night_wu_shard.py b/src/kbmod_wf/task_impls/reproject_single_chip_single_night_wu_shard.py index ce089c4..8f371b0 100644 --- a/src/kbmod_wf/task_impls/reproject_single_chip_single_night_wu_shard.py +++ b/src/kbmod_wf/task_impls/reproject_single_chip_single_night_wu_shard.py @@ -1,16 +1,21 @@ +import os +import time +from logging import Logger + +import numpy as np +import astropy.io.fits as fitsio + import kbmod from kbmod.work_unit import WorkUnit - import kbmod.reprojection as reprojection +from reproject import reproject_adaptive from reproject.mosaicking import find_optimal_celestial_wcs -import os -import time -from logging import Logger -def reproject_wu_shard( +def reproject_shard( original_wu_shard_filepath: str = None, + original_wcs=None, reprojected_wu_shard_filepath: str = None, runtime_config: dict = {}, logger: Logger = None, @@ -35,60 +40,71 @@ def reproject_wu_shard( The fully resolved filepath of the resulting WorkUnit file after reflex and reprojection. 
""" - wu_shard_reprojector = WUShardReprojector( - original_wu_filepath=original_wu_shard_filepath, - reprojected_wu_filepath=reprojected_wu_shard_filepath, - runtime_config=runtime_config, - logger=logger, - ) - - return wu_shard_reprojector.reproject_workunit() - - -class WUShardReprojector: - def __init__( - self, - original_wu_filepath: str = None, - reprojected_wu_filepath: str = None, - runtime_config: dict = {}, - logger: Logger = None, - ): - self.original_wu_filepath = original_wu_filepath - self.reprojected_wu_filepath = reprojected_wu_filepath - self.runtime_config = runtime_config - self.logger = logger - - # Default to 8 workers if not in the config. Value must be 0