From a1f771cd2de27a154d333b6480368d9d1e39ebcd Mon Sep 17 00:00:00 2001 From: Dengke Tang Date: Thu, 31 Aug 2023 11:34:59 -0700 Subject: [PATCH 01/17] try to repo the leak --- tests/CMakeLists.txt | 1 + .../GetObject/get_object_checksum_retry.json | 15 +++++++ tests/mock_s3_server/mock_s3_server.py | 34 ++++++++++---- tests/s3_mock_server_tests.c | 44 +++++++++++++++++++ 4 files changed, 86 insertions(+), 8 deletions(-) create mode 100644 tests/mock_s3_server/GetObject/get_object_checksum_retry.json diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index ac06da68a..e4137e7a3 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -254,6 +254,7 @@ add_net_test_case(test_s3_list_bucket_valid) if(ENABLE_MOCK_SERVER_TESTS) add_net_test_case(multipart_upload_mock_server) add_net_test_case(multipart_upload_checksum_with_retry_mock_server) + add_net_test_case(multipart_download_checksum_with_retry_mock_server) add_net_test_case(async_internal_error_from_complete_multipart_mock_server) add_net_test_case(async_access_denied_from_complete_multipart_mock_server) add_net_test_case(get_object_modified_mock_server) diff --git a/tests/mock_s3_server/GetObject/get_object_checksum_retry.json b/tests/mock_s3_server/GetObject/get_object_checksum_retry.json new file mode 100644 index 000000000..32cb497da --- /dev/null +++ b/tests/mock_s3_server/GetObject/get_object_checksum_retry.json @@ -0,0 +1,15 @@ +{ + "status": 200, + "headers": { + "ETag": "b54357faf0632cce46e942fa68356b38", + "Date": "Thu, 12 Jan 2023 00:04:21 GMT", + "Last-Modified": "Tue, 10 Jan 2023 23:39:32 GMT", + "Accept-Ranges": "bytes", + "Content-Range": "bytes 0-65535/65536", + "Content-Type": "binary/octet-stream", + "x-amz-checksum-crc32": "q1875w==" + }, + "body": [ + "less data" + ] +} diff --git a/tests/mock_s3_server/mock_s3_server.py b/tests/mock_s3_server/mock_s3_server.py index 9e8c6ccb3..718b41c64 100644 --- a/tests/mock_s3_server/mock_s3_server.py +++ b/tests/mock_s3_server/mock_s3_server.py 
@@ -16,9 +16,10 @@ MAX_RECV = 2**16 TIMEOUT = 120 # this must be higher than any response's "delay" setting -VERBOSE = False +VERBOSE = True SHOULD_THROTTLE = True +COUNT = 0 class S3Opts(Enum): CreateMultipartUpload = 1 @@ -193,8 +194,11 @@ async def send_response_from_json(wrapper, response_json_path, chunked=False, ge "response with", len(body), "bytes") headers = wrapper.basic_headers() + content_length_set = False for header in data['headers'].items(): - headers.append((header[0], header[1])) + headers.append((header[0], str(header[1]))) + if header[0].lower() == "content-length": + content_length_set = True if chunked: headers.append(('Transfer-Encoding', "chunked")) @@ -202,9 +206,18 @@ async def send_response_from_json(wrapper, response_json_path, chunked=False, ge await wrapper.send(res) await wrapper.send(h11.Data(data=b"%X\r\n%s\r\n" % (len(body), body.encode()))) else: - headers.append(("Content-Length", str(len(body)))) - res = h11.Response(status_code=status_code, headers=headers) - await wrapper.send(res) + print("retry_request_received_times is " + str(COUNT)) + if COUNT <= 1 and not head_request: + headers.append(("Content-Length", str(123456))) + + elif content_length_set is False: + headers.append(("Content-Length", str(len(body)))) + print(headers) + try: + res = h11.Response(status_code=status_code, headers=headers) + await wrapper.send(res) + except Exception as e: + print(e) if head_request: await wrapper.send(h11.EndOfMessage()) return @@ -230,7 +243,7 @@ async def send_mock_s3_response(wrapper, request_type, path, generate_body=False wrapper.info("Throttling") # Flip the flag wrapper.should_throttle = not wrapper.should_throttle - await send_response_from_json(wrapper, response_file, generate_body=generate_body, generate_body_size=generate_body_size) + await send_response_from_json(wrapper, response_file, generate_body=generate_body, generate_body_size=generate_body_size, head_request=head_request) async def 
maybe_send_error_response(wrapper, exc): @@ -279,7 +292,12 @@ def handle_get_object_modified(start_range, end_range, request): return "/get_object_modified_failure", data_length, False -def handle_get_object(request, parsed_path): +def handle_get_object(wrapper, request, parsed_path, head_request=False): + global COUNT + if parsed_path.path == "/get_object_checksum_retry" and not head_request: + COUNT = COUNT + 1 + else: + COUNT = 0 body_range_value = get_request_header_value(request, "range") @@ -340,7 +358,7 @@ async def handle_mock_s3_request(wrapper, request): else: request_type = S3Opts.GetObject response_path, generate_body_size, generate_body = handle_get_object( - request, parsed_path) + wrapper, request, parsed_path, head_request=method == "HEAD" ) else: # TODO: support more type. wrapper.info("unsupported request:", request) diff --git a/tests/s3_mock_server_tests.c b/tests/s3_mock_server_tests.c index bda02a354..81c5b4a7a 100644 --- a/tests/s3_mock_server_tests.c +++ b/tests/s3_mock_server_tests.c @@ -194,6 +194,50 @@ TEST_CASE(multipart_upload_checksum_with_retry_mock_server) { return AWS_OP_SUCCESS; } +TEST_CASE(multipart_download_checksum_with_retry_mock_server) { + (void)ctx; + /** + * We had a memory leak after the header of the request received successfully, the request failed. + * We have allocated memory that never frees. 
+ */ + struct aws_s3_tester tester; + ASSERT_SUCCESS(aws_s3_tester_init(allocator, &tester)); + struct aws_s3_tester_client_options client_options = { + .part_size = MB_TO_BYTES(5), + .tls_usage = AWS_S3_TLS_DISABLED, + }; + + struct aws_s3_client *client = NULL; + ASSERT_SUCCESS(aws_s3_tester_client_new(&tester, &client_options, &client)); + /* Mock server will response without fake checksum for the body */ + struct aws_byte_cursor object_path = aws_byte_cursor_from_c_str("/get_object_checksum_retry"); + + struct aws_s3_tester_meta_request_options get_options = { + .allocator = allocator, + .meta_request_type = AWS_S3_META_REQUEST_TYPE_GET_OBJECT, + .client = client, + .expected_validate_checksum_alg = AWS_SCA_CRC32, + .validate_get_response_checksum = true, + .get_options = + { + .object_path = object_path, + }, + .default_type_options = + { + .mode = AWS_S3_TESTER_DEFAULT_TYPE_MODE_GET, + }, + .mock_server = true, + .validate_type = AWS_S3_TESTER_VALIDATE_TYPE_EXPECT_FAILURE, + }; + + ASSERT_SUCCESS(aws_s3_tester_send_meta_request_with_options(&tester, &get_options, NULL)); + + aws_s3_client_release(client); + aws_s3_tester_clean_up(&tester); + + return AWS_OP_SUCCESS; +} + TEST_CASE(async_internal_error_from_complete_multipart_mock_server) { (void)ctx; From 3f3d19081f1201d22f1205146edd893f6ecf8308 Mon Sep 17 00:00:00 2001 From: Dengke Tang Date: Thu, 31 Aug 2023 11:45:40 -0700 Subject: [PATCH 02/17] try to fix the leak --- source/s3_meta_request.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/s3_meta_request.c b/source/s3_meta_request.c index 9b9761116..f2216c59f 100644 --- a/source/s3_meta_request.c +++ b/source/s3_meta_request.c @@ -982,12 +982,12 @@ static void s_get_response_part_finish_checksum_helper(struct aws_s3_connection request->validation_algorithm = request->request_level_running_response_sum->algorithm; aws_byte_buf_clean_up(&response_body_sum); aws_byte_buf_clean_up(&encoded_response_body_sum); - 
aws_checksum_destroy(request->request_level_running_response_sum); - aws_byte_buf_clean_up(&request->request_level_response_header_checksum); - request->request_level_running_response_sum = NULL; } else { request->did_validate = false; } + aws_checksum_destroy(request->request_level_running_response_sum); + aws_byte_buf_clean_up(&request->request_level_response_header_checksum); + request->request_level_running_response_sum = NULL; } static int s_s3_meta_request_incoming_headers( From 9e1178e345dc60afe86f01b19c79a05cb8d447b6 Mon Sep 17 00:00:00 2001 From: Dengke Tang Date: Thu, 31 Aug 2023 11:59:05 -0700 Subject: [PATCH 03/17] well, too many --- source/s3_meta_request.c | 6 +++--- tests/mock_s3_server/mock_s3_server.py | 6 +++++- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/source/s3_meta_request.c b/source/s3_meta_request.c index f2216c59f..9b9761116 100644 --- a/source/s3_meta_request.c +++ b/source/s3_meta_request.c @@ -982,12 +982,12 @@ static void s_get_response_part_finish_checksum_helper(struct aws_s3_connection request->validation_algorithm = request->request_level_running_response_sum->algorithm; aws_byte_buf_clean_up(&response_body_sum); aws_byte_buf_clean_up(&encoded_response_body_sum); + aws_checksum_destroy(request->request_level_running_response_sum); + aws_byte_buf_clean_up(&request->request_level_response_header_checksum); + request->request_level_running_response_sum = NULL; } else { request->did_validate = false; } - aws_checksum_destroy(request->request_level_running_response_sum); - aws_byte_buf_clean_up(&request->request_level_response_header_checksum); - request->request_level_running_response_sum = NULL; } static int s_s3_meta_request_incoming_headers( diff --git a/tests/mock_s3_server/mock_s3_server.py b/tests/mock_s3_server/mock_s3_server.py index 718b41c64..95337d8f6 100644 --- a/tests/mock_s3_server/mock_s3_server.py +++ b/tests/mock_s3_server/mock_s3_server.py @@ -20,6 +20,7 @@ SHOULD_THROTTLE = True COUNT = 0 
+RETRY_TEST = False class S3Opts(Enum): CreateMultipartUpload = 1 @@ -207,7 +208,7 @@ async def send_response_from_json(wrapper, response_json_path, chunked=False, ge await wrapper.send(h11.Data(data=b"%X\r\n%s\r\n" % (len(body), body.encode()))) else: print("retry_request_received_times is " + str(COUNT)) - if COUNT <= 1 and not head_request: + if COUNT <= 1 and RETRY_TEST: headers.append(("Content-Length", str(123456))) elif content_length_set is False: @@ -294,10 +295,13 @@ def handle_get_object_modified(start_range, end_range, request): def handle_get_object(wrapper, request, parsed_path, head_request=False): global COUNT + global RETRY_TEST if parsed_path.path == "/get_object_checksum_retry" and not head_request: COUNT = COUNT + 1 + RETRY_TEST = True else: COUNT = 0 + RETRY_TEST = False body_range_value = get_request_header_value(request, "range") From 6c920f1cf66fc77184e669c07a3ee5e715b6e72c Mon Sep 17 00:00:00 2001 From: Dengke Tang Date: Thu, 31 Aug 2023 12:00:02 -0700 Subject: [PATCH 04/17] turn off the verbose --- tests/mock_s3_server/mock_s3_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/mock_s3_server/mock_s3_server.py b/tests/mock_s3_server/mock_s3_server.py index 95337d8f6..ec33b6d7c 100644 --- a/tests/mock_s3_server/mock_s3_server.py +++ b/tests/mock_s3_server/mock_s3_server.py @@ -16,7 +16,7 @@ MAX_RECV = 2**16 TIMEOUT = 120 # this must be higher than any response's "delay" setting -VERBOSE = True +VERBOSE = False SHOULD_THROTTLE = True COUNT = 0 From 1ebfe51075c0041ca8b25ffa4ad861ea731571c3 Mon Sep 17 00:00:00 2001 From: Dengke Tang Date: Thu, 31 Aug 2023 13:15:36 -0700 Subject: [PATCH 05/17] apply the fix --- source/s3_meta_request.c | 6 +++--- tests/mock_s3_server/mock_s3_server.py | 3 +-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/source/s3_meta_request.c b/source/s3_meta_request.c index 9b9761116..f2216c59f 100644 --- a/source/s3_meta_request.c +++ b/source/s3_meta_request.c @@ -982,12 
+982,12 @@ static void s_get_response_part_finish_checksum_helper(struct aws_s3_connection request->validation_algorithm = request->request_level_running_response_sum->algorithm; aws_byte_buf_clean_up(&response_body_sum); aws_byte_buf_clean_up(&encoded_response_body_sum); - aws_checksum_destroy(request->request_level_running_response_sum); - aws_byte_buf_clean_up(&request->request_level_response_header_checksum); - request->request_level_running_response_sum = NULL; } else { request->did_validate = false; } + aws_checksum_destroy(request->request_level_running_response_sum); + aws_byte_buf_clean_up(&request->request_level_response_header_checksum); + request->request_level_running_response_sum = NULL; } static int s_s3_meta_request_incoming_headers( diff --git a/tests/mock_s3_server/mock_s3_server.py b/tests/mock_s3_server/mock_s3_server.py index ec33b6d7c..8761a032c 100644 --- a/tests/mock_s3_server/mock_s3_server.py +++ b/tests/mock_s3_server/mock_s3_server.py @@ -207,13 +207,12 @@ async def send_response_from_json(wrapper, response_json_path, chunked=False, ge await wrapper.send(res) await wrapper.send(h11.Data(data=b"%X\r\n%s\r\n" % (len(body), body.encode()))) else: - print("retry_request_received_times is " + str(COUNT)) if COUNT <= 1 and RETRY_TEST: headers.append(("Content-Length", str(123456))) elif content_length_set is False: headers.append(("Content-Length", str(len(body)))) - print(headers) + try: res = h11.Response(status_code=status_code, headers=headers) await wrapper.send(res) From f97d2a451df68a69a941099054dd6cde1136c3bf Mon Sep 17 00:00:00 2001 From: Dengke Tang Date: Thu, 31 Aug 2023 13:16:25 -0700 Subject: [PATCH 06/17] run the tests in parallel --- .github/workflows/ci.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 01119bda9..2c6c9c27b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -6,8 +6,8 @@ on: - 'main' env: - BUILDER_VERSION: 
v0.9.43 - BUILDER_SOURCE: releases + BUILDER_VERSION: run-test-in-parallel + BUILDER_SOURCE: channels BUILDER_HOST: https://d19elf31gohf1l.cloudfront.net PACKAGE_NAME: aws-c-s3 LINUX_BASE_IMAGE: ubuntu-18-x64 From 7b4f4f4adda0e176b59df67f95ba2a2d577762e8 Mon Sep 17 00:00:00 2001 From: Dengke Tang Date: Thu, 31 Aug 2023 14:18:26 -0700 Subject: [PATCH 07/17] Or just use env var for se --- .github/workflows/ci.yml | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2c6c9c27b..116ab4b25 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -6,8 +6,8 @@ on: - 'main' env: - BUILDER_VERSION: run-test-in-parallel - BUILDER_SOURCE: channels + BUILDER_VERSION: v0.9.43 + BUILDER_SOURCE: releases BUILDER_HOST: https://d19elf31gohf1l.cloudfront.net PACKAGE_NAME: aws-c-s3 LINUX_BASE_IMAGE: ubuntu-18-x64 @@ -16,6 +16,7 @@ env: AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }} AWS_DEFAULT_REGION: ${{ secrets.AWS_DEFAULT_REGION }} AWS_REGION: us-east-1 + CTEST_PARALLEL_LEVEL: 4 jobs: linux-compat: From 7df26bc0b23a78851dc3475690baaefde58f859e Mon Sep 17 00:00:00 2001 From: Dengke Tang Date: Thu, 31 Aug 2023 15:33:23 -0700 Subject: [PATCH 08/17] try to pass the env to docker --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 116ab4b25..59461f56a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -37,7 +37,7 @@ jobs: - name: Build ${{ env.PACKAGE_NAME }} run: | aws s3 cp s3://aws-crt-test-stuff/ci/${{ env.BUILDER_VERSION }}/linux-container-ci.sh ./linux-container-ci.sh && chmod a+x ./linux-container-ci.sh - ./linux-container-ci.sh ${{ env.BUILDER_VERSION }} aws-crt-${{ matrix.image }} build -p ${{ env.PACKAGE_NAME }} + ./linux-container-ci.sh ${{ env.BUILDER_VERSION }} aws-crt-${{ matrix.image }} build -p ${{ env.PACKAGE_NAME }} --env CTEST_PARALLEL_LEVEL 
linux-compiler-compat: runs-on: ubuntu-22.04 # latest From 20937fba72340c35cd67e79311a7a8baec0e5a25 Mon Sep 17 00:00:00 2001 From: Dengke Tang Date: Fri, 1 Sep 2023 11:31:59 -0700 Subject: [PATCH 09/17] clean up the mock server script a bit --- .../GetObject/get_object_checksum_retry.json | 5 +- tests/mock_s3_server/mock_s3_server.py | 125 ++++++++++-------- 2 files changed, 71 insertions(+), 59 deletions(-) diff --git a/tests/mock_s3_server/GetObject/get_object_checksum_retry.json b/tests/mock_s3_server/GetObject/get_object_checksum_retry.json index 32cb497da..1910e495a 100644 --- a/tests/mock_s3_server/GetObject/get_object_checksum_retry.json +++ b/tests/mock_s3_server/GetObject/get_object_checksum_retry.json @@ -8,8 +8,5 @@ "Content-Range": "bytes 0-65535/65536", "Content-Type": "binary/octet-stream", "x-amz-checksum-crc32": "q1875w==" - }, - "body": [ - "less data" - ] + } } diff --git a/tests/mock_s3_server/mock_s3_server.py b/tests/mock_s3_server/mock_s3_server.py index 8761a032c..c881b441a 100644 --- a/tests/mock_s3_server/mock_s3_server.py +++ b/tests/mock_s3_server/mock_s3_server.py @@ -3,10 +3,12 @@ # # S3 Mock server logic starts from handle_mock_s3_request +from dataclasses import dataclass import json from itertools import count from urllib.parse import parse_qs, urlparse import os +from typing import Optional from enum import Enum import trio @@ -16,11 +18,12 @@ MAX_RECV = 2**16 TIMEOUT = 120 # this must be higher than any response's "delay" setting -VERBOSE = False +VERBOSE = True SHOULD_THROTTLE = True -COUNT = 0 -RETRY_TEST = False + +base_dir = os.path.dirname(os.path.realpath(__file__)) + class S3Opts(Enum): CreateMultipartUpload = 1 @@ -31,15 +34,42 @@ class S3Opts(Enum): ListParts = 6 -base_dir = os.path.dirname(os.path.realpath(__file__)) +@dataclass +class Response: + path: str + disconnect_after_headers = False + generate_body_size: Optional[int] = None + response_json_path: str = None + + def resolve_file_path(self, wrapper, request_type): 
+ if self.response_json_path is None: + response_file = os.path.join( + base_dir, request_type.name, f"{self.path[1:]}.json") + if os.path.exists(response_file) == False: + wrapper.info( + response_file, "not exist, using the default response") + response_file = os.path.join( + base_dir, request_type.name, f"default.json") + if "throttle" in response_file: + # We throttle the request half the time to make sure it succeeds after a retry + if TrioHTTPWrapper.should_throttle is False: + wrapper.info("Skipping throttling") + response_file = os.path.join( + base_dir, request_type.name, f"default.json") + else: + wrapper.info("Throttling") + # Flip the flag + TrioHTTPWrapper.should_throttle = not TrioHTTPWrapper.should_throttle + self.response_json_path = response_file class TrioHTTPWrapper: _next_id = count() + retry_request_received_continuous = 0 + should_throttle = SHOULD_THROTTLE def __init__(self, stream): self.stream = stream - self.should_throttle = SHOULD_THROTTLE self.conn = h11.Connection(h11.SERVER) # A unique id for this connection, to include in debugging output # (useful for understanding what's going on if there are multiple @@ -173,10 +203,10 @@ async def send_simple_response(wrapper, status_code, content_type, body): await wrapper.send(h11.EndOfMessage()) -async def send_response_from_json(wrapper, response_json_path, chunked=False, generate_body=False, generate_body_size=0, head_request=False): - wrapper.info("sending response from json file: ", response_json_path, - ".\n generate_body: ", generate_body, "generate_body_size: ", generate_body_size) - with open(response_json_path, 'r') as f: +async def send_response_from_json(wrapper, response, chunked=False, head_request=False): + wrapper.info("sending response from json file: ", response.response_json_path, + ".\n generate_body_size: ", response.generate_body_size) + with open(response.response_json_path, 'r') as f: data = json.load(f) # if response has delay, then sleep before sending it @@ -186,9 
+216,9 @@ async def send_response_from_json(wrapper, response_json_path, chunked=False, ge await trio.sleep(delay) status_code = data['status'] - if generate_body: + if response.generate_body_size is not None: # generate body with a specific size instead - body = "a" * generate_body_size + body = "a" * response.generate_body_size else: body = "\n".join(data['body']) wrapper.info("Sending", status_code, @@ -207,7 +237,9 @@ async def send_response_from_json(wrapper, response_json_path, chunked=False, ge await wrapper.send(res) await wrapper.send(h11.Data(data=b"%X\r\n%s\r\n" % (len(body), body.encode()))) else: - if COUNT <= 1 and RETRY_TEST: + if TrioHTTPWrapper.retry_request_received_continuous == 1: + # Use a long `content-length` header to trigger error when we try to send EOM. + # so that the server will close connection after we send the header. headers.append(("Content-Length", str(123456))) elif content_length_set is False: @@ -226,26 +258,6 @@ async def send_response_from_json(wrapper, response_json_path, chunked=False, ge await wrapper.send(h11.EndOfMessage()) -async def send_mock_s3_response(wrapper, request_type, path, generate_body=False, generate_body_size=0, head_request=False): - response_file = os.path.join( - base_dir, request_type.name, f"{path[1:]}.json") - if os.path.exists(response_file) == False: - wrapper.info(response_file, "not exist, using the default response") - response_file = os.path.join( - base_dir, request_type.name, f"default.json") - if "throttle" in response_file: - # We throttle the request half the time to make sure it succeeds after a retry - if wrapper.should_throttle is False: - wrapper.info("Skipping throttling") - response_file = os.path.join( - base_dir, request_type.name, f"default.json") - else: - wrapper.info("Throttling") - # Flip the flag - wrapper.should_throttle = not wrapper.should_throttle - await send_response_from_json(wrapper, response_file, generate_body=generate_body, generate_body_size=generate_body_size, 
head_request=head_request) - - async def maybe_send_error_response(wrapper, exc): if wrapper.conn.our_state not in {h11.IDLE, h11.SEND_RESPONSE}: wrapper.info("...but I can't, because our state is", @@ -277,7 +289,7 @@ def handle_get_object_modified(start_range, end_range, request): data_length = end_range - start_range if start_range == 0: - return "/get_object_modified_first_part", data_length, True + return Response("/get_object_modified_first_part", data_length) else: # Check the request header to make sure "If-Match" is set etag = get_request_header_value(request, "if-match") @@ -288,19 +300,19 @@ def handle_get_object_modified(start_range, end_range, request): with open(response_file, 'r') as f: data = json.load(f) if data['headers']['ETag'] == etag: - return "/get_object_modified_success", data_length, False - return "/get_object_modified_failure", data_length, False + return Response("/get_object_modified_success") + return Response("/get_object_modified_failure") def handle_get_object(wrapper, request, parsed_path, head_request=False): - global COUNT - global RETRY_TEST + wrapper.info("handle_get_object") if parsed_path.path == "/get_object_checksum_retry" and not head_request: - COUNT = COUNT + 1 - RETRY_TEST = True + TrioHTTPWrapper.retry_request_received_continuous = TrioHTTPWrapper.retry_request_received_continuous + 1 + + wrapper.info("retry_request_received_continuous is " + + str(TrioHTTPWrapper.retry_request_received_continuous)) else: - COUNT = 0 - RETRY_TEST = False + TrioHTTPWrapper.retry_request_received_continuous = 0 body_range_value = get_request_header_value(request, "range") @@ -318,29 +330,27 @@ def handle_get_object(wrapper, request, parsed_path, head_request=False): if parsed_path.path == "/get_object_modified": return handle_get_object_modified(start_range, end_range, request) elif parsed_path.path == "/get_object_invalid_response_missing_content_range": - return "/get_object_invalid_response_missing_content_range", data_length, False 
+ return Response("/get_object_invalid_response_missing_content_range") elif parsed_path.path == "/get_object_invalid_response_missing_etags": - return "/get_object_invalid_response_missing_etags", data_length, False + return Response("/get_object_invalid_response_missing_etags") - return parsed_path.path, data_length, True + return Response(parsed_path.path, data_length) def handle_list_parts(parsed_path): if parsed_path.path == "/multiple_list_parts": if parsed_path.query.find("part-number-marker") != -1: - return "/multiple_list_parts_2" + return Response("/multiple_list_parts_2") else: - return "/multiple_list_parts_1" - return parsed_path.path + return Response("/multiple_list_parts_1") + return Response(parsed_path.path) async def handle_mock_s3_request(wrapper, request): parsed_path, parsed_query = parse_request_path( request.target.decode("ascii")) - response_path = parsed_path.path - generate_body = False - generate_body_size = 0 method = request.method.decode("utf-8") + response = None if method == "POST": if parsed_path.query == "uploads": @@ -357,11 +367,11 @@ async def handle_mock_s3_request(wrapper, request): if parsed_path.query.find("uploadId") != -1: # GET /Key+?max-parts=MaxParts&part-number-marker=PartNumberMarker&uploadId=UploadId HTTP/1.1 -- List Parts request_type = S3Opts.ListParts - response_path = handle_list_parts(parsed_path) + response = handle_list_parts(parsed_path) else: request_type = S3Opts.GetObject - response_path, generate_body_size, generate_body = handle_get_object( - wrapper, request, parsed_path, head_request=method == "HEAD" ) + response = handle_get_object( + wrapper, request, parsed_path, head_request=method == "HEAD") else: # TODO: support more type. 
wrapper.info("unsupported request:", request) @@ -372,9 +382,14 @@ async def handle_mock_s3_request(wrapper, request): if type(event) is h11.EndOfMessage: break assert type(event) is h11.Data + if response is None: + response = Response(parsed_path.path) + + response.resolve_file_path(wrapper, request_type) + wrapper.info("resolved path is " + response.response_json_path) - await send_mock_s3_response( - wrapper, request_type, response_path, generate_body=generate_body, generate_body_size=generate_body_size, head_request=method == "HEAD") + await send_response_from_json( + wrapper, response, head_request=method == "HEAD") async def serve(port): From 7281003ac94c1fcb9c272bca5c16aac4d9c0059c Mon Sep 17 00:00:00 2001 From: Dengke Tang Date: Fri, 1 Sep 2023 11:39:38 -0700 Subject: [PATCH 10/17] turn off the verbose --- tests/mock_s3_server/mock_s3_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/mock_s3_server/mock_s3_server.py b/tests/mock_s3_server/mock_s3_server.py index c881b441a..0dad3f5b5 100644 --- a/tests/mock_s3_server/mock_s3_server.py +++ b/tests/mock_s3_server/mock_s3_server.py @@ -18,7 +18,7 @@ MAX_RECV = 2**16 TIMEOUT = 120 # this must be higher than any response's "delay" setting -VERBOSE = True +VERBOSE = False SHOULD_THROTTLE = True From d651af9885035b54b3b6952012262e18158b27b2 Mon Sep 17 00:00:00 2001 From: Dengke Tang Date: Fri, 1 Sep 2023 16:47:43 -0700 Subject: [PATCH 11/17] use the latest version --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 59461f56a..01f32ef31 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -6,7 +6,7 @@ on: - 'main' env: - BUILDER_VERSION: v0.9.43 + BUILDER_VERSION: v0.9.48 BUILDER_SOURCE: releases BUILDER_HOST: https://d19elf31gohf1l.cloudfront.net PACKAGE_NAME: aws-c-s3 From 521b8e6be53494818081e499bfed1ee210f82239 Mon Sep 17 00:00:00 2001 From: Dengke Tang Date: 
Fri, 1 Sep 2023 17:15:10 -0700 Subject: [PATCH 12/17] address comments --- tests/mock_s3_server/mock_s3_server.py | 41 ++++++++++++++------------ 1 file changed, 22 insertions(+), 19 deletions(-) diff --git a/tests/mock_s3_server/mock_s3_server.py b/tests/mock_s3_server/mock_s3_server.py index 0dad3f5b5..4bc53eb98 100644 --- a/tests/mock_s3_server/mock_s3_server.py +++ b/tests/mock_s3_server/mock_s3_server.py @@ -39,10 +39,13 @@ class Response: path: str disconnect_after_headers = False generate_body_size: Optional[int] = None - response_json_path: str = None + json_path: str = None + throttle: bool = False + force_retry: bool = False + should_throttle: bool = SHOULD_THROTTLE def resolve_file_path(self, wrapper, request_type): - if self.response_json_path is None: + if self.json_path is None: response_file = os.path.join( base_dir, request_type.name, f"{self.path[1:]}.json") if os.path.exists(response_file) == False: @@ -52,21 +55,20 @@ def resolve_file_path(self, wrapper, request_type): base_dir, request_type.name, f"default.json") if "throttle" in response_file: # We throttle the request half the time to make sure it succeeds after a retry - if TrioHTTPWrapper.should_throttle is False: + if Response.should_throttle is False: wrapper.info("Skipping throttling") response_file = os.path.join( base_dir, request_type.name, f"default.json") else: wrapper.info("Throttling") # Flip the flag - TrioHTTPWrapper.should_throttle = not TrioHTTPWrapper.should_throttle - self.response_json_path = response_file + Response.should_throttle = not Response.should_throttle + self.json_path = response_file class TrioHTTPWrapper: _next_id = count() retry_request_received_continuous = 0 - should_throttle = SHOULD_THROTTLE def __init__(self, stream): self.stream = stream @@ -204,9 +206,9 @@ async def send_simple_response(wrapper, status_code, content_type, body): async def send_response_from_json(wrapper, response, chunked=False, head_request=False): - wrapper.info("sending response from 
json file: ", response.response_json_path, + wrapper.info("sending response from json file: ", response.json_path, ".\n generate_body_size: ", response.generate_body_size) - with open(response.response_json_path, 'r') as f: + with open(response.json_path, 'r') as f: data = json.load(f) # if response has delay, then sleep before sending it @@ -237,11 +239,10 @@ async def send_response_from_json(wrapper, response, chunked=False, head_request await wrapper.send(res) await wrapper.send(h11.Data(data=b"%X\r\n%s\r\n" % (len(body), body.encode()))) else: - if TrioHTTPWrapper.retry_request_received_continuous == 1: + if response.force_retry: # Use a long `content-length` header to trigger error when we try to send EOM. # so that the server will close connection after we send the header. headers.append(("Content-Length", str(123456))) - elif content_length_set is False: headers.append(("Content-Length", str(len(body)))) @@ -305,12 +306,14 @@ def handle_get_object_modified(start_range, end_range, request): def handle_get_object(wrapper, request, parsed_path, head_request=False): - wrapper.info("handle_get_object") + + response = Response(parsed_path.path) if parsed_path.path == "/get_object_checksum_retry" and not head_request: TrioHTTPWrapper.retry_request_received_continuous = TrioHTTPWrapper.retry_request_received_continuous + 1 - wrapper.info("retry_request_received_continuous is " + - str(TrioHTTPWrapper.retry_request_received_continuous)) + if TrioHTTPWrapper.retry_request_received_continuous == 1: + wrapper.info("Force retry on the request") + response.force_retry = True else: TrioHTTPWrapper.retry_request_received_continuous = 0 @@ -329,12 +332,12 @@ def handle_get_object(wrapper, request, parsed_path, head_request=False): if parsed_path.path == "/get_object_modified": return handle_get_object_modified(start_range, end_range, request) - elif parsed_path.path == "/get_object_invalid_response_missing_content_range": - return 
Response("/get_object_invalid_response_missing_content_range") - elif parsed_path.path == "/get_object_invalid_response_missing_etags": - return Response("/get_object_invalid_response_missing_etags") + elif parsed_path.path == "/get_object_invalid_response_missing_content_range" or parsed_path.path == "/get_object_invalid_response_missing_etags": + # Don't generate the body for those requests + return response - return Response(parsed_path.path, data_length) + response.generate_body_size = data_length + return response def handle_list_parts(parsed_path): @@ -386,7 +389,7 @@ async def handle_mock_s3_request(wrapper, request): response = Response(parsed_path.path) response.resolve_file_path(wrapper, request_type) - wrapper.info("resolved path is " + response.response_json_path) + wrapper.info("resolved path is " + response.json_path) await send_response_from_json( wrapper, response, head_request=method == "HEAD") From 56a36e2a642d03f71718b783101f31f1fcecab23 Mon Sep 17 00:00:00 2001 From: Dengke Tang Date: Fri, 1 Sep 2023 17:19:00 -0700 Subject: [PATCH 13/17] remove the test stuff made before --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 01f32ef31..5df82b056 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -37,7 +37,7 @@ jobs: - name: Build ${{ env.PACKAGE_NAME }} run: | aws s3 cp s3://aws-crt-test-stuff/ci/${{ env.BUILDER_VERSION }}/linux-container-ci.sh ./linux-container-ci.sh && chmod a+x ./linux-container-ci.sh - ./linux-container-ci.sh ${{ env.BUILDER_VERSION }} aws-crt-${{ matrix.image }} build -p ${{ env.PACKAGE_NAME }} --env CTEST_PARALLEL_LEVEL + ./linux-container-ci.sh ${{ env.BUILDER_VERSION }} aws-crt-${{ matrix.image }} build -p ${{ env.PACKAGE_NAME }} linux-compiler-compat: runs-on: ubuntu-22.04 # latest From eeb1a2243bccc837932a0f5fc903876d80729f0b Mon Sep 17 00:00:00 2001 From: Dengke Tang Date: Wed, 6 Sep 2023 10:00:46 
-0700 Subject: [PATCH 14/17] update the mock server script --- tests/mock_s3_server/mock_s3_server.py | 155 ++++++++++++++----------- 1 file changed, 86 insertions(+), 69 deletions(-) diff --git a/tests/mock_s3_server/mock_s3_server.py b/tests/mock_s3_server/mock_s3_server.py index 4bc53eb98..b3bc1aec9 100644 --- a/tests/mock_s3_server/mock_s3_server.py +++ b/tests/mock_s3_server/mock_s3_server.py @@ -36,6 +36,16 @@ class S3Opts(Enum): @dataclass class Response: + status_code: int + delay: int + headers: any + data: str + chunked: bool + head_request: bool + + +@dataclass +class ResponseConfig: path: str disconnect_after_headers = False generate_body_size: Optional[int] = None @@ -44,7 +54,7 @@ class Response: force_retry: bool = False should_throttle: bool = SHOULD_THROTTLE - def resolve_file_path(self, wrapper, request_type): + def _resolve_file_path(self, wrapper, request_type): if self.json_path is None: response_file = os.path.join( base_dir, request_type.name, f"{self.path[1:]}.json") @@ -55,16 +65,54 @@ def resolve_file_path(self, wrapper, request_type): base_dir, request_type.name, f"default.json") if "throttle" in response_file: # We throttle the request half the time to make sure it succeeds after a retry - if Response.should_throttle is False: + if ResponseConfig.should_throttle is False: wrapper.info("Skipping throttling") response_file = os.path.join( base_dir, request_type.name, f"default.json") else: wrapper.info("Throttling") # Flip the flag - Response.should_throttle = not Response.should_throttle + ResponseConfig.should_throttle = not ResponseConfig.should_throttle self.json_path = response_file + def resolve_response(self, wrapper, request_type, chunked=False, head_request=False): + self._resolve_file_path(wrapper, request_type) + wrapper.info("resolving response from json file: ", self.json_path, + ".\n generate_body_size: ", self.generate_body_size) + with open(self.json_path, 'r') as f: + data = json.load(f) + + # if response has delay, then 
sleep before sending it + delay = data.get('delay', 0) + status_code = data['status'] + if self.generate_body_size is not None: + # generate body with a specific size instead + body = "a" * self.generate_body_size + else: + body = "\n".join(data['body']) + + headers = wrapper.basic_headers() + content_length_set = False + for header in data['headers'].items(): + headers.append((header[0], str(header[1]))) + if header[0].lower() == "content-length": + content_length_set = True + + if chunked: + headers.append(('Transfer-Encoding', "chunked")) + else: + if self.force_retry: + # Use a long `content-length` header to trigger error when we try to send EOM. + # so that the server will close connection after we send the header. + headers.append(("Content-Length", str(123456))) + elif content_length_set is False: + headers.append(("Content-Length", str(len(body)))) + + response = Response(status_code=status_code, delay=delay, headers=headers, + data=body, chunked=chunked, head_request=head_request) + + return response + class TrioHTTPWrapper: _next_id = count() @@ -92,7 +140,7 @@ async def send(self, event): async def _read_from_peer(self): if self.conn.they_are_waiting_for_100_continue: self.info("Sending 100 Continue") - go_ahead = h11.InformationalResponse( + go_ahead = h11.InformationalResponse( status_code=100, headers=self.basic_headers() ) await self.send(go_ahead) @@ -205,56 +253,26 @@ async def send_simple_response(wrapper, status_code, content_type, body): await wrapper.send(h11.EndOfMessage()) -async def send_response_from_json(wrapper, response, chunked=False, head_request=False): - wrapper.info("sending response from json file: ", response.json_path, - ".\n generate_body_size: ", response.generate_body_size) - with open(response.json_path, 'r') as f: - data = json.load(f) +async def send_response(wrapper, response): + if response.delay > 0: + assert response.delay < TIMEOUT + await trio.sleep(response.delay) - # if response has delay, then sleep before 
sending it - delay = data.get('delay', 0) - if delay > 0: - assert delay < TIMEOUT - await trio.sleep(delay) + wrapper.info("Sending", response.status_code, + "response with", len(response.data), "bytes") - status_code = data['status'] - if response.generate_body_size is not None: - # generate body with a specific size instead - body = "a" * response.generate_body_size - else: - body = "\n".join(data['body']) - wrapper.info("Sending", status_code, - "response with", len(body), "bytes") + res = h11.Response(status_code=response.status_code, + headers=response.headers) - headers = wrapper.basic_headers() - content_length_set = False - for header in data['headers'].items(): - headers.append((header[0], str(header[1]))) - if header[0].lower() == "content-length": - content_length_set = True - - if chunked: - headers.append(('Transfer-Encoding', "chunked")) - res = h11.Response(status_code=status_code, headers=headers) + try: await wrapper.send(res) - await wrapper.send(h11.Data(data=b"%X\r\n%s\r\n" % (len(body), body.encode()))) + except Exception as e: + print(e) + if response.chunked: + await wrapper.send(h11.Data(data=b"%X\r\n%s\r\n" % (len(response.data), response.data.encode()))) else: - if response.force_retry: - # Use a long `content-length` header to trigger error when we try to send EOM. - # so that the server will close connection after we send the header. 
- headers.append(("Content-Length", str(123456))) - elif content_length_set is False: - headers.append(("Content-Length", str(len(body)))) - - try: - res = h11.Response(status_code=status_code, headers=headers) - await wrapper.send(res) - except Exception as e: - print(e) - if head_request: - await wrapper.send(h11.EndOfMessage()) - return - await wrapper.send(h11.Data(data=body.encode())) + if not response.head_request: + await wrapper.send(h11.Data(data=response.data.encode())) await wrapper.send(h11.EndOfMessage()) @@ -290,7 +308,7 @@ def handle_get_object_modified(start_range, end_range, request): data_length = end_range - start_range if start_range == 0: - return Response("/get_object_modified_first_part", data_length) + return ResponseConfig("/get_object_modified_first_part", data_length) else: # Check the request header to make sure "If-Match" is set etag = get_request_header_value(request, "if-match") @@ -301,19 +319,19 @@ def handle_get_object_modified(start_range, end_range, request): with open(response_file, 'r') as f: data = json.load(f) if data['headers']['ETag'] == etag: - return Response("/get_object_modified_success") - return Response("/get_object_modified_failure") + return ResponseConfig("/get_object_modified_success") + return ResponseConfig("/get_object_modified_failure") def handle_get_object(wrapper, request, parsed_path, head_request=False): - response = Response(parsed_path.path) + response_config = ResponseConfig(parsed_path.path) if parsed_path.path == "/get_object_checksum_retry" and not head_request: TrioHTTPWrapper.retry_request_received_continuous = TrioHTTPWrapper.retry_request_received_continuous + 1 if TrioHTTPWrapper.retry_request_received_continuous == 1: wrapper.info("Force retry on the request") - response.force_retry = True + response_config.force_retry = True else: TrioHTTPWrapper.retry_request_received_continuous = 0 @@ -334,26 +352,26 @@ def handle_get_object(wrapper, request, parsed_path, head_request=False): return 
handle_get_object_modified(start_range, end_range, request) elif parsed_path.path == "/get_object_invalid_response_missing_content_range" or parsed_path.path == "/get_object_invalid_response_missing_etags": # Don't generate the body for those requests - return response + return response_config - response.generate_body_size = data_length - return response + response_config.generate_body_size = data_length + return response_config def handle_list_parts(parsed_path): if parsed_path.path == "/multiple_list_parts": if parsed_path.query.find("part-number-marker") != -1: - return Response("/multiple_list_parts_2") + return ResponseConfig("/multiple_list_parts_2") else: - return Response("/multiple_list_parts_1") - return Response(parsed_path.path) + return ResponseConfig("/multiple_list_parts_1") + return ResponseConfig(parsed_path.path) async def handle_mock_s3_request(wrapper, request): parsed_path, parsed_query = parse_request_path( request.target.decode("ascii")) method = request.method.decode("utf-8") - response = None + response_config = None if method == "POST": if parsed_path.query == "uploads": @@ -370,10 +388,10 @@ async def handle_mock_s3_request(wrapper, request): if parsed_path.query.find("uploadId") != -1: # GET /Key+?max-parts=MaxParts&part-number-marker=PartNumberMarker&uploadId=UploadId HTTP/1.1 -- List Parts request_type = S3Opts.ListParts - response = handle_list_parts(parsed_path) + response_config = handle_list_parts(parsed_path) else: request_type = S3Opts.GetObject - response = handle_get_object( + response_config = handle_get_object( wrapper, request, parsed_path, head_request=method == "HEAD") else: # TODO: support more type. 
@@ -385,14 +403,13 @@ async def handle_mock_s3_request(wrapper, request): if type(event) is h11.EndOfMessage: break assert type(event) is h11.Data - if response is None: - response = Response(parsed_path.path) + if response_config is None: + response_config = ResponseConfig(parsed_path.path) - response.resolve_file_path(wrapper, request_type) - wrapper.info("resolved path is " + response.json_path) + response = response_config.resolve_response( + wrapper, request_type, head_request=method == "HEAD") - await send_response_from_json( - wrapper, response, head_request=method == "HEAD") + await send_response(wrapper, response) async def serve(port): From ca1b8d3ca20a8c9b301b190fe074c537317ee928 Mon Sep 17 00:00:00 2001 From: Dengke Tang Date: Wed, 6 Sep 2023 10:22:27 -0700 Subject: [PATCH 15/17] still use global flags --- .github/workflows/ci.yml | 2 +- tests/mock_s3_server/mock_s3_server.py | 18 ++++++++++-------- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5df82b056..338ba3ed0 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -16,7 +16,7 @@ env: AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }} AWS_DEFAULT_REGION: ${{ secrets.AWS_DEFAULT_REGION }} AWS_REGION: us-east-1 - CTEST_PARALLEL_LEVEL: 4 + CTEST_PARALLEL_LEVEL: 2 jobs: linux-compat: diff --git a/tests/mock_s3_server/mock_s3_server.py b/tests/mock_s3_server/mock_s3_server.py index b3bc1aec9..0bb83fb1a 100644 --- a/tests/mock_s3_server/mock_s3_server.py +++ b/tests/mock_s3_server/mock_s3_server.py @@ -19,7 +19,10 @@ TIMEOUT = 120 # this must be higher than any response's "delay" setting VERBOSE = False + +# Flags to keep between requests SHOULD_THROTTLE = True +RETRY_REQUEST_COUNT = 0 base_dir = os.path.dirname(os.path.realpath(__file__)) @@ -52,9 +55,9 @@ class ResponseConfig: json_path: str = None throttle: bool = False force_retry: bool = False - should_throttle: bool = SHOULD_THROTTLE def 
_resolve_file_path(self, wrapper, request_type): + global SHOULD_THROTTLE if self.json_path is None: response_file = os.path.join( base_dir, request_type.name, f"{self.path[1:]}.json") @@ -65,14 +68,14 @@ def _resolve_file_path(self, wrapper, request_type): base_dir, request_type.name, f"default.json") if "throttle" in response_file: # We throttle the request half the time to make sure it succeeds after a retry - if ResponseConfig.should_throttle is False: + if SHOULD_THROTTLE is False: wrapper.info("Skipping throttling") response_file = os.path.join( base_dir, request_type.name, f"default.json") else: wrapper.info("Throttling") # Flip the flag - ResponseConfig.should_throttle = not ResponseConfig.should_throttle + SHOULD_THROTTLE = not SHOULD_THROTTLE self.json_path = response_file def resolve_response(self, wrapper, request_type, chunked=False, head_request=False): @@ -116,7 +119,6 @@ def resolve_response(self, wrapper, request_type, chunked=False, head_request=Fa class TrioHTTPWrapper: _next_id = count() - retry_request_received_continuous = 0 def __init__(self, stream): self.stream = stream @@ -324,16 +326,16 @@ def handle_get_object_modified(start_range, end_range, request): def handle_get_object(wrapper, request, parsed_path, head_request=False): - + global RETRY_REQUEST_COUNT response_config = ResponseConfig(parsed_path.path) if parsed_path.path == "/get_object_checksum_retry" and not head_request: - TrioHTTPWrapper.retry_request_received_continuous = TrioHTTPWrapper.retry_request_received_continuous + 1 + RETRY_REQUEST_COUNT = RETRY_REQUEST_COUNT + 1 - if TrioHTTPWrapper.retry_request_received_continuous == 1: + if RETRY_REQUEST_COUNT == 1: wrapper.info("Force retry on the request") response_config.force_retry = True else: - TrioHTTPWrapper.retry_request_received_continuous = 0 + RETRY_REQUEST_COUNT = 0 body_range_value = get_request_header_value(request, "range") From 9297fb50129ed6bd87e4558bdeac9b6a398ab217 Mon Sep 17 00:00:00 2001 From: Dengke Tang Date: 
Wed, 6 Sep 2023 10:53:07 -0700 Subject: [PATCH 16/17] more clean up of the mock server script --- tests/mock_s3_server/mock_s3_server.py | 90 ++++++++++++-------------- 1 file changed, 42 insertions(+), 48 deletions(-) diff --git a/tests/mock_s3_server/mock_s3_server.py b/tests/mock_s3_server/mock_s3_server.py index 0bb83fb1a..7e3d2edd1 100644 --- a/tests/mock_s3_server/mock_s3_server.py +++ b/tests/mock_s3_server/mock_s3_server.py @@ -196,6 +196,38 @@ def info(self, *args): # Server main loop ################################################################ + +async def send_simple_response(wrapper, status_code, content_type, body): + wrapper.info("Sending", status_code, "response with", len(body), "bytes") + headers = wrapper.basic_headers() + headers.append(("Content-Type", content_type)) + headers.append(("Content-Length", str(len(body)))) + res = h11.Response(status_code=status_code, headers=headers) + await wrapper.send(res) + await wrapper.send(h11.Data(data=body)) + await wrapper.send(h11.EndOfMessage()) + + +async def maybe_send_error_response(wrapper, exc): + if wrapper.conn.our_state not in {h11.IDLE, h11.SEND_RESPONSE}: + wrapper.info("...but I can't, because our state is", + wrapper.conn.our_state) + return + try: + if isinstance(exc, h11.RemoteProtocolError): + status_code = exc.error_status_hint + elif isinstance(exc, trio.TooSlowError): + status_code = 408 # Request Timeout + else: + status_code = 500 + body = str(exc).encode("utf-8") + await send_simple_response( + wrapper, status_code, "text/plain; charset=utf-8", body + ) + except Exception as exc: + wrapper.info("error while sending error response:", exc) + + async def http_serve(stream): wrapper = TrioHTTPWrapper(stream) wrapper.info("Got new connection") @@ -237,24 +269,6 @@ async def http_serve(stream): ################################################################ # Helper function - -def parse_request_path(request_path): - parsed_path = urlparse(request_path) - parsed_query = 
parse_qs(parsed_path.query) - return parsed_path, parsed_query - - -async def send_simple_response(wrapper, status_code, content_type, body): - wrapper.info("Sending", status_code, "response with", len(body), "bytes") - headers = wrapper.basic_headers() - headers.append(("Content-Type", content_type)) - headers.append(("Content-Length", str(len(body)))) - res = h11.Response(status_code=status_code, headers=headers) - await wrapper.send(res) - await wrapper.send(h11.Data(data=body)) - await wrapper.send(h11.EndOfMessage()) - - async def send_response(wrapper, response): if response.delay > 0: assert response.delay < TIMEOUT @@ -270,35 +284,16 @@ async def send_response(wrapper, response): await wrapper.send(res) except Exception as e: print(e) - if response.chunked: - await wrapper.send(h11.Data(data=b"%X\r\n%s\r\n" % (len(response.data), response.data.encode()))) - else: - if not response.head_request: + + if not response.head_request: + if response.chunked: + await wrapper.send(h11.Data(data=b"%X\r\n%s\r\n" % (len(response.data), response.data.encode()))) + else: await wrapper.send(h11.Data(data=response.data.encode())) await wrapper.send(h11.EndOfMessage()) -async def maybe_send_error_response(wrapper, exc): - if wrapper.conn.our_state not in {h11.IDLE, h11.SEND_RESPONSE}: - wrapper.info("...but I can't, because our state is", - wrapper.conn.our_state) - return - try: - if isinstance(exc, h11.RemoteProtocolError): - status_code = exc.error_status_hint - elif isinstance(exc, trio.TooSlowError): - status_code = 408 # Request Timeout - else: - status_code = 500 - body = str(exc).encode("utf-8") - await send_simple_response( - wrapper, status_code, "text/plain; charset=utf-8", body - ) - except Exception as exc: - wrapper.info("error while sending error response:", exc) - - def get_request_header_value(request, header_name): for header in request.headers: if header[0].decode("utf-8").lower() == header_name.lower(): @@ -370,8 +365,7 @@ def 
handle_list_parts(parsed_path): async def handle_mock_s3_request(wrapper, request): - parsed_path, parsed_query = parse_request_path( - request.target.decode("ascii")) + parsed_path = urlparse(request.target.decode("ascii")) method = request.method.decode("utf-8") response_config = None @@ -414,6 +408,10 @@ async def handle_mock_s3_request(wrapper, request): await send_response(wrapper, response) +################################################################ +# Run the server +################################################################ + async def serve(port): print("listening on http://localhost:{}".format(port)) try: @@ -421,9 +419,5 @@ async def serve(port): except KeyboardInterrupt: print("KeyboardInterrupt - shutting down") - -################################################################ -# Run the server -################################################################ if __name__ == "__main__": trio.run(serve, 8080) From 15f8a76df0e0866128c8c878942e37ed6cd4619f Mon Sep 17 00:00:00 2001 From: Dengke Tang Date: Thu, 7 Sep 2023 14:46:45 -0700 Subject: [PATCH 17/17] Apply suggestions from code review Co-authored-by: Michael Graeb --- tests/mock_s3_server/mock_s3_server.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/mock_s3_server/mock_s3_server.py b/tests/mock_s3_server/mock_s3_server.py index 7e3d2edd1..73de24922 100644 --- a/tests/mock_s3_server/mock_s3_server.py +++ b/tests/mock_s3_server/mock_s3_server.py @@ -305,7 +305,7 @@ def handle_get_object_modified(start_range, end_range, request): data_length = end_range - start_range if start_range == 0: - return ResponseConfig("/get_object_modified_first_part", data_length) + return ResponseConfig("/get_object_modified_first_part", generate_body_size=data_length) else: # Check the request header to make sure "If-Match" is set etag = get_request_header_value(request, "if-match") @@ -399,6 +399,7 @@ async def handle_mock_s3_request(wrapper, request): if type(event) is 
h11.EndOfMessage: break assert type(event) is h11.Data + if response_config is None: response_config = ResponseConfig(parsed_path.path)