From 08f821365c34010d1bed77b7c7e454e2f71c65fb Mon Sep 17 00:00:00 2001 From: AllanFly120 Date: Fri, 14 Feb 2020 15:49:21 -0800 Subject: [PATCH] feat: add event stream serde depedencies (#824) feat: revive event stream dependencies feat: allow event signer provider in the config feat: rename util-eventstream-node to eventstream-serde-node feat: separate eventstream serde from util package feat: exprimental event stream signing dependencies feat: add eventstream-serde-browser chore: add doc for not supporting eventstream request on browser chore: add ignore files to eventstream dependencies chore: remove middleware-eventstream-signing from PR chore: remove eventstream-signer-node from PR chore(eventstream-serde-node): add unit tests feat: throw error/exception event and terminate event stream chore: fix docs fix: add eventstream-serde-config-resolver --- .../src/EventStreamMarshaller.ts | 11 +- .../src/HeaderMarshaller.spec.ts | 4 +- .../src/HeaderMarshaller.ts | 8 +- packages/eventstream-marshaller/src/Int64.ts | 3 + packages/eventstream-serde-browser/.gitignore | 8 + packages/eventstream-serde-browser/.npmignore | 13 ++ packages/eventstream-serde-browser/LICENSE | 201 ++++++++++++++++++ packages/eventstream-serde-browser/README.md | 4 + .../eventstream-serde-browser/package.json | 30 +++ .../src/EventStreamMarshaller.ts | 61 ++++++ .../src/getChunkedStream.ts | 108 ++++++++++ .../src/getDeserializingStream.ts | 30 +++ .../src/getEventMessageStream.ts | 54 +++++ .../eventstream-serde-browser/src/index.ts | 1 + .../eventstream-serde-browser/src/provider.ts | 15 ++ .../eventstream-serde-browser/src/utils.ts | 19 ++ .../eventstream-serde-browser/tsconfig.json | 17 ++ .../tsconfig.test.json | 11 + .../.gitignore | 8 + .../.npmignore | 13 ++ .../eventstream-serde-config-resolver/LICENSE | 201 ++++++++++++++++++ .../README.md | 4 + .../package.json | 28 +++ .../src/index.ts | 26 +++ .../tsconfig.json | 16 ++ .../tsconfig.test.json | 11 + packages/eventstream-serde-node/.gitignore | 8 + packages/eventstream-serde-node/.npmignore | 13 ++ packages/eventstream-serde-node/LICENSE | 201 ++++++++++++++++++ packages/eventstream-serde-node/README.md | 4 + .../eventstream-serde-node/jest.config.js | 5 + packages/eventstream-serde-node/package.json | 30 +++ .../src/EventDeserializerStream.spec.ts | 17 ++ .../src/EventDeserializerStream.ts | 34 +++ .../src/EventMessageChunkerStream.spec.ts | 146 +++++++++++++ .../src/EventMessageChunkerStream.ts | 119 +++++++++++ .../src/EventStreamMarshaller.ts | 87 ++++++++ .../src/MessageUnmarshallerStream.spec.ts | 73 +++++++ .../src/MessageUnmarshallerStream.ts | 57 +++++ .../MockEventMessageSource.fixture.ts | 45 ++++ .../src/fixtures/event.fixture.ts | 14 ++ packages/eventstream-serde-node/src/index.ts | 1 + .../eventstream-serde-node/src/provider.ts | 15 ++ packages/eventstream-serde-node/src/utils.ts | 58 +++++ packages/eventstream-serde-node/tsconfig.json | 16 ++ .../eventstream-serde-node/tsconfig.test.json | 11 + packages/types/src/eventStream.ts | 106 +++++++++ packages/types/src/index.ts | 1 + 48 files changed, 1960 insertions(+), 6 deletions(-) create mode 100644 packages/eventstream-serde-browser/.gitignore create mode 100644 packages/eventstream-serde-browser/.npmignore create mode 100644 packages/eventstream-serde-browser/LICENSE create mode 100644 packages/eventstream-serde-browser/README.md create mode 100644 packages/eventstream-serde-browser/package.json create mode 100644 packages/eventstream-serde-browser/src/EventStreamMarshaller.ts create mode 100644 packages/eventstream-serde-browser/src/getChunkedStream.ts create mode 100644 packages/eventstream-serde-browser/src/getDeserializingStream.ts create mode 100644 packages/eventstream-serde-browser/src/getEventMessageStream.ts create mode 100644 packages/eventstream-serde-browser/src/index.ts create mode 100644 packages/eventstream-serde-browser/src/provider.ts create mode 100644 packages/eventstream-serde-browser/src/utils.ts create mode 100644 packages/eventstream-serde-browser/tsconfig.json create mode 100644 packages/eventstream-serde-browser/tsconfig.test.json create mode 100644 packages/eventstream-serde-config-resolver/.gitignore create mode 100644 packages/eventstream-serde-config-resolver/.npmignore create mode 100644 packages/eventstream-serde-config-resolver/LICENSE create mode 100644 packages/eventstream-serde-config-resolver/README.md create mode 100644 packages/eventstream-serde-config-resolver/package.json create mode 100644 packages/eventstream-serde-config-resolver/src/index.ts create mode 100644 packages/eventstream-serde-config-resolver/tsconfig.json create mode 100644 packages/eventstream-serde-config-resolver/tsconfig.test.json create mode 100644 packages/eventstream-serde-node/.gitignore create mode 100644 packages/eventstream-serde-node/.npmignore create mode 100644 packages/eventstream-serde-node/LICENSE create mode 100644 packages/eventstream-serde-node/README.md create mode 100644 packages/eventstream-serde-node/jest.config.js create mode 100644 packages/eventstream-serde-node/package.json create mode 100644 packages/eventstream-serde-node/src/EventDeserializerStream.spec.ts create mode 100644 packages/eventstream-serde-node/src/EventDeserializerStream.ts create mode 100644 packages/eventstream-serde-node/src/EventMessageChunkerStream.spec.ts create mode 100644 packages/eventstream-serde-node/src/EventMessageChunkerStream.ts create mode 100644 packages/eventstream-serde-node/src/EventStreamMarshaller.ts create mode 100644 packages/eventstream-serde-node/src/MessageUnmarshallerStream.spec.ts create mode 100644 packages/eventstream-serde-node/src/MessageUnmarshallerStream.ts create mode 100644 packages/eventstream-serde-node/src/fixtures/MockEventMessageSource.fixture.ts create mode 100644 packages/eventstream-serde-node/src/fixtures/event.fixture.ts create mode 100644 packages/eventstream-serde-node/src/index.ts create mode 100644 packages/eventstream-serde-node/src/provider.ts create mode 100644 packages/eventstream-serde-node/src/utils.ts create mode 100644 packages/eventstream-serde-node/tsconfig.json create mode 100644 packages/eventstream-serde-node/tsconfig.test.json create mode 100644 packages/types/src/eventStream.ts diff --git a/packages/eventstream-marshaller/src/EventStreamMarshaller.ts b/packages/eventstream-marshaller/src/EventStreamMarshaller.ts index 893367fe72b73..dc3b7a3a40f75 100644 --- a/packages/eventstream-marshaller/src/EventStreamMarshaller.ts +++ b/packages/eventstream-marshaller/src/EventStreamMarshaller.ts @@ -1,9 +1,8 @@ import { HeaderMarshaller } from "./HeaderMarshaller"; -import { Message, MessageHeaders, MessageHeaderValue } from "./Message"; import { splitMessage } from "./splitMessage"; +import { Message, MessageHeaders } from "@aws-sdk/types"; import { Crc32 } from "@aws-crypto/crc32"; import { Decoder, Encoder } from "@aws-sdk/types"; -import { toHex } from "@aws-sdk/util-hex-encoding"; /** * A marshaller that can convert binary-packed event stream messages into @@ -54,4 +53,12 @@ export class EventStreamMarshaller { return { headers: this.headerMarshaller.parse(headers), body }; } + + /** + * Convert a structured JavaScript object with tagged headers into a binary + * event stream message header. + */ + formatHeaders(rawHeaders: MessageHeaders): Uint8Array { + return this.headerMarshaller.format(rawHeaders); + } } diff --git a/packages/eventstream-marshaller/src/HeaderMarshaller.spec.ts b/packages/eventstream-marshaller/src/HeaderMarshaller.spec.ts index 7940a5a4fa500..a76dfb37f884d 100644 --- a/packages/eventstream-marshaller/src/HeaderMarshaller.spec.ts +++ b/packages/eventstream-marshaller/src/HeaderMarshaller.spec.ts @@ -1,6 +1,6 @@ -import { HeaderMarshaller } from "./HeaderMarshaller"; -import { MessageHeaders } from "./Message"; +import { MessageHeaders } from "@aws-sdk/types"; import { fromUtf8, toUtf8 } from "@aws-sdk/util-utf8-universal"; +import { HeaderMarshaller } from "./HeaderMarshaller"; import { Int64 } from "./Int64"; describe("HeaderMarshaller", () => { diff --git a/packages/eventstream-marshaller/src/HeaderMarshaller.ts b/packages/eventstream-marshaller/src/HeaderMarshaller.ts index d75f31bec9ba2..0429f9a9d3a4e 100644 --- a/packages/eventstream-marshaller/src/HeaderMarshaller.ts +++ b/packages/eventstream-marshaller/src/HeaderMarshaller.ts @@ -1,5 +1,9 @@ -import { MessageHeaders, MessageHeaderValue } from "./Message"; -import { Decoder, Encoder } from "@aws-sdk/types"; +import { + Decoder, + Encoder, + MessageHeaders, + MessageHeaderValue +} from "@aws-sdk/types"; import { fromHex, toHex } from "@aws-sdk/util-hex-encoding"; import { Int64 } from "./Int64"; diff --git a/packages/eventstream-marshaller/src/Int64.ts b/packages/eventstream-marshaller/src/Int64.ts index 5a2ad7bf4e42f..02a2643d9f607 100644 --- a/packages/eventstream-marshaller/src/Int64.ts +++ b/packages/eventstream-marshaller/src/Int64.ts @@ -1,4 +1,7 @@ import { toHex } from "@aws-sdk/util-hex-encoding"; +import { Int64 as IInt64 } from "@aws-sdk/types"; + +export interface Int64 extends IInt64 {} /** * A lossless representation of a signed, 64-bit integer. Instances of this diff --git a/packages/eventstream-serde-browser/.gitignore b/packages/eventstream-serde-browser/.gitignore new file mode 100644 index 0000000000000..3d1714c9806ef --- /dev/null +++ b/packages/eventstream-serde-browser/.gitignore @@ -0,0 +1,8 @@ +/node_modules/ +/build/ +/coverage/ +/docs/ +*.tsbuildinfo +*.tgz +*.log +package-lock.json diff --git a/packages/eventstream-serde-browser/.npmignore b/packages/eventstream-serde-browser/.npmignore new file mode 100644 index 0000000000000..4b9fe3abf33a6 --- /dev/null +++ b/packages/eventstream-serde-browser/.npmignore @@ -0,0 +1,13 @@ +/src/ +/coverage/ +/docs/ +tsconfig.test.json +*.tsbuildinfo + +*.spec.js +*.spec.d.ts +*.spec.js.map + +*.fixture.js +*.fixture.d.ts +*.fixture.js.map diff --git a/packages/eventstream-serde-browser/LICENSE b/packages/eventstream-serde-browser/LICENSE new file mode 100644 index 0000000000000..e907b58668da3 --- /dev/null +++ b/packages/eventstream-serde-browser/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/packages/eventstream-serde-browser/README.md b/packages/eventstream-serde-browser/README.md new file mode 100644 index 0000000000000..63e8456b40ac0 --- /dev/null +++ b/packages/eventstream-serde-browser/README.md @@ -0,0 +1,4 @@ +# @aws-sdk/@aws-sdk/eventstream-serde-browser + +[![NPM version](https://img.shields.io/npm/v/@aws-sdk/@aws-sdk/eventstream-serde-browser/alpha.svg)](https://www.npmjs.com/package/@aws-sdk/@aws-sdk/eventstream-serde-browser) +[![NPM downloads](https://img.shields.io/npm/dm/@aws-sdk/@aws-sdk/eventstream-serde-browser.svg)](https://www.npmjs.com/package/@aws-sdk/@aws-sdk/eventstream-serde-browser) diff --git a/packages/eventstream-serde-browser/package.json b/packages/eventstream-serde-browser/package.json new file mode 100644 index 0000000000000..9219d3dad53af --- /dev/null +++ b/packages/eventstream-serde-browser/package.json @@ -0,0 +1,30 @@ +{ + "name": "@aws-sdk/eventstream-serde-browser", + "version": "1.0.0-alpha.0", + "scripts": { + "prepublishOnly": "tsc", + "pretest": "tsc -p tsconfig.test.json", + "test": "jest" + }, + "main": "./build/index.js", + "module": "./build/index.js", + "types": "./build/index.d.ts", + "author": { + "name": "AWS SDK for JavaScript Team", + "url": "https://aws.amazon.com/javascript/" + }, + "license": "Apache-2.0", + "dependencies": { + "@aws-sdk/eventstream-marshaller": "^1.0.0-alpha.2", + "@aws-sdk/types": "^1.0.0-alpha.2", + "tslib": "^1.8.0" + }, + "devDependencies": { + "@types/jest": "^24.0.12", + "typescript": "~3.4.0", + "jest": "^24.7.1" + }, + "engines": { + "node": ">= 10.0.0" + } +} diff --git a/packages/eventstream-serde-browser/src/EventStreamMarshaller.ts b/packages/eventstream-serde-browser/src/EventStreamMarshaller.ts new file mode 100644 index 0000000000000..6d22fe27731dd --- /dev/null +++ b/packages/eventstream-serde-browser/src/EventStreamMarshaller.ts @@ -0,0 +1,61 @@ +import { EventStreamMarshaller as EventMarshaller } from "@aws-sdk/eventstream-marshaller"; +import { + Encoder, + Decoder, + Message, + EventStreamMarshaller as IEventStreamMarshaller +} from "@aws-sdk/types"; +import { ReadableStreamtoIterable } from "./utils"; +import { getChunkedStream } from "./getChunkedStream"; +import { getEventMessageStream } from "./getEventMessageStream"; +import { getDeserializingStream } from "./getDeserializingStream"; + +export interface EventStreamMarshaller extends IEventStreamMarshaller {} + +export interface EventStreamMarshallerOptions { + utf8Encoder: Encoder; + utf8Decoder: Decoder; +} + +export class EventStreamMarshaller { + private readonly eventMarshaller: EventMarshaller; + constructor({ utf8Encoder, utf8Decoder }: EventStreamMarshallerOptions) { + this.eventMarshaller = new EventMarshaller(utf8Encoder, utf8Decoder); + } + + deserialize( + body: ReadableStream, + deserializer: (input: { [event: string]: Message }) => T + ): AsyncIterable { + const chunkedStream = getChunkedStream(body); + const messageStream = getEventMessageStream( + chunkedStream, + this.eventMarshaller + ); + const deserialingStream = getDeserializingStream( + messageStream, + deserializer + ); + return ReadableStreamtoIterable(deserialingStream); + } + + /** + * Generate a ReadableStream that serialize events + * to event stream binary chunks; Use a pull stream + * here to support low connection speed. + * + * This doesn't work on browser currently because + * browser doesn't support upload streaming. + * reference: + * * https://bugs.chromium.org/p/chromium/issues/detail?id=688906 + * * https://bugzilla.mozilla.org/show_bug.cgi?id=1387483 + * + */ + serialize( + input: AsyncIterable, + serializer: (event: T) => Message + ): ReadableStream { + throw new Error(`event stream request in browser is not supported +Reference: https://bugs.chromium.org/p/chromium/issues/detail?id=688906`); + } +} diff --git a/packages/eventstream-serde-browser/src/getChunkedStream.ts b/packages/eventstream-serde-browser/src/getChunkedStream.ts new file mode 100644 index 0000000000000..68a2d73dd29bd --- /dev/null +++ b/packages/eventstream-serde-browser/src/getChunkedStream.ts @@ -0,0 +1,108 @@ +export function getChunkedStream( + source: ReadableStream +): ReadableStream { + const sourceReader = source.getReader(); + let currentMessageTotalLength = 0; + let currentMessagePendingLength = 0; + let currentMessage: Uint8Array | null = null; + let messageLengthBuffer: Uint8Array | null = null; + const allocateMessage = function(size: number) { + if (typeof size !== "number") { + throw new Error( + "Attempted to allocate an event message where size was not a number: " + + size + ); + } + currentMessageTotalLength = size; + currentMessagePendingLength = 4; + currentMessage = new Uint8Array(size); + const currentMessageView = new DataView(currentMessage.buffer); + currentMessageView.setUint32(0, size, false); //set big-endian Uint32 to 0~3 bytes + }; + + const chunkedStream = new ReadableStream({ + start(controller) { + function push() { + return sourceReader.read().then(({ done, value }) => { + if (done) { + if (currentMessageTotalLength) { + if (currentMessageTotalLength === currentMessagePendingLength) { + controller.enqueue(currentMessage); + } else { + throw new Error("Truncated event message received."); + } + } + controller.close(); + return; + } + + const chunkLength = value.length; + let currentOffset = 0; + + while (currentOffset < chunkLength) { + // create new message if necessary + if (!currentMessage) { + // working on a new message, determine total length + const bytesRemaining = chunkLength - currentOffset; + // prevent edge case where total length spans 2 chunks + if (!messageLengthBuffer) { + messageLengthBuffer = new Uint8Array(4); + } + const numBytesForTotal = Math.min( + 4 - currentMessagePendingLength, // remaining bytes to fill the messageLengthBuffer + bytesRemaining // bytes left in chunk + ); + + messageLengthBuffer.set( + value.slice(currentOffset, currentOffset + numBytesForTotal), + currentMessagePendingLength + ); + + currentMessagePendingLength += numBytesForTotal; + currentOffset += numBytesForTotal; + + if (currentMessagePendingLength < 4) { + // not enough information to create the current message + break; + } + allocateMessage( + new DataView(messageLengthBuffer.buffer).getUint32(0, false) + ); + messageLengthBuffer = null; + } + + // write data into current message + const numBytesToWrite = Math.min( + currentMessageTotalLength - currentMessagePendingLength, // number of bytes left to complete message + chunkLength - currentOffset // number of bytes left in the original chunk + ); + currentMessage!.set( + value.slice(currentOffset, currentOffset + numBytesToWrite), + currentMessagePendingLength + ); + currentMessagePendingLength += numBytesToWrite; + currentOffset += numBytesToWrite; + + // check if a message is ready to be pushed + if ( + currentMessageTotalLength && + currentMessageTotalLength === currentMessagePendingLength + ) { + // push out the message + controller.enqueue(currentMessage); + // cleanup + currentMessage = null; + currentMessageTotalLength = 0; + currentMessagePendingLength = 0; + } + } + push(); + }); + } + + push(); + } + }); + + return chunkedStream; +} diff --git a/packages/eventstream-serde-browser/src/getDeserializingStream.ts b/packages/eventstream-serde-browser/src/getDeserializingStream.ts new file mode 100644 index 0000000000000..78c050f25eaf4 --- /dev/null +++ b/packages/eventstream-serde-browser/src/getDeserializingStream.ts @@ -0,0 +1,30 @@ +import { Message } from "@aws-sdk/types"; + +export function getDeserializingStream( + messageStream: ReadableStream<{ [name: string]: Message }>, + deserializer: (input: any) => any +): ReadableStream<{ [name: string]: any }> { + const messageReader = messageStream.getReader(); + const deserializedStream = new ReadableStream<{ [name: string]: any }>({ + start(controller) { + function push() { + messageReader.read().then(async ({ done, value }) => { + if (done) { + controller.close(); + return; + } + + try { + controller.enqueue(await deserializer(value)); + push(); + } catch (e) { + controller.error(e); + } + }); + } + + push(); + } + }); + return deserializedStream; +} diff --git a/packages/eventstream-serde-browser/src/getEventMessageStream.ts b/packages/eventstream-serde-browser/src/getEventMessageStream.ts new file mode 100644 index 0000000000000..799158a0dc172 --- /dev/null +++ b/packages/eventstream-serde-browser/src/getEventMessageStream.ts @@ -0,0 +1,54 @@ +import { EventStreamMarshaller as EventMarshaller } from "@aws-sdk/eventstream-marshaller"; +import { Message } from "@aws-sdk/types"; + +export function getEventMessageStream( + chunkedStream: ReadableStream, + eventMarshaller: EventMarshaller +): ReadableStream<{ [name: string]: Message }> { + const chunkReader = chunkedStream.getReader(); + const messageStream = new ReadableStream<{ [name: string]: Message }>({ + start(controller) { + function push() { + chunkReader.read().then(({ done, value }) => { + if (done) { + controller.close(); + return; + } + + const message = eventMarshaller.unmarshall(value); + const { value: messageType } = message.headers[":message-type"]; + if (messageType === "error") { + // Unmodeled exception in event + const unmodeledError = new Error( + (message.headers[":error-message"].value as string) || + "UnknownError" + ); + unmodeledError.name = message.headers[":error-code"] + .value as string; + controller.error(unmodeledError); + } else if (messageType === "exception") { + // throw this.exceptionsDeserializer(message); + controller.enqueue({ + [message.headers[":exception-type"].value as string]: message + }); + } else if (messageType === "event") { + controller.enqueue({ + [message.headers[":event-type"].value as string]: message + }); + } else { + controller.error( + new Error( + `Unrecognizable event type: ${message.headers[":event-type"].value}` + ) + ); + } + push(); + }); + } + + push(); + } + }); + + return messageStream; +} diff --git a/packages/eventstream-serde-browser/src/index.ts b/packages/eventstream-serde-browser/src/index.ts new file mode 100644 index 0000000000000..1a8f850f5f8ac --- /dev/null +++ b/packages/eventstream-serde-browser/src/index.ts @@ -0,0 +1 @@ +export * from "./provider"; diff --git a/packages/eventstream-serde-browser/src/provider.ts b/packages/eventstream-serde-browser/src/provider.ts new file mode 100644 index 0000000000000..8b0edf9220a4b --- /dev/null +++ b/packages/eventstream-serde-browser/src/provider.ts @@ -0,0 +1,15 @@ +import { + Encoder, + Decoder, + EventSigner, + EventStreamSerdeProvider, + Provider +} from "@aws-sdk/types"; +import { EventStreamMarshaller } from "./EventStreamMarshaller"; + +/** browser event stream serde utils provider */ +export const eventStreamSerdeProvider: EventStreamSerdeProvider = (options: { + utf8Encoder: Encoder; + utf8Decoder: Decoder; + eventSigner: EventSigner | Provider; +}) => new EventStreamMarshaller(options); diff --git a/packages/eventstream-serde-browser/src/utils.ts b/packages/eventstream-serde-browser/src/utils.ts new file mode 100644 index 0000000000000..d261672d7c728 --- /dev/null +++ b/packages/eventstream-serde-browser/src/utils.ts @@ -0,0 +1,19 @@ +/** + * Convert ReadableStream into an async iterable. + */ +export async function* ReadableStreamtoIterable( + readableStream: ReadableStream +): AsyncIterable { + const reader = readableStream.getReader(); + let done = false; + while (!done) { + const { done: end, value } = await reader.read(); + if (end) { + done = true; + break; + } + if (value) { + yield value; + } + } +} diff --git a/packages/eventstream-serde-browser/tsconfig.json b/packages/eventstream-serde-browser/tsconfig.json new file mode 100644 index 0000000000000..052ca02b9a441 --- /dev/null +++ b/packages/eventstream-serde-browser/tsconfig.json @@ -0,0 +1,17 @@ +{ + "compilerOptions": { + "target": "ES5", + "module": "ESNext", + "moduleResolution": "node", + "declaration": true, + "strict": true, + "sourceMap": true, + "downlevelIteration": true, + "importHelpers": true, + "noEmitHelpers": true, + "lib": ["es2018.asynciterable", "DOM"], + "rootDir": "./src", + "outDir": "./build", + "incremental": true + } +} diff --git a/packages/eventstream-serde-browser/tsconfig.test.json b/packages/eventstream-serde-browser/tsconfig.test.json new file mode 100644 index 0000000000000..17d0f1b7321fb --- /dev/null +++ b/packages/eventstream-serde-browser/tsconfig.test.json @@ -0,0 +1,11 @@ +{ + "extends": "./tsconfig.json", + "compilerOptions": { + "sourceMap": false, + "inlineSourceMap": true, + "inlineSources": true, + "rootDir": "./src", + "outDir": "./build", + "incremental": true + } +} diff --git a/packages/eventstream-serde-config-resolver/.gitignore b/packages/eventstream-serde-config-resolver/.gitignore new file mode 100644 index 0000000000000..3d1714c9806ef --- /dev/null +++ b/packages/eventstream-serde-config-resolver/.gitignore @@ -0,0 +1,8 @@ +/node_modules/ +/build/ +/coverage/ +/docs/ +*.tsbuildinfo +*.tgz +*.log +package-lock.json diff --git a/packages/eventstream-serde-config-resolver/.npmignore b/packages/eventstream-serde-config-resolver/.npmignore new file mode 100644 index 0000000000000..4b9fe3abf33a6 --- /dev/null +++ b/packages/eventstream-serde-config-resolver/.npmignore @@ -0,0 +1,13 @@ +/src/ +/coverage/ +/docs/ +tsconfig.test.json +*.tsbuildinfo + +*.spec.js +*.spec.d.ts +*.spec.js.map + +*.fixture.js +*.fixture.d.ts +*.fixture.js.map diff --git a/packages/eventstream-serde-config-resolver/LICENSE b/packages/eventstream-serde-config-resolver/LICENSE new file mode 100644 index 0000000000000..e907b58668da3 --- /dev/null +++ b/packages/eventstream-serde-config-resolver/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/packages/eventstream-serde-config-resolver/README.md b/packages/eventstream-serde-config-resolver/README.md new file mode 100644 index 0000000000000..58977f6cbcd4b --- /dev/null +++ b/packages/eventstream-serde-config-resolver/README.md @@ -0,0 +1,4 @@ +# @aws-sdk/@aws-sdk/eventstream-serde-config-resolver + +[![NPM version](https://img.shields.io/npm/v/@aws-sdk/@aws-sdk/eventstream-serde-config-resolver/alpha.svg)](https://www.npmjs.com/package/@aws-sdk/@aws-sdk/eventstream-serde-config-resolver) +[![NPM downloads](https://img.shields.io/npm/dm/@aws-sdk/@aws-sdk/eventstream-serde-config-resolver.svg)](https://www.npmjs.com/package/@aws-sdk/@aws-sdk/eventstream-serde-config-resolver) diff --git a/packages/eventstream-serde-config-resolver/package.json b/packages/eventstream-serde-config-resolver/package.json new file mode 100644 index 0000000000000..df08b8521d9ba --- /dev/null +++ b/packages/eventstream-serde-config-resolver/package.json @@ -0,0 +1,28 @@ +{ + "name": "@aws-sdk/eventstream-serde-config-resolver", + "version": "1.0.0-alpha.0", + "scripts": { + "prepublishOnly": "tsc", + "pretest": "tsc -p tsconfig.test.json", + "test": "jest" + }, + "main": "./build/index.js", + "types": "./build/index.d.ts", + "author": { + "name": "AWS SDK for JavaScript Team", + "url": "https://aws.amazon.com/javascript/" + }, + "license": "Apache-2.0", + "dependencies": { + "@aws-sdk/types": "^1.0.0-alpha.2", + "tslib": "^1.8.0" + }, + "devDependencies": { + "@types/jest": "^24.0.12", + "typescript": "~3.4.0", + "jest": "^24.7.1" + }, + "engines": { + "node": ">= 10.0.0" + } +} diff --git a/packages/eventstream-serde-config-resolver/src/index.ts b/packages/eventstream-serde-config-resolver/src/index.ts new file mode 100644 index 0000000000000..39105fe4f21d2 --- /dev/null +++ b/packages/eventstream-serde-config-resolver/src/index.ts @@ -0,0 +1,26 @@ +import { + Decoder, + Encoder, + EventStreamMarshaller, + EventStreamSerdeProvider +} from "@aws-sdk/types"; +export interface EventStreamSerdeInputConfig {} + +export interface EventStreamSerdeResolvedConfig { + eventStreamMarshaller: EventStreamMarshaller; +} + +interface PreviouslyResolved { + utf8Encoder: Encoder; + utf8Decoder: Decoder; + eventStreamSerdeProvider: EventStreamSerdeProvider; +} + +export function resolveEventStreamSerdeConfig( + input: T & PreviouslyResolved & EventStreamSerdeInputConfig +): T & EventStreamSerdeResolvedConfig { + return { + ...input, + eventStreamMarshaller: input.eventStreamSerdeProvider({ ...input }) + }; +} diff --git a/packages/eventstream-serde-config-resolver/tsconfig.json b/packages/eventstream-serde-config-resolver/tsconfig.json new file mode 100644 index 0000000000000..30f910550c82a --- /dev/null +++ b/packages/eventstream-serde-config-resolver/tsconfig.json @@ -0,0 +1,16 @@ +{ + "compilerOptions": { + "target": "es2017", + "module": "commonjs", + "declaration": true, + "strict": true, + "sourceMap": true, + "downlevelIteration": true, + "importHelpers": true, + "noEmitHelpers": true, + "lib": ["es2018.asynciterable"], + "rootDir": "./src", + "outDir": "./build", + "incremental": true + } +} diff --git a/packages/eventstream-serde-config-resolver/tsconfig.test.json b/packages/eventstream-serde-config-resolver/tsconfig.test.json new file mode 100644 index 0000000000000..17d0f1b7321fb --- /dev/null +++ b/packages/eventstream-serde-config-resolver/tsconfig.test.json @@ -0,0 +1,11 @@ +{ + "extends": "./tsconfig.json", + "compilerOptions": { + "sourceMap": false, + "inlineSourceMap": true, + "inlineSources": true, + "rootDir": "./src", + "outDir": "./build", + "incremental": true + } +} diff --git a/packages/eventstream-serde-node/.gitignore b/packages/eventstream-serde-node/.gitignore new file mode 100644 index 0000000000000..3d1714c9806ef --- /dev/null +++ b/packages/eventstream-serde-node/.gitignore @@ -0,0 +1,8 @@ +/node_modules/ +/build/ +/coverage/ +/docs/ +*.tsbuildinfo +*.tgz +*.log +package-lock.json diff --git a/packages/eventstream-serde-node/.npmignore b/packages/eventstream-serde-node/.npmignore new file mode 100644 index 0000000000000..4b9fe3abf33a6 --- /dev/null +++ b/packages/eventstream-serde-node/.npmignore @@ -0,0 +1,13 @@ +/src/ +/coverage/ +/docs/ +tsconfig.test.json +*.tsbuildinfo + +*.spec.js +*.spec.d.ts +*.spec.js.map + +*.fixture.js +*.fixture.d.ts +*.fixture.js.map diff --git a/packages/eventstream-serde-node/LICENSE b/packages/eventstream-serde-node/LICENSE new file mode 100644 index 0000000000000..e907b58668da3 --- /dev/null +++ b/packages/eventstream-serde-node/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/packages/eventstream-serde-node/README.md b/packages/eventstream-serde-node/README.md new file mode 100644 index 0000000000000..4b4ae42acde5b --- /dev/null +++ b/packages/eventstream-serde-node/README.md @@ -0,0 +1,4 @@ +# @aws-sdk/@aws-sdk/eventstream-serde-node + +[![NPM version](https://img.shields.io/npm/v/@aws-sdk/@aws-sdk/eventstream-serde-node/preview.svg)](https://www.npmjs.com/package/@aws-sdk/@aws-sdk/eventstream-serde-node) +[![NPM downloads](https://img.shields.io/npm/dm/@aws-sdk/@aws-sdk/eventstream-serde-node.svg)](https://www.npmjs.com/package/@aws-sdk/@aws-sdk/eventstream-serde-node) diff --git a/packages/eventstream-serde-node/jest.config.js b/packages/eventstream-serde-node/jest.config.js new file mode 100644 index 0000000000000..498ea8304467c --- /dev/null +++ b/packages/eventstream-serde-node/jest.config.js @@ -0,0 +1,5 @@ +const base = require("../../jest.config.base.js"); + +module.exports = { + ...base +}; diff --git a/packages/eventstream-serde-node/package.json b/packages/eventstream-serde-node/package.json new file mode 100644 index 0000000000000..69dbd6310884a --- /dev/null +++ b/packages/eventstream-serde-node/package.json @@ -0,0 +1,30 @@ +{ + "name": "@aws-sdk/eventstream-serde-node", + "version": "1.0.0-alpha.0", + "scripts": { + "prepublishOnly": "tsc", + "pretest": "tsc -p tsconfig.test.json", + "test": "jest" + }, + "main": "./build/index.js", + "types": "./build/index.d.ts", + "author": { + "name": "AWS SDK for JavaScript Team", + "url": "https://aws.amazon.com/javascript/" + }, + "license": "Apache-2.0", + "dependencies": { + "@aws-sdk/eventstream-marshaller": "^1.0.0-alpha.2", + "@aws-sdk/types": "^1.0.0-alpha.2", + "tslib": "^1.8.0" + }, + "devDependencies": { + "@aws-sdk/util-utf8-node": "^1.0.0-alpha.2", + "@types/jest": "^24.0.12", + "typescript": "~3.4.0", + "jest": "^24.7.1" + }, + "engines": { + "node": ">= 10.0.0" + } +} diff --git a/packages/eventstream-serde-node/src/EventDeserializerStream.spec.ts b/packages/eventstream-serde-node/src/EventDeserializerStream.spec.ts new file mode 100644 index 0000000000000..d653defc0f89a --- /dev/null +++ b/packages/eventstream-serde-node/src/EventDeserializerStream.spec.ts @@ -0,0 +1,17 @@ +import { EventDeserializerStream } from "./EventDeserializerStream"; + +describe("EventDeserializerStream", () => { + it("throws when deserializer throws an error", done => { + const deserStream = new EventDeserializerStream({ + deserializer: message => { + throw new Error("error event"); + } + }); + deserStream.on("error", error => { + expect(error).toBeDefined(); + expect(error.message).toEqual("error event"); + done(); + }); + deserStream.write({}); + }); +}); diff --git a/packages/eventstream-serde-node/src/EventDeserializerStream.ts b/packages/eventstream-serde-node/src/EventDeserializerStream.ts new file mode 100644 index 0000000000000..6044fc0031fef --- /dev/null +++ b/packages/eventstream-serde-node/src/EventDeserializerStream.ts @@ -0,0 +1,34 @@ +import { Transform, TransformOptions, TransformCallback } from "stream"; + +export type EventDeserializerStreamOptions = TransformOptions & { + deserializer: (input: any) => any; +}; + +export class EventDeserializerStream extends Transform { + private readonly deserializer: (input: any) => any; + constructor(options: EventDeserializerStreamOptions) { + super({ + readableObjectMode: true, + writableObjectMode: true, + ...options + }); + this.deserializer = options.deserializer; + //TODO: use 'autoDestroy' when targeting Node 11 + //reference: https://nodejs.org/dist/latest-v13.x/docs/api/stream.html#stream_new_stream_readable_options + this.on("error", () => { + this.destroy(); + }); + this.on("end", () => { + this.destroy(); + }); + } + + async _transform(chunk: any, encoding: string, callback: TransformCallback) { + try { + this.push(await this.deserializer(chunk)); + return callback(); + } catch (err) { + callback(err); + } + } +} diff --git a/packages/eventstream-serde-node/src/EventMessageChunkerStream.spec.ts b/packages/eventstream-serde-node/src/EventMessageChunkerStream.spec.ts new file mode 100644 index 0000000000000..448be2c549eb4 --- /dev/null +++ b/packages/eventstream-serde-node/src/EventMessageChunkerStream.spec.ts @@ -0,0 +1,146 @@ +import { EventMessageChunkerStream } from "./EventMessageChunkerStream"; +import { + recordEventMessage, + statsEventMessage, + endEventMessage +} from "./fixtures/event.fixture"; +import { MockEventMessageSource } from "./fixtures/MockEventMessageSource.fixture"; + +describe("EventMessageChunkerStream", () => { + it("splits payloads into individual messages", done => { + const messages = []; + const mockMessages = [ + recordEventMessage, + statsEventMessage, + endEventMessage + ]; + const mockStream = new MockEventMessageSource({ + messages: mockMessages, + emitSize: 100 + }); + const chunkerStream = new EventMessageChunkerStream(); + mockStream.pipe(chunkerStream); + chunkerStream.on("data", msg => { + messages.push(msg); + }); + chunkerStream.on("end", function() { + expect(messages.length).toBe(3); + done(); + }); + }); + + it("splits payloads in correct order", done => { + const messages: Array = []; + const mockMessages = [ + recordEventMessage, + statsEventMessage, + recordEventMessage, + endEventMessage + ]; + const mockStream = new MockEventMessageSource({ + messages: mockMessages, + emitSize: 100 + }); + const chunkerStream = new EventMessageChunkerStream(); + mockStream.pipe(chunkerStream); + chunkerStream.on("data", msg => { + messages.push(msg); + }); + chunkerStream.on("end", function() { + expect(messages.length).toBe(4); + for (let i = 0; i < mockMessages.length; i++) { + expect(messages[i].toString("base64")).toEqual( + mockMessages[i].toString("base64") + ); + } + done(); + }); + }); + + it("splits payloads when received all at once", done => { + const messages = []; + const mockMessages = [ + recordEventMessage, + statsEventMessage, + endEventMessage + ]; + const mockStream = new MockEventMessageSource({ + messages: mockMessages, + emitSize: mockMessages.reduce((prev, cur) => { + return prev + cur.length; + }, 0) + }); + const chunkerStream = new EventMessageChunkerStream(); + mockStream.pipe(chunkerStream); + chunkerStream.on("data", msg => { + messages.push(msg); + }); + chunkerStream.on("end", function() { + expect(messages.length).toBe(3); + done(); + }); + }); + + it("splits payloads when total event message length spans multiple chunks", done => { + const messages = []; + const mockMessages = [ + recordEventMessage, + statsEventMessage, + endEventMessage + ]; + const mockStream = new MockEventMessageSource({ + messages: mockMessages, + emitSize: 1 + }); + const chunkerStream = new EventMessageChunkerStream(); + mockStream.pipe(chunkerStream); + chunkerStream.on("data", msg => { + messages.push(msg); + }); + chunkerStream.on("end", function() { + expect(messages.length).toBe(3); + done(); + }); + }); + + it("splits payloads when total event message length spans 2 chunks", done => { + const messages = []; + const mockMessages = [ + recordEventMessage, + statsEventMessage, + endEventMessage + ]; + const mockStream = new MockEventMessageSource({ + messages: mockMessages, + emitSize: recordEventMessage.length + 2 + }); + const chunkerStream = new EventMessageChunkerStream(); + mockStream.pipe(chunkerStream); + chunkerStream.on("data", msg => { + messages.push(msg); + }); + chunkerStream.on("end", function() { + expect(messages.length).toBe(3); + done(); + }); + }); + + it("sends an error if an event message is truncated", done => { + const responseMessage = Buffer.concat([ + recordEventMessage, + statsEventMessage, + endEventMessage + ]); + const mockStream = new MockEventMessageSource({ + messages: [responseMessage.slice(0, responseMessage.length - 4)], + emitSize: 10 + }); + + const chunkerStream = new EventMessageChunkerStream(); + mockStream.pipe(chunkerStream); + chunkerStream.on("error", err => { + expect(err.message).toEqual("Truncated event message received."); + done(); + }); + }); +}); diff --git a/packages/eventstream-serde-node/src/EventMessageChunkerStream.ts b/packages/eventstream-serde-node/src/EventMessageChunkerStream.ts new file mode 100644 index 0000000000000..64d2c77829193 --- /dev/null +++ b/packages/eventstream-serde-node/src/EventMessageChunkerStream.ts @@ -0,0 +1,119 @@ +import { Transform, TransformOptions, TransformCallback } from "stream"; + +export class EventMessageChunkerStream extends Transform { + private currentMessageTotalLength: number; + private currentMessagePendingLength: number; + private currentMessage: Buffer | null; + private messageLengthBuffer: Buffer | null; + + constructor(options: TransformOptions = {}) { + super({ + readableObjectMode: true, + ...options + }); + this.currentMessageTotalLength = 0; + this.currentMessagePendingLength = 0; + this.currentMessage = null; + this.messageLengthBuffer = null; + //TODO: use 'autoDestroy' when targeting Node 11 + //reference: https://nodejs.org/dist/latest-v13.x/docs/api/stream.html#stream_new_stream_readable_options + this.on("error", () => { + this.destroy(); + }); + this.on("end", () => { + this.destroy(); + }); + } + + _transform(chunk: any, encoding: string, callback: TransformCallback): void { + const chunkLength = chunk.length; + let currentOffset = 0; + + while (currentOffset < chunkLength) { + // create new message if necessary + if (!this.currentMessage) { + // working on a new message, determine total length + const bytesRemaining = chunkLength - currentOffset; + // prevent edge case where total length spans 2 chunks + if (!this.messageLengthBuffer) { + this.messageLengthBuffer = Buffer.alloc(4); + } + const numBytesForTotal = Math.min( + 4 - this.currentMessagePendingLength, // remaining bytes to fill the messageLengthBuffer + bytesRemaining // bytes left in chunk + ); + + chunk.copy( + this.messageLengthBuffer, + this.currentMessagePendingLength, + currentOffset, + currentOffset + numBytesForTotal + ); + + this.currentMessagePendingLength += numBytesForTotal; + currentOffset += numBytesForTotal; + + if (this.currentMessagePendingLength < 4) { + // not enough information to create the current message + break; + } + this.allocateMessage(this.messageLengthBuffer.readUInt32BE(0)); + this.messageLengthBuffer = null; + } + + // write data into current message + const numBytesToWrite = Math.min( + this.currentMessageTotalLength - this.currentMessagePendingLength, // number of bytes left to complete message + chunkLength - currentOffset // number of bytes left in the original chunk + ); + chunk.copy( + this.currentMessage, // target buffer + this.currentMessagePendingLength, // target offset + currentOffset, // chunk offset + currentOffset + numBytesToWrite // chunk end to write + ); + this.currentMessagePendingLength += numBytesToWrite; + currentOffset += numBytesToWrite; + + // check if a message is ready to be pushed + if ( + this.currentMessageTotalLength && + this.currentMessageTotalLength === this.currentMessagePendingLength + ) { + // push out the message + this.push(this.currentMessage); + // cleanup + this.currentMessage = null; + this.currentMessageTotalLength = 0; + this.currentMessagePendingLength = 0; + } + } + + callback(); + } + + _flush(callback: TransformCallback): void { + if (this.currentMessageTotalLength) { + if (this.currentMessageTotalLength === this.currentMessagePendingLength) { + callback(undefined, this.currentMessage); + } else { + callback(new Error("Truncated event message received.")); + } + } else { + callback(); + } + } + + private allocateMessage(size: number) { + if (typeof size !== "number") { + throw new Error( + "Attempted to allocate an event message where size was not a number: " + + size + ); + } + this.currentMessageTotalLength = size; + this.currentMessagePendingLength = 4; + this.currentMessage = Buffer.alloc(size); + this.currentMessage!.writeUInt32BE(size, 0); + } +} diff --git a/packages/eventstream-serde-node/src/EventStreamMarshaller.ts b/packages/eventstream-serde-node/src/EventStreamMarshaller.ts new file mode 100644 index 0000000000000..747f843217fb1 --- /dev/null +++ b/packages/eventstream-serde-node/src/EventStreamMarshaller.ts @@ -0,0 +1,87 @@ +import { EventStreamMarshaller as EventMarshaller } from "@aws-sdk/eventstream-marshaller"; +import { + Encoder, + Decoder, + Message, + EventStreamMarshaller as IEventStreamMarshaller +} from "@aws-sdk/types"; +import { Readable, pipeline } from "stream"; +import { ReadabletoIterable } from "./utils"; +import { EventMessageChunkerStream } from "./EventMessageChunkerStream"; +import { MessageUnmarshallerStream } from "./MessageUnmarshallerStream"; +import { EventDeserializerStream } from "./EventDeserializerStream"; + +export interface EventStreamMarshaller extends IEventStreamMarshaller {} + +export interface EventStreamMarshallerOptions { + utf8Encoder: Encoder; + utf8Decoder: Decoder; +} + +export class EventStreamMarshaller { + private readonly eventMarshaller: EventMarshaller; + constructor({ utf8Encoder, utf8Decoder }: EventStreamMarshallerOptions) { + this.eventMarshaller = new EventMarshaller(utf8Encoder, utf8Decoder); + } + + deserialize( + body: Readable, + deserializer: (input: { [event: string]: Message }) => T + ): AsyncIterable { + const eventDeserializerStream = new EventDeserializerStream({ + deserializer + }); + pipeline( + body, + new EventMessageChunkerStream(), //frame the body + new MessageUnmarshallerStream({ + eventMarshaller: this.eventMarshaller + }), + eventDeserializerStream, + err => { + if (err) throw err; + } + ); + //should use stream[Symbol.asyncIterable] when the api is stable + //reference: https://nodejs.org/docs/latest-v11.x/api/stream.html#stream_readable_symbol_asynciterator + return ReadabletoIterable(eventDeserializerStream); + } + + serialize( + input: AsyncIterable, + serializer: (event: T) => Message + ): Readable { + //will use Readable.from(Iterable) in Node12 + const inputIterator = input[Symbol.asyncIterator](); + const self = this; + let generatorDone = false; + const stream = new Readable({ + objectMode: true, + async read() { + try { + const result = await inputIterator.next(); + if (result.done && generatorDone) { + this.push(null); + return; + } + const payloadBuf = result.done + ? new Uint8Array(0) + : self.eventMarshaller.marshall(serializer(result.value)); + this.push(payloadBuf); + if (result.done && !generatorDone) generatorDone = true; + } catch (e) { + this.destroy(e); + } + } + }); + //TODO: use 'autoDestroy' when targeting Node 11 + //reference: https://nodejs.org/dist/latest-v13.x/docs/api/stream.html#stream_new_stream_readable_options + stream.on("error", () => { + stream.destroy(); + }); + stream.on("end", () => { + stream.destroy(); + }); + return stream; + } +} diff --git a/packages/eventstream-serde-node/src/MessageUnmarshallerStream.spec.ts b/packages/eventstream-serde-node/src/MessageUnmarshallerStream.spec.ts new file mode 100644 index 0000000000000..aa1708e7e70ee --- /dev/null +++ b/packages/eventstream-serde-node/src/MessageUnmarshallerStream.spec.ts @@ -0,0 +1,73 @@ +import { fromUtf8, toUtf8 } from "@aws-sdk/util-utf8-node"; +import { EventStreamMarshaller } from "@aws-sdk/eventstream-marshaller"; +import { MessageUnmarshallerStream } from "./MessageUnmarshallerStream"; +import { + recordEventMessage, + statsEventMessage, + endEventMessage +} from "./fixtures/event.fixture"; +import { Message } from "@aws-sdk/types"; + +describe("MessageUnmarshallerStream", () => { + it("emits parsed message on data", done => { + const expectedMessages: Array = [ + { + headers: { + ":content-type": { + type: "string", + value: "application/octet-stream" + }, + ":event-type": { type: "string", value: "Records" }, + ":message-type": { type: "string", value: "event" } + }, + body: new Uint8Array( + Buffer.from( + `1,Foo,When life gives you foo...\n2,Bar,make Bar!\n3,Fizz,Sometimes paired with...\n4,Buzz,the infamous Buzz!\n` + ) + ) + }, + { + headers: { + ":content-type": { + type: "string", + value: "text/xml" + }, + ":event-type": { type: "string", value: "Stats" }, + ":message-type": { type: "string", value: "event" } + }, + body: new Uint8Array( + Buffer.from( + '126126107' + ) + ) + }, + { + headers: { + ":event-type": { type: "string", value: "End" }, + ":message-type": { type: "string", value: "event" } + }, + body: new Uint8Array() + } + ]; + + const unmarshallerStream = new MessageUnmarshallerStream({ + eventMarshaller: new EventStreamMarshaller(toUtf8, fromUtf8) + }); + + const messages: Array = []; + unmarshallerStream.on("data", msg => { + messages.push(msg[Object.keys(msg)[0]]); + }); + unmarshallerStream.on("end", () => { + for (let i = 1; i < messages.length; i++) { + expect(messages[i]).toEqual(expectedMessages[i]); + } + done(); + }); + + unmarshallerStream.write(recordEventMessage); + unmarshallerStream.write(statsEventMessage); + unmarshallerStream.write(endEventMessage); + unmarshallerStream.end(); + }); +}); diff --git a/packages/eventstream-serde-node/src/MessageUnmarshallerStream.ts b/packages/eventstream-serde-node/src/MessageUnmarshallerStream.ts new file mode 100644 index 0000000000000..727fb83d0ec2c --- /dev/null +++ b/packages/eventstream-serde-node/src/MessageUnmarshallerStream.ts @@ -0,0 +1,57 @@ +import { Transform, TransformOptions, TransformCallback } from "stream"; +import { EventStreamMarshaller as EventMarshaller } from "@aws-sdk/eventstream-marshaller"; + +export type MessageUnmarshallerStreamOptions = TransformOptions & { + eventMarshaller: EventMarshaller; +}; + +export class MessageUnmarshallerStream extends Transform { + private readonly eventMarshaller: EventMarshaller; + constructor(options: MessageUnmarshallerStreamOptions) { + super({ + readableObjectMode: true, + writableObjectMode: true, + ...options + }); + this.eventMarshaller = options.eventMarshaller; + //TODO: use 'autoDestroy' when targeting Node 11 + //reference: https://nodejs.org/dist/latest-v13.x/docs/api/stream.html#stream_new_stream_readable_options + this.on("error", () => { + this.destroy(); + }); + this.on("end", () => { + this.destroy(); + }); + } + + _transform(chunk: any, encoding: string, callback: TransformCallback) { + try { + const message = this.eventMarshaller.unmarshall(chunk); + const { value: messageType } = message.headers[":message-type"]; + if (messageType === "error") { + // Unmodeled exception in event + const unmodeledError = new Error( + (message.headers[":error-message"].value as string) || "UnknownError" + ); + unmodeledError.name = message.headers[":error-code"].value as string; + throw unmodeledError; + } else if (messageType === "exception") { + // For modeled exception, push it to deserializer and throw after deserializing + this.push({ + [message.headers[":exception-type"].value as string]: message + }); + } else if (messageType === "event") { + this.push({ + [message.headers[":event-type"].value as string]: message + }); + } else { + throw Error( + `Unrecognizable event type: ${message.headers[":event-type"].value}` + ); + } + return callback(); + } catch (err) { + callback(err); + } + } +} diff --git a/packages/eventstream-serde-node/src/fixtures/MockEventMessageSource.fixture.ts b/packages/eventstream-serde-node/src/fixtures/MockEventMessageSource.fixture.ts new file mode 100644 index 0000000000000..bbe7103d8c351 --- /dev/null +++ b/packages/eventstream-serde-node/src/fixtures/MockEventMessageSource.fixture.ts @@ -0,0 +1,45 @@ +import { Readable, ReadableOptions } from "stream"; + +export interface MockEventMessageSourceOptions extends ReadableOptions { + messages: Array; + emitSize: number; + throwError?: Error; +} + +export class MockEventMessageSource extends Readable { + private readonly data: Buffer; + private readonly emitSize: number; + private readonly throwError?: Error; + private readCount = 0; + constructor(options: MockEventMessageSourceOptions) { + super(options); + this.data = Buffer.concat(options.messages); + this.emitSize = options.emitSize; + this.throwError = options.throwError; + } + + _read() { + const self = this; + if (this.readCount === this.data.length) { + if (this.throwError) { + process.nextTick(function() { + self.emit("error", new Error("Throwing an error!")); + }); + return; + } else { + this.push(null); + return; + } + } + + const bytesLeft = this.data.length - this.readCount; + const numBytesToSend = Math.min(bytesLeft, this.emitSize); + + const chunk = this.data.slice( + this.readCount, + this.readCount + numBytesToSend + ); + this.readCount += numBytesToSend; + this.push(chunk); + } +} diff --git a/packages/eventstream-serde-node/src/fixtures/event.fixture.ts b/packages/eventstream-serde-node/src/fixtures/event.fixture.ts new file mode 100644 index 0000000000000..e599553351b3e --- /dev/null +++ b/packages/eventstream-serde-node/src/fixtures/event.fixture.ts @@ -0,0 +1,14 @@ +export const recordEventMessage = Buffer.from( + "AAAA0AAAAFX31gVLDTptZXNzYWdlLXR5cGUHAAVldmVudAs6ZXZlbnQtdHlwZQcAB1JlY29yZHMNOmNvbnRlbnQtdHlwZQcAGGFwcGxpY2F0aW9uL29jdGV0LXN0cmVhbTEsRm9vLFdoZW4gbGlmZSBnaXZlcyB5b3UgZm9vLi4uCjIsQmFyLG1ha2UgQmFyIQozLEZpenosU29tZXRpbWVzIHBhaXJlZCB3aXRoLi4uCjQsQnV6eix0aGUgaW5mYW1vdXMgQnV6eiEKzxKeSw==", + "base64" +); + +export const statsEventMessage = Buffer.from( + "AAAA0QAAAEM+YpmqDTptZXNzYWdlLXR5cGUHAAVldmVudAs6ZXZlbnQtdHlwZQcABVN0YXRzDTpjb250ZW50LXR5cGUHAAh0ZXh0L3htbDxTdGF0cyB4bWxucz0iIj48Qnl0ZXNTY2FubmVkPjEyNjwvQnl0ZXNTY2FubmVkPjxCeXRlc1Byb2Nlc3NlZD4xMjY8L0J5dGVzUHJvY2Vzc2VkPjxCeXRlc1JldHVybmVkPjEwNzwvQnl0ZXNSZXR1cm5lZD48L1N0YXRzPiJ0pLk=", + "base64" +); + +export const endEventMessage = Buffer.from( + "AAAAOAAAACjBxoTUDTptZXNzYWdlLXR5cGUHAAVldmVudAs6ZXZlbnQtdHlwZQcAA0VuZM+X05I=", + "base64" +); diff --git a/packages/eventstream-serde-node/src/index.ts b/packages/eventstream-serde-node/src/index.ts new file mode 100644 index 0000000000000..1a8f850f5f8ac --- /dev/null +++ b/packages/eventstream-serde-node/src/index.ts @@ -0,0 +1 @@ +export * from "./provider"; diff --git a/packages/eventstream-serde-node/src/provider.ts b/packages/eventstream-serde-node/src/provider.ts new file mode 100644 index 0000000000000..f769b94bd5122 --- /dev/null +++ b/packages/eventstream-serde-node/src/provider.ts @@ -0,0 +1,15 @@ +import { + Encoder, + Decoder, + EventSigner, + EventStreamSerdeProvider, + Provider +} from "@aws-sdk/types"; +import { EventStreamMarshaller } from "./EventStreamMarshaller"; + +/** NodeJS event stream utils provider */ +export const eventStreamSerdeProvider: EventStreamSerdeProvider = (options: { + utf8Encoder: Encoder; + utf8Decoder: Decoder; + eventSigner: EventSigner | Provider; +}) => new EventStreamMarshaller(options); diff --git a/packages/eventstream-serde-node/src/utils.ts b/packages/eventstream-serde-node/src/utils.ts new file mode 100644 index 0000000000000..b57be21251262 --- /dev/null +++ b/packages/eventstream-serde-node/src/utils.ts @@ -0,0 +1,58 @@ +import { Buffer } from "buffer"; +import { Readable } from "stream"; + +export function getSignatureBinary(signature: string): Uint8Array { + const buf = Buffer.from(signature, "hex"); + return new Uint8Array( + buf.buffer, + buf.byteOffset, + buf.byteLength / Uint8Array.BYTES_PER_ELEMENT + ); +} + +/** + * Convert object stream piped in into an async iterable. This + * daptor should be deprecated when Node stream iterator is stable. + * Caveat: this adaptor won't have backpressure to inwards stream + * + * Reference: https://nodejs.org/docs/latest-v11.x/api/stream.html#stream_readable_symbol_asynciterator + */ + +export async function* ReadabletoIterable( + readStream: Readable +): AsyncIterable { + if (typeof readStream[Symbol.asyncIterator] === "function") { + // use the experimental feature if available. + throw readStream; + } + let streamEnded = false; + let generationEnded = false; + const records = new Array(); + + readStream.on("error", err => { + if (!streamEnded) { + streamEnded = true; + } + if (err) { + throw err; + } + }); + + readStream.on("data", data => { + records.push(data); + }); + + readStream.on("end", () => { + streamEnded = true; + }); + + while (!generationEnded) { + const value = await new Promise(resolve => + setTimeout(() => resolve(records.shift()), 0) + ); + if (value) { + yield value; + } + generationEnded = streamEnded && records.length === 0; + } +} diff --git a/packages/eventstream-serde-node/tsconfig.json b/packages/eventstream-serde-node/tsconfig.json new file mode 100644 index 0000000000000..30f910550c82a --- /dev/null +++ b/packages/eventstream-serde-node/tsconfig.json @@ -0,0 +1,16 @@ +{ + "compilerOptions": { + "target": "es2017", + "module": "commonjs", + "declaration": true, + "strict": true, + "sourceMap": true, + "downlevelIteration": true, + "importHelpers": true, + "noEmitHelpers": true, + "lib": ["es2018.asynciterable"], + "rootDir": "./src", + "outDir": "./build", + "incremental": true + } +} diff --git a/packages/eventstream-serde-node/tsconfig.test.json b/packages/eventstream-serde-node/tsconfig.test.json new file mode 100644 index 0000000000000..17d0f1b7321fb --- /dev/null +++ b/packages/eventstream-serde-node/tsconfig.test.json @@ -0,0 +1,11 @@ +{ + "extends": "./tsconfig.json", + "compilerOptions": { + "sourceMap": false, + "inlineSourceMap": true, + "inlineSources": true, + "rootDir": "./src", + "outDir": "./build", + "incremental": true + } +} diff --git a/packages/types/src/eventStream.ts b/packages/types/src/eventStream.ts new file mode 100644 index 0000000000000..fdc8fef51865f --- /dev/null +++ b/packages/types/src/eventStream.ts @@ -0,0 +1,106 @@ +import { HttpRequest } from "./http"; +/** + * An event stream message. The headers and body properties will always be + * defined, with empty headers represented as an object with no keys and an + * empty body represented as a zero-length Uint8Array. + */ +export interface Message { + headers: MessageHeaders; + body: Uint8Array; +} + +export interface MessageHeaders { + [name: string]: MessageHeaderValue; +} + +export interface BooleanHeaderValue { + type: "boolean"; + value: boolean; +} + +export interface ByteHeaderValue { + type: "byte"; + value: number; +} + +export interface ShortHeaderValue { + type: "short"; + value: number; +} + +export interface IntegerHeaderValue { + type: "integer"; + value: number; +} + +export interface LongHeaderValue { + type: "long"; + value: Int64; +} + +export interface BinaryHeaderValue { + type: "binary"; + value: Uint8Array; +} + +export interface StringHeaderValue { + type: "string"; + value: string; +} + +export interface TimestampHeaderValue { + type: "timestamp"; + value: Date; +} + +export interface UuidHeaderValue { + type: "uuid"; + value: string; +} + +export type MessageHeaderValue = + | BooleanHeaderValue + | ByteHeaderValue + | ShortHeaderValue + | IntegerHeaderValue + | LongHeaderValue + | BinaryHeaderValue + | StringHeaderValue + | TimestampHeaderValue + | UuidHeaderValue; + +export interface Int64 { + readonly bytes: Uint8Array; + valueOf: () => number; + toString: () => string; +} + +/** + * Util functions for serializing or deserializing event stream + */ +export interface EventStreamSerdeContext { + eventStreamMarshaller: EventStreamMarshaller; +} + +export interface EventStreamMarshaller { + deserialize: ( + body: any, + deserializer: (input: { [event: string]: Message }) => any + ) => AsyncIterable; + serialize: ( + input: AsyncIterable, + serializer: (event: any) => Message + ) => any; +} + +export interface EventStreamRequestSigner { + sign(request: HttpRequest): Promise; +} + +export interface EventStreamSerdeProvider { + (options: any): EventStreamMarshaller; +} + +export interface EventStreamSignerProvider { + (options: any): EventStreamRequestSigner; +} diff --git a/packages/types/src/index.ts b/packages/types/src/index.ts index 7f7c413ed3251..f34cbbac2e5c0 100644 --- a/packages/types/src/index.ts +++ b/packages/types/src/index.ts @@ -3,6 +3,7 @@ export * from "./client"; export * from "./command"; export * from "./credentials"; export * from "./crypto"; +export * from "./eventStream"; export * from "./http"; export * from "./logger"; export * from "./serde";