Skip to content

Commit

Permalink
HBASE-25596: Fix NPE and avoid permanent unreplicated data due to EOF (
Browse files Browse the repository at this point in the history
…#2990)

Signed-off-by: Xu Cang <[email protected]>
Signed-off-by: Andrew Purtell <[email protected]>
  • Loading branch information
sandeepvinayak authored Feb 25, 2021
1 parent 33c9f77 commit d724d05
Show file tree
Hide file tree
Showing 8 changed files with 501 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -66,7 +66,6 @@
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
Expand Down Expand Up @@ -265,6 +264,11 @@ public void enqueueLog(Path wal) {
}
}

@InterfaceAudience.Private
public Map<String, PriorityBlockingQueue<Path>> getQueues() {
return logQueue.getQueues();
}

@Override
public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
throws ReplicationException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;

Expand Down Expand Up @@ -123,44 +122,64 @@ public ReplicationSourceWALReader(FileSystem fs, Configuration conf,
@Override
public void run() {
int sleepMultiplier = 1;
while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
try (WALEntryStream entryStream =
new WALEntryStream(logQueue, conf, currentPosition,
source.getWALFileLengthProvider(), source.getServerWALsBelongTo(),
source.getSourceMetrics(), walGroupId)) {
while (isReaderRunning()) { // loop here to keep reusing stream while we can
if (!source.isPeerEnabled()) {
Threads.sleep(sleepForRetries);
continue;
}
if (!checkQuota()) {
continue;
WALEntryBatch batch = null;
WALEntryStream entryStream = null;
try {
// we only loop back here if something fatal happened to our stream
while (isReaderRunning()) {
try {
entryStream =
new WALEntryStream(logQueue, conf, currentPosition, source.getWALFileLengthProvider(),
source.getServerWALsBelongTo(), source.getSourceMetrics(), walGroupId);
while (isReaderRunning()) { // loop here to keep reusing stream while we can
if (!source.isPeerEnabled()) {
Threads.sleep(sleepForRetries);
continue;
}
if (!checkQuota()) {
continue;
}

batch = createBatch(entryStream);
batch = readWALEntries(entryStream, batch);
currentPosition = entryStream.getPosition();
if (batch == null) {
// either the queue have no WAL to read
// or got no new entries (didn't advance position in WAL)
handleEmptyWALEntryBatch();
entryStream.reset(); // reuse stream
} else {
addBatchToShippingQueue(batch);
}
}
WALEntryBatch batch = readWALEntries(entryStream);
currentPosition = entryStream.getPosition();
if (batch != null) {
// need to propagate the batch even it has no entries since it may carry the last
// sequence id information for serial replication.
LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
entryBatchQueue.put(batch);
} catch (IOException e) { // stream related
if (handleEofException(e, batch)) {
sleepMultiplier = 1;
} else { // got no entries and didn't advance position in WAL
handleEmptyWALEntryBatch(entryStream.getCurrentPath());
entryStream.reset(); // reuse stream
} else {
LOG.warn("Failed to read stream of replication entries", e);
if (sleepMultiplier < maxRetriesMultiplier) {
sleepMultiplier++;
}
Threads.sleep(sleepForRetries * sleepMultiplier);
}
} catch (InterruptedException e) {
LOG.trace("Interrupted while sleeping between WAL reads");
Thread.currentThread().interrupt();
} finally {
entryStream.close();
}
} catch (IOException e) { // stream related
if (!handleEofException(e)) {
LOG.warn("Failed to read stream of replication entries", e);
if (sleepMultiplier < maxRetriesMultiplier) {
sleepMultiplier ++;
}
Threads.sleep(sleepForRetries * sleepMultiplier);
}
} catch (InterruptedException e) {
LOG.trace("Interrupted while sleeping between WAL reads");
Thread.currentThread().interrupt();
}
} catch (IOException e) {
if (sleepMultiplier < maxRetriesMultiplier) {
LOG.debug("Failed to read stream of replication entries: " + e);
sleepMultiplier++;
} else {
LOG.error("Failed to read stream of replication entries", e);
}
Threads.sleep(sleepForRetries * sleepMultiplier);
} catch (InterruptedException e) {
LOG.trace("Interrupted while sleeping between WAL reads");
Thread.currentThread().interrupt();
}
}

Expand Down Expand Up @@ -189,14 +208,19 @@ protected static final boolean switched(WALEntryStream entryStream, Path path) {
return newPath == null || !path.getName().equals(newPath.getName());
}

protected WALEntryBatch readWALEntries(WALEntryStream entryStream)
throws IOException, InterruptedException {
// We need to get the WALEntryBatch from the caller so we can add entries in there
// This is required in case there is any exception in while reading entries
// we do want to loss the existing entries in the batch
protected WALEntryBatch readWALEntries(WALEntryStream entryStream,
WALEntryBatch batch) throws IOException, InterruptedException {
Path currentPath = entryStream.getCurrentPath();
if (!entryStream.hasNext()) {
// check whether we have switched a file
if (currentPath != null && switched(entryStream, currentPath)) {
return WALEntryBatch.endOfFile(currentPath);
} else {
// This would mean either no more files in the queue
// or there is no new data yet on the current wal
return null;
}
}
Expand All @@ -208,7 +232,7 @@ protected WALEntryBatch readWALEntries(WALEntryStream entryStream)
// when reading from the entry stream first time we will enter here
currentPath = entryStream.getCurrentPath();
}
WALEntryBatch batch = createBatch(entryStream);
batch.setLastWalPath(currentPath);
for (;;) {
Entry entry = entryStream.next();
batch.setLastWalPosition(entryStream.getPosition());
Expand All @@ -231,10 +255,12 @@ protected WALEntryBatch readWALEntries(WALEntryStream entryStream)
return batch;
}

private void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException {
private void handleEmptyWALEntryBatch() throws InterruptedException {
LOG.trace("Didn't read any new entries from WAL");
if (source.isRecovered()) {
// we're done with queue recovery, shut ourself down
if (logQueue.getQueue(walGroupId).isEmpty()) {
// we're done with current queue, either this is a recovered queue, or it is the special group
// for a sync replication peer and the peer has been transited to DA or S state.
LOG.debug("Stopping the replication source wal reader");
setReaderRunning(false);
// shuts down shipper thread immediately
entryBatchQueue.put(WALEntryBatch.NO_MORE_DATA);
Expand All @@ -244,22 +270,38 @@ private void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedExcept
}

/**
* if we get an EOF due to a zero-length log, and there are other logs in queue
* (highly likely we've closed the current log), and autorecovery is
* enabled, then dump the log
* This is to handle the EOFException from the WAL entry stream. EOFException should
* be handled carefully because there are chances of data loss because of never replicating
* the data. Thus we should always try to ship existing batch of entries here.
* If there was only one log in the queue before EOF, we ship the empty batch here
* and since reader is still active, in the next iteration of reader we will
* stop the reader.
* If there was more than one log in the queue before EOF, we ship the existing batch
* and reset the wal patch and position to the log with EOF, so shipper can remove
* logs from replication queue
* @return true only the IOE can be handled
*/
private boolean handleEofException(IOException e) {
private boolean handleEofException(IOException e, WALEntryBatch batch)
throws InterruptedException {
PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
// Dump the log even if logQueue size is 1 if the source is from recovered Source
// since we don't add current log to recovered source queue so it is safe to remove.
if ((e instanceof EOFException || e.getCause() instanceof EOFException) &&
(source.isRecovered() || queue.size() > 1) && this.eofAutoRecovery) {
if ((e instanceof EOFException || e.getCause() instanceof EOFException)
&& (source.isRecovered() || queue.size() > 1)
&& this.eofAutoRecovery) {
Path head = queue.peek();
try {
if (fs.getFileStatus(queue.peek()).getLen() == 0) {
LOG.warn("Forcing removal of 0 length log in queue: {}", queue.peek());
if (fs.getFileStatus(head).getLen() == 0) {
// head of the queue is an empty log file
LOG.warn("Forcing removal of 0 length log in queue: {}", head);
logQueue.remove(walGroupId);
currentPosition = 0;
// After we removed the WAL from the queue, we should
// try shipping the existing batch of entries and set the wal position
// and path to the wal just dequeued to correctly remove logs from the zk
batch.setLastWalPath(head);
batch.setLastWalPosition(currentPosition);
addBatchToShippingQueue(batch);
return true;
}
} catch (IOException ioe) {
Expand All @@ -269,6 +311,20 @@ private boolean handleEofException(IOException e) {
return false;
}

/**
* Update the batch try to ship and return true if shipped
* @param batch Batch of entries to ship
* @throws InterruptedException throws interrupted exception
* @throws IOException throws io exception from stream
*/
private void addBatchToShippingQueue(WALEntryBatch batch)
throws InterruptedException, IOException {
// need to propagate the batch even it has no entries since it may carry the last
// sequence id information for serial replication.
LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
entryBatchQueue.put(batch);
}

public Path getCurrentPath() {
// if we've read some WAL entries, get the Path we read from
WALEntryBatch batchQueueHead = entryBatchQueue.peek();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public SerialReplicationSourceWALReader(FileSystem fs, Configuration conf,
}

@Override
protected WALEntryBatch readWALEntries(WALEntryStream entryStream)
protected WALEntryBatch readWALEntries(WALEntryStream entryStream, WALEntryBatch batch)
throws IOException, InterruptedException {
Path currentPath = entryStream.getCurrentPath();
if (!entryStream.hasNext()) {
Expand All @@ -70,7 +70,7 @@ protected WALEntryBatch readWALEntries(WALEntryStream entryStream)
currentPath = entryStream.getCurrentPath();
}
long positionBefore = entryStream.getPosition();
WALEntryBatch batch = createBatch(entryStream);
batch = createBatch(entryStream);
for (;;) {
Entry entry = entryStream.peek();
boolean doFiltering = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ public Path getLastWalPath() {
return lastWalPath;
}

public void setLastWalPath(Path lastWalPath) {
this.lastWalPath = lastWalPath;
}

/**
* @return the position in the last WAL that was read.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class WALEntryStream implements Closeable {
* @param walFileLengthProvider provides the length of the WAL file
* @param serverName the server name which all WALs belong to
* @param metrics the replication metrics
* @throws IOException
* @throws IOException throw IO exception from stream
*/
public WALEntryStream(ReplicationSourceLogQueue logQueue, Configuration conf,
long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName,
Expand Down Expand Up @@ -368,7 +368,9 @@ private void openReader(Path path) throws IOException {
handleFileNotFound(path, fnfe);
} catch (RemoteException re) {
IOException ioe = re.unwrapRemoteException(FileNotFoundException.class);
if (!(ioe instanceof FileNotFoundException)) throw ioe;
if (!(ioe instanceof FileNotFoundException)) {
throw ioe;
}
handleFileNotFound(path, (FileNotFoundException)ioe);
} catch (LeaseNotRecoveredException lnre) {
// HBASE-15019 the WAL was not closed due to some hiccup.
Expand Down
Loading

0 comments on commit d724d05

Please sign in to comment.