fix: support big file > 5 GB for s3 sink
RanbirAulakh committed Aug 21, 2024
1 parent 863d9bf commit 77020b7
Showing 2 changed files with 90 additions and 8 deletions.
52 changes: 44 additions & 8 deletions src/aws/osml/model_runner/sink/s3_sink.py
@@ -2,10 +2,13 @@

import logging
import os
import sys
import tempfile
from typing import List, Optional

import boto3
import geojson
from boto3.s3.transfer import TransferConfig
from botocore.exceptions import ClientError
from geojson import Feature, FeatureCollection

@@ -51,18 +54,51 @@ def write(self, image_id: str, features: List[Feature]) -> bool:

# validate if S3 bucket exists and accessible
if self.validate_s3_bucket():
# Estimate the size of the encoded GeoJSON (sys.getsizeof reports the in-memory size, which is close enough for a threshold check)
geojson_str = geojson.dumps(features_collection)
geojson_size = sys.getsizeof(geojson_str)

# Use a conservative 2 GB threshold, well under S3's 5 GB limit for a single put_object request
size_threshold = 2 * 1024**3 # 2 GB

# image_id is the concatenation of the job id and source image url in s3. We just
# want to base our key off of the original image file name so split by '/' and use
# the last element
object_key = os.path.join(self.prefix, image_id.split("/")[-1] + ".geojson")
# Add the aggregated features to a feature collection and encode the full set of features
# as a GeoJSON output.
self.s3_client.put_object(
Body=str(geojson.dumps(features_collection)),
Bucket=self.bucket,
Key=object_key,
ACL="bucket-owner-full-control",
)

# Compared to put_object, upload_file transfers large files much faster: it splits the file
# into smaller parts (e.g., 128 MB) and uploads them in parallel. Since the service already
# runs inside the AWS VPC, those parallel parts can make full use of the available upload
# bandwidth. If one part fails, only that part is retried, whereas put_object sends the
# object in a single request, so a failure at any point means re-uploading the entire payload.
if geojson_size > size_threshold:
logging.info(
f"GeoJSON size ({geojson_size} B) exceeds the threshold of 5 GB. Using tmp file for upload_file."
)
# Create a temporary file to store the aggregated features as GeoJSON data
with tempfile.NamedTemporaryFile(delete=True) as temp_file:
with open(temp_file.name, "w") as f:
f.write(geojson_str)

# Use upload_file to upload the file to S3
self.s3_client.upload_file(
Filename=temp_file.name,
Bucket=self.bucket,
Key=object_key,
Config=TransferConfig(
multipart_threshold=128 * 1024**2, # 128 MB
max_concurrency=10,
multipart_chunksize=256 * 1024**2, # 256 MB
use_threads=True,
),
ExtraArgs={"ACL": "bucket-owner-full-control"},
)
else:
logging.info(f"GeoJSON size ({geojson_size} B) is within the threshold. Using put_object.")
self.s3_client.put_object(
Bucket=self.bucket, Key=object_key, Body=geojson_str, ACL="bucket-owner-full-control"
)

logger.info(f"Wrote aggregate feature collection for Image '{image_id}' to s3://{self.bucket}/{object_key}")
return True
else:
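For readers who want the new upload path in isolation, the following is a minimal standalone sketch of the same pattern (small payloads via put_object, large ones via multipart upload_file). The helper name upload_geojson, the bucket/key placeholders, and the exact byte-length size check are illustrative assumptions rather than code from this commit; the sink itself approximates the size with sys.getsizeof.

import tempfile

import boto3
import geojson
from boto3.s3.transfer import TransferConfig
from geojson import FeatureCollection


def upload_geojson(bucket: str, key: str, collection: FeatureCollection) -> None:
    s3 = boto3.client("s3")
    body = geojson.dumps(collection)
    # Same 2 GB cutoff as the sink; here the exact encoded byte length is used.
    if len(body.encode("utf-8")) > 2 * 1024**3:
        # Spool the payload to disk so upload_file can stream it as multipart chunks.
        with tempfile.NamedTemporaryFile(mode="w", suffix=".geojson") as tmp:
            tmp.write(body)
            tmp.flush()
            s3.upload_file(
                Filename=tmp.name,
                Bucket=bucket,
                Key=key,
                Config=TransferConfig(
                    multipart_threshold=128 * 1024**2,  # split anything over 128 MB
                    multipart_chunksize=256 * 1024**2,  # 256 MB parts
                    max_concurrency=10,
                    use_threads=True,
                ),
                ExtraArgs={"ACL": "bucket-owner-full-control"},
            )
    else:
        s3.put_object(Bucket=bucket, Key=key, Body=body, ACL="bucket-owner-full-control")


# Example call (requires AWS credentials and a writable bucket):
# upload_geojson("example-bucket", "results/example.geojson",
#                geojson.FeatureCollection([geojson.Feature(geometry=geojson.Point((0.0, 0.0)))]))

With a 256 MB part size, a 6 GB object is sent as roughly 24 parts, and a failed part is retried on its own instead of restarting the whole transfer.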
46 changes: 46 additions & 0 deletions test/aws/osml/model_runner/sink/test_s3_sink.py
@@ -7,10 +7,13 @@
import boto3
import geojson
from botocore.stub import ANY, Stubber
from geojson import Feature
from moto import mock_aws

TEST_PREFIX = "folder"
TEST_RESULTS_BUCKET = "test-results-bucket"
TEST_IMAGE_ID = "test-image-id"

MOCK_S3_PUT_OBJECT_RESPONSE = {
"ResponseMetadata": {
"RequestId": "5994D680BF127CE3",
@@ -161,6 +164,49 @@ def test_return_mode(self):
s3_sink = S3Sink(TEST_RESULTS_BUCKET, TEST_PREFIX)
assert SinkMode.AGGREGATE == s3_sink.mode

@mock_aws
@mock.patch("boto3.s3.transfer.S3Transfer.upload_file", autospec=True)
@mock.patch("sys.getsizeof", return_value=6 * 1024**3) # Mock geojson_size to be 6 GB
def test_write_triggers_multipart_upload(self, mock_getsizeof, mock_upload_file):
from aws.osml.model_runner.app_config import BotoConfig
from aws.osml.model_runner.sink.s3_sink import S3Sink

s3_client = boto3.client("s3", config=BotoConfig.default)
s3_client.create_bucket(
Bucket=TEST_RESULTS_BUCKET,
CreateBucketConfiguration={"LocationConstraint": "us-west-2"},
)
sink = S3Sink(bucket=TEST_RESULTS_BUCKET, prefix=TEST_PREFIX)

features = [Feature(geometry={"type": "Point", "coordinates": [0.0, 0.0]})]
result = sink.write(image_id=TEST_IMAGE_ID, features=features)
self.assertTrue(result)

mock_upload_file.assert_called_once()

@mock_aws
@mock.patch("boto3.client", autospec=True)
@mock.patch("boto3.s3.transfer.S3Transfer.upload_file", autospec=True)
def test_write_put_object(self, mock_upload_file, mock_boto_client):
from aws.osml.model_runner.sink.s3_sink import S3Sink

# Create a mock S3 client so the put_object call can be inspected
mock_s3_client = mock.Mock()
mock_boto_client.return_value = mock_s3_client
mock_s3_client.put_object = mock.Mock()

# Set up the S3Sink instance using the mocked S3 client
sink = S3Sink(bucket=TEST_RESULTS_BUCKET, prefix=TEST_PREFIX)

# Simulate writing small GeoJSON data that would trigger put_object
features = [Feature(geometry={"type": "Point", "coordinates": [0.0, 0.0]})]
result = sink.write(image_id=TEST_IMAGE_ID, features=features)
self.assertTrue(result)

# Verify put_object is called and upload_file is not called
mock_s3_client.put_object.assert_called_once()
mock_upload_file.assert_not_called()

@staticmethod
def build_feature_list() -> List[geojson.Feature]:
with open("./test/data/detections.geojson", "r") as geojson_file:
