Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-28155 RecoveredReplicationSource quit when there are still unfi… #5466

Merged
merged 1 commit into from
Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,22 @@
@InterfaceAudience.Private
public class RecoveredReplicationSource extends ReplicationSource {

@Override
protected void startShippers() {
for (String walGroupId : logQueue.getQueues().keySet()) {
workerThreads.put(walGroupId, createNewShipper(walGroupId));
}
// start shippers after initializing the workerThreads, as in the below postFinish logic, if
// workerThreads is empty, we will mark the RecoveredReplicationSource as finished. So if we
// start the worker on the fly, it is possible that a shipper has already finished its work and
// called postFinish, and find out the workerThreads is empty and then mark the
// RecoveredReplicationSource as finish, while the next shipper has not been added to
// workerThreads yet. See HBASE-28155 for more details.
for (ReplicationSourceShipper shipper : workerThreads.values()) {
startShipper(shipper);
}
}

@Override
protected RecoveredReplicationSourceShipper createNewShipper(String walGroupId,
ReplicationSourceWALReader walReader) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,19 +360,28 @@ private long getStartOffset(String walGroupId) {
}
}

protected final ReplicationSourceShipper createNewShipper(String walGroupId) {
ReplicationSourceWALReader walReader =
createNewWALReader(walGroupId, getStartOffset(walGroupId));
ReplicationSourceShipper worker = createNewShipper(walGroupId, walReader);
Threads.setDaemonThreadRunning(walReader, Thread.currentThread().getName()
+ ".replicationSource.wal-reader." + walGroupId + "," + queueId, this::retryRefreshing);
return worker;
}

protected final void startShipper(ReplicationSourceShipper worker) {
worker.startup(this::retryRefreshing);
}

private void tryStartNewShipper(String walGroupId) {
workerThreads.compute(walGroupId, (key, value) -> {
if (value != null) {
LOG.debug("{} preempted start of shipping worker walGroupId={}", logPeerId(), walGroupId);
return value;
} else {
LOG.debug("{} starting shipping worker for walGroupId={}", logPeerId(), walGroupId);
ReplicationSourceWALReader walReader =
createNewWALReader(walGroupId, getStartOffset(walGroupId));
ReplicationSourceShipper worker = createNewShipper(walGroupId, walReader);
Threads.setDaemonThreadRunning(walReader, Thread.currentThread().getName()
+ ".replicationSource.wal-reader." + walGroupId + "," + queueId, this::retryRefreshing);
worker.startup(this::retryRefreshing);
ReplicationSourceShipper worker = createNewShipper(walGroupId);
startShipper(worker);
return worker;
}
});
Expand Down Expand Up @@ -522,7 +531,7 @@ private long getCurrentBandwidth() {
* @param sleepMultiplier by how many times the default sleeping time is augmented
* @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
*/
protected boolean sleepForRetries(String msg, int sleepMultiplier) {
private boolean sleepForRetries(String msg, int sleepMultiplier) {
try {
if (LOG.isTraceEnabled()) {
LOG.trace("{} {}, sleeping {} times {}", logPeerId(), msg, sleepForRetries,
Expand Down Expand Up @@ -605,10 +614,14 @@ private void initialize() {
queueId, logQueue.getNumQueues(), clusterId, peerClusterId);
initializeWALEntryFilter(peerClusterId);
// Start workers
startShippers();
setSourceStartupStatus(false);
}

protected void startShippers() {
for (String walGroupId : logQueue.getQueues().keySet()) {
tryStartNewShipper(walGroupId);
}
setSourceStartupStatus(false);
}

private synchronized void setSourceStartupStatus(boolean initializing) {
Expand Down