Skip to content

Commit

Permalink
feat: add HAE and DEM projections to SICD sensor model (#12)
Browse files Browse the repository at this point in the history
  • Loading branch information
edparris authored Sep 13, 2023
1 parent a190b18 commit 93f044c
Show file tree
Hide file tree
Showing 10 changed files with 446 additions and 39 deletions.
1 change: 0 additions & 1 deletion src/aws/osml/formats/sicd/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +0,0 @@
# nothing here
39 changes: 32 additions & 7 deletions src/aws/osml/gdal/gdal_dem_tile_factory.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
import logging
from typing import Any, Optional, Tuple

import numpy as np
from osgeo import gdal

from aws.osml.photogrammetry import DigitalElevationModelTileFactory, GDALAffineSensorModel
from aws.osml.photogrammetry import (
DigitalElevationModelTileFactory,
ElevationRegionSummary,
GDALAffineSensorModel,
ImageCoordinate,
geodetic_to_geocentric,
)


class GDALDigitalElevationModelTileFactory(DigitalElevationModelTileFactory):
Expand All @@ -24,15 +31,17 @@ def __init__(self, tile_directory: str) -> None:
super().__init__()
self.tile_directory = tile_directory

def get_tile(self, tile_path: str) -> Tuple[Optional[Any], Optional[GDALAffineSensorModel]]:
def get_tile(
self, tile_path: str
) -> Tuple[Optional[Any], Optional[GDALAffineSensorModel], Optional[ElevationRegionSummary]]:
"""
Retrieve a numpy array of elevation values and a sensor model.
TODO: Replace Any with numpy.typing.ArrayLike once we move to numpy >1.20
:param tile_path: the location of the tile to load
:return: an array of elevation values and a sensor model or (None, None)
:return: an array of elevation values, a sensor model, and a summary or (None, None, None)
"""
tile_location = f"{self.tile_directory}/{tile_path}"
tile_location = tile_location.replace("s3:/", "/vsis3", 1)
Expand All @@ -43,15 +52,31 @@ def get_tile(self, tile_path: str) -> Tuple[Optional[Any], Optional[GDALAffineSe
# information isn't available.
if not ds:
logging.debug(f"No DEM tile available for {tile_path}. Checked {tile_location}")
return None, None
return None, None, None

# If the raster exists but doesn't have a geo transform then it is likely invalid input data.
geo_transform = ds.GetGeoTransform(can_return_null=True)
if not geo_transform:
logging.warning(f"DEM tile does not have geo transform metadata and can't be used: {tile_location}")
return None, None
return None, None, None

band_as_array = ds.GetRasterBand(1).ReadAsArray(0, 0, ds.RasterXSize, ds.RasterYSize)
raster_band = ds.GetRasterBand(1)
band_as_array = raster_band.ReadAsArray(0, 0, ds.RasterXSize, ds.RasterYSize)
height, width = band_as_array.shape
sensor_model = GDALAffineSensorModel(geo_transform)

return band_as_array, sensor_model
# Compute the distance in meters from the upper left to the lower right corner. Divide that by the distance
# in pixels to get an approximate pixel size in meters
ul_corner_ecf = geodetic_to_geocentric(sensor_model.image_to_world(ImageCoordinate([0, 0]))).coordinate
lr_corner_ecf = geodetic_to_geocentric(sensor_model.image_to_world(ImageCoordinate([width, height]))).coordinate
post_spacing = np.linalg.norm(ul_corner_ecf - lr_corner_ecf) / np.sqrt(width * width + height * height)

stats = raster_band.GetStatistics(True, True)
summary = ElevationRegionSummary(
min_elevation=stats[0],
max_elevation=stats[1],
no_data_value=raster_band.GetNoDataValue(),
post_spacing=post_spacing,
)

return band_as_array, sensor_model, summary
3 changes: 2 additions & 1 deletion src/aws/osml/photogrammetry/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
geodetic_to_geocentric,
)
from .digital_elevation_model import DigitalElevationModel, DigitalElevationModelTileFactory, DigitalElevationModelTileSet
from .elevation_model import ConstantElevationModel, ElevationModel
from .elevation_model import ConstantElevationModel, ElevationModel, ElevationRegionSummary
from .gdal_sensor_model import GDALAffineSensorModel
from .projective_sensor_model import ProjectiveSensorModel
from .replacement_sensor_model import (
Expand Down Expand Up @@ -57,6 +57,7 @@
"DigitalElevationModelTileSet",
"ConstantElevationModel",
"ElevationModel",
"ElevationRegionSummary",
"GDALAffineSensorModel",
"SARImageCoordConverter",
"INCAProjectionSet",
Expand Down
35 changes: 26 additions & 9 deletions src/aws/osml/photogrammetry/digital_elevation_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from scipy.interpolate import RectBivariateSpline

from .coordinates import GeodeticWorldCoordinate
from .elevation_model import ElevationModel
from .elevation_model import ElevationModel, ElevationRegionSummary
from .sensor_model import SensorModel


Expand Down Expand Up @@ -52,15 +52,15 @@ def __init__(self) -> None:
pass

@abstractmethod
def get_tile(self, tile_path: str) -> Tuple[Optional[Any], Optional[SensorModel]]:
def get_tile(self, tile_path: str) -> Tuple[Optional[Any], Optional[SensorModel], Optional[ElevationRegionSummary]]:
"""
Retrieve a numpy array of elevation values and a sensor model.
TODO: Replace Any with numpy.typing.ArrayLike once we move to numpy >1.20
:param tile_path: the location of the tile to load
:return: an array of elevation values and a sensor model
:return: an array of elevation values, a sensor model, and a summary
"""


Expand Down Expand Up @@ -115,14 +115,31 @@ def set_elevation(self, geodetic_world_coordinate: GeodeticWorldCoordinate) -> N
if not tile_id:
return

interpolation_grid, sensor_model = self.get_interpolation_grid(tile_id)
interpolation_grid, sensor_model, summary = self.get_interpolation_grid(tile_id)

if interpolation_grid is not None and sensor_model is not None:
image_coordinate = sensor_model.world_to_image(geodetic_world_coordinate)
geodetic_world_coordinate.elevation = interpolation_grid(image_coordinate.x, image_coordinate.y)[0][0]

def describe_region(self, geodetic_world_coordinate: GeodeticWorldCoordinate) -> Optional[ElevationRegionSummary]:
"""
Get a summary of the region near the provided world coordinate
:param geodetic_world_coordinate: the coordinate at the center of the region of interest
:return: a summary of the elevation data in this tile
"""

tile_id = self.tile_set.find_tile_id(geodetic_world_coordinate)
if not tile_id:
return

interpolation_grid, sensor_model, summary = self.get_interpolation_grid(tile_id)
return summary

@cachedmethod(operator.attrgetter("raster_cache"))
def get_interpolation_grid(self, tile_path: str) -> Tuple[Optional[RectBivariateSpline], Optional[SensorModel]]:
def get_interpolation_grid(
self, tile_path: str
) -> Tuple[Optional[RectBivariateSpline], Optional[SensorModel], Optional[ElevationRegionSummary]]:
"""
This method loads and converts an array of elevation values into a class that can
interpolate values that lie between measured elevations. The sensor model is also
Expand All @@ -135,13 +152,13 @@ def get_interpolation_grid(self, tile_path: str) -> Tuple[Optional[RectBivariate
:param tile_path: the location of the tile to load
:return: the cached interpolation object and sensor model
:return: the cached interpolation object, sensor model, and summary
"""
elevations_array, sensor_model = self.tile_factory.get_tile(tile_path)
elevations_array, sensor_model, summary = self.tile_factory.get_tile(tile_path)
if elevations_array is not None and sensor_model is not None:
height, width = elevations_array.shape
x = range(0, width)
y = range(0, height)
return RectBivariateSpline(x, y, elevations_array.T, kx=1, ky=1), sensor_model
return RectBivariateSpline(x, y, elevations_array.T, kx=1, ky=1), sensor_model, summary
else:
return None, None
return None, None, None
37 changes: 37 additions & 0 deletions src/aws/osml/photogrammetry/elevation_model.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,22 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional

from .coordinates import GeodeticWorldCoordinate


@dataclass
class ElevationRegionSummary:
"""
This class contains a general summary of an elevation tile.
"""

min_elevation: float
max_elevation: float
no_data_value: int
post_spacing: float


class ElevationModel(ABC):
"""
An elevation model associates a height z for a given x, y of a world coordinate. It typically provides information
Expand All @@ -24,6 +38,15 @@ def set_elevation(self, world_coordinate: GeodeticWorldCoordinate) -> None:
:return: None
"""

@abstractmethod
def describe_region(self, world_coordinate: GeodeticWorldCoordinate) -> Optional[ElevationRegionSummary]:
"""
Get a summary of the region near the provided world coordinate
:param world_coordinate: the coordinate at the center of the region of interest
:return: the summary information
"""


class ConstantElevationModel(ElevationModel):
"""
Expand All @@ -50,3 +73,17 @@ def set_elevation(self, world_coordinate: GeodeticWorldCoordinate) -> None:
:return: None
"""
world_coordinate.elevation = self.constant_elevation

def describe_region(self, world_coordinate: GeodeticWorldCoordinate) -> Optional[ElevationRegionSummary]:
"""
Get a summary of the region near the provided world coordinate
:param world_coordinate: the coordinate at the center of the region of interest
:return: [min elevation value, max elevation value, no elevation data value, post spacing]
"""
return ElevationRegionSummary(
min_elevation=self.constant_elevation,
max_elevation=self.constant_elevation,
no_data_value=-32767,
post_spacing=30.0,
)
Loading

0 comments on commit 93f044c

Please sign in to comment.