Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for ingesting multi-night ImageCollections #46

Merged
merged 5 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 148 additions & 0 deletions src/kbmod_wf/multi_night_workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
import argparse
import os

import toml
import parsl
from parsl import python_app, File
import parsl.executors

from kbmod_wf.utilities import (
apply_runtime_updates,
get_resource_config,
get_executors,
get_configured_logger,
)

from kbmod_wf.workflow_tasks import create_manifest, kbmod_search


@python_app(
cache=True,
executors=get_executors(["local_dev_testing", "sharded_reproject"]),
ignore_for_cache=["logging_file"],
)
def reproject_wu(inputs=(), outputs=(), runtime_config={}, logging_file=None):
from kbmod_wf.utilities.logger_utilities import get_configured_logger, ErrorLogger

logger = get_configured_logger("task.reproject_wu", logging_file.filepath)

from kbmod_wf.task_impls.reproject_multi_chip_multi_night_wu import reproject_wu
guess_dist = inputs[1] # heliocentric guess distance in AU
logger.info(f"Starting reproject_ic for guess distance {guess_dist}")
with ErrorLogger(logger):
reproject_wu(
guess_dist,
ic_filepath=inputs[0],
reprojected_wu_filepath=outputs[0].filepath,
runtime_config=runtime_config,
logger=logger,
)
logger.info("Completed reproject_ic")
return outputs[0]


def workflow_runner(env=None, runtime_config={}):
"""This function will load and configure Parsl, and run the workflow.

Parameters
----------
env : str, optional
Environment string used to define which resource configuration to use,
by default None
runtime_config : dict, optional
Dictionary of assorted runtime configuration parameters, by default {}
"""
resource_config = get_resource_config(env=env)
resource_config = apply_runtime_updates(resource_config, runtime_config)

app_configs = runtime_config.get("apps", {})

dfk = parsl.load(resource_config)
if dfk:
logging_file = File(os.path.join(dfk.run_dir, "kbmod.log"))
logger = get_configured_logger("workflow.workflow_runner", logging_file.filepath)

if runtime_config is not None:
logger.info(f"Using runtime configuration definition:\n{toml.dumps(runtime_config)}")

logger.info("Starting workflow")

# gather all the *.collection files that are staged for processing
create_manifest_config = app_configs.get("create_manifest", {})
manifest_file = File(
os.path.join(create_manifest_config.get("output_directory", os.getcwd()), "manifest.txt")
)
create_manifest_future = create_manifest(
inputs=[],
outputs=[manifest_file],
runtime_config=app_configs.get("create_manifest", {}),
logging_file=logging_file,
)

# reproject each WorkUnit for a range of distances
reproject_futures = []
repro_wu_filenames = []
runtime_config=app_configs.get("reproject_wu", {})
with open(create_manifest_future.result(), "r") as f:
for line in f:
collection_file = File(line.strip())
wu_filename = line + ".wu"
# Get the requested heliocentric guess distances (in AU) for reflex correction.
# If none are provided, default to 42.0 AU.
distances = runtime_config["helio_guess_dists"] if "helio_guess_dists" in runtime_config else [42.0]
for dist in distances:
output_filename=wu_filename + f".{dist}.repro"
repro_wu_filenames.append(output_filename)
reproject_futures.append(
reproject_wu(
inputs=[collection_file, dist],
outputs=[File(output_filename)],
runtime_config=runtime_config,
logging_file=logging_file,
)
)

# run kbmod search on each reprojected WorkUnit
search_futures = []
for i in range(len(reproject_futures)):
f = reproject_futures[i]
search_futures.append(
kbmod_search(
inputs=[f],
outputs=[File(repro_wu_filenames[i] + ".search.ecsv")],
runtime_config=app_configs.get("kbmod_search", {}),
logging_file=logging_file,
)
)

[f.result() for f in search_futures]

logger.info("Workflow complete")

parsl.clear()


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"--env",
type=str,
choices=["dev", "klone"],
help="The environment to run the workflow in.",
)

parser.add_argument(
"--runtime-config",
type=str,
help="The complete runtime configuration filepath to use for the workflow.",
)

args = parser.parse_args()

# if a runtime_config file was provided and exists, load the toml as a dict.
runtime_config = {}
if args.runtime_config is not None and os.path.exists(args.runtime_config):
with open(args.runtime_config, "r") as toml_runtime_config:
runtime_config = toml.load(toml_runtime_config)

workflow_runner(env=args.env, runtime_config=runtime_config)
24 changes: 12 additions & 12 deletions src/kbmod_wf/task_impls/ic_to_wu.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@


def ic_to_wu(
ic_filepath: str = None, wu_filepath: str = None, runtime_config: dict = {}, logger: Logger = None
ic_filepath: str = None, wu_filepath: str = None, save: bool = True, runtime_config: dict = {}, logger: Logger = None
):
"""This task will convert an ImageCollection to a WorkUnit.

Expand All @@ -19,18 +19,20 @@ def ic_to_wu(
The fully resolved filepath to the input ImageCollection file, by default None
wu_filepath : str, optional
The fully resolved filepath for the output WorkUnit file, by default None
save : bool, optional
Flag to save the WorkUnit to disk, by default True. If False, the WorkUnit is returned.
runtime_config : dict, optional
Additional configuration parameters to be used at runtime, by default {}
logger : Logger, optional
Primary logger for the workflow, by default None

Returns
-------
str
The fully resolved filepath of the output WorkUnit file.
str | WorkUnit
The fully resolved filepath of the output WorkUnit file or the WorkUnit object itself if save=False.
"""
ic_to_wu_converter = ICtoWUConverter(
ic_filepath=ic_filepath, wu_filepath=wu_filepath, runtime_config=runtime_config, logger=logger
ic_filepath=ic_filepath, wu_filepath=wu_filepath, save=save, runtime_config=runtime_config, logger=logger
)

return ic_to_wu_converter.create_work_unit()
Expand All @@ -41,25 +43,19 @@ def __init__(
self,
ic_filepath: str = None,
wu_filepath: str = None,
save: bool = True,
runtime_config: dict = {},
logger: Logger = None,
):
self.ic_filepath = ic_filepath
self.wu_filepath = wu_filepath
self.save = save
self.runtime_config = runtime_config
self.logger = logger

self.overwrite = self.runtime_config.get("overwrite", False)
self.search_config_filepath = self.runtime_config.get("search_config_filepath", None)

def create_work_unit(self):
if len(glob.glob(self.wu_filepath)):
if self.overwrite:
self.logger.info(f"Overwrite was {self.overwrite}. Deleting existing {self.wu_filepath}.")
os.remove(self.wu_filepath)
else:
make_wu = False

ic = ImageCollection.read(self.ic_filepath, format="ascii.ecsv")
self.logger.info(f"ImageCollection read from {self.ic_filepath}, creating work unit next.")

Expand All @@ -76,6 +72,10 @@ def create_work_unit(self):
elapsed = round(time.time() - last_time, 1)
self.logger.debug(f"Required {elapsed}[s] to create WorkUnit.")

if not self.save:
self.logger.debug(f"Required {elapsed}[s] to create the WorkUnit.")
return orig_wu

self.logger.info(f"Saving sharded work unit to: {self.wu_filepath}")
last_time = time.time()
directory_containing_shards, wu_filename = os.path.split(self.wu_filepath)
Expand Down
Loading
Loading