Skip to content

Commit

Permalink
Updates to workflow to support older version of parsl.
Browse files Browse the repository at this point in the history
  • Loading branch information
drewoldag committed Sep 5, 2024
1 parent fb2b9a4 commit 5a40a35
Showing 1 changed file with 14 additions and 9 deletions.
23 changes: 14 additions & 9 deletions src/kbmod_wf/parallel_repro_single_chip_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
executors=get_executors(["local_dev_testing", "reproject_single_shard"]),
ignore_for_cache=["logging_file"],
)
def reproject_shard(inputs=(), outputs=(), runtime_config={}, logging_file=None):
def reproject_shard(inputs=(), outputs=(), wcs=None, runtime_config={}, logging_file=None):
from kbmod_wf.utilities.logger_utilities import get_configured_logger, ErrorLogger

logger = get_configured_logger("task.sharded_reproject", logging_file.filepath)
Expand All @@ -33,7 +33,7 @@ def reproject_shard(inputs=(), outputs=(), runtime_config={}, logging_file=None)
with ErrorLogger(logger):
reproject_shard(
original_wu_filepath=inputs[0].filepath,
original_wcs=inputs[1],
original_wcs=wcs,
reprojected_wu_filepath=outputs[0].filepath,
runtime_config=runtime_config,
logger=logger,
Expand Down Expand Up @@ -70,11 +70,13 @@ def workflow_runner(env=None, runtime_config={}):

# gather all the *.collection files that are staged for processing
create_manifest_config = app_configs.get("create_manifest", {})
manifest_file_path = Path(create_manifest_config.get("output_directory", os.getcwd()), "manifest.txt")
manifest_file = File(
os.path.join(create_manifest_config.get("output_directory", os.getcwd()), "manifest.txt")
)

create_manifest_future = create_manifest(
inputs=[],
outputs=[File(manifest_file_path)],
outputs=[manifest_file],
runtime_config=app_configs.get("create_manifest", {}),
logging_file=logging_file,
)
Expand All @@ -96,8 +98,8 @@ def workflow_runner(env=None, runtime_config={}):
# Create the work unit future
original_work_unit_futures.append(
ic_to_wu_return_shards(
inputs=[input_file],
outputs=[File(output_workunit_filepath)],
inputs=[File(str(input_file))],
outputs=[File(str(output_workunit_filepath))],
runtime_config=app_configs.get("ic_to_wu", {}),
logging_file=logging_file,
)
Expand All @@ -108,10 +110,13 @@ def workflow_runner(env=None, runtime_config={}):
reproject_futures = []
for f in original_work_unit_futures:
shard_futures = []
for i in f.result():
shard_files, wcs = f.result()
for i in shard_files:
shard_file = Path(i)
shard_future = reproject_shard(
inputs=[i],
outputs=[File(i.parent / (i.stem + ".repro"))],
inputs=[File(str(shard_file))],
outputs=[File(str(shard_file.parent / (shard_file.stem + ".repro")))],
wcs=wcs,
runtime_config=app_configs.get("reproject_wu", {}),
logging_file=logging_file,
)
Expand Down

0 comments on commit 5a40a35

Please sign in to comment.