Commit
WIP - Expect several bugs, but need to test on Klone now.
drewoldag committed Sep 5, 2024
1 parent 47f1aaf commit fb2b9a4
Showing 8 changed files with 169 additions and 124 deletions.
4 changes: 2 additions & 2 deletions example_runtime_config.toml
@@ -11,8 +11,8 @@ checkpoint_mode = 'task_exit'
[apps.create_manifest]
# The path to the staging directory
# e.g. "/gscratch/dirac/kbmod/workflow/staging"
-staging_directory = "/home/drew/code/kbmod-wf/dev_staging"
-output_directory = "/home/drew/code/kbmod-wf/dev_staging/processing"
+staging_directory = "/Users/drew/code/kbmod-wf/dev_staging"
+output_directory = "/Users/drew/code/kbmod-wf/dev_staging/processing"
file_pattern = "*.collection"


79 changes: 26 additions & 53 deletions src/kbmod_wf/parallel_repro_single_chip_wf.py
@@ -4,7 +4,7 @@

import toml
import parsl
-from parsl import join_app, python_app, File
+from parsl import python_app, File
import parsl.executors

from kbmod_wf.utilities import (
@@ -14,50 +14,26 @@
    get_configured_logger,
)

-from kbmod_wf.workflow_tasks import create_manifest, ic_to_wu, kbmod_search
-
-
-# There's still a ton of duplicated code here and in kbmod_wf.workflow_tasks.reproject_wu
-# that should be refactored.
-# The only difference is the import of reproject_single_chip_single_night_wu here.
-@join_app(
-    cache=True,
-    executors=get_executors(["local_dev_testing", "sharded_reproject"]),
-    ignore_for_cache=["logging_file"],
-)
-def reproject_wu(inputs=(), outputs=(), runtime_config={}, logging_file=None):
-    from kbmod_wf.utilities.logger_utilities import get_configured_logger, ErrorLogger
-
-    logger = get_configured_logger("task.reproject_wu", logging_file.filepath)
-
-    logger.info("Starting reproject_ic")
-    with ErrorLogger(logger):
-        future = sharded_reproject(
-            original_wu_filepath=inputs[0].filepath,
-            reprojected_wu_filepath=outputs[0].filepath,
-            runtime_config=runtime_config,
-            logger=logger,
-        )
-    logger.info("Completed reproject_ic")
-    return future
+from kbmod_wf.workflow_tasks import create_manifest, ic_to_wu_return_shards, kbmod_search


@python_app(
    cache=True,
-    executors=get_executors(["local_dev_testing", "sharded_reproject"]),
+    executors=get_executors(["local_dev_testing", "reproject_single_shard"]),
    ignore_for_cache=["logging_file"],
)
-def sharded_reproject(inputs=(), outputs=(), runtime_config={}, logging_file=None):
+def reproject_shard(inputs=(), outputs=(), runtime_config={}, logging_file=None):
    from kbmod_wf.utilities.logger_utilities import get_configured_logger, ErrorLogger

    logger = get_configured_logger("task.sharded_reproject", logging_file.filepath)

-    from kbmod_wf.task_impls.reproject_single_chip_single_night_wu_shard import reproject_wu_shard
+    from kbmod_wf.task_impls.reproject_single_chip_single_night_wu_shard import reproject_shard

    logger.info("Starting reproject_ic")
    with ErrorLogger(logger):
-        reproject_wu_shard(
+        reproject_shard(
            original_wu_filepath=inputs[0].filepath,
+            original_wcs=inputs[1],
            reprojected_wu_filepath=outputs[0].filepath,
            runtime_config=runtime_config,
            logger=logger,
@@ -94,64 +70,61 @@ def workflow_runner(env=None, runtime_config={}):

        # gather all the *.collection files that are staged for processing
        create_manifest_config = app_configs.get("create_manifest", {})
-        manifest_file = File(
-            os.path.join(create_manifest_config.get("output_directory", os.getcwd()), "manifest.txt")
-        )
+        manifest_file_path = Path(create_manifest_config.get("output_directory", os.getcwd()), "manifest.txt")

        create_manifest_future = create_manifest(
            inputs=[],
-            outputs=[manifest_file],
+            outputs=[File(manifest_file_path)],
            runtime_config=app_configs.get("create_manifest", {}),
            logging_file=logging_file,
        )

-        with open(create_manifest_future.result(), "r") as f:
-            # process each .collection file in the manifest into a .wu file
+        with open(create_manifest_future.result(), "r") as manifest:
+            # process each .collection file in the manifest
            original_work_unit_futures = []
-            for line in f:
+            for line in manifest:
                # Create path object for the line in the manifest
                input_file = Path(line.strip())

-                # Create a directory for the sharded work unit files
+                # Create a directory to contain each work unit's shards
                sharded_directory = Path(input_file.parent, input_file.stem)
                sharded_directory.mkdir(exist_ok=True)

-                # Create the work unit filepath
+                # Construct the work unit filepath
                output_workunit_filepath = Path(sharded_directory, input_file.stem + ".wu")

                # Create the work unit future
                original_work_unit_futures.append(
-                    ic_to_wu(
+                    ic_to_wu_return_shards(
                        inputs=[input_file],
                        outputs=[File(output_workunit_filepath)],
                        runtime_config=app_configs.get("ic_to_wu", {}),
                        logging_file=logging_file,
                    )
                )

-        # reproject each WorkUnit
+        # reproject each WorkUnit shard individually
+        # For chip-by-chip, this isn't really necessary, so hardcoding to 0.
        reproject_futures = []
        for f in original_work_unit_futures:
            distance = 0

-            unique_obstimes, unique_obstimes_indices = work_unit.get_unique_obstimes_and_indices()
-
-            reproject_futures.append(
-                reproject_wu(
-                    inputs=[f.result()],
-                    outputs=[File(f.result().filepath + f".{distance}.repro")],
+            shard_futures = []
+            for i in f.result():
+                shard_future = reproject_shard(
+                    inputs=[i],
+                    outputs=[File(i.parent / (i.stem + ".repro"))],
                    runtime_config=app_configs.get("reproject_wu", {}),
                    logging_file=logging_file,
                )
-            )
+                shard_futures.append(shard_future)
+            reproject_futures.append(shard_futures)

        # run kbmod search on each reprojected WorkUnit
        search_futures = []
        for f in reproject_futures:
            search_futures.append(
                kbmod_search(
-                    inputs=[f.result()],
-                    outputs=[File(f.result().filepath + ".search.ecsv")],
+                    inputs=[i.result() for i in f],
+                    outputs=[],
                    runtime_config=app_configs.get("kbmod_search", {}),
                    logging_file=logging_file,
                )
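Note: the rewritten workflow above fans each WorkUnit out into one reproject_shard app per shard, then gathers the shard futures into a single kbmod_search call. A minimal, self-contained sketch of that fan-out/gather pattern in Parsl — stub apps and a thread-pool executor stand in for the real kbmod-wf tasks and Klone resources:

import parsl
from parsl import python_app
from parsl.config import Config
from parsl.executors.threads import ThreadPoolExecutor

@python_app
def reproject_stub(shard_path):
    # Stand-in for reproject_shard: "reproject" one shard.
    return f"{shard_path}.repro"

@python_app
def search_stub(inputs=()):
    # Stand-in for kbmod_search: Parsl defers this app until every
    # future passed via inputs= has resolved.
    return f"searched {len(inputs)} shards"

parsl.load(Config(executors=[ThreadPoolExecutor()]))

# Stand-in for the shard list that ic_to_wu_return_shards would produce.
shard_paths = ["shard_0.fits", "shard_1.fits"]
shard_futures = [reproject_stub(p) for p in shard_paths]

# Passing the futures directly through inputs= lets Parsl chain the
# dependency without blocking the submitting thread on .result() calls.
search_future = search_stub(inputs=shard_futures)
print(search_future.result())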
10 changes: 5 additions & 5 deletions src/kbmod_wf/resource_configs/klone_configuration.py
@@ -9,7 +9,7 @@
"compute_bigmem": "01:00:00",
"large_mem": "04:00:00",
"sharded_reproject": "04:00:00",
"parallel_reproject": "00:30:00",
"reproject_single_shard": "00:30:00",
"gpu_max": "08:00:00",
}

@@ -82,20 +82,20 @@ def klone_resource_config():
            ),
        ),
        HighThroughputExecutor(
-            label="parallel_reproject",
+            label="reproject_single_shard",
            max_workers=1,
            provider=SlurmProvider(
                partition="ckpt-g2",
                account="astro",
                min_blocks=0,
-                max_blocks=2,
+                max_blocks=256,
                init_blocks=0,
                parallelism=1,
                nodes_per_block=1,
                cores_per_node=1,
-                mem_per_node=2,  # ~2-4 GB per core
+                mem_per_node=1,  # only working on 1 image, so <1 GB should be required
                exclusive=False,
-                walltime=walltimes["parallel_reproject"],
+                walltime=walltimes["reproject_single_shard"],
                # Command to run before starting worker - i.e. conda activate <special_env>
                worker_init="",
            ),
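Note: Parsl routes an app to an executor by matching the strings in the decorator's executors= list against each executor's label, so this rename must stay in sync with executors=get_executors([..., "reproject_single_shard"]) in the workflow file above. A toy sketch of that label-based routing (labels and thread pools are illustrative):

import parsl
from parsl import python_app
from parsl.config import Config
from parsl.executors.threads import ThreadPoolExecutor

config = Config(
    executors=[
        # Thread pools stand in for the Slurm-backed HighThroughputExecutors.
        ThreadPoolExecutor(label="reproject_single_shard", max_threads=4),
        ThreadPoolExecutor(label="gpu_max", max_threads=1),
    ]
)

@python_app(executors=["reproject_single_shard"])
def reproject_one(shard_path):
    # Only runs on the executor whose label matches the list above.
    return f"{shard_path}.repro"

parsl.load(config)
print(reproject_one("shard_0.fits").result())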
4 changes: 3 additions & 1 deletion src/kbmod_wf/task_impls/ic_to_wu.py
@@ -83,4 +83,6 @@ def create_work_unit(self):
        elapsed = round(time.time() - last_time, 1)
        self.logger.debug(f"Required {elapsed}[s] to write WorkUnit to disk: {self.wu_filepath}")

-        return self.wu_filepath
+        wcs = list(orig_wu._per_image_wcs)
+
+        return self.wu_filepath, wcs
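Note: create_work_unit now returns a (filepath, wcs_list) pair instead of a bare path, so any caller that treated the return value as a path must unpack it. A small sketch of the new contract (the stub and its values are hypothetical):

from pathlib import Path

def create_work_unit_stub(wu_filepath):
    # Mirrors the new return shape: the per-image WCS list rides along
    # with the path, standing in for list(orig_wu._per_image_wcs).
    wcs_list = ["wcs_0", "wcs_1"]
    return wu_filepath, wcs_list

# Downstream, the second element is what reproject_shard receives as
# original_wcs (see inputs[1] in the workflow file above).
wu_filepath, per_image_wcs = create_work_unit_stub("night_0/chip_3.wu")
print(Path(wu_filepath).name, len(per_image_wcs))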
140 changes: 78 additions & 62 deletions src/kbmod_wf/task_impls/reproject_single_chip_single_night_wu_shard.py
@@ -1,16 +1,21 @@
+import os
+import time
+from logging import Logger
+
+import numpy as np
+import astropy.io.fits as fitsio
+
import kbmod
from kbmod.work_unit import WorkUnit

import kbmod.reprojection as reprojection

from reproject import reproject_adaptive
from reproject.mosaicking import find_optimal_celestial_wcs
-import os
-import time
-from logging import Logger
-def reproject_wu_shard(
+def reproject_shard(
    original_wu_shard_filepath: str = None,
+    original_wcs=None,
    reprojected_wu_shard_filepath: str = None,
    runtime_config: dict = {},
    logger: Logger = None,
@@ -35,60 +40,71 @@ def reproject_wu_shard(
        The fully resolved filepath of the resulting WorkUnit file after reflex
        and reprojection.
    """
-    wu_shard_reprojector = WUShardReprojector(
-        original_wu_filepath=original_wu_shard_filepath,
-        reprojected_wu_filepath=reprojected_wu_shard_filepath,
-        runtime_config=runtime_config,
-        logger=logger,
-    )
-
-    return wu_shard_reprojector.reproject_workunit()
-
-
-class WUShardReprojector:
-    def __init__(
-        self,
-        original_wu_filepath: str = None,
-        reprojected_wu_filepath: str = None,
-        runtime_config: dict = {},
-        logger: Logger = None,
-    ):
-        self.original_wu_filepath = original_wu_filepath
-        self.reprojected_wu_filepath = reprojected_wu_filepath
-        self.runtime_config = runtime_config
-        self.logger = logger
-
-        # Default to 8 workers if not in the config. Value must be 0<num workers<65.
-        self.n_workers = max(1, min(self.runtime_config.get("n_workers", 8), 64))
-
-    def reproject_workunit_shard(self):
-        last_time = time.time()
-        self.logger.info(f"Lazy reading existing WorkUnit from disk: {self.original_wu_filepath}")
-        directory_containing_shards, wu_filename = os.path.split(self.original_wu_filepath)
-        wu = WorkUnit.from_sharded_fits(wu_filename, directory_containing_shards, lazy=True)
-        elapsed = round(time.time() - last_time, 1)
-        self.logger.info(f"Required {elapsed}[s] to lazy read original WorkUnit {self.original_wu_filepath}.")
-
-        directory_containing_reprojected_shards, reprojected_wu_filename = os.path.split(
-            self.reprojected_wu_filepath
-        )
-
-        # Reproject to a common WCS using the WCS for our patch
-        self.logger.info(f"Reprojecting WorkUnit with {self.n_workers} workers...")
-        last_time = time.time()
-
-        opt_wcs, shape = find_optimal_celestial_wcs(list(wu._per_image_wcs))
-        opt_wcs.array_shape = shape
-        reprojection.reproject_work_unit(
-            wu,
-            opt_wcs,
-            max_parallel_processes=self.n_workers,
-            write_output=True,
-            directory=directory_containing_reprojected_shards,
-            filename=reprojected_wu_filename,
-        )
-
-        elapsed = round(time.time() - last_time, 1)
-        self.logger.info(f"Required {elapsed}[s] to create the sharded reprojected WorkUnit.")
-
-        return self.reprojected_wu_filepath

+    opt_wcs, shape = find_optimal_celestial_wcs(original_wcs)
+    opt_wcs.array_shape = shape
+
+    shard = fitsio.open(original_wu_shard_filepath)
+    # reproject_adaptive returns a (reprojected array, footprint) pair;
+    # keep only the array for each of the sci, var, and mask layers.
+    sci, _ = reproject_adaptive(shard, opt_wcs, hdu_in=0)
+    var, _ = reproject_adaptive(shard, opt_wcs, hdu_in=1)
+    mask, _ = reproject_adaptive(shard, opt_wcs, hdu_in=2)
+
+    shard[0].data = sci.astype(np.float32)
+    shard[1].data = var.astype(np.float32)
+    shard[2].data = mask.astype(np.float32)
+
+    # HDULists are written with writeto; overwrite the shard in place.
+    shard.writeto(original_wu_shard_filepath, overwrite=True)
+
+    with open(reprojected_wu_shard_filepath, "w") as f:
+        f.write(f"Reprojected: {original_wu_shard_filepath}")
+
+    return original_wu_shard_filepath


+# class WUShardReprojector:
+#     def __init__(
+#         self,
+#         original_wu_filepath: str = None,
+#         reprojected_wu_filepath: str = None,
+#         runtime_config: dict = {},
+#         logger: Logger = None,
+#     ):
+#         self.original_wu_filepath = original_wu_filepath
+#         self.reprojected_wu_filepath = reprojected_wu_filepath
+#         self.runtime_config = runtime_config
+#         self.logger = logger
+
+#         # Default to 8 workers if not in the config. Value must be 0<num workers<65.
+#         self.n_workers = max(1, min(self.runtime_config.get("n_workers", 8), 64))
+
+#     def reproject_workunit_shard(self):
+#         last_time = time.time()
+#         self.logger.info(f"Lazy reading existing WorkUnit from disk: {self.original_wu_filepath}")
+#         directory_containing_shards, wu_filename = os.path.split(self.original_wu_filepath)
+#         wu = WorkUnit.from_sharded_fits(wu_filename, directory_containing_shards, lazy=True)
+#         elapsed = round(time.time() - last_time, 1)
+#         self.logger.info(f"Required {elapsed}[s] to lazy read original WorkUnit {self.original_wu_filepath}.")

+#         directory_containing_reprojected_shards, reprojected_wu_filename = os.path.split(
+#             self.reprojected_wu_filepath
+#         )

+#         # Reproject to a common WCS using the WCS for our patch
+#         self.logger.info(f"Reprojecting WorkUnit with {self.n_workers} workers...")
+#         last_time = time.time()

+#         opt_wcs, shape = find_optimal_celestial_wcs(list(wu._per_image_wcs))
+#         opt_wcs.array_shape = shape
+#         reprojection.reproject_work_unit(
+#             wu,
+#             opt_wcs,
+#             max_parallel_processes=self.n_workers,
+#             write_output=True,
+#             directory=directory_containing_reprojected_shards,
+#             filename=reprojected_wu_filename,
+#         )

+#         elapsed = round(time.time() - last_time, 1)
+#         self.logger.info(f"Required {elapsed}[s] to create the sharded reprojected WorkUnit.")

+#         return self.reprojected_wu_filepath
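Note: the new implementation leans on two calls from the reproject package: find_optimal_celestial_wcs to derive a common output WCS, and reproject_adaptive, which returns a (reprojected array, footprint) pair per input. A standalone sketch with a synthetic image (all file names and values illustrative):

import numpy as np
from astropy.io import fits
from astropy.wcs import WCS
from reproject import reproject_adaptive
from reproject.mosaicking import find_optimal_celestial_wcs

# Build a tiny synthetic image with a valid celestial WCS.
wcs = WCS(naxis=2)
wcs.wcs.ctype = ["RA---TAN", "DEC--TAN"]
wcs.wcs.crval = [10.0, -5.0]
wcs.wcs.crpix = [50.0, 50.0]
wcs.wcs.cdelt = [-0.001, 0.001]
hdu = fits.PrimaryHDU(data=np.ones((100, 100), dtype=np.float32), header=wcs.to_header())

# Derive an optimal common WCS (and output shape) from the input footprint.
opt_wcs, shape = find_optimal_celestial_wcs([hdu])

# Reproject one HDU; note the (array, footprint) return pair.
reprojected, footprint = reproject_adaptive(hdu, opt_wcs, shape_out=shape)
print(reprojected.shape, np.nanmax(footprint))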
2 changes: 1 addition & 1 deletion src/kbmod_wf/test_workflow.py
@@ -249,7 +249,7 @@ def workflow_runner(env=None, runtime_config={}):
    runtime_config = {}

    #! Don't forget to remove this hardcoded path!!!
-    args.runtime_config = "/home/drew/code/kbmod-wf/example_runtime_config.toml"
+    args.runtime_config = "/Users/drew/code/kbmod-wf/example_runtime_config.toml"
    if args.runtime_config is not None and os.path.exists(args.runtime_config):
        with open(args.runtime_config, "r") as toml_runtime_config:
            runtime_config = toml.load(toml_runtime_config)
1 change: 1 addition & 0 deletions src/kbmod_wf/workflow_tasks/__init__.py
@@ -3,3 +3,4 @@
from .kbmod_search import kbmod_search
from .reproject_wu import reproject_wu
from .uri_to_ic import uri_to_ic
+from .ic_to_wu_return_shards import ic_to_wu_return_shards
