Skip to content

Commit

Permalink
Use dask map_blocks() in _get_array()
Browse files Browse the repository at this point in the history
This includes:
- Extracting the `_get_array()` method so that it is now a function in the module and not a class method.
- Introduction of the `NativeMSGFileHandler._make_dask_array_with_map_blocks()` method to utilize dask's `map_blocks()`.
- Introduction of a new method, namely `NativeMSGFileHandler._number_of_visir_channels()`, to facilitate testing and mock patching.
- Adapting the mock patches in tests accordingly.
  • Loading branch information
pkhalaj committed Jul 31, 2024
1 parent 57744bc commit 9fc2a05
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 13 deletions.
42 changes: 33 additions & 9 deletions satpy/readers/seviri_l1b_native.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,10 +193,27 @@ def __init__(self, filename, filename_info, filetype_info,
# Available channels are known only after the header has been read
self.header_type = get_native_header(has_archive_header(self.filename))
self._read_header()
self.dask_array = da.from_array(self._get_array(), chunks=(CHUNK_SIZE,))
self._make_dask_array_with_map_blocks()
self._read_trailer()
self.image_boundaries = ImageBoundaries(self.header, self.trailer, self.mda)

def _make_dask_array_with_map_blocks(self):
    """Create ``self.dask_array`` using the ``da.map_blocks()`` functionality.

    The array is one-dimensional with ``self.mda["number_of_lines"]`` records
    of the dtype returned by ``self._get_data_dtype()``. Chunk sizes are
    determined by dask (``"auto"``) and every chunk is produced by the
    module-level ``_get_array()`` function.
    """
    record_dtype = self._get_data_dtype()
    n_lines = self.mda["number_of_lines"]
    auto_chunks = da.core.normalize_chunks("auto", shape=(n_lines,), dtype=record_dtype)
    empty_meta = np.zeros(1, dtype=record_dtype)

    # These are not ``map_blocks()`` options: they are forwarded as keyword
    # arguments to the `_get_array()` function.
    reader_kwargs = {
        "filename": self.filename,
        "hdr_size": self.header_type.itemsize,
    }

    self.dask_array = da.map_blocks(
        _get_array,
        dtype=record_dtype,
        chunks=auto_chunks,
        meta=empty_meta,
        **reader_kwargs,
    )

@property
def _repeat_cycle_duration(self):
"""Get repeat cycle duration from the trailer."""
Expand Down Expand Up @@ -266,21 +283,17 @@ def get_lrec(cols):
# each pixel is 10-bits -> one line of data has 25% more bytes
# than the number of columns suggest (10/8 = 1.25)
visir_rec = get_lrec(int(self.mda["number_of_columns"] * 1.25))
number_of_visir_channels = len(
[s for s in self.mda["channel_list"] if not s == "HRV"])
drec = [("visir", (visir_rec, number_of_visir_channels))]
drec = [("visir", (visir_rec, self._number_of_visir_channels()))]

if self.mda["available_channels"]["HRV"]:
hrv_rec = get_lrec(int(self.mda["hrv_number_of_columns"] * 1.25))
drec.append(("hrv", (hrv_rec, 3)))

return np.dtype(drec)

def _get_array(self):
    """Read the SEVIRI data records from the file into a numpy array."""
    record_dtype = self._get_data_dtype()
    # The data records are read starting right after the header.
    header_bytes = self.header_type.itemsize
    n_lines = self.mda["number_of_lines"]
    return fromfile(self.filename, dtype=record_dtype, offset=header_bytes, count=n_lines)
def _number_of_visir_channels(self):
    """Return the number of VISIR channels, i.e. all channels excluding ``HRV``.

    The count is taken over the entries of ``self.mda["channel_list"]``.
    """
    # Count directly instead of building a throwaway list just to take its length.
    return sum(1 for channel in self.mda["channel_list"] if channel != "HRV")

def _read_header(self):
"""Read the header info."""
Expand Down Expand Up @@ -891,3 +904,14 @@ def read_header(filename):
dtype = get_native_header(has_archive_header(filename))
hdr = fromfile(filename, dtype=dtype, count=1)
return recarray2dict(hdr)


def _get_array(filename=None, hdr_size=None, block_info=None):
    """Read the SEVIRI data records that belong to a single dask block.

    This function is used as the block function of ``da.map_blocks()``, which
    supplies the ``block_info`` argument.

    Args:
        filename: File to read the records from.
        hdr_size: Size of the header in bytes; the records start after it.
        block_info: Block information; the entry under the ``None`` key
            describes the output block (dtype, location and chunk shape).

    Returns:
        The records of the requested block, as returned by ``fromfile()``.
    """
    out_info = block_info[None]
    record_dtype = out_info["dtype"]
    # Skip the header plus all records that precede this block.
    first_record = out_info["array-location"][0][0]
    byte_offset = hdr_size + first_record * record_dtype.itemsize
    n_records = out_info["chunk-shape"][0]
    return fromfile(filename, dtype=record_dtype, offset=byte_offset, count=n_records)
21 changes: 17 additions & 4 deletions satpy/tests/reader_tests/test_seviri_l1b_native.py
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,9 @@ def prepare_area_definitions(test_dict):

with mock.patch("satpy.readers.seviri_l1b_native.fromfile") as fromfile, \
mock.patch("satpy.readers.seviri_l1b_native.recarray2dict") as recarray2dict, \
mock.patch("satpy.readers.seviri_l1b_native.NativeMSGFileHandler._get_array") as _get_array, \
mock.patch("satpy.readers.seviri_l1b_native._get_array") as _get_array, \
mock.patch(
"satpy.readers.seviri_l1b_native.NativeMSGFileHandler._number_of_visir_channels") as _n_visir_ch, \
mock.patch("satpy.readers.seviri_l1b_native.NativeMSGFileHandler._read_trailer"), \
mock.patch(
"satpy.readers.seviri_l1b_native.has_archive_header"
Expand All @@ -647,6 +649,7 @@ def prepare_area_definitions(test_dict):
fromfile.return_value = header
recarray2dict.side_effect = (lambda x: x)
_get_array.return_value = np.arange(3)
_n_visir_ch.return_value = 11
fh = NativeMSGFileHandler(filename=None, filename_info={}, filetype_info=None)
fh.fill_disk = fill_disk
fh.header = header
Expand Down Expand Up @@ -722,7 +725,9 @@ def prepare_is_roi(test_dict):

with mock.patch("satpy.readers.seviri_l1b_native.fromfile") as fromfile, \
mock.patch("satpy.readers.seviri_l1b_native.recarray2dict") as recarray2dict, \
mock.patch("satpy.readers.seviri_l1b_native.NativeMSGFileHandler._get_array") as _get_array, \
mock.patch("satpy.readers.seviri_l1b_native._get_array") as _get_array, \
mock.patch(
"satpy.readers.seviri_l1b_native.NativeMSGFileHandler._number_of_visir_channels") as _n_visir_ch, \
mock.patch("satpy.readers.seviri_l1b_native.NativeMSGFileHandler._read_trailer"), \
mock.patch(
"satpy.readers.seviri_l1b_native.has_archive_header"
Expand All @@ -731,6 +736,7 @@ def prepare_is_roi(test_dict):
fromfile.return_value = header
recarray2dict.side_effect = (lambda x: x)
_get_array.return_value = np.arange(3)
_n_visir_ch.return_value = 11
fh = NativeMSGFileHandler(filename=None, filename_info={}, filetype_info=None)
fh.header = header
fh.trailer = trailer
Expand Down Expand Up @@ -1172,12 +1178,15 @@ def test_header_type(file_content, exp_header_size):
header.pop("15_SECONDARY_PRODUCT_HEADER")
with mock.patch("satpy.readers.seviri_l1b_native.fromfile") as fromfile, \
mock.patch("satpy.readers.seviri_l1b_native.recarray2dict") as recarray2dict, \
mock.patch("satpy.readers.seviri_l1b_native.NativeMSGFileHandler._get_array") as _get_array, \
mock.patch("satpy.readers.seviri_l1b_native._get_array") as _get_array, \
mock.patch(
"satpy.readers.seviri_l1b_native.NativeMSGFileHandler._number_of_visir_channels") as _n_visir_ch, \
mock.patch("satpy.readers.seviri_l1b_native.NativeMSGFileHandler._read_trailer"), \
mock.patch("satpy.readers.seviri_l1b_native.generic_open", mock.mock_open(read_data=file_content)):
fromfile.return_value = header
recarray2dict.side_effect = (lambda x: x)
_get_array.return_value = np.arange(3)
_n_visir_ch.return_value = 11
fh = NativeMSGFileHandler(filename=None, filename_info={}, filetype_info=None)
assert fh.header_type.itemsize == exp_header_size
assert "15_SECONDARY_PRODUCT_HEADER" in fh.header
Expand All @@ -1202,7 +1211,9 @@ def test_header_warning():

with mock.patch("satpy.readers.seviri_l1b_native.fromfile") as fromfile, \
mock.patch("satpy.readers.seviri_l1b_native.recarray2dict") as recarray2dict, \
mock.patch("satpy.readers.seviri_l1b_native.NativeMSGFileHandler._get_array") as _get_array, \
mock.patch("satpy.readers.seviri_l1b_native._get_array") as _get_array, \
mock.patch(
"satpy.readers.seviri_l1b_native.NativeMSGFileHandler._number_of_visir_channels") as _n_visir_ch, \
mock.patch("satpy.readers.seviri_l1b_native.NativeMSGFileHandler._read_trailer"), \
mock.patch("satpy.readers.seviri_l1b_native.generic_open", mock.mock_open(read_data=ASCII_STARTSWITH)):
recarray2dict.side_effect = (lambda x: x)
Expand All @@ -1211,6 +1222,8 @@ def test_header_warning():
exp_warning = "The quality flag for this file indicates not OK. Use this data with caution!"

fromfile.return_value = header_good
_n_visir_ch.return_value = 11

with warnings.catch_warnings():
warnings.simplefilter("error")
NativeMSGFileHandler(filename=None, filename_info={}, filetype_info=None)
Expand Down

0 comments on commit 9fc2a05

Please sign in to comment.