Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-28090 Make entryReader field final in ReplicationSourceShipper … #5409

Merged
merged 1 commit into from
Sep 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@
public class RecoveredReplicationSource extends ReplicationSource {

@Override
protected RecoveredReplicationSourceShipper createNewShipper(String walGroupId) {
return new RecoveredReplicationSourceShipper(conf, walGroupId, logQueue, this, queueStorage,
protected RecoveredReplicationSourceShipper createNewShipper(String walGroupId,
ReplicationSourceWALReader walReader) {
return new RecoveredReplicationSourceShipper(conf, walGroupId, this, walReader, queueStorage,
() -> {
if (workerThreads.isEmpty()) {
this.getSourceMetrics().clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
private final Runnable tryFinish;

public RecoveredReplicationSourceShipper(Configuration conf, String walGroupId,
ReplicationSourceLogQueue logQueue, RecoveredReplicationSource source,
RecoveredReplicationSource source, ReplicationSourceWALReader walReader,
ReplicationQueueStorage queueStorage, Runnable tryFinish) {
super(conf, walGroupId, logQueue, source);
super(conf, walGroupId, source, walReader);
this.tryFinish = tryFinish;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,14 +367,13 @@ private void tryStartNewShipper(String walGroupId) {
return value;
} else {
LOG.debug("{} starting shipping worker for walGroupId={}", logPeerId(), walGroupId);
ReplicationSourceShipper worker = createNewShipper(walGroupId);
ReplicationSourceWALReader walReader =
createNewWALReader(walGroupId, getStartOffset(walGroupId));
ReplicationSourceShipper worker = createNewShipper(walGroupId, walReader);
Threads.setDaemonThreadRunning(
walReader, Thread.currentThread().getName() + ".replicationSource.wal-reader."
+ walGroupId + "," + queueId,
(t, e) -> this.uncaughtException(t, e, this.manager, this.getPeerId()));
worker.setWALReader(walReader);
worker.startup((t, e) -> this.uncaughtException(t, e, this.manager, this.getPeerId()));
return worker;
}
Expand Down Expand Up @@ -428,8 +427,9 @@ private long getFileSize(Path currentPath) throws IOException {
return fileSize;
}

protected ReplicationSourceShipper createNewShipper(String walGroupId) {
return new ReplicationSourceShipper(conf, walGroupId, logQueue, this);
protected ReplicationSourceShipper createNewShipper(String walGroupId,
ReplicationSourceWALReader walReader) {
return new ReplicationSourceShipper(conf, walGroupId, this, walReader);
}

private ReplicationSourceWALReader createNewWALReader(String walGroupId, long startPosition) {
Expand Down Expand Up @@ -665,7 +665,7 @@ public void terminate(String reason, Exception cause, boolean clearMetrics) {
terminate(reason, cause, clearMetrics, true);
}

public void terminate(String reason, Exception cause, boolean clearMetrics, boolean join) {
private void terminate(String reason, Exception cause, boolean clearMetrics, boolean join) {
if (cause == null) {
LOG.info("{} Closing source {} because: {}", logPeerId(), this.queueId, reason);
} else {
Expand All @@ -684,9 +684,7 @@ public void terminate(String reason, Exception cause, boolean clearMetrics, bool

for (ReplicationSourceShipper worker : workers) {
worker.stopWorker();
if (worker.entryReader != null) {
worker.entryReader.setReaderRunning(false);
}
worker.entryReader.setReaderRunning(false);
}

if (this.replicationEndpoint != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,8 @@ public enum WorkerState {
}

private final Configuration conf;
protected final String walGroupId;
protected final ReplicationSourceLogQueue logQueue;
protected final ReplicationSource source;
final String walGroupId;
private final ReplicationSource source;

// Last position in the log that we sent to ZooKeeper
// It will be accessed by the stats thread so make it volatile
Expand All @@ -66,22 +65,22 @@ public enum WorkerState {
private Path currentPath;
// Current state of the worker thread
private volatile WorkerState state;
protected ReplicationSourceWALReader entryReader;
final ReplicationSourceWALReader entryReader;

// How long should we sleep for each retry
protected final long sleepForRetries;
private final long sleepForRetries;
// Maximum number of retries before taking bold actions
protected final int maxRetriesMultiplier;
private final int maxRetriesMultiplier;
private final int DEFAULT_TIMEOUT = 20000;
private final int getEntriesTimeout;
private final int shipEditsTimeout;

public ReplicationSourceShipper(Configuration conf, String walGroupId,
ReplicationSourceLogQueue logQueue, ReplicationSource source) {
public ReplicationSourceShipper(Configuration conf, String walGroupId, ReplicationSource source,
ReplicationSourceWALReader walReader) {
this.conf = conf;
this.walGroupId = walGroupId;
this.logQueue = logQueue;
this.source = source;
this.entryReader = walReader;
// 1 second
this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
// 5 minutes @ 1 sec per
Expand Down Expand Up @@ -295,10 +294,6 @@ long getCurrentPosition() {
return currentPosition;
}

void setWALReader(ReplicationSourceWALReader entryReader) {
this.entryReader = entryReader;
}

protected boolean isActive() {
return source.isSourceActive() && state == WorkerState.RUNNING && !isInterrupted();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,8 +291,7 @@ public void testTerminateClearsBuffer() throws Exception {
mock(MetricsSource.class));
ReplicationSourceWALReader reader =
new ReplicationSourceWALReader(null, conf, null, 0, null, source, null);
ReplicationSourceShipper shipper = new ReplicationSourceShipper(conf, null, null, source);
shipper.entryReader = reader;
ReplicationSourceShipper shipper = new ReplicationSourceShipper(conf, null, source, reader);
source.workerThreads.put("testPeer", shipper);
WALEntryBatch batch = new WALEntryBatch(10, logDir);
WAL.Entry mockEntry = mock(WAL.Entry.class);
Expand Down