diff --git a/azure-servicebus/azure-servicebus.pom b/azure-servicebus/azure-servicebus.pom index eddd4886..822aa447 100644 --- a/azure-servicebus/azure-servicebus.pom +++ b/azure-servicebus/azure-servicebus.pom @@ -4,7 +4,7 @@ 4.0.0 com.microsoft.azure azure-servicebus - 1.2.17 + 1.2.18 The MIT License (MIT) diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageReceiver.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageReceiver.java index e1f080c1..20942a3b 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageReceiver.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageReceiver.java @@ -15,6 +15,7 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledFuture; import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; import org.apache.qpid.proton.amqp.transport.SenderSettleMode; @@ -52,7 +53,8 @@ class MessageReceiver extends InitializableEntity implements IMessageReceiver, I private CoreMessageReceiver internalReceiver = null; private boolean isInitialized = false; private MessageBrowser browser = null; - private int messagePrefetchCount; + private int messagePrefetchCount; + private ScheduledFuture requestResponseLockTokenPruner = null; private final ConcurrentHashMap requestResponseLockTokensToLockTimesMap; @@ -425,6 +427,9 @@ protected CompletableFuture onClose() { } else { TRACE_LOGGER.info("Closing MessageReceiver to entity '{}'", this.entityPath); } + if (this.requestResponseLockTokenPruner != null) { + this.requestResponseLockTokenPruner.cancel(false); + } CompletableFuture closeReceiverFuture = this.internalReceiver.closeAsync(); return closeReceiverFuture.thenComposeAsync((v) -> @@ -624,8 +629,12 @@ public CompletableFuture> peekBatchAsync(long fromSequenceN private void schedulePruningRequestResponseLockTokens() { // Run it every 1 hour - Timer.schedule(new Runnable() { + this.requestResponseLockTokenPruner = Timer.schedule(new Runnable() { public void run() { + if (MessageReceiver.this.getIsClosed()) { + MessageReceiver.this.requestResponseLockTokenPruner.cancel(true); + return; + } Instant systemTime = Instant.now(); Entry[] copyOfEntries = (Entry[]) MessageReceiver.this.requestResponseLockTokensToLockTimesMap.entrySet().toArray(); for (Entry entry : copyOfEntries) { diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ClientEntity.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ClientEntity.java index c341dcfe..f19283c4 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ClientEntity.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ClientEntity.java @@ -138,17 +138,10 @@ protected final void throwIfClosed(Throwable cause) } @Override - protected void finalize() - { - try { - if(!this.isClosed) - { - this.close(); - } - - super.finalize(); - } catch (Throwable e) { - //Ignore - } - } + protected void finalize() throws Throwable { + if (!this.getIsClosingOrClosed()) { + this.closeAsync(); + } + super.finalize(); + } } diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageReceiver.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageReceiver.java index aa7781bc..0c3d12dc 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageReceiver.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageReceiver.java @@ -1051,8 +1051,12 @@ public void onEvent() { this.cancelSASTokenRenewTimer(); this.closeRequestResponseLink(); - this.updateStateRequestsTimeoutChecker.cancel(false); - this.returnMessagesLoopRunner.cancel(false); + if (this.updateStateRequestsTimeoutChecker != null) { + this.updateStateRequestsTimeoutChecker.cancel(false); + } + if (this.returnMessagesLoopRunner != null) { + this.returnMessagesLoopRunner.cancel(false); + } } } diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/RequestResponseLink.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/RequestResponseLink.java index 151c2d28..609e1a53 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/RequestResponseLink.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/RequestResponseLink.java @@ -555,7 +555,9 @@ public void onEvent() protected CompletableFuture onClose() { TRACE_LOGGER.info("Closing requestresponselink to {} by closing both internal sender and receiver links.", this.linkPath); this.cancelSASTokenRenewTimer(); - return this.amqpSender.closeAsync().thenComposeAsync((v) -> this.amqpReceiver.closeAsync(), MessagingFactory.INTERNAL_THREAD_POOL); + CompletableFuture senderCloseFuture = this.amqpSender.closeAsync(); + CompletableFuture receiverCloseFuture = this.amqpReceiver.closeAsync(); + return CompletableFuture.allOf(senderCloseFuture, receiverCloseFuture); } private static void scheduleLinkCloseTimeout(CompletableFuture closeFuture, Duration timeout, String linkName) diff --git a/pom.xml b/pom.xml index cbc81e1c..2e1832ad 100644 --- a/pom.xml +++ b/pom.xml @@ -13,7 +13,7 @@ 0.31.0 4.12 1.7.0 - 1.2.17 + 1.2.18