Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ServiceBus] Add optional boolean skipParsingBodyAsJson option #18692

Merged
merged 10 commits into from
Dec 10, 2021
3 changes: 2 additions & 1 deletion sdk/servicebus/service-bus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

### Features Added

- Add `state` property to `ServiceBusReceivedMessage`. Its value is one of `"active"`, `"deferred"`, or `"scheduled"`. [PR 18938](https://github.com/Azure/azure-sdk-for-js/pull/18938)
- Add `state` property to `ServiceBusReceivedMessage`. Its value is one of `"active"`, `"deferred"`, or `"scheduled"`. [PR #18938](https://github.com/Azure/azure-sdk-for-js/pull/18938)
- Add optional boolean `skipParsingBodyAsJson` property to `ServiceBusReceiverOptions` and `ServiceBusSessionReceiverOptions`. This allows users to control whether the SDK should skip parsing message body as Json object. By default, the SDK will attempt to parse message body as Json object. [PR #18692](https://github.com/Azure/azure-sdk-for-js/pull/18692)
jeremymeng marked this conversation as resolved.
Show resolved Hide resolved

### Breaking Changes

Expand Down
2 changes: 2 additions & 0 deletions sdk/servicebus/service-bus/review/service-bus.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ export interface ServiceBusReceiver {
export interface ServiceBusReceiverOptions {
maxAutoLockRenewalDurationInMs?: number;
receiveMode?: "peekLock" | "receiveAndDelete";
skipParsingBodyAsJson?: boolean;
subQueueType?: "deadLetter" | "transferDeadLetter";
}

Expand Down Expand Up @@ -489,6 +490,7 @@ export interface ServiceBusSessionReceiver extends ServiceBusReceiver {
export interface ServiceBusSessionReceiverOptions extends OperationOptionsBase {
maxAutoLockRenewalDurationInMs?: number;
receiveMode?: "peekLock" | "receiveAndDelete";
skipParsingBodyAsJson?: boolean;
}

// @public
Expand Down
9 changes: 6 additions & 3 deletions sdk/servicebus/service-bus/src/core/batchingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ export class BatchingReceiver extends MessageReceiver {

return this.link;
},
this.receiveMode
this.receiveMode,
options.skipParsingBodyAsJson ?? false
);
}

Expand Down Expand Up @@ -248,7 +249,8 @@ export class BatchingReceiverLite {
private _getCurrentReceiver: (
abortSignal?: AbortSignalLike
) => Promise<MinimalReceiver | undefined>,
private _receiveMode: ReceiveMode
private _receiveMode: ReceiveMode,
_skipParsingBodyAsJson: boolean
) {
this._createAndEndProcessingSpan = createAndEndProcessingSpan;

Expand All @@ -257,7 +259,8 @@ export class BatchingReceiverLite {
context.message!,
context.delivery!,
true,
this._receiveMode
this._receiveMode,
_skipParsingBodyAsJson
);
};

Expand Down
16 changes: 12 additions & 4 deletions sdk/servicebus/service-bus/src/core/managementClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ export interface SendManagementRequestOptions extends SendRequestOptions {
* This is used for service side optimization.
*/
associatedLinkName?: string;
/**
* Option to disable the client from running JSON.parse() on the message body when receiving the message.
* Not applicable if the message was sent with AMQP body type value or sequence. Use this option when you
* prefer to work directly with the bytes present in the message body than have the client attempt to parse it.
*/
skipParsingBodyAsJson?: boolean;
}

/**
Expand Down Expand Up @@ -499,9 +505,10 @@ export class ManagementClient extends LinkEntity<RequestResponseLink> {
const messages = result.body.messages as { message: Buffer }[];
for (const msg of messages) {
const decodedMessage = RheaMessageUtil.decode(msg.message);
const message = fromRheaMessage(decodedMessage as any);

message.body = defaultDataTransformer.decode(message.body);
const message = fromRheaMessage(
decodedMessage as any,
options?.skipParsingBodyAsJson ?? false
jeremymeng marked this conversation as resolved.
Show resolved Hide resolved
);
messageList.push(message);
this._lastPeekedSequenceNumber = message.sequenceNumber!;
}
Expand Down Expand Up @@ -813,7 +820,8 @@ export class ManagementClient extends LinkEntity<RequestResponseLink> {
decodedMessage as any,
{ tag: msg["lock-token"] } as any,
false,
receiveMode
receiveMode,
options?.skipParsingBodyAsJson ?? false
);
messageList.push(message);
}
Expand Down
6 changes: 6 additions & 0 deletions sdk/servicebus/service-bus/src/core/messageReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ export interface ReceiveOptions extends SubscribeOptions {
* maxAutoRenewLockDurationInMs value when they created their receiver.
*/
lockRenewer: LockRenewer | undefined;
/**
* Option to disable the client from running JSON.parse() on the message body when receiving the message.
* Not applicable if the message was sent with AMQP body type value or sequence. Use this option when you
* prefer to work directly with the bytes present in the message body than have the client attempt to parse it.
*/
skipParsingBodyAsJson: boolean;
ramya-rao-a marked this conversation as resolved.
Show resolved Hide resolved
}

/**
Expand Down
3 changes: 2 additions & 1 deletion sdk/servicebus/service-bus/src/core/streamingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,8 @@ export class StreamingReceiver extends MessageReceiver {
context.message!,
context.delivery!,
true,
this.receiveMode
this.receiveMode,
options.skipParsingBodyAsJson ?? false
);

this._lockRenewer?.start(this, bMessage, (err) => {
Expand Down
16 changes: 11 additions & 5 deletions sdk/servicebus/service-bus/src/dataTransformer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,16 +84,17 @@ export const defaultDataTransformer = {
* of the AMQP mesage.
*
* @param body - The AMQP message body
* @param skipParsingBodyAsJson - Boolean to skip running JSON.parse() on message body content.
* @returns decoded body or the given body as-is.
*/
decode(body: unknown): unknown {
decode(body: unknown, skipParsingBodyAsJson: boolean): unknown {
let actualContent = body;

if (isRheaAmqpSection(body)) {
actualContent = body.content;
}

return tryToJsonDecode(actualContent);
return skipParsingBodyAsJson ? actualContent : tryToJsonDecode(actualContent);
},
/**
* A function that takes the body property from an AMQP message, which can come from either
Expand All @@ -103,16 +104,21 @@ export const defaultDataTransformer = {
* indicating which part of the AMQP message the body was decoded from.
*
* @param body - The AMQP message body as received from rhea.
* @param skipParsingBodyAsJson - Boolean to skip running JSON.parse() on message body.
* @returns The decoded/raw body and the body type.
*/
decodeWithType(
body: unknown | RheaAmqpSection
body: unknown | RheaAmqpSection,
skipParsingBodyAsJson: boolean
): { body: unknown; bodyType: "data" | "sequence" | "value" } {
try {
if (isRheaAmqpSection(body)) {
switch (body.typecode) {
case dataSectionTypeCode:
return { body: tryToJsonDecode(body.content), bodyType: "data" };
return {
body: skipParsingBodyAsJson ? body.content : tryToJsonDecode(body.content),
bodyType: "data"
};
case sequenceSectionTypeCode:
// typecode:
// handle sequences
Expand All @@ -125,7 +131,7 @@ export const defaultDataTransformer = {
// not sure - we have to try to infer the proper bodyType and content
if (isBuffer(body)) {
// This indicates that we are getting the AMQP described type. Let us try decoding it.
return { body: tryToJsonDecode(body), bodyType: "data" };
return { body: skipParsingBodyAsJson ? body : tryToJsonDecode(body), bodyType: "data" };
} else {
return { body: body, bodyType: "value" };
}
Expand Down
12 changes: 12 additions & 0 deletions sdk/servicebus/service-bus/src/models.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,12 @@ export interface ServiceBusReceiverOptions {
* - **To disable autolock renewal**, set this to `0`.
*/
maxAutoLockRenewalDurationInMs?: number;
/**
* Option to disable the client from running JSON.parse() on the message body when receiving the message.
* Not applicable if the message was sent with AMQP body type value or sequence. Use this option when you
* prefer to work directly with the bytes present in the message body than have the client attempt to parse it.
*/
skipParsingBodyAsJson?: boolean;
}

/**
Expand Down Expand Up @@ -233,6 +239,12 @@ export interface ServiceBusSessionReceiverOptions extends OperationOptionsBase {
* - **To disable autolock renewal**, set this to `0`.
*/
maxAutoLockRenewalDurationInMs?: number;
/**
* Option to disable the client from running JSON.parse() on the message body when receiving the message.
* Not applicable if the message was sent with AMQP body type value or sequence. Use this option when you
* prefer to work directly with the bytes present in the message body than have the client attempt to parse it.
*/
skipParsingBodyAsJson?: boolean;
}

/**
Expand Down
7 changes: 5 additions & 2 deletions sdk/servicebus/service-bus/src/receivers/receiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ export class ServiceBusReceiverImpl implements ServiceBusReceiver {
public entityPath: string,
public receiveMode: "peekLock" | "receiveAndDelete",
maxAutoRenewLockDurationInMs: number,
private skipParsingBodyAsJson: boolean,
retryOptions: RetryOptions = {}
) {
throwErrorIfConnectionClosed(_context);
Expand Down Expand Up @@ -357,7 +358,8 @@ export class ServiceBusReceiverImpl implements ServiceBusReceiver {
const receiveOptions: ReceiveOptions = {
maxConcurrentCalls: 0,
receiveMode: this.receiveMode,
lockRenewer: this._lockRenewer
lockRenewer: this._lockRenewer,
skipParsingBodyAsJson: this.skipParsingBodyAsJson
};
this._batchingReceiver = this._createBatchingReceiver(
this._context,
Expand Down Expand Up @@ -507,7 +509,8 @@ export class ServiceBusReceiverImpl implements ServiceBusReceiver {
...options,
receiveMode: this.receiveMode,
retryOptions: this._retryOptions,
lockRenewer: this._lockRenewer
lockRenewer: this._lockRenewer,
skipParsingBodyAsJson: this.skipParsingBodyAsJson
});

// this ensures that if the outer service bus client is closed that this receiver is cleaned up.
Expand Down
7 changes: 5 additions & 2 deletions sdk/servicebus/service-bus/src/serviceBusClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ export class ServiceBusClient {
entityPathWithSubQueue,
receiveMode,
maxLockAutoRenewDurationInMs,
options?.skipParsingBodyAsJson ?? false,
this._clientOptions.retryOptions
);
}
Expand Down Expand Up @@ -321,7 +322,8 @@ export class ServiceBusClient {
maxAutoLockRenewalDurationInMs: options?.maxAutoLockRenewalDurationInMs,
receiveMode,
abortSignal: options?.abortSignal,
retryOptions: this._clientOptions.retryOptions
retryOptions: this._clientOptions.retryOptions,
skipParsingBodyAsJson: options?.skipParsingBodyAsJson ?? false
}
);

Expand Down Expand Up @@ -406,7 +408,8 @@ export class ServiceBusClient {
maxAutoLockRenewalDurationInMs: options?.maxAutoLockRenewalDurationInMs,
receiveMode,
abortSignal: options?.abortSignal,
retryOptions: this._clientOptions.retryOptions
retryOptions: this._clientOptions.retryOptions,
skipParsingBodyAsJson: options?.skipParsingBodyAsJson ?? false
}
);

Expand Down
28 changes: 9 additions & 19 deletions sdk/servicebus/service-bus/src/serviceBusMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,7 @@ export interface ServiceBusReceivedMessage extends ServiceBusMessage {
*/
export function fromRheaMessage(
rheaMessage: RheaMessage,
skipParsingBodyAsJson: boolean,
delivery?: Delivery,
shouldReorderLockToken?: boolean
): ServiceBusReceivedMessage {
Expand All @@ -525,7 +526,10 @@ export function fromRheaMessage(
};
}

const { body, bodyType } = defaultDataTransformer.decodeWithType(rheaMessage.body);
const { body, bodyType } = defaultDataTransformer.decodeWithType(
rheaMessage.body,
skipParsingBodyAsJson
);

const sbmsg: ServiceBusMessage = {
body: body
Expand Down Expand Up @@ -896,13 +900,16 @@ export class ServiceBusMessageImpl implements ServiceBusReceivedMessage {
msg: RheaMessage,
delivery: Delivery,
shouldReorderLockToken: boolean,
receiveMode: ReceiveMode
receiveMode: ReceiveMode,
skipParsingBodyAsJson: boolean
) {
const { _rawAmqpMessage, ...restOfMessageProps } = fromRheaMessage(
msg,
skipParsingBodyAsJson,
delivery,
shouldReorderLockToken
);
this._rawAmqpMessage = _rawAmqpMessage; // need to initialize _rawAmqpMessage property to make compiler happy
Object.assign(this, restOfMessageProps);
this.state = restOfMessageProps.state; // to suppress error TS2564: Property 'state' has no initializer and is not definitely assigned in the constructor.

Expand All @@ -911,23 +918,6 @@ export class ServiceBusMessageImpl implements ServiceBusReceivedMessage {
if (receiveMode === "receiveAndDelete") {
this.lockToken = undefined;
}

let actualBodyType:
| ReturnType<typeof defaultDataTransformer["decodeWithType"]>["bodyType"]
| undefined = undefined;

if (msg.body) {
try {
const result = defaultDataTransformer.decodeWithType(msg.body);

this.body = result.body;
actualBodyType = result.bodyType;
} catch (err) {
this.body = undefined;
}
}
this._rawAmqpMessage = _rawAmqpMessage;
this._rawAmqpMessage.bodyType = actualBodyType;
this.delivery = delivery;
}

Expand Down
13 changes: 11 additions & 2 deletions sdk/servicebus/service-bus/src/session/messageSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ export type MessageSessionOptions = Pick<
> & {
receiveMode?: ReceiveMode;
retryOptions: RetryOptions | undefined;
skipParsingBodyAsJson: boolean;
};

/**
Expand Down Expand Up @@ -180,6 +181,11 @@ export class MessageSession extends LinkEntity<Receiver> {

private _totalAutoLockRenewDuration: number;

/**
* Whether to prevent the client from running JSON.parse() on the message body when receiving the message.
*/
private skipParsingBodyAsJson: boolean;

public get receiverHelper(): ReceiverHelper {
return this._receiverHelper;
}
Expand Down Expand Up @@ -375,6 +381,7 @@ export class MessageSession extends LinkEntity<Receiver> {
this.autoComplete = false;
if (isDefined(this._providedSessionId)) this.sessionId = this._providedSessionId;
this.receiveMode = options.receiveMode || "peekLock";
this.skipParsingBodyAsJson = options.skipParsingBodyAsJson;
this.maxAutoRenewDurationInMs =
options.maxAutoLockRenewalDurationInMs != null
? options.maxAutoLockRenewalDurationInMs
Expand All @@ -389,7 +396,8 @@ export class MessageSession extends LinkEntity<Receiver> {
async (_abortSignal?: AbortSignalLike): Promise<MinimalReceiver> => {
return this.link!;
},
this.receiveMode
this.receiveMode,
this.skipParsingBodyAsJson
);

// setting all the handlers
Expand Down Expand Up @@ -628,7 +636,8 @@ export class MessageSession extends LinkEntity<Receiver> {
context.message!,
context.delivery!,
true,
this.receiveMode
this.receiveMode,
this.skipParsingBodyAsJson
);

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ describe("Ensure typescript samples use published package", function(): void {
}

it("Ensure TypeScript samples use published package", async () => {
const pattern = "samples/typescript/src/**/*.ts";
const pattern = "samples/v7/typescript/src/**/*.ts";
const files = await globAsync(pattern);
testSamples(files, new RegExp('from\\s"@azure/service-bus"'));
});

it("Ensure JavaScript samples use published package", async () => {
const pattern = "samples/javascript/**/*.js";
const pattern = "samples/v7/javascript/**/*.js";
const files = await globAsync(pattern);
testSamples(files, new RegExp('=\\srequire\\("@azure/service-bus"\\)'));
});
Expand Down
3 changes: 2 additions & 1 deletion sdk/servicebus/service-bus/test/internal/retries.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,8 @@ describe("Retries - Receive methods", () => {
"dummyEntityPath",
{
lockRenewer: undefined,
receiveMode: "peekLock"
receiveMode: "peekLock",
skipParsingBodyAsJson: false
}
);
batchingReceiver.isOpen = () => true;
Expand Down
Loading