Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prepare for channel-specific SEVIRI calibration #2997

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
240 changes: 144 additions & 96 deletions satpy/readers/seviri_base.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/usr/bin/env python

Check warning on line 1 in satpy/readers/seviri_base.py

View check run for this annotation

CodeScene Delta Analysis / CodeScene Cloud Delta Analysis (main)

❌ Getting worse: Low Cohesion

The number of different responsibilities increases from 9 to 11, threshold = 4. Cohesion is calculated using the LCOM4 metric. Low cohesion means that the module/class has multiple unrelated responsibilities, doing too many things and breaking the Single Responsibility Principle.
# -*- coding: utf-8 -*-
# Copyright (c) 2017-2018 Satpy developers
#
Expand Down Expand Up @@ -201,14 +201,15 @@

import datetime as dt
import warnings
from collections import namedtuple

import dask.array as da
import numpy as np
import pyproj
from numpy.polynomial.chebyshev import Chebyshev

import satpy.readers.utils as utils
from satpy.readers.eum_base import issue_revision, time_cds_short
from satpy.readers.utils import apply_earthsun_distance_correction
from satpy.utils import get_legacy_chunk_size

CHUNK_SIZE = get_legacy_chunk_size()
Expand Down Expand Up @@ -436,41 +437,6 @@
}


def get_meirink_slope(meirink_coefs, acquisition_time):
"""Compute the slope for the visible channel calibration according to Meirink 2013.

S = A + B * 1.e-3* Day

S is here in µW m-2 sr-1 (cm-1)-1

EUMETSAT calibration is given in mW m-2 sr-1 (cm-1)-1, so an extra factor of 1/1000 must
be applied.
"""
A = meirink_coefs[0]
B = meirink_coefs[1]
delta_t = (acquisition_time - MEIRINK_EPOCH).total_seconds()
S = A + B * delta_t / (3600*24) / 1000.
return S/1000


def should_apply_meirink(calib_mode, channel_name):
"""Decide whether to use the Meirink calibration coefficients."""
return "MEIRINK" in calib_mode and channel_name in ["VIS006", "VIS008", "IR_016"]


class MeirinkCalibrationHandler:
"""Re-calibration of the SEVIRI visible channels slope (see Meirink 2013)."""

def __init__(self, calib_mode):
"""Initialize the calibration handler."""
self.coefs = MEIRINK_COEFS[calib_mode.split("-")[1]]

def get_slope(self, platform, channel, time):
"""Return the slope using the provided calibration coefficients."""
coefs = self.coefs[platform][channel]
return get_meirink_slope(coefs, time)


def get_cds_time(days, msecs):
"""Compute timestamp given the days since epoch and milliseconds of the day.

Expand Down Expand Up @@ -663,7 +629,11 @@
reflectances for SEVIRI warm channels: https://www-cdn.eumetsat.int/files/2020-04/pdf_msg_seviri_rad2refl.pdf
"""
reflectance = np.pi * data * 100.0 / solar_irradiance
return apply_earthsun_distance_correction(reflectance, self._scan_time)
return utils.apply_earthsun_distance_correction(reflectance, self._scan_time)


CalibParams = namedtuple("CalibParams", ["mode", "internal_coefs", "external_coefs", "radiance_type"])
ScanParams = namedtuple("ScanParams", ["platform_id", "channel_name", "scan_time"])


class SEVIRICalibrationHandler:
Expand All @@ -673,23 +643,22 @@
calibration algorithm.
"""

def __init__(self, platform_id, channel_name, coefs, calib_mode, scan_time):
def __init__(self, calib_params, scan_params):
"""Initialize the calibration handler."""
self._platform_id = platform_id
self._channel_name = channel_name
self._coefs = coefs
self._calib_mode = calib_mode.upper()
self._scan_time = scan_time
self._calib_params = calib_params
self._scan_params = scan_params
self._algo = SEVIRICalibrationAlgorithm(
platform_id=self._platform_id,
scan_time=self._scan_time
platform_id=scan_params.platform_id,
scan_time=scan_params.scan_time
)
self._check_calib_mode(calib_params.mode)

def _check_calib_mode(self, calib_mode):
valid_modes = ("NOMINAL", "GSICS", "MEIRINK-2023")
if self._calib_mode not in valid_modes:
if calib_mode not in valid_modes:
raise ValueError(
"Invalid calibration mode: {}. Choose one of {}".format(
self._calib_mode, valid_modes)
calib_mode, valid_modes)

Check notice on line 661 in satpy/readers/seviri_base.py

View check run for this annotation

CodeScene Delta Analysis / CodeScene Cloud Delta Analysis (main)

✅ No longer an issue: Excess Number of Function Arguments

SEVIRICalibrationHandler.__init__ is no longer above the threshold for number of arguments
)

def calibrate(self, data, calibration):
Expand All @@ -698,57 +667,42 @@
res = data
elif calibration in ["radiance", "reflectance",
"brightness_temperature"]:
gain, offset = self.get_gain_offset()
coefs = self.get_coefs()
res = self._algo.convert_to_radiance(
data.astype(np.float32), np.float32(gain), np.float32(offset)
data.astype(np.float32),
np.float32(coefs["coefs"]["gain"]),
np.float32(coefs["coefs"]["offset"])
)
else:
raise ValueError(
"Invalid calibration {} for channel {}".format(
calibration, self._channel_name
calibration, self._scan_params.channel_name
)
)

if calibration == "reflectance":
solar_irradiance = CALIB[self._platform_id][self._channel_name]["F"]
solar_irradiance = CALIB[self._scan_params.platform_id][self._scan_params.channel_name]["F"]
res = self._algo.vis_calibrate(res, solar_irradiance)
elif calibration == "brightness_temperature":
res = self._algo.ir_calibrate(
res, self._channel_name, self._coefs["radiance_type"]
res, self._scan_params.channel_name, self._calib_params.radiance_type
)

return res

def get_gain_offset(self):
"""Get gain & offset for calibration from counts to radiance.
def get_coefs(self):
"""Get calibration coefficients."""
picker = utils.CalibrationCoefficientPicker(self._calib_params.internal_coefs,
self._get_calib_wishlist(),
default="NOMINAL",
fallback="NOMINAL")
return picker.get_coefs(self._scan_params.channel_name)

