diff --git a/docs/source/coregistration.rst b/docs/source/coregistration.rst
index fdcaf97a..6239fcb0 100644
--- a/docs/source/coregistration.rst
+++ b/docs/source/coregistration.rst
@@ -2,17 +2,23 @@
 DEM Coregistration
 ==================
+
+.. contents:: Contents
+   :local:
+
+Introduction
+^^^^^^^^^^^^
+
 Coregistration of a DEM is performed when it needs to be compared to a reference, but the DEM does not align with the reference perfectly.
 There are many reasons why this might be, for example: poor georeferencing, unknown coordinate system transforms or vertical datums, and instrument- or processing-induced distortion.
 A main principle of all coregistration approaches is the assumption that all or parts of the portrayed terrain are unchanged between the reference and the DEM to be aligned.
 This *stable ground* can be extracted by masking out features that are assumed to be unstable.
 Then, the DEM to be aligned is translated, rotated and/or bent to fit the stable surfaces of the reference DEM as well as possible.
-In mountainous environments, unstable areas could be: glaciers, landslides, vegetation, dead-ice terrain and human settlements.
+In mountainous environments, unstable areas could be: glaciers, landslides, vegetation, dead-ice terrain and human structures.
 Unless the entire terrain is assumed to be stable, a mask layer is required.
-``xdem`` supports either shapefiles or rasters to specify the masked (unstable) areas.
-There are multiple methods for coregistration, and each have their own strengths and weaknesses.
+There are multiple approaches to coregistration, and each has its own strengths and weaknesses.
 Below is a summary of how each method works, and when it should (and should not) be used.
 
 **Example data**
 
@@ -28,18 +34,42 @@ Examples are given using data close to Longyearbyen on Svalbard. These can be lo
 
+    # Imports used in the examples below.
+    import geoutils as gu
+    import numpy as np
+
+    import xdem
+    from xdem import coreg
+
     # Download the necessary data. This may take a few minutes.
     xdem.examples.download_longyearbyen_examples(overwrite=False)
 
+    ### Load the data using xdem and geoutils (could be with rasterio and geopandas instead)
     # Load a reference DEM from 2009
     reference_dem = xdem.DEM(xdem.examples.FILEPATHS["longyearbyen_ref_dem"])
     # Load a moderately well aligned DEM from 1990
-    dem_to_be_aligned = xdem.DEM(xdem.examples.FILEPATHS["longyearbyen_tba_dem"])
+    dem_to_be_aligned = xdem.DEM(xdem.examples.FILEPATHS["longyearbyen_tba_dem"]).resample(reference_dem)
 
     # Load glacier outlines from 1990. This will act as the unstable ground.
     glacier_outlines = gu.Vector(xdem.examples.FILEPATHS["longyearbyen_glacier_outlines"])
 
-.. contents:: Contents
-    :local:
+
+    # Prepare the inputs for coregistration.
+    ref_data = reference_dem.data  # This is a numpy 2D array/masked_array
+    tba_data = dem_to_be_aligned.data  # This is a numpy 2D array/masked_array
+    # This is a boolean numpy 2D array: True on stable ground (inliers), False on the unstable glaciers.
+    inlier_mask = glacier_outlines.create_mask(reference_dem) != 255
+    transform = reference_dem.transform  # This is a rio.transform.Affine object.
+
+
+The Coreg object
+^^^^^^^^^^^^^^^^^^^^
+:class:`xdem.coreg.Coreg`
+
+Each of the coregistration approaches in ``xdem`` inherits its interface from the ``Coreg`` class.
+It is written in a style that should resemble that of ``scikit-learn`` (see their `LinearRegression <https://scikit-learn.org/stable/modules/generated/sklearn.linear_model.LinearRegression.html>`_ class for example).
+Each coregistration approach has the methods:
+
+* ``.fit()`` for estimating the transform.
+* ``.apply()`` for applying the transform to a DEM.
+* ``.apply_pts()`` for applying the transform to a set of 3D points.
+* ``.to_matrix()`` to convert the transform to a 4x4 transformation matrix, if possible.
+
+First, ``.fit()`` is called to estimate the transform, and then this transform can be used or exported using the subsequent methods.
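+
+As a minimal sketch of that workflow, using the variables prepared above (``NuthKaab`` stands in for any ``Coreg`` subclass, and the point coordinates are illustrative):
+
+.. code-block:: python
+
+    func = coreg.NuthKaab()
+
+    # Estimate the transform on the stable ground.
+    func.fit(ref_data, tba_data, transform=transform, inlier_mask=inlier_mask)
+
+    # Use the estimated transform...
+    aligned_dem = func.apply(tba_data, transform=transform)  # ...on a DEM,
+    points = np.array([[546300.0, 8655000.0, 400.0]])        # (an illustrative x/y/z point)
+    aligned_points = func.apply_pts(points)                  # ...on an (N, 3) array of points,
+    matrix = func.to_matrix()                                # ...or export it as a 4x4 matrix.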
 
 Nuth and Kääb (2011)
 ^^^^^^^^^^^^^^^^^^^^
+:class:`xdem.coreg.NuthKaab`
+
 - **Performs:** translation and bias corrections.
 - **Supports weights** (soon)
 - **Recommended for:** Noisy data with low rotational differences.
@@ -63,15 +93,17 @@ Example
 *******
 .. code-block:: python
 
-    aligned_dem, error = xdem.coreg.coregister(
-        reference_dem,
-        dem_to_be_aligned,
-        method="nuth_kaab",
-        mask=glacier_outlines
-    )
+    nuth_kaab = coreg.NuthKaab()
+    # Fit the data to a suitable x/y/z offset.
+    nuth_kaab.fit(ref_data, tba_data, transform=transform, inlier_mask=inlier_mask)
+
+    # Apply the transformation to the data (or any other data)
+    aligned_dem = nuth_kaab.apply(tba_data, transform=transform)
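+
+The estimated offsets can also be exported for use elsewhere, as described under `The Coreg object`_ (a minimal sketch):
+
+.. code-block:: python
+
+    # The x/y/z offsets, expressed as a 4x4 transformation matrix.
+    matrix = nuth_kaab.to_matrix()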
 
 Deramping
 ^^^^^^^^^
+:class:`xdem.coreg.Deramp`
+
 - **Performs:** Bias, linear or nonlinear height corrections.
 - **Supports weights** (soon)
 - **Recommended for:** Data with no horizontal offset and low to moderate rotational differences.
@@ -91,17 +123,51 @@ Example
 *******
 .. code-block:: python
 
-    # Apply a 1st order deramping correction.
-    deramped_dem, error = xdem.coreg.coregister(
-        reference_dem,
-        dem_to_be_aligned,
-        method="deramp",
-        deramp_degree=1,
-        mask=glacier_outlines
-    )
+    # Instantiate a 1st order deramping object.
+    deramp = coreg.Deramp(degree=1)
+    # Fit the data to a suitable polynomial solution.
+    deramp.fit(ref_data, tba_data, transform=transform, inlier_mask=inlier_mask)
+
+    # Apply the transformation to the data (or any other data)
+    deramped_dem = deramp.apply(dem_to_be_aligned.data, transform=dem_to_be_aligned.transform)
+
+
+Bias correction
+^^^^^^^^^^^^^^^
+:class:`xdem.coreg.BiasCorr`
+
+- **Performs:** (Weighted) bias correction using the mean, median or any other custom function
+- **Supports weights** (soon)
+- **Recommended for:** A precursor step to e.g. ICP.
+
+``BiasCorr`` has very similar functionality to ``Deramp(degree=0)`` or the z-component of `Nuth and Kääb (2011)`_.
+This class is more customizable, for example allowing the bias algorithm to be changed (from weighted average to e.g. the median).
+It should also be faster, since it is a single function call.
+
+Limitations
+***********
+Only performs vertical corrections, so it should be combined with another approach.
+
+Example
+*******
+.. code-block:: python
+
+    bias_corr = coreg.BiasCorr()
+    # Note that the transform argument is not needed, since this is a simple vertical correction.
+    bias_corr.fit(ref_data, tba_data, inlier_mask=inlier_mask)
+
+    # Apply the bias to a DEM
+    corrected_dem = bias_corr.apply(tba_data, transform=None)
+
+    # Use median bias instead
+    bias_median = coreg.BiasCorr(bias_func=np.median)
+
+    bias_median.fit(... # etc.
 
 ICP
 ^^^
+:class:`xdem.coreg.ICP`
+
 - **Performs:** Rigid transform correction (translation + rotation).
 - **Does not support weights**
 - **Recommended for:** Data with low noise and a high relative rotation.
@@ -117,8 +183,7 @@ This may improve results on noisy data significantly, but care should still be t
 Limitations
 ***********
-ICP is notoriously bad on noisy data.
-TODO: Add references for ICP being bad on noisy data.
+ICP often works poorly on noisy data.
 The outlier removal functionality of the opencv implementation is a step in the right direction, but it still does not compete with other coregistration approaches when the relative rotation is small.
 In cases of high rotation, ICP is the only approach that can account for this properly, but results may need refinement, for example with the `Nuth and Kääb (2011)`_ approach.
@@ -128,12 +193,63 @@ Example
 *******
 .. code-block:: python
 
-    # Use the opencv ICP implementation. For PDAL, use "icp_pdal")
-    aligned_dem, error = xdem.coreg.coregister(
-        reference_dem,
-        dem_to_be_aligned,
-        method="icp",
-        mask=glacier_outlines
-    )
+    # Instantiate the object with default parameters
+    icp = coreg.ICP()
+    # Fit the data to a suitable transformation.
+    icp.fit(ref_data, tba_data, transform=transform, inlier_mask=inlier_mask)
+
+    # Apply the transformation matrix to the data (or any other data)
+    aligned_dem = icp.apply(tba_data, transform=transform)
+
+
+The CoregPipeline object
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+:class:`xdem.coreg.CoregPipeline`
+
+Often, more than one coregistration approach is necessary to obtain the best results.
+For example, ICP works poorly with large initial biases, so a ``CoregPipeline`` can be constructed to perform both sequentially:
+
+.. code-block:: python
+
+    pipeline = coreg.CoregPipeline([coreg.BiasCorr(), coreg.ICP()])
+
+    pipeline.fit(... # etc.
+
+    # This works identically to the syntax above
+    pipeline2 = coreg.BiasCorr() + coreg.ICP()
+
+The ``CoregPipeline`` object exposes the same interface as the ``Coreg`` object.
+The results of a pipeline can be used in other programs by exporting the combined transformation matrix:
+
+.. code-block:: python
+
+    pipeline.to_matrix()
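+
+Such a 4x4 matrix operates on homogeneous coordinates. As an illustrative numpy sketch (the coordinate values are made up), applying it to a single x/y/z point could look like:
+
+.. code-block:: python
+
+    matrix = pipeline.to_matrix()
+
+    # Append a homogeneous coordinate (1) to the point, apply the matrix, and drop it again.
+    point = np.array([546300.0, 8655000.0, 400.0, 1.0])
+    transformed_point = (matrix @ point)[:3]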
+
+
+This class is heavily inspired by the `Pipeline <https://scikit-learn.org/stable/modules/generated/sklearn.pipeline.Pipeline.html>`_ and `make_pipeline() <https://scikit-learn.org/stable/modules/generated/sklearn.pipeline.make_pipeline.html>`_ functionalities in ``scikit-learn``.
+
+Suggested pipelines
+*******************
+
+For sub-pixel accuracy, the `Nuth and Kääb (2011)`_ approach should almost always be used.
+The approach does not account for rotations in the dataset, however, so a combination is often necessary.
+For small rotations, a 1st degree deramp could be used:
+
+.. code-block:: python
+
+    coreg.NuthKaab() + coreg.Deramp(degree=1)
+
+For larger rotations, ICP is the only reliable approach (though it does not reach sub-pixel accuracy on its own):
+
+.. code-block:: python
+
+    coreg.ICP() + coreg.NuthKaab()
+
+
+For large biases, rotations and high amounts of noise:
+
+.. code-block:: python
+
+    coreg.BiasCorr() + coreg.ICP() + coreg.NuthKaab()
+
diff --git a/setup.py b/setup.py
index 23f41648..a2375f8a 100644
--- a/setup.py
+++ b/setup.py
@@ -14,7 +14,8 @@
       packages=['xdem'],
       install_requires=['numpy', 'scipy', 'rasterio', 'geopandas', 'pyproj', 'tqdm',
                         'geoutils @ https://github.com/GlacioHack/GeoUtils/tarball/master', 'scikit-gstat'],
-      extras_require={'rioxarray': ['rioxarray'], 'richdem': ['richdem'], 'pdal': ['pdal'], 'opencv': ['opencv']},
+      extras_require={'rioxarray': ['rioxarray'], 'richdem': ['richdem'], 'pdal': ['pdal'],
+                      'opencv': ['opencv'], 'pytransform3d': ['pytransform3d']},
       scripts=[],
       zip_safe=False)
diff --git a/tests/test_coreg.py b/tests/test_coreg.py
index fcdaed5d..815e8c44 100644
--- a/tests/test_coreg.py
+++ b/tests/test_coreg.py
@@ -6,13 +6,19 @@
 """
 from __future__ import annotations
 
+import copy
 import os
 import tempfile
+import time
+import warnings
 from typing import Any
 
 import geoutils as gu
+import numpy as np
 
-from xdem import coreg, examples
+with warnings.catch_warnings():
+    warnings.simplefilter("ignore")
+    from xdem import coreg, examples, spatial_tools
 
 
 def load_examples() -> tuple[gu.georaster.Raster, gu.georaster.Raster, gu.geovector.Vector]:
@@ -126,3 +132,237 @@ def test_only_paths():
     to_be_aligned_raster = examples.FILEPATHS["longyearbyen_tba_dem"]
 
     coreg.coregister(reference_raster, to_be_aligned_raster)
+
+
+class TestCoregClass:
+    ref, tba, outlines = load_examples()  # Load example reference, to-be-aligned and mask.
+    inlier_mask = outlines.create_mask(ref) != 255
+
+    fit_params = dict(
+        reference_dem=ref.data,
+        dem_to_be_aligned=tba.data,
+        inlier_mask=inlier_mask,
+        transform=ref.transform,
+        verbose=False,
+    )
+    # Create some 3D coordinates with Z coordinates being 0 to try the apply_pts functions.
+    points = np.array([[1, 2, 3, 4], [1, 2, 3, 4], [0, 0, 0, 0]], dtype="float64").T
+
+    def test_bias(self):
+        warnings.simplefilter("error")
+
+        # Create a bias correction instance
+        biascorr = coreg.BiasCorr()
+        # Fit the bias model to the data
+        biascorr.fit(**self.fit_params)
+
+        # Check that a bias was found.
+        assert biascorr._meta.get("bias") is not None
+        assert biascorr._meta["bias"] != 0.0
+
+        # Copy the bias to see if it changes in the test (it shouldn't)
+        bias = copy.copy(biascorr._meta["bias"])
+
+        # Check that the to_matrix function works as it should
+        matrix = biascorr.to_matrix()
+        assert matrix[2, 3] == bias, matrix
+
+        # Check that the first z coordinate is now the bias
+        assert biascorr.apply_pts(self.points)[0, 2] == biascorr._meta["bias"]
+
+        # Apply the model to correct the DEM
+        tba_unbiased = biascorr.apply(self.tba.data, None)
+
+        # Create a new bias correction model
+        biascorr2 = coreg.BiasCorr()
+        # Check that this is indeed a new object
+        assert biascorr is not biascorr2
+        # Fit the corrected DEM to see if the bias will be close to or at zero
+        biascorr2.fit(reference_dem=self.ref.data, dem_to_be_aligned=tba_unbiased, inlier_mask=self.inlier_mask)
+        # Test the bias
+        assert abs(biascorr2._meta.get("bias")) < 0.01
+
+        # Check that the original model's bias has not changed (that the _meta dicts are two different objects)
+        assert biascorr._meta["bias"] == bias
+
+    def test_nuth_kaab(self):
+        warnings.simplefilter("error")
+
+        nuth_kaab = coreg.NuthKaab(max_iterations=10)
+
+        # Synthesize a shifted and vertically offset DEM
+        pixel_shift = 2
+        bias = 5
+        shifted_dem = self.ref.data.squeeze().copy()
+        shifted_dem[:, pixel_shift:] = shifted_dem[:, :-pixel_shift]
+        shifted_dem[:, :pixel_shift] = np.nan
+        shifted_dem += bias
+
+        # Fit the synthesized shifted DEM to the original
+        nuth_kaab.fit(self.ref.data.squeeze(), shifted_dem,
+                      transform=self.ref.transform, verbose=self.fit_params["verbose"])
+
+        # Make sure that the estimated offsets are similar to what was synthesized.
+        assert abs(nuth_kaab._meta["offset_east_px"] - pixel_shift) < 0.03
+        assert abs(nuth_kaab._meta["offset_north_px"]) < 0.03
+        assert abs(nuth_kaab._meta["bias"] + bias) < 0.03
+
+        # Apply the estimated shift to "revert the DEM" to its original state.
+        unshifted_dem = nuth_kaab.apply(shifted_dem, transform=self.ref.transform)
+        # Measure the difference (should be more or less zero)
+        diff = np.asarray(self.ref.data.squeeze() - unshifted_dem)
+
+        # Check that the median is very close to zero
+        assert np.abs(np.nanmedian(diff)) < 0.01
+        # Check that the RMSE is low
+        assert np.sqrt(np.nanmean(np.square(diff))) < 1
+
+        # Transform some arbitrary points.
+        transformed_points = nuth_kaab.apply_pts(self.points)
+
+        # Check that the x shift is close to the pixel_shift * image resolution
+        assert abs((transformed_points[0, 0] - self.points[0, 0]) + pixel_shift * self.ref.res[0]) < 0.1
+        # Check that the z shift is close to the original bias.
+        assert abs((transformed_points[0, 2] - self.points[0, 2]) + bias) < 0.1
+
+    def test_deramping(self):
+        warnings.simplefilter("error")
+
+        # Try a 1st degree deramping.
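+        # (A degree-n deramp fits a 2D polynomial with (n + 1) * (n + 2) / 2
+        # coefficients to the elevation difference; degree 1 is a plane.)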
+        deramp = coreg.Deramp(degree=1)
+
+        # Fit the data
+        deramp.fit(**self.fit_params)
+
+        # Apply the deramping to a DEM
+        deramped_dem = deramp.apply(self.tba.data, self.ref.transform)
+
+        # Get the periglacial offset after deramping
+        periglacial_offset = (self.ref.data.squeeze() - deramped_dem)[self.inlier_mask.squeeze()]
+        # Get the periglacial offset before deramping
+        pre_offset = (self.ref.data - self.tba.data).squeeze()[self.inlier_mask]
+
+        # Check that the error improved
+        assert np.abs(np.mean(periglacial_offset)) < np.abs(np.mean(pre_offset))
+
+        # Check that the mean periglacial offset is low
+        assert np.abs(np.mean(periglacial_offset)) < 1
+
+        # Try a 0 degree deramp (basically bias correction)
+        deramp0 = coreg.Deramp(degree=0)
+        deramp0.fit(**self.fit_params)
+
+        # Check that only one coefficient exists (degree 0: z = a => coefficients=["a"])
+        assert len(deramp0._meta["coefficients"]) == 1
+        # Extract said bias
+        bias = deramp0._meta["coefficients"][0]
+
+        # Make sure to_matrix does not throw an error. It will for higher degree deramps
+        deramp0.to_matrix()
+
+        # Check that the apply_pts would apply a z shift equal to the bias
+        assert deramp0.apply_pts(self.points)[0, 2] == bias
+
+    def test_icp_opencv(self):
+        warnings.simplefilter("error")
+
+        # Do a fast and dirty 3 iteration ICP just to make sure it doesn't error out.
+        icp = coreg.ICP(max_iterations=3)
+        icp.fit(**self.fit_params)
+
+        aligned_dem = icp.apply(self.tba.data, self.ref.transform)
+
+        assert aligned_dem.shape == self.ref.data.squeeze().shape
+
+    def test_pipeline(self):
+        warnings.simplefilter("error")
+
+        # Create a pipeline from two coreg methods.
+        pipeline = coreg.CoregPipeline([coreg.BiasCorr(), coreg.ICP(max_iterations=3)])
+        pipeline.fit(**self.fit_params)
+
+        aligned_dem = pipeline.apply(self.tba.data, self.ref.transform)
+
+        assert aligned_dem.shape == self.ref.data.squeeze().shape
+
+        # Make a new pipeline with two bias correction approaches.
+        pipeline2 = coreg.CoregPipeline([coreg.BiasCorr(), coreg.BiasCorr()])
+        # Set both "estimated" biases to be 1
+        pipeline2.pipeline[0]._meta["bias"] = 1
+        pipeline2.pipeline[1]._meta["bias"] = 1
+
+        # Assert that the combined bias is 2
+        assert pipeline2.to_matrix()[2, 3] == 2.0
+
+    def test_coreg_add(self):
+        warnings.simplefilter("error")
+        # Test with a bias of 4
+        bias = 4
+
+        bias1 = coreg.BiasCorr()
+        bias2 = coreg.BiasCorr()
+
+        # Set the bias attribute
+        for bias_corr in (bias1, bias2):
+            bias_corr._meta["bias"] = bias
+
+        # Add the two coregs and check that the resulting bias is 2 * bias
+        bias3 = bias1 + bias2
+        assert bias3.to_matrix()[2, 3] == bias * 2
+
+        # Make sure the correct exception is raised on incorrect additions
+        try:
+            bias1 + 1
+        except ValueError as exception:
+            if "Incompatible add type" not in str(exception):
+                raise exception
+
+        # Try to add a Coreg step to an already existing CoregPipeline
+        bias4 = bias3 + bias1
+        assert bias4.to_matrix()[2, 3] == bias * 3
+
+        # Try to add two CoregPipelines
+        bias5 = bias3 + bias3
+        assert bias5.to_matrix()[2, 3] == bias * 4
+
+    def test_subsample(self):
+        warnings.simplefilter("error")
+
+        # Test subsampled bias correction
+        bias_sub = coreg.BiasCorr()
+
+        # Fit the bias using 50% of the unmasked data using a fraction
+        bias_sub.fit(**self.fit_params, subsample=0.5)
+        # Do the same but specify the pixel count instead.
+        # They are not perfectly equal (np.count_nonzero(self.inlier_mask) // 2 would be exact)
+        # But this would just repeat the subsample code, so that makes little sense to test.
+        bias_sub.fit(**self.fit_params, subsample=self.tba.data.size // 2)
+
+        # Do full bias corr to compare
+        bias_full = coreg.BiasCorr()
+        bias_full.fit(**self.fit_params)
+
+        # Check that the estimated biases are similar
+        assert abs(bias_sub._meta["bias"] - bias_full._meta["bias"]) < 0.1
+
+        # Test ICP with subsampling
+        icp_full = coreg.ICP(max_iterations=20)
+        icp_sub = coreg.ICP(max_iterations=20)
+
+        # Measure the start and stop time to get the duration
+        start_time = time.time()
+        icp_full.fit(**self.fit_params)
+        icp_full_duration = time.time() - start_time
+
+        # Do the same with 50% subsampling
+        start_time = time.time()
+        icp_sub.fit(**self.fit_params, subsample=0.5)
+        icp_sub_duration = time.time() - start_time
+
+        # Make sure that the subsampling increased performance
+        assert icp_full_duration > icp_sub_duration
+
+        # Calculate the difference in the full vs. subsampled ICP matrices
+        matrix_diff = np.abs(icp_full.to_matrix() - icp_sub.to_matrix())
+        # Check that the x/y/z differences do not exceed 30cm
+        assert np.count_nonzero(matrix_diff > 0.3) == 0
diff --git a/xdem/coreg.py b/xdem/coreg.py
index c1aac73b..bc124068 100644
--- a/xdem/coreg.py
+++ b/xdem/coreg.py
@@ -33,6 +33,8 @@
 from rasterio import Affine
 from tqdm import trange
 
+import xdem
+
 try:
     import richdem as rd
     _has_rd = True
@@ -45,6 +47,12 @@
 except ImportError:
     _has_cv2 = False
 
+try:
+    from pytransform3d.transform_manager import TransformManager
+    _HAS_P3D = True
+except ImportError:
+    _HAS_P3D = False
+
 
 def filter_by_range(ds: rio.DatasetReader, rangelim: tuple[float, float]):
     """
@@ -148,6 +156,7 @@ def write_geotiff(filepath: str, values: np.ndarray, crs: Optional[rio.crs.CRS],
     :param crs: The coordinate system of the raster.
     :param bounds: The bounding coordinates of the raster.
     """
+    warnings.warn("This function is not needed anymore.", DeprecationWarning)
     transform = rio.transform.from_bounds(*bounds, width=values.shape[1], height=values.shape[0])
 
     nodata_value = np.finfo(values.dtype).min
@@ -268,6 +277,7 @@ def icp_coregistration_pdal(reference_dem: np.ndarray, dem_to_be_aligned: np.nda
     # noqa: DAR101 **_
     """
+    warnings.warn("This function is deprecated in favour of the new Coreg class.", DeprecationWarning)
     dem_to_be_aligned_unmasked = dem_to_be_aligned.copy()
     # Check that PDAL is installed.
     check_for_pdal()
@@ -431,6 +441,7 @@ def icp_coregistration_opencv(reference_dem: np.ndarray, dem_to_be_aligned: np.n
     # noqa: DAR101 **_
     """
+    warnings.warn("This function is deprecated in favour of the new Coreg class.", DeprecationWarning)
     if not _has_cv2:
         raise AssertionError("opencv (cv2) is not available and needs to be installed.")
     dem_to_be_aligned_unmasked = dem_to_be_aligned.copy()
@@ -628,6 +639,7 @@ def deramping(elevation_difference, x_coordinates: np.ndarray, y_coordinates: np
 
     :returns: A callable function to estimate the ramp.
     """
+    warnings.warn("This function is deprecated in favour of the new Coreg class.", DeprecationWarning)
     # Extract only the finite values of the elevation difference and corresponding coordinates.
     valid_diffs = elevation_difference[np.isfinite(elevation_difference)]
     valid_x_coords = x_coordinates[np.isfinite(elevation_difference)]
@@ -735,6 +747,7 @@ def deramp_dem(reference_dem: np.ndarray, dem_to_be_aligned: np.ndarray, mask: O
     # noqa: DAR101 **_
     """
+    warnings.warn("This function is deprecated in favour of the new Coreg class.", DeprecationWarning)
     elevation_difference = (reference_dem - dem_to_be_aligned)
 
     # Apply the mask if it exists.
@@ -769,6 +782,7 @@ def calculate_nmad(array: np.ndarray) -> float:
 
     :returns: The NMAD of the array.
     """
+    warnings.warn("This function is deprecated in favour of xdem.spatial_tools.nmad().", DeprecationWarning)
     # TODO: Get a reference for why NMAD is used (and make sure the N stands for normalized)
     nmad = 1.4826 * np.nanmedian(np.abs(array - np.nanmedian(array)))
 
@@ -795,6 +809,7 @@ def amaury_coregister_dem(reference_dem: np.ndarray, dem_to_be_aligned: np.ndarr
     # noqa: DAR101 **_
     """
+    warnings.warn("This function is deprecated in favour of the new Coreg class.", DeprecationWarning)
     # TODO: Add offset_east and offset_north as return variables?
     # Make a new DEM which will be modified inplace
     aligned_dem = dem_to_be_aligned.copy()
@@ -924,6 +939,8 @@ def amaury_coregister_dem(reference_dem: np.ndarray, dem_to_be_aligned: np.ndarr
 class CoregMethod(Enum):
     """A selection of a coregistration method."""
 
+    warnings.warn("'CoregMethod' is deprecated in favour of the new Coreg class.", DeprecationWarning)
+
     ICP_PDAL = icp_coregistration_pdal
     ICP_OPENCV = icp_coregistration_opencv
     AMAURY = amaury_coregister_dem
@@ -1016,6 +1033,7 @@ def coregister(reference_raster: Union[str, gu.georaster.Raster], to_be_aligned_
 
     :returns: A coregistered Raster and the NMAD of the (potentially) masked offsets.
     """
+    warnings.warn("This function is deprecated in favour of the new Coreg class.", DeprecationWarning)
     # Load GeoUtils Rasters/Vectors if filepaths are given as arguments.
     if isinstance(reference_raster, str):
         reference_raster = gu.georaster.Raster(reference_raster)
@@ -1063,3 +1081,634 @@ def coregister(reference_raster: Union[str, gu.georaster.Raster], to_be_aligned_
     )
 
     return aligned_raster, error
+
+
+def _transform_to_bounds_and_res(shape: tuple[int, int],
+                                 transform: rio.transform.Affine) -> tuple[rio.coords.BoundingBox, float]:
+    """Get the bounding box and (horizontal) resolution from a transform and the shape of a DEM."""
+    bounds = rio.coords.BoundingBox(
+        *rio.transform.array_bounds(shape[0], shape[1], transform=transform))
+    resolution = (bounds.right - bounds.left) / shape[1]
+
+    return bounds, resolution
+
+
+def _get_x_and_y_coords(shape: tuple[int, int], transform: rio.transform.Affine):
+    """Generate center coordinates from a transform and the shape of a DEM."""
+    bounds, resolution = _transform_to_bounds_and_res(shape, transform)
+    x_coords, y_coords = np.meshgrid(
+        np.linspace(bounds.left + resolution / 2, bounds.right - resolution / 2, num=shape[1]),
+        np.linspace(bounds.bottom + resolution / 2, bounds.top - resolution / 2, num=shape[0])[::-1]
+    )
+    return x_coords, y_coords
+
+
+class Coreg:
+    _meta: Optional[dict[str, Any]] = None  # All __init__ functions should instantiate an empty dict.
+    _fit_called = False  # Flag to check if the .fit() method has been called.
+
+    def __init__(self):
+        """This function should have been overwritten by subclassing."""
+        raise ValueError("Coreg class should not be instantiated directly.")
+
+    def fit(self, reference_dem: Union[np.ndarray, np.ma.masked_array],
+            dem_to_be_aligned: Union[np.ndarray, np.ma.masked_array],
+            inlier_mask: Optional[np.ndarray] = None,
+            transform: Optional[rio.transform.Affine] = None,
+            weights: Optional[np.ndarray] = None,
+            subsample: Union[float, int] = 1.0,
+            verbose: bool = False):
+        """
+        Estimate the coregistration transform on the given DEMs.
+
+        :param reference_dem: 2D array of elevation values acting as reference.
+        :param dem_to_be_aligned: 2D array of elevation values to be aligned.
+        :param inlier_mask: Optional. 2D boolean array of areas to include in the analysis (inliers=True).
+        :param transform: Optional. Transform of the reference_dem. Mandatory in some cases.
+        :param weights: Optional. Per-pixel weights for the coregistration.
+        :param subsample: Subsample the input to increase performance. <1 is parsed as a fraction. >1 is a pixel count.
+        :param verbose: Print progress messages to stdout.
+        """
+        # Make sure that the mask has an expected format.
+        if inlier_mask is not None:
+            inlier_mask = np.asarray(inlier_mask)
+            assert inlier_mask.dtype == bool, f"Invalid mask dtype: '{inlier_mask.dtype}'. Expected 'bool'"
+
+        if weights is not None:
+            raise NotImplementedError("Weights have not yet been implemented")
+
+        # The reference mask is the union of the nan occurrence and the (potential) ma mask.
+        ref_mask = np.isnan(reference_dem) | (reference_dem.mask
+                                              if isinstance(reference_dem, np.ma.masked_array) else False)
+        # The to-be-aligned mask is the union of the nan occurrence and the (potential) ma mask.
+        tba_mask = np.isnan(dem_to_be_aligned) | (dem_to_be_aligned.mask
+                                                  if isinstance(dem_to_be_aligned, np.ma.masked_array) else False)
+
+        # The full mask (inliers=True) is the inverse of the above masks and the provided mask.
+        full_mask = (~ref_mask & ~tba_mask & (np.asarray(inlier_mask) if inlier_mask is not None else True)).squeeze()
+
+        # If subsample is not equal to one, subsampling should be performed.
+        if subsample != 1.0:
+            # If subsample is less than one, it is parsed as a fraction (e.g. 0.8 => retain 80% of the values)
+            if subsample < 1.0:
+                subsample = int(np.count_nonzero(full_mask) * (1 - subsample))
+            # If subsample is larger than one, it is parsed as the pixel count to retain, so invert it
+            # into the number of pixels to exclude.
+            else:
+                subsample = max(0, np.count_nonzero(full_mask) - int(subsample))
+
+            # Randomly pick N inliers to exclude, where N=subsample
+            random_falses = np.random.choice(np.argwhere(full_mask.flatten()).squeeze(), int(subsample), replace=False)
+            # Convert the 1D indices to 2D indices
+            rows, cols = np.unravel_index(random_falses, full_mask.shape)
+            # Set the N random inliers to be parsed as outliers instead.
+            full_mask[rows, cols] = False
+
+        # The arrays to provide the functions will be ndarrays with NaNs for masked out areas.
+        ref_dem = np.where(full_mask, np.asarray(reference_dem), np.nan).squeeze()
+        tba_dem = np.where(full_mask, np.asarray(dem_to_be_aligned), np.nan).squeeze()
+
+        # Run the associated fitting function
+        self._fit_func(ref_dem=ref_dem, tba_dem=tba_dem, transform=transform, weights=weights, verbose=verbose)
+
+        # Flag that the fitting function has been called.
+        self._fit_called = True
+
+    def apply(self, dem: Union[np.ndarray, np.ma.masked_array],
+              transform: rio.transform.Affine) -> Union[np.ndarray, np.ma.masked_array]:
+        """
+        Apply the estimated transform to a DEM.
+
+        :param dem: A DEM to apply the transform on.
+        :param transform: The transform object of the DEM. TODO: Remove??
+
+        :returns: The transformed DEM.
+        """
+        if not self._fit_called:
+            raise AssertionError(".fit() does not seem to have been called yet")
+
+        # The mask is the union of the nan occurrence and the (potential) ma mask.
+        dem_mask = (np.isnan(dem) | (dem.mask if isinstance(dem, np.ma.masked_array) else False)).squeeze()
+
+        # The array to provide the functions will be an ndarray with NaNs for masked out areas.
+        dem_array = np.where(~dem_mask, np.asarray(dem), np.nan).squeeze()
+
+        # Run the associated apply function
+        applied_dem = self._apply_func(dem_array, transform)
+
+        # Return the array in the same format as it was given (ndarray or masked_array)
+        return np.ma.masked_array(applied_dem, mask=dem.mask) if isinstance(dem, np.ma.masked_array) else applied_dem
+
+    def apply_pts(self, coords: np.ndarray) -> np.ndarray:
+        """
+        Apply the estimated transform to a set of 3D points.
+
+        :param coords: A (N, 3) array of X/Y/Z coordinates.
+
+        :returns: The transformed coordinates.
+        """
+        if not self._fit_called:
+            raise AssertionError(".fit() does not seem to have been called yet")
+        assert coords.shape[1] == 3, f"'coords' shape must be (N, 3). Given shape: {coords.shape}"
+
+        return self._apply_pts_func(coords)
+
+    def to_matrix(self) -> np.ndarray:
+        """Convert the transform to a 4x4 transformation matrix."""
+        return self._to_matrix_func()
+
+    def __add__(self, other: Coreg) -> CoregPipeline:
+        """Return a pipeline consisting of self and the other coreg function."""
+        if not isinstance(other, Coreg):
+            raise ValueError(f"Incompatible add type: {type(other)}. Expected 'Coreg' subclass")
+        return CoregPipeline([self, other])
+
+    def _fit_func(self, ref_dem: np.ndarray, tba_dem: np.ndarray, transform: Optional[rio.transform.Affine],
+                  weights: Optional[np.ndarray], verbose: bool = False):
+        raise NotImplementedError("This should have been implemented by subclassing")
+
+    def _apply_func(self, dem: np.ndarray, transform: rio.transform.Affine) -> np.ndarray:
+        raise NotImplementedError("This should have been implemented by subclassing")
+
+    def _apply_pts_func(self, coords: np.ndarray) -> np.ndarray:
+        raise NotImplementedError("This should have been implemented by subclassing")
+
+    def _to_matrix_func(self) -> np.ndarray:
+        raise NotImplementedError("This should be implemented by subclassing")
+
+
+class BiasCorr(Coreg):
+    """
+    DEM bias correction.
+
+    Estimates the mean (or median, weighted avg., etc.) offset between two DEMs.
+    """
+
+    def __init__(self, bias_func=np.average):  # pylint: disable=super-init-not-called
+        """
+        Instantiate a bias correction object.
+
+        :param bias_func: The function to use for calculating the bias. Default: (weighted) average.
+        """
+        self._meta: dict[str, Any] = {"bias_func": bias_func}
+
+    def _fit_func(self, ref_dem: np.ndarray, tba_dem: np.ndarray, transform: Optional[rio.transform.Affine],
+                  weights: Optional[np.ndarray], verbose: bool = False):
+        """Estimate the bias using the bias_func."""
+        if verbose:
+            print("Estimating bias...")
+        diff = ref_dem - tba_dem
+        diff = diff[np.isfinite(diff)]
+
+        # Use weights if those were provided.
+ bias = self._meta["bias_func"](diff) if weights is None \ + else self._meta["bias_func"](diff, weights=weights) + + if verbose: + print("Bias estimated") + + self._meta["bias"] = bias + + def _apply_func(self, dem: np.ndarray, transform: rio.transform.Affine) -> np.ndarray: + """Apply the bias to a DEM.""" + return dem + self._meta["bias"] + + def _apply_pts_func(self, coords: np.ndarray): + """Apply the bias to a given coordinate array.""" + new_coords = coords.copy() + new_coords[:, 2] += self._apply_func(coords[:, 2], None) # type: ignore + + return new_coords + + def _to_matrix_func(self) -> np.ndarray: + """Convert the bias to a transform matrix.""" + empty_matrix = np.diag(np.ones(4, dtype=float)) + + empty_matrix[2, 3] += self._meta["bias"] + + return empty_matrix + + +class ICP(Coreg): + """ + Iterative Closest Point DEM coregistration. + + Estimates a rigid transform (rotation + translation) between two DEMs. + + Requires 'opencv' + See opencv docs for more info: https://docs.opencv.org/master/dc/d9b/classcv_1_1ppf__match__3d_1_1ICP.html + """ + + def __init__(self, max_iterations=100, tolerance=0.05, rejection_scale=2.5, num_levels=6): # pylint: disable=super-init-not-called + """ + Instantiate an ICP coregistration object. + + :param max_iterations: The maximum allowed iterations before stopping. + :param tolerance: The residual change threshold after which to stop the iterations. + :param rejection_scale: The threshold (std * rejection_scale) to consider points as outliers. + :param num_levels: Number of octree levels to consider. A higher number is faster but may be more inaccurate. + """ + if not _has_cv2: + raise ValueError("Optional dependency needed. Install 'opencv'") + self.max_iterations = max_iterations + self.tolerance = tolerance + self.rejection_scale = rejection_scale + self.num_levels = num_levels + self._meta: dict[str, Any] = {} + + def _fit_func(self, ref_dem: np.ndarray, tba_dem: np.ndarray, transform: Optional[rio.transform.Affine], + weights: Optional[np.ndarray], verbose: bool = False): + """Estimate the rigid transform from tba_dem to ref_dem.""" + if weights is not None: + warnings.warn("ICP was given weights, but does not support it.") + + bounds, resolution = _transform_to_bounds_and_res(ref_dem.shape, transform) + points: dict[str, np.ndarray] = {} + # Generate the x and y coordinates for the reference_dem + x_coords, y_coords = _get_x_and_y_coords(ref_dem.shape, transform) + # Subtract by the bounding coordinates to avoid float32 rounding errors. 
+        x_coords -= bounds.left
+        y_coords -= bounds.bottom
+        for key, dem in zip(["ref", "tba"], [ref_dem, tba_dem]):
+
+            gradient_x, gradient_y = np.gradient(dem)
+
+            normal_east = np.sin(np.arctan(gradient_y / resolution)) * -1
+            normal_north = np.sin(np.arctan(gradient_x / resolution))
+            normal_up = 1 - np.linalg.norm([normal_east, normal_north], axis=0)
+
+            valid_mask = ~np.isnan(dem) & ~np.isnan(normal_east) & ~np.isnan(normal_north)
+
+            point_cloud = np.dstack([
+                x_coords[valid_mask],
+                y_coords[valid_mask],
+                dem[valid_mask],
+                normal_east[valid_mask],
+                normal_north[valid_mask],
+                normal_up[valid_mask]
+            ]).squeeze()
+
+            points[key] = point_cloud[~np.any(np.isnan(point_cloud), axis=1)].astype("float32")
+
+        icp = cv2.ppf_match_3d_ICP(self.max_iterations, self.tolerance, self.rejection_scale, self.num_levels)
+        if verbose:
+            print("Running ICP...")
+        _, residual, matrix = icp.registerModelToScene(points["tba"], points["ref"])
+        if verbose:
+            print("ICP finished")
+
+        assert residual < 1000, f"ICP coregistration failed: residual={residual}, threshold: 1000"
+
+        self._meta["matrix"] = matrix
+
+    def _apply_func(self, dem: np.ndarray, transform: rio.transform.Affine) -> np.ndarray:
+        """Apply the coregistration matrix to a DEM."""
+        bounds, resolution = _transform_to_bounds_and_res(dem.shape, transform)
+        x_coords, y_coords = _get_x_and_y_coords(dem.shape, transform)
+        x_coords -= bounds.left
+        y_coords -= bounds.bottom
+
+        valid_mask = np.isfinite(dem)
+        transformed_points = self._apply_pts_func(np.dstack([
+            x_coords[valid_mask],
+            y_coords[valid_mask],
+            dem[valid_mask]
+        ]).squeeze())
+
+        aligned_dem = scipy.interpolate.griddata(
+            points=transformed_points[:, :2],
+            values=transformed_points[:, 2],
+            xi=tuple(np.meshgrid(
+                np.linspace(bounds.left, bounds.right, dem.shape[1]) - bounds.left,
+                np.linspace(bounds.bottom, bounds.top, dem.shape[0])[::-1] - bounds.bottom
+            )),
+            method="cubic"
+        )
+        aligned_dem[~valid_mask] = np.nan
+
+        return aligned_dem
+
+    def _apply_pts_func(self, coords: np.ndarray) -> np.ndarray:
+        """Apply the coregistration matrix to a set of points."""
+        transformed_points = cv2.perspectiveTransform(coords.reshape(1, -1, 3), self.to_matrix()).squeeze()
+        return transformed_points
+
+    def _to_matrix_func(self) -> np.ndarray:
+        """Return the coregistration matrix."""
+        return self._meta["matrix"]
+
+
+class Deramp(Coreg):
+    """
+    Polynomial DEM deramping.
+
+    Estimates an n-D polynomial between the difference of two DEMs.
+    """
+
+    def __init__(self, degree: int = 1):
+        """
+        Instantiate a deramping correction object.
+
+        :param degree: The polynomial degree to estimate. degree=0 is a simple bias correction.
+        """
+        self.degree = degree
+
+        self._meta: dict[str, Any] = {}
+
+    def _fit_func(self, ref_dem: np.ndarray, tba_dem: np.ndarray, transform: Optional[rio.transform.Affine],
+                  weights: Optional[np.ndarray], verbose: bool = False):
+        """Fit the dDEM between the DEMs to a least squares polynomial equation."""
+        x_coords, y_coords = _get_x_and_y_coords(ref_dem.shape, transform)
+
+        ddem = ref_dem - tba_dem
+        valid_mask = np.isfinite(ddem)
+        ddem = ddem[valid_mask]
+        x_coords = x_coords[valid_mask]
+        y_coords = y_coords[valid_mask]
+
+        # Formulate the 2D polynomial whose coefficients will be solved for.
+        def poly2d(x_coordinates: np.ndarray, y_coordinates: np.ndarray,
+                   coefficients: np.ndarray) -> np.ndarray:
+            """
+            Estimate values from a 2D-polynomial.
+
+            :param x_coordinates: x-coordinates of the difference array (must have the same shape as elevation_difference).
+            :param y_coordinates: y-coordinates of the difference array (must have the same shape as elevation_difference).
+            :param coefficients: The coefficients (a, b, c, etc.) of the polynomial.
+
+            :raises ValueError: If the length of the coefficients list is not compatible with the degree.
+
+            :returns: The values estimated by the polynomial.
+            """
+            # Check that the coefficient size is correct.
+            coefficient_size = (self.degree + 1) * (self.degree + 2) / 2
+            if len(coefficients) != coefficient_size:
+                raise ValueError(f"Expected {int(coefficient_size)} coefficients for degree {self.degree}, "
+                                 f"got {len(coefficients)}")
+
+            # Do Amaury's black magic to formulate and calculate the polynomial equation.
+            estimated_values = np.sum([coefficients[k * (k + 1) // 2 + j] * x_coordinates ** (k - j) *
+                                       y_coordinates ** j for k in range(self.degree + 1) for j in range(k + 1)], axis=0)
+            return estimated_values  # type: ignore
+
+        def residuals(coefs: np.ndarray, x_coords: np.ndarray, y_coords: np.ndarray, targets: np.ndarray):
+            return np.median(np.abs(targets - poly2d(x_coords, y_coords, coefs)))
+
+        if verbose:
+            print("Estimating deramp function...")
+        coefs = scipy.optimize.fmin(
+            func=residuals,
+            x0=np.zeros(shape=((self.degree + 1) * (self.degree + 2) // 2)),
+            args=(x_coords, y_coords, ddem),
+            disp=verbose
+        )
+
+        self._meta["coefficients"] = coefs
+        self._meta["func"] = lambda x, y: poly2d(x, y, coefs)
+
+    def _apply_func(self, dem: np.ndarray, transform: rio.transform.Affine) -> np.ndarray:
+        """Apply the deramp function to a DEM."""
+        x_coords, y_coords = _get_x_and_y_coords(dem.shape, transform)
+
+        ramp = self._meta["func"](x_coords, y_coords)
+
+        return dem + ramp
+
+    def _apply_pts_func(self, coords: np.ndarray) -> np.ndarray:
+        """Apply the deramp function to a set of points."""
+        new_coords = coords.copy()
+
+        new_coords[:, 2] += self._meta["func"](new_coords[:, 0], new_coords[:, 1])
+
+        return new_coords
+
+    def _to_matrix_func(self) -> np.ndarray:
+        """Return a transform matrix if possible."""
+        if self.degree > 1:
+            raise ValueError(
+                "Nonlinear deramping degrees cannot be represented as transformation matrices."
+                f" (max 1, given: {self.degree})")
+        if self.degree == 1:
+            raise NotImplementedError("Vertical shift, rotation and horizontal scaling have yet to be implemented.")
+
+        # If degree==0, it's just a bias correction
+        empty_matrix = np.diag(np.ones(4, dtype=float))
+
+        empty_matrix[2, 3] += self._meta["coefficients"][0]
+
+        return empty_matrix
+
+
+class CoregPipeline(Coreg):
+    """
+    A sequential set of coregistration steps.
+    """
+
+    def __init__(self, pipeline: list[Coreg]):  # pylint: disable=super-init-not-called
+        """
+        Instantiate a new coregistration pipeline.
+
+        :param pipeline: Coregistration steps to run in the sequence they are given.
+ """ + self.pipeline = pipeline + self._meta = {} + + def _fit_func(self, ref_dem: np.ndarray, tba_dem: np.ndarray, transform: Optional[rio.transform.Affine], + weights: Optional[np.ndarray], verbose: bool = False): + """Fit each coregistration step with the previously transformed DEM.""" + tba_dem_mod = tba_dem.copy() + + for i, coreg in enumerate(self.pipeline): + if verbose: + print(f"Running pipeline step: {i + 1} / {len(self.pipeline)}") + coreg._fit_func(ref_dem, tba_dem_mod, transform=transform, weights=weights, verbose=verbose) + coreg._fit_called = True + + tba_dem_mod = coreg._apply_func(tba_dem_mod, transform) + + def _apply_func(self, dem: np.ndarray, transform: rio.transform.Affine) -> np.ndarray: + """Apply the coregistration steps sequentially to a DEM.""" + dem_mod = dem.copy() + + for coreg in self.pipeline: + dem_mod = coreg.apply(dem_mod, transform) + + return dem_mod + + def _apply_pts_func(self, coords: np.ndarray) -> np.ndarray: + """Apply the coregistration steps sequentially to a set of points.""" + coords_mod = coords.copy() + + for coreg in self.pipeline: + coords_mod = coreg._apply_pts_func(coords_mod) + + return coords_mod + + def _to_matrix_func(self) -> np.ndarray: + """Try to join the coregistration steps to a single transformation matrix.""" + if not _HAS_P3D: + raise ValueError("Optional dependency needed. Install 'pytransform3d'") + + transform_mgr = TransformManager() + + with warnings.catch_warnings(): + # Deprecation warning from pytransform3d. Let's hope that is fixed in the near future. + warnings.filterwarnings("ignore", message="`np.float` is a deprecated alias for the builtin `float`") + for i, coreg in enumerate(self.pipeline): + new_matrix = coreg.to_matrix() + + transform_mgr.add_transform(i, i + 1, new_matrix) + + return transform_mgr.get_transform(0, len(self.pipeline)) + + def __iter__(self): + """Iterate over the pipeline steps.""" + for coreg in self.pipeline: + yield coreg + + def __add__(self, other: Union[list[Coreg], Coreg, CoregPipeline]) -> CoregPipeline: + """Append Coreg(s) or a CoregPipeline to the pipeline.""" + if not isinstance(other, Coreg): + other = list(other) + else: + other = [other] + + pipelines = self.pipeline + other + + return CoregPipeline(pipelines) + + +class NuthKaab(Coreg): + """ + Nuth and Kääb (2011) DEM coregistration. + + Implemented after the paper: + https://doi.org/10.5194/tc-5-271-2011 + """ + + def __init__(self, max_iterations: int = 50, error_threshold: float = 0.05): # pylint: disable=super-init-not-called + """ + Instantiate a new Nuth and Kääb (2011) coregistration object. + + :param max_iterations: The maximum allowed iterations before stopping. + :param error_threshold: The residual change threshold after which to stop the iterations. 
+ """ + self.max_iterations = max_iterations + self.error_threshold = error_threshold + + self._meta: dict[str, Any] = {} + + def _fit_func(self, ref_dem: np.ndarray, tba_dem: np.ndarray, transform: Optional[rio.transform.Affine], + weights: Optional[np.ndarray], verbose: bool = False): + """Estimate the x/y/z offset between two DEMs.""" + bounds, resolution = _transform_to_bounds_and_res(ref_dem.shape, transform) + # Make a new DEM which will be modified inplace + aligned_dem = tba_dem.copy() + + # Calculate slope and aspect maps from the reference DEM + slope, aspect = calculate_slope_and_aspect(ref_dem) + + # Make index grids for the east and north dimensions + east_grid = np.arange(ref_dem.shape[1]) + north_grid = np.arange(ref_dem.shape[0]) + + # Make a function to estimate the aligned DEM (used to construct an offset DEM) + elevation_function = scipy.interpolate.RectBivariateSpline(x=north_grid, y=east_grid, + z=np.where(np.isnan(aligned_dem), -9999, aligned_dem)) + # Make a function to estimate nodata gaps in the aligned DEM (used to fix the estimated offset DEM) + nodata_function = scipy.interpolate.RectBivariateSpline(x=north_grid, y=east_grid, z=np.isnan(aligned_dem)) + # Initialise east and north pixel offset variables (these will be incremented up and down) + offset_east, offset_north, bias = 0.0, 0.0, 0.0 + + if verbose: + print("Running Nuth and Kääb (2011) coregistration") + # Iteratively run the analysis until the maximum iterations or until the error gets low enough + for i in trange(self.max_iterations, disable=not verbose, desc="Iteratively correcting dataset"): + + # Calculate the elevation difference and the residual (NMAD) between them. + elevation_difference = ref_dem - aligned_dem + bias = np.nanmedian(elevation_difference) + # Correct potential biases + elevation_difference -= bias + + nmad = xdem.spatial_tools.nmad(elevation_difference) + + assert ~np.isnan(nmad), (offset_east, offset_north) + + # Stop if the NMAD is low and a few iterations have been made + if i > 5 and nmad < self.error_threshold: + if verbose: + print(f"NMAD went below the error threshold of {self.error_threshold}") + break + + # Estimate the horizontal shift from the implementation by Nuth and Kääb (2011) + east_diff, north_diff, _ = get_horizontal_shift( # type: ignore + elevation_difference=elevation_difference, + slope=slope, + aspect=aspect + ) + # Increment the offsets with the overall offset + offset_east += east_diff + offset_north += north_diff + + # Calculate new elevations from the offset x- and y-coordinates + new_elevation = elevation_function(y=east_grid + offset_east, x=north_grid - offset_north) + + # Set NaNs where NaNs were in the original data + new_nans = nodata_function(y=east_grid + offset_east, x=north_grid - offset_north) + new_elevation[new_nans >= 1] = np.nan + + # Assign the newly calculated elevations to the aligned_dem + aligned_dem = new_elevation + + self._meta["offset_east_px"] = offset_east + self._meta["offset_north_px"] = offset_north + self._meta["bias"] = bias + self._meta["resolution"] = resolution + + def _apply_func(self, dem: np.ndarray, transform: rio.transform.Affine) -> np.ndarray: + """Apply the estimated x/y/z offsets to a DEM.""" + bounds, resolution = _transform_to_bounds_and_res(dem.shape, transform) + scaling_factor = self._meta["resolution"] / resolution + + # Make index grids for the east and north dimensions + east_grid = np.arange(dem.shape[1]) * scaling_factor + north_grid = np.arange(dem.shape[0]) * scaling_factor + + # Make a function to 
+        elevation_function = scipy.interpolate.RectBivariateSpline(x=north_grid, y=east_grid,
+                                                                   z=np.where(np.isnan(dem), -9999, dem))
+        # Make a function to estimate nodata gaps in the aligned DEM (used to fix the estimated offset DEM)
+        nodata_function = scipy.interpolate.RectBivariateSpline(x=north_grid, y=east_grid, z=np.isnan(dem))
+
+        shifted_east_grid = east_grid + self._meta["offset_east_px"]
+        shifted_north_grid = north_grid - self._meta["offset_north_px"]
+
+        shifted_dem = elevation_function(y=shifted_east_grid, x=shifted_north_grid)
+        new_nans = nodata_function(y=shifted_east_grid, x=shifted_north_grid)
+        shifted_dem[new_nans >= 1] = np.nan
+
+        shifted_dem += self._meta["bias"]
+
+        return shifted_dem
+
+    def _apply_pts_func(self, coords: np.ndarray) -> np.ndarray:
+        """Apply the estimated x/y/z offsets to a set of points."""
+        offset_east = self._meta["offset_east_px"] * self._meta["resolution"]
+        offset_north = self._meta["offset_north_px"] * self._meta["resolution"]
+
+        new_coords = coords.copy()
+        new_coords[:, 0] -= offset_east
+        new_coords[:, 1] -= offset_north
+        new_coords[:, 2] += self._meta["bias"]
+
+        return new_coords
+
+    def _to_matrix_func(self) -> np.ndarray:
+        """Return a transformation matrix from the estimated offsets."""
+        offset_east = self._meta["offset_east_px"] * self._meta["resolution"]
+        offset_north = self._meta["offset_north_px"] * self._meta["resolution"]
+
+        matrix = np.diag(np.ones(4, dtype=float))
+        matrix[0, 3] += offset_east
+        matrix[1, 3] += offset_north
+        matrix[2, 3] += self._meta["bias"]
+
+        return matrix