From c691029797e784dc9969b9cf9331a2c2ab712db1 Mon Sep 17 00:00:00 2001 From: Jeremy Meng Date: Wed, 23 Mar 2022 21:23:47 +0000 Subject: [PATCH 1/5] [core-rest-pipeline] support resettable stream request body add support of the following request body types - `() => NodeJS.ReadableStream` for NodeJS - `() => ReadableStream for browser --- sdk/core/core-rest-pipeline/CHANGELOG.md | 4 +- .../core-rest-pipeline/src/fetchHttpClient.ts | 7 +- sdk/core/core-rest-pipeline/src/interfaces.ts | 6 +- .../core-rest-pipeline/src/nodeHttpClient.ts | 2 +- .../test/browser/fetchHttpClient.spec.ts | 78 +++++++++++++++++++ .../test/node/nodeHttpClient.spec.ts | 52 ++++++++++++- 6 files changed, 140 insertions(+), 9 deletions(-) diff --git a/sdk/core/core-rest-pipeline/CHANGELOG.md b/sdk/core/core-rest-pipeline/CHANGELOG.md index 089e733de72c..835efb44590d 100644 --- a/sdk/core/core-rest-pipeline/CHANGELOG.md +++ b/sdk/core/core-rest-pipeline/CHANGELOG.md @@ -1,9 +1,11 @@ # Release History -## 1.7.1 (Unreleased) +## 1.8.0 (Unreleased) ### Features Added +- Support resettable streams in the form of `() => NodeJS.ReadableStream` for NodeJS and `() => ReadableStream` for browser. + ### Breaking Changes ### Bugs Fixed diff --git a/sdk/core/core-rest-pipeline/src/fetchHttpClient.ts b/sdk/core/core-rest-pipeline/src/fetchHttpClient.ts index 936a1f2d3b0d..d540427c1b71 100644 --- a/sdk/core/core-rest-pipeline/src/fetchHttpClient.ts +++ b/sdk/core/core-rest-pipeline/src/fetchHttpClient.ts @@ -198,13 +198,12 @@ function buildPipelineHeaders(httpResponse: Response): PipelineHeaders { } function buildRequestBody(request: PipelineRequest) { - if (isNodeReadableStream(request.body)) { + const body = typeof request.body === "function" ? request.body() : request.body; + if (isNodeReadableStream(body)) { throw new Error("Node streams are not supported in browser environment."); } - return isReadableStream(request.body) - ? buildBodyStream(request.body, request.onUploadProgress) - : request.body; + return isReadableStream(body) ? buildBodyStream(body, request.onUploadProgress) : body; } /** diff --git a/sdk/core/core-rest-pipeline/src/interfaces.ts b/sdk/core/core-rest-pipeline/src/interfaces.ts index d27d1dfbf436..52cf963c895f 100644 --- a/sdk/core/core-rest-pipeline/src/interfaces.ts +++ b/sdk/core/core-rest-pipeline/src/interfaces.ts @@ -49,12 +49,14 @@ export interface HttpHeaders extends Iterable<[string, string]> { /** * Types of bodies supported on the request. - * NodeJS.ReadableStream is Node only. - * Blob and ReadableStream are browser only. + * NodeJS.ReadableStream and () =\> NodeJS.ReadableStream is Node only. + * Blob, ReadableStream, and () =\> ReadableStream are browser only. */ export type RequestBodyType = | NodeJS.ReadableStream + | (() => NodeJS.ReadableStream) | ReadableStream + | (() => ReadableStream) | Blob | ArrayBuffer | ArrayBufferView diff --git a/sdk/core/core-rest-pipeline/src/nodeHttpClient.ts b/sdk/core/core-rest-pipeline/src/nodeHttpClient.ts index 36cb32b860a4..c400cc790004 100644 --- a/sdk/core/core-rest-pipeline/src/nodeHttpClient.ts +++ b/sdk/core/core-rest-pipeline/src/nodeHttpClient.ts @@ -95,8 +95,8 @@ class NodeHttpClient implements HttpClient { const acceptEncoding = request.headers.get("Accept-Encoding"); const shouldDecompress = acceptEncoding?.includes("gzip") || acceptEncoding?.includes("deflate"); - let body = request.body; + let body = typeof request.body === "function" ? request.body() : request.body; if (body && !request.headers.has("Content-Length")) { const bodyLength = getBodyLength(body); if (bodyLength !== null) { diff --git a/sdk/core/core-rest-pipeline/test/browser/fetchHttpClient.spec.ts b/sdk/core/core-rest-pipeline/test/browser/fetchHttpClient.spec.ts index 3de6e00b980a..9231de3c0759 100644 --- a/sdk/core/core-rest-pipeline/test/browser/fetchHttpClient.spec.ts +++ b/sdk/core/core-rest-pipeline/test/browser/fetchHttpClient.spec.ts @@ -321,6 +321,84 @@ describe("FetchHttpClient", function () { assert.isTrue(downloadCalled, "no download progress"); }); + it("should handle ReadableStream request body type", async () => { + const client = createFetchHttpClient(); + const requestText = "testing resettable stream"; + const url = `http://localhost:3000/formdata/stream/uploadfile`; + + let bodySent = false; + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(requestText); + controller.close(); + }, + }); + fetchMock.callsFake(async (_url, options) => { + const body = options.body; + assert.isTrue( + body && + typeof (body as ReadableStream).getReader === "function" && + typeof (body as ReadableStream).tee === "function", + "expecting ReadableStream request body" + ); + const reader = (body as ReadableStream).getReader(); + const data = await reader.read(); + assert.equal(data.value, requestText, "unexpected request text"); + bodySent = true; + return new Response(undefined, { status: 200 }); + }); + const request = createPipelineRequest({ + url, + method: "PUT", + body: stream, + headers: createHttpHeaders({ "content-type": "application/octet-stream" }), + allowInsecureConnection: true, + streamResponseStatusCodes: new Set([Number.POSITIVE_INFINITY]), + }); + await client.sendRequest(request); + assert.isTrue(bodySent, "body should have been sent to request"); + }); + + it("should handle () => ReadableStream request body type", async () => { + const client = createFetchHttpClient(); + const requestText = "testing resettable stream"; + const url = `http://localhost:3000/formdata/stream/uploadfile`; + + let bodySent = false; + const factoryMethod = () => { + return new ReadableStream({ + start(controller) { + controller.enqueue(requestText); + controller.close(); + }, + }); + }; + fetchMock.callsFake(async (_url, options) => { + const body = options.body; + assert.isTrue( + body && + typeof (body as ReadableStream).getReader === "function" && + typeof (body as ReadableStream).tee === "function", + "expecting ReadableStream request body" + ); + const reader = (body as ReadableStream).getReader(); + const data = await reader.read(); + assert.equal(data.value, requestText, "unexpected request text"); + bodySent = true; + return new Response(undefined, { status: 200 }); + }); + const request = createPipelineRequest({ + url, + method: "PUT", + body: factoryMethod, + headers: createHttpHeaders({ "content-type": "application/octet-stream" }), + allowInsecureConnection: true, + streamResponseStatusCodes: new Set([Number.POSITIVE_INFINITY]), + }); + await client.sendRequest(request); + assert.isTrue(bodySent, "body should have been sent to request"); + }); + it("should honor timeout", async function () { const timeoutLength = 2000; const mockedResponse = createResponse(404); diff --git a/sdk/core/core-rest-pipeline/test/node/nodeHttpClient.spec.ts b/sdk/core/core-rest-pipeline/test/node/nodeHttpClient.spec.ts index 3085d15b08f0..ae3156226be7 100644 --- a/sdk/core/core-rest-pipeline/test/node/nodeHttpClient.spec.ts +++ b/sdk/core/core-rest-pipeline/test/node/nodeHttpClient.spec.ts @@ -3,7 +3,7 @@ import { assert } from "chai"; import * as sinon from "sinon"; -import { PassThrough } from "stream"; +import { PassThrough, Writable } from "stream"; import { ClientRequest, IncomingHttpHeaders, IncomingMessage } from "http"; import * as https from "https"; import * as http from "http"; @@ -344,6 +344,56 @@ describe("NodeHttpClient", function () { assert.strictEqual(response.status, 200); }); + it("should handle NodeJS.ReadableStream bodies correctly", async function () { + const requestText = "testing resettable stream"; + const client = createDefaultHttpClient(); + let bodySent = false; + const writable = new Writable({ + write: (chunk, _, next) => { + bodySent = true; + assert.equal(chunk.toString(), requestText, "Unexpected body"); + next(); + }, + }); + stubbedHttpsRequest.returns(writable); + + const stream = new PassThrough(); + stream.write(requestText); + stream.end(); + const body = stream; + const request = createPipelineRequest({ url: "https://example.com", body }); + const promise = client.sendRequest(request); + stubbedHttpsRequest.yield(createResponse(200)); + await promise; + assert.isTrue(bodySent, "body should have been piped to request"); + }); + + it("should handle () => NodeJS.ReadableStream bodies correctly", async function () { + const requestText = "testing resettable stream"; + const client = createDefaultHttpClient(); + let bodySent = false; + const writable = new Writable({ + write: (chunk, _, next) => { + bodySent = true; + assert.equal(chunk.toString(), requestText, "Unexpected body"); + next(); + }, + }); + stubbedHttpsRequest.returns(writable); + + const body = () => { + const stream = new PassThrough(); + stream.write(requestText); + stream.end(); + return stream; + }; + const request = createPipelineRequest({ url: "https://example.com", body }); + const promise = client.sendRequest(request); + stubbedHttpsRequest.yield(createResponse(200)); + await promise; + assert.isTrue(bodySent, "body should have been piped to request"); + }); + it("should return an AbortError when aborted while reading the HTTP response", async function () { clock.restore(); const client = createDefaultHttpClient(); From f78649ae14e3b289c8893ce9d36fdaa9f6669241 Mon Sep 17 00:00:00 2001 From: Jeremy Meng Date: Thu, 24 Mar 2022 16:47:20 +0000 Subject: [PATCH 2/5] bump version --- sdk/core/core-rest-pipeline/package.json | 2 +- sdk/core/core-rest-pipeline/src/constants.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/core/core-rest-pipeline/package.json b/sdk/core/core-rest-pipeline/package.json index 09da83a8c461..fb32bf483c39 100644 --- a/sdk/core/core-rest-pipeline/package.json +++ b/sdk/core/core-rest-pipeline/package.json @@ -1,6 +1,6 @@ { "name": "@azure/core-rest-pipeline", - "version": "1.7.1", + "version": "1.8.0", "description": "Isomorphic client library for making HTTP requests in node.js and browser.", "sdk-type": "client", "main": "dist/index.js", diff --git a/sdk/core/core-rest-pipeline/src/constants.ts b/sdk/core/core-rest-pipeline/src/constants.ts index 277310861409..c3ea4dd5cdde 100644 --- a/sdk/core/core-rest-pipeline/src/constants.ts +++ b/sdk/core/core-rest-pipeline/src/constants.ts @@ -1,6 +1,6 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. -export const SDK_VERSION: string = "1.7.1"; +export const SDK_VERSION: string = "1.8.0"; export const DEFAULT_RETRY_POLICY_COUNT = 3; From 717e07a069b603f7171bcd69a4ef36f523d6f0c2 Mon Sep 17 00:00:00 2001 From: Jeremy Meng Date: Thu, 24 Mar 2022 16:51:37 +0000 Subject: [PATCH 3/5] fix xhrHttpClient in test-uttils --- sdk/test-utils/test-utils/src/xhrHttpClient.ts | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sdk/test-utils/test-utils/src/xhrHttpClient.ts b/sdk/test-utils/test-utils/src/xhrHttpClient.ts index 836aee973b53..8aa4e185273d 100644 --- a/sdk/test-utils/test-utils/src/xhrHttpClient.ts +++ b/sdk/test-utils/test-utils/src/xhrHttpClient.ts @@ -68,11 +68,12 @@ class XhrHttpClient implements HttpClient { xhr.responseType = request.streamResponseStatusCodes?.size ? "blob" : "text"; - if (isReadableStream(request.body)) { + const body = typeof request.body === "function" ? request.body() : request.body; + if (isReadableStream(body)) { throw new Error("Node streams are not supported in browser environment."); } - xhr.send(request.body === undefined ? null : request.body); + xhr.send(body === undefined ? null : body); if (xhr.responseType === "blob") { return new Promise((resolve, reject) => { From c6b8f79cec943ca6094469653f34b89d0157a285 Mon Sep 17 00:00:00 2001 From: Jeremy Meng Date: Thu, 24 Mar 2022 16:54:58 +0000 Subject: [PATCH 4/5] update api.md --- sdk/core/core-rest-pipeline/review/core-rest-pipeline.api.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/core/core-rest-pipeline/review/core-rest-pipeline.api.md b/sdk/core/core-rest-pipeline/review/core-rest-pipeline.api.md index 19ebd01b7285..299187b4de46 100644 --- a/sdk/core/core-rest-pipeline/review/core-rest-pipeline.api.md +++ b/sdk/core/core-rest-pipeline/review/core-rest-pipeline.api.md @@ -295,7 +295,7 @@ export interface RedirectPolicyOptions { } // @public -export type RequestBodyType = NodeJS.ReadableStream | ReadableStream | Blob | ArrayBuffer | ArrayBufferView | FormData | string | null; +export type RequestBodyType = NodeJS.ReadableStream | (() => NodeJS.ReadableStream) | ReadableStream | (() => ReadableStream) | Blob | ArrayBuffer | ArrayBufferView | FormData | string | null; // @public export class RestError extends Error { From 24f6b1e5dfde73f64ea31b5668ac4896d00a0778 Mon Sep 17 00:00:00 2001 From: Jeremy Meng Date: Thu, 24 Mar 2022 22:42:28 +0000 Subject: [PATCH 5/5] add PR link to changelog --- sdk/core/core-rest-pipeline/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/core/core-rest-pipeline/CHANGELOG.md b/sdk/core/core-rest-pipeline/CHANGELOG.md index 835efb44590d..9f12a6ed2177 100644 --- a/sdk/core/core-rest-pipeline/CHANGELOG.md +++ b/sdk/core/core-rest-pipeline/CHANGELOG.md @@ -4,7 +4,7 @@ ### Features Added -- Support resettable streams in the form of `() => NodeJS.ReadableStream` for NodeJS and `() => ReadableStream` for browser. +- Support resettable streams in the form of `() => NodeJS.ReadableStream` for NodeJS and `() => ReadableStream` for browser. [#21013](https://github.com/Azure/azure-sdk-for-js/pull/21013) ### Breaking Changes