Skip to content

Commit

Permalink
Add support for ingesting multi-night ImageCollections (#46)
Browse files Browse the repository at this point in the history
* Add multi_night_workflow for ingesting multi-night ImageCollections

* Specify guess distances in the parsl runtime config file.

* Combine ic_to_wu and reproject steps

* Revert local klone config changes

* Clean up redundant task from earlier version
  • Loading branch information
wilsonbb authored Oct 25, 2024
1 parent b6f28c6 commit 839763b
Show file tree
Hide file tree
Showing 6 changed files with 485 additions and 192 deletions.
148 changes: 148 additions & 0 deletions src/kbmod_wf/multi_night_workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
import argparse
import os

import toml
import parsl
from parsl import python_app, File
import parsl.executors

from kbmod_wf.utilities import (
apply_runtime_updates,
get_resource_config,
get_executors,
get_configured_logger,
)

from kbmod_wf.workflow_tasks import create_manifest, kbmod_search


@python_app(
    cache=True,
    executors=get_executors(["local_dev_testing", "sharded_reproject"]),
    ignore_for_cache=["logging_file"],
)
def reproject_wu(inputs=(), outputs=(), runtime_config={}, logging_file=None):
    """Reproject one ImageCollection for a single heliocentric guess distance.

    Parameters
    ----------
    inputs : sequence, optional
        ``inputs[0]`` is forwarded as ``ic_filepath`` and ``inputs[1]`` is the
        heliocentric guess distance in AU.
    outputs : sequence, optional
        ``outputs[0]`` is the output file; its ``filepath`` is forwarded as
        ``reprojected_wu_filepath``.
    runtime_config : dict, optional
        Configuration forwarded unchanged to the task implementation.
    logging_file : File, optional
        File whose ``filepath`` is handed to the task logger. Although the
        default is None, the body dereferences ``logging_file.filepath``, so a
        real file must be supplied.

    Returns
    -------
    The first entry of ``outputs``.
    """
    from kbmod_wf.utilities.logger_utilities import ErrorLogger, get_configured_logger

    task_logger = get_configured_logger("task.reproject_wu", logging_file.filepath)

    # Aliased so the task implementation does not shadow this app's own name.
    from kbmod_wf.task_impls.reproject_multi_chip_multi_night_wu import (
        reproject_wu as run_reprojection,
    )

    # heliocentric guess distance in AU
    guess_dist = inputs[1]
    task_logger.info(f"Starting reproject_ic for guess distance {guess_dist}")

    # Remaining argument lookups stay inside the ErrorLogger context so that
    # any failure while resolving them is raised from within that context.
    with ErrorLogger(task_logger):
        implementation_kwargs = {
            "ic_filepath": inputs[0],
            "reprojected_wu_filepath": outputs[0].filepath,
            "runtime_config": runtime_config,
            "logger": task_logger,
        }
        run_reprojection(guess_dist, **implementation_kwargs)

    task_logger.info("Completed reproject_ic")
    return outputs[0]


def workflow_runner(env=None, runtime_config=None):
    """Load and configure Parsl, then run the multi-night workflow.

    The workflow has three stages: build a manifest of staged ImageCollection
    files, reproject each collection once per heliocentric guess distance, and
    run a KBMOD search on every reprojected WorkUnit.

    Parameters
    ----------
    env : str, optional
        Environment string used to define which resource configuration to use,
        by default None
    runtime_config : dict, optional
        Dictionary of assorted runtime configuration parameters. None (the
        default) is treated as an empty dictionary.
    """
    # Normalise once so every lookup below is safe; this also avoids sharing a
    # mutable default argument between calls.
    if runtime_config is None:
        runtime_config = {}

    resource_config = get_resource_config(env=env)
    resource_config = apply_runtime_updates(resource_config, runtime_config)

    app_configs = runtime_config.get("apps", {})

    dfk = parsl.load(resource_config)
    if dfk:
        logging_file = File(os.path.join(dfk.run_dir, "kbmod.log"))
        logger = get_configured_logger("workflow.workflow_runner", logging_file.filepath)

        logger.info(f"Using runtime configuration definition:\n{toml.dumps(runtime_config)}")
        logger.info("Starting workflow")

        # gather all the *.collection files that are staged for processing
        create_manifest_config = app_configs.get("create_manifest", {})
        manifest_file = File(
            os.path.join(create_manifest_config.get("output_directory", os.getcwd()), "manifest.txt")
        )
        create_manifest_future = create_manifest(
            inputs=[],
            outputs=[manifest_file],
            runtime_config=create_manifest_config,
            logging_file=logging_file,
        )

        # Kept under its own name so the workflow-level `runtime_config`
        # argument is not overwritten by the per-app settings.
        reproject_config = app_configs.get("reproject_wu", {})

        # Requested heliocentric guess distances (in AU) for reflex correction.
        # If none are provided, default to 42.0 AU. The value does not depend
        # on the manifest entry, so it is looked up once.
        distances = reproject_config.get("helio_guess_dists", [42.0])

        # reproject each WorkUnit for a range of distances
        reproject_futures = []
        repro_wu_filenames = []
        with open(create_manifest_future.result(), "r") as manifest:
            for line in manifest:
                # Lines read from a file keep their trailing newline; strip it
                # before using the path for the input AND the output names.
                collection_path = line.strip()
                if not collection_path:
                    # Blank manifest lines do not name a collection.
                    continue

                collection_file = File(collection_path)
                wu_filename = collection_path + ".wu"
                for dist in distances:
                    output_filename = f"{wu_filename}.{dist}.repro"
                    repro_wu_filenames.append(output_filename)
                    reproject_futures.append(
                        reproject_wu(
                            inputs=[collection_file, dist],
                            outputs=[File(output_filename)],
                            runtime_config=reproject_config,
                            logging_file=logging_file,
                        )
                    )

        # run kbmod search on each reprojected WorkUnit
        kbmod_search_config = app_configs.get("kbmod_search", {})
        search_futures = [
            kbmod_search(
                inputs=[repro_future],
                outputs=[File(repro_filename + ".search.ecsv")],
                runtime_config=kbmod_search_config,
                logging_file=logging_file,
            )
            for repro_future, repro_filename in zip(reproject_futures, repro_wu_filenames)
        ]

        # Block until every search has finished (or raised).
        for search_future in search_futures:
            search_future.result()

        logger.info("Workflow complete")

    parsl.clear()


if __name__ == "__main__":
    # Command-line entry point: parse the options, load the optional TOML
    # runtime configuration, and hand both to the workflow runner.
    cli = argparse.ArgumentParser()
    cli.add_argument(
        "--env",
        type=str,
        choices=["dev", "klone"],
        help="The environment to run the workflow in.",
    )
    cli.add_argument(
        "--runtime-config",
        type=str,
        help="The complete runtime configuration filepath to use for the workflow.",
    )
    cli_args = cli.parse_args()

    # Start from an empty configuration; it is only replaced when a path was
    # given on the command line and that path exists on disk.
    config_path = cli_args.runtime_config
    loaded_config = {}
    config_file_available = config_path is not None and os.path.exists(config_path)
    if config_file_available:
        with open(config_path, "r") as config_handle:
            loaded_config = toml.load(config_handle)

    workflow_runner(env=cli_args.env, runtime_config=loaded_config)
24 changes: 12 additions & 12 deletions src/kbmod_wf/task_impls/ic_to_wu.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@


def ic_to_wu(
ic_filepath: str = None, wu_filepath: str = None, runtime_config: dict = {}, logger: Logger = None
ic_filepath: str = None, wu_filepath: str = None, save: bool = True, runtime_config: dict = {}, logger: Logger = None
):
"""This task will convert an ImageCollection to a WorkUnit.
Expand All @@ -19,18 +19,20 @@ def ic_to_wu(
The fully resolved filepath to the input ImageCollection file, by default None
wu_filepath : str, optional
The fully resolved filepath for the output WorkUnit file, by default None
save : bool, optional
Flag to save the WorkUnit to disk, by default True. If False, the WorkUnit is returned.
runtime_config : dict, optional
Additional configuration parameters to be used at runtime, by default {}
logger : Logger, optional
Primary logger for the workflow, by default None
Returns
-------
str
The fully resolved filepath of the output WorkUnit file.
str | WorkUnit
The fully resolved filepath of the output WorkUnit file or the WorkUnit object itself if save=False.
"""
ic_to_wu_converter = ICtoWUConverter(
ic_filepath=ic_filepath, wu_filepath=wu_filepath, runtime_config=runtime_config, logger=logger
ic_filepath=ic_filepath, wu_filepath=wu_filepath, save=save, runtime_config=runtime_config, logger=logger
)

return ic_to_wu_converter.create_work_unit()
Expand All @@ -41,25 +43,19 @@ def __init__(
self,
ic_filepath: str = None,
wu_filepath: str = None,
save: bool = True,
runtime_config: dict = {},
logger: Logger = None,
):
self.ic_filepath = ic_filepath
self.wu_filepath = wu_filepath
self.save = save
self.runtime_config = runtime_config
self.logger = logger

self.overwrite = self.runtime_config.get("overwrite", False)
self.search_config_filepath = self.runtime_config.get("search_config_filepath", None)

def create_work_unit(self):
if len(glob.glob(self.wu_filepath)):
if self.overwrite:
self.logger.info(f"Overwrite was {self.overwrite}. Deleting existing {self.wu_filepath}.")
os.remove(self.wu_filepath)
else:
make_wu = False

ic = ImageCollection.read(self.ic_filepath, format="ascii.ecsv")
self.logger.info(f"ImageCollection read from {self.ic_filepath}, creating work unit next.")

Expand All @@ -76,6 +72,10 @@ def create_work_unit(self):
elapsed = round(time.time() - last_time, 1)
self.logger.debug(f"Required {elapsed}[s] to create WorkUnit.")

if not self.save:
self.logger.debug(f"Required {elapsed}[s] to create the WorkUnit.")
return orig_wu

self.logger.info(f"Saving sharded work unit to: {self.wu_filepath}")
last_time = time.time()
directory_containing_shards, wu_filename = os.path.split(self.wu_filepath)
Expand Down
Loading

0 comments on commit 839763b

Please sign in to comment.