HBASE-25596: Fix NPE and avoid permanent unreplicated data due to EOF (#2987)

Signed-off-by: Xu Cang <[email protected]>
Signed-off-by: Andrew Purtell <[email protected]>
sandeepvinayak authored Feb 25, 2021
1 parent a7d0445 commit 3f1c486
Showing 8 changed files with 472 additions and 81 deletions.
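
In short, the fix threads the in-progress WALEntryBatch through readWALEntries and the new handleEofException, so entries already read when an EOFException strikes are still shipped, and a zero-length WAL at the head of the queue is dequeued with a batch stamped to its path and position so the shipper can clean the replication queue instead of stalling forever. Below is a minimal, standalone Java sketch of that recovery pattern; Batch, logQueue, shippingQueue, and handleEof are simplified stand-ins invented for illustration, not HBase classes or APIs.

  import java.util.ArrayDeque;
  import java.util.Queue;

  // Standalone sketch of the EOF-recovery pattern introduced by this commit.
  public class EofRecoverySketch {

    // Stand-in for WALEntryBatch: just the path/position the shipper records.
    static final class Batch {
      final String lastWalPath;
      final long lastWalPosition;
      Batch(String path, long pos) { lastWalPath = path; lastWalPosition = pos; }
      @Override public String toString() { return lastWalPath + "@" + lastWalPosition; }
    }

    static final Queue<String> logQueue = new ArrayDeque<>();     // WALs waiting to be read
    static final Queue<Batch> shippingQueue = new ArrayDeque<>(); // batches waiting to ship

    // On EOF: if the WAL at the head of the queue is empty and more WALs are
    // queued behind it, drop it but still ship a batch pointing at it, so the
    // downstream bookkeeping (the ZK replication queue in HBase) can move past it.
    static boolean handleEof(long headWalLength) {
      if (headWalLength == 0 && logQueue.size() > 1) {
        String head = logQueue.poll();
        shippingQueue.add(new Batch(head, 0L));
        return true;  // handled: the reader resets its retry backoff
      }
      return false;   // not handled: the reader logs the error and backs off
    }

    public static void main(String[] args) {
      logQueue.add("wal.00001"); // zero-length WAL at the head of the queue
      logQueue.add("wal.00002");
      System.out.println("handled=" + handleEof(0L) + ", shipped=" + shippingQueue.peek());
    }
  }
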
ReplicationSource.java
@@ -30,6 +30,7 @@
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -65,7 +66,6 @@
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
@@ -260,6 +260,11 @@ public void enqueueLog(Path wal) {
}
}

@InterfaceAudience.Private
public Map<String, PriorityBlockingQueue<Path>> getQueues() {
return logQueue.getQueues();
}

@Override
public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
throws ReplicationException {
ReplicationSourceWALReader.java
@@ -41,7 +41,6 @@
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;

@@ -123,44 +122,64 @@ public ReplicationSourceWALReader(FileSystem fs, Configuration conf,
@Override
public void run() {
int sleepMultiplier = 1;
while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
try (WALEntryStream entryStream =
new WALEntryStream(logQueue, conf, currentPosition,
source.getWALFileLengthProvider(), source.getServerWALsBelongTo(),
source.getSourceMetrics(), walGroupId)) {
while (isReaderRunning()) { // loop here to keep reusing stream while we can
if (!source.isPeerEnabled()) {
Threads.sleep(sleepForRetries);
continue;
}
if (!checkQuota()) {
continue;
WALEntryBatch batch = null;
WALEntryStream entryStream = null;
try {
// we only loop back here if something fatal happened to our stream
while (isReaderRunning()) {
try {
entryStream =
new WALEntryStream(logQueue, conf, currentPosition, source.getWALFileLengthProvider(),
source.getServerWALsBelongTo(), source.getSourceMetrics(), walGroupId);
while (isReaderRunning()) { // loop here to keep reusing stream while we can
if (!source.isPeerEnabled()) {
Threads.sleep(sleepForRetries);
continue;
}
if (!checkQuota()) {
continue;
}

batch = createBatch(entryStream);
batch = readWALEntries(entryStream, batch);
currentPosition = entryStream.getPosition();
if (batch == null) {
// either the queue has no WAL to read
// or got no new entries (didn't advance position in WAL)
handleEmptyWALEntryBatch();
entryStream.reset(); // reuse stream
} else {
addBatchToShippingQueue(batch);
}
}
WALEntryBatch batch = readWALEntries(entryStream);
currentPosition = entryStream.getPosition();
if (batch != null) {
// need to propagate the batch even if it has no entries since it may carry the last
// sequence id information for serial replication.
LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
entryBatchQueue.put(batch);
} catch (IOException e) { // stream related
if (handleEofException(e, batch)) {
sleepMultiplier = 1;
} else { // got no entries and didn't advance position in WAL
handleEmptyWALEntryBatch();
entryStream.reset(); // reuse stream
} else {
LOG.warn("Failed to read stream of replication entries", e);
if (sleepMultiplier < maxRetriesMultiplier) {
sleepMultiplier++;
}
Threads.sleep(sleepForRetries * sleepMultiplier);
}
} catch (InterruptedException e) {
LOG.trace("Interrupted while sleeping between WAL reads");
Thread.currentThread().interrupt();
} finally {
entryStream.close();
}
} catch (IOException e) { // stream related
if (!handleEofException(e)) {
LOG.warn("Failed to read stream of replication entries", e);
if (sleepMultiplier < maxRetriesMultiplier) {
sleepMultiplier ++;
}
Threads.sleep(sleepForRetries * sleepMultiplier);
}
} catch (InterruptedException e) {
LOG.trace("Interrupted while sleeping between WAL reads");
Thread.currentThread().interrupt();
}
} catch (IOException e) {
if (sleepMultiplier < maxRetriesMultiplier) {
LOG.debug("Failed to read stream of replication entries: " + e);
sleepMultiplier++;
} else {
LOG.error("Failed to read stream of replication entries", e);
}
Threads.sleep(sleepForRetries * sleepMultiplier);
} catch (InterruptedException e) {
LOG.trace("Interrupted while sleeping between WAL reads");
Thread.currentThread().interrupt();
}
}

@@ -189,14 +208,19 @@ protected static final boolean switched(WALEntryStream entryStream, Path path) {
return newPath == null || !path.getName().equals(newPath.getName());
}

protected WALEntryBatch readWALEntries(WALEntryStream entryStream)
throws IOException, InterruptedException {
// We need to get the WALEntryBatch from the caller so we can add entries to it.
// This is required in case there is any exception while reading entries, since
// we do not want to lose the existing entries in the batch.
protected WALEntryBatch readWALEntries(WALEntryStream entryStream,
WALEntryBatch batch) throws IOException, InterruptedException {
Path currentPath = entryStream.getCurrentPath();
if (!entryStream.hasNext()) {
// check whether we have switched a file
if (currentPath != null && switched(entryStream, currentPath)) {
return WALEntryBatch.endOfFile(currentPath);
} else {
// This would mean either no more files in the queue
// or there is no new data yet on the current wal
return null;
}
}
@@ -208,7 +232,7 @@ protected WALEntryBatch readWALEntries(WALEntryStream entryStream)
// when reading from the entry stream first time we will enter here
currentPath = entryStream.getCurrentPath();
}
WALEntryBatch batch = createBatch(entryStream);
batch.setLastWalPath(currentPath);
for (;;) {
Entry entry = entryStream.next();
batch.setLastWalPosition(entryStream.getPosition());
@@ -236,6 +260,7 @@ private void handleEmptyWALEntryBatch() throws InterruptedException {
if (logQueue.getQueue(walGroupId).isEmpty()) {
// we're done with current queue, either this is a recovered queue, or it is the special group
// for a sync replication peer and the peer has been transitioned to DA or S state.
LOG.debug("Stopping the replication source wal reader");
setReaderRunning(false);
// shuts down shipper thread immediately
entryBatchQueue.put(WALEntryBatch.NO_MORE_DATA);
@@ -245,22 +270,38 @@ private void handleEmptyWALEntryBatch() throws InterruptedException {
}

/**
* if we get an EOF due to a zero-length log, and there are other logs in queue
* (highly likely we've closed the current log), and autorecovery is
* enabled, then dump the log
* This is to handle the EOFException from the WAL entry stream. EOFException should
* be handled carefully because there is a chance of permanently losing data if it is
* never replicated. Thus we should always try to ship the existing batch of entries here.
* If there was only one log in the queue before EOF, we ship the empty batch here and,
* since the reader is still active, we will stop the reader on its next iteration.
* If there was more than one log in the queue before EOF, we ship the existing batch
* and reset the wal path and position to the log with EOF, so the shipper can remove
* logs from the replication queue.
* @return true only if the IOE can be handled
*/
private boolean handleEofException(IOException e) {
private boolean handleEofException(IOException e, WALEntryBatch batch)
throws InterruptedException {
PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
// Dump the log even if the logQueue size is 1 when the source is a recovered source,
// since the current log is never added to a recovered source's queue, so it is safe to remove.
if ((e instanceof EOFException || e.getCause() instanceof EOFException) &&
(source.isRecovered() || queue.size() > 1) && this.eofAutoRecovery) {
if ((e instanceof EOFException || e.getCause() instanceof EOFException)
&& (source.isRecovered() || queue.size() > 1)
&& this.eofAutoRecovery) {
Path head = queue.peek();
try {
if (fs.getFileStatus(queue.peek()).getLen() == 0) {
LOG.warn("Forcing removal of 0 length log in queue: {}", queue.peek());
if (fs.getFileStatus(head).getLen() == 0) {
// head of the queue is an empty log file
LOG.warn("Forcing removal of 0 length log in queue: {}", head);
logQueue.remove(walGroupId);
currentPosition = 0;
// After we removed the WAL from the queue, we should
// try shipping the existing batch of entries and set the wal position
// and path to the wal just dequeued so the shipper correctly removes logs from ZK
batch.setLastWalPath(head);
batch.setLastWalPosition(currentPosition);
addBatchToShippingQueue(batch);
return true;
}
} catch (IOException ioe) {
@@ -270,6 +311,20 @@ private boolean handleEofException(IOException e) {
return false;
}

/**
* Add the batch of WAL entries to the shipping queue so the shipper can ship them.
* @param batch Batch of entries to ship
* @throws InterruptedException if interrupted while adding the batch to the shipping queue
* @throws IOException if an I/O error occurs
*/
private void addBatchToShippingQueue(WALEntryBatch batch)
throws InterruptedException, IOException {
// need to propagate the batch even if it has no entries since it may carry the last
// sequence id information for serial replication.
LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
entryBatchQueue.put(batch);
}

public Path getCurrentPath() {
// if we've read some WAL entries, get the Path we read from
WALEntryBatch batchQueueHead = entryBatchQueue.peek();
SerialReplicationSourceWALReader.java
@@ -50,7 +50,7 @@ public SerialReplicationSourceWALReader(FileSystem fs, Configuration conf,
}

@Override
protected WALEntryBatch readWALEntries(WALEntryStream entryStream)
protected WALEntryBatch readWALEntries(WALEntryStream entryStream, WALEntryBatch batch)
throws IOException, InterruptedException {
Path currentPath = entryStream.getCurrentPath();
if (!entryStream.hasNext()) {
@@ -70,7 +70,7 @@ protected WALEntryBatch readWALEntries(WALEntryStream entryStream)
currentPath = entryStream.getCurrentPath();
}
long positionBefore = entryStream.getPosition();
WALEntryBatch batch = createBatch(entryStream);
batch = createBatch(entryStream);
for (;;) {
Entry entry = entryStream.peek();
boolean doFiltering = true;
WALEntryBatch.java
@@ -94,6 +94,10 @@ public Path getLastWalPath() {
return lastWalPath;
}

public void setLastWalPath(Path lastWalPath) {
this.lastWalPath = lastWalPath;
}

/**
* @return the position in the last WAL that was read.
*/
WALEntryStream.java
@@ -80,7 +80,7 @@ class WALEntryStream implements Closeable {
* @param walFileLengthProvider provides the length of the WAL file
* @param serverName the server name which all WALs belong to
* @param metrics the replication metrics
* @throws IOException
* @throws IOException if an I/O error occurs while reading from the stream
*/
public WALEntryStream(ReplicationSourceLogQueue logQueue, Configuration conf,
long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName,
@@ -368,7 +368,9 @@ private void openReader(Path path) throws IOException {
handleFileNotFound(path, fnfe);
} catch (RemoteException re) {
IOException ioe = re.unwrapRemoteException(FileNotFoundException.class);
if (!(ioe instanceof FileNotFoundException)) throw ioe;
if (!(ioe instanceof FileNotFoundException)) {
throw ioe;
}
handleFileNotFound(path, (FileNotFoundException)ioe);
} catch (LeaseNotRecoveredException lnre) {
// HBASE-15019 the WAL was not closed due to some hiccup.
TestReplicationBase.java
@@ -20,10 +20,10 @@
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -44,18 +44,20 @@
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

/**
@@ -87,6 +89,8 @@ public class TestReplicationBase {
NB_ROWS_IN_BATCH * 10;
protected static final long SLEEP_TIME = 500;
protected static final int NB_RETRIES = 50;
protected static AtomicInteger replicateCount = new AtomicInteger();
protected static volatile List<WAL.Entry> replicatedEntries = Lists.newArrayList();

protected static final TableName tableName = TableName.valueOf("test");
protected static final byte[] famName = Bytes.toBytes("f");
@@ -281,7 +285,8 @@ private boolean peerExist(String peerId) throws IOException {
public void setUpBase() throws Exception {
if (!peerExist(PEER_ID2)) {
ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder()
.setClusterKey(UTIL2.getClusterKey()).setSerial(isSerialPeer());
.setClusterKey(UTIL2.getClusterKey()).setSerial(isSerialPeer()).setReplicationEndpointImpl(
ReplicationEndpointTest.class.getName());
if (isSyncPeer()) {
FileSystem fs2 = UTIL2.getTestFileSystem();
// The remote wal dir is not important as we do not use it in DA state, here we only need to
@@ -378,4 +383,20 @@ public static void tearDownAfterClass() throws Exception {
UTIL2.shutdownMiniCluster();
UTIL1.shutdownMiniCluster();
}

/**
* Custom replication endpoint to keep track of replication status for tests.
*/
public static class ReplicationEndpointTest extends HBaseInterClusterReplicationEndpoint {
public ReplicationEndpointTest() {
replicateCount.set(0);
}

@Override public boolean replicate(ReplicateContext replicateContext) {
replicateCount.incrementAndGet();
replicatedEntries.addAll(replicateContext.getEntries());

return super.replicate(replicateContext);
}
}
}
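
The ReplicationEndpointTest hook above only records traffic; tests then assert on the shared counters to prove that no entries were dropped after an EOF. A hypothetical, self-contained sketch of that counting pattern, with plain strings standing in for WAL.Entry and all names invented for illustration:

  import java.util.ArrayList;
  import java.util.List;
  import java.util.concurrent.atomic.AtomicInteger;

  // Sketch of the counting-endpoint pattern used by the test hook above.
  public class EndpointCounterSketch {
    static final AtomicInteger replicateCount = new AtomicInteger();
    static final List<String> replicatedEntries = new ArrayList<>();

    // Stand-in for ReplicationEndpointTest.replicate(ReplicateContext).
    static boolean replicate(List<String> batch) {
      replicateCount.incrementAndGet();
      replicatedEntries.addAll(batch);
      return true; // the real endpoint delegates to super.replicate(...)
    }

    public static void main(String[] args) {
      replicate(List.of("row-1", "row-2"));
      replicate(List.of("row-3"));
      // A test would make assertions like this after replication has drained:
      if (replicateCount.get() != 2 || replicatedEntries.size() != 3) {
        throw new AssertionError("entries were dropped");
      }
      System.out.println("batches=" + replicateCount + ", entries=" + replicatedEntries);
    }
  }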