[Service Bus] Bug fix: batching receiver upon a disconnect (Azure#13374)
## Bug
- The `receiveMessages` method returned zero messages after a connection refresh caused by a network disconnect (simulated in the test).
- Message settlement failed with an `OperationTimeout` error after the disconnect.

These two failures made the "disconnect" test occasionally fail for the batching receiver.

## Cause
`onDetached` on the batching receivers is called 300ms after the connection refresh, which closes the recovered receive link.

- If a message from the service takes close to 300ms after the refresh to reach the receiver, `onDetached` closes the link in the meantime and the message is lost.
- If the 300ms elapses right before the settlement, the settlement fails with the `OperationTimeout` error because the receive link is already closed.
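
The two failure modes can be illustrated with a toy timeline model. This is only a sketch, not SDK code: `Timeline`, `Outcome`, `simulateReceive`, and the millisecond values are hypothetical, and the model simply assumes the recovered link is usable from the refresh (t = 0) until the delayed `onDetached` closes it.

```typescript
// Toy model of the race described above. All names are hypothetical.
interface Timeline {
  // When the delayed onDetached() closes the recovered link (undefined = never).
  detachAtMs: number | undefined;
  // When the message from the service reaches the receiver.
  messageArrivesAtMs: number;
  // When the caller attempts to settle the message.
  settleAtMs: number;
}

type Outcome = "ok" | "zero-messages" | "settlement-timeout";

function simulateReceive(t: Timeline): Outcome {
  // The link is open at a given time if it has not been closed by then.
  const linkOpenAt = (ms: number): boolean =>
    t.detachAtMs === undefined || ms < t.detachAtMs;

  if (!linkOpenAt(t.messageArrivesAtMs)) return "zero-messages";
  if (!linkOpenAt(t.settleAtMs)) return "settlement-timeout";
  return "ok";
}

// Message arrives after the link was closed at 300ms.
const lateMessage = simulateReceive({ detachAtMs: 300, messageArrivesAtMs: 310, settleAtMs: 320 });
// Message arrives in time, but settlement happens after the close.
const lateSettle = simulateReceive({ detachAtMs: 300, messageArrivesAtMs: 100, settleAtMs: 305 });
// No delayed detach on the recovered link.
const noDelayedDetach = simulateReceive({ detachAtMs: undefined, messageArrivesAtMs: 310, settleAtMs: 320 });
```

In this model the first two calls fail in the two ways listed above, while the third succeeds because nothing closes the recovered link.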

Investigated in Azure#13339.

## Fix
- Call `onDetached` for the batching receivers before refreshing the connection.
- Keep calling `onDetached` for the streaming receivers after the connection refresh.
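
The resulting order of operations can be sketched as follows. This is a simplified illustration under stated assumptions rather than the SDK's implementation: `FakeReceiver`, `detachAll`, `recover`, and `makeReceiver` are hypothetical names, and the receivers only record when they were detached.

```typescript
// Simplified, hypothetical model of the recovery order; not the SDK's real types.
type ReceiverType = "batching" | "streaming";

interface FakeReceiver {
  name: string;
  receiverType: ReceiverType;
  onDetached(): Promise<void>;
}

// Calls onDetached() on every receiver of the given type and waits for all of them.
async function detachAll(receivers: FakeReceiver[], type: ReceiverType): Promise<void> {
  await Promise.all(
    receivers.filter((r) => r.receiverType === type).map((r) => r.onDetached())
  );
}

async function recover(
  receivers: FakeReceiver[],
  refreshConnection: () => Promise<void>
): Promise<void> {
  // Batching receivers: close the old link before the refresh so that
  // nothing closes the link that is created afterwards.
  await detachAll(receivers, "batching");
  await refreshConnection();
  // Streaming receivers: onDetached() recovers the link, which is only
  // possible once the connection has been refreshed.
  await detachAll(receivers, "streaming");
}

// Usage: record the order in which things happen.
const events: string[] = [];
const makeReceiver = (name: string, receiverType: ReceiverType): FakeReceiver => ({
  name,
  receiverType,
  onDetached: async () => {
    events.push(`detach:${name}`);
  }
});

const done = recover(
  [makeReceiver("s1", "streaming"), makeReceiver("b1", "batching")],
  async () => {
    events.push("refresh");
  }
);
```

Once `done` resolves, `events` holds the batching detach first, then the refresh, then the streaming detach, regardless of the order in which the receivers were registered.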

## Changes in the PR
- [x] Refactored the "calling onDetached" part
- [x] Removed the 300ms delay since it serves no apparent purpose
- [x] Changelog
- [x] TODO: What to do for sessions?
   - [x] Needs more investigation (see Azure#13374 (comment)); will be handled in Azure#8875
HarshaNalluru authored Jan 26, 2021
1 parent 579d065 commit 6cda731
Showing 3 changed files with 110 additions and 38 deletions.
3 changes: 3 additions & 0 deletions sdk/servicebus/service-bus/CHANGELOG.md
@@ -4,6 +4,9 @@

 - [Bug Fix] Response from the `ServiceBusAdministrationClient.getSubscriptionRuntimeProperties()` method had the message count properties to be zero.
   The bug has been fixed in [#13229](https://github.com/Azure/azure-sdk-for-js/pull/13229)
+- [Bug Fix] Fixed a race condition where `ServiceBusReceiver.receiveMessages` might lose messages and return none if called right after recovery from a network disruption.
+  The same race condition could also have led to an `OperationTimeout` error when attempting to settle a message.
+  [#13374](https://github.com/Azure/azure-sdk-for-js/pull/13374)

 ## 7.0.2 (2021-01-13)

143 changes: 106 additions & 37 deletions sdk/servicebus/service-bus/src/connectionContext.ts
@@ -6,20 +6,26 @@ import { packageJsonInfo } from "./util/constants";
 import {
   ConnectionConfig,
   ConnectionContextBase,
-  Constants,
-  CreateConnectionContextBaseParameters,
-  delay
+  CreateConnectionContextBaseParameters
 } from "@azure/core-amqp";
 import { TokenCredential } from "@azure/core-auth";
 import { ServiceBusClientOptions } from "./constructorHelpers";
-import { Connection, ConnectionEvents, EventContext, OnAmqpEvent } from "rhea-promise";
+import {
+  AmqpError,
+  Connection,
+  ConnectionError,
+  ConnectionEvents,
+  EventContext,
+  OnAmqpEvent
+} from "rhea-promise";
 import { MessageSender } from "./core/messageSender";
 import { MessageSession } from "./session/messageSession";
 import { MessageReceiver } from "./core/messageReceiver";
 import { ManagementClient } from "./core/managementClient";
 import { formatUserAgentPrefix } from "./util/utils";
 import { getRuntimeInfo } from "./util/runtimeInfo";
 import { SharedKeyCredential } from "./servicebusSharedKeyCredential";
+import { ReceiverType } from "./core/linkEntity";

 /**
  * @internal
@@ -130,6 +136,66 @@ type ConnectionContextMethods = Omit<
> &
ThisType<ConnectionContextInternalMembers>;

+/**
+ * @internal
+ * @hidden
+ * Helper method to call onDetached on the receivers from the connection context upon seeing an error.
+ */
+async function callOnDetachedOnReceivers(
+  connectionContext: ConnectionContext,
+  contextOrConnectionError: Error | ConnectionError | AmqpError | undefined,
+  receiverType: ReceiverType
+) {
+  const detachCalls: Promise<void>[] = [];
+
+  for (const receiverName of Object.keys(connectionContext.messageReceivers)) {
+    const receiver = connectionContext.messageReceivers[receiverName];
+    if (receiver && receiver.receiverType === receiverType) {
+      logger.verbose(
+        "[%s] calling detached on %s receiver '%s'.",
+        connectionContext.connection.id,
+        receiver.receiverType,
+        receiver.name
+      );
+      detachCalls.push(
+        receiver.onDetached(contextOrConnectionError).catch((err) => {
+          logger.logError(
+            err,
+            "[%s] An error occurred while calling onDetached() on the %s receiver '%s'",
+            connectionContext.connection.id,
+            receiver.receiverType,
+            receiver.name
+          );
+        })
+      );
+    }
+  }
+
+  return Promise.all(detachCalls);
+}
+
+/**
+ * @internal
+ * @hidden
+ * Helper method to get the number of receivers of specified type from the connectionContext.
+ * Returns the count synchronously so that callers can use it directly in conditions.
+ */
+function getNumberOfReceivers(
+  connectionContext: Pick<ConnectionContext, "messageReceivers" | "messageSessions">,
+  receiverType: ReceiverType
+) {
+  if (receiverType === "session") {
+    const receivers = connectionContext.messageSessions;
+    return Object.keys(receivers).length;
+  }
+  const receivers = connectionContext.messageReceivers;
+  const receiverNames = Object.keys(receivers);
+  const count = receiverNames.reduce(
+    (acc, name) => (receivers[name].receiverType === receiverType ? acc + 1 : acc),
+    0
+  );
+  return count;
+}
+
 /**
  * @internal
  * @hidden
@@ -325,7 +391,6 @@ export namespace ConnectionContext {
 // by cleaning up the timers and closing the links.
 // We don't call onDetached for sender after `refreshConnection()`
 // because any new send calls that potentially initialize links would also get affected if called later.
-// TODO: do the same for batching receiver
 logger.verbose(
   `[${connectionContext.connection.id}] connection.close() was not called from the sdk and there were ${state.numSenders} ` +
     `senders. We should not reconnect.`
@@ -354,47 +419,51 @@ export namespace ConnectionContext {
     await Promise.all(detachCalls);
   }

+  // Calling onDetached on batching receivers for the same reasons as sender
+  const numBatchingReceivers = getNumberOfReceivers(connectionContext, "batching");
+  if (!state.wasConnectionCloseCalled && numBatchingReceivers) {
+    logger.verbose(
+      `[${connectionContext.connection.id}] connection.close() was not called from the sdk and there were ${numBatchingReceivers} ` +
+        `batching receivers. We should reconnect.`
+    );
+
+    // Call onDetached() on the batching receivers so that they can gracefully close any ongoing batch operation
+    await callOnDetachedOnReceivers(
+      connectionContext,
+      connectionError || contextError,
+      "batching"
+    );
+
+    // TODO:
+    // `callOnDetachedOnReceivers` handles "connectionContext.messageReceivers".
+    // ...What to do for sessions (connectionContext.messageSessions) ??
+  }
+
   await refreshConnection(connectionContext);
   waitForConnectionRefreshResolve();
   waitForConnectionRefreshPromise = undefined;
   // The connection should always be brought back up if the sdk did not call connection.close()
   // and there was at least one receiver link on the connection before it went down.
   logger.verbose("[%s] state: %O", connectionContext.connectionId, state);
-  if (!state.wasConnectionCloseCalled && state.numReceivers) {
+
+  // Calling onDetached on streaming receivers
+  const numStreamingReceivers = getNumberOfReceivers(connectionContext, "streaming");
+  if (!state.wasConnectionCloseCalled && numStreamingReceivers) {
     logger.verbose(
-      `[${connectionContext.connection.id}] connection.close() was not called from the sdk and there were ${state.numReceivers} ` +
-        `receivers. We should reconnect.`
+      `[${connectionContext.connection.id}] connection.close() was not called from the sdk and there were ${numStreamingReceivers} ` +
+        `streaming receivers. We should reconnect.`
     );
-    await delay(Constants.connectionReconnectDelay);

-    const detachCalls: Promise<void>[] = [];
-
-    // Call onDetached() on receivers so that batching receivers it can gracefully close any ongoing batch operation
-    // and streaming receivers can decide whether to reconnect or not.
-    for (const receiverName of Object.keys(connectionContext.messageReceivers)) {
-      const receiver = connectionContext.messageReceivers[receiverName];
-      if (receiver) {
-        logger.verbose(
-          "[%s] calling detached on %s receiver '%s'.",
-          connectionContext.connection.id,
-          receiver.receiverType,
-          receiver.name
-        );
-        detachCalls.push(
-          receiver.onDetached(connectionError || contextError).catch((err) => {
-            logger.logError(
-              err,
-              "[%s] An error occurred while calling onDetached() on the %s receiver '%s'",
-              connectionContext.connection.id,
-              receiver.receiverType,
-              receiver.name
-            );
-          })
-        );
-      }
-    }
-
-    await Promise.all(detachCalls);
+    // Calling `onDetached()` on streaming receivers after the refreshConnection() since `onDetached()` would
+    // recover the streaming receivers and that would only be possible after the connection is refreshed.
+    //
+    // This is different from the batching receiver since `onDetached()` for the batching receiver would
+    // return the outstanding messages and close the receive link.
+    await callOnDetachedOnReceivers(
+      connectionContext,
+      connectionError || contextError,
+      "streaming"
+    );
   }
 };

2 changes: 1 addition & 1 deletion sdk/servicebus/service-bus/src/core/batchingReceiver.ts
@@ -93,7 +93,7 @@ export class BatchingReceiver extends MessageReceiver {
     );
   }

-  await this._batchingReceiverLite.close(connectionError);
+  this._batchingReceiverLite.close(connectionError);
 }

 /**