Skip to content

Commit

Permalink
[core-http] Add NDJSON support (#11325)
Browse files Browse the repository at this point in the history
  • Loading branch information
xirzec authored Sep 23, 2020
1 parent 6f0fbca commit ff3509f
Show file tree
Hide file tree
Showing 10 changed files with 179 additions and 0 deletions.
1 change: 1 addition & 0 deletions sdk/core/core-http/review/core-http.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ export interface InternalPipelineOptions extends PipelineOptions {
decompressResponse?: boolean;
deserializationOptions?: DeserializationOptions;
loggingOptions?: LogPolicyOptions;
sendStreamingJson?: boolean;
}

// @public
Expand Down
5 changes: 5 additions & 0 deletions sdk/core/core-http/src/pipelineOptions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,9 @@ export interface InternalPipelineOptions extends PipelineOptions {
* Configure whether to decompress response according to Accept-Encoding header (node-fetch only)
*/
decompressResponse?: boolean;

/**
* Send JSON Array payloads as NDJSON.
*/
sendStreamingJson?: boolean;
}
53 changes: 53 additions & 0 deletions sdk/core/core-http/src/policies/ndJsonPolicy.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

// BaseRequestPolicy has a protected constructor.
/* eslint-disable @typescript-eslint/no-useless-constructor */

import {
BaseRequestPolicy,
RequestPolicy,
RequestPolicyOptions,
RequestPolicyFactory
} from "./requestPolicy";
import { WebResourceLike } from "../webResource";
import { HttpOperationResponse } from "../httpOperationResponse";

export function ndJsonPolicy(): RequestPolicyFactory {
return {
create: (nextPolicy: RequestPolicy, options: RequestPolicyOptions) => {
return new NdJsonPolicy(nextPolicy, options);
}
};
}

/**
* NdJsonPolicy that formats a JSON array as newline-delimited JSON
*/
class NdJsonPolicy extends BaseRequestPolicy {
/**
* Creates an instance of KeepAlivePolicy.
*
* @param nextPolicy
* @param options
*/
constructor(nextPolicy: RequestPolicy, options: RequestPolicyOptions) {
super(nextPolicy, options);
}

/**
* Sends a request.
*
* @param request
*/
public async sendRequest(request: WebResourceLike): Promise<HttpOperationResponse> {
// There currently isn't a good way to bypass the serializer
if (typeof request.body === "string" && request.body.startsWith("[")) {
const body = JSON.parse(request.body);
if (Array.isArray(body)) {
request.body = body.map((item) => JSON.stringify(item) + "\n").join("");
}
}
return this._nextPolicy.sendRequest(request);
}
}
5 changes: 5 additions & 0 deletions sdk/core/core-http/src/serviceClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import { InternalPipelineOptions } from "./pipelineOptions";
import { DefaultKeepAliveOptions, keepAlivePolicy } from "./policies/keepAlivePolicy";
import { tracingPolicy } from "./policies/tracingPolicy";
import { disableResponseDecompressionPolicy } from "./policies/disableResponseDecompressionPolicy";
import { ndJsonPolicy } from "./policies/ndJsonPolicy";

/**
* Options to configure a proxy for outgoing requests (Node.js only).
Expand Down Expand Up @@ -711,6 +712,10 @@ export function createPipelineFromOptions(
): ServiceClientOptions {
const requestPolicyFactories: RequestPolicyFactory[] = [];

if (pipelineOptions.sendStreamingJson) {
requestPolicyFactories.push(ndJsonPolicy());
}

let userAgentValue = undefined;
if (pipelineOptions.userAgentOptions && pipelineOptions.userAgentOptions.userAgentPrefix) {
const userAgentInfo: string[] = [];
Expand Down
31 changes: 31 additions & 0 deletions sdk/core/core-http/test/policies/ndJsonPolicyTests.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

import { assert } from "chai";
import { RequestPolicyOptions } from "../../src/policies/requestPolicy";
import { WebResource } from "../../src/webResource";
import { HttpHeaders } from "../../src/httpHeaders";
import { ndJsonPolicy } from "../../src/policies/ndJsonPolicy";

describe("NdJsonPolicy", function() {
const returnOk = {
sendRequest: async (request: WebResource) => {
return {
request,
status: 200,
headers: new HttpHeaders()
};
}
};

const emptyPolicyOptions = new RequestPolicyOptions();

it("Formats arrays correctly", async function() {
const factory = ndJsonPolicy();
const policy = factory.create(returnOk, emptyPolicyOptions);
const request = new WebResource();
request.body = JSON.stringify([{ a: 1 }, { b: 2 }, { c: 3 }]);
const result = await policy.sendRequest(request);
assert.strictEqual(result.request.body, `{"a":1}\n{"b":2}\n{"c":3}\n`);
});
});
7 changes: 7 additions & 0 deletions sdk/core/core-https/review/core-https.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ export interface HttpsClient {
export interface InternalPipelineOptions extends PipelineOptions {
decompressResponse?: boolean;
loggingOptions?: LogPolicyOptions;
sendStreamingJson?: boolean;
}

// @public
Expand All @@ -130,6 +131,12 @@ export interface LogPolicyOptions {
logger?: Debugger;
}

// @public
export function ndJsonPolicy(): PipelinePolicy;

// @public
export const ndJsonPolicyName = "ndJsonPolicy";

// @public
export interface Pipeline {
addPolicy(policy: PipelinePolicy, options?: AddPipelineOptions): void;
Expand Down
1 change: 1 addition & 0 deletions sdk/core/core-https/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,4 @@ export {
BearerTokenAuthenticationPolicyOptions,
bearerTokenAuthenticationPolicyName
} from "./policies/bearerTokenAuthenticationPolicy";
export { ndJsonPolicy, ndJsonPolicyName } from "./policies/ndJsonPolicy";
10 changes: 10 additions & 0 deletions sdk/core/core-https/src/pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import { disableResponseDecompressionPolicy } from "./policies/disableResponseDe
import { proxyPolicy } from "./policies/proxyPolicy";
import { isNode } from "./util/helpers";
import { formDataPolicy } from "./policies/formDataPolicy";
import { ndJsonPolicy } from "./policies/ndJsonPolicy";

/**
* Policies are executed in phases.
Expand Down Expand Up @@ -428,6 +429,11 @@ export interface InternalPipelineOptions extends PipelineOptions {
* Configure whether to decompress response according to Accept-Encoding header (node-fetch only)
*/
decompressResponse?: boolean;

/**
* Send JSON Array payloads as NDJSON.
*/
sendStreamingJson?: boolean;
}

