From 6cda731d513a038c6492b3fdf8fd983f2f2d1cde Mon Sep 17 00:00:00 2001 From: Harsha Nalluru Date: Mon, 25 Jan 2021 17:54:37 -0800 Subject: [PATCH] [Service Bus] Bug fix: batching receiver upon a disconnect (#13374) ## Bug - `receiveMessages` method returned zero messages upon a connection refresh caused by a network disconnect (simulated in the test). - `OperationTimeout` error on message settlement after the disconnect. These two failures made the "disconnect" test occasionally fail for the batching receiver. ## Cause `onDetached` on the batching receivers is called 300ms after the connection refresh, causing the recovered receive link to be closed. - If a message returned from the service took close to 300ms to reach the receiver after the refresh, `onDetached` is called to close the link, leading to the loss of messages. - If the 300ms had elapsed right before the settlement, we'd see the OperationTimeout error on settlement since the receive link is closed. Investigated here https://github.com/Azure/azure-sdk-for-js/pull/13339 ## Fix - Call `onDetached` for the batching receivers before calling the refresh connection - And retain calling `onDetached` for the streaming receivers after the refresh connection ## Changes in the PR - [x] Refactored "calling onDetached" part - [x] Removed the 300ms delay since we don't see its utility - [x] Changelog - [x] TODO: What to do for sessions? 
- [x] Needs more investigation https://github.com/Azure/azure-sdk-for-js/pull/13374#discussion_r564139864, will be handled at #8875 --- sdk/servicebus/service-bus/CHANGELOG.md | 3 + .../service-bus/src/connectionContext.ts | 143 +++++++++++++----- .../service-bus/src/core/batchingReceiver.ts | 2 +- 3 files changed, 110 insertions(+), 38 deletions(-) diff --git a/sdk/servicebus/service-bus/CHANGELOG.md b/sdk/servicebus/service-bus/CHANGELOG.md index 26a04420a083..1652db71699f 100644 --- a/sdk/servicebus/service-bus/CHANGELOG.md +++ b/sdk/servicebus/service-bus/CHANGELOG.md @@ -4,6 +4,9 @@ - [Bug Fix] Response from the `ServiceBusAdministrationClient.getSubscriptionRuntimeProperties()` method had the message count properties to be zero. The bug has been fixed in [#13229](https://github.com/Azure/azure-sdk-for-js/pull/13229) +- [Bug Fix] Fixed a race condition where the `ServiceBusReceiver.receiveMessages` might lose messages and not return any if triggered right after the recovery from a network disruption. + The same race condition could also have led to an OperationTimeout error if attempted the message settlement. 
+ [#13374](https://github.com/Azure/azure-sdk-for-js/pull/13374) ## 7.0.2 (2021-01-13) diff --git a/sdk/servicebus/service-bus/src/connectionContext.ts b/sdk/servicebus/service-bus/src/connectionContext.ts index 7657d8f941e1..7a34f4583d94 100644 --- a/sdk/servicebus/service-bus/src/connectionContext.ts +++ b/sdk/servicebus/service-bus/src/connectionContext.ts @@ -6,13 +6,18 @@ import { packageJsonInfo } from "./util/constants"; import { ConnectionConfig, ConnectionContextBase, - Constants, - CreateConnectionContextBaseParameters, - delay + CreateConnectionContextBaseParameters } from "@azure/core-amqp"; import { TokenCredential } from "@azure/core-auth"; import { ServiceBusClientOptions } from "./constructorHelpers"; -import { Connection, ConnectionEvents, EventContext, OnAmqpEvent } from "rhea-promise"; +import { + AmqpError, + Connection, + ConnectionError, + ConnectionEvents, + EventContext, + OnAmqpEvent +} from "rhea-promise"; import { MessageSender } from "./core/messageSender"; import { MessageSession } from "./session/messageSession"; import { MessageReceiver } from "./core/messageReceiver"; @@ -20,6 +25,7 @@ import { ManagementClient } from "./core/managementClient"; import { formatUserAgentPrefix } from "./util/utils"; import { getRuntimeInfo } from "./util/runtimeInfo"; import { SharedKeyCredential } from "./servicebusSharedKeyCredential"; +import { ReceiverType } from "./core/linkEntity"; /** * @internal @@ -130,6 +136,66 @@ type ConnectionContextMethods = Omit< > & ThisType; +/** + * @internal + * @hidden + * Helper method to call onDetached on the receivers from the connection context upon seeing an error. 
+ */ +async function callOnDetachedOnReceivers( + connectionContext: ConnectionContext, + contextOrConnectionError: Error | ConnectionError | AmqpError | undefined, + receiverType: ReceiverType +) { + const detachCalls: Promise[] = []; + + for (const receiverName of Object.keys(connectionContext.messageReceivers)) { + const receiver = connectionContext.messageReceivers[receiverName]; + if (receiver && receiver.receiverType === receiverType) { + logger.verbose( + "[%s] calling detached on %s receiver '%s'.", + connectionContext.connection.id, + receiver.receiverType, + receiver.name + ); + detachCalls.push( + receiver.onDetached(contextOrConnectionError).catch((err) => { + logger.logError( + err, + "[%s] An error occurred while calling onDetached() on the %s receiver '%s'", + connectionContext.connection.id, + receiver.receiverType, + receiver.name + ); + }) + ); + } + } + + return Promise.all(detachCalls); +} + +/** + * @internal + * @hidden + * Helper method to get the number of receivers of specified type from the connectionContext. + */ +async function getNumberOfReceivers( + connectionContext: Pick, + receiverType: ReceiverType +) { + if (receiverType === "session") { + const receivers = connectionContext.messageSessions; + return Object.keys(receivers).length; + } + const receivers = connectionContext.messageReceivers; + const receiverNames = Object.keys(receivers); + const count = receiverNames.reduce( + (acc, name) => (receivers[name].receiverType === receiverType ? ++acc : acc), + 0 + ); + return count; +} + /** * @internal * @hidden @@ -325,7 +391,6 @@ export namespace ConnectionContext { // by cleaning up the timers and closing the links. // We don't call onDetached for sender after `refreshConnection()` // because any new send calls that potentially initialize links would also get affected if called later. 
- // TODO: do the same for batching receiver logger.verbose( `[${connectionContext.connection.id}] connection.close() was not called from the sdk and there were ${state.numSenders} ` + `senders. We should not reconnect.` @@ -354,47 +419,51 @@ export namespace ConnectionContext { await Promise.all(detachCalls); } + // Calling onDetached on batching receivers for the same reasons as sender + const numBatchingReceivers = getNumberOfReceivers(connectionContext, "batching"); + if (!state.wasConnectionCloseCalled && numBatchingReceivers) { + logger.verbose( + `[${connectionContext.connection.id}] connection.close() was not called from the sdk and there were ${numBatchingReceivers} ` + + `batching receivers. We should reconnect.` + ); + + // Call onDetached() on receivers so that batching receivers it can gracefully close any ongoing batch operation + await callOnDetachedOnReceivers( + connectionContext, + connectionError || contextError, + "batching" + ); + + // TODO: + // `callOnDetachedOnReceivers` handles "connectionContext.messageReceivers". + // ...What to do for sessions (connectionContext.messageSessions) ?? + } + await refreshConnection(connectionContext); waitForConnectionRefreshResolve(); waitForConnectionRefreshPromise = undefined; // The connection should always be brought back up if the sdk did not call connection.close() // and there was at least one receiver link on the connection before it went down. logger.verbose("[%s] state: %O", connectionContext.connectionId, state); - if (!state.wasConnectionCloseCalled && state.numReceivers) { + + // Calling onDetached on streaming receivers + const numStreamingReceivers = getNumberOfReceivers(connectionContext, "streaming"); + if (!state.wasConnectionCloseCalled && numStreamingReceivers) { logger.verbose( - `[${connectionContext.connection.id}] connection.close() was not called from the sdk and there were ${state.numReceivers} ` + - `receivers. 
We should reconnect.` + `[${connectionContext.connection.id}] connection.close() was not called from the sdk and there were ${numStreamingReceivers} ` + + `streaming receivers. We should reconnect.` ); - await delay(Constants.connectionReconnectDelay); - - const detachCalls: Promise[] = []; - - // Call onDetached() on receivers so that batching receivers it can gracefully close any ongoing batch operation - // and streaming receivers can decide whether to reconnect or not. - for (const receiverName of Object.keys(connectionContext.messageReceivers)) { - const receiver = connectionContext.messageReceivers[receiverName]; - if (receiver) { - logger.verbose( - "[%s] calling detached on %s receiver '%s'.", - connectionContext.connection.id, - receiver.receiverType, - receiver.name - ); - detachCalls.push( - receiver.onDetached(connectionError || contextError).catch((err) => { - logger.logError( - err, - "[%s] An error occurred while calling onDetached() on the %s receiver '%s'", - connectionContext.connection.id, - receiver.receiverType, - receiver.name - ); - }) - ); - } - } - await Promise.all(detachCalls); + // Calling `onDetached()` on streaming receivers after the refreshConnection() since `onDetached()` would + // recover the streaming receivers and that would only be possible after the connection is refreshed. + // + // This is different from the batching receiver since `onDetached()` for the batching receiver would + // return the outstanding messages and close the receive link. 
+ await callOnDetachedOnReceivers( + connectionContext, + connectionError || contextError, + "streaming" + ); } }; diff --git a/sdk/servicebus/service-bus/src/core/batchingReceiver.ts b/sdk/servicebus/service-bus/src/core/batchingReceiver.ts index f3203ec0c78f..45bbfee55aaf 100644 --- a/sdk/servicebus/service-bus/src/core/batchingReceiver.ts +++ b/sdk/servicebus/service-bus/src/core/batchingReceiver.ts @@ -93,7 +93,7 @@ export class BatchingReceiver extends MessageReceiver { ); } - await this._batchingReceiverLite.close(connectionError); + this._batchingReceiverLite.close(connectionError); } /**