Skip to content

Commit

Permalink
Removing finalize method altogether. (#388)
Browse files Browse the repository at this point in the history
* Removing finalize method altogether.

* Chaging vesion.

* Addin finalize only for MessagingFactory as connection needs to be disposed properly.

* Another fix to avoid stalling in finalize waiting for object to close.
  • Loading branch information
yvgopal authored Nov 6, 2019
1 parent 8d12d31 commit 812bf7e
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 20 deletions.
2 changes: 1 addition & 1 deletion azure-servicebus/azure-servicebus.pom
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-servicebus</artifactId>
<version>1.2.17</version>
<version>1.2.18</version>
<licenses>
<license>
<name>The MIT License (MIT)</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;

import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
Expand Down Expand Up @@ -52,7 +53,8 @@ class MessageReceiver extends InitializableEntity implements IMessageReceiver, I
private CoreMessageReceiver internalReceiver = null;
private boolean isInitialized = false;
private MessageBrowser browser = null;
private int messagePrefetchCount;
private int messagePrefetchCount;
private ScheduledFuture<?> requestResponseLockTokenPruner = null;

private final ConcurrentHashMap<UUID, Instant> requestResponseLockTokensToLockTimesMap;

Expand Down Expand Up @@ -425,6 +427,9 @@ protected CompletableFuture<Void> onClose() {
} else {
TRACE_LOGGER.info("Closing MessageReceiver to entity '{}'", this.entityPath);
}
if (this.requestResponseLockTokenPruner != null) {
this.requestResponseLockTokenPruner.cancel(false);
}
CompletableFuture<Void> closeReceiverFuture = this.internalReceiver.closeAsync();

return closeReceiverFuture.thenComposeAsync((v) ->
Expand Down Expand Up @@ -624,8 +629,12 @@ public CompletableFuture<Collection<IMessage>> peekBatchAsync(long fromSequenceN

private void schedulePruningRequestResponseLockTokens() {
// Run it every 1 hour
Timer.schedule(new Runnable() {
this.requestResponseLockTokenPruner = Timer.schedule(new Runnable() {
public void run() {
if (MessageReceiver.this.getIsClosed()) {
MessageReceiver.this.requestResponseLockTokenPruner.cancel(true);
return;
}
Instant systemTime = Instant.now();
Entry<UUID, Instant>[] copyOfEntries = (Entry<UUID, Instant>[]) MessageReceiver.this.requestResponseLockTokensToLockTimesMap.entrySet().toArray();
for (Entry<UUID, Instant> entry : copyOfEntries) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,17 +138,10 @@ protected final void throwIfClosed(Throwable cause)
}

@Override
protected void finalize()
{
try {
if(!this.isClosed)
{
this.close();
}

super.finalize();
} catch (Throwable e) {
//Ignore
}
}
protected void finalize() throws Throwable {
if (!this.getIsClosingOrClosed()) {
this.closeAsync();
}
super.finalize();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1051,8 +1051,12 @@ public void onEvent() {

this.cancelSASTokenRenewTimer();
this.closeRequestResponseLink();
this.updateStateRequestsTimeoutChecker.cancel(false);
this.returnMessagesLoopRunner.cancel(false);
if (this.updateStateRequestsTimeoutChecker != null) {
this.updateStateRequestsTimeoutChecker.cancel(false);
}
if (this.returnMessagesLoopRunner != null) {
this.returnMessagesLoopRunner.cancel(false);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,9 @@ public void onEvent()
protected CompletableFuture<Void> onClose() {
TRACE_LOGGER.info("Closing requestresponselink to {} by closing both internal sender and receiver links.", this.linkPath);
this.cancelSASTokenRenewTimer();
return this.amqpSender.closeAsync().thenComposeAsync((v) -> this.amqpReceiver.closeAsync(), MessagingFactory.INTERNAL_THREAD_POOL);
CompletableFuture<Void> senderCloseFuture = this.amqpSender.closeAsync();
CompletableFuture<Void> receiverCloseFuture = this.amqpReceiver.closeAsync();
return CompletableFuture.allOf(senderCloseFuture, receiverCloseFuture);
}

private static void scheduleLinkCloseTimeout(CompletableFuture<Void> closeFuture, Duration timeout, String linkName)
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<proton-j-version>0.31.0</proton-j-version>
<junit-version>4.12</junit-version>
<slf4j-version>1.7.0</slf4j-version>
<client-current-version>1.2.17</client-current-version>
<client-current-version>1.2.18</client-current-version>
</properties>

<modules>
Expand Down

0 comments on commit 812bf7e

Please sign in to comment.