Skip to content

Commit

Permalink
Refactored parts of the code base
Browse files Browse the repository at this point in the history
  • Loading branch information
MetinSa committed Aug 18, 2021
1 parent 0704c28 commit 44b7ab0
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 77 deletions.
2 changes: 1 addition & 1 deletion zodipy/_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def dR(self) -> np.ndarray:
}


CONFIGS = {
INTEGRATION_CONFIGS = {
'default' : DEFAULT,
'high' : HIGH,
}
6 changes: 3 additions & 3 deletions zodipy/components.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
class BaseComponent(ABC):
"""Base class for a component of the interplanetary dust.
This class contains a method to get the coordinates of a shell around an
observer in the reference frame of the component. Any component that
inherits from the class must implement a get_density method.
This class contains a method to get the coordinates of a shell around
an observer in the reference frame of the component. Any component
that inherits from the class must implement a `get_density` method.
Attributes
----------
Expand Down
58 changes: 26 additions & 32 deletions zodipy/simulation.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass
from math import radians
from typing import Iterable, List
from typing import Union, Iterable, List
import warnings

import healpy as hp
Expand All @@ -13,7 +13,7 @@

@dataclass
class SimulationStrategy(ABC):
"""Base class that represents a certian simulation strategy.
"""Base class that represents a simulation strategy.
Attributes
----------
Expand All @@ -35,10 +35,10 @@ class SimulationStrategy(ABC):
earth_locations: Iterable

@abstractmethod
def simulate(self, nside: int, freq: float, mask: float) -> np.ndarray:
def simulate(self, nside: int, freq: float, solar_cut: float) -> np.ndarray:
"""Returns the simulated the Zodiacal emission.
The emission is computed given a nside and frequency, and outputted
The emission is computed given a nside and frequency and outputted
in units of MJy/sr.
Parameters
Expand All @@ -47,7 +47,7 @@ def simulate(self, nside: int, freq: float, mask: float) -> np.ndarray:
HEALPIX map resolution parameter.
freq
Frequency [GHz] at which to evaluate the IPD model.
mask
solar_cut
Angle [deg] between observer and the Sun for which all pixels
are masked (for each observation).
Expand All @@ -59,33 +59,27 @@ def simulate(self, nside: int, freq: float, mask: float) -> np.ndarray:

@staticmethod
def get_observed_pixels(
X_observer: np.ndarray, X_unit: np.ndarray, ang: float
X_observer: np.ndarray,
X_unit: np.ndarray,
solar_cut: Union[float, None]
) -> List[np.ndarray]:
"""Returns a list of observed pixels per observation.
All pixels that have an angular distance of larger than some angle
between the observer and the sun are masked.
Parameters
----------
X_observer
Array containing coordinates of the observer.
X_unit
Array containing heliocentric unit vectors.
ang
Angle for which all pixels are masked [deg].
Returns
-------
observed_pixels
List containing arrays of unmasked pixels per observation.
solar_cut between the observer and the sun are masked.
"""

angular_distance = [
if solar_cut is None:
return Ellipsis

angular_distance = (
hp.rotator.angdist(obs , X_unit) for obs in X_observer
)

observed_pixels = [
ang_dist < radians(solar_cut) for ang_dist in angular_distance
]

observed_pixels = [ang_dist < radians(ang) for ang_dist in angular_distance]
return observed_pixels


Expand All @@ -101,7 +95,7 @@ def __init__(
model, integration_config, observer_locations, earth_locations
)

def simulate(self, nside: int, freq: float, mask: float) -> np.ndarray:
def simulate(self, nside: int, freq: float, solar_cut: float) -> np.ndarray:
"""See base class for a description."""

npix = hp.nside2npix(nside)
Expand All @@ -113,19 +107,19 @@ def simulate(self, nside: int, freq: float, mask: float) -> np.ndarray:

n_observations = len(X_observer)

if mask is None:
pixels = [slice(0, npix) for _ in range(n_observations)]
else:
pixels = self.get_observed_pixels(X_observer, X_unit, ang=mask)
pixels = self.get_observed_pixels(X_observer, X_unit, solar_cut)

components = self.model.components
emissivities = self.model.emissivities

# The emission is initialized as NANs representing unobserved pixels
# The total emission is initialized as NANs representing unobserved pixels
emission = np.zeros((n_observations, len(components), npix)) + np.NAN

for observation_idx, (observer_pos, earth_pos) in enumerate(zip(X_observer, X_earth)):
observed_pixels = pixels[observation_idx]
if solar_cut is None:
observed_pixels = pixels
else:
observed_pixels = pixels[observation_idx]
unit_vectors = X_unit[:, observed_pixels]

for comp_idx, (comp_name, comp) in enumerate(components.items()):
Expand All @@ -147,8 +141,8 @@ def simulate(self, nside: int, freq: float, mask: float) -> np.ndarray:
with warnings.catch_warnings():
# np.nanmean throws a RuntimeWarning if all pixels along an
# axis is NANs. This may occur when parts of the sky is left
# unobserver over all observations. Here we manually disable
# the warning in the aforementioned scenario.
# unobserved over all observations. Here we manually disable
# the warning thay is thrown in the aforementioned scenario.
warnings.filterwarnings("ignore", category=RuntimeWarning)

return np.nanmean(emission, axis=0) * 1e20
70 changes: 29 additions & 41 deletions zodipy/zodi.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,18 @@
import astropy.units as u
import numpy as np

from zodipy import models
from zodipy import simulation
from zodipy import _coordinates
from zodipy import _integration
from zodipy._coordinates import get_target_coordinates, change_coordinate_system
from zodipy._integration import INTEGRATION_CONFIGS
from zodipy.models import MODELS
from zodipy.simulation import InstantaneousStrategy


class Zodi:
"""The main Zodipy interface.
The initialization of this class is responsible for setting up the
simulation problem. The `get_emission` method is called to return a
simulation of the Zodiacal emission given the initial conditions.
Attributes
----------
simulation_strategy : `zodipy.simulations.SimulationStrategy`
Class representing the simulation aspect of Zodipy.
Methods
-------
get_emission
Initializing this class sets up the initial conditions for the
simulation problem. The `get_emission` method is called to perfrom
the simulation.
"""

def __init__(
Expand All @@ -37,10 +29,6 @@ def __init__(
) -> None:
"""Initializing the zodi interface.
The geometric setup of the simulation, the IPD model, and the
integration configuration used when integrating up the emission
along lines-of-sight are all configured here.
Parameters
----------
observer
Expand All @@ -59,34 +47,34 @@ def __init__(
simulation. Available options are 'planck 2013', 'planck 2015',
and 'planck 2018'. Defaults to 'planck 2018'.
integration_config
String referencing the integration_config object which determins
the integration details used in the simulation. Available options
are: 'default', and 'high'. Defaults to 'default'.
String referencing the integration_config object used when
calling `get_emission`. Available options are: 'default', and
'high'. Defaults to 'default'.
"""

observer_locations = _coordinates.get_target_coordinates(
observer_locations = get_target_coordinates(
observer, start, stop, step
)
earth_locations = _coordinates.get_target_coordinates(
earth_locations = get_target_coordinates(
'earth', start, stop, step
)

try:
model = models.MODELS[model.lower()]
model = MODELS[model.lower()]
except KeyError:
raise KeyError(
f"Model {model!r} not found. Available models are: "
f"{list(models.MODELS.keys())}"
f"{list(MODELS.keys())}"
)
try:
integration_config = _integration.CONFIGS[integration_config.lower()]
integration_config = INTEGRATION_CONFIGS[integration_config.lower()]
except KeyError:
raise KeyError(
f"Config {integration_config!r} not found. Available configs "
f"are: {list(_integration.CONFIGS.keys())}"
f"are: {list(INTEGRATION_CONFIGS.keys())}"
)

self.simulation_strategy = simulation.InstantaneousStrategy(
self.simulation_strategy = InstantaneousStrategy(
model, integration_config, observer_locations, earth_locations
)

Expand All @@ -96,29 +84,29 @@ def get_emission(
freq: Union[float, u.Quantity],
coord: Optional[str] = 'G',
return_comps: Optional[bool] = False,
mask: float = None
solar_cut: float = None
) -> np.ndarray:
"""Returns the simulated Zodiacal emission in units of MJy/sr.
"""Simulates the Zodiacal emission in units of MJy/sr.
Parameters
----------
nside
HEALPIX map resolution parameter.
freq
Frequency [GHz] at which to evaluate the Zodiacal emission.
The frequency should be in units of GHz unless an
Frequency at which to evaluate the Zodiacal emission. The
frequency should be in units of GHz unless an
`astropy.units.Quantity` object is passed for which it only
needs to be compatible with Hz.
coord
Coordinate system of the output map. Available options are:
'E', 'C', or 'G'. Defaults to 'G'.
return_comps
If True, the emission of each component in the model is returned
separatly in form of an array of shape (`n_comps`, `npix`).
If True, the emission of each component in the model is
returned separatly in the first dim of the output array.
Defaults to False.
mask
Angle [deg] between observer and the Sun for which all pixels
are masked at each observation.
solar_cut
Angle in degrees between observer and the Sun for which all
pixels are masked for each observation.
Returns
-------
Expand All @@ -129,8 +117,8 @@ def get_emission(
if isinstance(freq, u.Quantity):
freq = freq.to('GHz').value

emission = self.simulation_strategy.simulate(nside, freq, mask)
emission = self.simulation_strategy.simulate(nside, freq, solar_cut)

emission = _coordinates.change_coordinate_system(emission, coord)
emission = change_coordinate_system(emission, coord)

return emission if return_comps else emission.sum(axis=0)

0 comments on commit 44b7ab0

Please sign in to comment.