From e4ffdcab624ea256bfa2fe25621d14ecba4a7fcd Mon Sep 17 00:00:00 2001 From: Jeremy Meng Date: Fri, 10 Dec 2021 12:20:24 -0800 Subject: [PATCH 01/12] Add tests for valid topic but non-existent subscription and change `testError()` to only verify non empty `entityPath`, thus allow verifying error messages with different format. --- .../test/internal/serviceBusClient.spec.ts | 105 ++++++++++++++++-- 1 file changed, 94 insertions(+), 11 deletions(-) diff --git a/sdk/servicebus/service-bus/test/internal/serviceBusClient.spec.ts b/sdk/servicebus/service-bus/test/internal/serviceBusClient.spec.ts index 5f52c251a463..4acb65061e22 100644 --- a/sdk/servicebus/service-bus/test/internal/serviceBusClient.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/serviceBusClient.spec.ts @@ -182,7 +182,9 @@ describe("ServiceBusClient live tests", () => { }); }); - describe("Errors with non existing Queue/Topic/Subscription", async function(): Promise { + describe.only("Errors with non existing Queue/Topic/Subscription", async function(): Promise< + void + > { let sbClient: ServiceBusClientForTests; let errorWasThrown: boolean; beforeEach(() => { @@ -194,17 +196,20 @@ describe("ServiceBusClient live tests", () => { await sbClient.test.after(); }); - const testError = (err: Error | ServiceBusError, entityPath: string): void => { + const testError = (err: Error | ServiceBusError, entityPath?: string): void => { if (!isServiceBusError(err)) { should.equal(true, false, "Error expected to be instance of ServiceBusError"); } else { should.equal(err.code, "MessagingEntityNotFound", "Error code is different than expected"); - should.equal( - err.message.includes( - `The messaging entity 'sb://${sbClient.fullyQualifiedNamespace}/${entityPath}' could not be found.` - ), - true - ); + if (entityPath) { + should.equal( + err.message.includes( + `The messaging entity 'sb://${sbClient.fullyQualifiedNamespace}/${entityPath}' could not be found.` + ), + true, + `Expecting error message to contain "The messaging entity 'sb://${sbClient.fullyQualifiedNamespace}/${entityPath}' could not be found." but got ${err.message}` + ); + } errorWasThrown = true; } }; @@ -218,7 +223,7 @@ describe("ServiceBusClient live tests", () => { should.equal(errorWasThrown, true, "Error thrown flag must be true"); }); - it("throws error when receiving batch data from a non existing subscription", async function(): Promise< + it("throws error when receiving batch data from a non existing topic", async function(): Promise< void > { const receiver = sbClient.createReceiver("some-topic-name", "some-subscription-name"); @@ -229,6 +234,29 @@ describe("ServiceBusClient live tests", () => { should.equal(errorWasThrown, true, "Error thrown flag must be true"); }); + it.only("throws error when receiving batch data from a non existing subscription", async function(): Promise< + void + > { + const entityNames = await sbClient.test.createTestEntities(TestClientType.PartitionedTopic); + if (!entityNames.topic) { + throw new Error("Expecting valid topic name"); + } + const receiver = sbClient.createReceiver(entityNames.topic, "some-subscription-name"); + await receiver.receiveMessages(1).catch((err) => { + testError(err); + console.log(err.message); + const namespace = sbClient.fullyQualifiedNamespace.split(".")[0]; + const entityPattern = `The messaging entity '${namespace}:topic:${entityNames.topic}.*|some-subscription-name`; + should.equal( + new RegExp(entityPattern).test(err.message), + true, + `Expect error message to contain pattern "${entityPattern}" but got ${err.message}` + ); + }); + + should.equal(errorWasThrown, true, "Error thrown flag must be true"); + }); + it("throws error when receiving streaming data from a non existing queue", async function(): Promise< void > { @@ -264,7 +292,7 @@ describe("ServiceBusClient live tests", () => { await receiver.close(); }); - it("throws error when receiving streaming data from a non existing subscription", async function(): Promise< + it("throws error when receiving streaming data from a non existing topic", async function(): Promise< void > { const receiver = sbClient.createReceiver( @@ -275,7 +303,7 @@ describe("ServiceBusClient live tests", () => { receiver.subscribe({ async processMessage() { - throw "processMessage should not have been called when receive call is made from a non existing namespace"; + throw "processMessage should not have been called when subscribing to a non existing topic"; }, async processError(args) { const expected: Omit = { @@ -306,6 +334,61 @@ describe("ServiceBusClient live tests", () => { await receiver.close(); }); + + it.only("throws error when receiving streaming data from a non existing subscription", async function(): Promise< + void + > { + const entityNames = await sbClient.test.createTestEntities(TestClientType.PartitionedTopic); + if (!entityNames.topic) { + throw new Error("Expecting valid topic name"); + } + const receiver = sbClient.createReceiver( + entityNames.topic, + "some-subscription-name" + ) as ServiceBusReceiverImpl; + reduceRetries(receiver); + + receiver.subscribe({ + async processMessage() { + throw "processMessage should not have been called when receive call when subscribing to a non existing subscription"; + }, + async processError(args) { + const expected: Omit = { + errorSource: args.errorSource, + entityPath: args.entityPath, + fullyQualifiedNamespace: args.fullyQualifiedNamespace + }; + + expected.should.deep.equal({ + errorSource: "receive", + entityPath: receiver.entityPath, + fullyQualifiedNamespace: sbClient.fullyQualifiedNamespace + } as Omit); + + testError(args.error); + console.log(args.error.message); + const namespace = sbClient.fullyQualifiedNamespace.split(".")[0]; + const entityPattern = `The messaging entity '${namespace}:topic:${entityNames.topic}.*|some-subscription-name`; + should.equal( + new RegExp(entityPattern).test(args.error.message), + true, + `Expect error message to contain pattern "${entityPattern}" but got ${args.error.message}` + ); + } + }); + + should.equal( + await checkWithTimeout( + () => errorWasThrown === true, + 1000, + CoreAmqpConstants.defaultOperationTimeoutInMs * 2 // arbitrary, just don't want it to be too short. + ), + true, + "Error thrown flag must be true" + ); + + await receiver.close(); + }); }); describe("Test ServiceBusClient with TokenCredentials", function(): void { From df3a4cfb2441413c9fcacfac2ceef9be2030b6d6 Mon Sep 17 00:00:00 2001 From: Jeremy Meng Date: Fri, 10 Dec 2021 13:03:37 -0800 Subject: [PATCH 02/12] Don't try to re-connect when the errors are not retry-able and invoke user error handler in that case. --- sdk/servicebus/service-bus/CHANGELOG.md | 2 ++ .../service-bus/src/core/streamingReceiver.ts | 19 ++++++++++++++++++- .../test/internal/serviceBusClient.spec.ts | 8 +++----- 3 files changed, 23 insertions(+), 6 deletions(-) diff --git a/sdk/servicebus/service-bus/CHANGELOG.md b/sdk/servicebus/service-bus/CHANGELOG.md index c8054ea94a8a..9560b9c10ca6 100644 --- a/sdk/servicebus/service-bus/CHANGELOG.md +++ b/sdk/servicebus/service-bus/CHANGELOG.md @@ -10,6 +10,8 @@ ### Bugs Fixed +- Fixes an issue where receiver keeps trying to re-connect when subscribing to non-existent subscription. + ### Other Changes ## 7.4.0 (2021-11-08) diff --git a/sdk/servicebus/service-bus/src/core/streamingReceiver.ts b/sdk/servicebus/service-bus/src/core/streamingReceiver.ts index a3f81a6c15e6..87ae695ada0d 100644 --- a/sdk/servicebus/service-bus/src/core/streamingReceiver.ts +++ b/sdk/servicebus/service-bus/src/core/streamingReceiver.ts @@ -156,7 +156,24 @@ export class StreamingReceiver extends MessageReceiver { this._lockRenewer?.stopAll(this); - if (receiver && !receiver.isItselfClosed()) { + let retryable: boolean = true; + if (receiverError) { + const sbError = translateServiceBusError(receiverError) as MessagingError; + retryable = sbError.retryable; + if (!retryable) { + logger.verbose( + `${this.logPrefix} non-recoverable error. Hence not calling detached from the _onAmqpClose() handler.` + ); + // await this.close(); + this._messageHandlers().processError({ + error: sbError, + errorSource: "receive", + entityPath: this.entityPath, + fullyQualifiedNamespace: this._context.config.host + }); + } + } + if (receiver && !receiver.isItselfClosed() && retryable) { await this.onDetached(receiverError); } else { logger.verbose( diff --git a/sdk/servicebus/service-bus/test/internal/serviceBusClient.spec.ts b/sdk/servicebus/service-bus/test/internal/serviceBusClient.spec.ts index 4acb65061e22..1cd6e93a93f0 100644 --- a/sdk/servicebus/service-bus/test/internal/serviceBusClient.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/serviceBusClient.spec.ts @@ -182,9 +182,7 @@ describe("ServiceBusClient live tests", () => { }); }); - describe.only("Errors with non existing Queue/Topic/Subscription", async function(): Promise< - void - > { + describe("Errors with non existing Queue/Topic/Subscription", async function(): Promise { let sbClient: ServiceBusClientForTests; let errorWasThrown: boolean; beforeEach(() => { @@ -234,7 +232,7 @@ describe("ServiceBusClient live tests", () => { should.equal(errorWasThrown, true, "Error thrown flag must be true"); }); - it.only("throws error when receiving batch data from a non existing subscription", async function(): Promise< + it("throws error when receiving batch data from a non existing subscription", async function(): Promise< void > { const entityNames = await sbClient.test.createTestEntities(TestClientType.PartitionedTopic); @@ -335,7 +333,7 @@ describe("ServiceBusClient live tests", () => { await receiver.close(); }); - it.only("throws error when receiving streaming data from a non existing subscription", async function(): Promise< + it("throws error when receiving streaming data from a non existing subscription", async function(): Promise< void > { const entityNames = await sbClient.test.createTestEntities(TestClientType.PartitionedTopic); From 47e0422b4dd6021eccd65a69ddc5a2b26cc5e53e Mon Sep 17 00:00:00 2001 From: Jeremy Meng Date: Fri, 10 Dec 2021 13:10:48 -0800 Subject: [PATCH 03/12] Remove console.log --- .../service-bus/test/internal/serviceBusClient.spec.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/sdk/servicebus/service-bus/test/internal/serviceBusClient.spec.ts b/sdk/servicebus/service-bus/test/internal/serviceBusClient.spec.ts index 1cd6e93a93f0..8c93fd381254 100644 --- a/sdk/servicebus/service-bus/test/internal/serviceBusClient.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/serviceBusClient.spec.ts @@ -364,7 +364,6 @@ describe("ServiceBusClient live tests", () => { } as Omit); testError(args.error); - console.log(args.error.message); const namespace = sbClient.fullyQualifiedNamespace.split(".")[0]; const entityPattern = `The messaging entity '${namespace}:topic:${entityNames.topic}.*|some-subscription-name`; should.equal( From 2b7f57f28ea6e3ae67b05888d033b7851fc00ade Mon Sep 17 00:00:00 2001 From: Jeremy Meng Date: Fri, 10 Dec 2021 13:25:02 -0800 Subject: [PATCH 04/12] Remove commented code --- sdk/servicebus/service-bus/src/core/streamingReceiver.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/sdk/servicebus/service-bus/src/core/streamingReceiver.ts b/sdk/servicebus/service-bus/src/core/streamingReceiver.ts index 84ae1e29ec50..4e545ee8f4b9 100644 --- a/sdk/servicebus/service-bus/src/core/streamingReceiver.ts +++ b/sdk/servicebus/service-bus/src/core/streamingReceiver.ts @@ -164,7 +164,6 @@ export class StreamingReceiver extends MessageReceiver { logger.verbose( `${this.logPrefix} non-recoverable error. Hence not calling detached from the _onAmqpClose() handler.` ); - // await this.close(); this._messageHandlers().processError({ error: sbError, errorSource: "receive", From 422ded8ceb0a211bc64a00fbd1fdf2bf86e6779c Mon Sep 17 00:00:00 2001 From: Jeremy Meng Date: Fri, 17 Dec 2021 14:40:05 -0800 Subject: [PATCH 05/12] Fix an issue where user error handler is not called when subscribing to a valid topic but an invalid or disabled subscription. The problem is that in these two cases, the service accepts our client's cbs claim negotiation, and the receiver link is created. The [client receiver enters](https://github.com/Azure/azure-sdk-for-js/blob/14099039a8009d6d9687daf65d22a998e3ad7920/sdk/servicebus/service-bus/src/core/streamingReceiver.ts#L539) the `retryForeverFn` to call `_initAndAddCreditOperation()`, where the following ```ts await this._messageHandlers().postInitialize(); this._receiverHelper.addCredit(this.maxConcurrentCalls); ``` will return successfully, despite that the fired-and-forgot `addCredit()` call would later leads to `MessagingEntityDisabled` or `MessagingEntityNotFound` errors in the underlying link. Since there's no errors thrown in our retry-forever loop, the `onErrors()` [callback](https://github.com/Azure/azure-sdk-for-js/blob/14099039a8009d6d9687daf65d22a998e3ad7920/sdk/servicebus/service-bus/src/core/streamingReceiver.ts#L541) is not invoked. It is where the user error handler is called. Because of the error, the link is detatched. We have code to re-initializing the link when errors happen, so we call `_subscribeImpl()` where the `retryForeverFn()` is called again. This goes on in an endless loop. This PR adds code to invoke the user error handler in `_onAmqpError()` when the error code is `MessagingEntityDisabled` or `MessagingEntityNotFound` so users have a chance to stop the infinite loop. There's another problem. When users call `close()` on the subscription in their error handler, `this._receiverHelper.suspend()` is called to suspend the receiver. However, when re-connecting we call `this._receiverHelper.resume()` again in `_subscribeImpl()`. This essentially reset the receiver to be active and we will not abort the attempt to initialize connection https://github.com/Azure/azure-sdk-for-js/blob/14099039a8009d6d9687daf65d22a998e3ad7920/sdk/servicebus/service-bus/src/core/streamingReceiver.ts#L578 To fix it, this PR moves the `resume()` call out. It is supposed to only called to enable receiver before subscribing. It should not be called when we try to re-connect. --- sdk/servicebus/service-bus/CHANGELOG.md | 2 +- .../service-bus/src/core/streamingReceiver.ts | 34 +++++++------------ 2 files changed, 14 insertions(+), 22 deletions(-) diff --git a/sdk/servicebus/service-bus/CHANGELOG.md b/sdk/servicebus/service-bus/CHANGELOG.md index 7b449e0760ba..6501e803e1b4 100644 --- a/sdk/servicebus/service-bus/CHANGELOG.md +++ b/sdk/servicebus/service-bus/CHANGELOG.md @@ -11,7 +11,7 @@ ### Bugs Fixed -- Fixes an issue where receiver keeps trying to re-connect when subscribing to non-existent subscription. +- Fixes an issue where user error handler is not called when subscribing to a non-existent subscription. [PR #19189](https://github.com/Azure/azure-sdk-for-js/pull/19189) ### Other Changes diff --git a/sdk/servicebus/service-bus/src/core/streamingReceiver.ts b/sdk/servicebus/service-bus/src/core/streamingReceiver.ts index 4e545ee8f4b9..06061986fa30 100644 --- a/sdk/servicebus/service-bus/src/core/streamingReceiver.ts +++ b/sdk/servicebus/service-bus/src/core/streamingReceiver.ts @@ -156,23 +156,7 @@ export class StreamingReceiver extends MessageReceiver { this._lockRenewer?.stopAll(this); - let retryable: boolean = true; - if (receiverError) { - const sbError = translateServiceBusError(receiverError) as MessagingError; - retryable = sbError.retryable; - if (!retryable) { - logger.verbose( - `${this.logPrefix} non-recoverable error. Hence not calling detached from the _onAmqpClose() handler.` - ); - this._messageHandlers().processError({ - error: sbError, - errorSource: "receive", - entityPath: this.entityPath, - fullyQualifiedNamespace: this._context.config.host - }); - } - } - if (receiver && !receiver.isItselfClosed() && retryable) { + if (receiver && !receiver.isItselfClosed()) { await this.onDetached(receiverError); } else { logger.verbose( @@ -219,6 +203,17 @@ export class StreamingReceiver extends MessageReceiver { sbError, `${this.logPrefix} 'receiver_error' event occurred. The associated error is` ); + if ( + sbError?.code && + ["MessagingEntityDisabled", "MessagingEntityNotFound"].includes(sbError.code) + ) { + this._messageHandlers().processError({ + error: sbError, + errorSource: "receive", + entityPath: this.entityPath, + fullyQualifiedNamespace: this._context.config.host + }); + } } }; @@ -446,6 +441,7 @@ export class StreamingReceiver extends MessageReceiver { }); try { + this._receiverHelper.resume(); return await this._subscribeImpl("subscribe"); } catch (err) { // callers aren't going to be in a good position to forward this error properly @@ -540,10 +536,6 @@ export class StreamingReceiver extends MessageReceiver { */ private async _subscribeImpl(caller: "detach" | "subscribe"): Promise { try { - // this allows external callers (ie: ServiceBusReceiver) to prevent concurrent `subscribe` calls - // by not starting new receiving options while this one has started. - this._receiverHelper.resume(); - // we don't expect to ever get an error from retryForever but bugs // do happen. return await this._retryForeverFn({ From 44bc87801df058a65f8aca1af088b713f2eb3164 Mon Sep 17 00:00:00 2001 From: Jeremy Meng Date: Thu, 3 Feb 2022 13:01:56 -0800 Subject: [PATCH 06/12] Invoke user error handler for all errors instead of limiting to the two specific entity errors. --- .../service-bus/src/core/streamingReceiver.ts | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/sdk/servicebus/service-bus/src/core/streamingReceiver.ts b/sdk/servicebus/service-bus/src/core/streamingReceiver.ts index 7ea905356337..27c80fb75c95 100644 --- a/sdk/servicebus/service-bus/src/core/streamingReceiver.ts +++ b/sdk/servicebus/service-bus/src/core/streamingReceiver.ts @@ -203,17 +203,12 @@ export class StreamingReceiver extends MessageReceiver { sbError, `${this.logPrefix} 'receiver_error' event occurred. The associated error is` ); - if ( - sbError?.code && - ["MessagingEntityDisabled", "MessagingEntityNotFound"].includes(sbError.code) - ) { - this._messageHandlers().processError({ - error: sbError, - errorSource: "receive", - entityPath: this.entityPath, - fullyQualifiedNamespace: this._context.config.host, - }); - } + this._messageHandlers().processError({ + error: sbError, + errorSource: "receive", + entityPath: this.entityPath, + fullyQualifiedNamespace: this._context.config.host, + }); } }; From 9cbf6a594501c72dd989e2716cb8752017cb5885 Mon Sep 17 00:00:00 2001 From: Jeremy Meng Date: Thu, 3 Feb 2022 13:26:02 -0800 Subject: [PATCH 07/12] Update ref doc on the receiver subscribe error handler --- sdk/servicebus/service-bus/src/models.ts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/sdk/servicebus/service-bus/src/models.ts b/sdk/servicebus/service-bus/src/models.ts index 0d890d9d8f34..9183078e5027 100644 --- a/sdk/servicebus/service-bus/src/models.ts +++ b/sdk/servicebus/service-bus/src/models.ts @@ -62,6 +62,11 @@ export interface MessageHandlers { processMessage(message: ServiceBusReceivedMessage): Promise; /** * Handler that processes errors that occur during receiving. + * + * This handler will be called for any error that occurs in the receiver when + * - receiving the message, or + * - executing your `processMessage` callback, or + * - the receiver automatically completes or abandons the message. * @param args - The error and additional context to indicate where * the error originated. */ From d22c81fb91760f3a7ef097ac97449383906b31a3 Mon Sep 17 00:00:00 2001 From: Jeremy Meng Date: Thu, 3 Feb 2022 14:15:50 -0800 Subject: [PATCH 08/12] Apply suggestions from code review Co-authored-by: Ramya Rao --- sdk/servicebus/service-bus/CHANGELOG.md | 2 +- sdk/servicebus/service-bus/src/models.ts | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/sdk/servicebus/service-bus/CHANGELOG.md b/sdk/servicebus/service-bus/CHANGELOG.md index 6501e803e1b4..e94657aaaf73 100644 --- a/sdk/servicebus/service-bus/CHANGELOG.md +++ b/sdk/servicebus/service-bus/CHANGELOG.md @@ -11,7 +11,7 @@ ### Bugs Fixed -- Fixes an issue where user error handler is not called when subscribing to a non-existent subscription. [PR #19189](https://github.com/Azure/azure-sdk-for-js/pull/19189) +- The `processError` callback to `subscribe()` was previously called only for errors on setting up the receiver, errors on message settlement or message lock renewal and not for errors on AMQP link or session. This is now fixed. [PR #19189](https://github.com/Azure/azure-sdk-for-js/pull/19189) ### Other Changes diff --git a/sdk/servicebus/service-bus/src/models.ts b/sdk/servicebus/service-bus/src/models.ts index 9183078e5027..e5db3145b55a 100644 --- a/sdk/servicebus/service-bus/src/models.ts +++ b/sdk/servicebus/service-bus/src/models.ts @@ -66,7 +66,9 @@ export interface MessageHandlers { * This handler will be called for any error that occurs in the receiver when * - receiving the message, or * - executing your `processMessage` callback, or - * - the receiver automatically completes or abandons the message. + * - receiver is completing the message on your behalf after successfully running your `processMessage` callback and `autoCompleteMessages` is enabled + * - receiver is abandoning the message on your behalf if running your `processMessage` callback fails and `autoCompleteMessages` is enabled + * - receiver is renewing the lock on your behalf due to auto lock renewal feature being enabled * @param args - The error and additional context to indicate where * the error originated. */ From 2fc41bc13e847520ca36837aa2e5ba5064ef3e66 Mon Sep 17 00:00:00 2001 From: Jeremy Meng Date: Thu, 3 Feb 2022 14:48:55 -0800 Subject: [PATCH 09/12] Bring back processError() call in _onSessionError() callback --- sdk/servicebus/service-bus/src/core/streamingReceiver.ts | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/sdk/servicebus/service-bus/src/core/streamingReceiver.ts b/sdk/servicebus/service-bus/src/core/streamingReceiver.ts index 27c80fb75c95..520a91bd88d1 100644 --- a/sdk/servicebus/service-bus/src/core/streamingReceiver.ts +++ b/sdk/servicebus/service-bus/src/core/streamingReceiver.ts @@ -220,6 +220,12 @@ export class StreamingReceiver extends MessageReceiver { sbError, `${this.logPrefix} 'session_error' event occurred. The associated error is` ); + this._messageHandlers().processError({ + error: sbError, + errorSource: "receive", + entityPath: this.entityPath, + fullyQualifiedNamespace: this._context.config.host, + }); } }; From b82cc544f8968136e997e77f1f426ffcb9ab38a2 Mon Sep 17 00:00:00 2001 From: Jeremy Meng Date: Thu, 3 Feb 2022 17:03:10 -0800 Subject: [PATCH 10/12] Clarify that we retry on all errors when streaming and link to ref doc on service bus errors. --- sdk/servicebus/service-bus/src/models.ts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/sdk/servicebus/service-bus/src/models.ts b/sdk/servicebus/service-bus/src/models.ts index e5db3145b55a..8dadab92cbc4 100644 --- a/sdk/servicebus/service-bus/src/models.ts +++ b/sdk/servicebus/service-bus/src/models.ts @@ -69,6 +69,11 @@ export interface MessageHandlers { * - receiver is completing the message on your behalf after successfully running your `processMessage` callback and `autoCompleteMessages` is enabled * - receiver is abandoning the message on your behalf if running your `processMessage` callback fails and `autoCompleteMessages` is enabled * - receiver is renewing the lock on your behalf due to auto lock renewal feature being enabled + * + * Note that when receiving messages in a stream using `subscribe()`, the receiver will automatically retry receiving messages on all errors unless + * `close()` is called on the subscription. It is completely up to users to decide what errors are considered non-recoverable and to handle them + * accordingly in this callback. + * For a list of errors occurs within Service Bus, please refer to https://docs.microsoft.com/javascript/api/@azure/service-bus/servicebuserror?view=azure-node-latest * @param args - The error and additional context to indicate where * the error originated. */ From 598878381d291b9b9e9cea4162a411b94ef7c992 Mon Sep 17 00:00:00 2001 From: Jeremy Meng Date: Thu, 3 Feb 2022 18:01:59 -0800 Subject: [PATCH 11/12] Bring back comments for resume() call Also fix linting issue. The linter insists "@" in ts-doc should be escaped. The url still works after adding "\" --- sdk/servicebus/service-bus/src/core/streamingReceiver.ts | 2 ++ sdk/servicebus/service-bus/src/models.ts | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/sdk/servicebus/service-bus/src/core/streamingReceiver.ts b/sdk/servicebus/service-bus/src/core/streamingReceiver.ts index 520a91bd88d1..c5be5443f374 100644 --- a/sdk/servicebus/service-bus/src/core/streamingReceiver.ts +++ b/sdk/servicebus/service-bus/src/core/streamingReceiver.ts @@ -442,6 +442,8 @@ export class StreamingReceiver extends MessageReceiver { }); try { + // this allows external callers (ie: ServiceBusReceiver) to prevent concurrent `subscribe` calls + // by not starting new receiving options while this one has started. this._receiverHelper.resume(); return await this._subscribeImpl("subscribe"); } catch (err) { diff --git a/sdk/servicebus/service-bus/src/models.ts b/sdk/servicebus/service-bus/src/models.ts index 8dadab92cbc4..448afb17f10a 100644 --- a/sdk/servicebus/service-bus/src/models.ts +++ b/sdk/servicebus/service-bus/src/models.ts @@ -73,7 +73,7 @@ export interface MessageHandlers { * Note that when receiving messages in a stream using `subscribe()`, the receiver will automatically retry receiving messages on all errors unless * `close()` is called on the subscription. It is completely up to users to decide what errors are considered non-recoverable and to handle them * accordingly in this callback. - * For a list of errors occurs within Service Bus, please refer to https://docs.microsoft.com/javascript/api/@azure/service-bus/servicebuserror?view=azure-node-latest + * For a list of errors occurs within Service Bus, please refer to https://docs.microsoft.com/javascript/api/\@azure/service-bus/servicebuserror?view=azure-node-latest * @param args - The error and additional context to indicate where * the error originated. */ From afb26f546a66d2520c74e8736740827e581fcec6 Mon Sep 17 00:00:00 2001 From: Jeremy Meng Date: Mon, 7 Feb 2022 10:49:01 -0800 Subject: [PATCH 12/12] Re-remove comments --- sdk/servicebus/service-bus/src/core/streamingReceiver.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/sdk/servicebus/service-bus/src/core/streamingReceiver.ts b/sdk/servicebus/service-bus/src/core/streamingReceiver.ts index c5be5443f374..520a91bd88d1 100644 --- a/sdk/servicebus/service-bus/src/core/streamingReceiver.ts +++ b/sdk/servicebus/service-bus/src/core/streamingReceiver.ts @@ -442,8 +442,6 @@ export class StreamingReceiver extends MessageReceiver { }); try { - // this allows external callers (ie: ServiceBusReceiver) to prevent concurrent `subscribe` calls - // by not starting new receiving options while this one has started. this._receiverHelper.resume(); return await this._subscribeImpl("subscribe"); } catch (err) {