Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing a rare case of AuthorizationFailedException #373

Merged
merged 1 commit into from
Jun 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion azure-servicebus/azure-servicebus.pom
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-servicebus</artifactId>
<version>1.2.14</version>
<version>1.2.15</version>
<licenses>
<license>
<name>The MIT License (MIT)</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,13 @@ public class CoreMessageReceiver extends ClientEntity implements IAmqpReceiver,
private ScheduledFuture<?> sasTokenRenewTimerFuture;
private CompletableFuture<Void> requestResponseLinkCreationFuture;
private CompletableFuture<Void> receiveLinkReopenFuture;
private CompletableFuture<Void> ensureLinkReopenFutureToWaitOn;
private final Runnable timedOutUpdateStateRequestsDaemon;
private final Runnable returnMesagesLoopDaemon;
private final ScheduledFuture<?> updateStateRequestsTimeoutChecker;
private final ScheduledFuture<?> returnMessagesLoopRunner;
private final MessagingEntityType entityType;
private boolean shouldRetryLinkReopenOnTransientFailure = true;

// TODO Change onReceiveComplete to handle empty deliveries. Change onError to retry updateState requests.
private CoreMessageReceiver(final MessagingFactory factory,
Expand Down Expand Up @@ -370,8 +372,25 @@ private void closeRequestResponseLink()
private void createReceiveLink()
{
TRACE_LOGGER.info("Creating receive link to '{}'", this.receivePath);
Connection connection = this.underlyingFactory.getConnection();

Connection connection = this.underlyingFactory.getActiveConnectionOrNothing();
if (connection == null) {
// Connection closed after sending CBS token. Happens only in the rare case of azure service bus closing idle connection, just right after sending
// CBS token but before opening a link.
TRACE_LOGGER.warn("Idle connection closed by service just after sending CBS token. Very rare case. Will retry.");
ServiceBusException exception = new ServiceBusException(true, "Idle connection closed by service just after sending CBS token. Please retry.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just after sending CBS token [](start = 100, length = 28)

Wonder if this is too much of details mentioning about CBS.

Copy link
Contributor

@nemakam nemakam Jun 26, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ServiceBusException [](start = 39, length = 19)

Should this be SBCommunicationException?

if (this.linkOpen != null && !this.linkOpen.getWork().isDone()) {
// Should never happen
AsyncUtil.completeFutureExceptionally(this.linkOpen.getWork(), exception);
}

if(this.receiveLinkReopenFuture != null && !this.receiveLinkReopenFuture.isDone()) {
// Complete the future and re-attempt link creation
AsyncUtil.completeFutureExceptionally(this.receiveLinkReopenFuture, exception);
}

return;
}

final Session session = connection.session();
session.setIncomingCapacity(Integer.MAX_VALUE);
session.open();
Expand Down Expand Up @@ -877,7 +896,7 @@ public void onEvent()
catch (IOException ioException)
{
this.pendingReceives.remove(receiveWorkItem);
this.reduceCreditForCompletedReceiveRequest(receiveWorkItem.getMaxMessageCount());
this.reduceCreditForCompletedReceiveRequest(receiveWorkItem.getMaxMessageCount());
receiveWorkItem.getWork().completeExceptionally(generateDispatacherSchedulingFailedException("completeMessage", ioException));
receiveWorkItem.cancelTimeoutTask(false);
}
Expand Down Expand Up @@ -1191,7 +1210,33 @@ public void onEvent()
}, MessagingFactory.INTERNAL_THREAD_POOL);
}

return this.receiveLinkReopenFuture;
if (this.ensureLinkReopenFutureToWaitOn == null || this.ensureLinkReopenFutureToWaitOn.isDone()) {
this.ensureLinkReopenFutureToWaitOn = new CompletableFuture<Void>();
this.shouldRetryLinkReopenOnTransientFailure = true;
}

this.receiveLinkReopenFuture.handleAsync((v, ex) -> {
if (ex == null) {
this.ensureLinkReopenFutureToWaitOn.complete(null);
} else {
if (ex instanceof ServiceBusException && ((ServiceBusException)ex).getIsTransient()) {
if (this.shouldRetryLinkReopenOnTransientFailure) {
// Retry link creation
this.shouldRetryLinkReopenOnTransientFailure = false;
this.ensureLinkIsOpen();
} else {
this.ensureLinkReopenFutureToWaitOn.completeExceptionally(ex);
}
} else {
this.ensureLinkReopenFutureToWaitOn.completeExceptionally(ex);
}

}
return null;
},
MessagingFactory.INTERNAL_THREAD_POOL);

return this.ensureLinkReopenFutureToWaitOn;
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ public class CoreMessageSender extends ClientEntity implements IAmqpSender, IErr
private CompletableFuture<Void> requestResponseLinkCreationFuture;
private CompletableFuture<Void> sendLinkReopenFuture;
private int maxMessageSize;
private boolean shouldRetryLinkOpenIfConnectionIsClosedAfterCBSTokenSent = true;

