diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 124da63b9fcb..5acb70922f83 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hadoop.hbase.replication.regionserver;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.Service;
@@ -439,14 +440,30 @@ public String getPeerClusterId() {
   }
 
   @Override
+  @VisibleForTesting
   public Path getCurrentPath() {
-    // only for testing
     for (ReplicationSourceShipperThread worker : workerThreads.values()) {
       if (worker.getCurrentPath() != null) return worker.getCurrentPath();
     }
     return null;
   }
 
+  @VisibleForTesting
+  public Path getLastLoggedPath() {
+    for (ReplicationSourceShipperThread worker : workerThreads.values()) {
+      return worker.getLastLoggedPath();
+    }
+    return null;
+  }
+
+  @VisibleForTesting
+  public long getLastLoggedPosition() {
+    for (ReplicationSourceShipperThread worker : workerThreads.values()) {
+      return worker.getLastLoggedPosition();
+    }
+    return 0;
+  }
+
   private boolean isSourceActive() {
     return !this.stopper.isStopped() && this.sourceRunning;
   }
@@ -481,8 +498,8 @@ public String getStats() {
     for (Map.Entry<String, ReplicationSourceShipperThread> entry : workerThreads.entrySet()) {
       String walGroupId = entry.getKey();
       ReplicationSourceShipperThread worker = entry.getValue();
-      long position = worker.getCurrentPosition();
-      Path currentPath = worker.getCurrentPath();
+      long position = worker.getLastLoggedPosition();
+      Path currentPath = worker.getLastLoggedPath();
       sb.append("walGroup [").append(walGroupId).append("]: ");
       if (currentPath != null) {
         sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
@@ -517,7 +534,7 @@ public Map<String, ReplicationStatus> getWalGroupStatus() {
       int queueSize = queues.get(walGroupId).size();
       replicationDelay =
           ReplicationLoad.calculateReplicationDelay(ageOfLastShippedOp, lastTimeStamp, queueSize);
-      Path currentPath = worker.getCurrentPath();
+      Path currentPath = worker.getLastLoggedPath();
       fileSize = -1;
       if (currentPath != null) {
         try {
@@ -535,7 +552,7 @@ public Map<String, ReplicationStatus> getWalGroupStatus() {
           .withQueueSize(queueSize)
           .withWalGroup(walGroupId)
           .withCurrentPath(currentPath)
-          .withCurrentPosition(worker.getCurrentPosition())
+          .withCurrentPosition(worker.getLastLoggedPosition())
           .withFileSize(fileSize)
           .withAgeOfLastShippedOp(ageOfLastShippedOp)
           .withReplicationDelay(replicationDelay);
@@ -555,7 +572,7 @@ public class ReplicationSourceShipperThread extends Thread {
     // Last position in the log that we sent to ZooKeeper
     private long lastLoggedPosition = -1;
     // Path of the current log
-    private volatile Path currentPath;
+    private volatile Path lastLoggedPath;
     // Current state of the worker thread
     private WorkerState state;
     ReplicationSourceWALReaderThread entryReader;
@@ -600,13 +617,11 @@ public void run() {
           WALEntryBatch entryBatch = entryReader.take();
           shipEdits(entryBatch);
           releaseBufferQuota((int) entryBatch.getHeapSize());
-          if (replicationQueueInfo.isQueueRecovered() && entryBatch.getWalEntries().isEmpty()
-              && entryBatch.getLastSeqIds().isEmpty()) {
-            LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
-                + peerClusterZnode);
+          if (!entryBatch.hasMoreEntries()) {
+            LOG.debug("Finished recovering queue for group "
+                + walGroupId + " of peer " + peerClusterZnode);
             metrics.incrCompletedRecoveryQueue();
             setWorkerState(WorkerState.FINISHED);
-            continue;
           }
         } catch (InterruptedException e) {
           LOG.trace("Interrupted while waiting for next replication entry batch", e);
@@ -614,7 +629,7 @@ public void run() {
         }
       }
 
-      if (replicationQueueInfo.isQueueRecovered() && getWorkerState() == WorkerState.FINISHED) {
+      if (getWorkerState() == WorkerState.FINISHED) {
        // use synchronize to make sure one last thread will clean the queue
        synchronized (this) {
          Threads.sleep(100);// wait a short while for other worker thread to fully exit
@@ -694,15 +709,13 @@ private int getBatchEntrySizeExcludeBulkLoad(WALEntryBatch entryBatch) {
     protected void shipEdits(WALEntryBatch entryBatch) {
       List<Entry> entries = entryBatch.getWalEntries();
       long lastReadPosition = entryBatch.getLastWalPosition();
-      currentPath = entryBatch.getLastWalPath();
+      lastLoggedPath = entryBatch.getLastWalPath();
       int sleepMultiplier = 0;
       if (entries.isEmpty()) {
-        if (lastLoggedPosition != lastReadPosition) {
-          updateLogPosition(lastReadPosition);
-          // if there was nothing to ship and it's not an error
-          // set "ageOfLastShippedOp" to <now> to indicate that we're current
-          metrics.setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(), walGroupId);
-        }
+        updateLogPosition(lastReadPosition);
+        // if there was nothing to ship and it's not an error
+        // set "ageOfLastShippedOp" to <now> to indicate that we're current
+        metrics.setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(), walGroupId);
         return;
       }
       int currentSize = (int) entryBatch.getHeapSize();
@@ -787,8 +800,7 @@ protected void shipEdits(WALEntryBatch entryBatch) {
     }
 
     private void updateLogPosition(long lastReadPosition) {
-      manager.setPendingShipment(false);
-      manager.logPositionAndCleanOldLogs(currentPath, peerClusterZnode, lastReadPosition,
+      manager.logPositionAndCleanOldLogs(lastLoggedPath, peerClusterZnode, lastReadPosition,
         this.replicationQueueInfo.isQueueRecovered(), false);
       lastLoggedPosition = lastReadPosition;
     }
@@ -800,7 +812,7 @@ public void startup() {
         public void uncaughtException(final Thread t, final Throwable e) {
           RSRpcServices.exitIfOOME(e);
           LOG.error("Unexpected exception in ReplicationSourceWorkerThread," + " currentPath="
-              + getCurrentPath(), e);
+              + getLastLoggedPath(), e);
           stopper.stop("Unexpected exception in ReplicationSourceWorkerThread");
         }
       };
@@ -941,8 +953,12 @@ public Path getCurrentPath() {
       return this.entryReader.getCurrentPath();
     }
 
-    public long getCurrentPosition() {
-      return this.lastLoggedPosition;
+    public Path getLastLoggedPath() {
+      return lastLoggedPath;
+    }
+
+    public long getLastLoggedPosition() {
+      return lastLoggedPosition;
     }
 
     private boolean isWorkerActive() {
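Taken together, the ReplicationSource changes establish a simpler contract between the WAL reader and the shipper: every batch handed over now carries the last WAL path and position read (even when the batch is empty), and hasMoreEntries() returning false is the one signal that a recovered queue has been fully drained. A minimal sketch of the consuming side under that contract (helper names outside WALEntryBatch are illustrative, not the exact private API):

    // Sketch of the post-patch shipper loop, not the verbatim implementation.
    while (isActive()) {
      WALEntryBatch batch = entryReader.take(); // blocks until the reader emits a batch
      shipEdits(batch);                         // an empty batch still advances the
                                                // logged WAL position
      if (!batch.hasMoreEntries()) {
        // Only recovered queues ever report "no more entries": the reader computes
        // moreEntries = !isQueueRecovered() || streamHasNext.
        metrics.incrCompletedRecoveryQueue();
        setWorkerState(WorkerState.FINISHED);
      }
    }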
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 6b8b6e273c47..0e3724a5a1ef 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -123,8 +123,6 @@ public class ReplicationSourceManager implements ReplicationListener {
 
   private AtomicLong totalBufferUsed = new AtomicLong();
 
-  private boolean pendingShipment;
-
   /**
    * Creates a replication manager and sets the watch on all the other registered region servers
    * @param replicationQueues the interface for manipulating replication queues
@@ -191,19 +189,13 @@ public ReplicationSourceManager(final ReplicationQueues replicationQueues,
    * @param holdLogInZK if true then the log is retained in ZK
    */
   public synchronized void logPositionAndCleanOldLogs(Path log, String id, long position,
-      boolean queueRecovered, boolean holdLogInZK) {
-    if (!this.pendingShipment) {
-      String fileName = log.getName();
-      this.replicationQueues.setLogPosition(id, fileName, position);
-      if (holdLogInZK) {
-        return;
-      }
-      cleanOldLogs(fileName, id, queueRecovered);
+      boolean queueRecovered, boolean holdLogInZK) {
+    String fileName = log.getName();
+    this.replicationQueues.setLogPosition(id, fileName, position);
+    if (holdLogInZK) {
+      return;
     }
-  }
-
-  public synchronized void setPendingShipment(boolean pendingShipment) {
-    this.pendingShipment = pendingShipment;
+    cleanOldLogs(fileName, id, queueRecovered);
   }
 
   /**
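With pendingShipment gone, logPositionAndCleanOldLogs runs unconditionally: the shipper thread is now the only caller that records positions, so the flag that used to suppress updates while a batch was in flight (and the race it papered over) disappears. A hypothetical call site, mirroring what the shipper's updateLogPosition() does after this patch:

    // Illustrative values; the real call passes the shipper's own fields.
    manager.logPositionAndCleanOldLogs(
        lastLoggedPath,        // WAL the last shipped batch was read from
        peerClusterZnode,      // replication queue id in ZooKeeper
        lastReadPosition,      // byte offset already replicated
        queueInfo.isQueueRecovered(),
        false);                // holdLogInZK = false lets older WALs be cleaned up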
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
index 1d94a7a2c815..58555742ecfe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
@@ -21,9 +21,7 @@
 import java.io.EOFException;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.PriorityBlockingQueue;
@@ -69,7 +67,8 @@ public class ReplicationSourceWALReaderThread extends Thread {
   // max count of each batch - multiply by number of batches in queue to get total
   private int replicationBatchCountCapacity;
   // position in the WAL to start reading at
-  private long currentPosition;
+  private long lastReadPosition;
+  private Path lastReadPath;
   private WALEntryFilter filter;
   private long sleepForRetries;
   //Indicates whether this particular worker is running
@@ -81,8 +80,6 @@ public class ReplicationSourceWALReaderThread extends Thread {
   private AtomicLong totalBufferUsed;
   private long totalBufferQuota;
 
-  private ReplicationSourceManager replicationSourceManager;
-
   /**
    * Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the
    * entries, and puts them on a batch queue.
@@ -101,7 +98,8 @@ public ReplicationSourceWALReaderThread(ReplicationSourceManager manager,
       FileSystem fs, Configuration conf, WALEntryFilter filter, MetricsSource metrics) {
     this.replicationQueueInfo = replicationQueueInfo;
     this.logQueue = logQueue;
-    this.currentPosition = startPosition;
+    this.lastReadPath = logQueue.peek();
+    this.lastReadPosition = startPosition;
     this.fs = fs;
     this.conf = conf;
     this.filter = filter;
@@ -111,7 +109,6 @@ public ReplicationSourceWALReaderThread(ReplicationSourceManager manager,
     // memory used will be batchSizeCapacity * (nb.batches + 1)
     // the +1 is for the current thread reading before placing onto the queue
     int batchCount = conf.getInt("replication.source.nb.batches", 1);
-    this.replicationSourceManager = manager;
     this.totalBufferUsed = manager.getTotalBufferUsed();
     this.totalBufferQuota = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
       HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
@@ -133,61 +130,45 @@ public void run() {
     int sleepMultiplier = 1;
     while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
       try (WALEntryStream entryStream =
-          new WALEntryStream(logQueue, fs, conf, currentPosition, metrics)) {
+          new WALEntryStream(logQueue, fs, conf, lastReadPosition, metrics)) {
         while (isReaderRunning()) { // loop here to keep reusing stream while we can
           if (!checkQuota()) {
             continue;
           }
-          WALEntryBatch batch = null;
-          while (entryStream.hasNext()) {
-            if (batch == null) {
-              batch = new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
-            }
+          WALEntryBatch batch = new WALEntryBatch(replicationBatchCountCapacity);
+          boolean hasNext;
+          while ((hasNext = entryStream.hasNext()) == true) {
             Entry entry = entryStream.next();
             entry = filterEntry(entry);
             if (entry != null) {
               WALEdit edit = entry.getEdit();
               if (edit != null && !edit.isEmpty()) {
                 long entrySize = getEntrySizeIncludeBulkLoad(entry);
-                long entrySizeExlucdeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
+                long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
                 batch.addEntry(entry);
-                replicationSourceManager.setPendingShipment(true);
                 updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
-                boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExlucdeBulkLoad);
+                boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExcludeBulkLoad);
                 // Stop if too many entries or too big
                 if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
                     || batch.getNbEntries() >= replicationBatchCountCapacity) {
                   break;
                 }
               }
-            } else {
-              replicationSourceManager.logPositionAndCleanOldLogs(entryStream.getCurrentPath(),
-                this.replicationQueueInfo.getPeerClusterZnode(),
-                entryStream.getPosition(),
-                this.replicationQueueInfo.isQueueRecovered(), false);
             }
           }
-          if (batch != null && (!batch.getLastSeqIds().isEmpty() || batch.getNbEntries() > 0)) {
-            if (LOG.isTraceEnabled()) {
-              LOG.trace(String.format("Read %s WAL entries eligible for replication",
-                batch.getNbEntries()));
-            }
-            entryBatchQueue.put(batch);
+
+          updateBatch(entryStream, batch, hasNext);
+          if (isShippable(batch)) {
             sleepMultiplier = 1;
-          } else { // got no entries and didn't advance position in WAL
-            LOG.trace("Didn't read any new entries from WAL");
-            if (replicationQueueInfo.isQueueRecovered()) {
-              // we're done with queue recovery, shut ourself down
+            entryBatchQueue.put(batch);
+            if (!batch.hasMoreEntries()) {
+              // we're done with queue recovery, shut ourselves down
               setReaderRunning(false);
-              // shuts down shipper thread immediately
-              entryBatchQueue.put(batch != null ? batch
-                  : new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath()));
-            } else {
-              Thread.sleep(sleepForRetries);
             }
+          } else {
+            Thread.sleep(sleepForRetries);
           }
-          currentPosition = entryStream.getPosition();
-          entryStream.reset(); // reuse stream
+          resetStream(entryStream);
         }
       } catch (IOException | WALEntryStreamRuntimeException e) { // stream related
         if (sleepMultiplier < maxRetriesMultiplier) {
@@ -205,6 +186,38 @@ public void run() {
     }
   }
 
+  private void updateBatch(WALEntryStream entryStream, WALEntryBatch batch, boolean moreData) {
+    logMessage(batch);
+    batch.updatePosition(entryStream);
+    batch.setMoreEntries(!replicationQueueInfo.isQueueRecovered() || moreData);
+  }
+
+  private void logMessage(WALEntryBatch batch) {
+    if (LOG.isTraceEnabled()) {
+      if (batch.isEmpty()) {
+        LOG.trace("Didn't read any new entries from WAL");
+      } else {
+        LOG.trace(String.format("Read %s WAL entries eligible for replication",
+          batch.getNbEntries()));
+      }
+    }
+  }
+
+  private boolean isShippable(WALEntryBatch batch) {
+    return !batch.isEmpty() || checkIfWALRolled(batch) || !batch.hasMoreEntries();
+  }
+
+  private boolean checkIfWALRolled(WALEntryBatch batch) {
+    return lastReadPath == null && batch.lastWalPath != null
+        || lastReadPath != null && !lastReadPath.equals(batch.lastWalPath);
+  }
+
+  private void resetStream(WALEntryStream stream) throws IOException {
+    lastReadPosition = stream.getPosition();
+    lastReadPath = stream.getCurrentPath();
+    stream.reset(); // reuse stream
+  }
+
   // if we get an EOF due to a zero-length log, and there are other logs in queue
   // (highly likely we've closed the current log), we've hit the max retries, and autorecovery is
   // enabled, then dump the log
@@ -214,8 +227,8 @@ private void handleEofException(Exception e) {
       try {
         if (fs.getFileStatus(logQueue.peek()).getLen() == 0) {
           LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek());
-          logQueue.remove();
-          currentPosition = 0;
+          lastReadPath = logQueue.remove();
+          lastReadPosition = 0;
         }
       } catch (IOException ioe) {
         LOG.warn("Couldn't get file length information about log " + logQueue.peek());
@@ -224,12 +237,6 @@ private void handleEofException(Exception e) {
   }
 
   public Path getCurrentPath() {
-    // if we've read some WAL entries, get the Path we read from
-    WALEntryBatch batchQueueHead = entryBatchQueue.peek();
-    if (batchQueueHead != null) {
-      return batchQueueHead.lastWalPath;
-    }
-    // otherwise, we must be currently reading from the head of the log queue
     return logQueue.peek();
   }
@@ -380,6 +387,10 @@ public void setReaderRunning(boolean readerRunning) {
     this.isReaderRunning = readerRunning;
   }
 
+  public long getLastReadPosition() {
+    return this.lastReadPosition;
+  }
+
   /**
    * Holds a batch of WAL entries to replicate, along with some statistics
    *
@@ -396,17 +407,14 @@ static class WALEntryBatch {
     private int nbHFiles = 0;
     // heap size of data we need to replicate
     private long heapSize = 0;
-    // save the last sequenceid for each region if the table has serial-replication scope
-    private Map<String, Long> lastSeqIds = new HashMap<>();
+    // whether more entries to read exist in WALs or not
+    private boolean moreEntries = true;
 
     /**
-     * @param walEntries
-     * @param lastWalPath Path of the WAL the last entry in this batch was read from
-     * @param lastWalPosition Position in the WAL the last entry in this batch was read from
+     * @param maxNbEntries the number of entries a batch can have
      */
-    private WALEntryBatch(int maxNbEntries, Path lastWalPath) {
+    private WALEntryBatch(int maxNbEntries) {
       this.walEntries = new ArrayList<>(maxNbEntries);
-      this.lastWalPath = lastWalPath;
     }
 
     public void addEntry(Entry entry) {
@@ -466,13 +474,6 @@ public long getHeapSize() {
       return heapSize;
     }
 
-    /**
-     * @return the last sequenceid for each region if the table has serial-replication scope
-     */
-    public Map<String, Long> getLastSeqIds() {
-      return lastSeqIds;
-    }
-
     private void incrementNbRowKeys(int increment) {
       nbRowKeys += increment;
     }
@@ -484,5 +485,22 @@ private void incrementNbHFiles(int increment) {
     private void incrementHeapSize(long increment) {
       heapSize += increment;
     }
+
+    public boolean isEmpty() {
+      return walEntries.isEmpty();
+    }
+
+    public void updatePosition(WALEntryStream entryStream) {
+      lastWalPath = entryStream.getCurrentPath();
+      lastWalPosition = entryStream.getPosition();
+    }
+
+    public boolean hasMoreEntries() {
+      return moreEntries;
+    }
+
+    public void setMoreEntries(boolean moreEntries) {
+      this.moreEntries = moreEntries;
+    }
   }
 }
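The reader-side rewrite concentrates the hand-off decision in isShippable(). Restating its three cases compactly (the same logic as the methods above, shown only to make the cases explicit):

    boolean walRolled = lastReadPath == null && batch.getLastWalPath() != null
        || lastReadPath != null && !lastReadPath.equals(batch.getLastWalPath());
    boolean shippable =
        !batch.isEmpty()            // 1) there are entries to replicate, or
        || walRolled                // 2) the batch is empty but the WAL rolled, so the
                                    //    new path/position still has to be logged, or
        || !batch.hasMoreEntries(); // 3) a recovered queue ran dry and the shipper
                                    //    must be told to finish

Because an all-filtered or empty batch now ships once the WAL rolls, positions advance and old WALs get cleaned even when nothing reaches the peer; the new tests below exercise exactly those paths.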
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
index 5d9059a7eb5a..b4ac71bf5c8d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
@@ -16,13 +16,30 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.hadoop.hbase.replication;
 
+import static org.apache.hadoop.hbase.replication.TestReplicationEndpoint.ReplicationEndpointForTest;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.internal.verification.VerificationModeFactory.times;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -34,36 +51,38 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CoordinatedStateManager;
-import org.apache.hadoop.hbase.MiniHBaseCluster;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.Waiter;
-import org.apache.hadoop.hbase.Waiter.Predicate;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.Waiter.Predicate;
 import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
 import org.apache.hadoop.hbase.replication.regionserver.Replication;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.hadoop.hbase.wal.WALProvider;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKey;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
-import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.WALProvider;
+import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 
-import static org.mockito.Mockito.mock;
-
 @Category(MediumTests.class)
 public class TestReplicationSource {
 
@@ -92,6 +111,32 @@ public static void setUpBeforeClass() throws Exception {
     if (FS.exists(logDir)) FS.delete(logDir, true);
   }
 
+  @Before
+  public void setup() throws IOException {
+    if (!FS.exists(logDir)) {
+      FS.mkdirs(logDir);
+    }
+    if (!FS.exists(oldLogDir)) {
+      FS.mkdirs(oldLogDir);
+    }
+
+    ReplicationEndpointForTest.contructedCount.set(0);
+    ReplicationEndpointForTest.startedCount.set(0);
+    ReplicationEndpointForTest.replicateCount.set(0);
+    ReplicationEndpointForTest.stoppedCount.set(0);
+    ReplicationEndpointForTest.lastEntries = null;
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    if (FS.exists(oldLogDir)) {
+      FS.delete(oldLogDir, true);
+    }
+    if (FS.exists(logDir)) {
+      FS.delete(logDir, true);
+    }
+  }
+
   @AfterClass
   public static void tearDownAfterClass() throws Exception {
     TEST_UTIL_PEER.shutdownMiniHBaseCluster();
@@ -108,8 +153,6 @@ public static void tearDownAfterClass() throws Exception {
   @Test
   public void testLogMoving() throws Exception{
     Path logPath = new Path(logDir, "log");
-    if (!FS.exists(logDir)) FS.mkdirs(logDir);
-    if (!FS.exists(oldLogDir)) FS.mkdirs(oldLogDir);
     WALProvider.Writer writer = WALFactory.createWALWriter(FS, logPath,
         TEST_UTIL.getConfiguration());
     for(int i = 0; i < 3; i++) {
@@ -166,7 +209,6 @@ protected void doStop() {
     Configuration testConf = HBaseConfiguration.create();
     testConf.setInt("replication.source.maxretriesmultiplier", 1);
     ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
-    Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
     source.init(testConf, null, manager, null, mockPeers, null, "testPeer", null,
       replicationEndpoint, null);
     ExecutorService executor = Executors.newSingleThreadExecutor();
@@ -189,10 +231,184 @@ public boolean evaluate() throws Exception {
 
   }
 
+  private void appendEntries(WALProvider.Writer writer, int numEntries) throws IOException {
+    for (int i = 0; i < numEntries; i++) {
+      byte[] b = Bytes.toBytes(Integer.toString(i));
+      KeyValue kv = new KeyValue(b, b, b);
+      WALEdit edit = new WALEdit();
+      edit.add(kv);
+      WALKey key = new WALKey(b, TableName.valueOf(b), 0, 0,
+          HConstants.DEFAULT_CLUSTER_ID);
+      NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+      scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL);
+      key.setScopes(scopes);
+      writer.append(new WAL.Entry(key, edit));
+      writer.sync(false);
+    }
+    writer.close();
+  }
+
+  private long getPosition(WALFactory wals, Path log2, int numEntries) throws IOException {
+    WAL.Reader reader = wals.createReader(FS, log2);
+    for (int i = 0; i < numEntries; i++) {
+      reader.next();
+    }
+    return reader.getPosition();
+  }
+
+  private static final class Mocks {
+    private final ReplicationSourceManager manager = mock(ReplicationSourceManager.class);
+    private final ReplicationQueues queues = mock(ReplicationQueues.class);
+    private final ReplicationPeers peers = mock(ReplicationPeers.class);
+    private final MetricsSource metrics = mock(MetricsSource.class);
+    private final ReplicationPeer peer = mock(ReplicationPeer.class);
+    private final ReplicationEndpoint.Context context = mock(ReplicationEndpoint.Context.class);
+
+    private Mocks() {
+      when(peers.getStatusOfPeer(anyString())).thenReturn(true);
+      when(context.getReplicationPeer()).thenReturn(peer);
+      when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+    }
+
+    ReplicationSource createReplicationSourceWithMocks(ReplicationEndpoint endpoint)
+        throws IOException {
+      final ReplicationSource source = new ReplicationSource();
+      endpoint.init(context);
+      source.init(conf, FS, manager, queues, peers, mock(Stoppable.class),
+        "testPeerClusterZnode", UUID.randomUUID(), endpoint, metrics);
+      return source;
+    }
+  }
+
+  @Test
+  public void testSetLogPositionForWALCurrentlyReadingWhenLogsRolled() throws Exception {
+    final int numWALEntries = 5;
+    conf.setInt("replication.source.nb.capacity", numWALEntries);
+
+    Mocks mocks = new Mocks();
+    final ReplicationEndpointForTest endpoint = new ReplicationEndpointForTest() {
+      @Override
+      public WALEntryFilter getWALEntryfilter() {
+        return null;
+      }
+    };
+    WALFactory wals = new WALFactory(TEST_UTIL.getConfiguration(), null, "test");
+    final Path log1 = new Path(logDir, "log.1");
+    final Path log2 = new Path(logDir, "log.2");
+
+    WALProvider.Writer writer1 =
+        WALFactory.createWALWriter(FS, log1, TEST_UTIL.getConfiguration());
+    WALProvider.Writer writer2 =
+        WALFactory.createWALWriter(FS, log2, TEST_UTIL.getConfiguration());
+
+    appendEntries(writer1, 3);
+    appendEntries(writer2, 2);
+
+    long pos = getPosition(wals, log2, 2);
+
+    final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint);
+    source.run();
+
+    source.enqueueLog(log1);
+    // log rolled
+    source.enqueueLog(log2);
+
+    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+      @Override public boolean evaluate() throws Exception {
+        return endpoint.replicateCount.get() > 0;
+      }
+    });
+
+    ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
+    ArgumentCaptor<Long> positionCaptor = ArgumentCaptor.forClass(Long.class);
+    verify(mocks.manager, times(1))
+      .logPositionAndCleanOldLogs(pathCaptor.capture(), anyString(), positionCaptor.capture(),
+        anyBoolean(), anyBoolean());
+    assertTrue(endpoint.lastEntries.size() == 5);
+    assertThat(pathCaptor.getValue(), is(log2));
+    assertThat(positionCaptor.getValue(), is(pos));
+  }
+
+  @Test
+  public void testSetLogPositionAndRemoveOldWALsEvenIfEmptyWALsRolled() throws Exception {
+    Mocks mocks = new Mocks();
+
+    final ReplicationEndpointForTest endpoint = new ReplicationEndpointForTest();
+    final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint);
+    WALFactory wals = new WALFactory(TEST_UTIL.getConfiguration(), null, "test");
+
+    final Path log1 = new Path(logDir, "log.1");
+    final Path log2 = new Path(logDir, "log.2");
+
+    WALFactory.createWALWriter(FS, log1, TEST_UTIL.getConfiguration()).close();
+    WALFactory.createWALWriter(FS, log2, TEST_UTIL.getConfiguration()).close();
+    final long startPos = getPosition(wals, log2, 0);
+
+    source.run();
+    source.enqueueLog(log1);
+    source.enqueueLog(log2);
+
+    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+      @Override public boolean evaluate() throws Exception {
+        return log2.equals(source.getLastLoggedPath())
+            && source.getLastLoggedPosition() >= startPos;
+      }
+    });
+
+    ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
+    ArgumentCaptor<Long> positionCaptor = ArgumentCaptor.forClass(Long.class);
+
+    verify(mocks.manager, times(1))
+      .logPositionAndCleanOldLogs(pathCaptor.capture(), anyString(), positionCaptor.capture(),
+        anyBoolean(), anyBoolean());
+    assertThat(pathCaptor.getValue(), is(log2));
+    assertThat(positionCaptor.getValue(), is(startPos));
+  }
+
+  @Test
+  public void testSetLogPositionAndRemoveOldWALsEvenIfNoCfsReplicated() throws Exception {
+    Mocks mocks = new Mocks();
+    // set table cfs to filter all cells out
+    final TableName replicatedTable = TableName.valueOf("replicated_table");
+    final Map<TableName, List<String>> cfs =
+        Collections.singletonMap(replicatedTable, Collections.<String>emptyList());
+    when(mocks.peer.getTableCFs()).thenReturn(cfs);
+
+    WALFactory wals = new WALFactory(TEST_UTIL.getConfiguration(), null, "test");
+    final Path log1 = new Path(logDir, "log.1");
+    final Path log2 = new Path(logDir, "log.2");
+
+    WALProvider.Writer writer1 =
+        WALFactory.createWALWriter(FS, log1, TEST_UTIL.getConfiguration());
+    WALProvider.Writer writer2 =
+        WALFactory.createWALWriter(FS, log2, TEST_UTIL.getConfiguration());
+
+    appendEntries(writer1, 3);
+    appendEntries(writer2, 2);
+    final long pos = getPosition(wals, log2, 2);
+
+    final ReplicationEndpointForTest endpoint = new ReplicationEndpointForTest();
+    final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint);
+    source.enqueueLog(log1);
+    source.enqueueLog(log2);
+    source.run();
+    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+      @Override public boolean evaluate() throws Exception {
+        // wait until reader read all cells
+        return log2.equals(source.getLastLoggedPath()) && source.getLastLoggedPosition() >= pos;
+      }
+    });
+
+    ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
+    ArgumentCaptor<Long> positionCaptor = ArgumentCaptor.forClass(Long.class);
+
+    // all old wals should be removed by updating wal position, even if all cells are filtered out.
+    verify(mocks.manager, times(1))
+      .logPositionAndCleanOldLogs(pathCaptor.capture(), anyString(), positionCaptor.capture(),
+        anyBoolean(), anyBoolean());
+    assertThat(pathCaptor.getValue(), is(log2));
+    assertThat(positionCaptor.getValue(), is(pos));
+  }
+
   /**
    * Tests that recovered queues are preserved on a regionserver shutdown.
    * See HBASE-18192
-   * @throws Exception
    */
   @Test
   public void testServerShutdownRecoveredQueue() throws Exception {
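The new tests lean on Mockito's ArgumentCaptor so they can assert not merely that a position update happened but which WAL and byte offset were recorded. The shared pattern, reduced to its core:

    ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
    ArgumentCaptor<Long> positionCaptor = ArgumentCaptor.forClass(Long.class);
    // exactly one update is expected; capture its arguments for inspection
    verify(mocks.manager, times(1)).logPositionAndCleanOldLogs(
        pathCaptor.capture(), anyString(), positionCaptor.capture(),
        anyBoolean(), anyBoolean());
    assertThat(pathCaptor.getValue(), is(log2));    // logged against the rolled-to WAL
    assertThat(positionCaptor.getValue(), is(pos)); // at the expected offset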
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index 9f077da57a97..7ad7260c3c38 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -23,17 +23,15 @@
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -47,17 +45,22 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
+import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
+import org.apache.hadoop.hbase.replication.TableCfWALEntryFilter;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReaderThread.WALEntryBatch;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -77,7 +80,6 @@
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
 import org.junit.runner.RunWith;
-import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 import org.mockito.runners.MockitoJUnitRunner;
 
@@ -358,8 +360,9 @@ public void testReplicationSourceWALReaderThread() throws Exception {
     // start up a batcher
     ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
     when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
-    ReplicationSourceWALReaderThread batcher = new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(),walQueue, 0,
-      fs, conf, getDummyFilter(), new MetricsSource("1"));
+    ReplicationSourceWALReaderThread batcher =
+        new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(), walQueue, 0,
+            fs, conf, getDummyFilter(), new MetricsSource("1"));
     Path walPath = walQueue.peek();
     batcher.start();
     WALEntryBatch entryBatch = batcher.take();
@@ -378,37 +381,36 @@ public void testReplicationSourceWALReaderThread() throws Exception {
   }
 
   @Test
-  public void testReplicationSourceUpdatesLogPositionOnFilteredEntries() throws Exception {
+  public void testReplicationSourceWALReaderThreadRecoveredQueue() throws Exception {
     appendEntriesToLog(3);
-    // get ending position
+    log.rollWriter();
+    appendEntriesToLog(2);
+
     long position;
-    try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
+    try (WALEntryStream entryStream = new WALEntryStream(new PriorityBlockingQueue<>(walQueue),
+        fs, conf, new MetricsSource("1"))) {
+      entryStream.next();
+      entryStream.next();
       entryStream.next();
       entryStream.next();
       entryStream.next();
       position = entryStream.getPosition();
     }
-    // start up a readerThread with a WALEntryFilter that always filter the entries
-    ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
+
+    ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class);
     when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
-    ReplicationSourceWALReaderThread readerThread = new ReplicationSourceWALReaderThread(
-        mockSourceManager, getQueueInfo(), walQueue, 0, fs, conf, new WALEntryFilter() {
-          @Override
-          public Entry filter(Entry entry) {
-            return null;
-          }
-        }, new MetricsSource("1"));
-    readerThread.start();
-    Thread.sleep(100);
-    ArgumentCaptor<Long> positionCaptor = ArgumentCaptor.forClass(Long.class);
-    verify(mockSourceManager, times(3))
-        .logPositionAndCleanOldLogs(any(Path.class),
-          anyString(),
-          positionCaptor.capture(),
-          anyBoolean(),
-          anyBoolean());
-    assertEquals(position, positionCaptor.getValue().longValue());
+    ReplicationSourceWALReaderThread reader =
+        new ReplicationSourceWALReaderThread(mockSourceManager, getRecoveredQueueInfo(),
+            walQueue, 0, fs, conf, getDummyFilter(), new MetricsSource("1"));
+    Path walPath = walQueue.toArray(new Path[2])[1];
+    reader.start();
+    WALEntryBatch entryBatch = reader.take();
+
+    assertNotNull(entryBatch);
+    assertEquals(5, entryBatch.getWalEntries().size());
+    assertEquals(position, entryBatch.getLastWalPosition());
+    assertEquals(walPath, entryBatch.getLastWalPath());
+    assertFalse(entryBatch.hasMoreEntries());
   }
 
 @@ -436,6 +438,96 @@ public void testWALKeySerialization() throws Exception {
     }
   }
 
+  @Test
+  public void testReplicationSourceWALReaderThreadWithFilter() throws Exception {
+    final byte[] notReplicatedCf = Bytes.toBytes("notReplicated");
+    final Map<TableName, List<String>> tableCfs = new HashMap<>();
+    tableCfs.put(tableName, Collections.singletonList(Bytes.toString(family)));
+    ReplicationPeer peer = mock(ReplicationPeer.class);
+    when(peer.getTableCFs()).thenReturn(tableCfs);
+    WALEntryFilter filter = new ChainWALEntryFilter(new TableCfWALEntryFilter(peer));
+
+    // add filterable entries
+    appendToLogPlus(3, notReplicatedCf);
+    appendToLogPlus(3, notReplicatedCf);
+    appendToLogPlus(3, notReplicatedCf);
+
+    // add non filterable entries
+    appendEntriesToLog(2);
+
+    ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class);
+    when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+    final ReplicationSourceWALReaderThread reader =
+        new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(), walQueue,
+            0, fs, conf, filter, new MetricsSource("1"));
+    reader.start();
+
+    WALEntryBatch entryBatch = reader.take();
+
+    assertNotNull(entryBatch);
+    assertFalse(entryBatch.isEmpty());
+    List<Entry> walEntries = entryBatch.getWalEntries();
+    assertEquals(2, walEntries.size());
+    for (Entry entry : walEntries) {
+      ArrayList<Cell> cells = entry.getEdit().getCells();
+      assertTrue(cells.size() == 1);
+      assertTrue(CellUtil.matchingFamily(cells.get(0), family));
+    }
+  }
+
+  @Test
+  public void testReplicationSourceWALReaderThreadWithFilterWhenLogRolled() throws Exception {
+    final byte[] notReplicatedCf = Bytes.toBytes("notReplicated");
+    final Map<TableName, List<String>> tableCfs = new HashMap<>();
+    tableCfs.put(tableName, Collections.singletonList(Bytes.toString(family)));
+    ReplicationPeer peer = mock(ReplicationPeer.class);
+    when(peer.getTableCFs()).thenReturn(tableCfs);
+    WALEntryFilter filter = new ChainWALEntryFilter(new TableCfWALEntryFilter(peer));
+
+    appendToLogPlus(3, notReplicatedCf);
+
+    Path firstWAL = walQueue.peek();
+    final long eof = getPosition(firstWAL);
+
+    ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class);
+    when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+    final ReplicationSourceWALReaderThread reader =
+        new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(), walQueue,
+            0, fs, conf, filter, new MetricsSource("1"));
+    reader.start();
+
+    // reader won't put any batch, even if EOF reached.
+    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+      @Override public boolean evaluate() {
+        return reader.getLastReadPosition() >= eof;
+      }
+    });
+    assertNull(reader.poll(0));
+
+    log.rollWriter();
+
+    // should get empty batch with current wal position, after wal rolled
+    WALEntryBatch entryBatch = reader.take();
+
+    Path lastWAL = walQueue.peek();
+    long positionToBeLogged = getPosition(lastWAL);
+
+    assertNotNull(entryBatch);
+    assertTrue(entryBatch.isEmpty());
+    assertEquals(1, walQueue.size());
+    assertNotEquals(firstWAL, entryBatch.getLastWalPath());
+    assertEquals(lastWAL, entryBatch.getLastWalPath());
+    assertEquals(positionToBeLogged, entryBatch.getLastWalPosition());
+  }
+
+  private long getPosition(Path walPath) throws IOException {
+    WALEntryStream entryStream =
+        new WALEntryStream(new PriorityBlockingQueue<>(Collections.singletonList(walPath)),
+            fs, conf, new MetricsSource("1"));
+    entryStream.hasNext();
+    return entryStream.getPosition();
+  }
+
   private String getRow(WAL.Entry entry) {
     Cell cell = entry.getEdit().getCells().get(0);
     return Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
@@ -459,17 +551,25 @@ private void appendToLog() throws IOException {
   }
 
   private void appendToLogPlus(int count) throws IOException {
+    appendToLogPlus(count, family, qualifier);
+  }
+
+  private void appendToLogPlus(int count, byte[] cf) throws IOException {
+    appendToLogPlus(count, cf, qualifier);
+  }
+
+  private void appendToLogPlus(int count, byte[] cf, byte[] cq) throws IOException {
     final long txid = log.append(htd, info,
         new WALKey(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), mvcc),
-        getWALEdits(count), true);
+        getWALEdits(count, cf, cq), true);
     log.sync(txid);
   }
 
-  private WALEdit getWALEdits(int count) {
+  private WALEdit getWALEdits(int count, byte[] cf, byte[] cq) {
     WALEdit edit = new WALEdit();
     for (int i = 0; i < count; i++) {
-      edit.add(new KeyValue(Bytes.toBytes(System.currentTimeMillis()), family, qualifier,
-        System.currentTimeMillis(), qualifier));
+      edit.add(new KeyValue(Bytes.toBytes(System.currentTimeMillis()), cf, cq,
+        System.currentTimeMillis(), cq));
     }
     return edit;
   }
@@ -491,8 +591,16 @@ public Entry filter(Entry entry) {
     };
   }
 
+  private ReplicationQueueInfo getRecoveredQueueInfo() {
+    return getQueueInfo("1-1");
+  }
+
   private ReplicationQueueInfo getQueueInfo() {
-    return new ReplicationQueueInfo("1");
+    return getQueueInfo("1");
+  }
+
+  private ReplicationQueueInfo getQueueInfo(String znode) {
+    return new ReplicationQueueInfo(znode);
   }
 
   class PathWatcher extends WALActionsListener.Base {
@@ -505,5 +613,4 @@ public void preLogRoll(Path oldPath, Path newPath) throws IOException {
       currentPath = newPath;
     }
   }
-
 }