Skip to content

Commit

Permalink
HBASE-24781 Clean up peer metrics when disabling peer (#4997)
Browse files Browse the repository at this point in the history
Co-authored-by: Yuta Imazu <[email protected]>
Signed-off-by: Duo Zhang <[email protected]
(cherry picked from commit ef6a113)
  • Loading branch information
mosmeh authored and Apache9 committed Feb 28, 2023
1 parent 5b6d9de commit 840c62e
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -272,10 +272,14 @@ public void incrLogReadInBytes(long readInBytes) {

/** Removes all metrics about this Source. */
public void clear() {
terminate();
singleSourceSource.clear();
}

public void terminate() {
int lastQueueSize = singleSourceSource.getSizeOfLogQueue();
globalSourceSource.decrSizeOfLogQueue(lastQueueSize);
singleSourceSource.decrSizeOfLogQueue(lastQueueSize);
singleSourceSource.clear();
globalSourceSource.decrSizeOfHFileRefsQueue(lastHFileRefsQueueSize);
lastShippedTimeStamps.clear();
lastHFileRefsQueueSize = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -719,10 +719,13 @@ public void terminate(String reason, Exception cause, boolean clearMetrics, bool
}
}
}
if (clearMetrics) {
// Can be null in test context.
if (this.metrics != null) {

// Can be null in test context.
if (this.metrics != null) {
if (clearMetrics) {
this.metrics.clear();
} else {
this.metrics.terminate();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,8 @@ public void refreshSources(String peerId) throws IOException {
ReplicationSourceInterface toRemove = this.sources.remove(peerId);
if (toRemove != null) {
LOG.info("Terminate replication source for " + toRemove.getPeerId());
toRemove.terminate(terminateMessage, null, true);
// Do not clear metrics
toRemove.terminate(terminateMessage, null, false);
}
src = createSource(peerId, peer);
this.sources.put(peerId, src);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ public void terminate(String reason, Exception e) {
public void terminate(String reason, Exception e, boolean clearMetrics) {
if (clearMetrics) {
this.metrics.clear();
} else {
this.metrics.terminate();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,41 @@ public void testRemovePeerMetricsCleanup() throws Exception {
}
}

@Test
public void testDisablePeerMetricsCleanup() throws Exception {
final String peerId = "DummyPeer";
final ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
.setClusterKey(utility.getZkCluster().getAddress().toString() + ":/hbase").build();
try {
MetricsReplicationSourceSource globalSource = getGlobalSource();
final int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue();
final long sizeOfLatestPath = getSizeOfLatestPath();
addPeerAndWait(peerId, peerConfig, true);
assertEquals(sizeOfLatestPath + globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
ReplicationSourceInterface source = manager.getSource(peerId);
// Sanity check
assertNotNull(source);
final int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue();
// Enqueue log and check if metrics updated
source.enqueueLog(new Path("abc"));
assertEquals(1 + sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue());
assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
globalSource.getSizeOfLogQueue());

// Refreshing the peer should decrement the global and single source metrics
manager.refreshSources(peerId);
assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());

source = manager.getSource(peerId);
assertNotNull(source);
assertEquals(sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue());
assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
globalSource.getSizeOfLogQueue());
} finally {
removePeerAndWait(peerId);
}
}

/**
* Add a peer and wait for it to initialize
* @param waitForSource Whether to wait for replication source to initialize
Expand Down

0 comments on commit 840c62e

Please sign in to comment.