Skip to content

Commit

Permalink
[ServiceBus] Add optional boolean skipParsingBodyAsJson option (#18692
Browse files Browse the repository at this point in the history
)

* [ServiceBus] Add optional boolean `skipParsingBodyAsJson` option

to `ReceiveMessagesOptions`, `SubscribeOptions`, and
`ServiceBusSessionReceiverOptions`. This allows users to control whether the SDK
should skip parsing message body as Json object. By default the SDK will attempt
parsing message body as Json object.

While updating code, I also fixed sample unit test which is using out-dated
paths thus not finding any sample code files, and removed two `await` that are
redundant.

Resolves #18630

* Add optional `skipParsingBodyAsJson` boolean property

to `ServiceBusReceiverOptions` and remove those on `ReceiveMessagesOptions` and
`SubscribeOptions`

* Remove commented code that was trying to decode again

but the message body is already decoded, and _rawAmqpMessage is assigned in
`fromRheaMessage()`

* Remove double decoding

message.body has been decoded already above by `fromRheaMessage()` call.

* Update CHANGELOG and package version

* Add comments

* Update sdk/servicebus/service-bus/CHANGELOG.md

Co-authored-by: Deyaaeldeen Almahallawi <[email protected]>

Co-authored-by: Deyaaeldeen Almahallawi <[email protected]>
  • Loading branch information
jeremymeng and deyaaeldeen authored Dec 10, 2021
1 parent d278d35 commit 3bdb490
Show file tree
Hide file tree
Showing 27 changed files with 213 additions and 107 deletions.
3 changes: 2 additions & 1 deletion sdk/servicebus/service-bus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

### Features Added

