Skip to content

Commit

Permalink
feature: http endpoint support
Browse files Browse the repository at this point in the history
  • Loading branch information
drduhe committed Oct 26, 2023
1 parent aeb9609 commit cb919bb
Show file tree
Hide file tree
Showing 15 changed files with 4,653 additions and 24 deletions.
23 changes: 12 additions & 11 deletions bin/process_image.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,17 @@

# set up a cli tool for the script using argparse
parser = argparse.ArgumentParser("process_image")
parser.add_argument("--image", help="The target image URL to process with OSML Model Runner.", type=str, default="small")
parser.add_argument("--model", help="The target model to use for object detection.", type=str, default="centerpoint")
parser.add_argument("--image", help="Target image URL to process with OSML Model Runner.", type=str, default="small")
parser.add_argument("--model", help="Target model to use for object detection.", type=str, default="centerpoint")
parser.add_argument("--skip_integ", help="Whether or not to compare image with known results.", action="store_true")
parser.add_argument("--tile_format", help="The target tile format to use for tiling.", type=str)
parser.add_argument("--tile_compression", help="The compression used for the target image.", type=str)
parser.add_argument("--tile_size", help="The tile size to split the image into for model processing.", type=str)
parser.add_argument("--tile_overlap", help="The tile overlap to consider when processing regions.", type=str)
parser.add_argument("--feature_selection_options", help="The feature selection options JSON string.", type=str)
parser.add_argument("--region", help="The AWS region OSML is deployed to.", type=str, default=default_region)
parser.add_argument("--account", help="The AWS account OSML is deployed to.", type=str, default=default_account)
parser.add_argument("--tile_format", help="Target tile format to use for tiling.", type=str)
parser.add_argument("--tile_compression", help="Compression used for the target image.", type=str)
parser.add_argument("--tile_size", help="Tile size to split the image into for model processing.", type=str)
parser.add_argument("--tile_overlap", help="Tile overlap to consider when processing regions.", type=str)
parser.add_argument("--feature_selection_options", help="Feature selection options JSON string.", type=str)
parser.add_argument("--region", help="AWS region OSML is deployed to.", type=str, default=default_region)
parser.add_argument("--account", help="AWS account OSML is deployed to.", type=str, default=default_account)
parser.add_argument("--endpoint_type", help="Type of model endpoint to test, sm or http.", type=str, default="sm")
args = parser.parse_args()

# standard test images deployed by CDK
Expand All @@ -67,7 +68,7 @@
"sicd_interferometric_hh_ntf": f"s3://{image_bucket}/sicd-interferometric-hh.nitf",
}

# call into root directory of this package so that we can run this script from anywhere.
# call into the root directory of this package so that we can run this script from anywhere.
os.chdir(os.path.join(os.path.dirname(os.path.abspath(__file__)), ".."))

# set the python path to include the project source
Expand Down Expand Up @@ -95,6 +96,6 @@
test = "src/aws/osml/process_image/test_process_image.py"
else:
# run integration test against known results
test = f"src/aws/osml/integ/{args.model}/test_{args.model}_model.py"
test = f"src/aws/osml/integ/{args.endpoint_type}_{args.model}/test_{args.endpoint_type}_{args.model}_model.py"

subprocess.run(["python3", "-m", "pytest", "-o", "log_cli=true", "-vv", test])
1 change: 1 addition & 0 deletions logs/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
*
File renamed without changes.
69 changes: 69 additions & 0 deletions src/aws/osml/integ/http_centerpoint/test_http_centerpoint_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# Copyright 2023 Amazon.com, Inc. or its affiliates.

import logging

from aws.osml.utils import (
OSMLConfig,
count_features,
count_region_request_items,
ddb_client,
elb_client,
kinesis_client,
run_model_on_image,
s3_client,
sqs_client,
validate_expected_region_request_items,
validate_features_match,
)

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def test_model_runner_centerpoint_http_model() -> None:
"""
Run the test using the CenterPointModel and validate the number of features
and region requests using the HTTP endpoint
:return: None
"""

if OSMLConfig.HTTP_CENTERPOINT_MODEL_URL:
http_endpoint_url = OSMLConfig.HTTP_CENTERPOINT_MODEL_URL
else:
http_endpoint_dns = get_load_balancer_dns_url(OSMLConfig.HTTP_CENTERPOINT_MODEL_ELB_NAME)
http_endpoint_url = f"http://{http_endpoint_dns}{OSMLConfig.HTTP_CENTERPOINT_MODEL_INFERENCE_PATH}"