Choices for internal coefficients are nominal or GSICS. If no
GSICS coefficients are available for a certain channel, fall back to
nominal coefficients. External coefficients take precedence over
internal coefficients.
"""
coefs = self._coefs["coefs"]

# Select internal coefficients for the given calibration mode
internal_gain = coefs["NOMINAL"]["gain"]
internal_offset = coefs["NOMINAL"]["offset"]
if self._calib_mode == "GSICS":
gsics_gain = coefs["GSICS"]["gain"]
gsics_offset = coefs["GSICS"]["offset"] * gsics_gain
if gsics_gain != 0 and gsics_offset != 0:
# If no GSICS coefficients are available for a certain channel,
# they are set to zero in the file.
internal_gain = gsics_gain
internal_offset = gsics_offset

if should_apply_meirink(self._calib_mode, self._channel_name):
meirink = MeirinkCalibrationHandler(calib_mode=self._calib_mode)
internal_gain = meirink.get_slope(self._platform_id, self._channel_name, self._scan_time)

# Override with external coefficients, if any.
gain = coefs["EXTERNAL"].get("gain", internal_gain)
offset = coefs["EXTERNAL"].get("offset", internal_offset)
return gain, offset
def _get_calib_wishlist(self):
ext_coefs = self._calib_params.external_coefs or {}
wishlist = {
ch: self._calib_params.mode for ch in CHANNEL_NAMES.values()
}
return wishlist | ext_coefs


def chebyshev(coefs, time, domain):
Expand Down Expand Up @@ -1006,23 +960,117 @@
return (ll_c, ll_l, ur_c, ur_l)


def create_coef_dict(coefs_nominal, coefs_gsics, radiance_type, ext_coefs):
def create_coef_dict(nominal_coefs, gsics_coefs=None, meirink_coefs=None):
"""Create coefficient dictionary expected by calibration class."""
return {
"coefs": {
coefs = nominal_coefs.get_coefs()
if gsics_coefs:
coefs.update(gsics_coefs.get_coefs())
if meirink_coefs:
coefs.update(meirink_coefs.get_coefs(nominal_coefs.offset))
return coefs


class NominalCoefficients:
"""Nominal calibration coefficients."""
def __init__(self, channel_name, gain, offset):
"""Initialize coefficients."""
self.channel_name = channel_name
self.gain = gain
self.offset = offset

def get_coefs(self):
"""Get coefficient dictionary."""
return {
"NOMINAL": {
"gain": coefs_nominal[0],
"offset": coefs_nominal[1],
},
"GSICS": {
"gain": coefs_gsics[0],
"offset": coefs_gsics[1]
},
"EXTERNAL": ext_coefs
},
"radiance_type": radiance_type
}
self.channel_name: {
"gain": self.gain,
"offset": self.offset
}
}
}


class GsicsCoefficients:
"""GSICS calibration coefficients."""
def __init__(self, channel_name, gain, offset):
"""Initialize coefficients."""
self.channel_name = channel_name
self.gain = gain
self.offset = offset

def get_coefs(self):
"""Get coefficient dictionary."""
coefs = {"GSICS": {}}
if self._is_available():
coefs["GSICS"][self.channel_name] = {
"gain": self.gain,
"offset": self.offset * self.gain
}
return coefs

def _is_available(self):
# If no GSICS coefficients are available they are set to zero in
# the file.
return self.gain != 0 and self.offset != 0


class MeirinkCoefficients:
"""Re-calibration of the SEVIRI visible channels slope (see Meirink 2013)."""

def __init__(self, platform_id, channel_name, scan_time):
"""Initialize coefficients."""
self.platform_id = platform_id
self.channel_name = channel_name
self.scan_time = scan_time

def get_coefs(self, offset):
"""Get coefficient dictionary.

Args:
offset: Nominal calibration offset.
"""
gain = self._get_gain()
return self._combine_gain_and_offset(gain, offset)

def _get_gain(self):
res = {}
for version, coefs in MEIRINK_COEFS.items():
gain = self._get_gain_single_channel(coefs)
if gain:
res[f"MEIRINK-{version}"] = gain
return res

def _get_gain_single_channel(self, coefs):
try:
coefs_ch = coefs[self.platform_id][self.channel_name]
return self.get_slope(coefs_ch, self.scan_time)
except KeyError:
return None

@staticmethod
def get_slope(coefs_single_channel, acquisition_time):
"""Compute the slope for the visible channel calibration according to Meirink 2013.

S = A + B * 1.e-3* Day

S is here in µW m-2 sr-1 (cm-1)-1

EUMETSAT calibration is given in mW m-2 sr-1 (cm-1)-1, so an extra factor of 1/1000 must
be applied.
"""
A = coefs_single_channel[0]
B = coefs_single_channel[1]
delta_t = (acquisition_time - MEIRINK_EPOCH).total_seconds()
S = A + B * delta_t / (3600*24) / 1000.
return S/1000

def _combine_gain_and_offset(self, gain, offset):
return {
calib_mode: {
self.channel_name: {"gain": gain_, "offset": offset}
}
for calib_mode, gain_ in gain.items()
}

def get_padding_area(shape, dtype):
"""Create a padding area filled with no data."""
Expand Down
Loading
Loading