From 19fe18e466a2c4eb72c5b3c9dfbbbbb09e11df46 Mon Sep 17 00:00:00 2001
From: Sandeep Pal <50725353+sandeepvinayak@users.noreply.github.com>
Date: Tue, 23 Feb 2021 13:20:00 -0800
Subject: [PATCH] HBASE-25596: Fix NPE and avoid permanent unreplicated data
 due to EOF (#2975)

Signed-off-by: Andrew Purtell
---
 .../regionserver/ReplicationSource.java       |   5 +
 .../ReplicationSourceManager.java             |   2 +-
 .../ReplicationSourceWALReaderThread.java     | 208 ++++++++++----
 .../regionserver/WALEntryStream.java          |  22 +-
 .../replication/TestReplicationSource.java    | 265 +++++++++++++++---
 .../TestReplicationSourceBase.java            |   1 -
 .../regionserver/TestWALEntryStream.java      |  56 +++-
 7 files changed, 443 insertions(+), 116 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index a58289e6baa2..969e8caff387 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -221,6 +221,11 @@ public void enqueueLog(Path log) {
     }
   }
 
+  @InterfaceAudience.Private
+  public Map<String, PriorityBlockingQueue<Path>> getQueues() {
+    return logQueue.getQueues();
+  }
+
   @Override
   public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
       throws ReplicationException {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index d9435a39c676..a8e8e76e2a35 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -183,7 +183,7 @@ public ReplicationSourceManager(final ReplicationQueues replicationQueues,
    * position. It will also clean old logs from the queue.
    * @param log Path to the log currently being replicated from
    *          replication status in zookeeper. It will also delete older entries.
-   * @param id id of the peer cluster
+   * @param id id of the replication queue
    * @param position current location in the log
    * @param queueRecovered indicates if this queue comes from another region server
    * @param holdLogInZK if true then the log is retained in ZK
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
index bd155d5642b6..a1d64caab1b0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
@@ -50,7 +50,8 @@ import org.apache.hadoop.hbase.wal.WALKey;
 
 /**
- * Reads and filters WAL entries, groups the filtered entries into batches, and puts the batches onto a queue
+ * Reads and filters WAL entries, groups the filtered entries into batches,
+ * and puts the batches onto a queue
  *
  */
 @InterfaceAudience.Private
@@ -88,7 +89,7 @@ public class ReplicationSourceWALReaderThread extends Thread {
   /**
    * Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the
    * entries, and puts them on a batch queue.
   * @param manager replication manager
-  * @param replicationQueueInfo
+  * @param replicationQueueInfo replication queue info
   * @param logQueue The WAL queue to read off of
   * @param startPosition position in the first WAL to start reading from
   * @param fs the file system to use
@@ -135,71 +136,128 @@ public ReplicationSourceWALReaderThread(ReplicationSourceManager manager,
   @Override
   public void run() {
     int sleepMultiplier = 1;
-    while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
-      try (WALEntryStream entryStream =
-          new WALEntryStream(logQueue, fs, conf, lastReadPosition, metrics, walGroupId)) {
-        while (isReaderRunning()) { // loop here to keep reusing stream while we can
-          if (!source.isPeerEnabled()) {
-            Threads.sleep(sleepForRetries);
-            continue;
-          }
-          if (!checkQuota()) {
-            continue;
-          }
-          WALEntryBatch batch = new WALEntryBatch(replicationBatchCountCapacity);
-          boolean hasNext;
-          while ((hasNext = entryStream.hasNext()) == true) {
-            Entry entry = entryStream.next();
-            entry = filterEntry(entry);
-            if (entry != null) {
-              WALEdit edit = entry.getEdit();
-              if (edit != null && !edit.isEmpty()) {
-                long entrySize = getEntrySizeIncludeBulkLoad(entry);
-                long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
-                batch.addEntry(entry, entrySize);
-                updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
-                boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExcludeBulkLoad);
-                // Stop if too many entries or too big
-                if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
+    WALEntryBatch batch = null;
+    WALEntryStream entryStream =
+        new WALEntryStream(logQueue, fs, conf, lastReadPosition, metrics, walGroupId);
+    try {
+      while (isReaderRunning()) { // we only loop back here if something fatal happens to stream
+        try {
+          entryStream = new WALEntryStream(logQueue, fs, conf,
+              lastReadPosition, metrics, walGroupId);
+          while (isReaderRunning()) { // loop here to keep reusing stream while we can
+            if (!source.isPeerEnabled()) {
+              Threads.sleep(sleepForRetries);
+              continue;
+            }
+            if (!checkQuota()) {
+              continue;
+            }
+            batch = new WALEntryBatch(replicationBatchCountCapacity);
+            boolean hasNext = entryStream.hasNext();
+            while (hasNext) {
+              Entry entry = entryStream.next();
+              entry = filterEntry(entry);
+              if (entry != null) {
+                WALEdit edit = entry.getEdit();
+                if (edit != null && !edit.isEmpty()) {
+                  long entrySize = getEntrySizeIncludeBulkLoad(entry);
+                  long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
+                  batch.addEntry(entry, entrySize);
+                  updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
+                  boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExcludeBulkLoad);
+                  // Stop if too many entries or too big
+                  if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
                     || batch.getNbEntries() >= replicationBatchCountCapacity) {
-                  break;
+                    break;
+                  }
                 }
               }
+              hasNext = entryStream.hasNext();
             }
-          }
-          updateBatch(entryStream, batch, hasNext);
-          if (isShippable(batch)) {
-            sleepMultiplier = 1;
-            entryBatchQueue.put(batch);
-            if (!batch.hasMoreEntries()) {
-              // we're done with queue recovery, shut ourselves down
-              setReaderRunning(false);
+            // If the batch has data to max capacity or stream doesn't have anything
+            // try to ship it
+            if (updateBatchAndShippingQueue(entryStream, batch, hasNext, false)) {
+              sleepMultiplier = 1;
             }
+          }
         } catch (IOException | WALEntryStreamRuntimeException e) { // stream related
+          if (handleEofException(e, entryStream, batch)) {
+            sleepMultiplier = 1;
           } else {
-            Thread.sleep(sleepForRetries);
+            if (sleepMultiplier < maxRetriesMultiplier) {
+              LOG.debug("Failed to read stream of replication entries: " + e);
+              sleepMultiplier++;
+            } else {
+              LOG.error("Failed to read stream of replication entries", e);
+            }
+            Threads.sleep(sleepForRetries * sleepMultiplier);
           }
-          resetStream(entryStream);
-        }
-      } catch (IOException | WALEntryStreamRuntimeException e) { // stream related
-        if (sleepMultiplier < maxRetriesMultiplier) {
-          LOG.debug("Failed to read stream of replication entries: " + e);
-          sleepMultiplier++;
-        } else {
-          LOG.error("Failed to read stream of replication entries", e);
-          handleEofException(e);
+        } catch (InterruptedException e) {
+          LOG.trace("Interrupted while sleeping between WAL reads");
+          Thread.currentThread().interrupt();
+        } finally {
+          entryStream.close();
         }
-        Threads.sleep(sleepForRetries * sleepMultiplier);
-      } catch (InterruptedException e) {
-        LOG.trace("Interrupted while sleeping between WAL reads");
-        Thread.currentThread().interrupt();
       }
+    } catch (IOException e) {
+      if (sleepMultiplier < maxRetriesMultiplier) {
+        LOG.debug("Failed to read stream of replication entries: " + e);
+        sleepMultiplier++;
+      } else {
+        LOG.error("Failed to read stream of replication entries", e);
+      }
+      Threads.sleep(sleepForRetries * sleepMultiplier);
+    } catch (InterruptedException e) {
+      LOG.trace("Interrupted while sleeping between WAL reads");
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Update the batch, try to ship it, and return true if it was shipped
+   * @param entryStream stream of the WALs
+   * @param batch Batch of entries to ship
+   * @param hasMoreData if the stream has yet more data to read
+   * @param isEOFException if we have hit the EOF exception before this. For an EOF
+   *          exception, we do not want to reset the stream since the entry stream
+   *          doesn't have correct information.
+   * @return true if the batch was shipped successfully
+   * @throws InterruptedException if interrupted while waiting
+   * @throws IOException on an error in the underlying stream
+   */
+  private boolean updateBatchAndShippingQueue(WALEntryStream entryStream, WALEntryBatch batch,
+      boolean hasMoreData, boolean isEOFException) throws InterruptedException, IOException {
+    updateBatch(entryStream, batch, hasMoreData, isEOFException);
+    boolean isDataQueued = false;
+    if (isShippable(batch)) {
+      isDataQueued = true;
+      entryBatchQueue.put(batch);
+      if (!batch.hasMoreEntries()) {
+        // we're done with queue recovery, shut ourselves down
+        LOG.debug("Stopping the reader after recovering the queue");
+        setReaderRunning(false);
+      }
+    } else {
+      Thread.sleep(sleepForRetries);
+    }
+
+    if (!isEOFException) {
+      resetStream(entryStream);
+    }
+    return isDataQueued;
   }
 
-  private void updateBatch(WALEntryStream entryStream, WALEntryBatch batch, boolean moreData) {
+  private void updateBatch(WALEntryStream entryStream, WALEntryBatch batch, boolean moreData,
+      boolean isEOFException) {
     logMessage(batch);
-    batch.updatePosition(entryStream);
+    // In case of EOF exception we can utilize the last read path and position
+    // since we do not have the current information.
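+    // (lastReadPath and lastReadPosition are tracked by this reader and reset by
+    // handleEofException before it ships the batch, so they stay valid even when
+    // the entry stream state is not.)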
+    if (isEOFException) {
+      batch.updatePosition(lastReadPath, lastReadPosition);
+    } else {
+      batch.updatePosition(entryStream.getCurrentPath(), entryStream.getPosition());
+    }
     batch.setMoreEntries(!replicationQueueInfo.isQueueRecovered() || moreData);
   }
 
@@ -229,10 +287,18 @@ private void resetStream(WALEntryStream stream) throws IOException {
     stream.reset(); // reuse stream
   }
 
-  // if we get an EOF due to a zero-length log, and there are other logs in queue
-  // (highly likely we've closed the current log), we've hit the max retries, and autorecovery is
-  // enabled, then dump the log
-  private void handleEofException(Exception e) {
+  /**
+   * This is to handle the EOFException from the WAL entry stream. EOFException should
+   * be handled carefully because there is a risk of data loss if the data is never
+   * replicated.
+   * If the EOFException happens on the last log in a recovered queue, we can safely
+   * stop the reader.
+   * If the EOFException does not happen on the last log in a recovered queue, we
+   * should not stop the reader.
+   * @return true only if the EOF exception can be handled
+   */
+  private boolean handleEofException(Exception e, WALEntryStream entryStream,
+      WALEntryBatch batch) throws InterruptedException {
     boolean isRecoveredSource = manager.getOldSources().contains(source);
     PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
     // Dump the log even if logQueue size is 1 if the source is from recovered Source since we don't
@@ -245,11 +311,22 @@ private void handleEofException(Exception e) {
       lastReadPath = queue.peek();
       logQueue.remove(walGroupId);
       lastReadPosition = 0;
+
+      // If it was the last log in the recovered queue, the stream has no more data;
+      // we should stop the reader
+      boolean hasMoreData = !queue.isEmpty();
+      // After we removed the WAL from the queue, we should try shipping the existing
+      // batch of entries; we do not want to reset the stream since the entry stream
+      // doesn't have the correct data at this point
+      updateBatchAndShippingQueue(entryStream, batch, hasMoreData, true);
+      return true;
       }
     } catch (IOException ioe) {
       LOG.warn("Couldn't get file length information about log " + queue.peek());
     }
   }
+
+    return false;
   }
 
   public Path getCurrentPath() {
@@ -299,7 +376,8 @@ public long getEntrySizeExcludeBulkLoad(Entry entry) {
     return edit.heapSize() + key.estimatedSerializedSizeOf();
   }
 
-  private void updateBatchStats(WALEntryBatch batch, Entry entry, long entryPosition, long entrySize) {
+  private void updateBatchStats(WALEntryBatch batch, Entry entry,
+      long entryPosition, long entrySize) {
     WALEdit edit = entry.getEdit();
     if (edit != null && !edit.isEmpty()) {
       batch.incrementHeapSize(entrySize);
@@ -409,7 +487,7 @@ public long getLastReadPosition() {
    * Holds a batch of WAL entries to replicate, along with some statistics
    *
    */
-  static class WALEntryBatch {
+  final static class WALEntryBatch {
     private List<Pair<Entry, Long>> walEntriesWithSize;
     // last WAL that was read
     private Path lastWalPath;
@@ -515,9 +593,15 @@ public boolean isEmpty() {
     return walEntriesWithSize.isEmpty();
   }
 
-  public void updatePosition(WALEntryStream entryStream) {
-    lastWalPath = entryStream.getCurrentPath();
-    lastWalPosition = entryStream.getPosition();
+  /**
+   * Update the WAL entry batch with the latest WAL path and position, which the
+   * shipper will use to update the log position in the ZK node
+   * @param currentPath the path of the WAL
+   * @param currentPosition the current position in the WAL
+   */
+  public void updatePosition(Path currentPath, long currentPosition) {
+    lastWalPath = currentPath;
+    lastWalPosition = currentPosition;
   }
 
   public boolean hasMoreEntries() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
index a0b09dd894ac..c66788109f19 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -72,25 +72,23 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entry>
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
+      when(manager.getOldSources()).thenReturn(Lists.newArrayList(source));
+    }
     return source;
   }
@@ -321,48 +329,54 @@ public AtomicLong getTotalBufferUsed() {
 
   @Test
   public void testSetLogPositionForWALCurrentlyReadingWhenLogsRolled() throws Exception {
     final int numWALEntries = 5;
-    conf.setInt("replication.source.nb.capacity", numWALEntries);
+    int nbCapacity = conf.getInt("replication.source.nb.capacity", 25000);
+    try {
+      conf.setInt("replication.source.nb.capacity", numWALEntries);
 
-    Mocks mocks = new Mocks();
-    final ReplicationEndpointForTest endpoint = new ReplicationEndpointForTest() {
-      @Override
-      public WALEntryFilter getWALEntryfilter() {
-        return null;
-      }
-    };
-    WALFactory wals = new WALFactory(TEST_UTIL.getConfiguration(), null, "test");
-    final Path log1 = new Path(logDir, "log.1");
-    final Path log2 = new Path(logDir, "log.2");
+      Mocks mocks = new Mocks();
+      final ReplicationEndpointForTest endpoint = new ReplicationEndpointForTest() {
+        @Override public WALEntryFilter getWALEntryfilter() {
+          return null;
+        }
+      };
+      WALFactory wals = new WALFactory(TEST_UTIL.getConfiguration(), null, "test");
+      final Path log1 = new Path(logDir, "log.1");
+      final Path log2 = new Path(logDir, "log.2");
 
-    WALProvider.Writer writer1 = WALFactory.createWALWriter(FS, log1, TEST_UTIL.getConfiguration());
-    WALProvider.Writer writer2 = WALFactory.createWALWriter(FS, log2, TEST_UTIL.getConfiguration());
+      WALProvider.Writer writer1
+          = WALFactory.createWALWriter(FS, log1, TEST_UTIL.getConfiguration());
+      WALProvider.Writer writer2
+          = WALFactory.createWALWriter(FS, log2, TEST_UTIL.getConfiguration());
 
-    appendEntries(writer1, 3);
-    appendEntries(writer2, 2);
+      appendEntries(writer1, 3);
+      appendEntries(writer2, 2);
 
-    long pos = getPosition(wals, log2, 2);
+      long pos = getPosition(wals, log2, 2);
 
-    final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint);
-    source.run();
+      final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint, false);
+      source.run();
 
-    source.enqueueLog(log1);
-    // log rolled
-    source.enqueueLog(log2);
+      source.enqueueLog(log1);
+      // log rolled
+      source.enqueueLog(log2);
 
-    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
-      @Override public boolean evaluate() throws Exception {
-        return endpoint.replicateCount.get() > 0;
-      }
-    });
+      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+        @Override public boolean evaluate() {
+          return endpoint.replicateCount.get() > 0;
+        }
+      });
 
-    ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
-    ArgumentCaptor<Long> positionCaptor = ArgumentCaptor.forClass(Long.class);
-    verify(mocks.manager, times(1))
+      ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
+      ArgumentCaptor<Long> positionCaptor = ArgumentCaptor.forClass(Long.class);
+      verify(mocks.manager, times(1))
         .logPositionAndCleanOldLogs(pathCaptor.capture(), anyString(), positionCaptor.capture(),
-          anyBoolean(), anyBoolean());
-    assertTrue(endpoint.lastEntries.size() == 5);
-    assertThat(pathCaptor.getValue(), is(log2));
-    assertThat(positionCaptor.getValue(), is(pos));
+          anyBoolean(), anyBoolean());
+      assertTrue(endpoint.lastEntries.size() == 5);
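+      // 3 entries from log1 plus 2 from log2 ship together once the logs roll.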
+      assertThat(pathCaptor.getValue(), is(log2));
+      assertThat(positionCaptor.getValue(), is(pos));
+    } finally {
+      conf.setInt("replication.source.nb.capacity", nbCapacity);
+    }
   }
 
   @Test
@@ -405,7 +419,7 @@ public WALEntryFilter getWALEntryfilter() {
     writer.close();
 
     Mocks mocks = new Mocks();
-    final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint);
+    final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint, false);
     source.run();
     source.enqueueLog(log);
@@ -423,7 +437,7 @@ public void testSetLogPositionAndRemoveOldWALsEvenIfEmptyWALsRolled() throws Exc
     Mocks mocks = new Mocks();
     final ReplicationEndpointForTest endpoint = new ReplicationEndpointForTest();
-    final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint);
+    final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint, false);
 
     WALFactory wals = new WALFactory(TEST_UTIL.getConfiguration(), null, "test");
     final Path log1 = new Path(logDir, "log.1");
@@ -475,7 +489,7 @@ public void testSetLogPositionAndRemoveOldWALsEvenIfNoCfsReplicated() throws Exc
     final long pos = getPosition(wals, log2, 2);
 
     final ReplicationEndpointForTest endpoint = new ReplicationEndpointForTest();
-    final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint);
+    final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint, false);
     source.enqueueLog(log1);
     source.enqueueLog(log2);
     source.run();
@@ -529,7 +543,7 @@ public WALEntryFilter getWALEntryfilter() {
       }
     };
 
-    final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint);
+    final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint, false);
     source.run();
 
     source.enqueueLog(log1);
@@ -556,6 +570,173 @@ public WALEntryFilter getWALEntryfilter() {
     });
   }
 
+  @Test
+  public void testReplicationOnEmptyLogAtTheEndOfQueueWithMultipleLogs() throws Exception {
+    final String logPrefix = "logPrefix";
+    Mocks mocks = new Mocks();
+    // set table cfs to filter all cells out
+    final TableName replicatedTable = TableName.valueOf("replicated_table");
+    final Map<TableName, List<String>> cfs =
+        Collections.singletonMap(replicatedTable, Collections.<String>emptyList());
+    when(mocks.peer.getTableCFs()).thenReturn(cfs);
+
+    // Append 3 entries in a log
+    final Path log1 = new Path(logDir, logPrefix + ".1");
+    WALProvider.Writer writer1 = WALFactory.createWALWriter(FS, log1, TEST_UTIL.getConfiguration());
+    appendEntries(writer1, 3);
+
+    // Create a 0 length log.
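+    // The reader will hit EOF on this zero-length WAL at the tail of the queue;
+    // the recovered-source handling must drop it instead of retrying forever.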
+ Path emptyLog = new Path(logDir, logPrefix + ".2"); + FSDataOutputStream fsdos = FS.create(emptyLog); + fsdos.close(); + assertEquals(0, FS.getFileStatus(emptyLog).getLen()); + + // Replication end point with no filter + final ReplicationEndpointForTest endpoint = new ReplicationEndpointForTest() { + @Override + public WALEntryFilter getWALEntryfilter() { + return null; + } + }; + + final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint, true); + source.run(); + source.enqueueLog(log1); + source.enqueueLog(emptyLog); + + // Wait for source to replicate + Waiter.waitFor(conf, 20000, new Waiter.Predicate() { + @Override public boolean evaluate() { + return endpoint.replicateCount.get() == 1; + } + }); + + // Wait and verify if all the entries get replicated for non empty logs + Waiter.waitFor(conf, 20000, new Waiter.Predicate() { + @Override public boolean evaluate() { + return endpoint.lastEntries.size() == 3; + } + }); + + // Wait and verify if log queue has been drained fully + Waiter.waitFor(conf, 20000, new Waiter.Predicate() { + @Override public boolean evaluate() { + return source.getQueues().get(logPrefix).isEmpty(); + } + }); + } + + @Test + public void testReplicationOnEmptyLogAtTheEndOfQueueWithSingleLog() throws Exception { + final String logPrefix = "logPrefix"; + Mocks mocks = new Mocks(); + // set table cfs to filter all cells out + final TableName replicatedTable = TableName.valueOf("replicated_table"); + final Map> cfs = + Collections.singletonMap(replicatedTable, Collections.emptyList()); + when(mocks.peer.getTableCFs()).thenReturn(cfs); + + // Create a 0 length log. + Path emptyLog = new Path(logDir, logPrefix + ".1"); + FSDataOutputStream fsdos = FS.create(emptyLog); + fsdos.close(); + assertEquals(0, FS.getFileStatus(emptyLog).getLen()); + + // Replication end point with no filter + final ReplicationEndpointForTest endpoint = new ReplicationEndpointForTest() { + @Override + public WALEntryFilter getWALEntryfilter() { + return null; + } + }; + + final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint, true); + source.run(); + source.enqueueLog(emptyLog); + + // Wait and verify if no entry got replicated + Waiter.waitFor(conf, 20000, new Waiter.Predicate() { + @Override public boolean evaluate() { + return endpoint.lastEntries == null; + } + }); + + // Wait and verify get is queue is empty + Waiter.waitFor(conf, 20000, new Waiter.Predicate() { + @Override public boolean evaluate() { + return source.getQueues().get(logPrefix).isEmpty(); + } + }); + } + + @Test + public void testReplicationOnEmptyLogBetweenTheNonEmptyLogsInLogQueue() throws Exception { + final String logPrefix = "logPrefix"; + Mocks mocks = new Mocks(); + // set table cfs to filter all cells out + final TableName replicatedTable = TableName.valueOf("replicated_table"); + final Map> cfs = + Collections.singletonMap(replicatedTable, Collections.emptyList()); + when(mocks.peer.getTableCFs()).thenReturn(cfs); + + // Append 3 entries in a log + final Path log1 = new Path(logDir, logPrefix + ".11"); + WALProvider.Writer writer1 = WALFactory.createWALWriter(FS, log1, TEST_UTIL.getConfiguration()); + appendEntries(writer1, 3); + + // Create a 0 length log. 
+ Path emptyLog = new Path(logDir, logPrefix + ".12"); + FSDataOutputStream fsdos = FS.create(emptyLog); + fsdos.close(); + assertEquals(0, FS.getFileStatus(emptyLog).getLen()); + + // Append 5 entries in a log + final Path log3 = new Path(logDir, logPrefix + ".13"); + WALProvider.Writer writer3 = WALFactory.createWALWriter(FS, log3, TEST_UTIL.getConfiguration()); + appendEntries(writer3, 5); + + // Append 10 entries in a log + final Path log4 = new Path(logDir, logPrefix + ".14"); + WALProvider.Writer writer4 = WALFactory.createWALWriter(FS, log4, TEST_UTIL.getConfiguration()); + appendEntries(writer4, 10); + + // Replication end point with no filter + final ReplicationEndpointForTest endpoint = new ReplicationEndpointForTest() { + @Override + public WALEntryFilter getWALEntryfilter() { + return null; + } + }; + + final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint, true); + source.run(); + source.enqueueLog(log1); + source.enqueueLog(emptyLog); + source.enqueueLog(log3); + source.enqueueLog(log4); + + // Wait for source to replicate + Waiter.waitFor(conf, 20000, new Waiter.Predicate() { + @Override public boolean evaluate() { + return endpoint.replicateCount.get() == 2; + } + }); + + // Wait and verify the last replicated entries + Waiter.waitFor(conf, 20000, new Waiter.Predicate() { + @Override public boolean evaluate() { + return endpoint.lastEntries.size() == 15; + } + }); + + // Wait and verify only one log is there in queue + Waiter.waitFor(conf, 20000, new Waiter.Predicate() { + @Override public boolean evaluate() { + return source.getQueues().get(logPrefix).size() == 1; + } + }); + } + /** * Tests that recovered queues are preserved on a regionserver shutdown. * See HBASE-18192 diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceBase.java index e94985eb9346..ab4d19d9985e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceBase.java @@ -76,7 +76,6 @@ public abstract class TestReplicationSourceBase { protected static DummyServer server; @BeforeClass public static void setUpBeforeClass() throws Exception { - conf = HBaseConfiguration.create(); conf.set("replication.replicationsource.implementation", ReplicationSourceDummyWithNoTermination.class.getCanonicalName()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java index adf427b39e3f..0b01c5f656a8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java @@ -81,6 +81,7 @@ import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.junit.After; import org.junit.AfterClass; @@ -741,9 +742,45 @@ public void testEOFExceptionForRecoveredQueue() throws Exception { new ReplicationSourceWALReaderThread(mockSourceManager, getRecoveredQueueInfo(), localLogQueue, 0, fs, conf, 
         getDummyFilter(), getMockMetrics(), source, fakeWalGroupId);
     reader.run();
+    assertEquals(0, localLogQueue.getQueueSize(fakeWalGroupId));
+  }
+
+  @Test
+  public void testEOFExceptionForRecoveredQueueWithMultipleLogs() throws Exception {
+    ReplicationSourceLogQueue localLogQueue = new ReplicationSourceLogQueue(conf, getMockMetrics());
+    // Create a 0 length log.
+    Path emptyLog = new Path("log.2");
+    FSDataOutputStream fsdos = fs.create(emptyLog);
+    fsdos.close();
+    assertEquals(0, fs.getFileStatus(emptyLog).getLen());
+    localLogQueue.enqueueLog(emptyLog, fakeWalGroupId);
+
+    final Path log1 = new Path("log.1");
+    WALProvider.Writer writer1 = WALFactory.createWALWriter(fs, log1, TEST_UTIL.getConfiguration());
+    appendEntries(writer1, 3);
+    localLogQueue.enqueueLog(log1, fakeWalGroupId);
+
+    ReplicationSource source = Mockito.mock(ReplicationSource.class);
+    ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class);
+    // Make it look like the source is a recovered source.
+    when(mockSourceManager.getOldSources())
+      .thenReturn(new ArrayList<>(Arrays.asList((ReplicationSourceInterface) source)));
+    when(source.isPeerEnabled()).thenReturn(true);
+    when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+    // Override the max retries multiplier to fail fast.
+    conf.setInt("replication.source.maxretriesmultiplier", 1);
+    conf.setBoolean("replication.source.eof.autorecovery", true);
+    // Create a reader thread.
+    ReplicationSourceWALReaderThread reader =
+      new ReplicationSourceWALReaderThread(mockSourceManager, getRecoveredQueueInfo(),
+        localLogQueue, 0, fs, conf, getDummyFilter(), getMockMetrics(), source, fakeWalGroupId);
+    assertEquals("Initial log queue size is not correct",
+      2, localLogQueue.getQueueSize(fakeWalGroupId));
+    reader.run();
+
     // ReplicationSourceWALReaderThread#handleEofException method will
     // remove empty log from logQueue.
-    assertEquals(0, localLogQueue.getQueueSize(fakeWalGroupId));
+    assertEquals("Log queue should be empty", 0, localLogQueue.getQueueSize(fakeWalGroupId));
   }
 
   private PriorityBlockingQueue<Path> getQueue() {
@@ -757,4 +794,21 @@ private MetricsSource getMockMetrics() {
     doNothing().when(source).setOldestWalAge(Mockito.anyInt());
     return source;
   }
+
+  private void appendEntries(WALProvider.Writer writer, int numEntries) throws IOException {
+    for (int i = 0; i < numEntries; i++) {
+      byte[] b = Bytes.toBytes(Integer.toString(i));
+      KeyValue kv = new KeyValue(b, b, b);
+      WALEdit edit = new WALEdit();
+      edit.add(kv);
+      WALKey key = new WALKey(b, TableName.valueOf(b), 0, 0,
+          HConstants.DEFAULT_CLUSTER_ID);
+      NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+      scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL);
+      key.setScopes(scopes);
+      writer.append(new WAL.Entry(key, edit));
+      writer.sync(false);
+    }
+    writer.close();
+  }
 }
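For review, the heart of the change reduces to the recovery path below. This is a simplified sketch assembled from the hunks above, not a verbatim copy of the final file: isEofAndAutorecoveryEnabled() is a hypothetical stand-in for the guard lines the hunks elide (the surrounding comments indicate that guard checks for an EOF caused by a zero-length WAL with "replication.source.eof.autorecovery" enabled).

  // Condensed sketch of ReplicationSourceWALReaderThread#handleEofException.
  // isEofAndAutorecoveryEnabled() is hypothetical shorthand for the elided guard.
  private boolean handleEofException(Exception e, WALEntryStream entryStream,
      WALEntryBatch batch) throws InterruptedException {
    boolean isRecoveredSource = manager.getOldSources().contains(source);
    PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
    // Dropping the WAL is safe only if another WAL is queued behind it, or the
    // queue belongs to a recovered source (no new data can ever arrive in it).
    if (isEofAndAutorecoveryEnabled(e) && (queue.size() > 1 || isRecoveredSource)) {
      try {
        if (fs.getFileStatus(queue.peek()).getLen() == 0) {
          // A zero-length WAL caused the EOF: remember it, drop it, rewind.
          lastReadPath = queue.peek();
          logQueue.remove(walGroupId);
          lastReadPosition = 0;
          // Ship whatever was batched before the EOF. If the queue is now empty,
          // this was the last WAL of a recovered queue and the reader shuts down.
          updateBatchAndShippingQueue(entryStream, batch, !queue.isEmpty(), true);
          return true; // handled; the caller resets its backoff
        }
      } catch (IOException ioe) {
        LOG.warn("Couldn't get file length information about log " + queue.peek());
      }
    }
    return false; // not handled; the caller backs off and retries
  }

Compared with the pre-patch code, which only dumped the empty WAL after exhausting retries, the batch collected before the EOF is now shipped with its position taken from lastReadPath/lastReadPosition, so dropping the WAL can no longer permanently strand unreplicated entries, and a recovered queue whose last WAL is empty can still drain and shut the reader down.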