Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix dask compatibility in blackbody functions #74

Merged
merged 1 commit into from
Jun 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 40 additions & 29 deletions pyspectral/blackbody.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,13 @@

"""Planck radiation equation"""
import numpy as np

import logging

try:
import dask.array as da
except ImportError:
da = np

LOG = logging.getLogger(__name__)

H_PLANCK = 6.62606957 * 1e-34 # SI-unit = [J*s]
Expand Down Expand Up @@ -84,37 +89,38 @@ def blackbody_wn_rad2temp(wavenumber, radiance):
function. Wavenumber space"""

if np.isscalar(radiance):
rad = np.array([radiance, ], dtype='float64')
else:
rad = np.array(radiance, dtype='float64')
radiance = np.array([radiance], dtype='float64')
elif isinstance(radiance, (list, tuple)):
radiance = np.array(radiance, dtype='float64')
if np.isscalar(wavenumber):
wavnum = np.array([wavenumber, ], dtype='float64')
else:
wavnum = np.array([wavenumber], dtype='float64')
elif isinstance(wavenumber, (list, tuple)):
wavnum = np.array(wavenumber, dtype='float64')

const1 = H_PLANCK * C_SPEED / K_BOLTZMANN
const2 = 2 * H_PLANCK * C_SPEED**2
res = const1 * wavnum / np.log(np.divide(const2 * wavnum**3, rad) + 1.0)
res = const1 * wavnum / np.log(
np.divide(const2 * wavnum**3, radiance) + 1.0)

shape = rad.shape
shape = radiance.shape
resshape = res.shape

if wavnum.shape[0] == 1:
if rad.shape[0] == 1:
if radiance.shape[0] == 1:
return res[0]
else:
return res[::].reshape(shape)
else:
if rad.shape[0] == 1:
if radiance.shape[0] == 1:
return res[0, :]
else:
if len(shape) == 1:
return np.reshape(res, (shape[0], resshape[1]))
return res.reshape((shape[0], resshape[1]))
else:
return np.reshape(res, (shape[0], shape[1], resshape[1]))
return res.reshape((shape[0], shape[1], resshape[1]))


def planck(wave, temp, wavelength=True):
def planck(wave, temperature, wavelength=True):
"""The Planck radiation or Blackbody radiation as a function of wavelength
or wavenumber. SI units.
_planck(wave, temperature, wavelength=True)
Expand All @@ -136,12 +142,12 @@ def planck(wave, temp, wavelength=True):
units = ['wavelengths', 'wavenumbers']
if wavelength:
LOG.debug("Using {0} when calculating the Blackbody radiance".format(
units[(wavelength == True) - 1]))
units[(wavelength is True) - 1]))

if np.isscalar(temp):
temperature = np.array([temp, ], dtype='float64')
else:
temperature = np.array(temp, dtype='float64')
if np.isscalar(temperature):
temperature = np.array([temperature, ], dtype='float64')
elif isinstance(temperature, (list, tuple)):
temperature = np.array(temperature, dtype='float64')

shape = temperature.shape
if np.isscalar(wave):
Expand All @@ -157,11 +163,17 @@ def planck(wave, temp, wavelength=True):
nom = 2 * H_PLANCK * (C_SPEED ** 2) * (wln ** 3)
arg1 = H_PLANCK * C_SPEED * wln / K_BOLTZMANN

arg2 = np.where(np.greater(np.abs(temperature), EPSILON),
np.array(1. / temperature), -9).reshape(-1, 1)
arg2 = np.ma.masked_array(arg2, mask=arg2 == -9)
LOG.debug("Max and min - arg1: %s %s", str(arg1.max()), str(arg1.min()))
LOG.debug("Max and min - arg2: %s %s", str(arg2.max()), str(arg2.min()))
# use dask functions when needed
np_ = np if isinstance(temperature, np.ndarray) else da
arg2 = np_.where(np.greater(np.abs(temperature), EPSILON),
(1. / temperature), np.nan).reshape(-1, 1)
if isinstance(arg2, np.ndarray):
# don't compute min/max if we have dask arrays
LOG.debug("Max and min - arg1: %s %s",
str(np.nanmax(arg1)), str(np.nanmin(arg1)))
LOG.debug("Max and min - arg2: %s %s",
str(np.nanmax(arg2)), str(np.nanmin(arg2)))

try:
exp_arg = np.multiply(arg1.astype('float32'), arg2.astype('float32'))
except MemoryError:
Expand All @@ -171,9 +183,9 @@ def planck(wave, temp, wavelength=True):
"and try running again"))
raise

LOG.debug("Max and min before exp: %s %s", str(exp_arg.max()),
str(exp_arg.min()))
if exp_arg.min() < 0:
if isinstance(exp_arg, np.ndarray) and exp_arg.min() < 0:
LOG.debug("Max and min before exp: %s %s",
str(exp_arg.max()), str(exp_arg.min()))
LOG.warning("Something is fishy: \n" +
"\tDenominator might be zero or negative in radiance derivation:")
dubious = np.where(exp_arg < 0)[0]
Expand All @@ -182,7 +194,6 @@ def planck(wave, temp, wavelength=True):

