Skip to content

Commit

Permalink
HBASE-28090 Make entryReader field final in ReplicationSourceShipper …
Browse files Browse the repository at this point in the history
…class (apache#5409)

Signed-off-by: Wellington Chevreuil <[email protected]>
  • Loading branch information
Apache9 authored and wchevreuil committed Nov 2, 2023
1 parent 7382ac8 commit fce307e
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@
public class RecoveredReplicationSource extends ReplicationSource {

@Override
protected RecoveredReplicationSourceShipper createNewShipper(String walGroupId) {
return new RecoveredReplicationSourceShipper(conf, walGroupId, logQueue, this, queueStorage,
protected RecoveredReplicationSourceShipper createNewShipper(String walGroupId,
ReplicationSourceWALReader walReader) {
return new RecoveredReplicationSourceShipper(conf, walGroupId, this, walReader, queueStorage,
() -> {
if (workerThreads.isEmpty()) {
this.getSourceMetrics().clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
private final Runnable tryFinish;

public RecoveredReplicationSourceShipper(Configuration conf, String walGroupId,
ReplicationSourceLogQueue logQueue, RecoveredReplicationSource source,
RecoveredReplicationSource source, ReplicationSourceWALReader walReader,
ReplicationQueueStorage queueStorage, Runnable tryFinish) {
super(conf, walGroupId, logQueue, source);
super(conf, walGroupId, source, walReader);
this.tryFinish = tryFinish;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,14 +367,13 @@ private void tryStartNewShipper(String walGroupId) {
return value;
} else {
LOG.debug("{} starting shipping worker for walGroupId={}", logPeerId(), walGroupId);
ReplicationSourceShipper worker = createNewShipper(walGroupId);
ReplicationSourceWALReader walReader =
createNewWALReader(walGroupId, getStartOffset(walGroupId));
ReplicationSourceShipper worker = createNewShipper(walGroupId, walReader);
Threads.setDaemonThreadRunning(
walReader, Thread.currentThread().getName() + ".replicationSource.wal-reader."
+ walGroupId + "," + queueId,
(t, e) -> this.uncaughtException(t, e, this.manager, this.getPeerId()));
worker.setWALReader(walReader);
worker.startup((t, e) -> this.uncaughtException(t, e, this.manager, this.getPeerId()));
return worker;
}
Expand Down Expand Up @@ -428,8 +427,9 @@ private long getFileSize(Path currentPath) throws IOException {
return fileSize;
}

protected ReplicationSourceShipper createNewShipper(String walGroupId) {
return new ReplicationSourceShipper(conf, walGroupId, logQueue, this);
protected ReplicationSourceShipper createNewShipper(String walGroupId,
ReplicationSourceWALReader walReader) {
return new ReplicationSourceShipper(conf, walGroupId, this, walReader);
}

private ReplicationSourceWALReader createNewWALReader(String walGroupId, long startPosition) {
Expand Down Expand Up @@ -665,7 +665,7 @@ public void terminate(String reason, Exception cause, boolean clearMetrics) {
terminate(reason, cause, clearMetrics, true);
}

public void terminate(String reason, Exception cause, boolean clearMetrics, boolean join) {
private void terminate(String reason, Exception cause, boolean clearMetrics, boolean join) {
if (cause == null) {
LOG.info("{} Closing source {} because: {}", logPeerId(), this.queueId, reason);
} else {
Expand All @@ -684,9 +684,7 @@ public void terminate(String reason, Exception cause, boolean clearMetrics, bool

for (ReplicationSourceShipper worker : workers) {
worker.stopWorker();
if (worker.entryReader != null) {
worker.entryReader.setReaderRunning(false);
}
worker.entryReader.setReaderRunning(false);
}

if (this.replicationEndpoint != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,8 @@ public enum WorkerState {
}

private final Configuration conf;
protected final String walGroupId;
protected final ReplicationSourceLogQueue logQueue;
protected final ReplicationSource source;
final String walGroupId;
private final ReplicationSource source;

// Last position in the log that we sent to ZooKeeper
// It will be accessed by the stats thread so make it volatile
Expand All @@ -66,22 +65,22 @@ public enum WorkerState {
private Path currentPath;
// Current state of the worker thread
private volatile WorkerState state;
protected ReplicationSourceWALReader entryReader;
final ReplicationSourceWALReader entryReader;

// How long should we sleep for each retry
protected final long sleepForRetries;
private final long sleepForRetries;
// Maximum number of retries before taking bold actions
protected final int maxRetriesMultiplier;
private final int maxRetriesMultiplier;
private final int DEFAULT_TIMEOUT = 20000;
private final int getEntriesTimeout;
private final int shipEditsTimeout;

public ReplicationSourceShipper(Configuration conf, String walGroupId,
ReplicationSourceLogQueue logQueue, ReplicationSource source) {
public ReplicationSourceShipper(Configuration conf, String walGroupId, ReplicationSource source,
ReplicationSourceWALReader walReader) {
this.conf = conf;
this.walGroupId = walGroupId;
this.logQueue = logQueue;
this.source = source;
this.entryReader = walReader;
// 1 second
this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
// 5 minutes @ 1 sec per
Expand Down Expand Up @@ -295,10 +294,6 @@ long getCurrentPosition() {
return currentPosition;
}

void setWALReader(ReplicationSourceWALReader entryReader) {
this.entryReader = entryReader;
}

protected boolean isActive() {
return source.isSourceActive() && state == WorkerState.RUNNING && !isInterrupted();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,8 +291,7 @@ public void testTerminateClearsBuffer() throws Exception {
mock(MetricsSource.class));
ReplicationSourceWALReader reader =
new ReplicationSourceWALReader(null, conf, null, 0, null, source, null);
ReplicationSourceShipper shipper = new ReplicationSourceShipper(conf, null, null, source);
shipper.entryReader = reader;
ReplicationSourceShipper shipper = new ReplicationSourceShipper(conf, null, source, reader);
source.workerThreads.put("testPeer", shipper);
WALEntryBatch batch = new WALEntryBatch(10, logDir);
WAL.Entry mockEntry = mock(WAL.Entry.class);
Expand Down

0 comments on commit fce307e

Please sign in to comment.