diff --git a/notebooks/TrajectoryExplorer.ipynb b/notebooks/TrajectoryExplorer.ipynb index 98ec62866..68dd72c91 100644 --- a/notebooks/TrajectoryExplorer.ipynb +++ b/notebooks/TrajectoryExplorer.ipynb @@ -160,6 +160,45 @@ " axs[i, j].imshow(result.all_stamps[ind], cmap=\"gray\")" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Using (RA, dec)\n", + "\n", + "You can also use the `TrajectoryExplorer` with a WCS and (RA, dec) coordinates. Both RA and dec are specified in degrees and the corresponding velocities are expressed as degrees per day.\n", + "\n", + "We start by creating a fake WCS to match our fake images. Then we evaluate the trajectory based on this WCS." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from astropy.wcs import WCS\n", + "\n", + "my_wcs = WCS(naxis=2)\n", + "my_wcs.wcs.crpix = [201.0, 251.0] # Reference point on the image (center of the image 1-indexed)\n", + "my_wcs.wcs.crval = [45.0, -15.0] # Reference pointing on the sky\n", + "my_wcs.wcs.cdelt = [0.1, 0.1] # Pixel step size\n", + "my_wcs.wcs.ctype = [\"RA---TAN-SIP\", \"DEC--TAN-SIP\"]\n", + "\n", + "# Use the WCS to compute the angular coordinates of the inserted object at two times.\n", + "sky_pos0 = my_wcs.pixel_to_world(50, 60)\n", + "sky_pos1 = my_wcs.pixel_to_world(55, 58)\n", + "\n", + "ra0 = sky_pos0.ra.deg\n", + "dec0 = sky_pos0.dec.deg\n", + "v_ra = sky_pos1.ra.deg - ra0\n", + "v_dec = sky_pos1.dec.deg - dec0\n", + "print(f\"Object starts at ({ra0}, {dec0}) with velocity ({v_ra}, {v_dec}).\")\n", + "\n", + "result = explorer.evaluate_angle_trajectory(ra0, dec0, v_ra, v_dec, my_wcs)\n", + "print(result.trajectory)" + ] + }, { "cell_type": "markdown", "metadata": {}, @@ -178,13 +217,20 @@ "explorer.apply_sigma_g(result)\n", "print(f\"Valid time steps={result.valid_indices}\")" ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] } ], "metadata": { "kernelspec": { - "display_name": "Python (.conda-kbmod)", + "display_name": "Jeremy's KBMOD", "language": "python", - "name": "conda-env-.conda-kbmod-py" + "name": "kbmod_jk" }, "language_info": { "codemirror_mode": { @@ -196,7 +242,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.12.1" + "version": "3.12.2" } }, "nbformat": 4, diff --git a/src/kbmod/trajectory_explorer.py b/src/kbmod/trajectory_explorer.py index b50e281a7..512126e94 100644 --- a/src/kbmod/trajectory_explorer.py +++ b/src/kbmod/trajectory_explorer.py @@ -5,6 +5,7 @@ from kbmod.masking import apply_mask_operations from kbmod.result_list import ResultRow from kbmod.search import StackSearch, StampCreator, Logging +from kbmod.trajectory_utils import make_trajectory_from_ra_dec logger = Logging.getLogger(__name__) @@ -104,6 +105,31 @@ def evaluate_linear_trajectory(self, x, y, vx, vy): return result + def evaluate_angle_trajectory(self, ra, dec, v_ra, v_dec, wcs): + """Evaluate a single linear trajectory in angle space. Skips all the filtering + steps and returns the raw data. + + Parameters + ---------- + ra : `float` + The right ascension at time t0 (in degrees) + dec : `float` + The declination at time t0 (in degrees) + v_ra : `float` + The velocity in RA at t0 (in degrees/day) + v_dec : `float` + The velocity in declination at t0 (in degrees/day) + wcs : `astropy.wcs.WCS` + The WCS for the images. + + Returns + ------- + result : `ResultRow` + The result data with all fields filled out. + """ + trj = make_trajectory_from_ra_dec(ra, dec, v_ra, v_dec, wcs) + return self.evaluate_linear_trajectory(trj.x, trj.y, trj.vx, trj.vy) + def apply_sigma_g(self, result): """Apply sigma G clipping to a single ResultRow. Modifies the row in-place. diff --git a/src/kbmod/trajectory_utils.py b/src/kbmod/trajectory_utils.py index 0caa6a557..2604f2567 100644 --- a/src/kbmod/trajectory_utils.py +++ b/src/kbmod/trajectory_utils.py @@ -13,6 +13,7 @@ import numpy as np +from astropy.coordinates import SkyCoord from astropy.wcs import WCS from yaml import dump, safe_load @@ -56,6 +57,37 @@ def make_trajectory(x=0, y=0, vx=0.0, vy=0.0, flux=0.0, lh=0.0, obs_count=0): return trj +def make_trajectory_from_ra_dec(ra, dec, v_ra, v_dec, wcs): + """Create a trajectory object from (RA, dec) information. + + Parameters + ---------- + ra : `float` + The right ascension at time t0 (in degrees) + dec : `float` + The declination at time t0 (in degrees) + v_ra : `float` + The velocity in RA at t0 (in degrees/day) + v_dec : `float` + The velocity in declination at t0 (in degrees/day) + wcs : `astropy.wcs.WCS` + The WCS for the images. + + .. note:: + The motion is approximated as linear and will be approximately correct + only for small temporal range and spatial region. + + Returns + ------- + trj : `Trajectory` + The resulting Trajectory object. + """ + # Predict the pixel positions at t0 and t0 + 1 + x0, y0 = wcs.world_to_pixel(SkyCoord(ra, dec, unit="deg")) + x1, y1 = wcs.world_to_pixel(SkyCoord(ra + v_ra, dec + v_dec, unit="deg")) + return make_trajectory(x=x0, y=y0, vx=(x1 - x0), vy=(y1 - y0)) + + def trajectory_predict_skypos(trj, wcs, times): """Predict the (RA, dec) locations of the trajectory at different times. diff --git a/tests/test_trajectory_utils.py b/tests/test_trajectory_utils.py index 7933eb4ca..4dd75797d 100644 --- a/tests/test_trajectory_utils.py +++ b/tests/test_trajectory_utils.py @@ -17,6 +17,20 @@ def test_make_trajectory(self): self.assertEqual(trj.lh, 6.0) self.assertEqual(trj.obs_count, 7) + def test_predict_skypos(self): + # Create a fake WCS with a known pointing. + my_wcs = WCS(naxis=2) + my_wcs.wcs.crpix = [10.0, 10.0] # Reference point on the image (1-indexed) + my_wcs.wcs.crval = [45.0, -15.0] # Reference pointing on the sky + my_wcs.wcs.cdelt = [0.1, 0.1] # Pixel step size + my_wcs.wcs.ctype = ["RA---TAN-SIP", "DEC--TAN-SIP"] + + trj = make_trajectory_from_ra_dec(45.0, -15.0, 1.0, 0.5) + self.assertAlmostEqual(trj.x, 9.0) + self.assertAlmostEqual(trj.y, 9.0) + self.assertAlmostEqual(trj.vx, 9.9190138, delta=1e-6) + self.assertAlmostEqual(trj.vy, 4.97896903, delta=1e-6) + def test_predict_skypos(self): # Create a fake WCS with a known pointing. my_wcs = WCS(naxis=2)