Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core-rest-pipeline] support resettable stream request body #21013

Merged
merged 6 commits into from
Mar 24, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion sdk/core/core-rest-pipeline/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
# Release History

## 1.7.1 (Unreleased)
## 1.8.0 (Unreleased)

### Features Added

- Support resettable streams in the form of `() => NodeJS.ReadableStream` for NodeJS and `() => ReadableStream` for browser.

### Breaking Changes

### Bugs Fixed
Expand Down
2 changes: 1 addition & 1 deletion sdk/core/core-rest-pipeline/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@azure/core-rest-pipeline",
"version": "1.7.1",
"version": "1.8.0",
"description": "Isomorphic client library for making HTTP requests in node.js and browser.",
"sdk-type": "client",
"main": "dist/index.js",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ export interface RedirectPolicyOptions {
}

// @public
export type RequestBodyType = NodeJS.ReadableStream | ReadableStream<Uint8Array> | Blob | ArrayBuffer | ArrayBufferView | FormData | string | null;
export type RequestBodyType = NodeJS.ReadableStream | (() => NodeJS.ReadableStream) | ReadableStream<Uint8Array> | (() => ReadableStream<Uint8Array>) | Blob | ArrayBuffer | ArrayBufferView | FormData | string | null;

// @public
export class RestError extends Error {
Expand Down
2 changes: 1 addition & 1 deletion sdk/core/core-rest-pipeline/src/constants.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

export const SDK_VERSION: string = "1.7.1";
export const SDK_VERSION: string = "1.8.0";

export const DEFAULT_RETRY_POLICY_COUNT = 3;
7 changes: 3 additions & 4 deletions sdk/core/core-rest-pipeline/src/fetchHttpClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -198,13 +198,12 @@ function buildPipelineHeaders(httpResponse: Response): PipelineHeaders {
}

function buildRequestBody(request: PipelineRequest) {
if (isNodeReadableStream(request.body)) {
const body = typeof request.body === "function" ? request.body() : request.body;
if (isNodeReadableStream(body)) {
throw new Error("Node streams are not supported in browser environment.");
}

return isReadableStream(request.body)
? buildBodyStream(request.body, request.onUploadProgress)
: request.body;
return isReadableStream(body) ? buildBodyStream(body, request.onUploadProgress) : body;
}

/**
Expand Down
6 changes: 4 additions & 2 deletions sdk/core/core-rest-pipeline/src/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,14 @@ export interface HttpHeaders extends Iterable<[string, string]> {

/**
* Types of bodies supported on the request.
* NodeJS.ReadableStream is Node only.
* Blob and ReadableStream<Uint8Array> are browser only.
* NodeJS.ReadableStream and () =\> NodeJS.ReadableStream is Node only.
* Blob, ReadableStream<Uint8Array>, and () =\> ReadableStream<Uint8Array> are browser only.
*/
export type RequestBodyType =
| NodeJS.ReadableStream
| (() => NodeJS.ReadableStream)
| ReadableStream<Uint8Array>
| (() => ReadableStream<Uint8Array>)
| Blob
| ArrayBuffer
| ArrayBufferView
Expand Down
2 changes: 1 addition & 1 deletion sdk/core/core-rest-pipeline/src/nodeHttpClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ class NodeHttpClient implements HttpClient {
const acceptEncoding = request.headers.get("Accept-Encoding");
const shouldDecompress =
acceptEncoding?.includes("gzip") || acceptEncoding?.includes("deflate");
let body = request.body;

let body = typeof request.body === "function" ? request.body() : request.body;
if (body && !request.headers.has("Content-Length")) {
const bodyLength = getBodyLength(body);
if (bodyLength !== null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,84 @@ describe("FetchHttpClient", function () {
assert.isTrue(downloadCalled, "no download progress");
});

it("should handle ReadableStream request body type", async () => {
const client = createFetchHttpClient();
const requestText = "testing resettable stream";
const url = `http://localhost:3000/formdata/stream/uploadfile`;

let bodySent = false;
const stream = new ReadableStream({
start(controller) {
controller.enqueue(requestText);
controller.close();
},
});
fetchMock.callsFake(async (_url, options) => {
const body = options.body;
assert.isTrue(
body &&
typeof (body as ReadableStream).getReader === "function" &&
typeof (body as ReadableStream).tee === "function",
"expecting ReadableStream request body"
);
const reader = (body as ReadableStream).getReader();
const data = await reader.read();
assert.equal(data.value, requestText, "unexpected request text");
bodySent = true;
return new Response(undefined, { status: 200 });
});
const request = createPipelineRequest({
url,
method: "PUT",
body: stream,
headers: createHttpHeaders({ "content-type": "application/octet-stream" }),
allowInsecureConnection: true,
streamResponseStatusCodes: new Set([Number.POSITIVE_INFINITY]),
});
await client.sendRequest(request);
assert.isTrue(bodySent, "body should have been sent to request");
});

it("should handle () => ReadableStream request body type", async () => {
const client = createFetchHttpClient();
const requestText = "testing resettable stream";
const url = `http://localhost:3000/formdata/stream/uploadfile`;

let bodySent = false;
const factoryMethod = () => {
return new ReadableStream({
start(controller) {
controller.enqueue(requestText);
controller.close();
},
});
};
fetchMock.callsFake(async (_url, options) => {
const body = options.body;
assert.isTrue(
body &&
typeof (body as ReadableStream).getReader === "function" &&
typeof (body as ReadableStream).tee === "function",
"expecting ReadableStream request body"
);
const reader = (body as ReadableStream).getReader();
const data = await reader.read();
assert.equal(data.value, requestText, "unexpected request text");
bodySent = true;
return new Response(undefined, { status: 200 });
});
const request = createPipelineRequest({
url,
method: "PUT",
body: factoryMethod,
headers: createHttpHeaders({ "content-type": "application/octet-stream" }),
allowInsecureConnection: true,
streamResponseStatusCodes: new Set([Number.POSITIVE_INFINITY]),
});
await client.sendRequest(request);
assert.isTrue(bodySent, "body should have been sent to request");
});

it("should honor timeout", async function () {
const timeoutLength = 2000;
const mockedResponse = createResponse(404);
Expand Down
52 changes: 51 additions & 1 deletion sdk/core/core-rest-pipeline/test/node/nodeHttpClient.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import { assert } from "chai";
import * as sinon from "sinon";
import { PassThrough } from "stream";
import { PassThrough, Writable } from "stream";
import { ClientRequest, IncomingHttpHeaders, IncomingMessage } from "http";
import * as https from "https";
import * as http from "http";
Expand Down Expand Up @@ -344,6 +344,56 @@ describe("NodeHttpClient", function () {
assert.strictEqual(response.status, 200);
});

it("should handle NodeJS.ReadableStream bodies correctly", async function () {
const requestText = "testing resettable stream";
const client = createDefaultHttpClient();
let bodySent = false;
const writable = new Writable({
write: (chunk, _, next) => {
bodySent = true;
assert.equal(chunk.toString(), requestText, "Unexpected body");
next();
},
});
stubbedHttpsRequest.returns(writable);

const stream = new PassThrough();
stream.write(requestText);
stream.end();
const body = stream;
const request = createPipelineRequest({ url: "https://example.com", body });
const promise = client.sendRequest(request);
stubbedHttpsRequest.yield(createResponse(200));
await promise;
assert.isTrue(bodySent, "body should have been piped to request");
});

it("should handle () => NodeJS.ReadableStream bodies correctly", async function () {
const requestText = "testing resettable stream";
const client = createDefaultHttpClient();
let bodySent = false;
const writable = new Writable({
write: (chunk, _, next) => {
bodySent = true;
assert.equal(chunk.toString(), requestText, "Unexpected body");
next();
},
});
stubbedHttpsRequest.returns(writable);

const body = () => {
const stream = new PassThrough();
stream.write(requestText);
stream.end();
return stream;
};
const request = createPipelineRequest({ url: "https://example.com", body });
const promise = client.sendRequest(request);
stubbedHttpsRequest.yield(createResponse(200));
await promise;
assert.isTrue(bodySent, "body should have been piped to request");
});

it("should return an AbortError when aborted while reading the HTTP response", async function () {
clock.restore();
const client = createDefaultHttpClient();
Expand Down
5 changes: 3 additions & 2 deletions sdk/test-utils/test-utils/src/xhrHttpClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,12 @@ class XhrHttpClient implements HttpClient {

xhr.responseType = request.streamResponseStatusCodes?.size ? "blob" : "text";

if (isReadableStream(request.body)) {
const body = typeof request.body === "function" ? request.body() : request.body;
if (isReadableStream(body)) {
throw new Error("Node streams are not supported in browser environment.");
}

xhr.send(request.body === undefined ? null : request.body);
xhr.send(body === undefined ? null : body);

if (xhr.responseType === "blob") {
return new Promise((resolve, reject) => {
Expand Down