From 9fc2a05ef9624640fd2c7a82ae8352ce7383ac0f Mon Sep 17 00:00:00 2001 From: Pouria Khalaj Date: Tue, 30 Jul 2024 15:53:50 +0200 Subject: [PATCH] Use dask `map_blocks()` in `_get_array()` This includes: - Extracting the `_get_array()` method so that it is now a function in the module and not a class method. - Introduction of `NativeMSGFileHandler._make_dask_array_with_map_blocks()` method to utilize the dask `map_blocks()`. - Introduction of a new method, namely `NativeMSGFileHandler._number_of_visir_channels` to facilitate testing and mock patching. - Adapting the mock patches in tests accordingly. --- satpy/readers/seviri_l1b_native.py | 42 +++++++++++++++---- .../reader_tests/test_seviri_l1b_native.py | 21 ++++++++-- 2 files changed, 50 insertions(+), 13 deletions(-) diff --git a/satpy/readers/seviri_l1b_native.py b/satpy/readers/seviri_l1b_native.py index d5f3ba2692..b09a947e1c 100644 --- a/satpy/readers/seviri_l1b_native.py +++ b/satpy/readers/seviri_l1b_native.py @@ -193,10 +193,27 @@ def __init__(self, filename, filename_info, filetype_info, # Available channels are known only after the header has been read self.header_type = get_native_header(has_archive_header(self.filename)) self._read_header() - self.dask_array = da.from_array(self._get_array(), chunks=(CHUNK_SIZE,)) + self._make_dask_array_with_map_blocks() self._read_trailer() self.image_boundaries = ImageBoundaries(self.header, self.trailer, self.mda) + def _make_dask_array_with_map_blocks(self): + """Makes the dask array using the ``da.map_blocks()`` functionality.""" + dtype = self._get_data_dtype() + chunks = da.core.normalize_chunks( + "auto", + shape=(self.mda["number_of_lines"],), + dtype=dtype) + self.dask_array = da.map_blocks( + _get_array, + dtype=dtype, + chunks=chunks, + meta=np.zeros(1, dtype=dtype), + # The following will be passed as keyword arguments to the `_get_array()` function. 
+ filename=self.filename, + hdr_size=self.header_type.itemsize + ) + @property def _repeat_cycle_duration(self): """Get repeat cycle duration from the trailer.""" @@ -266,9 +283,7 @@ def get_lrec(cols): # each pixel is 10-bits -> one line of data has 25% more bytes # than the number of columns suggest (10/8 = 1.25) visir_rec = get_lrec(int(self.mda["number_of_columns"] * 1.25)) - number_of_visir_channels = len( - [s for s in self.mda["channel_list"] if not s == "HRV"]) - drec = [("visir", (visir_rec, number_of_visir_channels))] + drec = [("visir", (visir_rec, self._number_of_visir_channels()))] if self.mda["available_channels"]["HRV"]: hrv_rec = get_lrec(int(self.mda["hrv_number_of_columns"] * 1.25)) @@ -276,11 +291,9 @@ def get_lrec(cols): return np.dtype(drec) - def _get_array(self): - """Get the numpy array for the SEVIRI data.""" - data_dtype = self._get_data_dtype() - hdr_size = self.header_type.itemsize - return fromfile(self.filename, dtype=data_dtype, offset=hdr_size, count=self.mda["number_of_lines"]) + def _number_of_visir_channels(self): + """Returns the number of visir channels, i.e. 
all channels excluding ``HRV``.""" + return len([s for s in self.mda["channel_list"] if not s == "HRV"]) def _read_header(self): """Read the header info.""" @@ -891,3 +904,14 @@ def read_header(filename): dtype = get_native_header(has_archive_header(filename)) hdr = fromfile(filename, dtype=dtype, count=1) return recarray2dict(hdr) + + +def _get_array(filename=None, hdr_size=None, block_info=None): + """Get the numpy array for the SEVIRI data.""" + output_block_info = block_info[None] + data_dtype = output_block_info["dtype"] + return fromfile( + filename, + dtype=data_dtype, + offset=hdr_size + output_block_info["array-location"][0][0] * data_dtype.itemsize, + count=output_block_info["chunk-shape"][0]) diff --git a/satpy/tests/reader_tests/test_seviri_l1b_native.py b/satpy/tests/reader_tests/test_seviri_l1b_native.py index e88ff059b3..4c09ca0381 100644 --- a/satpy/tests/reader_tests/test_seviri_l1b_native.py +++ b/satpy/tests/reader_tests/test_seviri_l1b_native.py @@ -638,7 +638,9 @@ def prepare_area_definitions(test_dict): with mock.patch("satpy.readers.seviri_l1b_native.fromfile") as fromfile, \ mock.patch("satpy.readers.seviri_l1b_native.recarray2dict") as recarray2dict, \ - mock.patch("satpy.readers.seviri_l1b_native.NativeMSGFileHandler._get_array") as _get_array, \ + mock.patch("satpy.readers.seviri_l1b_native._get_array") as _get_array, \ + mock.patch( + "satpy.readers.seviri_l1b_native.NativeMSGFileHandler._number_of_visir_channels") as _n_visir_ch, \ mock.patch("satpy.readers.seviri_l1b_native.NativeMSGFileHandler._read_trailer"), \ mock.patch( "satpy.readers.seviri_l1b_native.has_archive_header" @@ -647,6 +649,7 @@ def prepare_area_definitions(test_dict): fromfile.return_value = header recarray2dict.side_effect = (lambda x: x) _get_array.return_value = np.arange(3) + _n_visir_ch.return_value = 11 fh = NativeMSGFileHandler(filename=None, filename_info={}, filetype_info=None) fh.fill_disk = fill_disk fh.header = header @@ -722,7 +725,9 @@ def 
prepare_is_roi(test_dict): with mock.patch("satpy.readers.seviri_l1b_native.fromfile") as fromfile, \ mock.patch("satpy.readers.seviri_l1b_native.recarray2dict") as recarray2dict, \ - mock.patch("satpy.readers.seviri_l1b_native.NativeMSGFileHandler._get_array") as _get_array, \ + mock.patch("satpy.readers.seviri_l1b_native._get_array") as _get_array, \ + mock.patch( + "satpy.readers.seviri_l1b_native.NativeMSGFileHandler._number_of_visir_channels") as _n_visir_ch, \ mock.patch("satpy.readers.seviri_l1b_native.NativeMSGFileHandler._read_trailer"), \ mock.patch( "satpy.readers.seviri_l1b_native.has_archive_header" @@ -731,6 +736,7 @@ def prepare_is_roi(test_dict): fromfile.return_value = header recarray2dict.side_effect = (lambda x: x) _get_array.return_value = np.arange(3) + _n_visir_ch.return_value = 11 fh = NativeMSGFileHandler(filename=None, filename_info={}, filetype_info=None) fh.header = header fh.trailer = trailer @@ -1172,12 +1178,15 @@ def test_header_type(file_content, exp_header_size): header.pop("15_SECONDARY_PRODUCT_HEADER") with mock.patch("satpy.readers.seviri_l1b_native.fromfile") as fromfile, \ mock.patch("satpy.readers.seviri_l1b_native.recarray2dict") as recarray2dict, \ - mock.patch("satpy.readers.seviri_l1b_native.NativeMSGFileHandler._get_array") as _get_array, \ + mock.patch("satpy.readers.seviri_l1b_native._get_array") as _get_array, \ + mock.patch( + "satpy.readers.seviri_l1b_native.NativeMSGFileHandler._number_of_visir_channels") as _n_visir_ch, \ mock.patch("satpy.readers.seviri_l1b_native.NativeMSGFileHandler._read_trailer"), \ mock.patch("satpy.readers.seviri_l1b_native.generic_open", mock.mock_open(read_data=file_content)): fromfile.return_value = header recarray2dict.side_effect = (lambda x: x) _get_array.return_value = np.arange(3) + _n_visir_ch.return_value = 11 fh = NativeMSGFileHandler(filename=None, filename_info={}, filetype_info=None) assert fh.header_type.itemsize == exp_header_size assert "15_SECONDARY_PRODUCT_HEADER" in 
fh.header @@ -1202,7 +1211,9 @@ def test_header_warning(): with mock.patch("satpy.readers.seviri_l1b_native.fromfile") as fromfile, \ mock.patch("satpy.readers.seviri_l1b_native.recarray2dict") as recarray2dict, \ - mock.patch("satpy.readers.seviri_l1b_native.NativeMSGFileHandler._get_array") as _get_array, \ + mock.patch("satpy.readers.seviri_l1b_native._get_array") as _get_array, \ + mock.patch( + "satpy.readers.seviri_l1b_native.NativeMSGFileHandler._number_of_visir_channels") as _n_visir_ch, \ mock.patch("satpy.readers.seviri_l1b_native.NativeMSGFileHandler._read_trailer"), \ mock.patch("satpy.readers.seviri_l1b_native.generic_open", mock.mock_open(read_data=ASCII_STARTSWITH)): recarray2dict.side_effect = (lambda x: x) @@ -1211,6 +1222,8 @@ def test_header_warning(): exp_warning = "The quality flag for this file indicates not OK. Use this data with caution!" fromfile.return_value = header_good + _n_visir_ch.return_value = 11 + with warnings.catch_warnings(): warnings.simplefilter("error") NativeMSGFileHandler(filename=None, filename_info={}, filetype_info=None)