# launch our image request and validate it completes
image_id, job_id, image_processing_request, shard_iter = run_model_on_image(
sqs_client(), http_endpoint_url, "HTTP_ENDPOINT", kinesis_client()
)

# count the created features in the table for this image
count_features(image_id=image_id, ddb_client=ddb_client())

# verify the results we created in the appropriate syncs
validate_features_match(
image_processing_request=image_processing_request,
job_id=job_id,
shard_iter=shard_iter,
s3_client=s3_client(),
kinesis_client=kinesis_client(),
)

# validate the number of region requests that were created in the process and check if they are succeeded
region_request_count = count_region_request_items(image_id=image_id, ddb_client=ddb_client())
validate_expected_region_request_items(region_request_count)


def get_load_balancer_dns_url(load_balancer_name: str) -> str:
"""
Get the DNS URL for the given load balancer
:param load_balancer_name: The name of the load balancer
:return: The DNS URL for the load balancer
"""
logger.debug("Retrieving DNS name for '{}'...".format(load_balancer_name))
res = elb_client().describe_load_balancers(Names=[load_balancer_name])
dns_name = res.get("LoadBalancers", [])[0].get("DNSName")
logger.debug("Found DNS name: {}".format(dns_name))
return dns_name
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def test_model_runner_aircraft_model() -> None:

# Launch our image request and validate it completes
image_id, job_id, image_processing_request, kinesis_shard = run_model_on_image(
sqs_client(), OSMLConfig.SM_AIRCRAFT_MODEL, kinesis_client()
sqs_client(), OSMLConfig.SM_AIRCRAFT_MODEL, "SM_ENDPOINT", kinesis_client()
)

# Count the features that were create in the table for this image
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def test_model_runner_center_point_model() -> None:

# launch our image request and validate it completes
image_id, job_id, image_processing_request, shard_iter = run_model_on_image(
sqs_client(), OSMLConfig.SM_CENTERPOINT_MODEL, kinesis_client()
sqs_client(), OSMLConfig.SM_CENTERPOINT_MODEL, "SM_ENDPOINT", kinesis_client()
)

# count the features that were create in the table for this image
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def test_model_runner_flood_model() -> None:

# Launch our image request and validate it completes
image_id, job_id, image_processing_request, kinesis_shard = run_model_on_image(
sqs_client(), OSMLConfig.SM_FLOOD_MODEL, kinesis_client()
sqs_client(), OSMLConfig.SM_FLOOD_MODEL, "SM_ENDPOINT", kinesis_client()
)

# Count the features that were create in the table for this image
Expand Down
2 changes: 1 addition & 1 deletion src/aws/osml/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# __init__.py file.
# flake8: noqa

