From 3183e8dc0c89c1fd4d5d26554193024c6d0bf584 Mon Sep 17 00:00:00 2001 From: Jeff Fisher Date: Thu, 17 Sep 2020 18:05:55 -0700 Subject: [PATCH 1/2] An initial shot at sending ndjson --- sdk/core/core-http/review/core-http.api.md | 1 + sdk/core/core-http/src/pipelineOptions.ts | 5 ++ .../core-http/src/policies/ndJsonPolicy.ts | 51 +++++++++++++++++++ sdk/core/core-http/src/serviceClient.ts | 5 ++ .../test/policies/ndJsonPolicyTests.ts | 31 +++++++++++ 5 files changed, 93 insertions(+) create mode 100644 sdk/core/core-http/src/policies/ndJsonPolicy.ts create mode 100644 sdk/core/core-http/test/policies/ndJsonPolicyTests.ts diff --git a/sdk/core/core-http/review/core-http.api.md b/sdk/core/core-http/review/core-http.api.md index ec6cd89c7816..b9719e237eea 100644 --- a/sdk/core/core-http/review/core-http.api.md +++ b/sdk/core/core-http/review/core-http.api.md @@ -351,6 +351,7 @@ export interface InternalPipelineOptions extends PipelineOptions { decompressResponse?: boolean; deserializationOptions?: DeserializationOptions; loggingOptions?: LogPolicyOptions; + sendStreamingJson?: boolean; } // @public diff --git a/sdk/core/core-http/src/pipelineOptions.ts b/sdk/core/core-http/src/pipelineOptions.ts index 673f5ed86efc..96e001c84e5c 100644 --- a/sdk/core/core-http/src/pipelineOptions.ts +++ b/sdk/core/core-http/src/pipelineOptions.ts @@ -67,4 +67,9 @@ export interface InternalPipelineOptions extends PipelineOptions { * Configure whether to decompress response according to Accept-Encoding header (node-fetch only) */ decompressResponse?: boolean; + + /** + * Send JSON Array payloads as NDJSON. + */ + sendStreamingJson?: boolean; } diff --git a/sdk/core/core-http/src/policies/ndJsonPolicy.ts b/sdk/core/core-http/src/policies/ndJsonPolicy.ts new file mode 100644 index 000000000000..42cb1ac621c3 --- /dev/null +++ b/sdk/core/core-http/src/policies/ndJsonPolicy.ts @@ -0,0 +1,51 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +// BaseRequestPolicy has a protected constructor. +/* eslint-disable @typescript-eslint/no-useless-constructor */ + +import { + BaseRequestPolicy, + RequestPolicy, + RequestPolicyOptions, + RequestPolicyFactory +} from "./requestPolicy"; +import { WebResourceLike } from "../webResource"; +import { HttpOperationResponse } from "../httpOperationResponse"; + +export function ndJsonPolicy(): RequestPolicyFactory { + return { + create: (nextPolicy: RequestPolicy, options: RequestPolicyOptions) => { + return new NdJsonPolicy(nextPolicy, options); + } + }; +} + +/** + * NdJsonPolicy that formats a JSON array as newline-delimited JSON + */ +class NdJsonPolicy extends BaseRequestPolicy { + /** + * Creates an instance of KeepAlivePolicy. + * + * @param nextPolicy + * @param options + */ + constructor(nextPolicy: RequestPolicy, options: RequestPolicyOptions) { + super(nextPolicy, options); + } + + /** + * Sends a request. + * + * @param request + */ + public async sendRequest(request: WebResourceLike): Promise { + // There currently isn't a good way to bypass the serializer + const body = JSON.parse(request.body); + if (Array.isArray(body)) { + request.body = body.map((item) => JSON.stringify(item) + "\n").join(""); + } + return this._nextPolicy.sendRequest(request); + } +} diff --git a/sdk/core/core-http/src/serviceClient.ts b/sdk/core/core-http/src/serviceClient.ts index 1bed817c86d4..c63433ca30ce 100644 --- a/sdk/core/core-http/src/serviceClient.ts +++ b/sdk/core/core-http/src/serviceClient.ts @@ -59,6 +59,7 @@ import { InternalPipelineOptions } from "./pipelineOptions"; import { DefaultKeepAliveOptions, keepAlivePolicy } from "./policies/keepAlivePolicy"; import { tracingPolicy } from "./policies/tracingPolicy"; import { disableResponseDecompressionPolicy } from "./policies/disableResponseDecompressionPolicy"; +import { ndJsonPolicy } from "./policies/ndJsonPolicy"; /** * Options to configure a proxy for outgoing requests (Node.js only). @@ -677,6 +678,10 @@ export function createPipelineFromOptions( ): ServiceClientOptions { const requestPolicyFactories: RequestPolicyFactory[] = []; + if (pipelineOptions.sendStreamingJson) { + requestPolicyFactories.push(ndJsonPolicy()); + } + let userAgentValue = undefined; if (pipelineOptions.userAgentOptions && pipelineOptions.userAgentOptions.userAgentPrefix) { const userAgentInfo: string[] = []; diff --git a/sdk/core/core-http/test/policies/ndJsonPolicyTests.ts b/sdk/core/core-http/test/policies/ndJsonPolicyTests.ts new file mode 100644 index 000000000000..26394d139306 --- /dev/null +++ b/sdk/core/core-http/test/policies/ndJsonPolicyTests.ts @@ -0,0 +1,31 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +import { assert } from "chai"; +import { RequestPolicyOptions } from "../../src/policies/requestPolicy"; +import { WebResource } from "../../src/webResource"; +import { HttpHeaders } from "../../src/httpHeaders"; +import { ndJsonPolicy } from "../../src/policies/ndJsonPolicy"; + +describe("NdJsonPolicy", function() { + const returnOk = { + sendRequest: async (request: WebResource) => { + return { + request, + status: 200, + headers: new HttpHeaders() + }; + } + }; + + const emptyPolicyOptions = new RequestPolicyOptions(); + + it("Formats arrays correctly", async function() { + const factory = ndJsonPolicy(); + const policy = factory.create(returnOk, emptyPolicyOptions); + const request = new WebResource(); + request.body = JSON.stringify([{ a: 1 }, { b: 2 }, { c: 3 }]); + const result = await policy.sendRequest(request); + assert.strictEqual(result.request.body, `{"a":1}\n{"b":2}\n{"c":3}\n`); + }); +}); From 96d050739774f496aa07718a92f869a805faad5a Mon Sep 17 00:00:00 2001 From: Jeff Fisher Date: Wed, 23 Sep 2020 15:21:21 -0700 Subject: [PATCH 2/2] Feedback & core-https support --- .../core-http/src/policies/ndJsonPolicy.ts | 8 ++-- sdk/core/core-https/review/core-https.api.md | 7 ++++ sdk/core/core-https/src/index.ts | 1 + sdk/core/core-https/src/pipeline.ts | 10 +++++ .../core-https/src/policies/ndJsonPolicy.ts | 29 +++++++++++++++ sdk/core/core-https/test/ndJsonPolicy.spec.ts | 37 +++++++++++++++++++ 6 files changed, 89 insertions(+), 3 deletions(-) create mode 100644 sdk/core/core-https/src/policies/ndJsonPolicy.ts create mode 100644 sdk/core/core-https/test/ndJsonPolicy.spec.ts diff --git a/sdk/core/core-http/src/policies/ndJsonPolicy.ts b/sdk/core/core-http/src/policies/ndJsonPolicy.ts index 42cb1ac621c3..85719ceac038 100644 --- a/sdk/core/core-http/src/policies/ndJsonPolicy.ts +++ b/sdk/core/core-http/src/policies/ndJsonPolicy.ts @@ -42,9 +42,11 @@ class NdJsonPolicy extends BaseRequestPolicy { */ public async sendRequest(request: WebResourceLike): Promise { // There currently isn't a good way to bypass the serializer - const body = JSON.parse(request.body); - if (Array.isArray(body)) { - request.body = body.map((item) => JSON.stringify(item) + "\n").join(""); + if (typeof request.body === "string" && request.body.startsWith("[")) { + const body = JSON.parse(request.body); + if (Array.isArray(body)) { + request.body = body.map((item) => JSON.stringify(item) + "\n").join(""); + } } return this._nextPolicy.sendRequest(request); } diff --git a/sdk/core/core-https/review/core-https.api.md b/sdk/core/core-https/review/core-https.api.md index 15952cf14592..f7a8463ce2ae 100644 --- a/sdk/core/core-https/review/core-https.api.md +++ b/sdk/core/core-https/review/core-https.api.md @@ -104,6 +104,7 @@ export interface HttpsClient { export interface InternalPipelineOptions extends PipelineOptions { decompressResponse?: boolean; loggingOptions?: LogPolicyOptions; + sendStreamingJson?: boolean; } // @public @@ -130,6 +131,12 @@ export interface LogPolicyOptions { logger?: Debugger; } +// @public +export function ndJsonPolicy(): PipelinePolicy; + +// @public +export const ndJsonPolicyName = "ndJsonPolicy"; + // @public export interface Pipeline { addPolicy(policy: PipelinePolicy, options?: AddPipelineOptions): void; diff --git a/sdk/core/core-https/src/index.ts b/sdk/core/core-https/src/index.ts index 649c6122f06f..be122a8aabeb 100644 --- a/sdk/core/core-https/src/index.ts +++ b/sdk/core/core-https/src/index.ts @@ -73,3 +73,4 @@ export { BearerTokenAuthenticationPolicyOptions, bearerTokenAuthenticationPolicyName } from "./policies/bearerTokenAuthenticationPolicy"; +export { ndJsonPolicy, ndJsonPolicyName } from "./policies/ndJsonPolicy"; diff --git a/sdk/core/core-https/src/pipeline.ts b/sdk/core/core-https/src/pipeline.ts index 4b66e7c6be3d..6970ff598b91 100644 --- a/sdk/core/core-https/src/pipeline.ts +++ b/sdk/core/core-https/src/pipeline.ts @@ -24,6 +24,7 @@ import { disableResponseDecompressionPolicy } from "./policies/disableResponseDe import { proxyPolicy } from "./policies/proxyPolicy"; import { isNode } from "./util/helpers"; import { formDataPolicy } from "./policies/formDataPolicy"; +import { ndJsonPolicy } from "./policies/ndJsonPolicy"; /** * Policies are executed in phases. @@ -428,6 +429,11 @@ export interface InternalPipelineOptions extends PipelineOptions { * Configure whether to decompress response according to Accept-Encoding header (node-fetch only) */ decompressResponse?: boolean; + + /** + * Send JSON Array payloads as NDJSON. + */ + sendStreamingJson?: boolean; } /** @@ -437,6 +443,10 @@ export interface InternalPipelineOptions extends PipelineOptions { export function createPipelineFromOptions(options: InternalPipelineOptions): Pipeline { const pipeline = HttpsPipeline.create(); + if (options.sendStreamingJson) { + pipeline.addPolicy(ndJsonPolicy()); + } + if (isNode) { pipeline.addPolicy(proxyPolicy(options.proxyOptions)); diff --git a/sdk/core/core-https/src/policies/ndJsonPolicy.ts b/sdk/core/core-https/src/policies/ndJsonPolicy.ts new file mode 100644 index 000000000000..d476205b1ebb --- /dev/null +++ b/sdk/core/core-https/src/policies/ndJsonPolicy.ts @@ -0,0 +1,29 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +import { PipelineResponse, PipelineRequest, SendRequest } from "../interfaces"; +import { PipelinePolicy } from "../pipeline"; + +/** + * The programmatic identifier of the keepAlivePolicy. + */ +export const ndJsonPolicyName = "ndJsonPolicy"; + +/** + * ndJsonPolicy is a policy used to control keep alive settings for every request. + */ +export function ndJsonPolicy(): PipelinePolicy { + return { + name: ndJsonPolicyName, + async sendRequest(request: PipelineRequest, next: SendRequest): Promise { + // There currently isn't a good way to bypass the serializer + if (typeof request.body === "string" && request.body.startsWith("[")) { + const body = JSON.parse(request.body); + if (Array.isArray(body)) { + request.body = body.map((item) => JSON.stringify(item) + "\n").join(""); + } + } + return next(request); + } + }; +} diff --git a/sdk/core/core-https/test/ndJsonPolicy.spec.ts b/sdk/core/core-https/test/ndJsonPolicy.spec.ts new file mode 100644 index 000000000000..22b00171845d --- /dev/null +++ b/sdk/core/core-https/test/ndJsonPolicy.spec.ts @@ -0,0 +1,37 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +import { assert } from "chai"; +import * as sinon from "sinon"; +import { + createPipelineRequest, + SendRequest, + PipelineResponse, + createHttpHeaders, + ndJsonPolicy +} from "../src"; + +describe("NdJsonPolicy", function() { + afterEach(function() { + sinon.restore(); + }); + + it("Formats arrays correctly", async function() { + const request = createPipelineRequest({ + url: "https://bing.com" + }); + request.body = JSON.stringify([{ a: 1 }, { b: 2 }, { c: 3 }]); + const successResponse: PipelineResponse = { + headers: createHttpHeaders(), + request, + status: 200 + }; + const next = sinon.stub, ReturnType>(); + next.resolves(successResponse); + + const policy = ndJsonPolicy(); + + const result = await policy.sendRequest(request, next); + assert.strictEqual(result.request.body, `{"a":1}\n{"b":2}\n{"c":3}\n`); + }); +});