Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-23231 ReplicationSource do not update metrics after refresh #778

Merged
merged 2 commits into from
Oct 31, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,12 @@ public void terminate(String reason, Exception cause) {
terminate(reason, cause, true);
}

public void terminate(String reason, Exception cause, boolean join) {
@Override
public void terminate(String reason, Exception cause, boolean clearMetrics) {
terminate(reason, cause, clearMetrics, true);
}

public void terminate(String reason, Exception cause, boolean clearMetrics, boolean join) {
if (cause == null) {
LOG.info("{} Closing source {} because: {}", logPeerId(), this.queueId, reason);
} else {
Expand Down Expand Up @@ -595,7 +600,9 @@ public void terminate(String reason, Exception cause, boolean join) {
if (this.replicationEndpoint != null) {
this.replicationEndpoint.stop();
}
metrics.clear();
if (clearMetrics) {
metrics.clear();
}
if (join) {
for (ReplicationSourceShipper worker : workers) {
Threads.shutdown(worker, this.sleepForRetries);
Expand All @@ -611,7 +618,9 @@ public void terminate(String reason, Exception cause, boolean join) {
}
}
}
this.metrics.clear();
if (clearMetrics) {
this.metrics.clear();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,14 @@ void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pai
*/
void terminate(String reason, Exception cause);

/**
* End the replication
* @param reason why it's terminating
* @param cause the error that's causing it
* @param clearMetrics removes all metrics about this Source
*/
void terminate(String reason, Exception cause, boolean clearMetrics);

/**
* Get the current log that's replicated
* @return the current log
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,8 @@ public void refreshSources(String peerId) throws IOException {
ReplicationSourceInterface toRemove = this.sources.put(peerId, src);
if (toRemove != null) {
LOG.info("Terminate replication source for " + toRemove.getPeerId());
toRemove.terminate(terminateMessage);
// Do not clear metrics
toRemove.terminate(terminateMessage, null, false);
}
for (NavigableSet<String> walsByGroup : walsById.get(peerId).values()) {
walsByGroup.forEach(wal -> src.enqueueLog(new Path(this.logDir, wal)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,14 @@ public void terminate(String reason) {

@Override
public void terminate(String reason, Exception e) {
this.metrics.clear();
terminate(reason, e, true);
}

@Override
public void terminate(String reason, Exception e, boolean clearMetrics) {
if (clearMetrics) {
this.metrics.clear();
}
}

@Override
Expand Down