- Add `state` property to `ServiceBusReceivedMessage`. Its value is one of `"active"`, `"deferred"`, or `"scheduled"`. [PR 18938](https://github.com/Azure/azure-sdk-for-js/pull/18938)
- Add `state` property to `ServiceBusReceivedMessage`. Its value is one of `"active"`, `"deferred"`, or `"scheduled"`. [PR #18938](https://github.com/Azure/azure-sdk-for-js/pull/18938)
- Add optional boolean `skipParsingBodyAsJson` property to `ServiceBusReceiverOptions` and `ServiceBusSessionReceiverOptions`. By default, the client attempts to parse message body as JSON object, and this new parameter controls whether the client should skip performing this parsing. [PR #18692](https://github.com/Azure/azure-sdk-for-js/pull/18692)

### Breaking Changes

Expand Down
2 changes: 2 additions & 0 deletions sdk/servicebus/service-bus/review/service-bus.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ export interface ServiceBusReceiver {
export interface ServiceBusReceiverOptions {
maxAutoLockRenewalDurationInMs?: number;
receiveMode?: "peekLock" | "receiveAndDelete";
skipParsingBodyAsJson?: boolean;
subQueueType?: "deadLetter" | "transferDeadLetter";
}

Expand Down Expand Up @@ -489,6 +490,7 @@ export interface ServiceBusSessionReceiver extends ServiceBusReceiver {
export interface ServiceBusSessionReceiverOptions extends OperationOptionsBase {
maxAutoLockRenewalDurationInMs?: number;
receiveMode?: "peekLock" | "receiveAndDelete";
skipParsingBodyAsJson?: boolean;
}

// @public
Expand Down
9 changes: 6 additions & 3 deletions sdk/servicebus/service-bus/src/core/batchingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ export class BatchingReceiver extends MessageReceiver {

return this.link;
},
this.receiveMode
this.receiveMode,
options.skipParsingBodyAsJson ?? false
);
}

Expand Down Expand Up @@ -248,7 +249,8 @@ export class BatchingReceiverLite {
private _getCurrentReceiver: (
abortSignal?: AbortSignalLike
) => Promise<MinimalReceiver | undefined>,
private _receiveMode: ReceiveMode
private _receiveMode: ReceiveMode,
_skipParsingBodyAsJson: boolean
) {
this._createAndEndProcessingSpan = createAndEndProcessingSpan;

Expand All @@ -257,7 +259,8 @@ export class BatchingReceiverLite {
context.message!,
context.delivery!,
true,
this._receiveMode
this._receiveMode,
_skipParsingBodyAsJson
);
};

Expand Down
16 changes: 12 additions & 4 deletions sdk/servicebus/service-bus/src/core/managementClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ export interface SendManagementRequestOptions extends SendRequestOptions {
* This is used for service side optimization.
*/
associatedLinkName?: string;
/**
* Option to disable the client from running JSON.parse() on the message body when receiving the message.
* Not applicable if the message was sent with AMQP body type value or sequence. Use this option when you
* prefer to work directly with the bytes present in the message body than have the client attempt to parse it.
*/
skipParsingBodyAsJson?: boolean;
}

/**
Expand Down Expand Up @@ -499,9 +505,10 @@ export class ManagementClient extends LinkEntity<RequestResponseLink> {
const messages = result.body.messages as { message: Buffer }[];
for (const msg of messages) {
const decodedMessage = RheaMessageUtil.decode(msg.message);
const message = fromRheaMessage(decodedMessage as any);

message.body = defaultDataTransformer.decode(message.body);
const message = fromRheaMessage(
decodedMessage as any,
options?.skipParsingBodyAsJson ?? false
);
messageList.push(message);
this._lastPeekedSequenceNumber = message.sequenceNumber!;
}
Expand Down Expand Up @@ -813,7 +820,8 @@ export class ManagementClient extends LinkEntity<RequestResponseLink> {
decodedMessage as any,
{ tag: msg["lock-token"] } as any,
false,
receiveMode
receiveMode,
options?.skipParsingBodyAsJson ?? false
);
messageList.push(message);
}
Expand Down
6 changes: 6 additions & 0 deletions sdk/servicebus/service-bus/src/core/messageReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ export interface ReceiveOptions extends SubscribeOptions {
* maxAutoRenewLockDurationInMs value when they created their receiver.
*/
lockRenewer: LockRenewer | undefined;
/**
* Option to disable the client from running JSON.parse() on the message body when receiving the message.
* Not applicable if the message was sent with AMQP body type value or sequence. Use this option when you
* prefer to work directly with the bytes present in the message body than have the client attempt to parse it.
*/
skipParsingBodyAsJson: boolean;
}

/**
Expand Down
3 changes: 2 additions & 1 deletion sdk/servicebus/service-bus/src/core/streamingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,8 @@ export class StreamingReceiver extends MessageReceiver {
context.message!,
context.delivery!,
true,
this.receiveMode
this.receiveMode,
options.skipParsingBodyAsJson ?? false
);

this._lockRenewer?.start(this, bMessage, (err) => {
Expand Down
16 changes: 11 additions & 5 deletions sdk/servicebus/service-bus/src/dataTransformer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,16 +84,17 @@ export const defaultDataTransformer = {
* of the AMQP mesage.
*
* @param body - The AMQP message body
* @param skipParsingBodyAsJson - Boolean to skip running JSON.parse() on message body content.
* @returns decoded body or the given body as-is.
*/
decode(body: unknown): unknown {
decode(body: unknown, skipParsingBodyAsJson: boolean): unknown {
let actualContent = body;

if (isRheaAmqpSection(body)) {
actualContent = body.content;
}

return tryToJsonDecode(actualContent);
return skipParsingBodyAsJson ? actualContent : tryToJsonDecode(actualContent);
},
/**
* A function that takes the body property from an AMQP message, which can come from either
Expand All @@ -103,16 +104,21 @@ export const defaultDataTransformer = {
* indicating which part of the AMQP message the body was decoded from.
*
* @param body - The AMQP message body as received from rhea.
* @param skipParsingBodyAsJson - Boolean to skip running JSON.parse() on message body.
* @returns The decoded/raw body and the body type.
*/
decodeWithType(
body: unknown | RheaAmqpSection
body: unknown | RheaAmqpSection,
skipParsingBodyAsJson: boolean
): { body: unknown; bodyType: "data" | "sequence" | "value" } {
try {
if (isRheaAmqpSection(body)) {
switch (body.typecode) {
case dataSectionTypeCode:
return { body: tryToJsonDecode(body.content), bodyType: "data" };
return {
body: skipParsingBodyAsJson ? body.content : tryToJsonDecode(body.content),
bodyType: "data"
};
case sequenceSectionTypeCode:
// typecode:
// handle sequences
Expand All @@ -125,7 +131,7 @@ export const defaultDataTransformer = {
// not sure - we have to try to infer the proper bodyType and content
if (isBuffer(body)) {
// This indicates that we are getting the AMQP described type. Let us try decoding it.
return { body: tryToJsonDecode(body), bodyType: "data" };
return { body: skipParsingBodyAsJson ? body : tryToJsonDecode(body), bodyType: "data" };
} else {
return { body: body, bodyType: "value" };
}
Expand Down
12 changes: 12 additions & 0 deletions sdk/servicebus/service-bus/src/models.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,12 @@ export interface ServiceBusReceiverOptions {
* - **To disable autolock renewal**, set this to `0`.
*/
maxAutoLockRenewalDurationInMs?: number;
/**
* Option to disable the client from running JSON.parse() on the message body when receiving the message.
* Not applicable if the message was sent with AMQP body type value or sequence. Use this option when you
* prefer to work directly with the bytes present in the message body than have the client attempt to parse it.
*/
skipParsingBodyAsJson?: boolean;
}

/**
Expand Down Expand Up @@ -233,6 +239,12 @@ export interface ServiceBusSessionReceiverOptions extends OperationOptionsBase {
* - **To disable autolock renewal**, set this to `0`.
*/
maxAutoLockRenewalDurationInMs?: number;
/**
* Option to disable the client from running JSON.parse() on the message body when receiving the message.
* Not applicable if the message was sent with AMQP body type value or sequence. Use this option when you
* prefer to work directly with the bytes present in the message body than have the client attempt to parse it.
*/
skipParsingBodyAsJson?: boolean;
}

/**
Expand Down
7 changes: 5 additions & 2 deletions sdk/servicebus/service-bus/src/receivers/receiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ export class ServiceBusReceiverImpl implements ServiceBusReceiver {
public entityPath: string,
public receiveMode: "peekLock" | "receiveAndDelete",
maxAutoRenewLockDurationInMs: number,
private skipParsingBodyAsJson: boolean,
retryOptions: RetryOptions = {}
) {
throwErrorIfConnectionClosed(_context);
Expand Down Expand Up @@ -357,7 +358,8 @@ export class ServiceBusReceiverImpl implements ServiceBusReceiver {
const receiveOptions: ReceiveOptions = {
maxConcurrentCalls: 0,
receiveMode: this.receiveMode,
lockRenewer: this._lockRenewer
lockRenewer: this._lockRenewer,
skipParsingBodyAsJson: this.skipParsingBodyAsJson
};
this._batchingReceiver = this._createBatchingReceiver(
this._context,
Expand Down Expand Up @@ -507,7 +509,8 @@ export class ServiceBusReceiverImpl implements ServiceBusReceiver {
...options,
receiveMode: this.receiveMode,
retryOptions: this._retryOptions,
lockRenewer: this._lockRenewer
lockRenewer: this._lockRenewer,
skipParsingBodyAsJson: this.skipParsingBodyAsJson
});

// this ensures that if the outer service bus client is closed that this receiver is cleaned up.
Expand Down
7 changes: 5 additions & 2 deletions sdk/servicebus/service-bus/src/serviceBusClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ export class ServiceBusClient {
entityPathWithSubQueue,
receiveMode,
maxLockAutoRenewDurationInMs,
options?.skipParsingBodyAsJson ?? false,
this._clientOptions.retryOptions
);
}
Expand Down Expand Up @@ -321,7 +322,8 @@ export class ServiceBusClient {
maxAutoLockRenewalDurationInMs: options?.maxAutoLockRenewalDurationInMs,
receiveMode,
abortSignal: options?.abortSignal,
retryOptions: this._clientOptions.retryOptions
retryOptions: this._clientOptions.retryOptions,
skipParsingBodyAsJson: options?.skipParsingBodyAsJson ?? false
}
);

Expand Down Expand Up @@ -406,7 +408,8 @@ export class ServiceBusClient {
maxAutoLockRenewalDurationInMs: options?.maxAutoLockRenewalDurationInMs,
receiveMode,
abortSignal: options?.abortSignal,
retryOptions: this._clientOptions.retryOptions
retryOptions: this._clientOptions.retryOptions,
skipParsingBodyAsJson: options?.skipParsingBodyAsJson ?? false
}
);

Expand Down
28 changes: 9 additions & 19 deletions sdk/servicebus/service-bus/src/serviceBusMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,7 @@ export interface ServiceBusReceivedMessage extends ServiceBusMessage {
*/
export function fromRheaMessage(
rheaMessage: RheaMessage,
skipParsingBodyAsJson: boolean,
delivery?: Delivery,
shouldReorderLockToken?: boolean
): ServiceBusReceivedMessage {
Expand All @@ -525,7 +526,10 @@ export function fromRheaMessage(
};
}

const { body, bodyType } = defaultDataTransformer.decodeWithType(rheaMessage.body);
const { body, bodyType } = defaultDataTransformer.decodeWithType(
rheaMessage.body,
skipParsingBodyAsJson
);

const sbmsg: ServiceBusMessage = {
body: body
Expand Down Expand Up @@ -896,13 +900,16 @@ export class ServiceBusMessageImpl implements ServiceBusReceivedMessage {
msg: RheaMessage,
delivery: Delivery,
shouldReorderLockToken: boolean,
receiveMode: ReceiveMode
receiveMode: ReceiveMode,
skipParsingBodyAsJson: boolean
) {
const { _rawAmqpMessage, ...restOfMessageProps } = fromRheaMessage(
msg,
skipParsingBodyAsJson,
delivery,
shouldReorderLockToken
);
this._rawAmqpMessage = _rawAmqpMessage; // need to initialize _rawAmqpMessage property to make compiler happy
Object.assign(this, restOfMessageProps);
this.state = restOfMessageProps.state; // to suppress error TS2564: Property 'state' has no initializer and is not definitely assigned in the constructor.

Expand All @@ -911,23 +918,6 @@ export class ServiceBusMessageImpl implements ServiceBusReceivedMessage {
if (receiveMode === "receiveAndDelete") {
this.lockToken = undefined;
}

let actualBodyType:
| ReturnType<typeof defaultDataTransformer["decodeWithType"]>["bodyType"]
| undefined = undefined;

if (msg.body) {
try {
const result = defaultDataTransformer.decodeWithType(msg.body);

this.body = result.body;
actualBodyType = result.bodyType;
} catch (err) {
this.body = undefined;
}
}
this._rawAmqpMessage = _rawAmqpMessage;
this._rawAmqpMessage.bodyType = actualBodyType;
this.delivery = delivery;
}

Expand Down
13 changes: 11 additions & 2 deletions sdk/servicebus/service-bus/src/session/messageSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ export type MessageSessionOptions = Pick<
> & {
receiveMode?: ReceiveMode;
retryOptions: RetryOptions | undefined;
skipParsingBodyAsJson: boolean;
};

/**
Expand Down Expand Up @@ -180,6 +181,11 @@ export class MessageSession extends LinkEntity<Receiver> {

private _totalAutoLockRenewDuration: number;

/**
* Whether to prevent the client from running JSON.parse() on the message body when receiving the message.
*/
private skipParsingBodyAsJson: boolean;

public get receiverHelper(): ReceiverHelper {
return this._receiverHelper;
}
Expand Down Expand Up @@ -375,6 +381,7 @@ export class MessageSession extends LinkEntity<Receiver> {
this.autoComplete = false;
if (isDefined(this._providedSessionId)) this.sessionId = this._providedSessionId;
this.receiveMode = options.receiveMode || "peekLock";
this.skipParsingBodyAsJson = options.skipParsingBodyAsJson;
this.maxAutoRenewDurationInMs =
options.maxAutoLockRenewalDurationInMs != null
? options.maxAutoLockRenewalDurationInMs
Expand All @@ -389,7 +396,8 @@ export class MessageSession extends LinkEntity<Receiver> {
async (_abortSignal?: AbortSignalLike): Promise<MinimalReceiver> => {
return this.link!;
},
this.receiveMode
this.receiveMode,
this.skipParsingBodyAsJson
);

// setting all the handlers
Expand Down Expand Up @@ -628,7 +636,8 @@ export class MessageSession extends LinkEntity<Receiver> {
context.message!,
context.delivery!,
true,
this.receiveMode
this.receiveMode,
this.skipParsingBodyAsJson
);

try {
Expand Down
4 changes: 2 additions & 2 deletions sdk/servicebus/service-bus/test/internal/node/samples.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ describe("Ensure typescript samples use published package", function(): void {
}

it("Ensure TypeScript samples use published package", async () => {
const pattern = "samples/typescript/src/**/*.ts";
const pattern = "samples/v7/typescript/src/**/*.ts";
const files = await globAsync(pattern);
testSamples(files, new RegExp('from\\s"@azure/service-bus"'));
});

it("Ensure JavaScript samples use published package", async () => {
const pattern = "samples/javascript/**/*.js";
const pattern = "samples/v7/javascript/**/*.js";
const files = await globAsync(pattern);
testSamples(files, new RegExp('=\\srequire\\("@azure/service-bus"\\)'));
});
Expand Down
3 changes: 2 additions & 1 deletion sdk/servicebus/service-bus/test/internal/retries.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,8 @@ describe("Retries - Receive methods", () => {
"dummyEntityPath",
{
lockRenewer: undefined,
receiveMode: "peekLock"
receiveMode: "peekLock",
skipParsingBodyAsJson: false
}
);
batchingReceiver.isOpen = () => true;
Expand Down
Loading

0 comments on commit 3bdb490

Please sign in to comment.