From a426f2098febaa547e819db19aca1ad91c29902c Mon Sep 17 00:00:00 2001 From: edparris Date: Tue, 12 Sep 2023 10:43:01 -0400 Subject: [PATCH] feat: add HAE and DEM projections to SICD sensor model --- src/aws/osml/formats/sicd/__init__.py | 1 - src/aws/osml/gdal/gdal_dem_tile_factory.py | 39 ++- src/aws/osml/photogrammetry/__init__.py | 3 +- .../photogrammetry/digital_elevation_model.py | 35 +- .../osml/photogrammetry/elevation_model.py | 37 ++ .../osml/photogrammetry/sicd_sensor_model.py | 320 +++++++++++++++++- .../osml/gdal/test_gdal_dem_tile_factory.py | 8 +- .../osml/gdal/test_sensor_model_factory.py | 6 +- .../test_digital_elevation_model.py | 6 +- .../photogrammetry/test_sicd_sensor_model.py | 30 +- 10 files changed, 446 insertions(+), 39 deletions(-) diff --git a/src/aws/osml/formats/sicd/__init__.py b/src/aws/osml/formats/sicd/__init__.py index b2a4ba5..e69de29 100644 --- a/src/aws/osml/formats/sicd/__init__.py +++ b/src/aws/osml/formats/sicd/__init__.py @@ -1 +0,0 @@ -# nothing here diff --git a/src/aws/osml/gdal/gdal_dem_tile_factory.py b/src/aws/osml/gdal/gdal_dem_tile_factory.py index f89fb6a..6ec3b53 100644 --- a/src/aws/osml/gdal/gdal_dem_tile_factory.py +++ b/src/aws/osml/gdal/gdal_dem_tile_factory.py @@ -1,9 +1,16 @@ import logging from typing import Any, Optional, Tuple +import numpy as np from osgeo import gdal -from aws.osml.photogrammetry import DigitalElevationModelTileFactory, GDALAffineSensorModel +from aws.osml.photogrammetry import ( + DigitalElevationModelTileFactory, + ElevationRegionSummary, + GDALAffineSensorModel, + ImageCoordinate, + geodetic_to_geocentric, +) class GDALDigitalElevationModelTileFactory(DigitalElevationModelTileFactory): @@ -24,7 +31,9 @@ def __init__(self, tile_directory: str) -> None: super().__init__() self.tile_directory = tile_directory - def get_tile(self, tile_path: str) -> Tuple[Optional[Any], Optional[GDALAffineSensorModel]]: + def get_tile( + self, tile_path: str + ) -> Tuple[Optional[Any], Optional[GDALAffineSensorModel], Optional[ElevationRegionSummary]]: """ Retrieve a numpy array of elevation values and a sensor model. @@ -32,7 +41,7 @@ def get_tile(self, tile_path: str) -> Tuple[Optional[Any], Optional[GDALAffineSe :param tile_path: the location of the tile to load - :return: an array of elevation values and a sensor model or (None, None) + :return: an array of elevation values, a sensor model, and a summary or (None, None, None) """ tile_location = f"{self.tile_directory}/{tile_path}" tile_location = tile_location.replace("s3:/", "/vsis3", 1) @@ -43,15 +52,31 @@ def get_tile(self, tile_path: str) -> Tuple[Optional[Any], Optional[GDALAffineSe # information isn't available. if not ds: logging.debug(f"No DEM tile available for {tile_path}. Checked {tile_location}") - return None, None + return None, None, None # If the raster exists but doesn't have a geo transform then it is likely invalid input data. geo_transform = ds.GetGeoTransform(can_return_null=True) if not geo_transform: logging.warning(f"DEM tile does not have geo transform metadata and can't be used: {tile_location}") - return None, None + return None, None, None - band_as_array = ds.GetRasterBand(1).ReadAsArray(0, 0, ds.RasterXSize, ds.RasterYSize) + raster_band = ds.GetRasterBand(1) + band_as_array = raster_band.ReadAsArray(0, 0, ds.RasterXSize, ds.RasterYSize) + height, width = band_as_array.shape sensor_model = GDALAffineSensorModel(geo_transform) - return band_as_array, sensor_model + # Compute the distance in meters from the upper left to the lower right corner. Divide that by the distance + # in pixels to get an approximate pixel size in meters + ul_corner_ecf = geodetic_to_geocentric(sensor_model.image_to_world(ImageCoordinate([0, 0]))).coordinate + lr_corner_ecf = geodetic_to_geocentric(sensor_model.image_to_world(ImageCoordinate([width, height]))).coordinate + post_spacing = np.linalg.norm(ul_corner_ecf - lr_corner_ecf) / np.sqrt(width * width + height * height) + + stats = raster_band.GetStatistics(True, True) + summary = ElevationRegionSummary( + min_elevation=stats[0], + max_elevation=stats[1], + no_data_value=raster_band.GetNoDataValue(), + post_spacing=post_spacing, + ) + + return band_as_array, sensor_model, summary diff --git a/src/aws/osml/photogrammetry/__init__.py b/src/aws/osml/photogrammetry/__init__.py index f62e01e..2fc0fac 100644 --- a/src/aws/osml/photogrammetry/__init__.py +++ b/src/aws/osml/photogrammetry/__init__.py @@ -16,7 +16,7 @@ geodetic_to_geocentric, ) from .digital_elevation_model import DigitalElevationModel, DigitalElevationModelTileFactory, DigitalElevationModelTileSet -from .elevation_model import ConstantElevationModel, ElevationModel +from .elevation_model import ConstantElevationModel, ElevationModel, ElevationRegionSummary from .gdal_sensor_model import GDALAffineSensorModel from .projective_sensor_model import ProjectiveSensorModel from .replacement_sensor_model import ( @@ -57,6 +57,7 @@ "DigitalElevationModelTileSet", "ConstantElevationModel", "ElevationModel", + "ElevationRegionSummary", "GDALAffineSensorModel", "SARImageCoordConverter", "INCAProjectionSet", diff --git a/src/aws/osml/photogrammetry/digital_elevation_model.py b/src/aws/osml/photogrammetry/digital_elevation_model.py index b5577c3..a843cd9 100644 --- a/src/aws/osml/photogrammetry/digital_elevation_model.py +++ b/src/aws/osml/photogrammetry/digital_elevation_model.py @@ -9,7 +9,7 @@ from scipy.interpolate import RectBivariateSpline from .coordinates import GeodeticWorldCoordinate -from .elevation_model import ElevationModel +from .elevation_model import ElevationModel, ElevationRegionSummary from .sensor_model import SensorModel @@ -52,7 +52,7 @@ def __init__(self) -> None: pass @abstractmethod - def get_tile(self, tile_path: str) -> Tuple[Optional[Any], Optional[SensorModel]]: + def get_tile(self, tile_path: str) -> Tuple[Optional[Any], Optional[SensorModel], Optional[ElevationRegionSummary]]: """ Retrieve a numpy array of elevation values and a sensor model. @@ -60,7 +60,7 @@ def get_tile(self, tile_path: str) -> Tuple[Optional[Any], Optional[SensorModel] :param tile_path: the location of the tile to load - :return: an array of elevation values and a sensor model + :return: an array of elevation values, a sensor model, and a summary """ @@ -115,14 +115,31 @@ def set_elevation(self, geodetic_world_coordinate: GeodeticWorldCoordinate) -> N if not tile_id: return - interpolation_grid, sensor_model = self.get_interpolation_grid(tile_id) + interpolation_grid, sensor_model, summary = self.get_interpolation_grid(tile_id) if interpolation_grid is not None and sensor_model is not None: image_coordinate = sensor_model.world_to_image(geodetic_world_coordinate) geodetic_world_coordinate.elevation = interpolation_grid(image_coordinate.x, image_coordinate.y)[0][0] + def describe_region(self, geodetic_world_coordinate: GeodeticWorldCoordinate) -> Optional[ElevationRegionSummary]: + """ + Get a summary of the region near the provided world coordinate + + :param geodetic_world_coordinate: the coordinate at the center of the region of interest + :return: a summary of the elevation data in this tile + """ + + tile_id = self.tile_set.find_tile_id(geodetic_world_coordinate) + if not tile_id: + return + + interpolation_grid, sensor_model, summary = self.get_interpolation_grid(tile_id) + return summary + @cachedmethod(operator.attrgetter("raster_cache")) - def get_interpolation_grid(self, tile_path: str) -> Tuple[Optional[RectBivariateSpline], Optional[SensorModel]]: + def get_interpolation_grid( + self, tile_path: str + ) -> Tuple[Optional[RectBivariateSpline], Optional[SensorModel], Optional[ElevationRegionSummary]]: """ This method loads and converts an array of elevation values into a class that can interpolate values that lie between measured elevations. The sensor model is also @@ -135,13 +152,13 @@ def get_interpolation_grid(self, tile_path: str) -> Tuple[Optional[RectBivariate :param tile_path: the location of the tile to load - :return: the cached interpolation object and sensor model + :return: the cached interpolation object, sensor model, and summary """ - elevations_array, sensor_model = self.tile_factory.get_tile(tile_path) + elevations_array, sensor_model, summary = self.tile_factory.get_tile(tile_path) if elevations_array is not None and sensor_model is not None: height, width = elevations_array.shape x = range(0, width) y = range(0, height) - return RectBivariateSpline(x, y, elevations_array.T, kx=1, ky=1), sensor_model + return RectBivariateSpline(x, y, elevations_array.T, kx=1, ky=1), sensor_model, summary else: - return None, None + return None, None, None diff --git a/src/aws/osml/photogrammetry/elevation_model.py b/src/aws/osml/photogrammetry/elevation_model.py index 894dc0c..907cb1b 100644 --- a/src/aws/osml/photogrammetry/elevation_model.py +++ b/src/aws/osml/photogrammetry/elevation_model.py @@ -1,8 +1,22 @@ from abc import ABC, abstractmethod +from dataclasses import dataclass +from typing import Optional from .coordinates import GeodeticWorldCoordinate +@dataclass +class ElevationRegionSummary: + """ + This class contains a general summary of an elevation tile. + """ + + min_elevation: float + max_elevation: float + no_data_value: int + post_spacing: float + + class ElevationModel(ABC): """ An elevation model associates a height z for a given x, y of a world coordinate. It typically provides information @@ -24,6 +38,15 @@ def set_elevation(self, world_coordinate: GeodeticWorldCoordinate) -> None: :return: None """ + @abstractmethod + def describe_region(self, world_coordinate: GeodeticWorldCoordinate) -> Optional[ElevationRegionSummary]: + """ + Get a summary of the region near the provided world coordinate + + :param world_coordinate: the coordinate at the center of the region of interest + :return: the summary information + """ + class ConstantElevationModel(ElevationModel): """ @@ -50,3 +73,17 @@ def set_elevation(self, world_coordinate: GeodeticWorldCoordinate) -> None: :return: None """ world_coordinate.elevation = self.constant_elevation + + def describe_region(self, world_coordinate: GeodeticWorldCoordinate) -> Optional[ElevationRegionSummary]: + """ + Get a summary of the region near the provided world coordinate + + :param world_coordinate: the coordinate at the center of the region of interest + :return: [min elevation value, max elevation value, no elevation data value, post spacing] + """ + return ElevationRegionSummary( + min_elevation=self.constant_elevation, + max_elevation=self.constant_elevation, + no_data_value=-32767, + post_spacing=30.0, + ) diff --git a/src/aws/osml/photogrammetry/sicd_sensor_model.py b/src/aws/osml/photogrammetry/sicd_sensor_model.py index 29f8ef6..5edb348 100644 --- a/src/aws/osml/photogrammetry/sicd_sensor_model.py +++ b/src/aws/osml/photogrammetry/sicd_sensor_model.py @@ -677,6 +677,294 @@ def rrdot_to_ground(self, r_tgt_coa, r_dot_tgt_coa, arp_position, arp_velocity) return agpn + uvect_x * (gp_distance * cos_az) + uvect_y * (gp_distance * sin_az) +class HAERRDotSurfaceProjection(RRDotSurfaceProjection): + """ + This class implements the Precise R/RDot Height Above Ellipsioid (HAE) Projection described in Section 9 of the + SICD Specification Volume 3 (v1.3.0). + """ + + def __init__( + self, + scp_ecf: WorldCoordinate, + side_of_track: str, + hae: float, + height_threshold: float = 1.0, + max_number_iterations: int = 3, + ): + """ + Constructor for the projection that takes the image parameters and a height above theWGS-84 ellipsoid. + The parameter defaults are the recommended values for the user selectable parameters in section 9.2. + + :param scp_ecf: Scene Center Point position in ECF coordinates + :param side_of_track: side of track imaged + :param hae: the surface height (m) above the WGS-84 reference ellipsoid + :param height_threshold: the height threshold for convergence of iterative projection sequence + :param max_number_iterations: maximum number of iterations allowed + """ + self.scp_ecf = scp_ecf + self.scp_lle = geocentric_to_geodetic(scp_ecf) + self.side_of_track = side_of_track + self.hae = hae + + # Integer based on side of track parameter + self.look = 1.0 + if side_of_track == "R": + self.look = -1.0 + + self.delta_hae_max = height_threshold + self.nlim = max_number_iterations + + def rrdot_to_ground(self, r_tgt_coa, r_dot_tgt_coa, arp_position, arp_velocity) -> np.ndarray: + """ + This method implements the R/RDot Contour Ground Plane Intersection described in section 9.2. + + The precise projection to a surface of constant height above the WGS-84 reference ellipsoid along an + R/Rdot contour. The R/Rdot contour is relative to an ARP Center Of Aperture position and velocity. The + algorithm computes the R/Rdot projection to one or more ground planes that are tangent to the constant + height surface. Each ground plane projection point computed is slightly above the constant HAE surface. + The final surface position is computed by projecting from the final ground plane projection point down + to the HAE surface. + + :param r_tgt_coa: target COA range + :param r_dot_tgt_coa: target COA range rate + :param arp_position: ARP position + :param arp_velocity: ARP velocity + :return: the intersection between the R/Rdot Contour and the ground plane + """ + # (1) Compute the geodetic ground plane normal at the SCP. Compute the parameters for the initial ground plane. + # The reference point position is gref and the unit normal is u_gpn. + u_gpn = np.array( + [ + np.cos(self.scp_lle.latitude) * np.cos(self.scp_lle.longitude), + np.cos(self.scp_lle.latitude) * np.sin(self.scp_lle.longitude), + np.sin(self.scp_lle.latitude), + ] + ) + gref = self.scp_ecf.coordinate + (self.hae - self.scp_ecf.z) * u_gpn + + cont = True + n = 1 + while cont: + # (2) Compute the precise projection along the R/Rdot contour to Ground Plane. The result is ground plane + # point position gpp_ecf. Convert from ECF coordinates to geodetic coordinates (gpp_lle). + gp_surface_projection = GroundPlaneRRDotSurfaceProjection(ref_ecf=WorldCoordinate(gref), gpn=u_gpn) + gpp_ecf = gp_surface_projection.rrdot_to_ground(r_tgt_coa, r_dot_tgt_coa, arp_position, arp_velocity)[0] + gpp_lle = geocentric_to_geodetic(WorldCoordinate(coordinate=gpp_ecf)) + + # (3) Compute the unit vector in the increasing height direction at point gpp_lle, (u_up). Also + # compute the height difference at point gpp_lle relative to the desired surface height (delta_hae). + u_up = np.array( + [ + np.cos(gpp_lle.latitude) * np.cos(gpp_lle.longitude), + np.cos(gpp_lle.latitude) * np.sin(gpp_lle.longitude), + np.sin(gpp_lle.latitude), + ] + ) + delta_hae = gpp_lle.elevation - self.hae + + # (4) Test to see if the point is sufficiently close the surface or if the maximum number of iterations + # has been reached. Otherwise, compute a new ground reference point (gref) and unit normal (u_up); repeat + # Steps 2, 3 and 4. + if delta_hae <= self.delta_hae_max or n >= self.nlim: + cont = False + else: + gref = gpp_ecf - delta_hae * u_up + u_gpn = u_up + n += 1 + + # (5) Compute the unit slant plane normal vector, u_spn, that is tangent to the R/Rdot contour at point gpp. + # Unit vector u_spn points away from the center of the earth and in a direction of increasing HAE at gpp. + spn = np.cross(self.look * arp_velocity, gpp_ecf - arp_position) + u_spn = spn / np.linalg.norm(spn) + + # (6) Compute the straight line projection from point gpp_ecf along the slant plane normal to point slp. + # Point slp is very close to the precise R/Rdot contour intersection with the constant height surface. + # Convert the position of point slp from ECF coordinates to geodetic coordinates (slp_lle). + sf = np.dot(u_up, u_spn) + slp = gpp_ecf - (delta_hae / sf) * u_spn + slp_lle = geocentric_to_geodetic(WorldCoordinate(slp)) + + # (7) Assign surface point spp position by adjusting the HAE to be on the desired surface. Convert from + # geodetic coordinates to ECF coordinates. + spp_lle = GeodeticWorldCoordinate([slp_lle.longitude, slp_lle.latitude, self.hae]) + spp_ecf = geodetic_to_geocentric(spp_lle) + + return np.array([spp_ecf.coordinate]) + + +class DEMRRDotSurfaceProjection(RRDotSurfaceProjection): + """ + This class implements the Precise R/RDot Height Above a Digital Elevation Model (DEM) Projection described in + Section 10 of the SICD Specification Volume 3 (v1.3.0). + """ + + def __init__( + self, + scp_ecf: WorldCoordinate, + side_of_track: str, + elevation_model: ElevationModel, + max_adjacent_point_distance: float = 10.0, + height_threshold: float = 0.001, + ): + """ + Constructor for the projection that takes the image parameters and a digital elevation model (DEM). + The parameter defaults are the recommended values for the user selectable parameters in section 10.3. + + :param scp_ecf: Scene Center Point position in ECF coordinates + :param side_of_track: side of track imaged + :param elevation_model: the digital elevation model + :param max_adjacent_point_distance: Maximum distance between adjacent points along the R/Rdot contour + :param height_threshold: threshold for determining if a R/Rdot contour point is on the DEM surface (m) + """ + self.scp_ecf = scp_ecf + self.scp_lle = geocentric_to_geodetic(scp_ecf) + self.side_of_track = side_of_track + self.elevation_model = elevation_model + + # Integer based on Side of Track parameter. + self.look = 1.0 + if side_of_track == "R": + self.look = -1.0 + + elevation_summary = elevation_model.describe_region(self.scp_lle) + self.hae_min = elevation_summary.min_elevation + self.hae_max = elevation_summary.max_elevation + self.hae_max_surface_projection = HAERRDotSurfaceProjection( + scp_ecf=scp_ecf, side_of_track=side_of_track, hae=self.hae_max + ) + self.hae_min_surface_projection = HAERRDotSurfaceProjection( + scp_ecf=scp_ecf, side_of_track=side_of_track, hae=self.hae_min + ) + self.delta_dist_dem = 0.5 * elevation_summary.post_spacing + + self.delta_dist_rrc = max_adjacent_point_distance + self.delta_hd_lim = height_threshold + + def rrdot_to_ground(self, r_tgt_coa, r_dot_tgt_coa, arp_position, arp_velocity) -> np.ndarray: + """ + This method implements the R/RDot Contour Ground Plane Intersection described in section 10.3 + + The R/Rdot contour is relative to an ARP Center Of Aperture position and velocity. The earth surface is + described by a Digital Elevation Model (DEM) that defines a unique surface height as a function of two + horizontal coordinates. The projection computation may yield one or more surface points that lie along + the R/Rdot contour. + + :param r_tgt_coa: target COA range + :param r_dot_tgt_coa: target COA range rate + :param arp_position: ARP position + :param arp_velocity: ARP velocity + :return: the intersection between the R/Rdot Contour and the DEM, if multiple intersections occur they will + be returned in order of increasing height above the WGS-84 ellipsoid. + """ + + # (1) Compute the center point (ctr) and the radius of the R/Rdot projection contour (rrrc). + v_mag = np.linalg.norm(arp_velocity) + u_vel = arp_velocity / v_mag + cos_dca = -1.0 * r_dot_tgt_coa / v_mag + sin_dca = np.sqrt(1 - cos_dca * cos_dca) + ctr = arp_position + r_tgt_coa * cos_dca * u_vel + rrrc = r_tgt_coa * sin_dca + + # (2) Compute the unit vectors u_rrx and u_rry to be used to compute points located on the R/Rdot contour. + dec_arp = np.linalg.norm(arp_position) + u_up = arp_position / dec_arp + rry = np.cross(u_up, u_vel) + u_rry = rry / np.linalg.norm(rry) + u_rrx = np.cross(u_rry, u_vel) + + # (3) Compute the projection along the R/Rdot contour to the surface of constant HAE at height hae_max. + # The projection point at height hae_max is point_a. Also compute the cosine and sine of the contour angle + # to point_a, cos_caa and sin_caa. + point_a = self.hae_max_surface_projection.rrdot_to_ground(r_tgt_coa, r_dot_tgt_coa, arp_position, arp_velocity)[0] + cos_caa = np.dot(point_a - ctr, u_rrx) / rrrc + # This variable is defined in the specification but it does not appear to be used anywhere + # sin_caa = self.look * np.sqrt(1 - cos_caa * cos_caa) + + # (4) Compute the projection along the R/Rdot contour to the surface of constant HAE at height hae_min. + # The projection point at height hae_min is point_b. Also compute the cosine and sine of the contour angle + # to point_b, cos_cab and sin_cab. + point_b = self.hae_min_surface_projection.rrdot_to_ground(r_tgt_coa, r_dot_tgt_coa, arp_position, arp_velocity)[0] + cos_cab = np.dot(point_b - ctr, u_rrx) / rrrc + sin_cab = self.look * np.sqrt(1 - cos_cab * cos_cab) + + # (5) A set of points along the R/Rdot contour are to be computed. The points will be spaced in equal + # increments of the cosine of the contour angle. Compute the step size, delta_cos_ca. + + # (5.1) Step size delta_cos_rrc is computed such that the distance between adjacent points on the R/Rdot + # contour is approximately equal to delta_dist_rrc. + delta_cos_rrc = self.delta_dist_rrc * np.abs(sin_cab) / rrrc + + # (5.2) Step size delta_cos_dem is computed such that the horizontal distance between adjacent points on + # the R/Rdot contour is approximately equal to delta_dist_dem. + delta_cos_dem = self.delta_dist_dem * (np.abs(sin_cab) / cos_cab) / rrrc + + # (5.3) Set delta_cos_ca (Note the value of delta_cos_ca is < 0) + delta_cos_ca = -1.0 * min(delta_cos_rrc, delta_cos_dem) + + # (6) Determine the number of points along the R/Rdot contour to be computed, npts. + npts = int(np.floor((cos_caa - cos_cab) / delta_cos_ca)) + 2 + + # (7) Compute the set of points along the R/Rdot contour, {Pn} for n =0, 2, ..., npts-1. Initial point P1 is + # located on the hae_min surface. The final point is located above the hae_max surface. Point Pn is computed + # in ECF coordinates. Note that here n ranges from [0, npts-1] while in the specification n is [1, npts]. + # Equations have been modified accordingly. + points_ecf = [] + for n in range(0, npts): + cos_can = cos_cab + n * delta_cos_ca # n-1 is unnecessary since n is zero based here + sin_can = self.look * np.sqrt(1 - cos_can * cos_can) + pn = ctr + rrrc * (cos_can * u_rrx + sin_can * u_rry) + points_ecf.append(pn) + + # (8 - 10) For each of the NPTS points, convert from ECF coordinates to DEM coordinates (lon, lat, ele). Also + # compute the DEM surface height for the point with DEM horizontal coordinates (lon, lat). Compute the + # difference in height (delta_height) between the point on the contour and DEM surface point. Also, + # set an indicator to track if that point is ABOVE, ON, or BELOW the DEM surface. + # + # Contour points that are within delta_hd_lim of the surface point are considered to be “on” the surface and + # will be added to the result set. Also compute a result point when points n and n+1 when both are “off” + # the surface and the R/Rdot contour intersects the surface between them (i.e. indicator n-1 x indicator n = -1) + # + # All height coordinates are in meters. + intersection_points = [] + prev_indicator = None + prev_delta_height = None + for n in range(0, npts): + point_lle = geocentric_to_geodetic(WorldCoordinate(points_ecf[n])) + point_dem = GeodeticWorldCoordinate(point_lle.coordinate.copy()) + self.elevation_model.set_elevation(point_dem) + delta_height = point_lle.elevation - point_dem.elevation + + # Determine if the contour point is ABOVE (indicator = 1), ON (indicator = 0), or BELOW (indicator = -1) + if np.abs(delta_height) < self.delta_hd_lim: + # Contour point is on the DEM surface, add it to the result set + indicator = 0 + intersection_points.append(points_ecf[n]) + elif delta_height > self.delta_hd_lim: + # Contour point is above the DEM surface + indicator = 1 + else: + # Contour point is below the DEM surface + indicator = -1 + + # If in two adjacent points one is ABOVE and the other BELOW then the contour intersected the DEM + # surface between the two points. Interpolate an intersection point and add it to the result. + if prev_indicator and prev_indicator * indicator == -1: + # contour crossed between two points; compute the surface point between + frac = prev_delta_height / (prev_delta_height - delta_height) + cos_cas = cos_cab + (n - 1 + frac) * delta_cos_ca # here the -1 is necessary for previous point + sin_cas = self.look * np.sqrt(1 - cos_cas * cos_cas) + sm = ctr + rrrc * (cos_cas * u_rrx + sin_cas * u_rry) + intersection_points.append(sm) + + # Keep track of the current indicator and delta height for comparison to the next point + prev_indicator = indicator + prev_delta_height = delta_height + + # Return the set of surface points found. Points will be ordered of increasing height above the WGS-84 + # ellipsoid. + return np.array(intersection_points) + + class SICDSensorModel(SensorModel): """ This is an implementation of the SICD sensor model as described by SICD Volume 3 Image Projections Description @@ -704,7 +992,7 @@ def __init__( """ super().__init__() self.coa_projection_set = coa_projection_set - self.image_plane = coord_converter + self.coord_converter = coord_converter self.uvect_gpn = u_gpn self.scp_arp = scp_arp self.scp_varp = scp_varp @@ -715,8 +1003,7 @@ def __init__( self.uvect_spn *= -1.0 self.uvect_spn /= np.linalg.norm(self.uvect_spn) - # TODO: Add option for HAE ground assumption, does world_to_image always need a GroundPlaneProjection? - self.default_surface_projection = GroundPlaneRRDotSurfaceProjection(self.image_plane.scp_ecf, self.uvect_gpn) + self.default_surface_projection = GroundPlaneRRDotSurfaceProjection(self.coord_converter.scp_ecf, self.uvect_gpn) def image_to_world( self, @@ -726,7 +1013,9 @@ def image_to_world( ) -> GeodeticWorldCoordinate: """ This is an implementation of an Image Grid to Scene point projection that first projects the image - location to the R/RDot contour and then intersects the R/RDot contour with the elevation model. + location to the R/RDot contour and then intersects the R/RDot contour with the elevation model. If + an elevation model is provided then this routine intersects the R/Rdot contour with the DEM surface which + may result in multiple solutions. In that case the solution with the lowest HAE is returned. :param image_coordinate: the x,y image coordinate :param elevation_model: the optional elevation model, if none supplied a plane tangent to SCP is assumed @@ -734,13 +1023,18 @@ def image_to_world( :return: the lon, lat, elev geodetic coordinate of the surface matching the image coordinate """ row_col = np.array( - [image_coordinate.r + self.image_plane.first_pixel.r, image_coordinate.c + self.image_plane.first_pixel.c] + [ + image_coordinate.r + self.coord_converter.first_pixel.r, + image_coordinate.c + self.coord_converter.first_pixel.c, + ] ) - xrow_ycol = self.image_plane.rowcol_to_xrowycol(row_col=row_col) + xrow_ycol = self.coord_converter.rowcol_to_xrowycol(row_col=row_col) r_tgt_coa, r_dot_tgt_coa, time_coa, arp_coa, varp_coa = self.coa_projection_set.precise_rrdot_computation(xrow_ycol) if elevation_model is not None: - raise NotImplementedError("SICD sensor model with DEM not yet implemented") + surface_projection = DEMRRDotSurfaceProjection( + self.coord_converter.scp_ecf, self.side_of_track, elevation_model=elevation_model + ) else: surface_projection = self.default_surface_projection @@ -768,7 +1062,7 @@ def world_to_image(self, world_coordinate: GeodeticWorldCoordinate) -> ImageCoor # The GP to IP projection direction is along the SCP COA slant plane normal. Also, compute the image # plane unit normal, uIPN. Compute projection scale factor SF as shown. uvect_proj = self.uvect_spn - scale_factor = float(np.dot(uvect_proj, self.image_plane.uvect_ipn)) + scale_factor = float(np.dot(uvect_proj, self.coord_converter.uvect_ipn)) # (3) Set initial ground plane position G1 to the scene point position S. scene_point = np.array([ecf_world_coordinate.x, ecf_world_coordinate.y, ecf_world_coordinate.z]) @@ -782,9 +1076,9 @@ def world_to_image(self, world_coordinate: GeodeticWorldCoordinate) -> ImageCoor # (4) Project ground plane point g_n to image plane point i_n. The projection distance is dist_n. Compute # image coordinates xrown and ycoln. - dist_n = np.dot(self.image_plane.scp_ecf.coordinate - g_n, self.image_plane.uvect_ipn) / scale_factor + dist_n = np.dot(self.coord_converter.scp_ecf.coordinate - g_n, self.coord_converter.uvect_ipn) / scale_factor i_n = g_n + dist_n * uvect_proj - xrow_ycol_n = self.image_plane.ipp_to_xrowycol(i_n) + xrow_ycol_n = self.coord_converter.ipp_to_xrowycol(i_n) # (5) Compute the precise projection for image grid location (xrown, ycoln) to the ground plane containing # the scene point S. The result is point p_n. For image grid location (xrown, ycoln), compute COA @@ -805,9 +1099,11 @@ def world_to_image(self, world_coordinate: GeodeticWorldCoordinate) -> ImageCoor # grid location (xrown, ycoln) as the precise image grid location for scene point S. cont = delta_gpn > tolerance and (iteration < max_iterations) - row_col = self.image_plane.xrowycol_to_rowcol(xrow_ycol_n) + row_col = self.coord_converter.xrowycol_to_rowcol(xrow_ycol_n) # Convert the row_col image grid location to an x,y image coordinate. Note that row_col is in reference # to the full image, so we subtract off the first_pixel offset to make the image coordinate correct if this # is a chip. - return ImageCoordinate([row_col[1] - self.image_plane.first_pixel.x, row_col[0] - self.image_plane.first_pixel.y]) + return ImageCoordinate( + [row_col[1] - self.coord_converter.first_pixel.x, row_col[0] - self.coord_converter.first_pixel.y] + ) diff --git a/test/aws/osml/gdal/test_gdal_dem_tile_factory.py b/test/aws/osml/gdal/test_gdal_dem_tile_factory.py index 96d94d0..cb38444 100644 --- a/test/aws/osml/gdal/test_gdal_dem_tile_factory.py +++ b/test/aws/osml/gdal/test_gdal_dem_tile_factory.py @@ -21,7 +21,7 @@ def test_load_geotiff_tile(self): from aws.osml.photogrammetry import GeodeticWorldCoordinate tile_factory = GDALDigitalElevationModelTileFactory("./test/data") - elevation_array, sensor_model = tile_factory.get_tile("n47_e034_3arc_v2.tif") + elevation_array, sensor_model, summary = tile_factory.get_tile("n47_e034_3arc_v2.tif") assert elevation_array is not None assert elevation_array.shape == (1201, 1201) @@ -32,6 +32,12 @@ def test_load_geotiff_tile(self): assert center_image.x == pytest.approx(600.5, abs=1.0) assert center_image.y == pytest.approx(600.5, abs=1.0) + assert summary.min_elevation == -1.0 + assert summary.max_elevation == 152.0 + assert summary.no_data_value == -32767 + # The 3-arc second test file used here is somewhere around 90 meter resolution + assert abs(90.0 - summary.post_spacing) < 20.0 + if __name__ == "__main__": unittest.main() diff --git a/test/aws/osml/gdal/test_sensor_model_factory.py b/test/aws/osml/gdal/test_sensor_model_factory.py index f4d0f67..c347ad9 100644 --- a/test/aws/osml/gdal/test_sensor_model_factory.py +++ b/test/aws/osml/gdal/test_sensor_model_factory.py @@ -185,11 +185,11 @@ def test_sicd_sensor_models(self): scp_image_coord = ImageCoordinate( [ - sm.image_plane.scp_pixel.x - sm.image_plane.first_pixel.x, - sm.image_plane.scp_pixel.y - sm.image_plane.first_pixel.y, + sm.coord_converter.scp_pixel.x - sm.coord_converter.first_pixel.x, + sm.coord_converter.scp_pixel.y - sm.coord_converter.first_pixel.y, ] ) - scp_world_coord = geocentric_to_geodetic(sm.image_plane.scp_ecf) + scp_world_coord = geocentric_to_geodetic(sm.coord_converter.scp_ecf) assert np.allclose(scp_image_coord.coordinate, sm.world_to_image(scp_world_coord).coordinate, atol=1.0) diff --git a/test/aws/osml/photogrammetry/test_digital_elevation_model.py b/test/aws/osml/photogrammetry/test_digital_elevation_model.py index 4b4ea65..b54119c 100644 --- a/test/aws/osml/photogrammetry/test_digital_elevation_model.py +++ b/test/aws/osml/photogrammetry/test_digital_elevation_model.py @@ -13,6 +13,7 @@ def test_dem_interpolation(self): DigitalElevationModelTileFactory, DigitalElevationModelTileSet, ) + from aws.osml.photogrammetry.elevation_model import ElevationRegionSummary from aws.osml.photogrammetry.sensor_model import SensorModel mock_tile_set = mock.Mock(DigitalElevationModelTileSet) @@ -20,6 +21,7 @@ def test_dem_interpolation(self): # This is a sample 3x3 grid of elevation data test_elevation_data = np.array([[0.0, 1.0, 4.0], [1.0, 2.0, 3.0], [2.0, 3.0, 4.0]]) + test_elevation_summary = ElevationRegionSummary(0.0, 4.0, -1, 30.0) # These are the points we will test for interpolation test_grid_coordinates = [ @@ -44,7 +46,7 @@ def test_dem_interpolation(self): # This mock tile factory will always return the 3x3 elevation grid and the sensor model mock_tile_factory = mock.Mock(DigitalElevationModelTileFactory) - mock_tile_factory.get_tile.return_value = test_elevation_data, mock_sensor_model + mock_tile_factory.get_tile.return_value = test_elevation_data, mock_sensor_model, test_elevation_summary dem = DigitalElevationModel(mock_tile_set, mock_tile_factory) @@ -96,7 +98,7 @@ def test_missing_tile(self): mock_tile_set = mock.Mock(DigitalElevationModelTileSet) mock_tile_set.find_tile_id.return_value = "MockN00E000V0.tif" mock_tile_factory = mock.Mock(DigitalElevationModelTileFactory) - mock_tile_factory.get_tile.return_value = None, None + mock_tile_factory.get_tile.return_value = None, None, None dem = DigitalElevationModel(mock_tile_set, mock_tile_factory) diff --git a/test/aws/osml/photogrammetry/test_sicd_sensor_model.py b/test/aws/osml/photogrammetry/test_sicd_sensor_model.py index 357e1b0..f43cc2f 100644 --- a/test/aws/osml/photogrammetry/test_sicd_sensor_model.py +++ b/test/aws/osml/photogrammetry/test_sicd_sensor_model.py @@ -1,6 +1,7 @@ import unittest from math import radians from pathlib import Path +from typing import Optional, Tuple import numpy as np from xsdata.formats.dataclass.parsers import XmlParser @@ -8,6 +9,8 @@ import aws.osml.formats.sicd.models.sicd_v1_2_1 as sicd121 from aws.osml.gdal.sicd_sensor_model_builder import poly1d_to_native, poly2d_to_native, xyzpoly_to_native, xyztype_to_ndarray from aws.osml.photogrammetry import ( + ConstantElevationModel, + ElevationRegionSummary, GeodeticWorldCoordinate, ImageCoordinate, INCAProjectionSet, @@ -16,6 +19,7 @@ SARImageCoordConverter, SICDSensorModel, WorldCoordinate, + geocentric_to_geodetic, geodetic_to_geocentric, ) @@ -64,18 +68,38 @@ def test_xrgycr(self): side_of_track=str(sicd.scpcoa.side_of_track.value), ) + # This is a test class that forces a constant elevation model to have a min/max height range of 100 + # meters. It's necessary to force the DEM intersection code with the SICD sensor model to iterate through + # several points searching for an intersection. + class TestElevationModel(ConstantElevationModel): + def __init__(self, hae: float): + super().__init__(constant_elevation=hae) + + def describe_region( + self, world_coordinate: GeodeticWorldCoordinate + ) -> Optional[Tuple[float, float, float, float]]: + summary = super().describe_region(world_coordinate) + return ElevationRegionSummary( + min_elevation=summary.min_elevation - 100, + max_elevation=summary.max_elevation + 100, + no_data_value=summary.no_data_value, + post_spacing=summary.post_spacing, + ) + + scp_lle = geocentric_to_geodetic(scp_ecf) + elevation_model = TestElevationModel(scp_lle.elevation) geodetic_world_coordinate = sicd_sensor_model.image_to_world( - ImageCoordinate([sicd.image_data.scppixel.col, sicd.image_data.scppixel.row]) + ImageCoordinate([sicd.image_data.scppixel.col, sicd.image_data.scppixel.row]), elevation_model=elevation_model ) ecf_world_coordinate = geodetic_to_geocentric(geodetic_world_coordinate) - assert np.allclose(ecf_world_coordinate.coordinate, scp_ecf.coordinate) + assert np.allclose(ecf_world_coordinate.coordinate, scp_ecf.coordinate, atol=0.001) geo_scp_world_coordinate = GeodeticWorldCoordinate( [radians(sicd.geo_data.scp.llh.lon), radians(sicd.geo_data.scp.llh.lat), sicd.geo_data.scp.llh.hae] ) - assert np.allclose(geo_scp_world_coordinate.coordinate, geodetic_world_coordinate.coordinate) + assert np.allclose(geo_scp_world_coordinate.coordinate, geodetic_world_coordinate.coordinate, atol=1.0e-5) calculated_image_scp = sicd_sensor_model.world_to_image(geo_scp_world_coordinate)