diff --git a/sdk/core/core-http/review/core-http.api.md b/sdk/core/core-http/review/core-http.api.md index 0586887a7575..105b283bbf6e 100644 --- a/sdk/core/core-http/review/core-http.api.md +++ b/sdk/core/core-http/review/core-http.api.md @@ -341,6 +341,7 @@ export interface InternalPipelineOptions extends PipelineOptions { decompressResponse?: boolean; deserializationOptions?: DeserializationOptions; loggingOptions?: LogPolicyOptions; + sendStreamingJson?: boolean; } // @public diff --git a/sdk/core/core-http/src/pipelineOptions.ts b/sdk/core/core-http/src/pipelineOptions.ts index 673f5ed86efc..96e001c84e5c 100644 --- a/sdk/core/core-http/src/pipelineOptions.ts +++ b/sdk/core/core-http/src/pipelineOptions.ts @@ -67,4 +67,9 @@ export interface InternalPipelineOptions extends PipelineOptions { * Configure whether to decompress response according to Accept-Encoding header (node-fetch only) */ decompressResponse?: boolean; + + /** + * Send JSON Array payloads as NDJSON. + */ + sendStreamingJson?: boolean; } diff --git a/sdk/core/core-http/src/policies/ndJsonPolicy.ts b/sdk/core/core-http/src/policies/ndJsonPolicy.ts new file mode 100644 index 000000000000..85719ceac038 --- /dev/null +++ b/sdk/core/core-http/src/policies/ndJsonPolicy.ts @@ -0,0 +1,53 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +// BaseRequestPolicy has a protected constructor. +/* eslint-disable @typescript-eslint/no-useless-constructor */ + +import { + BaseRequestPolicy, + RequestPolicy, + RequestPolicyOptions, + RequestPolicyFactory +} from "./requestPolicy"; +import { WebResourceLike } from "../webResource"; +import { HttpOperationResponse } from "../httpOperationResponse"; + +export function ndJsonPolicy(): RequestPolicyFactory { + return { + create: (nextPolicy: RequestPolicy, options: RequestPolicyOptions) => { + return new NdJsonPolicy(nextPolicy, options); + } + }; +} + +/** + * NdJsonPolicy that formats a JSON array as newline-delimited JSON + */ +class NdJsonPolicy extends BaseRequestPolicy { + /** + * Creates an instance of KeepAlivePolicy. + * + * @param nextPolicy + * @param options + */ + constructor(nextPolicy: RequestPolicy, options: RequestPolicyOptions) { + super(nextPolicy, options); + } + + /** + * Sends a request. + * + * @param request + */ + public async sendRequest(request: WebResourceLike): Promise { + // There currently isn't a good way to bypass the serializer + if (typeof request.body === "string" && request.body.startsWith("[")) { + const body = JSON.parse(request.body); + if (Array.isArray(body)) { + request.body = body.map((item) => JSON.stringify(item) + "\n").join(""); + } + } + return this._nextPolicy.sendRequest(request); + } +} diff --git a/sdk/core/core-http/src/serviceClient.ts b/sdk/core/core-http/src/serviceClient.ts index 4f67810a9f6f..9e3140176e00 100644 --- a/sdk/core/core-http/src/serviceClient.ts +++ b/sdk/core/core-http/src/serviceClient.ts @@ -59,6 +59,7 @@ import { InternalPipelineOptions } from "./pipelineOptions"; import { DefaultKeepAliveOptions, keepAlivePolicy } from "./policies/keepAlivePolicy"; import { tracingPolicy } from "./policies/tracingPolicy"; import { disableResponseDecompressionPolicy } from "./policies/disableResponseDecompressionPolicy"; +import { ndJsonPolicy } from "./policies/ndJsonPolicy"; /** * Options to configure a proxy for outgoing requests (Node.js only). @@ -711,6 +712,10 @@ export function createPipelineFromOptions( ): ServiceClientOptions { const requestPolicyFactories: RequestPolicyFactory[] = []; + if (pipelineOptions.sendStreamingJson) { + requestPolicyFactories.push(ndJsonPolicy()); + } + let userAgentValue = undefined; if (pipelineOptions.userAgentOptions && pipelineOptions.userAgentOptions.userAgentPrefix) { const userAgentInfo: string[] = []; diff --git a/sdk/core/core-http/test/policies/ndJsonPolicyTests.ts b/sdk/core/core-http/test/policies/ndJsonPolicyTests.ts new file mode 100644 index 000000000000..26394d139306 --- /dev/null +++ b/sdk/core/core-http/test/policies/ndJsonPolicyTests.ts @@ -0,0 +1,31 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +import { assert } from "chai"; +import { RequestPolicyOptions } from "../../src/policies/requestPolicy"; +import { WebResource } from "../../src/webResource"; +import { HttpHeaders } from "../../src/httpHeaders"; +import { ndJsonPolicy } from "../../src/policies/ndJsonPolicy"; + +describe("NdJsonPolicy", function() { + const returnOk = { + sendRequest: async (request: WebResource) => { + return { + request, + status: 200, + headers: new HttpHeaders() + }; + } + }; + + const emptyPolicyOptions = new RequestPolicyOptions(); + + it("Formats arrays correctly", async function() { + const factory = ndJsonPolicy(); + const policy = factory.create(returnOk, emptyPolicyOptions); + const request = new WebResource(); + request.body = JSON.stringify([{ a: 1 }, { b: 2 }, { c: 3 }]); + const result = await policy.sendRequest(request); + assert.strictEqual(result.request.body, `{"a":1}\n{"b":2}\n{"c":3}\n`); + }); +}); diff --git a/sdk/core/core-https/review/core-https.api.md b/sdk/core/core-https/review/core-https.api.md index 15952cf14592..f7a8463ce2ae 100644 --- a/sdk/core/core-https/review/core-https.api.md +++ b/sdk/core/core-https/review/core-https.api.md @@ -104,6 +104,7 @@ export interface HttpsClient { export interface InternalPipelineOptions extends PipelineOptions { decompressResponse?: boolean; loggingOptions?: LogPolicyOptions; + sendStreamingJson?: boolean; } // @public @@ -130,6 +131,12 @@ export interface LogPolicyOptions { logger?: Debugger; } +// @public +export function ndJsonPolicy(): PipelinePolicy; + +// @public +export const ndJsonPolicyName = "ndJsonPolicy"; + // @public export interface Pipeline { addPolicy(policy: PipelinePolicy, options?: AddPipelineOptions): void; diff --git a/sdk/core/core-https/src/index.ts b/sdk/core/core-https/src/index.ts index 649c6122f06f..be122a8aabeb 100644 --- a/sdk/core/core-https/src/index.ts +++ b/sdk/core/core-https/src/index.ts @@ -73,3 +73,4 @@ export { BearerTokenAuthenticationPolicyOptions, bearerTokenAuthenticationPolicyName } from "./policies/bearerTokenAuthenticationPolicy"; +export { ndJsonPolicy, ndJsonPolicyName } from "./policies/ndJsonPolicy"; diff --git a/sdk/core/core-https/src/pipeline.ts b/sdk/core/core-https/src/pipeline.ts index 4b66e7c6be3d..6970ff598b91 100644 --- a/sdk/core/core-https/src/pipeline.ts +++ b/sdk/core/core-https/src/pipeline.ts @@ -24,6 +24,7 @@ import { disableResponseDecompressionPolicy } from "./policies/disableResponseDe import { proxyPolicy } from "./policies/proxyPolicy"; import { isNode } from "./util/helpers"; import { formDataPolicy } from "./policies/formDataPolicy"; +import { ndJsonPolicy } from "./policies/ndJsonPolicy"; /** * Policies are executed in phases. @@ -428,6 +429,11 @@ export interface InternalPipelineOptions extends PipelineOptions { * Configure whether to decompress response according to Accept-Encoding header (node-fetch only) */ decompressResponse?: boolean; + + /** + * Send JSON Array payloads as NDJSON. + */ + sendStreamingJson?: boolean; } /** @@ -437,6 +443,10 @@ export interface InternalPipelineOptions extends PipelineOptions { export function createPipelineFromOptions(options: InternalPipelineOptions): Pipeline { const pipeline = HttpsPipeline.create(); + if (options.sendStreamingJson) { + pipeline.addPolicy(ndJsonPolicy()); + } + if (isNode) { pipeline.addPolicy(proxyPolicy(options.proxyOptions)); diff --git a/sdk/core/core-https/src/policies/ndJsonPolicy.ts b/sdk/core/core-https/src/policies/ndJsonPolicy.ts new file mode 100644 index 000000000000..d476205b1ebb --- /dev/null +++ b/sdk/core/core-https/src/policies/ndJsonPolicy.ts @@ -0,0 +1,29 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +import { PipelineResponse, PipelineRequest, SendRequest } from "../interfaces"; +import { PipelinePolicy } from "../pipeline"; + +/** + * The programmatic identifier of the keepAlivePolicy. + */ +export const ndJsonPolicyName = "ndJsonPolicy"; + +/** + * ndJsonPolicy is a policy used to control keep alive settings for every request. + */ +export function ndJsonPolicy(): PipelinePolicy { + return { + name: ndJsonPolicyName, + async sendRequest(request: PipelineRequest, next: SendRequest): Promise { + // There currently isn't a good way to bypass the serializer + if (typeof request.body === "string" && request.body.startsWith("[")) { + const body = JSON.parse(request.body); + if (Array.isArray(body)) { + request.body = body.map((item) => JSON.stringify(item) + "\n").join(""); + } + } + return next(request); + } + }; +} diff --git a/sdk/core/core-https/test/ndJsonPolicy.spec.ts b/sdk/core/core-https/test/ndJsonPolicy.spec.ts new file mode 100644 index 000000000000..22b00171845d --- /dev/null +++ b/sdk/core/core-https/test/ndJsonPolicy.spec.ts @@ -0,0 +1,37 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +import { assert } from "chai"; +import * as sinon from "sinon"; +import { + createPipelineRequest, + SendRequest, + PipelineResponse, + createHttpHeaders, + ndJsonPolicy +} from "../src"; + +describe("NdJsonPolicy", function() { + afterEach(function() { + sinon.restore(); + }); + + it("Formats arrays correctly", async function() { + const request = createPipelineRequest({ + url: "https://bing.com" + }); + request.body = JSON.stringify([{ a: 1 }, { b: 2 }, { c: 3 }]); + const successResponse: PipelineResponse = { + headers: createHttpHeaders(), + request, + status: 200 + }; + const next = sinon.stub, ReturnType>(); + next.resolves(successResponse); + + const policy = ndJsonPolicy(); + + const result = await policy.sendRequest(request, next); + assert.strictEqual(result.request.body, `{"a":1}\n{"b":2}\n{"c":3}\n`); + }); +});