Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ServiceBus] Fix an issue where user error handler is not called #19189

Merged
merged 17 commits into from
Feb 7, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sdk/servicebus/service-bus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

### Bugs Fixed

- Fixes an issue where receiver keeps trying to re-connect when subscribing to non-existent subscription.

### Other Changes

## 7.4.0 (2021-11-08)
Expand Down
18 changes: 17 additions & 1 deletion sdk/servicebus/service-bus/src/core/streamingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,23 @@ export class StreamingReceiver extends MessageReceiver {

this._lockRenewer?.stopAll(this);

if (receiver && !receiver.isItselfClosed()) {
let retryable: boolean = true;
if (receiverError) {
const sbError = translateServiceBusError(receiverError) as MessagingError;
retryable = sbError.retryable;
if (!retryable) {
logger.verbose(
`${this.logPrefix} non-recoverable error. Hence not calling detached from the _onAmqpClose() handler.`
);
this._messageHandlers().processError({
error: sbError,
errorSource: "receive",
entityPath: this.entityPath,
fullyQualifiedNamespace: this._context.config.host
});
}
}
if (receiver && !receiver.isItselfClosed() && retryable) {
await this.onDetached(receiverError);
} else {
logger.verbose(
Expand Down
100 changes: 90 additions & 10 deletions sdk/servicebus/service-bus/test/internal/serviceBusClient.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -194,17 +194,20 @@ describe("ServiceBusClient live tests", () => {
await sbClient.test.after();
});

const testError = (err: Error | ServiceBusError, entityPath: string): void => {
const testError = (err: Error | ServiceBusError, entityPath?: string): void => {
if (!isServiceBusError(err)) {
should.equal(true, false, "Error expected to be instance of ServiceBusError");
} else {
should.equal(err.code, "MessagingEntityNotFound", "Error code is different than expected");
should.equal(
err.message.includes(
`The messaging entity 'sb://${sbClient.fullyQualifiedNamespace}/${entityPath}' could not be found.`
),
true
);
if (entityPath) {
should.equal(
err.message.includes(
`The messaging entity 'sb://${sbClient.fullyQualifiedNamespace}/${entityPath}' could not be found.`
),
true,
`Expecting error message to contain "The messaging entity 'sb://${sbClient.fullyQualifiedNamespace}/${entityPath}' could not be found." but got ${err.message}`
);
}
errorWasThrown = true;
}
};
Expand All @@ -218,7 +221,7 @@ describe("ServiceBusClient live tests", () => {
should.equal(errorWasThrown, true, "Error thrown flag must be true");
});

it("throws error when receiving batch data from a non existing subscription", async function(): Promise<
it("throws error when receiving batch data from a non existing topic", async function(): Promise<
void
> {
const receiver = sbClient.createReceiver("some-topic-name", "some-subscription-name");
Expand All @@ -229,6 +232,29 @@ describe("ServiceBusClient live tests", () => {
should.equal(errorWasThrown, true, "Error thrown flag must be true");
});

it("throws error when receiving batch data from a non existing subscription", async function(): Promise<
void
> {
const entityNames = await sbClient.test.createTestEntities(TestClientType.PartitionedTopic);
if (!entityNames.topic) {
throw new Error("Expecting valid topic name");
}
const receiver = sbClient.createReceiver(entityNames.topic, "some-subscription-name");
await receiver.receiveMessages(1).catch((err) => {
testError(err);
console.log(err.message);
const namespace = sbClient.fullyQualifiedNamespace.split(".")[0];
const entityPattern = `The messaging entity '${namespace}:topic:${entityNames.topic}.*|some-subscription-name`;
should.equal(
new RegExp(entityPattern).test(err.message),
true,
`Expect error message to contain pattern "${entityPattern}" but got ${err.message}`
);
});

should.equal(errorWasThrown, true, "Error thrown flag must be true");
});

it("throws error when receiving streaming data from a non existing queue", async function(): Promise<
void
> {
Expand Down Expand Up @@ -264,7 +290,7 @@ describe("ServiceBusClient live tests", () => {
await receiver.close();
});

it("throws error when receiving streaming data from a non existing subscription", async function(): Promise<
it("throws error when receiving streaming data from a non existing topic", async function(): Promise<
void
> {
const receiver = sbClient.createReceiver(
Expand All @@ -275,7 +301,7 @@ describe("ServiceBusClient live tests", () => {

receiver.subscribe({
async processMessage() {
throw "processMessage should not have been called when receive call is made from a non existing namespace";
throw "processMessage should not have been called when subscribing to a non existing topic";
},
async processError(args) {
const expected: Omit<ProcessErrorArgs, "error"> = {
Expand Down Expand Up @@ -306,6 +332,60 @@ describe("ServiceBusClient live tests", () => {

await receiver.close();
});

it("throws error when receiving streaming data from a non existing subscription", async function(): Promise<
void
> {
const entityNames = await sbClient.test.createTestEntities(TestClientType.PartitionedTopic);
if (!entityNames.topic) {
throw new Error("Expecting valid topic name");
}
const receiver = sbClient.createReceiver(
entityNames.topic,
"some-subscription-name"
) as ServiceBusReceiverImpl;
reduceRetries(receiver);

receiver.subscribe({
async processMessage() {
throw "processMessage should not have been called when receive call when subscribing to a non existing subscription";
},
async processError(args) {
const expected: Omit<ProcessErrorArgs, "error"> = {
errorSource: args.errorSource,
entityPath: args.entityPath,
fullyQualifiedNamespace: args.fullyQualifiedNamespace
};

expected.should.deep.equal({
errorSource: "receive",
entityPath: receiver.entityPath,
fullyQualifiedNamespace: sbClient.fullyQualifiedNamespace
} as Omit<ProcessErrorArgs, "error">);

testError(args.error);
const namespace = sbClient.fullyQualifiedNamespace.split(".")[0];
const entityPattern = `The messaging entity '${namespace}:topic:${entityNames.topic}.*|some-subscription-name`;
should.equal(
new RegExp(entityPattern).test(args.error.message),
true,
`Expect error message to contain pattern "${entityPattern}" but got ${args.error.message}`
);
deyaaeldeen marked this conversation as resolved.
Show resolved Hide resolved
}
});

should.equal(
await checkWithTimeout(
() => errorWasThrown === true,
1000,
CoreAmqpConstants.defaultOperationTimeoutInMs * 2 // arbitrary, just don't want it to be too short.
),
true,
"Error thrown flag must be true"
);

await receiver.close();
});
});

describe("Test ServiceBusClient with TokenCredentials", function(): void {
Expand Down