diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index b5697796ae1e..52f3f7133add 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -38,6 +38,7 @@ import java.util.Map; import java.util.OptionalLong; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -192,6 +193,9 @@ public abstract class AbstractFSWAL implements WAL { /** Listeners that are called on WAL events. */ protected final List listeners = new CopyOnWriteArrayList<>(); + /** Tracks the logs in the process of being closed. */ + protected final Map inflightWALClosures = new ConcurrentHashMap<>(); + /** * Class that does accounting of sequenceids in WAL subsystem. Holds oldest outstanding sequence * id as yet not flushed as well as the most recent edit sequence id appended to the WAL. Has @@ -1027,6 +1031,13 @@ public void close() throws IOException { LOG.info("Closed WAL: " + toString()); } + /** + * @return number of WALs currently in the process of closing. + */ + public int getInflightWALCloseCount() { + return inflightWALClosures.size(); + } + /** * updates the sequence number of a specific store. depending on the flag: replaces current seq * number if the given seq id is bigger, or even if it is lower than existing one @@ -1190,9 +1201,18 @@ public OptionalLong getLogFileSizeIfBeingWritten(Path path) { try { Path currentPath = getOldPath(); if (path.equals(currentPath)) { + // Currently active path. W writer = this.writer; return writer != null ? OptionalLong.of(writer.getSyncedLength()) : OptionalLong.empty(); } else { + W temp = inflightWALClosures.get(path.getName()); + if (temp != null) { + // In the process of being closed, trailer bytes may or may not be flushed. + // Ensuring that we read all the bytes in a file is critical for correctness of tailing + // use cases like replication, see HBASE-25924/HBASE-25932. + return OptionalLong.of(temp.getSyncedLength()); + } + // Log rolled successfully. return OptionalLong.empty(); } } finally { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java index c11496b8f3b5..4b0f0410097d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java @@ -31,6 +31,7 @@ import java.util.Deque; import java.util.Iterator; import java.util.List; +import java.util.OptionalLong; import java.util.Queue; import java.util.SortedSet; import java.util.TreeSet; @@ -712,14 +713,17 @@ private void waitForSafePoint() { } } - protected final long closeWriter(AsyncWriter writer) { + protected final long closeWriter(AsyncWriter writer, Path path) { if (writer != null) { + inflightWALClosures.put(path.getName(), writer); long fileLength = writer.getLength(); closeExecutor.execute(() -> { try { writer.close(); } catch (IOException e) { LOG.warn("close old writer failed", e); + } finally { + inflightWALClosures.remove(path.getName()); } }); return fileLength; @@ -733,7 +737,7 @@ protected void doReplaceWriter(Path oldPath, Path newPath, AsyncWriter nextWrite throws IOException { Preconditions.checkNotNull(nextWriter); waitForSafePoint(); - long oldFileLen = closeWriter(this.writer); + long oldFileLen = closeWriter(this.writer, oldPath); logRollAndSetupWalProps(oldPath, newPath, oldFileLen); this.writer = nextWriter; if (nextWriter instanceof AsyncProtobufLogWriter) { @@ -759,7 +763,7 @@ protected void doReplaceWriter(Path oldPath, Path newPath, AsyncWriter nextWrite @Override protected void doShutdown() throws IOException { waitForSafePoint(); - closeWriter(this.writer); + closeWriter(this.writer, getOldPath()); this.writer = null; closeExecutor.shutdown(); try { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index cad08216c524..3efadc1d479a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -66,7 +66,7 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; /** - * The default implementation of FSWAL. + * The original implementation of FSWAL. */ @InterfaceAudience.Private public class FSHLog extends AbstractFSWAL { @@ -380,6 +380,7 @@ protected void doReplaceWriter(Path oldPath, Path newPath, Writer nextWriter) th // In case of having unflushed entries or we already reached the // closeErrorsTolerated count, call the closeWriter inline rather than in async // way so that in case of an IOE we will throw it back and abort RS. + inflightWALClosures.put(oldPath.getName(), writer); if (isUnflushedEntries() || closeErrorCount.get() >= this.closeErrorsTolerated) { closeWriter(this.writer, oldPath, true); } else { @@ -448,6 +449,8 @@ private void closeWriter(Writer writer, Path path, boolean syncCloseCall) throws } LOG.warn("Riding over failed WAL close of " + path + "; THIS FILE WAS NOT CLOSED BUT ALL EDITS SYNCED SO SHOULD BE OK", ioe); + } finally { + inflightWALClosures.remove(path.getName()); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java index 5e63e5e3661a..53cd0845a9cf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java @@ -223,10 +223,9 @@ private boolean checkAllBytesParsed() throws IOException { if (trailerSize < 0) { if (currentPositionOfReader < stat.getLen()) { final long skippedBytes = stat.getLen() - currentPositionOfReader; - LOG.debug( - "Reached the end of WAL {}. It was not closed cleanly," + - " so we did not parse {} bytes of data. This is normally ok.", - currentPath, skippedBytes); + // See the commits in HBASE-25924/HBASE-25932 for context. + LOG.warn("Reached the end of WAL {}. It was not closed cleanly," + + " so we did not parse {} bytes of data.", currentPath, skippedBytes); metrics.incrUncleanlyClosedWALs(); metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestFSHLogWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestFSHLogWALEntryStream.java new file mode 100644 index 000000000000..32d6ec4b806f --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestFSHLogWALEntryStream.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.apache.hadoop.hbase.wal.FSHLogProvider; +import org.apache.hadoop.hbase.wal.WALFactory; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.experimental.categories.Category; + +/** + * TestWALEntryStream with {@link org.apache.hadoop.hbase.wal.FSHLogProvider} as the WAL provider. + */ +@Category({ ReplicationTests.class, LargeTests.class }) +public class TestFSHLogWALEntryStream extends TestWALEntryStream { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestFSHLogWALEntryStream.class); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL = new HBaseTestingUtility(); + CONF = TEST_UTIL.getConfiguration(); + CONF.setClass(WALFactory.WAL_PROVIDER, FSHLogProvider.class, AbstractFSWALProvider.class); + CONF.setLong("replication.source.sleepforretries", 10); + TEST_UTIL.startMiniDFSCluster(3); + cluster = TEST_UTIL.getDFSCluster(); + fs = cluster.getFileSystem(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java index 5507972ab4c2..d4bdaaaec0c4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java @@ -53,11 +53,14 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; +import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; import org.apache.hadoop.hbase.replication.WALEntryFilter; @@ -91,10 +94,11 @@ public class TestWALEntryStream { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestWALEntryStream.class); - private static HBaseTestingUtility TEST_UTIL; - private static Configuration CONF; - private static FileSystem fs; - private static MiniDFSCluster cluster; + private static final long TEST_TIMEOUT_MS = 5000; + protected static HBaseTestingUtility TEST_UTIL; + protected static Configuration CONF; + protected static FileSystem fs; + protected static MiniDFSCluster cluster; private static final TableName tableName = TableName.valueOf("tablename"); private static final byte[] family = Bytes.toBytes("column"); private static final byte[] qualifier = Bytes.toBytes("qualifier"); @@ -103,6 +107,27 @@ public class TestWALEntryStream { private static final NavigableMap scopes = getScopes(); private final String fakeWalGroupId = "fake-wal-group-id"; + /** + * Test helper that waits until a non-null entry is available in the stream next or times out. + */ + private static class WALEntryStreamWithRetries extends WALEntryStream { + // Class member to be able to set a non-final from within a lambda. + private Entry result; + + public WALEntryStreamWithRetries(ReplicationSourceLogQueue logQueue, Configuration conf, + long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName, + MetricsSource metrics, String walGroupId) throws IOException { + super(logQueue, conf, startPosition, walFileLengthProvider, serverName, metrics, walGroupId); + } + + @Override + public Entry next() { + Waiter.waitFor(CONF, TEST_TIMEOUT_MS, () -> ( + result = WALEntryStreamWithRetries.super.next()) != null); + return result; + } + } + private static NavigableMap getScopes() { NavigableMap scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); scopes.put(family, 1); @@ -148,7 +173,9 @@ public void setUp() throws Exception { @After public void tearDown() throws Exception { - log.close(); + if (log != null) { + log.close(); + } } // Try out different combinations of row count and KeyValue count @@ -215,7 +242,7 @@ public void testAppendsWithRolls() throws Exception { appendToLogAndSync(); - try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, oldPos, + try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, CONF, oldPos, log, null, new MetricsSource("1"), fakeWalGroupId)) { // Read the newly added entry, make sure we made progress WAL.Entry entry = entryStream.next(); @@ -229,7 +256,7 @@ log, null, new MetricsSource("1"), fakeWalGroupId)) { log.rollWriter(); appendToLogAndSync(); - try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, oldPos, + try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, CONF, oldPos, log, null, new MetricsSource("1"), fakeWalGroupId)) { WAL.Entry entry = entryStream.next(); assertNotEquals(oldPos, entryStream.getPosition()); @@ -255,7 +282,7 @@ public void testLogrollWhileStreaming() throws Exception { appendToLog("1"); appendToLog("2");// 2 try (WALEntryStream entryStream = - new WALEntryStream(logQueue, CONF, 0, log, null, + new WALEntryStreamWithRetries(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) { assertEquals("1", getRow(entryStream.next())); @@ -530,7 +557,8 @@ public void testReplicationSourceWALReaderWrongPosition() throws Exception { @Override public boolean evaluate() throws Exception { - return fs.getFileStatus(walPath).getLen() > 0; + return fs.getFileStatus(walPath).getLen() > 0 && + ((AbstractFSWAL) log).getInflightWALCloseCount() == 0; } @Override @@ -539,12 +567,13 @@ public String explainFailure() throws Exception { } }); - long walLength = fs.getFileStatus(walPath).getLen(); ReplicationSourceWALReader reader = createReader(false, CONF); WALEntryBatch entryBatch = reader.take(); assertEquals(walPath, entryBatch.getLastWalPath()); + + long walLength = fs.getFileStatus(walPath).getLen(); assertTrue("Position " + entryBatch.getLastWalPosition() + " is out of range, file length is " + walLength, entryBatch.getLastWalPosition() <= walLength); assertEquals(1, entryBatch.getNbEntries()); @@ -869,7 +898,7 @@ public void testSizeOfLogQueue() throws Exception { */ @Test public void testCleanClosedWALs() throws Exception { - try (WALEntryStream entryStream = new WALEntryStream( + try (WALEntryStream entryStream = new WALEntryStreamWithRetries( logQueue, CONF, 0, log, null, logQueue.getMetrics(), fakeWalGroupId)) { assertEquals(0, logQueue.getMetrics().getUncleanlyClosedWALs()); appendToLogAndSync();