from .clients import cw_client, ddb_client, kinesis_client, s3_client, sm_client, sqs_client
from .clients import cw_client, ddb_client, elb_client, kinesis_client, s3_client, sm_client, sqs_client
from .integ_utils import (
build_image_processing_request,
count_features,
Expand Down
10 changes: 10 additions & 0 deletions src/aws/osml/utils/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,13 @@ def cw_client() -> boto3.client:
"""
session = get_session_credentials()
return session.client("cloudwatch", region_name=OSMLConfig.REGION)


def elb_client() -> boto3.client:
"""
Get resources from the default ElasticLoadBalancing session
:return: boto3.client = ELB client
"""
session = get_session_credentials()
return session.client("elbv2", region_name=OSMLConfig.REGION)
20 changes: 11 additions & 9 deletions src/aws/osml/utils/integ_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,16 @@


def run_model_on_image(
sqs_client: boto3.resource, sm_endpoint: str, kinesis_client: Optional[boto3.resource]
sqs_client: boto3.resource, endpoint: str, endpoint_type: str, kinesis_client: Optional[boto3.resource]
) -> Tuple[str, str, Dict[str, Any], Optional[Dict[str, Any]]]:
"""
The workflow to build an image request for a specific SM endpoint and then place it
on the corresponding SQS queue for ModelRunner to pick up and process. Once the image
has been completed return the associated image_id and image_request object for analysis.
:param sqs_client: boto3.resource = the sqs client fixture passed in
:param sm_endpoint: str = the SM endpoint you wish to run your image against
:param endpoint: str = the endpoint you wish to run your image against
:param endpoint_type: str = "SM_ENDPOINT" or "HTTP_ENDPOINT" to trigger the mode of processing
:param kinesis_client: Optional[boto3.resource] = the optional kinesis client fixture passed in
:return: Tuple[str, str, Dict[str, Any], Dict[str, Any]] = the generated image_id, job_id, image_request,
Expand All @@ -33,7 +34,7 @@ def run_model_on_image(
image_url = OSMLConfig.TARGET_IMAGE # get image_url

# Build an image processing request from the test environment
image_processing_request = build_image_processing_request(sm_endpoint, image_url)
image_processing_request = build_image_processing_request(endpoint, endpoint_type, image_url)

# Get the current Kinesis shard iterator to listen to for results since our start time
shard_iter = get_kinesis_shard(kinesis_client)
Expand Down Expand Up @@ -360,15 +361,16 @@ def feature_collections_equal(expected: List[geojson.Feature], actual: List[geoj
return True


def build_image_processing_request(sm_endpoint: str, image_url: str) -> Dict[str, Any]:
def build_image_processing_request(endpoint: str, endpoint_type: str, image_url: str) -> Dict[str, Any]:
"""
Build an image_processing_request meant to be placed on the corresponding ModelRunner SQS queue.
The image request is configured from the hydra provisioned hydra environment. In the future
this could, and probably should, be extended to build more variant image requests for additional
testing configurations.
:param image_url: the URL to the image you want to process
:param sm_endpoint: str = the SM model endpoint that you want to build the image_request for
:param endpoint: str = the model endpoint that you want to build the image_request for
:param endpoint_type: str = "SM_ENDPOINT" or "HTTP_ENDPOINT" to trigger the mode of image processing
:return: Dict[str, Any] = the dictionary representation of the image request
"""
Expand All @@ -382,9 +384,9 @@ def build_image_processing_request(sm_endpoint: str, image_url: str) -> Dict[str
else:
result_bucket = f"{OSMLConfig.S3_RESULTS_BUCKET_PREFIX}-{OSMLConfig.ACCOUNT}"

logging.info(f"Starting ModelRunner image job in {OSMLConfig.REGION}")
logging.info(f"Image: {image_url}")
logging.info(f"Model: {sm_endpoint}")
logging.info("Starting ModelRunner image job in {}".format(OSMLConfig.REGION))
logging.info("Image: {}".format(image_url))
logging.info("Model: {}".format(endpoint))

job_id = token_hex(16)
job_name = f"test-{job_id}"
Expand All @@ -399,7 +401,7 @@ def build_image_processing_request(sm_endpoint: str, image_url: str) -> Dict[str
{"type": "S3", "bucket": result_bucket, "prefix": f"{job_name}/"},
{"type": "Kinesis", "stream": result_stream, "batchSize": 1000},
],
"imageProcessor": {"name": sm_endpoint, "type": "SM_ENDPOINT"},
"imageProcessor": {"name": endpoint, "type": endpoint_type},
"imageProcessorTileSize": OSMLConfig.TILE_SIZE,
"imageProcessorTileOverlap": OSMLConfig.TILE_OVERLAP,
"imageProcessorTileFormat": OSMLConfig.TILE_FORMAT,
Expand Down
5 changes: 5 additions & 0 deletions src/aws/osml/utils/osml_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ class OSMLConfig:
SM_FLOOD_MODEL: str = os.getenv("SM_FLOOD_MODEL", "flood")
SM_AIRCRAFT_MODEL: str = os.getenv("SM_AIRCRAFT_MODEL", "aircraft")

# HTTP model config
HTTP_CENTERPOINT_MODEL_URL: str = os.getenv("HTTP_CENTER_POINT_MODEL_URL", None)
HTTP_CENTERPOINT_MODEL_ELB_NAME: str = os.getenv("HTTP_CENTER_POINT_MODEL_ELB_NAME", "test-http-model-endpoint")
HTTP_CENTERPOINT_MODEL_INFERENCE_PATH = os.getenv("HTTP_CENTERPOINT_MODEL_INFERENCE_PATH", "/invocations")

# bucket name prefixes
S3_RESULTS_BUCKET: str = os.getenv("S3_RESULTS_BUCKET")
S3_RESULTS_BUCKET_PREFIX: str = os.getenv("S3_RESULTS_BUCKET_PREFIX", "test-results")
Expand Down
Loading

0 comments on commit cb919bb

Please sign in to comment.