Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Service Bus] Fix MaxListenersExceeded for management client #11738

5 changes: 4 additions & 1 deletion sdk/servicebus/service-bus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@

- `NamespaceProperties` interface property "messageSku" type changed from "string" to string literal type "Basic" | "Premium" | "Standard". [PR 11810](https://github.com/Azure/azure-sdk-for-js/pull/11810)

- Internal improvement - For the operations depending on `$management` link such as peek or lock renewals, the listeners for the "sender_error" and "receiver_error" events were added to the link for each new request made before the link is initialized which would have resulted in too many listeners and a warning such as `MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 sender_error listeners added to [Sender]. Use emittr.setMaxListeners() to increase limit`(same for `receiver_error`). This has been improved such that the listeners are reused.
[PR 11738](https://github.com/Azure/azure-sdk-for-js/pull/11738)

### Breaking changes

- The `createBatch` method on the sender is renamed to `createMesageBatch`
- The `createBatch` method on the sender is renamed to `createMessageBatch`
- The interface `CreateBatchOptions` followed by the options that are passed to the `createBatch` method is renamed to `CreateMessageBatchOptions`
- The `tryAdd` method on the message batch object is renamed to `tryAddMessage`
- `ServiceBusMessage` interface updates:
Expand Down
58 changes: 34 additions & 24 deletions sdk/servicebus/service-bus/src/core/managementClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@
import Long from "long";
import {
EventContext,
ReceiverEvents,
ReceiverOptions,
message as RheaMessageUtil,
SenderEvents,
SenderOptions,
generate_uuid,
string_to_uuid,
types,
Typed
Typed,
ReceiverEvents
} from "rhea-promise";
import {
AmqpMessage,
Expand Down Expand Up @@ -226,53 +225,64 @@ export class ManagementClient extends LinkEntity<RequestResponseLink> {
name: this.replyTo,
target: { address: this.replyTo },
onSessionError: (context: EventContext) => {
const ehError = translate(context.session!.error!);
const sbError = translate(context.session!.error!);
managementClientLogger.logError(
ehError,
sbError,
`${this.logPrefix} An error occurred on the session for request/response links for $management`
);
}
};
const sropt: SenderOptions = { target: { address: this.address } };
const sropt: SenderOptions = {
target: { address: this.address },
onError: (context: EventContext) => {
const ehError = translate(context.sender!.error!);
managementClientLogger.logError(
ehError,
`${this.logPrefix} An error occurred on the $management sender link`
);
}
};

// Even if multiple parallel requests reach here, the initLink secures a lock
// to ensure there won't be multiple initializations
await this.initLink(
{
senderOptions: sropt,
receiverOptions: rxopt
},
abortSignal
);

this.link!.sender.on(SenderEvents.senderError, (context: EventContext) => {
const ehError = translate(context.sender!.error!);
managementClientLogger.logError(
ehError,
`${this.logPrefix} An error occurred on the $management sender link`
);
});
this.link!.receiver.on(ReceiverEvents.receiverError, (context: EventContext) => {
const ehError = translate(context.receiver!.error!);
managementClientLogger.logError(
ehError,
`${this.logPrefix} An error occurred on the $management receiver link`
);
});
} catch (err) {
err = translate(err);
managementClientLogger.logError(
err,
`${this.logPrefix} An error occured while establishing the $management links`
`${this.logPrefix} An error occurred while establishing the $management links`
);
throw err;
}
}

protected createRheaLink(options: RequestResponseLinkOptions): Promise<RequestResponseLink> {
return RequestResponseLink.create(
protected async createRheaLink(
options: RequestResponseLinkOptions
): Promise<RequestResponseLink> {
const rheaLink = await RequestResponseLink.create(
this._context.connection,
options.senderOptions,
options.receiverOptions
);
// Attach listener for the `receiver_error` events to log the errors.

// "message" event listener is added in core-amqp.
// "rhea" doesn't allow setting only the "onError" handler in the options if it is not accompanied by an "onMessage" handler.
// Hence, not passing onError handler in the receiver options, adding a handler below.
rheaLink.receiver.on(ReceiverEvents.receiverError, (context: EventContext) => {
const ehError = translate(context.receiver!.error!);
managementClientLogger.logError(
ehError,
`${this.logPrefix} An error occurred on the $management receiver link`
);
});
return rheaLink;
}

/**
Expand Down