Skip to content

Commit

Permalink
[Service Bus] Session Receiver (Both batching and streaming) during a…
Browse files Browse the repository at this point in the history
… disconnect (Azure#13956)

### Issue Azure#8875

### What's in the PR
"disconnect"/refresh logic added before did not cover the session scenarios. 
This PR attempts to tackle the batching receiver for sessions upon "disconnect" scenarios to have a smooth-resolution/throw-errors based on the receiveMode.
Streaming receiver calls processError with SessionLockLost and closes the link

### receiveMessages - Scenarios to handle/test
- [x] throws "session lock has expired" after a disconnect
- [x] returns messages if drain is in progress (receiveAndDelete)
- [x] throws an error if drain is in progress (peekLock)
- [x] returns messages if receive in progress (receiveAndDelete)
- [x] throws an error if receive is in progress (peekLock)

### Streaming receiver
- [x] Test - calls processError and closes the link
 
### TODO
- [x] Cover the scenarios above
- [x] Tests
- [x] Bug fix - number of receivers - Azure#13990
- [x] Changelog
- [x] Streaming receiver - ~~beyond the scope of this PR~~ Azure#14212
- [ ] Bad log messages Azure#13989 - beyond the scope of this PR
- [ ] Stress testing plan for disconnect - Azure#13988 - beyond the scope of this PR

Fixes Azure#8875
  • Loading branch information
HarshaNalluru authored and vindicatesociety committed Apr 26, 2021
1 parent 9e5343e commit 0cce81b
Show file tree
Hide file tree
Showing 8 changed files with 712 additions and 334 deletions.
3 changes: 3 additions & 0 deletions sdk/servicebus/service-bus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
- Re-exports `RetryMode` for use when setting the `RetryOptions.mode` field
in `ServiceBusClientOptions`.
Resolves [#13166](https://github.com/Azure/azure-sdk-for-js/issues/13166).
- When receiving messages from sessions using either the `ServiceBusSessionReceiver.receiveMessages` method or the `ServiceBusSessionReceiver.subscribe` method, errors on the AMQP link or session were being handled well, but an error on the AMQP connection like a network disconnect was not being handled at all. This results in the promise returned by the `receiveMessages` method never getting fulfilled and the `subscribe` method not calling the user provided error handler.
This is now fixed in [#13956](https://github.com/Azure/azure-sdk-for-js/pull/13956) to throw `SessionLockLostError`. If using the `receiveMessages` method in `receiveAndDelete` mode, then the messages collected so far are returned to avoid data loss.

- Allow null as a value for the properties in `ServiceBusMessage.applicationProperties`.
Fixes [#14329](https://github.com/Azure/azure-sdk-for-js/issues/14329)

Expand Down
171 changes: 117 additions & 54 deletions sdk/servicebus/service-bus/src/connectionContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ import { ManagementClient } from "./core/managementClient";
import { formatUserAgentPrefix } from "./util/utils";
import { getRuntimeInfo } from "./util/runtimeInfo";
import { SharedKeyCredential } from "./servicebusSharedKeyCredential";
import { ReceiverType } from "./core/linkEntity";
import { NonSessionReceiverType, ReceiverType } from "./core/linkEntity";
import { ServiceBusError } from "./serviceBusError";

/**
* @internal
Expand Down Expand Up @@ -134,15 +135,16 @@ type ConnectionContextMethods = Omit<

/**
* @internal
* Helper method to call onDetached on the receivers from the connection context upon seeing an error.
* Helper method to call onDetached on the non-sessions batching and streaming receivers from the connection context upon seeing an error.
*/
async function callOnDetachedOnReceivers(
connectionContext: ConnectionContext,
contextOrConnectionError: Error | ConnectionError | AmqpError | undefined,
receiverType: ReceiverType
receiverType: NonSessionReceiverType
): Promise<void[]> {
const detachCalls: Promise<void>[] = [];

// Iterating over non-sessions batching and streaming receivers
for (const receiverName of Object.keys(connectionContext.messageReceivers)) {
const receiver = connectionContext.messageReceivers[receiverName];
if (receiver && receiver.receiverType === receiverType) {
Expand All @@ -165,6 +167,54 @@ async function callOnDetachedOnReceivers(
);
}
}
return Promise.all(detachCalls);
}

/**
* @internal
* Helper method to call onDetached on the session receivers from the connection context upon seeing an error.
*/
async function callOnDetachedOnSessionReceivers(
connectionContext: ConnectionContext,
contextOrConnectionError: Error | ConnectionError | AmqpError | undefined
): Promise<void[]> {
const getSessionError = (sessionId: string, entityPath: string) => {
const sessionInfo =
`The receiver for session "${sessionId}" in "${entityPath}" has been closed and can no longer be used. ` +
`Please create a new receiver using the "acceptSession" or "acceptNextSession" method on the ServiceBusClient.`;

const errorMessage =
contextOrConnectionError == null
? `Unknown error occurred on the AMQP connection while receiving messages. ` + sessionInfo
: `Error occurred on the AMQP connection while receiving messages. ` +
sessionInfo +
`\nMore info - \n${contextOrConnectionError}`;

const error = new ServiceBusError(errorMessage, "SessionLockLost");
error.retryable = false;
return error;
};

const detachCalls: Promise<void>[] = [];

for (const receiverName of Object.keys(connectionContext.messageSessions)) {
const receiver = connectionContext.messageSessions[receiverName];
logger.verbose(
"[%s] calling detached on %s receiver(sessions).",
connectionContext.connection.id,
receiver.name
);
detachCalls.push(
receiver.onDetached(getSessionError(receiver.sessionId, receiver.entityPath)).catch((err) => {
logger.logError(
err,
"[%s] An error occurred while calling onDetached() on the session receiver(sessions) '%s'",
connectionContext.connection.id,
receiver.name
);
})
);
}

return Promise.all(detachCalls);
}
Expand Down Expand Up @@ -375,63 +425,76 @@ export namespace ConnectionContext {
await connectionContext.managementClients[entityPath].close();
}

// Calling onDetached on sender
if (!state.wasConnectionCloseCalled && state.numSenders) {
// We don't do recovery for the sender:
// Because we don't want to keep the sender active all the time
// and the "next" send call would bear the burden of creating the link.
// Call onDetached() on sender so that it can gracefully shutdown
// by cleaning up the timers and closing the links.
// We don't call onDetached for sender after `refreshConnection()`
// because any new send calls that potentially initialize links would also get affected if called later.
logger.verbose(
`[${connectionContext.connection.id}] connection.close() was not called from the sdk and there were ${state.numSenders} ` +
`senders. We should not reconnect.`
);
const detachCalls: Promise<void>[] = [];
for (const senderName of Object.keys(connectionContext.senders)) {
const sender = connectionContext.senders[senderName];
if (sender) {
logger.verbose(
"[%s] calling detached on sender '%s'.",
connectionContext.connection.id,
sender.name
);
detachCalls.push(
sender.onDetached().catch((err) => {
logger.logError(
err,
"[%s] An error occurred while calling onDetached() the sender '%s'",
connectionContext.connection.id,
sender.name
);
})
);
if (state.wasConnectionCloseCalled) {
// Do Nothing
} else {
// Calling onDetached on sender
if (state.numSenders) {
// We don't do recovery for the sender:
// Because we don't want to keep the sender active all the time
// and the "next" send call would bear the burden of creating the link.
// Call onDetached() on sender so that it can gracefully shutdown
// by cleaning up the timers and closing the links.
// We don't call onDetached for sender after `refreshConnection()`
// because any new send calls that potentially initialize links would also get affected if called later.
logger.verbose(
`[${connectionContext.connection.id}] connection.close() was not called from the sdk and there were ${state.numSenders} ` +
`senders. We should not reconnect.`
);
const detachCalls: Promise<void>[] = [];
for (const senderName of Object.keys(connectionContext.senders)) {
const sender = connectionContext.senders[senderName];
if (sender) {
logger.verbose(
"[%s] calling detached on sender '%s'.",
connectionContext.connection.id,
sender.name
);
detachCalls.push(
sender.onDetached().catch((err) => {
logger.logError(
err,
"[%s] An error occurred while calling onDetached() the sender '%s'",
connectionContext.connection.id,
sender.name
);
})
);
}
}
await Promise.all(detachCalls);
}
await Promise.all(detachCalls);
}

// Calling onDetached on batching receivers for the same reasons as sender
const numBatchingReceivers = getNumberOfReceivers(connectionContext, "batching");
if (!state.wasConnectionCloseCalled && numBatchingReceivers) {
logger.verbose(
`[${connectionContext.connection.id}] connection.close() was not called from the sdk and there were ${numBatchingReceivers} ` +
`batching receivers. We should reconnect.`
);
// Calling onDetached on batching receivers for the same reasons as sender
const numBatchingReceivers = getNumberOfReceivers(connectionContext, "batching");
if (numBatchingReceivers) {
logger.verbose(
`[${connectionContext.connection.id}] connection.close() was not called from the sdk and there were ${numBatchingReceivers} ` +
`batching receivers. We should not reconnect.`
);

// Call onDetached() on receivers so that batching receivers it can gracefully close any ongoing batch operation
await callOnDetachedOnReceivers(
connectionContext,
connectionError || contextError,
"batching"
);
// Call onDetached() on receivers so that batching receivers it can gracefully close any ongoing batch operation
await callOnDetachedOnReceivers(
connectionContext,
connectionError || contextError,
"batching"
);
}

// TODO:
// `callOnDetachedOnReceivers` handles "connectionContext.messageReceivers".
// ...What to do for sessions (connectionContext.messageSessions) ??
}
// Calling onDetached on session receivers
const numSessionReceivers = getNumberOfReceivers(connectionContext, "session");
if (numSessionReceivers) {
logger.verbose(
`[${connectionContext.connection.id}] connection.close() was not called from the sdk and there were ${numSessionReceivers} ` +
`session receivers. We should close them.`
);

await callOnDetachedOnSessionReceivers(
connectionContext,
connectionError || contextError
);
}
}
await refreshConnection();
waitForConnectionRefreshResolve();
waitForConnectionRefreshPromise = undefined;
Expand Down
1 change: 0 additions & 1 deletion sdk/servicebus/service-bus/src/core/batchingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ export class BatchingReceiver extends MessageReceiver {
options: OperationOptionsBase
): Promise<ServiceBusMessageImpl[]> {
throwErrorIfConnectionClosed(this._context);

try {
logger.verbose(
"[%s] Receiver '%s', setting max concurrent calls to 0.",
Expand Down
10 changes: 7 additions & 3 deletions sdk/servicebus/service-bus/src/core/linkEntity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,14 @@ export interface RequestResponseLinkOptions {
/**
* @internal
*/
export type ReceiverType =
export type NonSessionReceiverType =
| "batching" // batching receiver
| "streaming" // streaming receiver;
| "session"; // message session
| "streaming"; // streaming receiver

/**
* @internal
*/
export type ReceiverType = NonSessionReceiverType | "session"; // message session

/**
* @internal
Expand Down
2 changes: 1 addition & 1 deletion sdk/servicebus/service-bus/src/serviceBusError.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ export class ServiceBusError extends MessagingError {

/**
* Translates an error into either an Error or a ServiceBusError which provides a `reason` code that
* can be used by clients to programatically react to errors.
* can be used by clients to programmatically react to errors.
*
* If you are calling `@azure/core-amqp/translate` you should swap to using this function instead since it provides
* Service Bus specific handling of the error (falling back to default translate behavior otherwise).
Expand Down
34 changes: 32 additions & 2 deletions sdk/servicebus/service-bus/src/session/messageSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,7 @@ export class MessageSession extends LinkEntity<Receiver> {
/**
* Closes the underlying AMQP receiver link.
*/
async close(): Promise<void> {
async close(error?: Error | AmqpError): Promise<void> {
try {
this._isReceivingMessagesForSubscriber = false;
if (this._sessionLockRenewalTimer) clearTimeout(this._sessionLockRenewalTimer);
Expand All @@ -546,7 +546,7 @@ export class MessageSession extends LinkEntity<Receiver> {

await super.close();

await this._batchingReceiverLite.terminate();
this._batchingReceiverLite.terminate(error);
} catch (err) {
logger.logError(
err,
Expand Down Expand Up @@ -762,6 +762,36 @@ export class MessageSession extends LinkEntity<Receiver> {
}
}

/**
* To be called when connection is disconnected to gracefully close ongoing receive request.
* @param connectionError - The connection error if any.
*/
async onDetached(connectionError: AmqpError | Error): Promise<void> {
logger.error(
translateServiceBusError(connectionError),
`${this.logPrefix} onDetached: closing link (session receiver will not reconnect)`
);
try {
// Notifying so that the streaming receiver knows about the error
this._notifyError({
entityPath: this.entityPath,
fullyQualifiedNamespace: this._context.config.host,
error: translateServiceBusError(connectionError),
errorSource: "receive"
});
} catch (error) {
logger.error(
translateServiceBusError(error),
`${
this.logPrefix
} onDetached: unexpected error seen when tried calling "_notifyError" with ${translateServiceBusError(
connectionError
)}`
);
}
await this.close(connectionError);
}

/**
* Settles the message with the specified disposition.
* @param message - The ServiceBus Message that needs to be settled.
Expand Down
Loading

0 comments on commit 0cce81b

Please sign in to comment.