From deb43c579cbecc0f50114133c8fa921516ad77b9 Mon Sep 17 00:00:00 2001 From: David Hoese Date: Fri, 3 May 2019 10:39:39 -0500 Subject: [PATCH] Fix dask compatibility in blackbody functions --- pyspectral/blackbody.py | 69 +++++++++++++++++------------- pyspectral/tests/test_blackbody.py | 67 +++++++++++++++++++++++------ 2 files changed, 93 insertions(+), 43 deletions(-) diff --git a/pyspectral/blackbody.py b/pyspectral/blackbody.py index af069b07..ba00b628 100644 --- a/pyspectral/blackbody.py +++ b/pyspectral/blackbody.py @@ -23,8 +23,13 @@ """Planck radiation equation""" import numpy as np - import logging + +try: + import dask.array as da +except ImportError: + da = np + LOG = logging.getLogger(__name__) H_PLANCK = 6.62606957 * 1e-34 # SI-unit = [J*s] @@ -84,37 +89,38 @@ def blackbody_wn_rad2temp(wavenumber, radiance): function. Wavenumber space""" if np.isscalar(radiance): - rad = np.array([radiance, ], dtype='float64') - else: - rad = np.array(radiance, dtype='float64') + radiance = np.array([radiance], dtype='float64') + elif isinstance(radiance, (list, tuple)): + radiance = np.array(radiance, dtype='float64') if np.isscalar(wavenumber): - wavnum = np.array([wavenumber, ], dtype='float64') - else: + wavnum = np.array([wavenumber], dtype='float64') + elif isinstance(wavenumber, (list, tuple)): wavnum = np.array(wavenumber, dtype='float64') const1 = H_PLANCK * C_SPEED / K_BOLTZMANN const2 = 2 * H_PLANCK * C_SPEED**2 - res = const1 * wavnum / np.log(np.divide(const2 * wavnum**3, rad) + 1.0) + res = const1 * wavnum / np.log( + np.divide(const2 * wavnum**3, radiance) + 1.0) - shape = rad.shape + shape = radiance.shape resshape = res.shape if wavnum.shape[0] == 1: - if rad.shape[0] == 1: + if radiance.shape[0] == 1: return res[0] else: return res[::].reshape(shape) else: - if rad.shape[0] == 1: + if radiance.shape[0] == 1: return res[0, :] else: if len(shape) == 1: - return np.reshape(res, (shape[0], resshape[1])) + return res.reshape((shape[0], resshape[1])) else: - return np.reshape(res, (shape[0], shape[1], resshape[1])) + return res.reshape((shape[0], shape[1], resshape[1])) -def planck(wave, temp, wavelength=True): +def planck(wave, temperature, wavelength=True): """The Planck radiation or Blackbody radiation as a function of wavelength or wavenumber. SI units. _planck(wave, temperature, wavelength=True) @@ -136,12 +142,12 @@ def planck(wave, temp, wavelength=True): units = ['wavelengths', 'wavenumbers'] if wavelength: LOG.debug("Using {0} when calculating the Blackbody radiance".format( - units[(wavelength == True) - 1])) + units[(wavelength is True) - 1])) - if np.isscalar(temp): - temperature = np.array([temp, ], dtype='float64') - else: - temperature = np.array(temp, dtype='float64') + if np.isscalar(temperature): + temperature = np.array([temperature, ], dtype='float64') + elif isinstance(temperature, (list, tuple)): + temperature = np.array(temperature, dtype='float64') shape = temperature.shape if np.isscalar(wave): @@ -157,11 +163,17 @@ def planck(wave, temp, wavelength=True): nom = 2 * H_PLANCK * (C_SPEED ** 2) * (wln ** 3) arg1 = H_PLANCK * C_SPEED * wln / K_BOLTZMANN - arg2 = np.where(np.greater(np.abs(temperature), EPSILON), - np.array(1. / temperature), -9).reshape(-1, 1) - arg2 = np.ma.masked_array(arg2, mask=arg2 == -9) - LOG.debug("Max and min - arg1: %s %s", str(arg1.max()), str(arg1.min())) - LOG.debug("Max and min - arg2: %s %s", str(arg2.max()), str(arg2.min())) + # use dask functions when needed + np_ = np if isinstance(temperature, np.ndarray) else da + arg2 = np_.where(np.greater(np.abs(temperature), EPSILON), + (1. / temperature), np.nan).reshape(-1, 1) + if isinstance(arg2, np.ndarray): + # don't compute min/max if we have dask arrays + LOG.debug("Max and min - arg1: %s %s", + str(np.nanmax(arg1)), str(np.nanmin(arg1))) + LOG.debug("Max and min - arg2: %s %s", + str(np.nanmax(arg2)), str(np.nanmin(arg2))) + try: exp_arg = np.multiply(arg1.astype('float32'), arg2.astype('float32')) except MemoryError: @@ -171,9 +183,9 @@ def planck(wave, temp, wavelength=True): "and try running again")) raise - LOG.debug("Max and min before exp: %s %s", str(exp_arg.max()), - str(exp_arg.min())) - if exp_arg.min() < 0: + if isinstance(exp_arg, np.ndarray) and exp_arg.min() < 0: + LOG.debug("Max and min before exp: %s %s", + str(exp_arg.max()), str(exp_arg.min())) LOG.warning("Something is fishy: \n" + "\tDenominator might be zero or negative in radiance derivation:") dubious = np.where(exp_arg < 0)[0] @@ -182,7 +194,6 @@ def planck(wave, temp, wavelength=True): denom = np.exp(exp_arg) - 1 rad = nom / denom - rad = np.where(rad.mask, np.nan, rad.data) radshape = rad.shape if wln.shape[0] == 1: if temperature.shape[0] == 1: @@ -194,9 +205,9 @@ def planck(wave, temp, wavelength=True): return rad[0, :] else: if len(shape) == 1: - return np.reshape(rad, (shape[0], radshape[1])) + return rad.reshape((shape[0], radshape[1])) else: - return np.reshape(rad, (shape[0], shape[1], radshape[1])) + return rad.reshape((shape[0], shape[1], radshape[1])) def blackbody_wn(wavenumber, temp): diff --git a/pyspectral/tests/test_blackbody.py b/pyspectral/tests/test_blackbody.py index 908f3fef..9ac29e00 100644 --- a/pyspectral/tests/test_blackbody.py +++ b/pyspectral/tests/test_blackbody.py @@ -43,18 +43,28 @@ __unittest = True -class TestBlackbody(unittest.TestCase): +class CustomScheduler(object): + """Custom dask scheduler that raises an exception if dask is computed too many times.""" + + def __init__(self, max_computes=1): + """Set starting and maximum compute counts.""" + self.max_computes = max_computes + self.total_computes = 0 + + def __call__(self, dsk, keys, **kwargs): + """Compute dask task and keep track of number of times we do so.""" + import dask + self.total_computes += 1 + if self.total_computes > self.max_computes: + raise RuntimeError("Too many dask computations were scheduled: {}".format(self.total_computes)) + return dask.get(dsk, keys, **kwargs) - """Unit testing the blackbody function""" - def setUp(self): - """Set up""" - return +class TestBlackbody(unittest.TestCase): + """Unit testing the blackbody function""" def test_blackbody(self): - """Calculate the blackbody radiation from wavelengths and - temperatures - """ + """Calculate the blackbody radiation from wavelengths and temperatures.""" wavel = 11. * 1E-6 black = blackbody((wavel, ), [300., 301]) self.assertEqual(black.shape[0], 2) @@ -71,14 +81,28 @@ def test_blackbody(self): tb_therm = np.array([[300., 301], [299, 298], [279, 286]]) black = blackbody((10. * 1E-6, 11.e-6), tb_therm) + self.assertIsInstance(black, np.ndarray) tb_therm = np.array([[300., 301], [0., 298], [279, 286]]) black = blackbody((10. * 1E-6, 11.e-6), tb_therm) + self.assertIsInstance(black, np.ndarray) + + def test_blackbody_dask(self): + """Calculate the blackbody radiation from wavelengths and temperatures with dask arrays.""" + import dask + import dask.array as da + tb_therm = da.from_array([[300., 301], [299, 298], [279, 286]], chunks=2) + with dask.config.set(scheduler=CustomScheduler(0)): + black = blackbody((10. * 1E-6, 11.e-6), tb_therm) + self.assertIsInstance(black, da.Array) + + tb_therm = da.from_array([[300., 301], [0., 298], [279, 286]], chunks=2) + with dask.config.set(scheduler=CustomScheduler(0)): + black = blackbody((10. * 1E-6, 11.e-6), tb_therm) + self.assertIsInstance(black, da.Array) def test_blackbody_wn(self): - """Calculate the blackbody radiation from wavenumbers and - temperatures - """ + """Calculate the blackbody radiation from wavenumbers and temperatures.""" wavenumber = 90909.1 # 11 micron band black = blackbody_wn((wavenumber, ), [300., 301]) self.assertEqual(black.shape[0], 2) @@ -106,9 +130,24 @@ def test_blackbody_wn(self): assertNumpyArraysEqual(t__, expected) - def tearDown(self): - """Clean up""" - return + def test_blackbody_wn_dask(self): + """Test that blackbody rad2temp preserves dask arrays.""" + import dask + import dask.array as da + wavenumber = 90909.1 # 11 micron band + radiances = da.from_array([0.001, 0.0009, 0.0012, 0.0018], chunks=2).reshape(2, 2) + with dask.config.set(scheduler=CustomScheduler(0)): + t__ = blackbody_wn_rad2temp(wavenumber, radiances) + self.assertIsInstance(t__, da.Array) + t__ = t__.compute() + expected = np.array([290.3276916, 283.76115441, + 302.4181330, 333.1414164]).reshape(2, 2) + self.assertAlmostEqual(t__[1, 1], expected[1, 1], 5) + self.assertAlmostEqual(t__[0, 0], expected[0, 0], 5) + self.assertAlmostEqual(t__[0, 1], expected[0, 1], 5) + self.assertAlmostEqual(t__[1, 0], expected[1, 0], 5) + + assertNumpyArraysEqual(t__, expected) def suite():