Skip to content

Commit

Permalink
HBASE-28129 Do not retry refreshSources when region server is already stopping (#5453)
Browse files Browse the repository at this point in the history

Signed-off-by: GeorryHuang <[email protected]>
Signed-off-by: Xiaolin Ha <[email protected]>
  • Loading branch information
Apache9 authored Oct 7, 2023
1 parent 544d368 commit 6455c49
Showing 1 changed file with 27 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -370,11 +370,9 @@ private void tryStartNewShipper(String walGroupId) {
ReplicationSourceWALReader walReader =
createNewWALReader(walGroupId, getStartOffset(walGroupId));
ReplicationSourceShipper worker = createNewShipper(walGroupId, walReader);
Threads.setDaemonThreadRunning(
walReader, Thread.currentThread().getName() + ".replicationSource.wal-reader."
+ walGroupId + "," + queueId,
(t, e) -> this.uncaughtException(t, e, this.manager, this.getPeerId()));
worker.startup((t, e) -> this.uncaughtException(t, e, this.manager, this.getPeerId()));
Threads.setDaemonThreadRunning(walReader, Thread.currentThread().getName()
+ ".replicationSource.wal-reader." + walGroupId + "," + queueId, this::retryRefreshing);
worker.startup(this::retryRefreshing);
return worker;
}
});
Expand Down Expand Up @@ -448,24 +446,30 @@ WALEntryFilter getWalEntryFilter() {
return walEntryFilter;
}

private void uncaughtException(Thread t, Throwable e, ReplicationSourceManager manager,
String peerId) {
OOMEChecker.exitIfOOME(e, getClass().getSimpleName());
LOG.error("Unexpected exception in {} currentPath={}", t.getName(), getCurrentPath(), e);
// Check whether the error is an OOME, log it, and abort the server if abortOnError is set.
private void checkError(Thread t, Throwable error) {
OOMEChecker.exitIfOOME(error, getClass().getSimpleName());
LOG.error("Unexpected exception in {} currentPath={}", t.getName(), getCurrentPath(), error);
if (abortOnError) {
server.abort("Unexpected exception in " + t.getName(), e);
server.abort("Unexpected exception in " + t.getName(), error);
}
if (manager != null) {
while (true) {
try {
LOG.info("Refreshing replication sources now due to previous error on thread: {}",
t.getName());
manager.refreshSources(peerId);
break;
} catch (IOException | ReplicationException e1) {
LOG.error("Replication sources refresh failed.", e1);
sleepForRetries("Sleeping before try refreshing sources again", maxRetriesMultiplier);
}
}

/**
 * Handles an unexpected error raised on thread {@code t}: runs {@link #checkError} first, then
 * keeps asking the manager to refresh the replication sources of this peer until one attempt
 * succeeds. Gives up without refreshing as soon as the server reports that it is aborted,
 * stopped or stopping.
 *
 * @param t     the thread on which the error was raised; its name is used in the log output
 * @param error the error that triggered the refresh
 */
private void retryRefreshing(Thread t, Throwable error) {
  checkError(t, error);
  for (;;) {
    // Re-evaluated before every attempt so a shutdown that begins mid-retry ends the loop.
    boolean shuttingDown = server.isAborted() || server.isStopped() || server.isStopping();
    if (shuttingDown) {
      LOG.warn("Server is shutting down, give up refreshing source for peer {}", getPeerId());
      return;
    }
    try {
      LOG.info("Refreshing replication sources now due to previous error on thread: {}",
        t.getName());
      manager.refreshSources(getPeerId());
      // Refresh succeeded, nothing left to do.
      return;
    } catch (Exception refreshFailure) {
      // Any failure is logged and retried after a pause.
      LOG.error("Replication sources refresh failed.", refreshFailure);
      sleepForRetries("Sleeping before try refreshing sources again", maxRetriesMultiplier);
    }
  }
}
Expand Down Expand Up @@ -630,7 +634,7 @@ public ReplicationSourceInterface startup() {
// keep looping in this thread until initialize eventually succeeds,
// while the server main startup one can go on with its work.
sourceRunning = false;
uncaughtException(t, e, null, null);
checkError(t, e);
retryStartup.set(!this.abortOnError);
do {
if (retryStartup.get()) {
Expand All @@ -641,7 +645,7 @@ public ReplicationSourceInterface startup() {
initialize();
} catch (Throwable error) {
setSourceStartupStatus(false);
uncaughtException(t, error, null, null);
checkError(t, error);
retryStartup.set(!this.abortOnError);
}
}
Expand Down

0 comments on commit 6455c49

Please sign in to comment.