Commit

refactor: break out methods from app.py into submodules

drduhe committed Oct 11, 2024
1 parent cf017d1 commit 0c55b27
Showing 9 changed files with 279 additions and 249 deletions.
296 changes: 65 additions & 231 deletions src/aws/osml/model_runner/app.py

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion src/aws/osml/model_runner/database/feature_table.py
@@ -1,5 +1,4 @@
# Copyright 2023-2024 Amazon.com, Inc. or its affiliates.

import logging
import random
import time
4 changes: 0 additions & 4 deletions src/aws/osml/model_runner/exceptions.py
@@ -34,9 +34,5 @@ class SelfThrottledRegionException(Exception):
pass


class InvalidFeaturePropertiesException(Exception):
pass


class AggregateOutputFeaturesException(Exception):
pass
2 changes: 1 addition & 1 deletion src/aws/osml/model_runner/inference/__init__.py
@@ -7,6 +7,6 @@
from .detector import Detector
from .endpoint_factory import FeatureDetectorFactory
from .feature_selection import FeatureSelector
from .feature_utils import calculate_processing_bounds, get_source_property
from .feature_utils import calculate_processing_bounds, get_extents, get_source_property
from .http_detector import HTTPDetector
from .sm_detector import SMDetector
4 changes: 4 additions & 0 deletions src/aws/osml/model_runner/inference/exceptions.py
@@ -7,3 +7,7 @@

class FeatureDistillationException(Exception):
pass


class InvalidFeaturePropertiesException(Exception):
pass
105 changes: 101 additions & 4 deletions src/aws/osml/model_runner/inference/feature_utils.py
@@ -1,20 +1,27 @@
# Copyright 2023-2024 Amazon.com, Inc. or its affiliates.

import functools
import json
import logging
import math
from datetime import datetime
from io import BufferedReader
from json import dumps
from math import radians
from math import degrees, radians
from secrets import token_hex
from typing import Callable, Dict, List, Optional, Tuple, Union
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

import shapely
from geojson import Feature, FeatureCollection, LineString, MultiLineString, MultiPoint, MultiPolygon, Point, Polygon, loads
from osgeo import gdal
from shapely.geometry.base import BaseGeometry

from aws.osml.model_runner.common import ImageDimensions
from aws.osml.photogrammetry import GeodeticWorldCoordinate, SensorModel
from aws.osml.model_runner.common import GeojsonDetectionField, ImageDimensions
from aws.osml.photogrammetry import GeodeticWorldCoordinate, ImageCoordinate, SensorModel

from .exceptions import InvalidFeaturePropertiesException

logger = logging.getLogger(__name__)


def features_to_image_shapes(sensor_model: SensorModel, features: List[Feature]) -> List[BaseGeometry]:
@@ -253,3 +260,93 @@ def get_source_property(image_location: str, image_extension: str, dataset: gdal
else:
logging.warning(f"Source metadata not available for {image_extension} image extension!")
return None


def get_extents(ds: gdal.Dataset, sm: SensorModel) -> Dict[str, Any]:
"""
Returns the geographic extents of the given GDAL dataset.
:param ds: GDAL dataset.
:param sm: the OSML sensor model associated with the dataset
:return: Dictionary with keys 'north', 'south', 'east', 'west' giving the extents in degrees.
"""
# Compute WGS-84 world coordinates for each image corner to derive the extents for visualizations
image_corners = [[0, 0], [ds.RasterXSize, 0], [ds.RasterXSize, ds.RasterYSize], [0, ds.RasterYSize]]
geo_image_corners = [sm.image_to_world(ImageCoordinate(corner)) for corner in image_corners]
locations = [(degrees(p.latitude), degrees(p.longitude)) for p in geo_image_corners]
feature_bounds = functools.reduce(
lambda prev, f: [
min(f[0], prev[0]),
min(f[1], prev[1]),
max(f[0], prev[2]),
max(f[1], prev[3]),
],
locations,
[math.inf, math.inf, -math.inf, -math.inf],
)

return {
"north": feature_bounds[2],
"south": feature_bounds[0],
"east": feature_bounds[3],
"west": feature_bounds[1],
}
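
For illustration, a minimal usage sketch. The image path is a placeholder, and loading via the toolkit's load_gdal_dataset helper (assumed here to return a dataset plus an optional sensor model) is an assumption about the surrounding API:

import sys

# Assumed loader; the actual helper in aws.osml.gdal may differ.
from aws.osml.gdal import load_gdal_dataset
from aws.osml.model_runner.inference import get_extents

# Load a hypothetical image; NITF is used only as an example format.
dataset, sensor_model = load_gdal_dataset("./imagery/example.ntf")
if sensor_model is not None:
    extents = get_extents(dataset, sensor_model)
    # e.g. {"north": 38.91, "south": 38.88, "east": -77.01, "west": -77.05}
    print(extents)
else:
    sys.exit("No sensor model could be derived for this image")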


def add_properties_to_features(job_id: str, feature_properties: str, features: List[Feature]) -> List[Feature]:
"""
Add arbitrary and controlled property dictionaries to geojson feature properties
:param job_id: str = unique identifier for the job
:param feature_properties: str = additional feature properties or metadata from the image processing
:param features: List[geojson.Feature] = the list of features to update
:return: List[geojson.Feature] = updated list of features
"""
try:
feature_properties: List[dict] = json.loads(feature_properties)
for feature in features:
# Update the features with their inference metadata
feature["properties"].update(get_inference_metadata_property(job_id, feature["properties"]["inferenceTime"]))

# For the custom provided feature properties, update
for feature_property in feature_properties:
feature["properties"].update(feature_property)

# Remove transient feature properties if they are present
for transient_property in (
"inferenceTime",
GeojsonDetectionField.BOUNDS,
GeojsonDetectionField.GEOM,
"detection_score",
"feature_types",
"image_id",
"adjusted_feature_types",
):
feature["properties"].pop(transient_property, None)

except Exception as err:
logger.exception(err)
raise InvalidFeaturePropertiesException("Could not apply custom properties to features!") from err
return features
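
As a rough usage sketch (the job ID, property payload, and feature values are illustrative only; the properties mirror the keys this function adds and strips):

import json

from geojson import Feature, Point

from aws.osml.model_runner.inference.feature_utils import add_properties_to_features

feature = Feature(
    geometry=Point((-77.03, 38.89)),
    properties={"inferenceTime": "1728604800000", "detection_score": 0.92},
)
custom_properties = json.dumps([{"sourceMetadata": {"sensor": "EXAMPLE-SENSOR"}}])
updated = add_properties_to_features("job-1234", custom_properties, [feature])
# Each feature now carries inferenceMetadata plus sourceMetadata; transient
# keys such as inferenceTime and detection_score have been removed.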


def get_inference_metadata_property(job_id: str, inference_time: str) -> Dict[str, Any]:
"""
Create an inference dictionary property to append to geojson features
:param job_id: str = unique identifier for the job
:param inference_time: str = the time the inference was made, in epoch milliseconds
:return: Dict[str, Any] = an inference metadata dictionary property to attach to features
"""
inference_metadata_property = {
"inferenceMetadata": {
"jobId": job_id,
"inferenceDT": inference_time,
}
}
return inference_metadata_property
46 changes: 45 additions & 1 deletion src/aws/osml/model_runner/sink/sink_factory.py
@@ -1,9 +1,12 @@
# Copyright 2023-2024 Amazon.com, Inc. or its affiliates.

import json
import logging
from typing import Any, Dict, List

from aws.osml.model_runner.api import InvalidImageRequestException
from geojson import Feature

from aws.osml.model_runner.api import InvalidImageRequestException, SinkMode
from aws.osml.model_runner.sink import KinesisSink, S3Sink, Sink

logger = logging.getLogger(__name__)
@@ -49,3 +52,44 @@ def outputs_to_sinks(destinations: List[Dict[str, Any]]) -> List[Sink]:
logger.error(error)
raise InvalidImageRequestException(error)
return outputs

@staticmethod
def sink_features(job_id: str, outputs: str, features: List[Feature]) -> bool:
"""
Write the feature output to S3 and/or a Kinesis stream.
:param job_id: str = unique identifier for the job
:param outputs: str = details about the job output sinks
:param features: List[Feature] = the list of features to write
:return: bool = True if the features were written to at least one output sink
"""
tracking_output_sinks = {
"S3": False,
"Kinesis": False,
}  # tracks which sinks succeeded, e.g. {"S3": True, "Kinesis": True}

# Ensure we have outputs defined for where to dump our features
if outputs:
logging.debug(f"Writing aggregate feature for job '{job_id}'")
for sink in SinkFactory.outputs_to_sinks(json.loads(outputs)):
if sink.mode == SinkMode.AGGREGATE and job_id:
is_write_output_succeeded = sink.write(job_id, features)
tracking_output_sinks[sink.name()] = is_write_output_succeeded

# Log whether the features were written to both outputs (S3 and Kinesis) or only one of them.
# Return False only if neither sink could be written to; otherwise return True.
if tracking_output_sinks["S3"] and not tracking_output_sinks["Kinesis"]:
logger.debug("ModelRunner was able to write the features to S3 but not Kinesis. Continuing...")
return True
elif not tracking_output_sinks["S3"] and tracking_output_sinks["Kinesis"]:
logger.debug("ModelRunner was able to write the features to Kinesis but not S3. Continuing...")
return True
elif tracking_output_sinks["S3"] and tracking_output_sinks["Kinesis"]:
logger.debug("ModelRunner was able to write the features to both S3 and Kinesis. Continuing...")
return True
else:
logger.error("ModelRunner was not able to write the features to either S3 or Kinesis. Failing...")
return False
else:
raise InvalidImageRequestException("No output destinations were defined for this image request!")
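
A sketch of a call site follows; the destination payload shape (a JSON list with "type", "bucket"/"prefix", and "stream" keys) is an assumption about the image-request output format, and the bucket and stream names are placeholders:

import json
from typing import List

from geojson import Feature

from aws.osml.model_runner.sink.sink_factory import SinkFactory

features: List[Feature] = []  # features produced earlier in the pipeline
outputs = json.dumps(
    [
        {"type": "S3", "bucket": "my-results-bucket", "prefix": "results/job-1234/"},
        {"type": "Kinesis", "stream": "my-results-stream"},
    ]
)
if not SinkFactory.sink_features("job-1234", outputs, features):
    # Neither sink accepted the features; the caller should fail the request.
    raise RuntimeError("Feature sink failed for job 'job-1234'")
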
2 changes: 1 addition & 1 deletion src/aws/osml/model_runner/tile_worker/__init__.py
@@ -5,7 +5,7 @@
# flake8: noqa

from .tile_worker import TileWorker
from .tile_worker_utils import process_tiles, setup_tile_workers
from .tile_worker_utils import process_tiles, select_features, setup_tile_workers
from .tiling_strategy import TilingStrategy
from .variable_overlap_tiling_strategy import VariableOverlapTilingStrategy
from .variable_tile_tiling_strategy import VariableTileTilingStrategy
68 changes: 62 additions & 6 deletions src/aws/osml/model_runner/tile_worker/tile_worker_utils.py
@@ -1,5 +1,7 @@
# Copyright 2023-2024 Amazon.com, Inc. or its affiliates.

import ast
import json
import logging
import tempfile
from pathlib import Path
@@ -10,19 +12,25 @@
from aws_embedded_metrics import MetricsLogger
from aws_embedded_metrics.metric_scope import metric_scope
from aws_embedded_metrics.unit import Unit
from geojson import Feature
from osgeo import gdal

from aws.osml.features import Geolocator, ImagedFeaturePropertyAccessor
from aws.osml.gdal import GDALConfigEnv
from aws.osml.image_processing.gdal_tile_factory import GDALTileFactory
from aws.osml.model_runner.database import RegionRequestItem, RegionRequestTable
from aws.osml.model_runner.api import RegionRequest
from aws.osml.model_runner.app_config import MetricLabels, ServiceConfig
from aws.osml.model_runner.common import (
FeatureDistillationDeserializer,
ImageRegion,
Timer,
get_credentials_for_assumed_role,
)
from aws.osml.model_runner.database import FeatureTable, RegionRequestItem, RegionRequestTable
from aws.osml.model_runner.inference import FeatureSelector
from aws.osml.model_runner.inference.endpoint_factory import FeatureDetectorFactory
from aws.osml.photogrammetry import ElevationModel, SensorModel

from ..api import RegionRequest
from ..app_config import MetricLabels, ServiceConfig
from ..common import Timer, get_credentials_for_assumed_role
from ..database import FeatureTable
from ..inference.endpoint_factory import FeatureDetectorFactory
from .exceptions import ProcessTilesException, SetupTileWorkersException
from .tile_worker import TileWorker
from .tiling_strategy import TilingStrategy
@@ -282,3 +290,51 @@ def sizeof_fmt(num: float, suffix: str = "B") -> str:
return "%3.1f%s%s" % (num, unit, suffix)
num /= 1024.0
return "%.1f%s%s" % (num, "Yi", suffix)


def select_features(
feature_distillation_option: str,
features: List[Feature],
processing_bounds: ImageRegion,
region_size: str,
tile_size: str,
tile_overlap: str,
tiling_strategy: TilingStrategy,
) -> List[Feature]:
"""
Selects the desired features using the options in the JobItem (NMS, SOFT_NMS, etc.).
This code applies a feature selector only to the features that came from regions of the image
that were processed multiple times. First features are grouped based on the region they were
processed in. Any features found in the overlap area between regions are run through the
FeatureSelector. If they were not part of an overlap area between regions, they will be grouped
based on tile boundaries. Any features that fall into the overlap of adjacent tiles are filtered
by the FeatureSelector. All other features should not be duplicates; they are added to the result
without additional filtering.
Computationally, this applies two optimizations that lower the overall processing time of the
O(N^2) selection algorithms: first, it filters out the majority of features that could not possibly
have duplicates generated by our tiled image processing; second, it runs the selection algorithms
incrementally on much smaller groups of features.
:param feature_distillation_option: str = the options used in selecting features (e.g., NMS/SOFT_NMS, thresholds)
:param features: List[Feature] = the list of geojson features to process
:param processing_bounds: ImageRegion = the requested area of the image
:param region_size: str = region size to use for feature dedup
:param tile_size: str = size of the tiles used during processing
:param tile_overlap: str = overlap between tiles during processing
:param tiling_strategy: TilingStrategy = the tiling strategy to use for feature dedup
:return: List[Feature] = the list of geojson features after processing
"""
feature_distillation_option_dict = json.loads(feature_distillation_option)
feature_distillation_option = FeatureDistillationDeserializer().deserialize(feature_distillation_option_dict)
feature_selector = FeatureSelector(feature_distillation_option)

region_size = ast.literal_eval(region_size)
tile_size = ast.literal_eval(tile_size)
overlap = ast.literal_eval(tile_overlap)
deduped_features = tiling_strategy.cleanup_duplicate_features(
processing_bounds, region_size, tile_size, overlap, features, feature_selector
)

return deduped_features
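
A hedged sketch of a call site: the distillation-option JSON shape is an assumption (see FeatureDistillationDeserializer for the format actually expected), and the tuple-formatted strings match how the values are parsed with ast.literal_eval above:

import json
from typing import List

from geojson import Feature

from aws.osml.model_runner.tile_worker import VariableOverlapTilingStrategy, select_features

raw_features: List[Feature] = []  # features gathered from all processed tiles
nms_option = json.dumps({"algorithmType": "NMS", "iouThreshold": 0.75})  # assumed payload shape

deduped = select_features(
    feature_distillation_option=nms_option,
    features=raw_features,
    processing_bounds=((0, 0), (20000, 20000)),  # assumed ImageRegion layout
    region_size="(10240, 10240)",
    tile_size="(2048, 2048)",
    tile_overlap="(128, 128)",
    tiling_strategy=VariableOverlapTilingStrategy(),
)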
