-
Notifications
You must be signed in to change notification settings - Fork 14
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #470 from dirac-institute/single_trajectory
Create a tool for evaluating a single trajectory
- Loading branch information
Showing
3 changed files
with
213 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,119 @@ | ||
import numpy as np | ||
|
||
from kbmod.configuration import SearchConfiguration | ||
from kbmod.filters.sigma_g_filter import apply_single_clipped_sigma_g, SigmaGClipping | ||
from kbmod.masking import apply_mask_operations | ||
from kbmod.result_list import ResultRow | ||
from kbmod.search import StackSearch, StampCreator | ||
|
||
|
||
class TrajectoryExplorer: | ||
"""A class to interactively run test trajectories through KBMOD. | ||
Attributes | ||
---------- | ||
config : `SearchConfiguration` | ||
The configuration parameters. | ||
debug : `bool` | ||
Use verbose debug output. | ||
search : `kb.StackSearch` | ||
The search object (with cached data). | ||
""" | ||
|
||
def __init__(self, img_stack, config=None, debug=False): | ||
""" | ||
Parameters | ||
---------- | ||
im_stack : `ImageStack` | ||
The images to search. | ||
config : `SearchConfiguration`, optional | ||
The configuration parameters. If ``None`` uses the default | ||
configuration parameters. | ||
debug : `bool` | ||
Use verbose debug output. | ||
""" | ||
self._data_initalized = False | ||
self.im_stack = img_stack | ||
if config is None: | ||
self.config = SearchConfiguration() | ||
else: | ||
self.config = config | ||
self.debug = debug | ||
|
||
# Allocate and configure the StackSearch object. | ||
self.search = None | ||
|
||
def initialize_data(self): | ||
"""Perform any needed initialization and preprocessing on the images.""" | ||
if self._data_initalized: | ||
return | ||
|
||
# Check if we need to apply legacy masking. | ||
if self.config["do_mask"]: | ||
self.im_stack = apply_mask_operations(self.config, self.im_stack) | ||
|
||
# If we are using an encoded image representation on GPU, enable it and | ||
# set the parameters. | ||
if self.config["encode_num_bytes"] > 0: | ||
self.search.enable_gpu_encoding(self.config["encode_num_bytes"]) | ||
if self.debug: | ||
print(f"Setting encoding = {self.config['encode_num_bytes']}") | ||
|
||
# Allocate the search structure. | ||
self.search = StackSearch(self.im_stack) | ||
self.search.set_debug(self.debug) | ||
|
||
self._data_initalized = True | ||
|
||
def evaluate_linear_trajectory(self, x, y, vx, vy): | ||
"""Evaluate a single linear trajectory in pixel space. Skips all the filtering | ||
steps and returns the raw data. | ||
Parameters | ||
---------- | ||
x : `int` | ||
The starting x pixel of the trajectory. | ||
y : `int` | ||
The starting y pixel of the trajectory. | ||
vx : `float` | ||
The x velocity of the trajectory in pixels per day. | ||
vy : `float` | ||
The y velocity of the trajectory in pixels per day. | ||
Returns | ||
------- | ||
result : `ResultRow` | ||
The result data with all fields filled out. | ||
""" | ||
self.initialize_data() | ||
|
||
# Evaluate the trajectory. | ||
trj = self.search.search_linear_trajectory(x, y, vx, vy) | ||
result = ResultRow(trj, self.im_stack.img_count()) | ||
|
||
# Get the psi and phi curves and do the sigma_g filtering. | ||
psi_curve = np.array(self.search.get_psi_curves(trj)) | ||
phi_curve = np.array(self.search.get_phi_curves(trj)) | ||
result.set_psi_phi(psi_curve, phi_curve) | ||
|
||
# Get the individual stamps. | ||
stamps = StampCreator.get_stamps(self.im_stack, result.trajectory, self.config["stamp_radius"]) | ||
result.all_stamps = np.array([stamp.image for stamp in stamps]) | ||
|
||
return result | ||
|
||
def apply_sigma_g(self, result): | ||
"""Apply sigma G clipping to a single ResultRow. Modifies the row in-place. | ||
Parameters | ||
---------- | ||
result : `ResultRow` | ||
The row to test for filtering. | ||
""" | ||
clipper = SigmaGClipping( | ||
self.config["sigmaG_lims"][0], | ||
self.config["sigmaG_lims"][1], | ||
2, | ||
self.config["clip_negative"], | ||
) | ||
apply_single_clipped_sigma_g(clipper, result) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
import unittest | ||
|
||
import numpy as np | ||
|
||
from kbmod.fake_data.fake_data_creator import FakeDataSet | ||
from kbmod.search import HAS_GPU | ||
from kbmod.trajectory_explorer import TrajectoryExplorer | ||
from kbmod.trajectory_utils import make_trajectory | ||
|
||
|
||
class test_trajectory_explorer(unittest.TestCase): | ||
def setUp(self): | ||
# image properties | ||
self.img_count = 20 | ||
self.dim_x = 120 | ||
self.dim_y = 115 | ||
|
||
# create a Trajectory for the object | ||
self.x0 = 27 | ||
self.y0 = 50 | ||
self.vx = 21.0 | ||
self.vy = -5.0 | ||
self.trj = make_trajectory(self.x0, self.y0, self.vx, self.vy, flux=500.0) | ||
|
||
# create image set with single moving object | ||
fake_times = [i / self.img_count for i in range(self.img_count)] | ||
fake_ds = FakeDataSet( | ||
self.dim_x, | ||
self.dim_y, | ||
fake_times, | ||
noise_level=2.0, | ||
psf_val=1.0, | ||
use_seed=True, | ||
) | ||
fake_ds.insert_object(self.trj) | ||
|
||
# Remove at least observation from the trajectory. | ||
pred_x = int(self.x0 + fake_times[10] * self.vx + 0.5) | ||
pred_y = int(self.y0 + fake_times[10] * self.vy + 0.5) | ||
sci_t10 = fake_ds.stack.get_single_image(10).get_science() | ||
for dy in [-1, 0, 1]: | ||
for dx in [-1, 0, 1]: | ||
sci_t10.set_pixel(pred_y + dy, pred_x + dx, 0.0001) | ||
|
||
self.explorer = TrajectoryExplorer(fake_ds.stack) | ||
|
||
@unittest.skipIf(not HAS_GPU, "Skipping test (no GPU detected)") | ||
def test_evaluate_trajectory(self): | ||
result = self.explorer.evaluate_linear_trajectory(self.x0, self.y0, self.vx, self.vy) | ||
|
||
# We found the trajectory we were loooking for. | ||
self.assertIsNotNone(result.trajectory) | ||
self.assertEqual(result.trajectory.x, self.x0) | ||
self.assertEqual(result.trajectory.y, self.y0) | ||
self.assertEqual(result.trajectory.vx, self.vx) | ||
self.assertEqual(result.trajectory.vy, self.vy) | ||
|
||
# The statistics seem reasonable. | ||
self.assertGreater(result.trajectory.lh, 50.0) | ||
self.assertGreater(result.trajectory.flux, 50.0) | ||
self.assertGreater(result.trajectory.obs_count, 10) | ||
|
||
# We compute the rest of the data we need. | ||
self.assertEqual(result.num_times, 20) | ||
self.assertEqual(len(result.valid_indices), 20) | ||
self.assertEqual(len(result.psi_curve), 20) | ||
self.assertEqual(len(result.phi_curve), 20) | ||
self.assertEqual(len(result.all_stamps), 20) | ||
|
||
# At least one index 10 should be filtered by sigma G filtering. | ||
self.explorer.apply_sigma_g(result) | ||
self.assertLess(len(result.valid_indices), 20) | ||
self.assertFalse(10 in result.valid_indices) | ||
|
||
|
||
if __name__ == "__main__": | ||
unittest.main() |