Skip to content

Commit

Permalink
Fix bugs with hit_maps
Browse files Browse the repository at this point in the history
  • Loading branch information
MetinSa committed Aug 20, 2021
1 parent 289ec02 commit 2f95b3f
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 44 deletions.
3 changes: 1 addition & 2 deletions zodipy/_coordinates.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ def get_target_coordinates(
target = TARGET_ALIASES[target.lower()]
else:
warnings.warn(
'The K98 model is only valid in the immediate surroundings of'
'the earth'
'The K98 model is only valid when observed from a near earth orbit'
)

query = Horizons(
Expand Down
63 changes: 38 additions & 25 deletions zodipy/simulation.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Iterable
import warnings

import healpy as hp
import numpy as np
Expand All @@ -25,6 +26,8 @@ class SimulationStrategy(ABC):
The location(s) of the observer.
earth_location
The location(s) of the Earth.
hit_maps
The number of times each pixel is observed for a given observation.
"""

model: InterplanetaryDustModel
Expand All @@ -35,7 +38,7 @@ class SimulationStrategy(ABC):


@abstractmethod
def simulate(self, nside: int, freq: float, solar_cut: float) -> np.ndarray:
def simulate(self, nside: int, freq: float) -> np.ndarray:
"""Returns the simulated the Zodiacal emission.
The emission is computed given a nside and frequency and outputted
Expand All @@ -47,9 +50,6 @@ def simulate(self, nside: int, freq: float, solar_cut: float) -> np.ndarray:
HEALPIX map resolution parameter.
freq
Frequency [GHz] at which to evaluate the IPD model.
solar_cut
Angle [deg] between observer and the Sun for which all pixels
are masked (for each observation).
Returns
-------
Expand All @@ -60,26 +60,28 @@ def simulate(self, nside: int, freq: float, solar_cut: float) -> np.ndarray:

@dataclass
class InstantaneousStrategy(SimulationStrategy):
"""Simulation strategy for instantaneous emission."""
"""Simulation strategy for instantaneous emission.
This strategy simulates the sky as seen at an instant in time.
"""

def simulate(self, nside: int, freq: float) -> np.ndarray:
"""See base class for a description."""

components = self.model.components
emissivities = self.model.emissivities

X_observer = self.observer_locations
X_earth = self.earth_locations

if (hit_map := self.hit_maps) is not None:
pixels = np.flatnonzero(hit_map)
else:
pixels = Ellipsis

npix = hp.nside2npix(nside)
X_unit = np.asarray(hp.pix2vec(nside, np.arange(npix)))[pixels]
if (hit_maps := self.hit_maps) is None:
hit_maps = np.ones(npix)
elif hp.get_nside(hit_maps) != nside:
hit_maps = hp.ud_grade(hit_maps, nside, power=-2)

emission = np.zeros((len(components), npix))
pixels = np.flatnonzero(hit_maps)
X_unit = np.asarray(hp.pix2vec(nside, np.arange(npix)))[:, pixels]
emission = np.zeros((len(components), npix)) + np.NAN

for comp_idx, (comp_name, comp) in enumerate(components.items()):
integration_config = self.integration_config[comp_name]
Expand All @@ -102,24 +104,28 @@ def simulate(self, nside: int, freq: float) -> np.ndarray:

@dataclass
class TimeOrderedStrategy(SimulationStrategy):
"""Simulation strategy for time-ordered emission."""
"""Simulation strategy for time-ordered emission.
This returns the pixel weighted average of multiple simultaneous
observations.
"""

def simulate(self, nside: int, freq: float) -> np.ndarray:
"""See base class for a description."""

npix = hp.nside2npix(nside)

hit_maps = self.hit_maps
if hp.get_nside(hit_maps) != nside:
hit_maps = hp.ud_grade(self.hit_maps, nside, power=-2)

components = self.model.components
emissivities = self.model.emissivities
X_observer = self.observer_locations
X_earth = self.earth_locations
X_unit = np.asarray(hp.pix2vec(nside, np.arange(npix)))

components = self.model.components
emissivities = self.model.emissivities
npix = hp.nside2npix(nside)
if (hit_maps := self.hit_maps) is None:
hits = np.ones(npix)
hit_maps = np.asarray([hits for _ in range(len(X_observer))])
elif hp.get_nside(hit_maps := self.hit_maps) != nside:
hit_maps = hp.ud_grade(self.hit_maps, nside, power=-2)

X_unit = np.asarray(hp.pix2vec(nside, np.arange(npix)))
emission = np.zeros((len(components), npix))

for observer_pos, earth_pos, hit_map in zip(X_observer, X_earth, hit_maps):
Expand All @@ -142,5 +148,12 @@ def simulate(self, nside: int, freq: float) -> np.ndarray:
emission[comp_idx, pixels] += (
integrated_comp_emission * hit_map[pixels]
)

return emission / hit_maps.sum(axis=0) * 1e20

with warnings.catch_warnings():
# Unobserved pixels will be divided by 0 in the below return
# statement. This is fine since we want to return unobserved
# pixels as np.NAN. However, a RuntimeWarning is raise, which
# we silence in this context manager.
warnings.filterwarnings("ignore", category=RuntimeWarning)

return emission / hit_maps.sum(axis=0) * 1e20
44 changes: 27 additions & 17 deletions zodipy/zodi.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,7 @@


class Zodi:
"""The main Zodipy interface.
Initializing this class sets up the initial conditions for the
simulation problem. The `get_emission` method is called to perform
the simulation.
"""
"""The Zodipy interface."""

def __init__(
self,
Expand All @@ -25,20 +20,22 @@ def __init__(
model: Optional[str] = 'planck 2018',
integration_config: Optional[str] = 'default'
) -> None:
"""Initializing the zodi interface.
"""Setting up initial conditions and other simulation specifics.
Parameters
----------
observer
The observer. Defaults to 'L2'.
epochs : scalar, list-like, or dictionary, optional
epochs
Either a list of epochs in JD or MJD format or a dictionary
defining a range of times and dates; the range dictionary has to
be of the form {``'start'``:'YYYY-MM-DD [HH:MM:SS]',
``'stop'``:'YYYY-MM-DD [HH:MM:SS]', ``'step'``:'n[y|d|m|s]'}.
``'stop'``:'YYYY-MM-DD [HH:MM:SS]', ``'step'``:'n[y|d|h|m|s]'}.
Epoch timescales depend on the type of query performed: UTC for
ephemerides queries, TDB for element queries, CT for vector queries.
If no epochs are provided, the current time is used.
hit_maps
The number of times each pixel is observed for a given observation.
model
String referencing the Interplanetary dust model used in the
simulation. Available options are 'planck 2013', 'planck 2015',
Expand Down Expand Up @@ -66,20 +63,33 @@ def __init__(

observer_locations = get_target_coordinates(observer, epochs)
earth_locations = get_target_coordinates('earth', epochs)

if hit_maps is not None and len(hit_maps) != len(observer_locations):
raise ValueError(
f"Lenght of 'hit_maps' ({len(hit_maps)}) are not matching "
f"the number of observations ({len(observer_locations)})"
)

# We check that the dimensons of hit_map corresponds to the number
# of observations.
if hit_maps is not None:
if np.ndim(hit_maps) == 1:
hit_maps = np.expand_dims(hit_maps, axis=0)
elif len(hit_maps) != len(observer_locations):
raise ValueError(
f"Lenght of 'hit_maps' ({len(hit_maps)}) are not matching "
f"the number of observations ({len(observer_locations)})"
)

if len(observer_locations) == 1:
self._simulation_strategy = InstantaneousStrategy(
model, integration_config, observer_locations[0], earth_locations[0], hit_maps
model,
integration_config,
observer_locations[0],
earth_locations[0],
hit_maps
)
else:
self._simulation_strategy = TimeOrderedStrategy(
model, integration_config, observer_locations, earth_locations, hit_maps
model,
integration_config,
observer_locations,
earth_locations,
hit_maps
)

def get_emission(
Expand Down

0 comments on commit 2f95b3f

Please sign in to comment.