HBASE-23205 Correctly update the position of WALs currently being replicated (2) #944

Merged · 1 commit · Jan 2, 2020
ReplicationSource.java

@@ -18,6 +18,7 @@
  */
 package org.apache.hadoop.hbase.replication.regionserver;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.Service;
@@ -439,14 +440,30 @@ public String getPeerClusterId() {
   }
 
   @Override
+  @VisibleForTesting
   public Path getCurrentPath() {
+    // only for testing
     for (ReplicationSourceShipperThread worker : workerThreads.values()) {
       if (worker.getCurrentPath() != null) return worker.getCurrentPath();
     }
     return null;
   }
 
+  @VisibleForTesting
+  public Path getLastLoggedPath() {
+    for (ReplicationSourceShipperThread worker : workerThreads.values()) {
+      return worker.getLastLoggedPath();
+    }
+    return null;
+  }
+
+  @VisibleForTesting
+  public long getLastLoggedPosition() {
+    for (ReplicationSourceShipperThread worker : workerThreads.values()) {
+      return worker.getLastLoggedPosition();
+    }
+    return 0;
+  }
+
   private boolean isSourceActive() {
     return !this.stopper.isStopped() && this.sourceRunning;
   }
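Note on the accessors above: getCurrentPath() reports the WAL the reader thread is currently consuming, while the new getLastLoggedPath()/getLastLoggedPosition() report the path and offset the shipper last recorded (lastLoggedPosition starts at -1 until a position has been logged). A minimal sketch of how a test might poll the new hooks; waitForLoggedPosition and the source variable are hypothetical, not part of this patch:

// Hypothetical test helper (editorial sketch): wait until the source has
// recorded a replication position via its shipper thread.
static void waitForLoggedPosition(ReplicationSource source, long timeoutMs)
    throws InterruptedException {
  long deadline = System.currentTimeMillis() + timeoutMs;
  while (System.currentTimeMillis() < deadline) {
    // Unlike getCurrentPath(), these only advance once shipEdits() has
    // called updateLogPosition() for a batch.
    if (source.getLastLoggedPath() != null && source.getLastLoggedPosition() >= 0) {
      return;
    }
    Thread.sleep(50);
  }
  throw new AssertionError("no replication position logged within " + timeoutMs + " ms");
}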
@@ -481,8 +498,8 @@ public String getStats() {
     for (Map.Entry<String, ReplicationSourceShipperThread> entry : workerThreads.entrySet()) {
       String walGroupId = entry.getKey();
       ReplicationSourceShipperThread worker = entry.getValue();
-      long position = worker.getCurrentPosition();
-      Path currentPath = worker.getCurrentPath();
+      long position = worker.getLastLoggedPosition();
+      Path currentPath = worker.getLastLoggedPath();
       sb.append("walGroup [").append(walGroupId).append("]: ");
       if (currentPath != null) {
         sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
@@ -517,7 +534,7 @@ public Map<String, ReplicationStatus> getWalGroupStatus() {
       int queueSize = queues.get(walGroupId).size();
       replicationDelay =
           ReplicationLoad.calculateReplicationDelay(ageOfLastShippedOp, lastTimeStamp, queueSize);
-      Path currentPath = worker.getCurrentPath();
+      Path currentPath = worker.getLastLoggedPath();
       fileSize = -1;
       if (currentPath != null) {
         try {
@@ -535,7 +552,7 @@ public Map<String, ReplicationStatus> getWalGroupStatus() {
           .withQueueSize(queueSize)
           .withWalGroup(walGroupId)
           .withCurrentPath(currentPath)
-          .withCurrentPosition(worker.getCurrentPosition())
+          .withCurrentPosition(worker.getLastLoggedPosition())
           .withFileSize(fileSize)
           .withAgeOfLastShippedOp(ageOfLastShippedOp)
           .withReplicationDelay(replicationDelay);
@@ -555,7 +572,7 @@ public class ReplicationSourceShipperThread extends Thread {
     // Last position in the log that we sent to ZooKeeper
     private long lastLoggedPosition = -1;
     // Path of the current log
-    private volatile Path currentPath;
+    private volatile Path lastLoggedPath;
     // Current state of the worker thread
     private WorkerState state;
     ReplicationSourceWALReaderThread entryReader;
@@ -600,21 +617,19 @@ public void run() {
           WALEntryBatch entryBatch = entryReader.take();
           shipEdits(entryBatch);
           releaseBufferQuota((int) entryBatch.getHeapSize());
-          if (replicationQueueInfo.isQueueRecovered() && entryBatch.getWalEntries().isEmpty()
-              && entryBatch.getLastSeqIds().isEmpty()) {
-            LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
-                + peerClusterZnode);
+          if (!entryBatch.hasMoreEntries()) {
+            LOG.debug("Finished recovering queue for group "
+                + walGroupId + " of peer " + peerClusterZnode);
             metrics.incrCompletedRecoveryQueue();
             setWorkerState(WorkerState.FINISHED);
             continue;
           }
         } catch (InterruptedException e) {
           LOG.trace("Interrupted while waiting for next replication entry batch", e);
           Thread.currentThread().interrupt();
         }
       }
 
-      if (replicationQueueInfo.isQueueRecovered() && getWorkerState() == WorkerState.FINISHED) {
+      if (getWorkerState() == WorkerState.FINISHED) {
         // use synchronize to make sure one last thread will clean the queue
         synchronized (this) {
           Threads.sleep(100);// wait a short while for other worker thread to fully exit
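Two things change in run(): the shipper now asks the batch directly whether the queue has been drained (hasMoreEntries()) instead of inferring it from an empty entry list with empty sequence ids, and the outer isQueueRecovered() guard is dropped; that is safe here because the only assignment of WorkerState.FINISHED visible in this diff is on the recovery branch above.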
@@ -694,15 +709,13 @@ private int getBatchEntrySizeExcludeBulkLoad(WALEntryBatch entryBatch) {
     protected void shipEdits(WALEntryBatch entryBatch) {
       List<Entry> entries = entryBatch.getWalEntries();
       long lastReadPosition = entryBatch.getLastWalPosition();
-      currentPath = entryBatch.getLastWalPath();
+      lastLoggedPath = entryBatch.getLastWalPath();
       int sleepMultiplier = 0;
       if (entries.isEmpty()) {
-        if (lastLoggedPosition != lastReadPosition) {
-          updateLogPosition(lastReadPosition);
-          // if there was nothing to ship and it's not an error
-          // set "ageOfLastShippedOp" to <now> to indicate that we're current
-          metrics.setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(), walGroupId);
-        }
+        updateLogPosition(lastReadPosition);
+        // if there was nothing to ship and it's not an error
+        // set "ageOfLastShippedOp" to <now> to indicate that we're current
+        metrics.setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(), walGroupId);
         return;
       }
       int currentSize = (int) entryBatch.getHeapSize();
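This empty-batch branch, together with the ReplicationSourceManager change in the second file, is the heart of the fix: previously the position was persisted only when it had moved, and even then logPositionAndCleanOldLogs() would silently skip the ZooKeeper write whenever the manager's pendingShipment flag was set. Now every empty batch unconditionally records lastReadPosition and refreshes ageOfLastShippedOp, so the stored position tracks the reader even when all entries in a batch were filtered out.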
@@ -787,8 +800,7 @@ protected void shipEdits(WALEntryBatch entryBatch) {
     }
 
     private void updateLogPosition(long lastReadPosition) {
-      manager.setPendingShipment(false);
-      manager.logPositionAndCleanOldLogs(currentPath, peerClusterZnode, lastReadPosition,
+      manager.logPositionAndCleanOldLogs(lastLoggedPath, peerClusterZnode, lastReadPosition,
         this.replicationQueueInfo.isQueueRecovered(), false);
       lastLoggedPosition = lastReadPosition;
     }
@@ -800,7 +812,7 @@ public void startup() {
       public void uncaughtException(final Thread t, final Throwable e) {
         RSRpcServices.exitIfOOME(e);
         LOG.error("Unexpected exception in ReplicationSourceWorkerThread," + " currentPath="
-            + getCurrentPath(), e);
+            + getLastLoggedPath(), e);
         stopper.stop("Unexpected exception in ReplicationSourceWorkerThread");
       }
     };
@@ -941,8 +953,12 @@ public Path getCurrentPath() {
       return this.entryReader.getCurrentPath();
     }
 
-    public long getCurrentPosition() {
-      return this.lastLoggedPosition;
+    public Path getLastLoggedPath() {
+      return lastLoggedPath;
+    }
+
+    public long getLastLoggedPosition() {
+      return lastLoggedPosition;
     }
 
     private boolean isWorkerActive() {
ReplicationSourceManager.java

@@ -123,8 +123,6 @@ public class ReplicationSourceManager implements ReplicationListener {
 
   private AtomicLong totalBufferUsed = new AtomicLong();
 
-  private boolean pendingShipment;
-
   /**
    * Creates a replication manager and sets the watch on all the other registered region servers
    * @param replicationQueues the interface for manipulating replication queues
@@ -191,19 +189,13 @@ public ReplicationSourceManager(final ReplicationQueues replicationQueues,
    * @param holdLogInZK if true then the log is retained in ZK
    */
   public synchronized void logPositionAndCleanOldLogs(Path log, String id, long position,
-      boolean queueRecovered, boolean holdLogInZK) {
-    if (!this.pendingShipment) {
-      String fileName = log.getName();
-      this.replicationQueues.setLogPosition(id, fileName, position);
-      if (holdLogInZK) {
-        return;
-      }
-      cleanOldLogs(fileName, id, queueRecovered);
+    boolean queueRecovered, boolean holdLogInZK) {
+    String fileName = log.getName();
+    this.replicationQueues.setLogPosition(id, fileName, position);
+    if (holdLogInZK) {
+      return;
     }
-  }
-
-  public synchronized void setPendingShipment(boolean pendingShipment) {
-    this.pendingShipment = pendingShipment;
+    cleanOldLogs(fileName, id, queueRecovered);
   }
 
   /**
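With the pendingShipment gate gone, the method is a straight write-then-clean. An annotated form of the call that the patched updateLogPosition() issues (argument comments are editorial, following the javadoc above):

manager.logPositionAndCleanOldLogs(
    lastLoggedPath,                          // WAL the last batch was read from
    peerClusterZnode,                        // id of this replication queue
    lastReadPosition,                        // offset reached in that WAL
    replicationQueueInfo.isQueueRecovered(), // queueRecovered
    false);                                  // holdLogInZK: false, so old WALs may be cleaned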