diff --git a/packages/middleware-sdk-s3/package.json b/packages/middleware-sdk-s3/package.json index c587778955bb..86dcc557128c 100644 --- a/packages/middleware-sdk-s3/package.json +++ b/packages/middleware-sdk-s3/package.json @@ -31,6 +31,8 @@ "@smithy/smithy-client": "^3.1.8", "@smithy/types": "^3.3.0", "@smithy/util-config-provider": "^3.0.0", + "@smithy/util-stream": "^3.0.6", + "@smithy/util-utf8": "^3.0.0", "tslib": "^2.6.2" }, "devDependencies": { diff --git a/packages/middleware-sdk-s3/src/throw-200-exceptions.spec.ts b/packages/middleware-sdk-s3/src/throw-200-exceptions.spec.ts index e9cdbb56838d..031631803c4c 100644 --- a/packages/middleware-sdk-s3/src/throw-200-exceptions.spec.ts +++ b/packages/middleware-sdk-s3/src/throw-200-exceptions.spec.ts @@ -1,15 +1,14 @@ import { HttpRequest, HttpResponse } from "@smithy/protocol-http"; +import { toUtf8 } from "@smithy/util-utf8"; +import { Readable } from "stream"; import { throw200ExceptionsMiddleware } from "./throw-200-exceptions"; describe("throw200ExceptionsMiddlewareOptions", () => { const mockNextHandler = jest.fn(); - const mockStreamCollector = jest.fn(); - const mockUtf8Encoder = jest.fn(); const mockResponse = jest.fn(); const mockConfig = { - streamCollector: mockStreamCollector, - utf8Encoder: mockUtf8Encoder, + utf8Encoder: toUtf8, }; beforeEach(() => { @@ -17,9 +16,6 @@ describe("throw200ExceptionsMiddlewareOptions", () => { }); describe("tests for statusCode < 200 and >= 300", () => { - mockStreamCollector.mockResolvedValue(Buffer.from("")); - mockUtf8Encoder.mockReturnValue(""); - it.each([199, 300])("results for statusCode %i", async (statusCode) => { mockNextHandler.mockReturnValue({ response: mockResponse, @@ -39,13 +35,11 @@ describe("throw200ExceptionsMiddlewareOptions", () => { it("should throw if response body is empty", async () => { expect.assertions(3); - mockStreamCollector.mockResolvedValue(Buffer.from("")); - mockUtf8Encoder.mockReturnValue(""); mockNextHandler.mockReturnValue({ response: new HttpResponse({ statusCode: 200, headers: {}, - body: "", + body: Readable.from(Buffer.from("")), }), }); const handler = throw200ExceptionsMiddleware(mockConfig)(mockNextHandler, { @@ -73,13 +67,11 @@ describe("throw200ExceptionsMiddlewareOptions", () => { 656c76696e6727732072657175657374 Uuag1LuByRx9e6j5Onimru9pO4ZVKnJ2Qz7/C1NPcfTWAtRPfTaOFg== `; - mockStreamCollector.mockResolvedValue(Buffer.from(errorBody)); - mockUtf8Encoder.mockReturnValue(errorBody); mockNextHandler.mockReturnValue({ response: new HttpResponse({ statusCode: 200, headers: {}, - body: "", + body: Readable.from(Buffer.from(errorBody)), }), }); const handler = throw200ExceptionsMiddleware(mockConfig)(mockNextHandler, {} as any); @@ -106,13 +98,40 @@ describe("throw200ExceptionsMiddlewareOptions", () => { Access Denied `; - mockStreamCollector.mockResolvedValue(Buffer.from(errorBody)); - mockUtf8Encoder.mockReturnValue(errorBody); mockNextHandler.mockReturnValue({ response: new HttpResponse({ statusCode: 200, headers: {}, - body: "", + body: Readable.from(Buffer.from(errorBody)), + }), + }); + const handler = throw200ExceptionsMiddleware(mockConfig)(mockNextHandler, {} as any); + const { response } = await handler({ + input: {}, + request: new HttpRequest({ + hostname: "s3.us-east-1.amazonaws.com", + }), + }); + expect(HttpResponse.isInstance(response)).toBe(true); + // @ts-ignore + expect(response.statusCode).toEqual(200); + }); + + /** + * This is an exception to the specification. We cannot afford to read + * a streaming body for its entire duration just to check for an extremely unlikely + * terminating XML tag if the stream is very long. + */ + it("should not throw if the Error tag is on an excessively long body", async () => { + const errorBody = ` + + ${"a".repeat(3000)} +`; + mockNextHandler.mockReturnValue({ + response: new HttpResponse({ + statusCode: 200, + headers: {}, + body: Readable.from(Buffer.from(errorBody)), }), }); const handler = throw200ExceptionsMiddleware(mockConfig)(mockNextHandler, {} as any); diff --git a/packages/middleware-sdk-s3/src/throw-200-exceptions.ts b/packages/middleware-sdk-s3/src/throw-200-exceptions.ts index c7c5176bf095..451efa52833c 100644 --- a/packages/middleware-sdk-s3/src/throw-200-exceptions.ts +++ b/packages/middleware-sdk-s3/src/throw-200-exceptions.ts @@ -7,9 +7,9 @@ import { RelativeMiddlewareOptions, StreamCollector, } from "@smithy/types"; +import { headStream, splitStream } from "@smithy/util-stream"; type PreviouslyResolved = { - streamCollector: StreamCollector; utf8Encoder: Encoder; }; @@ -22,6 +22,13 @@ const THROW_IF_EMPTY_BODY: Record = { CompleteMultipartUploadCommand: true, }; +/** + * @internal + * We will check at most this many bytes from the stream when looking for + * an error-like 200 status. + */ +const MAX_BYTES_TO_INSPECT = 3000; + /** * In case of an internal error/terminated connection, S3 operations may return 200 errors. CopyObject, UploadPartCopy, * CompleteMultipartUpload may return empty payload or payload with only xml Preamble. @@ -36,12 +43,25 @@ export const throw200ExceptionsMiddleware = if (!HttpResponse.isInstance(response)) { return result; } - const { statusCode, body } = response; + const { statusCode, body: sourceBody } = response; if (statusCode < 200 || statusCode >= 300) { return result; } - const bodyBytes: Uint8Array = await collectBody(body, config); + let bodyCopy = sourceBody; + let body = sourceBody; + + if (sourceBody && typeof sourceBody === "object" && !(sourceBody instanceof Uint8Array)) { + [bodyCopy, body] = await splitStream(sourceBody); + } + // restore split body to the response for deserialization. + response.body = body; + + const bodyBytes: Uint8Array = await collectBody(bodyCopy, { + streamCollector: async (stream: any) => { + return headStream(stream, MAX_BYTES_TO_INSPECT); + }, + }); const bodyStringTail = config.utf8Encoder(bodyBytes.subarray(bodyBytes.length - 16)); // Throw on 200 response with empty body, legacy behavior allowlist. @@ -56,14 +76,16 @@ export const throw200ExceptionsMiddleware = response.statusCode = 400; } - // Body stream is consumed and paused at this point. So replace the response.body to the collected bytes. - // So that the deserializer can consume the body as normal. - response.body = bodyBytes; return result; }; -// Collect low-level response body stream to Uint8Array. -const collectBody = (streamBody: any = new Uint8Array(), context: PreviouslyResolved): Promise => { +/** + * @internal + */ +const collectBody = ( + streamBody: any = new Uint8Array(), + context: { streamCollector: StreamCollector } +): Promise => { if (streamBody instanceof Uint8Array) { return Promise.resolve(streamBody); }