Skip to content

Commit

Permalink
[ServiceBus] Fix an issue where user error handler is not called (#19189
Browse files Browse the repository at this point in the history
)

when subscribing to a valid topic but an invalid or disabled subscription. Issue #18798

The problem is that in these cases, the service accepts our client's cbs claim negotiation, and the receiver link is
created. The [client receiver enters](https://github.com/Azure/azure-sdk-for-js/blob/14099039a8009d6d9687daf65d22a998e3ad7920/sdk/servicebus/service-bus/src/core/streamingReceiver.ts#L539)
the `retryForeverFn` to call `_initAndAddCreditOperation()`, where the following

```ts
  await this._messageHandlers().postInitialize();
  this._receiverHelper.addCredit(this.maxConcurrentCalls);
```

will return successfully, despite that the fired-and-forgot `addCredit()` call would later leads to
`MessagingEntityDisabled` or `MessagingEntityNotFound` errors in the underlying link. Since there's no errors thrown in
our retry-forever loop, the `onErrors()` [callback](https://github.com/Azure/azure-sdk-for-js/blob/14099039a8009d6d9687daf65d22a998e3ad7920/sdk/servicebus/service-bus/src/core/streamingReceiver.ts#L541)
is not invoked. It is one place where the user error handler is called.

Because of the error, the link is detatched. We have code to re-establish the link when errors happen, so we call
`_subscribeImpl()` where the `retryForeverFn()` is called again. This goes on in an endless loop.

We used to invoke user error handler and would not attempt to re-establish connections when errors are considered
non-retryable. In PR #11973 we removed the classification of errors that `subscribe()` used and instead continues to
retry infinitely. We also removed the code to invoke user error handler. This PR adds code to invoke the user error
handler in `_onAmqpError()` so users have a chance to stop the infinite loop.

There's another problem. When users call `close()` on the subscription in their error handler,
`this._receiverHelper.suspend()` is called to suspend the receiver. However, when re-connecting we call
`this._receiverHelper.resume()` again in `_subscribeImpl()`. This essentially reset the receiver to be active and we
will not abort the attempt to initialize connection

https://github.com/Azure/azure-sdk-for-js/blob/14099039a8009d6d9687daf65d22a998e3ad7920/sdk/servicebus/service-bus/src/core/streamingReceiver.ts#L574-L579

To fix it, this PR moves the `resume()` call out. It is supposed to only called to enable receiver before
subscribing. It should not be called when we try to re-connect.

* Update ref doc on the receiver subscribe error handler

* Apply suggestions from code review

Co-authored-by: Ramya Rao <[email protected]>

* Bring back processError() call in _onSessionError() callback

* Clarify that we retry on all errors when streaming

and link to ref doc on service bus errors.

* Bring back comments for resume() call

Also fix linting issue. The linter insists "@" in ts-doc should be escaped.

The url still works after adding "\"

* Re-remove comments

Co-authored-by: Ramya Rao <[email protected]>
  • Loading branch information
jeremymeng and ramya-rao-a authored Feb 7, 2022
1 parent 6b7e478 commit 2b2b9a4
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 14 deletions.
2 changes: 2 additions & 0 deletions sdk/servicebus/service-bus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

### Bugs Fixed

- The `processError` callback to `subscribe()` was previously called only for errors on setting up the receiver, errors on message settlement or message lock renewal and not for errors on AMQP link or session. This is now fixed. [PR #19189](https://github.com/Azure/azure-sdk-for-js/pull/19189)

### Other Changes

## 7.4.0 (2021-11-08)
Expand Down
17 changes: 13 additions & 4 deletions sdk/servicebus/service-bus/src/core/streamingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,12 @@ export class StreamingReceiver extends MessageReceiver {
sbError,
`${this.logPrefix} 'receiver_error' event occurred. The associated error is`
);
this._messageHandlers().processError({
error: sbError,
errorSource: "receive",
entityPath: this.entityPath,
fullyQualifiedNamespace: this._context.config.host,
});
}
};

Expand All @@ -214,6 +220,12 @@ export class StreamingReceiver extends MessageReceiver {
sbError,
`${this.logPrefix} 'session_error' event occurred. The associated error is`
);
this._messageHandlers().processError({
error: sbError,
errorSource: "receive",
entityPath: this.entityPath,
fullyQualifiedNamespace: this._context.config.host,
});
}
};

