diff --git a/changes/8782.outlier_detection.0.rst b/changes/8782.outlier_detection.0.rst
new file mode 100644
index 0000000000..469e927e44
--- /dev/null
+++ b/changes/8782.outlier_detection.0.rst
@@ -0,0 +1 @@
+Decrease the amount of file I/O required to compute the median when in_memory is set to False.
\ No newline at end of file
diff --git a/changes/8782.outlier_detection.1.rst b/changes/8782.outlier_detection.1.rst
new file mode 100644
index 0000000000..a1f9bd8e55
--- /dev/null
+++ b/changes/8782.outlier_detection.1.rst
@@ -0,0 +1 @@
+Fix a bug that caused intermediate files to conflict for different slits when a MultiSlitModel was processed.
\ No newline at end of file
diff --git a/changes/8782.resample.rst b/changes/8782.resample.rst
new file mode 100644
index 0000000000..05b3e87052
--- /dev/null
+++ b/changes/8782.resample.rst
@@ -0,0 +1 @@
+Permit creating drizzled models one at a time in many-to-many mode.
diff --git a/docs/jwst/outlier_detection/outlier_detection_imaging.rst b/docs/jwst/outlier_detection/outlier_detection_imaging.rst
index c2cd6318b9..a6a6ec78e7 100644
--- a/docs/jwst/outlier_detection/outlier_detection_imaging.rst
+++ b/docs/jwst/outlier_detection/outlier_detection_imaging.rst
@@ -58,7 +58,8 @@ Specifically, this routine performs the following operations:
   should be used when resampling to create the output mosaic. Any pixel with a DQ value not included
   in this value (or list of values) will be ignored when resampling.
- * Resampled images will be written out to disk with the suffix ``_<asn_id>_outlier_i2d.fits``
+ * When the ``save_intermediate_results`` parameter is set to True,
+   resampled images will be written out to disk with the suffix ``_<asn_id>_outlier_i2d.fits``
   if the input model container has an ``asn_id``, otherwise the suffix will be ``_outlier_i2d.fits``
   by default.
 * **If resampling is turned off** through the use of the ``resample_data`` parameter,
@@ -162,9 +163,12 @@ during processing includes:
   :py:class:`~jwst.resample.ResampleStep` as well, to set whether or not to keep the
   resampled images in memory or not.

-#. Computing the median image works section-by-section by only keeping 1Mb of each input
-   in memory at a time. As a result, only the final output product array for the final
-   median image along with a stack of 1Mb image sections are kept in memory.
+#. Computing the median image works by writing the resampled data frames to appendable files
+   on disk that are split into sections spatially but contain the entire ngroups (i.e., time)
+   axis. The section size is set to use roughly the same amount of memory as a single resampled
+   model, and since the resampled models are discarded from memory after this write operation,
+   this choice avoids increasing the memory usage beyond that of a single resampled model.
+   Those sections are then read in one at a time to compute the median image.

 These changes result in a minimum amount of memory usage during processing at the
 obvious expense of reading and writing the products from disk.
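To make the section-on-disk scheme described in the docs change above concrete, here is a minimal, self-contained numpy sketch of the same append-then-median pattern. This is illustrative only: the file layout and names here are hypothetical, while the actual implementation is the DiskAppendableArray and OnDiskMedian classes added to jwst/outlier_detection/utils.py later in this diff.

import os
import tempfile

import numpy as np

# Illustrative sketch (hypothetical names, not the pipeline API): each frame is
# split into row sections, each section is appended to a per-section binary
# file, and the median is later computed one section stack at a time.
n_images, nrows, ncols, section_nrows = 4, 20, 20, 5
tmpdir = tempfile.mkdtemp()

for _ in range(n_images):
    frame = np.random.normal(size=(nrows, ncols)).astype(np.float32)
    for s, row in enumerate(range(0, nrows, section_nrows)):
        with open(os.path.join(tmpdir, f"{s}.bin"), "ab") as f:
            frame[row:row + section_nrows].tofile(f)  # append this section
    del frame  # the full frame is dropped; only its sections remain, on disk

# Read back one (n_images, section_nrows, ncols) stack at a time, so peak
# memory is one section stack plus the output array, never the whole cube.
median = np.empty((nrows, ncols), dtype=np.float32)
for s, row in enumerate(range(0, nrows, section_nrows)):
    stack = np.fromfile(os.path.join(tmpdir, f"{s}.bin"), dtype=np.float32)
    stack = stack.reshape(n_images, section_nrows, ncols)
    median[row:row + section_nrows] = np.nanmedian(stack, axis=0)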
diff --git a/jwst/outlier_detection/_fileio.py b/jwst/outlier_detection/_fileio.py
index 6975ac1717..25dc3a26bd 100644
--- a/jwst/outlier_detection/_fileio.py
+++ b/jwst/outlier_detection/_fileio.py
@@ -1,17 +1,9 @@
-import os
-
 import logging
 log = logging.getLogger(__name__)
 log.setLevel(logging.DEBUG)


-def remove_file(fn):
-    if isinstance(fn, str) and os.path.isfile(fn):
-        os.remove(fn)
-        log.info(f"Removing file {fn}")
-
-
-def save_median(median_model, make_output_path, asn_id=None):
+def save_median(median_model, make_output_path):
     '''
     Save median if requested by user
@@ -20,13 +12,46 @@ def save_median(median_model, make_output_path, asn_id=None):
     median_model : ~jwst.datamodels.ImageModel
         The median ImageModel or CubeModel to save
     '''
-    default_suffix = "_outlier_i2d.fits"
-    if asn_id is None:
-        suffix_to_remove = default_suffix
-    else:
-        suffix_to_remove = f"_{asn_id}{default_suffix}"
-    median_model_output_path = make_output_path(
-        basepath=median_model.meta.filename.replace(suffix_to_remove, '.fits'),
-        suffix='median')
-    median_model.save(median_model_output_path)
-    log.info(f"Saved model in {median_model_output_path}")
+    _save_intermediate_output(median_model, "median", make_output_path)
+
+
+def save_drizzled(drizzled_model, make_output_path):
+    expected_tail = "outlier_?2d.fits"
+    suffix = drizzled_model.meta.filename[-len(expected_tail):-5]
+    _save_intermediate_output(drizzled_model, suffix, make_output_path)
+
+
+def save_blot(input_model, blot, make_output_path):
+    blot_model = _make_blot_model(input_model, blot)
+    _save_intermediate_output(blot_model, "blot", make_output_path)
+
+
+def _make_blot_model(input_model, blot):
+    blot_model = type(input_model)()
+    blot_model.data = blot
+    blot_model.update(input_model)
+    return blot_model
+
+
+def _save_intermediate_output(model, suffix, make_output_path):
+    """
+    Ensure that all intermediate outputs from OutlierDetectionStep use
+    consistent file naming conventions.
+
+    Notes
+    -----
+    self.make_output_path() is updated globally for the step in the main pipeline
+    to include the asn_id in the output path, so no need to handle it here.
+    """
+
+    # outlier_?2d is not a known suffix, and make_output_path cannot handle an
+    # underscore in an unknown suffix, so do a manual string replacement
+    input_path = model.meta.filename.replace("_outlier_", "_")
+
+    # Add a slit name to the output path for MultiSlitModel data if not present
+    if hasattr(model, "name") and model.name is not None:
+        if "_" + model.name.lower() not in input_path:
+            suffix = f"{model.name.lower()}_{suffix}"
+
+    output_path = make_output_path(input_path, suffix=suffix)
+    model.save(output_path)
+    log.info(f"Saved {suffix} model in {output_path}")
diff --git a/jwst/outlier_detection/coron.py b/jwst/outlier_detection/coron.py
index 500ad49602..9a3bf9e984 100644
--- a/jwst/outlier_detection/coron.py
+++ b/jwst/outlier_detection/coron.py
@@ -2,7 +2,6 @@
 Submodule for performing outlier detection on coronagraphy data.
""" - import logging import numpy as np @@ -27,7 +26,6 @@ def detect_outliers( good_bits, maskpt, snr, - asn_id, make_output_path, ): """ @@ -56,7 +54,7 @@ def detect_outliers( median_model.update(input_model) median_model.meta.wcs = input_model.meta.wcs - save_median(median_model, make_output_path, asn_id) + save_median(median_model, make_output_path) del median_model # Perform outlier detection using statistical comparisons between diff --git a/jwst/outlier_detection/imaging.py b/jwst/outlier_detection/imaging.py index 5678b45802..0dacd5ec43 100644 --- a/jwst/outlier_detection/imaging.py +++ b/jwst/outlier_detection/imaging.py @@ -2,19 +2,16 @@ Submodule for performing outlier detection on imaging data. """ -import copy import logging -import os - -from stdatamodels.jwst import datamodels from jwst.datamodels import ModelLibrary from jwst.resample import resample -from jwst.resample.resample_utils import build_driz_weight from jwst.stpipe.utilities import record_step_status -from .utils import create_median, flag_model_crs, flag_resampled_model_crs -from ._fileio import remove_file, save_median +from .utils import (flag_model_crs, + flag_resampled_model_crs, + median_without_resampling, + median_with_resampling) log = logging.getLogger(__name__) log.setLevel(logging.DEBUG) @@ -40,7 +37,6 @@ def detect_outliers( fillval, allowed_memory, in_memory, - asn_id, make_output_path, ): """ @@ -58,20 +54,10 @@ def detect_outliers( log.warning("Outlier detection will be skipped") record_step_status(input_models, "outlier_detection", False) return input_models - + if resample_data: - # Start by creating resampled/mosaic images for - # each group of exposures - with input_models: - example_model = input_models.borrow(0) - output_path = make_output_path(basepath=example_model.meta.filename, - suffix='') - input_models.shelve(example_model, modify=False) - del example_model - output_path = os.path.dirname(output_path) resamp = resample.ResampleData( input_models, - output=output_path, single=True, blendheaders=False, wht_type=weight_type, @@ -80,46 +66,21 @@ def detect_outliers( fillval=fillval, good_bits=good_bits, in_memory=in_memory, - asn_id=asn_id, allowed_memory=allowed_memory, ) - median_wcs = resamp.output_wcs - drizzled_models = resamp.do_drizzle(input_models) + median_data, median_wcs = median_with_resampling(input_models, + resamp, + maskpt, + save_intermediate_results=save_intermediate_results, + make_output_path=make_output_path,) else: - # for non-dithered data, the resampled image is just the original image - drizzled_models = input_models - with input_models: - for i, model in enumerate(input_models): - model.wht = build_driz_weight( - model, - weight_type=weight_type, - good_bits=good_bits) - # copy for when saving median and input is a filename? 
- if i == 0: - median_wcs = copy.deepcopy(model.meta.wcs) - input_models.shelve(model, modify=True) - - # Perform median combination on set of drizzled mosaics - median_data = create_median(drizzled_models, maskpt) + median_data, median_wcs = median_without_resampling(input_models, + maskpt, + weight_type, + good_bits, + save_intermediate_results=save_intermediate_results, + make_output_path=make_output_path,) - if save_intermediate_results: - # make a median model - with drizzled_models: - example_model = drizzled_models.borrow(0) - drizzled_models.shelve(example_model, modify=False) - median_model = datamodels.ImageModel(median_data) - median_model.update(example_model) - median_model.meta.wcs = median_wcs - del example_model - - save_median(median_model, make_output_path, asn_id) - del median_model - else: - # since we're not saving intermediate results if the drizzled models - # were written to disk, remove them - if not in_memory: - for fn in drizzled_models.asn["products"][0]["members"]: - remove_file(fn["expname"]) # Perform outlier detection using statistical comparisons between # each original input image and its blotted version of the median image diff --git a/jwst/outlier_detection/outlier_detection_step.py b/jwst/outlier_detection/outlier_detection_step.py index f18fb054e3..5ecc90a231 100644 --- a/jwst/outlier_detection/outlier_detection_step.py +++ b/jwst/outlier_detection/outlier_detection_step.py @@ -80,8 +80,7 @@ def process(self, input_data): self.log.info(f"Outlier Detection mode: {mode}") # determine the asn_id (if not set by the pipeline) - asn_id = self._get_asn_id(input_data) - self.log.info(f"Outlier Detection asn_id: {asn_id}") + self._get_asn_id(input_data) snr1, snr2 = [float(v) for v in self.snr.split()] scale1, scale2 = [float(v) for v in self.scale.split()] @@ -94,7 +93,6 @@ def process(self, input_data): self.maskpt, self.rolling_window_width, snr1, - asn_id, self.make_output_path, ) elif mode == 'coron': @@ -104,7 +102,6 @@ def process(self, input_data): self.good_bits, self.maskpt, snr1, - asn_id, self.make_output_path, ) elif mode == 'imaging': @@ -125,7 +122,6 @@ def process(self, input_data): self.fillval, self.allowed_memory, self.in_memory, - asn_id, self.make_output_path, ) elif mode == 'spec': @@ -145,7 +141,6 @@ def process(self, input_data): self.kernel, self.fillval, self.in_memory, - asn_id, self.make_output_path, ) elif mode == 'ifu': @@ -203,6 +198,9 @@ def _guess_mode(self, input_models): return None def _get_asn_id(self, input_models): + """Find association ID for any allowed input model type, + and update make_output_path such that the association ID + is included in intermediate and output file names.""" # handle if input_models isn't open if isinstance(input_models, (str, dict)): input_models = datamodels.open(input_models, asn_n_members=1) @@ -227,7 +225,8 @@ def _get_asn_id(self, input_models): _make_output_path, asn_id=asn_id ) - return asn_id + self.log.info(f"Outlier Detection asn_id: {asn_id}") + return def _set_status(self, input_models, status): # this might be called with the input which might be a filename or path diff --git a/jwst/outlier_detection/spec.py b/jwst/outlier_detection/spec.py index c482aad8b9..ad94551976 100644 --- a/jwst/outlier_detection/spec.py +++ b/jwst/outlier_detection/spec.py @@ -1,16 +1,14 @@ """ Submodule for performing outlier detection on spectra. 
""" -import copy -import os - -from stdatamodels.jwst import datamodels from jwst.datamodels import ModelContainer, ModelLibrary from jwst.stpipe.utilities import record_step_status -from ..resample import resample_spec, resample_utils -from .utils import create_median, flag_crs_in_models, flag_crs_in_models_with_resampling -from ._fileio import remove_file +from ..resample import resample_spec +from .utils import (flag_crs_in_models, + flag_crs_in_models_with_resampling, + median_with_resampling, + median_without_resampling) import logging log = logging.getLogger(__name__) @@ -36,7 +34,6 @@ def detect_outliers( kernel, fillval, in_memory, - asn_id, make_output_path, ): """ @@ -53,15 +50,15 @@ def detect_outliers( record_step_status(input_models, "outlier_detection", False) return input_models + # convert to library for resample + # for compatibility with image3 pipeline + library = ModelLibrary(input_models, on_disk=False) + if resample_data is True: # Start by creating resampled/mosaic images for # each group of exposures - output_path = make_output_path( - basepath=input_models[0].meta.filename, suffix='') - output_path = os.path.dirname(output_path) resamp = resample_spec.ResampleSpecData( input_models, - output=output_path, single=True, blendheaders=False, wht_type=weight_type, @@ -70,54 +67,24 @@ def detect_outliers( fillval=fillval, good_bits=good_bits, in_memory=in_memory, - asn_id=asn_id, ) - median_wcs = resamp.output_wcs - - # convert to library for resample - # for compatibility with image3 pipeline - library = ModelLibrary(input_models, on_disk=False) - drizzled_models = resamp.do_drizzle(library) + median_data, median_wcs = median_with_resampling( + library, + resamp, + maskpt, + save_intermediate_results=save_intermediate_results, + make_output_path=make_output_path,) else: - drizzled_models = ModelLibrary(input_models) - with drizzled_models: - for i, model in enumerate(drizzled_models): - model.wht = resample_utils.build_driz_weight( - input_models[i], - weight_type=weight_type, - good_bits=good_bits) - drizzled_models.shelve(model) - # copy for when saving median and input is a filename? 
- median_wcs = copy.deepcopy(input_models[0].meta.wcs) - - # Perform median combination on set of drizzled mosaics - # create_median should be called as a method from parent class - median_data = create_median(drizzled_models, maskpt) - - if save_intermediate_results: - # Initialize intermediate products used in the outlier detection - median_model = datamodels.ImageModel(median_data) - with drizzled_models: - example_model = drizzled_models.borrow(0) - drizzled_models.shelve(example_model, 0, modify=False) - median_model.meta = example_model.meta - median_model.meta.filename = make_output_path( - basepath=input_models[0].meta.filename, - suffix='median' + median_data, median_wcs = median_without_resampling( + library, + maskpt, + weight_type, + good_bits, + save_intermediate_results=save_intermediate_results, + make_output_path=make_output_path, ) - log.info("Writing out MEDIAN image to: {}".format( - median_model.meta.filename)) - median_model.save(median_model.meta.filename) - del median_model - else: - # since we're not saving intermediate results if the drizzled models - # were written to disk, remove them - if not in_memory: - for fn in drizzled_models._members: - remove_file(fn["expname"]) - # Perform outlier detection using statistical comparisons between # each original input image and its blotted version of the median image if resample_data: diff --git a/jwst/outlier_detection/tests/test_algorithms.py b/jwst/outlier_detection/tests/test_algorithms.py index b3b9255bb4..d1c8ffd85d 100644 --- a/jwst/outlier_detection/tests/test_algorithms.py +++ b/jwst/outlier_detection/tests/test_algorithms.py @@ -1,6 +1,7 @@ import numpy as np from jwst.outlier_detection.tso import moving_median_over_zeroth_axis +from jwst.outlier_detection.utils import nanmedian3D def test_rolling_median(): @@ -14,3 +15,14 @@ def test_rolling_median(): result = moving_median_over_zeroth_axis(arr, w) expected = expected_time_axis[:, np.newaxis, np.newaxis] * spatial_axis[np.newaxis, :, :] assert np.allclose(result, expected) + + +def test_nanmedian3D(): + + shp = (11, 50, 60) + cube = np.random.normal(size=shp) + cube[5, 5:7, 5:8] = np.nan + med = nanmedian3D(cube) + + assert med.dtype == np.float32 + assert np.allclose(med, np.nanmedian(cube, axis=0), equal_nan=True) \ No newline at end of file diff --git a/jwst/outlier_detection/tests/test_fileio.py b/jwst/outlier_detection/tests/test_fileio.py new file mode 100644 index 0000000000..6e01557c8e --- /dev/null +++ b/jwst/outlier_detection/tests/test_fileio.py @@ -0,0 +1,40 @@ +import pytest +from jwst.outlier_detection._fileio import _save_intermediate_output +from jwst.datamodels import ImageModel +from jwst.step import OutlierDetectionStep +import os +import numpy as np +from functools import partial + + +SLIT_ID = "slit007" +ASN_ID = "a008" +INPUT_FILENAME = "foo_outlier_s2d.fits" +SUFFIX = "test" + +@pytest.fixture(scope="module") +def model(): + model = ImageModel(data=np.zeros((10,10))) + model.meta.filename = INPUT_FILENAME + return model + + +@pytest.fixture(scope="module") +def make_output_path(): + return OutlierDetectionStep().make_output_path + + +@pytest.mark.parametrize("asn_id", [None, ASN_ID]) +@pytest.mark.parametrize("slit_id", [None, SLIT_ID]) +def test_save(tmp_cwd, model, make_output_path, asn_id, slit_id): + + this_model = model.copy() + if slit_id is not None: + this_model.name = slit_id + make_output_path = partial(make_output_path, asn_id=asn_id) + _save_intermediate_output(this_model, SUFFIX, make_output_path) + + stem = 
model.meta.filename.split("_")[0] + inputs = [val for val in [stem, asn_id, slit_id, SUFFIX] if val is not None] + expected_filename = "_".join(inputs)+".fits" + assert os.path.isfile(expected_filename) \ No newline at end of file diff --git a/jwst/outlier_detection/tests/test_outlier_detection.py b/jwst/outlier_detection/tests/test_outlier_detection.py index ec469f4fb4..3155824fc8 100644 --- a/jwst/outlier_detection/tests/test_outlier_detection.py +++ b/jwst/outlier_detection/tests/test_outlier_detection.py @@ -4,13 +4,14 @@ from glob import glob import os +from gwcs.wcs import WCS from stdatamodels.jwst import datamodels from jwst.datamodels import ModelContainer, ModelLibrary from jwst.assign_wcs import AssignWcsStep from jwst.assign_wcs.pointing import create_fitswcs from jwst.outlier_detection import OutlierDetectionStep -from jwst.outlier_detection.utils import _flag_resampled_model_crs, create_median +from jwst.outlier_detection.utils import _flag_resampled_model_crs from jwst.outlier_detection.outlier_detection_step import ( IMAGE_MODES, TSO_SPEC_MODES, @@ -18,6 +19,8 @@ CORON_IMAGE_MODES, ) from jwst.resample.tests.test_resample_step import miri_rate_model +from jwst.outlier_detection.utils import median_with_resampling, median_without_resampling +from jwst.resample.resample import ResampleData OUTLIER_DO_NOT_USE = np.bitwise_or( datamodels.dqflags.pixel["DO_NOT_USE"], datamodels.dqflags.pixel["OUTLIER"] @@ -109,7 +112,7 @@ def we_many_sci( ): """Provide numsci science images with different noise but identical source and same background level""" - shape = (20, 20) + shape = (21, 20) sci1 = datamodels.ImageModel(shape) @@ -595,7 +598,7 @@ def test_outlier_step_weak_cr_tso(exptype, tsovisit): assert result.dq[cr_timestep, 12, 12] == OUTLIER_DO_NOT_USE -def test_create_median(three_sci_as_asn, tmp_cwd): +def test_same_median_on_disk(three_sci_as_asn, tmp_cwd): """Test creation of median on disk vs in memory""" lib_on_disk = ModelLibrary(three_sci_as_asn, on_disk=True) lib_in_memory = ModelLibrary(three_sci_as_asn, on_disk=False) @@ -604,14 +607,71 @@ def test_create_median(three_sci_as_asn, tmp_cwd): with (lib_on_disk, lib_in_memory): for lib in [lib_on_disk, lib_in_memory]: for model in lib: - model.wht = np.ones_like(model.data) - model.wht[4,9] = 0.5 + model.var_rnoise = np.ones_like(model.data) + model.var_rnoise[4,9] = 2.0 lib.shelve(model, modify=True) - median_on_disk = create_median(lib_on_disk, 0.7) - median_in_memory = create_median(lib_in_memory, 0.7) + # 32-bit floats are 4 bytes each, min buffer size is one row of 20 pixels + # arbitrarily use 5 times that + buffer_size = 4 * 20 * 5 + median_on_disk, _ = median_without_resampling( + lib_on_disk, + 0.7, + "ivm", + "~DO_NOT_USE", + buffer_size=buffer_size,) + median_in_memory, _ = median_without_resampling( + lib_in_memory, + 0.7, + "ivm", + "~DO_NOT_USE", + buffer_size=buffer_size,) + + # Make sure the high-variance (low-weight) pixel is set to NaN assert np.isnan(median_in_memory[4,9]) # Make sure the median library is the same for on-disk and in-memory - assert np.allclose(median_on_disk, median_in_memory, equal_nan=True) \ No newline at end of file + assert np.allclose(median_on_disk, median_in_memory, equal_nan=True) + + +def test_drizzle_and_median_with_resample(three_sci_as_asn, tmp_cwd): + lib = ModelLibrary(three_sci_as_asn, on_disk=False) + + resamp = make_resamp(lib) + median, wcs = median_with_resampling( + lib, + resamp, + 0.7) + + assert isinstance(wcs, WCS) + assert median.shape == (21,20) + + resamp.single 
= False
+    with pytest.raises(ValueError):
+        # ensure failure when trying to call while resamp.single is False
+        median_with_resampling(
+            lib,
+            resamp,
+            0.7,
+            save_intermediate_results=True)
+
+
+def make_resamp(input_models):
+    """All defaults are the same as those used by default in outlier detection"""
+    in_memory = not input_models._on_disk
+    resamp = ResampleData(
+        input_models,
+        output="",
+        single=True,
+        blendheaders=False,
+        wht_type="ivm",
+        pixfrac=1.0,
+        kernel="square",
+        fillval="INDEF",
+        good_bits="~DO_NOT_USE",
+        in_memory=in_memory,
+        asn_id="test",
+        allowed_memory=None,
+    )
+    return resamp
\ No newline at end of file
diff --git a/jwst/outlier_detection/tests/test_utils.py b/jwst/outlier_detection/tests/test_utils.py
new file mode 100644
index 0000000000..d1fc0bd576
--- /dev/null
+++ b/jwst/outlier_detection/tests/test_utils.py
@@ -0,0 +1,164 @@
+import pytest
+from jwst.outlier_detection.utils import DiskAppendableArray, OnDiskMedian
+from pathlib import Path
+import numpy as np
+import os
+
+
+def test_disk_appendable_array(tmp_cwd):
+
+    slice_shape = (8,7)
+    dtype = "float32"
+    tempdir = tmp_cwd / Path("tmptest")
+    os.mkdir(tempdir)
+    fname = tempdir / "test.bin"
+
+    arr = DiskAppendableArray(slice_shape, dtype, fname)
+
+    # check temporary file setup
+    assert str(arr._filename).split("/")[-1] in os.listdir(tempdir)
+    assert len(os.listdir(tempdir)) == 1
+    assert arr.shape == (0,) + slice_shape
+
+    # check cwd contains no files
+    assert all([not os.path.isfile(f) for f in os.listdir(tmp_cwd)])
+
+    # check expected append failures
+    with pytest.raises(ValueError):
+        candidate = np.zeros((7,7), dtype=dtype)
+        arr.append(candidate)
+    with pytest.raises(ValueError):
+        candidate = np.zeros((8,7), dtype='float64')
+        arr.append(candidate)
+
+    # check append and read
+    candidate0 = np.zeros(slice_shape, dtype=dtype)
+    candidate1 = np.full(slice_shape, 2, dtype=dtype)
+    candidate2 = np.full(slice_shape, np.nan, dtype=dtype)
+    for candidate in [candidate0, candidate1, candidate2]:
+        arr.append(candidate)
+
+    arr_in_memory = arr.read()
+
+    assert arr_in_memory.shape == (3,) + slice_shape
+    assert np.all(arr_in_memory[0] == candidate0)
+    assert np.all(arr_in_memory[1] == candidate1)
+    assert np.allclose(arr_in_memory[2], candidate2, equal_nan=True)
+
+
+def test_disk_appendable_array_bad_inputs(tmp_cwd):
+
+    slice_shape = (8,7)
+    dtype = "float32"
+    tempdir = tmp_cwd / Path("tmptest")
+    fname = "test.bin"
+
+    # test input directory does not exist
+    with pytest.raises(FileNotFoundError):
+        DiskAppendableArray(slice_shape, dtype, tempdir / fname)
+
+    # make the input directory
+    os.mkdir(tempdir)
+
+    # ensure failure if slice_shape is not 2-D
+    with pytest.raises(ValueError):
+        DiskAppendableArray((3,5,7), dtype, tempdir / fname)
+
+    # ensure failure if dtype is not valid
+    with pytest.raises(TypeError):
+        DiskAppendableArray(slice_shape, "float3", tempdir / fname)
+
+    # ensure failure if a directory is passed instead of a filename
+    with pytest.raises(IsADirectoryError):
+        DiskAppendableArray(slice_shape, "float3", tempdir)
+
+
+# filter warnings such that "ResourceWarning: Implicitly cleaning up" is raised as error
+# unfortunately need to do this indirectly by finding "finalize" string in PytestUnraisableException
+@pytest.mark.filterwarnings("error:.*finalize.*:pytest.PytestUnraisableExceptionWarning")
+def test_on_disk_median(tmp_cwd):
+
+    library_length = 3
+    frame_shape = (21,20)
+    dtype = 'float32'
+    tempdir = tmp_cwd / Path("tmptest")
+    os.mkdir(tempdir)
+    shape = (library_length,) + 
frame_shape + + median_computer = OnDiskMedian(shape, dtype=dtype, tempdir=tempdir) + + # test compute buffer indices + # buffer size equals size of single input model by default + # which means we expect same number of sections as library length + # in reality there is often one more section than that because + # of necessity of integer number of rows per section, but math is exact in this case + expected_buffer_size = frame_shape[0] * frame_shape[1] * np.dtype(dtype).itemsize + expected_section_nrows = frame_shape[0] // library_length + assert median_computer.nsections == library_length + assert median_computer.section_nrows == expected_section_nrows + assert median_computer.buffer_size == expected_buffer_size + + # test temp file setup + assert len(os.listdir(tempdir)) == 1 + assert str(median_computer._temp_path).startswith(str(tempdir)) + assert len(os.listdir(median_computer._temp_path)) == library_length + # check cwd and parent tempdir contain no files + assert all([not os.path.isfile(f) for f in os.listdir(tmp_cwd)]) + assert all([not os.path.isfile(f) for f in os.listdir(tempdir)]) + + # test validate data + with pytest.raises(ValueError): + candidate = np.zeros((20,20), dtype=dtype) + median_computer.add_image(candidate) + with pytest.raises(ValueError): + candidate = np.zeros((21,20), dtype='float64') + median_computer.add_image(candidate) + + # test add and compute + candidate0 = np.full(frame_shape, 3, dtype=dtype) + candidate1 = np.full(frame_shape, 2, dtype=dtype) + candidate2 = np.full(frame_shape, np.nan, dtype=dtype) + for candidate in [candidate0, candidate1, candidate2]: + median_computer.add_image(candidate) + median = median_computer.compute_median() + median_computer.cleanup() + assert median.shape == frame_shape + assert np.all(median == 2.5) + + # test expected error trying to add too many frames + # for loop to ensure always happens, not just the first time + candidate3 = np.zeros_like(candidate0) + for _ in range(2): + with pytest.raises(IndexError): + median_computer.add_image(candidate3) + + # test cleanup of tmpdir and everything else + assert not os.path.exists(median_computer._temp_path) + assert len(os.listdir(tempdir)) == 0 + assert len(os.listdir(os.getcwd())) == 1 # this is the "tmptest" directory + + +@pytest.mark.filterwarnings("error:.*finalize.*:pytest.PytestUnraisableExceptionWarning") +def test_on_disk_median_bad_inputs(tmp_cwd): + + library_length = 3 + frame_shape = (21,20) + dtype = 'float32' + tempdir = tmp_cwd / Path("tmptest") + os.mkdir(tempdir) + shape = (library_length,) + frame_shape + + with pytest.raises(ValueError): + OnDiskMedian(frame_shape, dtype=dtype, tempdir=tempdir) + + with pytest.raises(TypeError): + OnDiskMedian(shape, dtype="float3", tempdir=tempdir) + + with pytest.raises(FileNotFoundError): + OnDiskMedian(shape, dtype="float32", tempdir="dne") + + # ensure unreasonable buffer size will get set to minimum reasonable buffer + min_buffer = np.dtype(dtype).itemsize*frame_shape[1]*library_length + median_computer = OnDiskMedian(shape, dtype=dtype, tempdir=tempdir, buffer_size=-1) + assert median_computer.buffer_size == min_buffer + median_computer.cleanup() \ No newline at end of file diff --git a/jwst/outlier_detection/tso.py b/jwst/outlier_detection/tso.py index 58aedd19b0..d0161dffe7 100644 --- a/jwst/outlier_detection/tso.py +++ b/jwst/outlier_detection/tso.py @@ -22,7 +22,6 @@ def detect_outliers( maskpt, rolling_window_width, snr, - asn_id, make_output_path, ): """ @@ -53,7 +52,7 @@ def detect_outliers( median_model = 
dm.CubeModel(data=medians)
     with dm.open(weighted_cube) as dm0:
         median_model.update(dm0)
-        save_median(median_model, make_output_path, asn_id)
+        save_median(median_model, make_output_path)
         del median_model

     # no need for blotting, resample is turned off for TSO
diff --git a/jwst/outlier_detection/utils.py b/jwst/outlier_detection/utils.py
index 2bc6b2ff75..03446aac5e 100644
--- a/jwst/outlier_detection/utils.py
+++ b/jwst/outlier_detection/utils.py
@@ -1,14 +1,20 @@
 """
 The ever-present utils sub-module. A home for all...
 """
-import warnings
+import copy
+from functools import partial
+import warnings

 import numpy as np
+import tempfile
+from pathlib import Path

 from jwst.lib.pipe_utils import match_nans_and_flags
 from jwst.resample.resample import compute_image_pixel_area
+from jwst.resample.resample_utils import build_driz_weight
 from stcal.outlier_detection.utils import compute_weight_threshold, gwcs_blot, flag_crs, flag_resampled_crs
 from stdatamodels.jwst import datamodels
+from . import _fileio

 import logging
 log = logging.getLogger(__name__)
@@ -20,177 +26,422 @@
 _ONE_MB = 1 << 20


+def nanmedian3D(cube, overwrite_input=True):
+    """Compute the median of a cube ignoring warnings and with memory efficiency.
+
+    np.nanmedian always uses at least 64-bit precision internally, and this is too
+    memory-intensive. Instead, loop over the median calculation to avoid the
+    memory usage of the internal upcasting and temporary array allocation.
+    The additional runtime of this loop is indistinguishable from zero,
+    but this loop cuts overall step memory usage roughly in half for at least one
+    test association.
+    """
+    with warnings.catch_warnings():
+        warnings.filterwarnings(action="ignore",
+                                message="All-NaN slice encountered",
+                                category=RuntimeWarning)
+        output_arr = np.empty(cube.shape[1:], dtype=np.float32)
+        for i in range(output_arr.shape[0]):
+            # this for loop looks silly, but see docstring above
+            np.nanmedian(cube[:,i,:], axis=0, overwrite_input=overwrite_input, out=output_arr[i,:])
+        return output_arr
+
+
 def create_cube_median(cube_model, maskpt):
     log.info("Computing median")

     weight_threshold = compute_weight_threshold(cube_model.wht, maskpt)

-    median = np.nanmedian(
+    # not safe to use overwrite_input=True here because we are operating on model.data directly
+    return nanmedian3D(
         np.ma.masked_array(cube_model.data, np.less(cube_model.wht, weight_threshold), fill_value=np.nan),
-        axis=0,
-    )
-    return median
+        overwrite_input=False)


-def create_median(resampled_models, maskpt, buffer_size=10.0):
-    """Create a median image from the singly resampled images.
+def median_without_resampling(input_models,
+                              maskpt,
+                              weight_type,
+                              good_bits,
+                              save_intermediate_results=False,
+                              make_output_path=None,
+                              buffer_size=None):
+    """
+    Shared code between imaging and spec modes for median computation without resampling

     Parameters
     ----------
-    resampled_models : ModelLibrary
-        The singly resampled images.
+    input_models : ModelLibrary
+        The input datamodels.

     maskpt : float
         The weight threshold for masking out low weight pixels.

-    buffer_size : float
-        The size of chunk in MB, per input model, that will be read into memory.
+    weight_type : str
+        The type of weighting to use when combining images. Options are:
+        'ivm' (inverse variance) or 'exptime' (exposure time).
+
+    good_bits : int
+        The bit values that are considered good when determining the
+        data quality of the input.
+
+    save_intermediate_results : bool
+        if True, save the weighted input models and median model to fits. 
+ + make_output_path : function + The functools.partial instance to pass to save_median. Must be + specified if save_intermediate_results is True. Default None. + + buffer_size : int + The size of chunk in bytes that will be read into memory when computing the median. This parameter has no effect if the input library has its on_disk attribute set to False. + """ + in_memory = not input_models._on_disk + ngroups = len(input_models) + + with input_models: + for i in range(len(input_models)): + + drizzled_model = input_models.borrow(i) + drizzled_model.wht = build_driz_weight(drizzled_model, + weight_type=weight_type, + good_bits=good_bits) + median_wcs = copy.deepcopy(drizzled_model.meta.wcs) + input_models.shelve(drizzled_model, i, modify=True) + + if save_intermediate_results: + # write the drizzled model to file + _fileio.save_drizzled(drizzled_model, make_output_path) + + if i == 0: + input_shape = (ngroups,)+drizzled_model.data.shape + dtype = drizzled_model.data.dtype + median_computer = _make_median_computer(input_shape, in_memory, buffer_size, dtype) + + weight_threshold = compute_weight_threshold(drizzled_model.wht, maskpt) + drizzled_model.data[drizzled_model.wht < weight_threshold] = np.nan + _append_to_median_computer(median_computer, i, drizzled_model.data, in_memory) + + # Perform median combination on set of drizzled mosaics + median_data = _evaluate_median_computer(median_computer, in_memory) + + if save_intermediate_results: + # Save median model to fits + median_model = datamodels.ImageModel(median_data) + median_model.update(drizzled_model) + median_model.meta.wcs = median_wcs + _fileio.save_median(median_model, make_output_path) + del drizzled_model + + return median_data, median_wcs + + +def median_with_resampling(input_models, + resamp, + maskpt, + save_intermediate_results=False, + make_output_path=None, + buffer_size=None): + """ + Shared code between imaging and spec modes for resampling and median computation - Returns - ------- - median_image : ndarray - The median image. + Parameters + ---------- + input_models : ModelLibrary + The input datamodels. + + resamp : resample.resample.ResampleData object + The controlling object for the resampling process. + + maskpt : float + The weight threshold for masking out low weight pixels. + + save_intermediate_results : bool + if True, save the drizzled models and median model to fits. + + make_output_path : function + The functools.partial instance to pass to save_median. Must be + specified if save_intermediate_results is True. Default None. + + buffer_size : int + The size of chunk in bytes that will be read into memory when computing the median. + This parameter has no effect if the input library has its on_disk attribute + set to False. 
""" - on_disk = resampled_models._on_disk - - # Compute the weight threshold for each input model - weight_thresholds = [] - model_list = [] - with resampled_models: - for resampled in resampled_models: - weight_threshold = compute_weight_threshold(resampled.wht, maskpt) - if not on_disk: - # handle weights right away for in-memory case - data = resampled.data.copy() - data[resampled.wht < weight_threshold] = np.nan - model_list.append(data) - del data - else: - weight_thresholds.append(weight_threshold) - resampled_models.shelve(resampled, modify=False) - del resampled + if not resamp.single: + raise ValueError("median_with_resampling should only be used for resample_many_to_many") - # easier case: all models in library can be loaded into memory at once - if not on_disk: - with warnings.catch_warnings(): - warnings.filterwarnings(action="ignore", - message="All-NaN slice encountered", - category=RuntimeWarning) - return np.nanmedian(np.array(model_list), axis=0) - else: - # set up buffered access to all input models - with resampled_models: - example_model = resampled_models.borrow(0) - shp = example_model.data.shape - dtype = example_model.data.dtype - nsections, section_nrows = _compute_buffer_indices(example_model, buffer_size) - resampled_models.shelve(example_model, modify=False) - del example_model + in_memory = not input_models._on_disk + indices_by_group = list(input_models.group_indices.values()) + ngroups = len(indices_by_group) - # get spatial sections of library and compute timewise median, one by one - resampled_sections = _get_sections(resampled_models, nsections, section_nrows, shp[0]) - median_image_empty = np.empty(shp, dtype) * np.nan - return _create_median(resampled_sections, resampled_models, weight_thresholds, median_image_empty) + with input_models: + for i, indices in enumerate(indices_by_group): + median_wcs = resamp.output_wcs + drizzled_model = resamp.resample_group(input_models, indices) -def _get_sections(library, nsections, section_nrows, imrows): - """Iterator to return sections from a ModelLibrary. + if save_intermediate_results: + # write the drizzled model to file + _fileio.save_drizzled(drizzled_model, make_output_path) + + if i == 0: + input_shape = (ngroups,)+drizzled_model.data.shape + dtype = drizzled_model.data.dtype + median_computer = _make_median_computer(input_shape, in_memory, buffer_size, dtype) + + weight_threshold = compute_weight_threshold(drizzled_model.wht, maskpt) + drizzled_model.data[drizzled_model.wht < weight_threshold] = np.nan + _append_to_median_computer(median_computer, i, drizzled_model.data, in_memory) - Parameters - ---------- - library : ModelLibrary - The input data models. 
- nsections : int - The number of spatial sections in each model + # Perform median combination on set of drizzled mosaics + median_data = _evaluate_median_computer(median_computer, in_memory) - section_nrows : int - The number of rows in each section + if save_intermediate_results: + # Save median model to fits + median_model = datamodels.ImageModel(median_data) + median_model.update(drizzled_model) + median_model.meta.wcs = median_wcs + # drizzled model already contains asn_id + make_output_path = partial(make_output_path, asn_id=None) + _fileio.save_median(median_model, make_output_path) + del drizzled_model - imrows : int - The total number of rows in the image + return median_data, median_wcs - Yields - ------ - data_subset : ndarray - array of shape (len(library), section_nrows, ncols) representing a spatial - subset of all the data arrays in the library - weight_subset : ndarray - weights corresponding to data_list +def _make_median_computer(full_shape, in_memory, buffer_size, dtype): + + if in_memory: + # allocate memory for data arrays that go into median + median_computer = np.empty(full_shape, dtype=np.float32) + else: + # set up temporary storage for data arrays that go into median + median_computer = OnDiskMedian(full_shape, + dtype=dtype, + buffer_size=buffer_size) + return median_computer + + +def _append_to_median_computer(median_computer, idx, data, in_memory): + if in_memory: + # populate pre-allocated memory with the drizzled data + median_computer[idx] = data + else: + # distribute the drizzled data into the temporary storage + median_computer.add_image(data) + + +def _evaluate_median_computer(median_computer, in_memory): + if in_memory: + median_data = nanmedian3D(median_computer) + del median_computer + else: + median_data = median_computer.compute_median() + median_computer.cleanup() + return median_data + - row_range : tuple - The range of rows in the image covered by the data arrays +class DiskAppendableArray: """ - with library: - example_model = library.borrow(0) - dtype = example_model.data.dtype - dtype_wht = example_model.wht.dtype - shp = example_model.data.shape - library.shelve(example_model, 0, modify=False) - del example_model - for i in range(nsections): - row1 = i * section_nrows - row2 = min(row1 + section_nrows, imrows) + Creates a temporary file to which to append data, in order to perform + timewise operations on a stack of input images without holding all of them + in memory. + + This class is purpose-built for the median computation during outlier detection + and is not very flexible. It is assumed that each data array passed to `append` + represents the same spatial segment of the full dataset. It is also assumed that + each data array passed to `append` represents only a single instant in time; + the append operation will stack them along a new axis. + + The `read` operation is only capable of loading the full array back into memory. + When working with large datasets that do not fit in memory, the + required workflow is to create many DiskAppendableArray objects, each holding + a small spatial segment of the full dataset. + """ + + def __init__(self, slice_shape, dtype, filename): + """ + Parameters + ---------- + slice_shape : tuple + The shape of the spatial section of input data to be appended to the array. + + dtype : str + The data type of the array. Must be a valid numpy array datatype. 
+
+        filename : str
+            The full file path in which to store the array.
+        """
+        if len(slice_shape) != 2:
+            raise ValueError(f"Invalid slice_shape {slice_shape}. Only 2-D arrays are supported.")
+        self._filename = Path(filename)
+        with open(filename, "wb") as f:  # noqa: F841
+            pass
+        self._slice_shape = slice_shape
+        self._dtype = np.dtype(dtype)
+        self._append_count = 0
+
+
+    @property
+    def shape(self):
+        return (self._append_count,) + self._slice_shape
+
+
+    def append(self, data):
+        """Add a new slice to the temporary file"""
+        if data.shape != self._slice_shape:
+            raise ValueError(f"Data shape {data.shape} does not match slice shape {self._slice_shape}")
+        if data.dtype != self._dtype:
+            raise ValueError(f"Data dtype {data.dtype} does not match array dtype {self._dtype}")
+        with open(self._filename, "ab") as f:
+            data.tofile(f, sep="")
+        self._append_count += 1
+
+
+    def read(self):
+        """Read the 3-D array into memory"""
+        shp = (self._append_count,) + self._slice_shape
+        with open(self._filename, "rb") as f:
+            output = np.fromfile(f, dtype=self._dtype).reshape(shp)
+        return output
+
+
+class OnDiskMedian:
+
+    def __init__(self, shape, dtype='float32', tempdir="", buffer_size=None):
+        """
+        Set up temporary files to perform operations on a stack of 2-D input arrays
+        along the stacking axis (e.g., a time axis) without
+        holding all of them in memory. Currently the only supported operation
+        is the median.
+
+        Parameters
+        ----------
+        shape : tuple
+            The shape of the entire input, (n_images, imrows, imcols).
+
+        dtype : str
+            The data type of the input data.
+
+        tempdir : str
+            The parent directory in which to create the temporary directory,
+            which itself holds all the DiskAppendableArray tempfiles.
+            Default is the current working directory.
+
+        buffer_size : int, optional
+            The buffer size, in units of bytes.
+            Default is the size of one input image.
+        """
+        if len(shape) != 3:
+            raise ValueError(f"Invalid input shape {shape}; only three-dimensional data are supported.")
+        self._expected_nframes = shape[0]
+        self.frame_shape = shape[1:]
+        self.dtype = np.dtype(dtype)
+        self.itemsize = self.dtype.itemsize
+        self._temp_dir = tempfile.TemporaryDirectory(dir=tempdir)
+        self._temp_path = Path(self._temp_dir.name)
+
+        # figure out number of sections and rows per section that are needed
+        self.nsections, self.section_nrows = self._get_buffer_indices(buffer_size=buffer_size)
+        self.slice_shape = (self.section_nrows, shape[2])
+        self._n_adds = 0
+
+        # instantiate a temporary DiskAppendableArray for each section
+        self._temp_arrays = self._temparray_setup(dtype)
+
+
+    def _get_buffer_indices(self, buffer_size=None):
+        """
+        Parameters
+        ----------
+        buffer_size : int, optional
+            The buffer size for the median computation, in units of bytes.
+
+        Returns
+        -------
+        nsections : int
+            The number of sections to divide the input data into.
+
+        section_nrows : int
+            The number of rows in each section (except the last one).
+        """
+        imrows, imcols = self.frame_shape
+        if buffer_size is None:
+            buffer_size = imrows * imcols * self.itemsize
+        per_model_buffer_size = buffer_size / self._expected_nframes
+        min_buffer_size = imcols * self.itemsize
+        section_nrows = min(imrows, int(per_model_buffer_size // min_buffer_size))
+
+        if section_nrows <= 0:
+            buffer_size = min_buffer_size * self._expected_nframes
+            log.warning("Buffer size is too small to hold a single row. "
+ f"Increasing buffer size to {buffer_size / _ONE_MB}MB") + section_nrows = 1 + self.buffer_size = buffer_size + + nsections = int(np.ceil(imrows / section_nrows)) + log.info(f"Computing median over {self._expected_nframes} groups in {nsections} " + f"sections with total memory buffer {buffer_size / _ONE_MB} MB") + return nsections, section_nrows + + + def _temparray_setup(self, dtype): + """Set up temp file handlers for each spatial section""" + temp_arrays = [] + for i in range(0, self.nsections): + shp = self.slice_shape + if i == self.nsections - 1: + # last section has whatever shape is left over + shp = (self.frame_shape[0] - (self.nsections-1) * self.section_nrows, self.frame_shape[1]) + arr = DiskAppendableArray(shp, dtype, self._temp_path / f"{i}.bin") + temp_arrays.append(arr) + return temp_arrays + + + def add_image(self, data): + """Split resampled model data into spatial sections and write to disk.""" + if self._n_adds >= self.nsections: + raise IndexError(f"Too many calls to add_image. Expected at most {self.nsections} input models.") + self._validate_data(data) + self._n_adds += 1 + for i in range(self.nsections): + row1 = i * self.section_nrows + row2 = min(row1 + self.section_nrows, self.frame_shape[0]) + arr = self._temp_arrays[i] + arr.append(data[row1:row2]) + + + def _validate_data(self, data): + if data.shape != self.frame_shape: + raise ValueError(f"Data shape {data.shape} does not match expected shape {self.frame_shape}") + if data.dtype != self.dtype: + raise ValueError(f"Data dtype {data.dtype} does not match expected dtype {self.dtype}") - data_subset = np.empty((len(library), row2 - row1, shp[1]), dtype) - weight_subset = np.empty((len(library), row2 - row1, shp[1]), dtype_wht) - with library: - for j, model in enumerate(library): - data_subset[j] = model.data[row1:row2] - weight_subset[j] = model.wht[row1:row2] - library.shelve(model, j, modify=False) - del model - yield (data_subset, weight_subset, (row1, row2)) - - -def _compute_buffer_indices(model, buffer_size=None): - - imrows, imcols = model.data.shape - data_item_size = model.data.itemsize - min_buffer_size = imcols * data_item_size - buffer_size = min_buffer_size if buffer_size is None else (buffer_size * _ONE_MB) - section_nrows = min(imrows, int(buffer_size // min_buffer_size)) - if section_nrows == 0: - buffer_size = min_buffer_size - log.warning("WARNING: Buffer size is too small to hold a single row." - f"Increasing buffer size to {buffer_size / _ONE_MB}MB") - section_nrows = 1 - nsections = int(np.ceil(imrows / section_nrows)) - return nsections, section_nrows - - -def _create_median(resampled_sections, resampled_models, weight_thresholds, median_image_empty): - median_image = median_image_empty - for (resampled_sci, resampled_weight, (row1, row2)) in resampled_sections: - # Create a mask for each input image, masking out areas where there is - # no data or the data has very low weight - badmasks = [] - for weight, weight_threshold in zip(resampled_weight, weight_thresholds): - badmask = np.less(weight, weight_threshold) - log.debug("Percentage of pixels with low weight: {}".format( - np.sum(badmask) / len(weight.flat) * 100)) - badmasks.append(badmask) - - # Fill resampled_sci array with nan's where mask values are True - for f1, f2 in zip(resampled_sci, badmasks): - for elem1, elem2 in zip(f1, f2): - elem1[elem2] = np.nan - - del badmasks - - # For a stack of images with "bad" data replaced with Nan - # use np.nanmedian to compute the median. 
- with warnings.catch_warnings(): - warnings.filterwarnings(action="ignore", - message="All-NaN slice encountered", - category=RuntimeWarning) - median_image[row1:row2] = np.nanmedian(resampled_sci, axis=0) - del resampled_sci, resampled_weight - - return median_image + + def cleanup(self): + """Remove the temporary files and directory when finished""" + self._temp_dir.cleanup() + return + + + def compute_median(self): + """Read spatial sections from disk and compute the median across groups + (median over number of exposures on a per-pixel basis)""" + row_indices = [(i * self.section_nrows, min((i+1) * self.section_nrows, self.frame_shape[0])) + for i in range(self.nsections)] + + output_rows = row_indices[-1][1] + output_cols = self._temp_arrays[0].shape[2] + median_image = np.full((output_rows, output_cols), np.nan, dtype=self.dtype) + + for i, disk_arr in enumerate(self._temp_arrays): + row1, row2 = row_indices[i] + arr = disk_arr.read() + median_image[row1:row2] = nanmedian3D(arr) + del arr, disk_arr + + return median_image def flag_crs_in_models( @@ -226,14 +477,7 @@ def flag_resampled_model_crs( blot = gwcs_blot(median_data, median_wcs, input_model.data.shape, input_model.meta.wcs, pix_ratio) if save_blot: - if make_output_path is None: - raise ValueError("make_output_path must be provided if save_blot is True") - model_path = make_output_path(input_model.meta.filename, suffix='blot') - blot_model = _make_blot_model(input_model, blot) - blot_model.meta.filename = model_path - blot_model.save(model_path) - log.info(f"Saved model in {model_path}") - del blot_model + _fileio.save_blot(input_model, blot, make_output_path) # dq flags will be updated in-place _flag_resampled_model_crs(input_model, blot, snr1, snr2, scale1, scale2, backg) @@ -304,10 +548,3 @@ def flag_model_crs(image, blot, snr): match_nans_and_flags(image) log.info(f"{np.count_nonzero(cr_mask)} pixels marked as outliers") - - -def _make_blot_model(input_model, blot): - blot_model = type(input_model)() - blot_model.data = blot - blot_model.update(input_model) - return blot_model \ No newline at end of file diff --git a/jwst/regtest/test_nirspec_fs_spec3.py b/jwst/regtest/test_nirspec_fs_spec3.py index 4f3bca7edd..4fd2d62cf5 100644 --- a/jwst/regtest/test_nirspec_fs_spec3.py +++ b/jwst/regtest/test_nirspec_fs_spec3.py @@ -1,6 +1,7 @@ from astropy.io.fits.diff import FITSDiff import pytest import numpy as np +import os from gwcs import wcstools from jwst.stpipe import Step @@ -20,20 +21,27 @@ def run_pipeline(rtdata_module): # Run the calwebb_spec3 pipeline; save results from intermediate steps args = ["calwebb_spec3", rtdata.input, "--steps.outlier_detection.save_results=true", + "--steps.outlier_detection.save_intermediate_results=true", "--steps.resample_spec.save_results=true", "--steps.extract_1d.save_results=true"] Step.from_cmdline(args) @pytest.mark.bigdata -@pytest.mark.parametrize("suffix", ["cal", "crf", "s2d", "x1d"]) +@pytest.mark.parametrize("suffix", ["cal", "crf", "s2d", "x1d", "median"]) @pytest.mark.parametrize("source_id,slit_name", [("s000000001","s200a2"), ("s000000021","s200a1"), ("s000000023","s400a1"), ("s000000024","s1600a1"), ("s000000025","s200b1")]) def test_nirspec_fs_spec3(run_pipeline, rtdata_module, fitsdiff_default_kwargs, suffix, source_id, slit_name): """Test spec3 pipeline on a set of NIRSpec FS exposures.""" rtdata = rtdata_module - output = f"jw01309-o022_{source_id}_nirspec_f290lp-g395h-{slit_name}-allslits_{suffix}.fits" + if suffix == "median": + output = 
f"jw01309022001_04102_00002_nrs2_{slit_name}_{suffix}.fits" + # also ensure drizzled and blot models were created with the correct names + assert os.path.isfile(output.replace("median", "outlier_s2d")) + assert os.path.isfile(output.replace("median", "blot")) + else: + output = f"jw01309-o022_{source_id}_nirspec_f290lp-g395h-{slit_name}-allslits_{suffix}.fits" rtdata.output = output rtdata.get_truth(f"truth/test_nirspec_fs_spec3/{output}") diff --git a/jwst/resample/resample.py b/jwst/resample/resample.py index 6350c3496e..7d70ae840d 100644 --- a/jwst/resample/resample.py +++ b/jwst/resample/resample.py @@ -251,6 +251,82 @@ def _get_intensity_scale(self, img): else: iscale = 1.0 return iscale + + def resample_group(self, input_models, indices): + """Apply resample_many_to_many for one group + + Parameters + ---------- + input_models : ModelLibrary + + indices : list + """ + output_model = self.blank_output.copy() + + copy_asn_info_from_library(input_models, output_model) + + with input_models: + example_image = input_models.borrow(indices[0]) + + # Determine output file type from input exposure filenames + # Use this for defining the output filename + indx = example_image.meta.filename.rfind('.') + output_type = example_image.meta.filename[indx:] + output_root = '_'.join(example_image.meta.filename.replace( + output_type, '').split('_')[:-1]) + output_model.meta.filename = ( + f'{output_root}_' + f'{self.intermediate_suffix}{output_type}') + input_models.shelve(example_image, indices[0], modify=False) + del example_image + + # Initialize the output with the wcs + driz = gwcs_drizzle.GWCSDrizzle(output_model, pixfrac=self.pixfrac, + kernel=self.kernel, fillval=self.fillval) + + log.info(f"{len(indices)} exposures to drizzle together") + for index in indices: + img = input_models.borrow(index) + if isinstance(img, datamodels.SlitModel): + # must call this explicitly to populate area extension + # although the existence of this extension may not be necessary + img.area = img.area + iscale = self._get_intensity_scale(img) + log.debug(f'Using intensity scale iscale={iscale}') + + inwht = resample_utils.build_driz_weight( + img, + weight_type=self.weight_type, + good_bits=self.good_bits + ) + + # apply sky subtraction + blevel = img.meta.background.level + if not img.meta.background.subtracted and blevel is not None: + data = img.data - blevel + else: + data = img.data + + xmin, xmax, ymin, ymax = resample_utils._resample_range( + data.shape, + img.meta.wcs.bounding_box + ) + + driz.add_image( + data, + img.meta.wcs, + iscale=iscale, + inwht=inwht, + xmin=xmin, + xmax=xmax, + ymin=ymin, + ymax=ymax + ) + del data + input_models.shelve(img, index, modify=False) + del img + + return output_model def resample_many_to_many(self, input_models): """Resample many inputs to many outputs where outputs have a common frame. 
@@ -263,75 +339,8 @@ def resample_many_to_many(self, input_models): """ output_models = [] for group_id, indices in input_models.group_indices.items(): - output_model = self.blank_output - - copy_asn_info_from_library(input_models, output_model) - - with input_models: - example_image = input_models.borrow(indices[0]) - - # Determine output file type from input exposure filenames - # Use this for defining the output filename - indx = example_image.meta.filename.rfind('.') - output_type = example_image.meta.filename[indx:] - output_root = '_'.join(example_image.meta.filename.replace( - output_type, '').split('_')[:-1]) - if self.asn_id is not None: - output_model.meta.filename = ( - f'{output_root}_{self.asn_id}_' - f'{self.intermediate_suffix}{output_type}') - else: - output_model.meta.filename = ( - f'{output_root}_' - f'{self.intermediate_suffix}{output_type}') - input_models.shelve(example_image, indices[0], modify=False) - del example_image - - # Initialize the output with the wcs - driz = gwcs_drizzle.GWCSDrizzle(output_model, pixfrac=self.pixfrac, - kernel=self.kernel, fillval=self.fillval) - - log.info(f"{len(indices)} exposures to drizzle together") - for index in indices: - img = input_models.borrow(index) - if isinstance(img, datamodels.SlitModel): - # must call this explicitly to populate area extension - # although the existence of this extension may not be necessary - img.area = img.area - iscale = self._get_intensity_scale(img) - log.debug(f'Using intensity scale iscale={iscale}') - - inwht = resample_utils.build_driz_weight( - img, - weight_type=self.weight_type, - good_bits=self.good_bits - ) - - # apply sky subtraction - blevel = img.meta.background.level - if not img.meta.background.subtracted and blevel is not None: - data = img.data - blevel - else: - data = img.data - - xmin, xmax, ymin, ymax = resample_utils._resample_range( - data.shape, - img.meta.wcs.bounding_box - ) - - driz.add_image( - data, - img.meta.wcs, - iscale=iscale, - inwht=inwht, - xmin=xmin, - xmax=xmax, - ymin=ymin, - ymax=ymax - ) - del data - input_models.shelve(img, index, modify=False) - del img + + output_model = self.resample_group(input_models, indices) if not self.in_memory: # Write out model to disk, then return filename @@ -342,9 +351,7 @@ def resample_many_to_many(self, input_models): log.info(f"Saved model in {output_name}") output_models.append(output_name) else: - output_models.append(output_model.copy()) - output_model.data *= 0. - output_model.wht *= 0. + output_models.append(output_model) if not self.in_memory: # build ModelLibrary as an association from the output files
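As a usage reference for reviewers, the new OnDiskMedian class added in jwst/outlier_detection/utils.py can be exercised on its own. The call pattern below follows the new unit tests in test_utils.py; the shape and pixel values are illustrative.

import numpy as np

from jwst.outlier_detection.utils import OnDiskMedian

# Stack three frames through temporary per-section files on disk, then take
# the per-pixel median over the stack.
shape = (3, 21, 20)  # (n_images, imrows, imcols)
median_computer = OnDiskMedian(shape, dtype="float32", tempdir=".")
for value in (1.0, 2.0, 4.0):
    median_computer.add_image(np.full(shape[1:], value, dtype="float32"))
median = median_computer.compute_median()  # (21, 20) array, every pixel 2.0
median_computer.cleanup()  # remove the temporary directory and section files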