From 2b2b9a4b52a08d95a3c46f9560a920b1d7a4c7a3 Mon Sep 17 00:00:00 2001 From: Jeremy Meng Date: Mon, 7 Feb 2022 11:10:09 -0800 Subject: [PATCH] [ServiceBus] Fix an issue where user error handler is not called (#19189) when subscribing to a valid topic but an invalid or disabled subscription. Issue https://github.com/Azure/azure-sdk-for-js/issues/18798 The problem is that in these cases, the service accepts our client's cbs claim negotiation, and the receiver link is created. The [client receiver enters](https://github.com/Azure/azure-sdk-for-js/blob/14099039a8009d6d9687daf65d22a998e3ad7920/sdk/servicebus/service-bus/src/core/streamingReceiver.ts#L539) the `retryForeverFn` to call `_initAndAddCreditOperation()`, where the following ```ts await this._messageHandlers().postInitialize(); this._receiverHelper.addCredit(this.maxConcurrentCalls); ``` will return successfully, despite that the fired-and-forgot `addCredit()` call would later leads to `MessagingEntityDisabled` or `MessagingEntityNotFound` errors in the underlying link. Since there's no errors thrown in our retry-forever loop, the `onErrors()` [callback](https://github.com/Azure/azure-sdk-for-js/blob/14099039a8009d6d9687daf65d22a998e3ad7920/sdk/servicebus/service-bus/src/core/streamingReceiver.ts#L541) is not invoked. It is one place where the user error handler is called. Because of the error, the link is detatched. We have code to re-establish the link when errors happen, so we call `_subscribeImpl()` where the `retryForeverFn()` is called again. This goes on in an endless loop. We used to invoke user error handler and would not attempt to re-establish connections when errors are considered non-retryable. In PR #11973 we removed the classification of errors that `subscribe()` used and instead continues to retry infinitely. We also removed the code to invoke user error handler. This PR adds code to invoke the user error handler in `_onAmqpError()` so users have a chance to stop the infinite loop. There's another problem. When users call `close()` on the subscription in their error handler, `this._receiverHelper.suspend()` is called to suspend the receiver. However, when re-connecting we call `this._receiverHelper.resume()` again in `_subscribeImpl()`. This essentially reset the receiver to be active and we will not abort the attempt to initialize connection https://github.com/Azure/azure-sdk-for-js/blob/14099039a8009d6d9687daf65d22a998e3ad7920/sdk/servicebus/service-bus/src/core/streamingReceiver.ts#L574-L579 To fix it, this PR moves the `resume()` call out. It is supposed to only called to enable receiver before subscribing. It should not be called when we try to re-connect. * Update ref doc on the receiver subscribe error handler * Apply suggestions from code review Co-authored-by: Ramya Rao * Bring back processError() call in _onSessionError() callback * Clarify that we retry on all errors when streaming and link to ref doc on service bus errors. * Bring back comments for resume() call Also fix linting issue. The linter insists "@" in ts-doc should be escaped. The url still works after adding "\" * Re-remove comments Co-authored-by: Ramya Rao --- sdk/servicebus/service-bus/CHANGELOG.md | 2 + .../service-bus/src/core/streamingReceiver.ts | 17 +++- sdk/servicebus/service-bus/src/models.ts | 12 +++ .../test/internal/serviceBusClient.spec.ts | 96 +++++++++++++++++-- 4 files changed, 113 insertions(+), 14 deletions(-) diff --git a/sdk/servicebus/service-bus/CHANGELOG.md b/sdk/servicebus/service-bus/CHANGELOG.md index 3effd347d2a4..e94657aaaf73 100644 --- a/sdk/servicebus/service-bus/CHANGELOG.md +++ b/sdk/servicebus/service-bus/CHANGELOG.md @@ -11,6 +11,8 @@ ### Bugs Fixed +- The `processError` callback to `subscribe()` was previously called only for errors on setting up the receiver, errors on message settlement or message lock renewal and not for errors on AMQP link or session. This is now fixed. [PR #19189](https://github.com/Azure/azure-sdk-for-js/pull/19189) + ### Other Changes ## 7.4.0 (2021-11-08) diff --git a/sdk/servicebus/service-bus/src/core/streamingReceiver.ts b/sdk/servicebus/service-bus/src/core/streamingReceiver.ts index b6dbd9b3d791..520a91bd88d1 100644 --- a/sdk/servicebus/service-bus/src/core/streamingReceiver.ts +++ b/sdk/servicebus/service-bus/src/core/streamingReceiver.ts @@ -203,6 +203,12 @@ export class StreamingReceiver extends MessageReceiver { sbError, `${this.logPrefix} 'receiver_error' event occurred. The associated error is` ); + this._messageHandlers().processError({ + error: sbError, + errorSource: "receive", + entityPath: this.entityPath, + fullyQualifiedNamespace: this._context.config.host, + }); } }; @@ -214,6 +220,12 @@ export class StreamingReceiver extends MessageReceiver { sbError, `${this.logPrefix} 'session_error' event occurred. The associated error is` ); + this._messageHandlers().processError({ + error: sbError, + errorSource: "receive", + entityPath: this.entityPath, + fullyQualifiedNamespace: this._context.config.host, + }); } }; @@ -430,6 +442,7 @@ export class StreamingReceiver extends MessageReceiver { }); try { + this._receiverHelper.resume(); return await this._subscribeImpl("subscribe"); } catch (err) { // callers aren't going to be in a good position to forward this error properly @@ -524,10 +537,6 @@ export class StreamingReceiver extends MessageReceiver { */ private async _subscribeImpl(caller: "detach" | "subscribe"): Promise { try { - // this allows external callers (ie: ServiceBusReceiver) to prevent concurrent `subscribe` calls - // by not starting new receiving options while this one has started. - this._receiverHelper.resume(); - // we don't expect to ever get an error from retryForever but bugs // do happen. return await this._retryForeverFn({ diff --git a/sdk/servicebus/service-bus/src/models.ts b/sdk/servicebus/service-bus/src/models.ts index 0d890d9d8f34..448afb17f10a 100644 --- a/sdk/servicebus/service-bus/src/models.ts +++ b/sdk/servicebus/service-bus/src/models.ts @@ -62,6 +62,18 @@ export interface MessageHandlers { processMessage(message: ServiceBusReceivedMessage): Promise; /** * Handler that processes errors that occur during receiving. + * + * This handler will be called for any error that occurs in the receiver when + * - receiving the message, or + * - executing your `processMessage` callback, or + * - receiver is completing the message on your behalf after successfully running your `processMessage` callback and `autoCompleteMessages` is enabled + * - receiver is abandoning the message on your behalf if running your `processMessage` callback fails and `autoCompleteMessages` is enabled + * - receiver is renewing the lock on your behalf due to auto lock renewal feature being enabled + * + * Note that when receiving messages in a stream using `subscribe()`, the receiver will automatically retry receiving messages on all errors unless + * `close()` is called on the subscription. It is completely up to users to decide what errors are considered non-recoverable and to handle them + * accordingly in this callback. + * For a list of errors occurs within Service Bus, please refer to https://docs.microsoft.com/javascript/api/\@azure/service-bus/servicebuserror?view=azure-node-latest * @param args - The error and additional context to indicate where * the error originated. */ diff --git a/sdk/servicebus/service-bus/test/internal/serviceBusClient.spec.ts b/sdk/servicebus/service-bus/test/internal/serviceBusClient.spec.ts index 9cc4e41209c7..c505ee61cff1 100644 --- a/sdk/servicebus/service-bus/test/internal/serviceBusClient.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/serviceBusClient.spec.ts @@ -190,17 +190,20 @@ describe("ServiceBusClient live tests", () => { await sbClient.test.after(); }); - const testError = (err: Error | ServiceBusError, entityPath: string): void => { + const testError = (err: Error | ServiceBusError, entityPath?: string): void => { if (!isServiceBusError(err)) { should.equal(true, false, "Error expected to be instance of ServiceBusError"); } else { should.equal(err.code, "MessagingEntityNotFound", "Error code is different than expected"); - should.equal( - err.message.includes( - `The messaging entity 'sb://${sbClient.fullyQualifiedNamespace}/${entityPath}' could not be found.` - ), - true - ); + if (entityPath) { + should.equal( + err.message.includes( + `The messaging entity 'sb://${sbClient.fullyQualifiedNamespace}/${entityPath}' could not be found.` + ), + true, + `Expecting error message to contain "The messaging entity 'sb://${sbClient.fullyQualifiedNamespace}/${entityPath}' could not be found." but got ${err.message}` + ); + } errorWasThrown = true; } }; @@ -212,7 +215,7 @@ describe("ServiceBusClient live tests", () => { should.equal(errorWasThrown, true, "Error thrown flag must be true"); }); - it("throws error when receiving batch data from a non existing subscription", async function (): Promise { + it("throws error when receiving batch data from a non existing topic", async function (): Promise { const receiver = sbClient.createReceiver("some-topic-name", "some-subscription-name"); await receiver .receiveMessages(1) @@ -221,6 +224,27 @@ describe("ServiceBusClient live tests", () => { should.equal(errorWasThrown, true, "Error thrown flag must be true"); }); + it("throws error when receiving batch data from a non existing subscription", async function (): Promise { + const entityNames = await sbClient.test.createTestEntities(TestClientType.PartitionedTopic); + if (!entityNames.topic) { + throw new Error("Expecting valid topic name"); + } + const receiver = sbClient.createReceiver(entityNames.topic, "some-subscription-name"); + await receiver.receiveMessages(1).catch((err) => { + testError(err); + console.log(err.message); + const namespace = sbClient.fullyQualifiedNamespace.split(".")[0]; + const entityPattern = `The messaging entity '${namespace}:topic:${entityNames.topic}.*|some-subscription-name`; + should.equal( + new RegExp(entityPattern).test(err.message), + true, + `Expect error message to contain pattern "${entityPattern}" but got ${err.message}` + ); + }); + + should.equal(errorWasThrown, true, "Error thrown flag must be true"); + }); + it("throws error when receiving streaming data from a non existing queue", async function (): Promise { const receiver = sbClient.createReceiver("some-name"); reduceRetries(receiver); @@ -254,7 +278,7 @@ describe("ServiceBusClient live tests", () => { await receiver.close(); }); - it("throws error when receiving streaming data from a non existing subscription", async function (): Promise { + it("throws error when receiving streaming data from a non existing topic", async function (): Promise { const receiver = sbClient.createReceiver( "some-topic-name", "some-subscription-name" @@ -263,7 +287,7 @@ describe("ServiceBusClient live tests", () => { receiver.subscribe({ async processMessage() { - throw "processMessage should not have been called when receive call is made from a non existing namespace"; + throw "processMessage should not have been called when subscribing to a non existing topic"; }, async processError(args) { const expected: Omit = { @@ -294,6 +318,58 @@ describe("ServiceBusClient live tests", () => { await receiver.close(); }); + + it("throws error when receiving streaming data from a non existing subscription", async function (): Promise { + const entityNames = await sbClient.test.createTestEntities(TestClientType.PartitionedTopic); + if (!entityNames.topic) { + throw new Error("Expecting valid topic name"); + } + const receiver = sbClient.createReceiver( + entityNames.topic, + "some-subscription-name" + ) as ServiceBusReceiverImpl; + reduceRetries(receiver); + + receiver.subscribe({ + async processMessage() { + throw "processMessage should not have been called when receive call when subscribing to a non existing subscription"; + }, + async processError(args) { + const expected: Omit = { + errorSource: args.errorSource, + entityPath: args.entityPath, + fullyQualifiedNamespace: args.fullyQualifiedNamespace, + }; + + expected.should.deep.equal({ + errorSource: "receive", + entityPath: receiver.entityPath, + fullyQualifiedNamespace: sbClient.fullyQualifiedNamespace, + } as Omit); + + testError(args.error); + const namespace = sbClient.fullyQualifiedNamespace.split(".")[0]; + const entityPattern = `The messaging entity '${namespace}:topic:${entityNames.topic}.*|some-subscription-name`; + should.equal( + new RegExp(entityPattern).test(args.error.message), + true, + `Expect error message to contain pattern "${entityPattern}" but got ${args.error.message}` + ); + }, + }); + + should.equal( + await checkWithTimeout( + () => errorWasThrown === true, + 1000, + CoreAmqpConstants.defaultOperationTimeoutInMs * 2 // arbitrary, just don't want it to be too short. + ), + true, + "Error thrown flag must be true" + ); + + await receiver.close(); + }); }); describe("Test ServiceBusClient with TokenCredentials", function (): void {