Expand Down Expand Up @@ -430,6 +442,7 @@ export class StreamingReceiver extends MessageReceiver {
});

try {
this._receiverHelper.resume();
return await this._subscribeImpl("subscribe");
} catch (err) {
// callers aren't going to be in a good position to forward this error properly
Expand Down Expand Up @@ -524,10 +537,6 @@ export class StreamingReceiver extends MessageReceiver {
*/
private async _subscribeImpl(caller: "detach" | "subscribe"): Promise<void> {
try {
// this allows external callers (ie: ServiceBusReceiver) to prevent concurrent `subscribe` calls
// by not starting new receiving options while this one has started.
this._receiverHelper.resume();

// we don't expect to ever get an error from retryForever but bugs
// do happen.
return await this._retryForeverFn({
Expand Down
12 changes: 12 additions & 0 deletions sdk/servicebus/service-bus/src/models.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,18 @@ export interface MessageHandlers {
processMessage(message: ServiceBusReceivedMessage): Promise<void>;
/**
* Handler that processes errors that occur during receiving.
*
* This handler will be called for any error that occurs in the receiver when
* - receiving the message, or
* - executing your `processMessage` callback, or
* - receiver is completing the message on your behalf after successfully running your `processMessage` callback and `autoCompleteMessages` is enabled
* - receiver is abandoning the message on your behalf if running your `processMessage` callback fails and `autoCompleteMessages` is enabled
* - receiver is renewing the lock on your behalf due to auto lock renewal feature being enabled
*
* Note that when receiving messages in a stream using `subscribe()`, the receiver will automatically retry receiving messages on all errors unless
* `close()` is called on the subscription. It is completely up to users to decide what errors are considered non-recoverable and to handle them
* accordingly in this callback.
* For a list of errors occurs within Service Bus, please refer to https://docs.microsoft.com/javascript/api/\@azure/service-bus/servicebuserror?view=azure-node-latest
* @param args - The error and additional context to indicate where
* the error originated.
*/
Expand Down
96 changes: 86 additions & 10 deletions sdk/servicebus/service-bus/test/internal/serviceBusClient.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -190,17 +190,20 @@ describe("ServiceBusClient live tests", () => {
await sbClient.test.after();
});

const testError = (err: Error | ServiceBusError, entityPath: string): void => {
const testError = (err: Error | ServiceBusError, entityPath?: string): void => {
if (!isServiceBusError(err)) {
should.equal(true, false, "Error expected to be instance of ServiceBusError");
} else {
should.equal(err.code, "MessagingEntityNotFound", "Error code is different than expected");
should.equal(
err.message.includes(
`The messaging entity 'sb://${sbClient.fullyQualifiedNamespace}/${entityPath}' could not be found.`
),
true
);
if (entityPath) {
should.equal(
err.message.includes(
`The messaging entity 'sb://${sbClient.fullyQualifiedNamespace}/${entityPath}' could not be found.`
),
true,
`Expecting error message to contain "The messaging entity 'sb://${sbClient.fullyQualifiedNamespace}/${entityPath}' could not be found." but got ${err.message}`
);
}
errorWasThrown = true;
}
};
Expand All @@ -212,7 +215,7 @@ describe("ServiceBusClient live tests", () => {
should.equal(errorWasThrown, true, "Error thrown flag must be true");
});

it("throws error when receiving batch data from a non existing subscription", async function (): Promise<void> {
it("throws error when receiving batch data from a non existing topic", async function (): Promise<void> {
const receiver = sbClient.createReceiver("some-topic-name", "some-subscription-name");
await receiver
.receiveMessages(1)
Expand All @@ -221,6 +224,27 @@ describe("ServiceBusClient live tests", () => {
should.equal(errorWasThrown, true, "Error thrown flag must be true");
});

it("throws error when receiving batch data from a non existing subscription", async function (): Promise<void> {
const entityNames = await sbClient.test.createTestEntities(TestClientType.PartitionedTopic);
if (!entityNames.topic) {
throw new Error("Expecting valid topic name");
}
const receiver = sbClient.createReceiver(entityNames.topic, "some-subscription-name");
await receiver.receiveMessages(1).catch((err) => {
testError(err);
console.log(err.message);
const namespace = sbClient.fullyQualifiedNamespace.split(".")[0];
const entityPattern = `The messaging entity '${namespace}:topic:${entityNames.topic}.*|some-subscription-name`;
should.equal(
new RegExp(entityPattern).test(err.message),
true,
`Expect error message to contain pattern "${entityPattern}" but got ${err.message}`
);
});

should.equal(errorWasThrown, true, "Error thrown flag must be true");
});

it("throws error when receiving streaming data from a non existing queue", async function (): Promise<void> {
const receiver = sbClient.createReceiver("some-name");
reduceRetries(receiver);
Expand Down Expand Up @@ -254,7 +278,7 @@ describe("ServiceBusClient live tests", () => {
await receiver.close();
});

it("throws error when receiving streaming data from a non existing subscription", async function (): Promise<void> {
it("throws error when receiving streaming data from a non existing topic", async function (): Promise<void> {
const receiver = sbClient.createReceiver(
"some-topic-name",
"some-subscription-name"
Expand All @@ -263,7 +287,7 @@ describe("ServiceBusClient live tests", () => {

receiver.subscribe({
async processMessage() {
throw "processMessage should not have been called when receive call is made from a non existing namespace";
throw "processMessage should not have been called when subscribing to a non existing topic";
},
async processError(args) {
const expected: Omit<ProcessErrorArgs, "error"> = {
Expand Down Expand Up @@ -294,6 +318,58 @@ describe("ServiceBusClient live tests", () => {

await receiver.close();
});

it("throws error when receiving streaming data from a non existing subscription", async function (): Promise<void> {
const entityNames = await sbClient.test.createTestEntities(TestClientType.PartitionedTopic);
if (!entityNames.topic) {
throw new Error("Expecting valid topic name");
}
const receiver = sbClient.createReceiver(
entityNames.topic,
"some-subscription-name"
) as ServiceBusReceiverImpl;
reduceRetries(receiver);

receiver.subscribe({
async processMessage() {
throw "processMessage should not have been called when receive call when subscribing to a non existing subscription";
},
async processError(args) {
const expected: Omit<ProcessErrorArgs, "error"> = {
errorSource: args.errorSource,
entityPath: args.entityPath,
fullyQualifiedNamespace: args.fullyQualifiedNamespace,
};

expected.should.deep.equal({
errorSource: "receive",
entityPath: receiver.entityPath,
fullyQualifiedNamespace: sbClient.fullyQualifiedNamespace,
} as Omit<ProcessErrorArgs, "error">);

testError(args.error);
const namespace = sbClient.fullyQualifiedNamespace.split(".")[0];
const entityPattern = `The messaging entity '${namespace}:topic:${entityNames.topic}.*|some-subscription-name`;
should.equal(
new RegExp(entityPattern).test(args.error.message),
true,
`Expect error message to contain pattern "${entityPattern}" but got ${args.error.message}`
);
},
});

should.equal(
await checkWithTimeout(
() => errorWasThrown === true,
1000,
CoreAmqpConstants.defaultOperationTimeoutInMs * 2 // arbitrary, just don't want it to be too short.
),
true,
"Error thrown flag must be true"
);

await receiver.close();
});
});

describe("Test ServiceBusClient with TokenCredentials", function (): void {
Expand Down

0 comments on commit 2b2b9a4

Please sign in to comment.