/**
Expand All @@ -437,6 +443,10 @@ export interface InternalPipelineOptions extends PipelineOptions {
export function createPipelineFromOptions(options: InternalPipelineOptions): Pipeline {
const pipeline = HttpsPipeline.create();

if (options.sendStreamingJson) {
pipeline.addPolicy(ndJsonPolicy());
}

if (isNode) {
pipeline.addPolicy(proxyPolicy(options.proxyOptions));

Expand Down
29 changes: 29 additions & 0 deletions sdk/core/core-https/src/policies/ndJsonPolicy.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

import { PipelineResponse, PipelineRequest, SendRequest } from "../interfaces";
import { PipelinePolicy } from "../pipeline";

/**
* The programmatic identifier of the keepAlivePolicy.
*/
export const ndJsonPolicyName = "ndJsonPolicy";

/**
* ndJsonPolicy is a policy used to control keep alive settings for every request.
*/
export function ndJsonPolicy(): PipelinePolicy {
return {
name: ndJsonPolicyName,
async sendRequest(request: PipelineRequest, next: SendRequest): Promise<PipelineResponse> {
// There currently isn't a good way to bypass the serializer
if (typeof request.body === "string" && request.body.startsWith("[")) {
const body = JSON.parse(request.body);
if (Array.isArray(body)) {
request.body = body.map((item) => JSON.stringify(item) + "\n").join("");
}
}
return next(request);
}
};
}
37 changes: 37 additions & 0 deletions sdk/core/core-https/test/ndJsonPolicy.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

import { assert } from "chai";
import * as sinon from "sinon";
import {
createPipelineRequest,
SendRequest,
PipelineResponse,
createHttpHeaders,
ndJsonPolicy
} from "../src";

describe("NdJsonPolicy", function() {
afterEach(function() {
sinon.restore();
});

it("Formats arrays correctly", async function() {
const request = createPipelineRequest({
url: "https://bing.com"
});
request.body = JSON.stringify([{ a: 1 }, { b: 2 }, { c: 3 }]);
const successResponse: PipelineResponse = {
headers: createHttpHeaders(),
request,
status: 200
};
const next = sinon.stub<Parameters<SendRequest>, ReturnType<SendRequest>>();
next.resolves(successResponse);

const policy = ndJsonPolicy();

const result = await policy.sendRequest(request, next);
assert.strictEqual(result.request.body, `{"a":1}\n{"b":2}\n{"c":3}\n`);
});
});

0 comments on commit ff3509f

Please sign in to comment.