Skip to content

Commit

Permalink
HBASE-28155 RecoveredReplicationSource quit when there are still unfi…
Browse files Browse the repository at this point in the history
…nished groups (apache#5466)

Signed-off-by: Guanghao Zhang <[email protected]>
  • Loading branch information
Apache9 authored and wchevreuil committed Nov 2, 2023
1 parent 522ba04 commit e154482
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,22 @@
@InterfaceAudience.Private
public class RecoveredReplicationSource extends ReplicationSource {

@Override
protected void startShippers() {
for (String walGroupId : logQueue.getQueues().keySet()) {
workerThreads.put(walGroupId, createNewShipper(walGroupId));
}
// start shippers after initializing the workerThreads, as in the below postFinish logic, if
// workerThreads is empty, we will mark the RecoveredReplicationSource as finished. So if we
// start the worker on the fly, it is possible that a shipper has already finished its work and
// called postFinish, and find out the workerThreads is empty and then mark the
// RecoveredReplicationSource as finish, while the next shipper has not been added to
// workerThreads yet. See HBASE-28155 for more details.
for (ReplicationSourceShipper shipper : workerThreads.values()) {
startShipper(shipper);
}
}

@Override
protected RecoveredReplicationSourceShipper createNewShipper(String walGroupId,
ReplicationSourceWALReader walReader) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,19 +360,28 @@ private long getStartOffset(String walGroupId) {
}
}

protected final ReplicationSourceShipper createNewShipper(String walGroupId) {
ReplicationSourceWALReader walReader =
createNewWALReader(walGroupId, getStartOffset(walGroupId));
ReplicationSourceShipper worker = createNewShipper(walGroupId, walReader);
Threads.setDaemonThreadRunning(walReader, Thread.currentThread().getName()
+ ".replicationSource.wal-reader." + walGroupId + "," + queueId, this::retryRefreshing);
return worker;
}

protected final void startShipper(ReplicationSourceShipper worker) {
worker.startup(this::retryRefreshing);
}

private void tryStartNewShipper(String walGroupId) {
workerThreads.compute(walGroupId, (key, value) -> {
if (value != null) {
LOG.debug("{} preempted start of shipping worker walGroupId={}", logPeerId(), walGroupId);
return value;
} else {
LOG.debug("{} starting shipping worker for walGroupId={}", logPeerId(), walGroupId);
ReplicationSourceWALReader walReader =
createNewWALReader(walGroupId, getStartOffset(walGroupId));
ReplicationSourceShipper worker = createNewShipper(walGroupId, walReader);
Threads.setDaemonThreadRunning(walReader, Thread.currentThread().getName()
+ ".replicationSource.wal-reader." + walGroupId + "," + queueId, this::retryRefreshing);
worker.startup(this::retryRefreshing);
ReplicationSourceShipper worker = createNewShipper(walGroupId);
startShipper(worker);
return worker;
}
});
Expand Down Expand Up @@ -522,7 +531,7 @@ private long getCurrentBandwidth() {
* @param sleepMultiplier by how many times the default sleeping time is augmented
* @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
*/
protected boolean sleepForRetries(String msg, int sleepMultiplier) {
private boolean sleepForRetries(String msg, int sleepMultiplier) {
try {
if (LOG.isTraceEnabled()) {
LOG.trace("{} {}, sleeping {} times {}", logPeerId(), msg, sleepForRetries,
Expand Down Expand Up @@ -605,10 +614,14 @@ private void initialize() {
queueId, logQueue.getNumQueues(), clusterId, peerClusterId);
initializeWALEntryFilter(peerClusterId);
// Start workers
startShippers();
setSourceStartupStatus(false);
}

protected void startShippers() {
for (String walGroupId : logQueue.getQueues().keySet()) {
tryStartNewShipper(walGroupId);
}
setSourceStartupStatus(false);
}

private synchronized void setSourceStartupStatus(boolean initializing) {
Expand Down

0 comments on commit e154482

Please sign in to comment.