denom = np.exp(exp_arg) - 1
rad = nom / denom
rad = np.where(rad.mask, np.nan, rad.data)
radshape = rad.shape
if wln.shape[0] == 1:
if temperature.shape[0] == 1:
Expand All @@ -194,9 +205,9 @@ def planck(wave, temp, wavelength=True):
return rad[0, :]
else:
if len(shape) == 1:
return np.reshape(rad, (shape[0], radshape[1]))
return rad.reshape((shape[0], radshape[1]))
else:
return np.reshape(rad, (shape[0], shape[1], radshape[1]))
return rad.reshape((shape[0], shape[1], radshape[1]))


def blackbody_wn(wavenumber, temp):
Expand Down
67 changes: 53 additions & 14 deletions pyspectral/tests/test_blackbody.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,28 @@
__unittest = True


class TestBlackbody(unittest.TestCase):
class CustomScheduler(object):
"""Custom dask scheduler that raises an exception if dask is computed too many times."""

def __init__(self, max_computes=1):
"""Set starting and maximum compute counts."""
self.max_computes = max_computes
self.total_computes = 0

def __call__(self, dsk, keys, **kwargs):
"""Compute dask task and keep track of number of times we do so."""
import dask
self.total_computes += 1
if self.total_computes > self.max_computes:
raise RuntimeError("Too many dask computations were scheduled: {}".format(self.total_computes))
return dask.get(dsk, keys, **kwargs)

"""Unit testing the blackbody function"""

def setUp(self):
"""Set up"""
return
class TestBlackbody(unittest.TestCase):
"""Unit testing the blackbody function"""

def test_blackbody(self):
"""Calculate the blackbody radiation from wavelengths and
temperatures
"""
"""Calculate the blackbody radiation from wavelengths and temperatures."""
wavel = 11. * 1E-6
black = blackbody((wavel, ), [300., 301])
self.assertEqual(black.shape[0], 2)
Expand All @@ -71,14 +81,28 @@ def test_blackbody(self):

tb_therm = np.array([[300., 301], [299, 298], [279, 286]])
black = blackbody((10. * 1E-6, 11.e-6), tb_therm)
self.assertIsInstance(black, np.ndarray)

tb_therm = np.array([[300., 301], [0., 298], [279, 286]])
black = blackbody((10. * 1E-6, 11.e-6), tb_therm)
self.assertIsInstance(black, np.ndarray)

def test_blackbody_dask(self):
"""Calculate the blackbody radiation from wavelengths and temperatures with dask arrays."""
import dask
import dask.array as da
tb_therm = da.from_array([[300., 301], [299, 298], [279, 286]], chunks=2)
with dask.config.set(scheduler=CustomScheduler(0)):
black = blackbody((10. * 1E-6, 11.e-6), tb_therm)
self.assertIsInstance(black, da.Array)

tb_therm = da.from_array([[300., 301], [0., 298], [279, 286]], chunks=2)
with dask.config.set(scheduler=CustomScheduler(0)):
black = blackbody((10. * 1E-6, 11.e-6), tb_therm)
self.assertIsInstance(black, da.Array)

def test_blackbody_wn(self):
"""Calculate the blackbody radiation from wavenumbers and
temperatures
"""
"""Calculate the blackbody radiation from wavenumbers and temperatures."""
wavenumber = 90909.1 # 11 micron band
black = blackbody_wn((wavenumber, ), [300., 301])
self.assertEqual(black.shape[0], 2)
Expand Down Expand Up @@ -106,9 +130,24 @@ def test_blackbody_wn(self):

assertNumpyArraysEqual(t__, expected)

def tearDown(self):
"""Clean up"""
return
def test_blackbody_wn_dask(self):
"""Test that blackbody rad2temp preserves dask arrays."""
import dask
import dask.array as da
wavenumber = 90909.1 # 11 micron band
radiances = da.from_array([0.001, 0.0009, 0.0012, 0.0018], chunks=2).reshape(2, 2)
with dask.config.set(scheduler=CustomScheduler(0)):
t__ = blackbody_wn_rad2temp(wavenumber, radiances)
self.assertIsInstance(t__, da.Array)
t__ = t__.compute()
expected = np.array([290.3276916, 283.76115441,
302.4181330, 333.1414164]).reshape(2, 2)
self.assertAlmostEqual(t__[1, 1], expected[1, 1], 5)
self.assertAlmostEqual(t__[0, 0], expected[0, 0], 5)
self.assertAlmostEqual(t__[0, 1], expected[0, 1], 5)
self.assertAlmostEqual(t__[1, 0], expected[1, 0], 5)

assertNumpyArraysEqual(t__, expected)


def suite():
Expand Down