@Deprecated
public static CompletableFuture<CoreMessageSender> create(
Expand Down Expand Up @@ -375,6 +376,7 @@ public CompletableFuture<Void> sendAsync(Message msg)
@Override
public void onOpenComplete(Exception completionException)
{
this.shouldRetryLinkOpenIfConnectionIsClosedAfterCBSTokenSent = true;
if (completionException == null)
{
this.maxMessageSize = Util.getMaxMessageSizeFromLink(this.sendLink);
Expand Down Expand Up @@ -576,13 +578,35 @@ private void cleanupFailedSend(final SendWorkItem<Void> failedSend, final Throwa
private void createSendLink()
{
TRACE_LOGGER.info("Creating send link to '{}'", this.sendPath);
final Connection connection = this.underlyingFactory.getConnection();
Connection connection = this.underlyingFactory.getActiveConnectionOrNothing();
if (connection == null) {
// Connection closed after sending CBS token. Happens only in the rare case of azure service bus closing idle connection, just right after sending
// CBS token but before opening a link.
TRACE_LOGGER.warn("Idle connection closed by service just after sending CBS token. Very rare case. Will retry.");
ServiceBusException exception = new ServiceBusException(true, "Idle connection closed by service just after sending CBS token. Please retry.");
if (this.linkFirstOpen != null && !this.linkFirstOpen.isDone()) {
// Should never happen
AsyncUtil.completeFutureExceptionally(this.linkFirstOpen, exception);
}

if (this.sendLinkReopenFuture != null && !this.sendLinkReopenFuture.isDone()) {
// Complete the future and re-attempt link creation
AsyncUtil.completeFutureExceptionally(this.sendLinkReopenFuture, exception);
if(this.shouldRetryLinkOpenIfConnectionIsClosedAfterCBSTokenSent) {
this.shouldRetryLinkOpenIfConnectionIsClosedAfterCBSTokenSent = false;
Timer.schedule(() -> {this.ensureLinkIsOpen();}, Duration.ZERO, TimerType.OneTimeRun);
}
}

return;
}

final Session session = connection.session();
session.setOutgoingWindow(Integer.MAX_VALUE);
session.open();
BaseHandler.setHandler(session, new SessionHandler(sendPath));

final String sendLinkNamePrefix = StringUtil.getShortRandomString();
final String sendLinkNamePrefix = "sender".concat(TrackingUtil.TRACKING_ID_TOKEN_SEPARATOR).concat(StringUtil.getShortRandomString());
final String sendLinkName = !StringUtil.isNullOrEmpty(connection.getRemoteContainer()) ?
sendLinkNamePrefix.concat(TrackingUtil.TRACKING_ID_TOKEN_SEPARATOR).concat(connection.getRemoteContainer()) :
sendLinkNamePrefix;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,17 @@ private void startReactor(ReactorHandler reactorHandler) throws IOException
TRACE_LOGGER.info("Started reactor");
}

Connection getConnection()
Connection getActiveConnectionOrNothing()
{
if (this.connection == null || this.connection.getLocalState() == EndpointState.CLOSED || this.connection.getRemoteState() == EndpointState.CLOSED) {
return null;
}
else {
return this.connection;
}
}

Connection getActiveConnectionCreateIfNecessary()
{
if (this.connection == null || this.connection.getLocalState() == EndpointState.CLOSED || this.connection.getRemoteState() == EndpointState.CLOSED)
{
Expand Down Expand Up @@ -667,7 +677,7 @@ private void createCBSLinkAsync()
this.cbsLinkCreationFuture.completeExceptionally(completionEx);
}
else
{
{
String requestResponseLinkPath = RequestResponseLink.getCBSNodeLinkPath();
TRACE_LOGGER.info("Creating CBS link to {}", requestResponseLinkPath);
RequestResponseLink.createAsync(this, this.getClientId() + "-cbs", requestResponseLinkPath, null, null).handleAsync((cbsLink, ex) ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,9 +239,27 @@ private void createInternalLinks()
commonLinkProperties.put(ClientConstants.ENTITY_TYPE_PROPERTY, this.entityType.getIntValue());
}

// Create send link
final Connection connection = this.underlyingFactory.getConnection();
Connection connection;
if(this.sasTokenAudienceURI == null) {
// CBS link. Doesn't have to send SAS token
connection = this.underlyingFactory.getActiveConnectionCreateIfNecessary();
}
else {
connection = this.underlyingFactory.getActiveConnectionOrNothing();
if (connection == null) {
// Connection closed after sending CBS token. Happens only in the rare case of azure service bus closing idle connection, just right after sending
// CBS token but before opening a link.
TRACE_LOGGER.warn("Idle connection closed by service just after sending CBS token. Very rare case. Will retry.");
ServiceBusException exception = new ServiceBusException(true, "Idle connection closed by service just after sending CBS token. Please retry.");
AsyncUtil.completeFutureExceptionally(this.amqpSender.openFuture, exception);
AsyncUtil.completeFutureExceptionally(this.amqpReceiver.openFuture, exception);
// Retry with little delay so that link recreation in progress flag is reset
Timer.schedule(() -> {RequestResponseLink.this.ensureUniqueLinkRecreation();}, Duration.ofMillis(1000), TimerType.OneTimeRun);
return;
}
}

// Create send link
Session session = connection.session();
session.setOutgoingWindow(Integer.MAX_VALUE);
session.open();
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<proton-j-version>0.31.0</proton-j-version>
<junit-version>4.12</junit-version>
<slf4j-version>1.7.0</slf4j-version>
<client-current-version>1.2.14</client-current-version>
<client-current-version>1.2.15</client-current-version>
</properties>

<modules>
Expand Down