Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use temporary folder container context manager for wsclean files #145

Merged
merged 7 commits into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Change log

# dev
- if `-temp-dir` used in wsclean then imaging products are produced here and then copied over to the same directory as the MS. This is intended to make use of compute nodes and fast local storage, like memory tmpfs or local disks.

# 0.2.5
- added in skip rounds for masking and selfcal
- Basic handling of CASDA measurement sets (preprocessing)
Expand Down
42 changes: 35 additions & 7 deletions flint/imager/wsclean.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from flint.logging import logger
from flint.ms import MS
from flint.sclient import run_singularity_command
from flint.utils import get_environment_variable
from flint.utils import get_environment_variable, hold_then_move_into


class ImageSet(NamedTuple):
Expand Down Expand Up @@ -40,6 +40,10 @@ class WSCleanOptions(NamedTuple):
Basic support for environment variables is available. Should a value start
with `$` it is assumed to be a environment variable, it is will be looked up.
Some basic attempts to deterimine if it is a path is made.

Should the `temp_dir` options be specified then all images will be
created in this location, and then moved over to the same parent directory
as the imaged MS. This is done by setting the wsclean `-name` argument.
"""

abs_mem: int = 100
Expand Down Expand Up @@ -317,6 +321,9 @@ def create_wsclean_cmd(
bind_dir_paths = []
bind_dir_options = ("temp-dir",)

move_directory = ms.path.parent
hold_directory: Optional[Path] = None

cmd = "wsclean "
unknowns: List[Tuple[Any, Any]] = []
logger.info("Creating wsclean command.")
Expand Down Expand Up @@ -356,6 +363,11 @@ def create_wsclean_cmd(
else:
unknowns.append((key, value))

if key == "temp-dir" and isinstance(value, (Path, str)):
hold_directory = Path(value)
name_str = hold_directory / ms.path.stem
cmd += f"-name {str(name_str)} "

if key in bind_dir_options and isinstance(value, (str, Path)):
bind_dir_paths.append(Path(value))

Expand All @@ -376,6 +388,7 @@ def create_wsclean_cmd(
wsclean_cmd=wsclean_cmd,
container=container,
bind_dirs=tuple(bind_dir_paths),
move_hold_directories=(move_directory, hold_directory),
)

return wsclean_cmd
Expand All @@ -385,6 +398,7 @@ def run_wsclean_imager(
wsclean_cmd: WSCleanCommand,
container: Path,
bind_dirs: Optional[Tuple[Path]] = None,
move_hold_directories: Optional[Tuple[Path, Optional[Path]]] = None,
) -> WSCleanCommand:
"""Run a provided wsclean command. Optionally will clean up files,
including the dirty beams, psfs and other assorted things.
Expand All @@ -393,6 +407,7 @@ def run_wsclean_imager(
wsclean_cmd (WSCleanCommand): The command to run, and other properties (cleanup.)
container (Path): Path to the container with wsclean available in it
bind_dirs (Optional[Tuple[Path]], optional): Additional directories to include when binding to the wsclean container. Defaults to None.
move_hold_directories (Optional[Tuple[Path,Optional[Path]]], optional): The `move_directory` and `hold_directory` passed to the temporary context manger. If None no `hold_then_move_into` manager is used. Defaults to None.

Returns:
WSCleanCommand: The executed wsclean command with a populated imageset properter.
Expand All @@ -406,12 +421,25 @@ def run_wsclean_imager(
if bind_dirs:
sclient_bind_dirs = sclient_bind_dirs + list(bind_dirs)

run_singularity_command(
image=container,
command=wsclean_cmd.cmd,
bind_dirs=sclient_bind_dirs,
stream_callback_func=_wsclean_output_callback,
)
if move_hold_directories:
with hold_then_move_into(
move_directory=move_hold_directories[0],
hold_directory=move_hold_directories[1],
) as directory:
sclient_bind_dirs.append(directory)
run_singularity_command(
image=container,
command=wsclean_cmd.cmd,
bind_dirs=sclient_bind_dirs,
stream_callback_func=_wsclean_output_callback,
)
else:
run_singularity_command(
image=container,
command=wsclean_cmd.cmd,
bind_dirs=sclient_bind_dirs,
stream_callback_func=_wsclean_output_callback,
)

prefix = wsclean_cmd.options.name
if prefix is None:
Expand Down
17 changes: 12 additions & 5 deletions flint/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,20 @@

@contextmanager
def hold_then_move_into(
hold_directory: Path, move_directory: Path, delete_hold_on_exist: bool = True
move_directory: Path,
hold_directory: Optional[Path],
delete_hold_on_exist: bool = True,
) -> Path:
"""Create a temporary directory such that anything within it one the
"""Create a temporary directory such that anything within it on the
exit of the context manager is copied over to `move_directory`.

If `hold_directory` and `move_directory` are the same or `hold_directory` is None, then `move_directory`
is immediatedly returned and no output files are copied or deleted. `move_directory` will be
created if it does not exist.

Args:
hold_directory (Path): Location of directory to temporarily base work from
move_directory (Path): Final directort location to move items into
hold_directory (Optional[Path], optional): Location of directory to temporarily base work from. If None provided `move_directory` is returned and no copying/deleting is performed on exit. Defaults to None.
delete_hold_on_exist (bool, optional): Whether `hold_directory` is deleted on exit of the context. Defaults to True.

Returns:
Expand All @@ -47,10 +53,11 @@ def hold_then_move_into(
"""
# TODO: except extra files and folders to copy into `hold_directory` that are
# also placed back on exit
hold_directory = Path(hold_directory)
hold_directory = Path(hold_directory) if hold_directory else None
move_directory = Path(move_directory)

if hold_directory == move_directory:
if hold_directory == move_directory or hold_directory is None:
move_directory.mkdir(parents=True, exist_ok=True)
yield move_directory
else:
for directory in (hold_directory, move_directory):
Expand Down
26 changes: 26 additions & 0 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,32 @@ def test_hold_then_test_errors(tmpdir):
logger.info("This will not be here")


def test_hold_then_move_into_none(tmpdir):
"""See whether the context manager behaves as expected when the temporary hold
directory is None. This should just do thing in the move_directory."""

tmpdir = Path(tmpdir)

hold_directory = None
move_directory = Path(tmpdir / "new/the/final/location")

no_files = 45
with hold_then_move_into(
hold_directory=hold_directory, move_directory=move_directory
) as put_dir:
assert put_dir.exists()
assert put_dir == move_directory
for i in range(no_files):
file: Path = put_dir / f"some_file_{i}.txt"
file.write_text(f"This is a file {i}")

assert len(list(put_dir.glob("*"))) == no_files
assert move_directory.exists()

assert len(list(move_directory.glob("*"))) == no_files
assert put_dir.exists()


def test_hold_then_move_into(tmpdir):
"""See whether the hold directory can have things dumped into it, then
moved into place on exit of the context manager"""
Expand Down
Loading