diff --git a/azure-servicebus/azure-servicebus.pom b/azure-servicebus/azure-servicebus.pom
index d2f8db8b..09875011 100644
--- a/azure-servicebus/azure-servicebus.pom
+++ b/azure-servicebus/azure-servicebus.pom
@@ -4,7 +4,7 @@
4.0.0
com.microsoft.azure
azure-servicebus
- 1.2.14
+ 1.2.15
The MIT License (MIT)
diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageReceiver.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageReceiver.java
index 57edc305..18a42f63 100644
--- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageReceiver.java
+++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageReceiver.java
@@ -102,11 +102,13 @@ public class CoreMessageReceiver extends ClientEntity implements IAmqpReceiver,
private ScheduledFuture> sasTokenRenewTimerFuture;
private CompletableFuture requestResponseLinkCreationFuture;
private CompletableFuture receiveLinkReopenFuture;
+ private CompletableFuture ensureLinkReopenFutureToWaitOn;
private final Runnable timedOutUpdateStateRequestsDaemon;
private final Runnable returnMesagesLoopDaemon;
private final ScheduledFuture> updateStateRequestsTimeoutChecker;
private final ScheduledFuture> returnMessagesLoopRunner;
private final MessagingEntityType entityType;
+ private boolean shouldRetryLinkReopenOnTransientFailure = true;
// TODO Change onReceiveComplete to handle empty deliveries. Change onError to retry updateState requests.
private CoreMessageReceiver(final MessagingFactory factory,
@@ -370,8 +372,25 @@ private void closeRequestResponseLink()
private void createReceiveLink()
{
TRACE_LOGGER.info("Creating receive link to '{}'", this.receivePath);
- Connection connection = this.underlyingFactory.getConnection();
-
+ Connection connection = this.underlyingFactory.getActiveConnectionOrNothing();
+ if (connection == null) {
+ // Connection closed after sending CBS token. Happens only in the rare case of azure service bus closing idle connection, just right after sending
+ // CBS token but before opening a link.
+ TRACE_LOGGER.warn("Idle connection closed by service just after sending CBS token. Very rare case. Will retry.");
+ ServiceBusException exception = new ServiceBusException(true, "Idle connection closed by service just after sending CBS token. Please retry.");
+ if (this.linkOpen != null && !this.linkOpen.getWork().isDone()) {
+ // Should never happen
+ AsyncUtil.completeFutureExceptionally(this.linkOpen.getWork(), exception);
+ }
+
+ if(this.receiveLinkReopenFuture != null && !this.receiveLinkReopenFuture.isDone()) {
+ // Complete the future and re-attempt link creation
+ AsyncUtil.completeFutureExceptionally(this.receiveLinkReopenFuture, exception);
+ }
+
+ return;
+ }
+
final Session session = connection.session();
session.setIncomingCapacity(Integer.MAX_VALUE);
session.open();
@@ -877,7 +896,7 @@ public void onEvent()
catch (IOException ioException)
{
this.pendingReceives.remove(receiveWorkItem);
- this.reduceCreditForCompletedReceiveRequest(receiveWorkItem.getMaxMessageCount());
+ this.reduceCreditForCompletedReceiveRequest(receiveWorkItem.getMaxMessageCount());
receiveWorkItem.getWork().completeExceptionally(generateDispatacherSchedulingFailedException("completeMessage", ioException));
receiveWorkItem.cancelTimeoutTask(false);
}
@@ -1191,7 +1210,33 @@ public void onEvent()
}, MessagingFactory.INTERNAL_THREAD_POOL);
}
- return this.receiveLinkReopenFuture;
+ if (this.ensureLinkReopenFutureToWaitOn == null || this.ensureLinkReopenFutureToWaitOn.isDone()) {
+ this.ensureLinkReopenFutureToWaitOn = new CompletableFuture();
+ this.shouldRetryLinkReopenOnTransientFailure = true;
+ }
+
+ this.receiveLinkReopenFuture.handleAsync((v, ex) -> {
+ if (ex == null) {
+ this.ensureLinkReopenFutureToWaitOn.complete(null);
+ } else {
+ if (ex instanceof ServiceBusException && ((ServiceBusException)ex).getIsTransient()) {
+ if (this.shouldRetryLinkReopenOnTransientFailure) {
+ // Retry link creation
+ this.shouldRetryLinkReopenOnTransientFailure = false;
+ this.ensureLinkIsOpen();
+ } else {
+ this.ensureLinkReopenFutureToWaitOn.completeExceptionally(ex);
+ }
+ } else {
+ this.ensureLinkReopenFutureToWaitOn.completeExceptionally(ex);
+ }
+
+ }
+ return null;
+ },
+ MessagingFactory.INTERNAL_THREAD_POOL);
+
+ return this.ensureLinkReopenFutureToWaitOn;
}
else
{
diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageSender.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageSender.java
index 44cc22a7..f1af29bd 100644
--- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageSender.java
+++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageSender.java
@@ -86,6 +86,7 @@ public class CoreMessageSender extends ClientEntity implements IAmqpSender, IErr
private CompletableFuture requestResponseLinkCreationFuture;
private CompletableFuture sendLinkReopenFuture;
private int maxMessageSize;
+ private boolean shouldRetryLinkOpenIfConnectionIsClosedAfterCBSTokenSent = true;
@Deprecated
public static CompletableFuture create(
@@ -375,6 +376,7 @@ public CompletableFuture sendAsync(Message msg)
@Override
public void onOpenComplete(Exception completionException)
{
+ this.shouldRetryLinkOpenIfConnectionIsClosedAfterCBSTokenSent = true;
if (completionException == null)
{
this.maxMessageSize = Util.getMaxMessageSizeFromLink(this.sendLink);
@@ -576,13 +578,35 @@ private void cleanupFailedSend(final SendWorkItem failedSend, final Throwa
private void createSendLink()
{
TRACE_LOGGER.info("Creating send link to '{}'", this.sendPath);
- final Connection connection = this.underlyingFactory.getConnection();
+ Connection connection = this.underlyingFactory.getActiveConnectionOrNothing();
+ if (connection == null) {
+ // Connection closed after sending CBS token. Happens only in the rare case of azure service bus closing idle connection, just right after sending
+ // CBS token but before opening a link.
+ TRACE_LOGGER.warn("Idle connection closed by service just after sending CBS token. Very rare case. Will retry.");
+ ServiceBusException exception = new ServiceBusException(true, "Idle connection closed by service just after sending CBS token. Please retry.");
+ if (this.linkFirstOpen != null && !this.linkFirstOpen.isDone()) {
+ // Should never happen
+ AsyncUtil.completeFutureExceptionally(this.linkFirstOpen, exception);
+ }
+
+ if (this.sendLinkReopenFuture != null && !this.sendLinkReopenFuture.isDone()) {
+ // Complete the future and re-attempt link creation
+ AsyncUtil.completeFutureExceptionally(this.sendLinkReopenFuture, exception);
+ if(this.shouldRetryLinkOpenIfConnectionIsClosedAfterCBSTokenSent) {
+ this.shouldRetryLinkOpenIfConnectionIsClosedAfterCBSTokenSent = false;
+ Timer.schedule(() -> {this.ensureLinkIsOpen();}, Duration.ZERO, TimerType.OneTimeRun);
+ }
+ }
+
+ return;
+ }
+
final Session session = connection.session();
session.setOutgoingWindow(Integer.MAX_VALUE);
session.open();
BaseHandler.setHandler(session, new SessionHandler(sendPath));
- final String sendLinkNamePrefix = StringUtil.getShortRandomString();
+ final String sendLinkNamePrefix = "sender".concat(TrackingUtil.TRACKING_ID_TOKEN_SEPARATOR).concat(StringUtil.getShortRandomString());
final String sendLinkName = !StringUtil.isNullOrEmpty(connection.getRemoteContainer()) ?
sendLinkNamePrefix.concat(TrackingUtil.TRACKING_ID_TOKEN_SEPARATOR).concat(connection.getRemoteContainer()) :
sendLinkNamePrefix;
diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingFactory.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingFactory.java
index dbe230eb..29d4c20b 100644
--- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingFactory.java
+++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingFactory.java
@@ -140,7 +140,17 @@ private void startReactor(ReactorHandler reactorHandler) throws IOException
TRACE_LOGGER.info("Started reactor");
}
- Connection getConnection()
+ Connection getActiveConnectionOrNothing()
+ {
+ if (this.connection == null || this.connection.getLocalState() == EndpointState.CLOSED || this.connection.getRemoteState() == EndpointState.CLOSED) {
+ return null;
+ }
+ else {
+ return this.connection;
+ }
+ }
+
+ Connection getActiveConnectionCreateIfNecessary()
{
if (this.connection == null || this.connection.getLocalState() == EndpointState.CLOSED || this.connection.getRemoteState() == EndpointState.CLOSED)
{
@@ -667,7 +677,7 @@ private void createCBSLinkAsync()
this.cbsLinkCreationFuture.completeExceptionally(completionEx);
}
else
- {
+ {
String requestResponseLinkPath = RequestResponseLink.getCBSNodeLinkPath();
TRACE_LOGGER.info("Creating CBS link to {}", requestResponseLinkPath);
RequestResponseLink.createAsync(this, this.getClientId() + "-cbs", requestResponseLinkPath, null, null).handleAsync((cbsLink, ex) ->
diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/RequestResponseLink.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/RequestResponseLink.java
index 3e261b3c..151c2d28 100644
--- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/RequestResponseLink.java
+++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/RequestResponseLink.java
@@ -239,9 +239,27 @@ private void createInternalLinks()
commonLinkProperties.put(ClientConstants.ENTITY_TYPE_PROPERTY, this.entityType.getIntValue());
}
- // Create send link
- final Connection connection = this.underlyingFactory.getConnection();
+ Connection connection;
+ if(this.sasTokenAudienceURI == null) {
+ // CBS link. Doesn't have to send SAS token
+ connection = this.underlyingFactory.getActiveConnectionCreateIfNecessary();
+ }
+ else {
+ connection = this.underlyingFactory.getActiveConnectionOrNothing();
+ if (connection == null) {
+ // Connection closed after sending CBS token. Happens only in the rare case of azure service bus closing idle connection, just right after sending
+ // CBS token but before opening a link.
+ TRACE_LOGGER.warn("Idle connection closed by service just after sending CBS token. Very rare case. Will retry.");
+ ServiceBusException exception = new ServiceBusException(true, "Idle connection closed by service just after sending CBS token. Please retry.");
+ AsyncUtil.completeFutureExceptionally(this.amqpSender.openFuture, exception);
+ AsyncUtil.completeFutureExceptionally(this.amqpReceiver.openFuture, exception);
+ // Retry with little delay so that link recreation in progress flag is reset
+ Timer.schedule(() -> {RequestResponseLink.this.ensureUniqueLinkRecreation();}, Duration.ofMillis(1000), TimerType.OneTimeRun);
+ return;
+ }
+ }
+ // Create send link
Session session = connection.session();
session.setOutgoingWindow(Integer.MAX_VALUE);
session.open();
diff --git a/pom.xml b/pom.xml
index 18d530ee..d471add3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -13,7 +13,7 @@
0.31.0
4.12
1.7.0
- 1.2.14
+ 1.2.15