Skip to content

Commit

Permalink
Fix autoRecover memory leak. (apache#3361)
Browse files Browse the repository at this point in the history
  • Loading branch information
horizonzy authored Jul 26, 2022
1 parent f181325 commit da1b29a
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.bookkeeper.versioning.LongVersion;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.AsyncCallback.DataCallback;
import org.apache.zookeeper.AsyncCallback.StatCallback;
Expand Down Expand Up @@ -154,6 +155,30 @@ private void handleMetadata(Versioned<LedgerMetadata> result, Throwable exceptio
}
}

/**
* CancelWatchLedgerMetadataTask class.
*/
protected class CancelWatchLedgerMetadataTask implements Runnable {

final long ledgerId;

CancelWatchLedgerMetadataTask(long ledgerId) {
this.ledgerId = ledgerId;
}

@Override
public void run() {
Set<LedgerMetadataListener> listeners = AbstractZkLedgerManager.this.listeners.get(ledgerId);
if (!CollectionUtils.isEmpty(listeners)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Still watch ledgerId: {}, ignore this unwatch task.", ledgerId);
}
return;
}
cancelMetadataWatch(ledgerId, AbstractZkLedgerManager.this);
}
}

/**
* ZooKeeper-based Ledger Manager Constructor.
*
Expand Down Expand Up @@ -420,11 +445,28 @@ public void unregisterLedgerMetadataListener(long ledgerId, LedgerMetadataListen
}
if (listenerSet.isEmpty()) {
listeners.remove(ledgerId, listenerSet);
new CancelWatchLedgerMetadataTask(ledgerId).run();
}
}
}
}

private void cancelMetadataWatch(long ledgerId, Watcher watcher) {
zk.removeWatches(getLedgerPath(ledgerId), watcher, WatcherType.Data, true, new VoidCallback() {
@Override
public void processResult(int rc, String path, Object o) {
if (rc != KeeperException.Code.OK.intValue()) {
LOG.error("Cancel watch ledger {} metadata failed.", ledgerId,
KeeperException.create(KeeperException.Code.get(rc), path));
return;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Cancel watch ledger {} metadata succeed.", ledgerId);
}
}
}, null);
}

@Override
public CompletableFuture<Versioned<LedgerMetadata>> readLedgerMetadata(long ledgerId) {
return readLedgerMetadata(ledgerId, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -824,9 +824,16 @@ public void testUnregisterLedgerMetadataListener() throws Exception {
ledgerStr, true,
KeeperException.Code.OK.intValue(), serDe.serialize(metadata), stat);

mockZkRemoveWatcher();

// unregister the listener
ledgerManager.unregisterLedgerMetadataListener(ledgerId, listener);
assertFalse(ledgerManager.listeners.containsKey(ledgerId));
assertFalse(watchers.containsKey(ledgerStr));
verify(mockZk, times(1)).removeWatches(eq(ledgerManager.getLedgerPath(ledgerId)),
any(Watcher.class), any(Watcher.WatcherType.class), any(Boolean.class),
any(VoidCallback.class), any());


// notify the watcher event
notifyWatchedEvent(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.zookeeper.AsyncCallback.StringCallback;
import org.apache.zookeeper.AsyncCallback.VoidCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
Expand Down Expand Up @@ -83,6 +84,20 @@ private void addWatcher(String path, Watcher watcher) {
watcherSet.add(watcher);
}

private void removeWatcher(String path, Watcher watcher) {
if (watcher == null) {
return;
}
Set<Watcher> watcherSet = watchers.get(path);
if (null == watcherSet) {
return;
}
watcherSet.remove(watcher);
if (watcherSet.isEmpty()) {
watchers.remove(path);
}
}

protected void mockZkUtilsAsyncCreateFullPathOptimistic(
String expectedLedgerPath,
CreateMode expectedCreateMode,
Expand Down Expand Up @@ -187,7 +202,24 @@ protected void mockZkGetData(
expectedWatcher ? any(Watcher.class) : eq(null),
any(DataCallback.class),
any());
}

protected void mockZkRemoveWatcher () throws Exception {
doAnswer(invocationOnMock -> {
String path = invocationOnMock.getArgument(0);
Watcher watcher = invocationOnMock.getArgument(1);
VoidCallback callback = invocationOnMock.getArgument(4);
removeWatcher(path, watcher);

callback.processResult(KeeperException.Code.OK.intValue(), path, null);
return null;
}).when(mockZk).removeWatches(
any(String.class),
any(Watcher.class),
any(Watcher.WatcherType.class),
any(Boolean.class),
any(VoidCallback.class),
any());
}

protected void mockZkSetData(
Expand Down

0 comments on commit da1b29a

Please sign in to comment.