Skip to content

Commit

Permalink
HBASE-25932: Ensure replication reads the trailer bytes from WAL. (#3332
Browse files Browse the repository at this point in the history
)

This bug was exposed by the test from HBASE-25924. Since this wal
implementations close the wal asynchronously, replication can potentially
miss the trailer bytes. (see jira comment for detailed analysis).

While this is not a correctness problem (since trailer does not have any entry data),
it erroneously bumps a metric that is used to track skipped bytes in WAL resulting
in false alarms which is something we should avoid.

Reviewed-by: Rushabh Shah <[email protected]>
Signed-off-by: Viraj Jasani <[email protected]>
Signed-off-by Anoop Sam John <[email protected]>
  • Loading branch information
bharathv authored Jun 1, 2021
1 parent 06c6e06 commit b04c3c7
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
Expand Down Expand Up @@ -192,6 +193,9 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
/** Listeners that are called on WAL events. */
protected final List<WALActionsListener> listeners = new CopyOnWriteArrayList<>();

/** Tracks the logs in the process of being closed. */
protected final Map<String, W> inflightWALClosures = new ConcurrentHashMap<>();

/**
* Class that does accounting of sequenceids in WAL subsystem. Holds oldest outstanding sequence
* id as yet not flushed as well as the most recent edit sequence id appended to the WAL. Has
Expand Down Expand Up @@ -1027,6 +1031,13 @@ public void close() throws IOException {
LOG.info("Closed WAL: " + toString());
}

/**
* @return number of WALs currently in the process of closing.
*/
public int getInflightWALCloseCount() {
return inflightWALClosures.size();
}

/**
* updates the sequence number of a specific store. depending on the flag: replaces current seq
* number if the given seq id is bigger, or even if it is lower than existing one
Expand Down Expand Up @@ -1190,9 +1201,18 @@ public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
try {
Path currentPath = getOldPath();
if (path.equals(currentPath)) {
// Currently active path.
W writer = this.writer;
return writer != null ? OptionalLong.of(writer.getSyncedLength()) : OptionalLong.empty();
} else {
W temp = inflightWALClosures.get(path.getName());
if (temp != null) {
// In the process of being closed, trailer bytes may or may not be flushed.
// Ensuring that we read all the bytes in a file is critical for correctness of tailing
// use cases like replication, see HBASE-25924/HBASE-25932.
return OptionalLong.of(temp.getSyncedLength());
}
// Log rolled successfully.
return OptionalLong.empty();
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.OptionalLong;
import java.util.Queue;
import java.util.SortedSet;
import java.util.TreeSet;
Expand Down Expand Up @@ -712,14 +713,17 @@ private void waitForSafePoint() {
}
}

protected final long closeWriter(AsyncWriter writer) {
protected final long closeWriter(AsyncWriter writer, Path path) {
if (writer != null) {
inflightWALClosures.put(path.getName(), writer);
long fileLength = writer.getLength();
closeExecutor.execute(() -> {
try {
writer.close();
} catch (IOException e) {
LOG.warn("close old writer failed", e);
} finally {
inflightWALClosures.remove(path.getName());
}
});
return fileLength;
Expand All @@ -733,7 +737,7 @@ protected void doReplaceWriter(Path oldPath, Path newPath, AsyncWriter nextWrite
throws IOException {
Preconditions.checkNotNull(nextWriter);
waitForSafePoint();
long oldFileLen = closeWriter(this.writer);
long oldFileLen = closeWriter(this.writer, oldPath);
logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
this.writer = nextWriter;
if (nextWriter instanceof AsyncProtobufLogWriter) {
Expand All @@ -759,7 +763,7 @@ protected void doReplaceWriter(Path oldPath, Path newPath, AsyncWriter nextWrite
@Override
protected void doShutdown() throws IOException {
waitForSafePoint();
closeWriter(this.writer);
closeWriter(this.writer, getOldPath());
this.writer = null;
closeExecutor.shutdown();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
* The default implementation of FSWAL.
* The original implementation of FSWAL.
*/
@InterfaceAudience.Private
public class FSHLog extends AbstractFSWAL<Writer> {
Expand Down Expand Up @@ -380,6 +380,7 @@ protected void doReplaceWriter(Path oldPath, Path newPath, Writer nextWriter) th
// In case of having unflushed entries or we already reached the
// closeErrorsTolerated count, call the closeWriter inline rather than in async
// way so that in case of an IOE we will throw it back and abort RS.
inflightWALClosures.put(oldPath.getName(), writer);
if (isUnflushedEntries() || closeErrorCount.get() >= this.closeErrorsTolerated) {
closeWriter(this.writer, oldPath, true);
} else {
Expand Down Expand Up @@ -448,6 +449,8 @@ private void closeWriter(Writer writer, Path path, boolean syncCloseCall) throws
}
LOG.warn("Riding over failed WAL close of " + path
+ "; THIS FILE WAS NOT CLOSED BUT ALL EDITS SYNCED SO SHOULD BE OK", ioe);
} finally {
inflightWALClosures.remove(path.getName());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,10 +223,9 @@ private boolean checkAllBytesParsed() throws IOException {
if (trailerSize < 0) {
if (currentPositionOfReader < stat.getLen()) {
final long skippedBytes = stat.getLen() - currentPositionOfReader;
LOG.debug(
"Reached the end of WAL {}. It was not closed cleanly," +
" so we did not parse {} bytes of data. This is normally ok.",
currentPath, skippedBytes);
// See the commits in HBASE-25924/HBASE-25932 for context.
LOG.warn("Reached the end of WAL {}. It was not closed cleanly," +
" so we did not parse {} bytes of data.", currentPath, skippedBytes);
metrics.incrUncleanlyClosedWALs();
metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;

import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.FSHLogProvider;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;

/**
* TestWALEntryStream with {@link org.apache.hadoop.hbase.wal.FSHLogProvider} as the WAL provider.
*/
@Category({ ReplicationTests.class, LargeTests.class })
public class TestFSHLogWALEntryStream extends TestWALEntryStream {

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestFSHLogWALEntryStream.class);

@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL = new HBaseTestingUtility();
CONF = TEST_UTIL.getConfiguration();
CONF.setClass(WALFactory.WAL_PROVIDER, FSHLogProvider.class, AbstractFSWALProvider.class);
CONF.setLong("replication.source.sleepforretries", 10);
TEST_UTIL.startMiniDFSCluster(3);
cluster = TEST_UTIL.getDFSCluster();
fs = cluster.getFileSystem();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,14 @@
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
Expand Down Expand Up @@ -91,10 +94,11 @@ public class TestWALEntryStream {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestWALEntryStream.class);

private static HBaseTestingUtility TEST_UTIL;
private static Configuration CONF;
private static FileSystem fs;
private static MiniDFSCluster cluster;
private static final long TEST_TIMEOUT_MS = 5000;
protected static HBaseTestingUtility TEST_UTIL;
protected static Configuration CONF;
protected static FileSystem fs;
protected static MiniDFSCluster cluster;
private static final TableName tableName = TableName.valueOf("tablename");
private static final byte[] family = Bytes.toBytes("column");
private static final byte[] qualifier = Bytes.toBytes("qualifier");
Expand All @@ -103,6 +107,27 @@ public class TestWALEntryStream {
private static final NavigableMap<byte[], Integer> scopes = getScopes();
private final String fakeWalGroupId = "fake-wal-group-id";

/**
* Test helper that waits until a non-null entry is available in the stream next or times out.
*/
private static class WALEntryStreamWithRetries extends WALEntryStream {
// Class member to be able to set a non-final from within a lambda.
private Entry result;

public WALEntryStreamWithRetries(ReplicationSourceLogQueue logQueue, Configuration conf,
long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName,
MetricsSource metrics, String walGroupId) throws IOException {
super(logQueue, conf, startPosition, walFileLengthProvider, serverName, metrics, walGroupId);
}

@Override
public Entry next() {
Waiter.waitFor(CONF, TEST_TIMEOUT_MS, () -> (
result = WALEntryStreamWithRetries.super.next()) != null);
return result;
}
}

private static NavigableMap<byte[], Integer> getScopes() {
NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
scopes.put(family, 1);
Expand Down Expand Up @@ -148,7 +173,9 @@ public void setUp() throws Exception {

@After
public void tearDown() throws Exception {
log.close();
if (log != null) {
log.close();
}
}

// Try out different combinations of row count and KeyValue count
Expand Down Expand Up @@ -215,7 +242,7 @@ public void testAppendsWithRolls() throws Exception {

appendToLogAndSync();

try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, oldPos,
try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, CONF, oldPos,
log, null, new MetricsSource("1"), fakeWalGroupId)) {
// Read the newly added entry, make sure we made progress
WAL.Entry entry = entryStream.next();
Expand All @@ -229,7 +256,7 @@ log, null, new MetricsSource("1"), fakeWalGroupId)) {
log.rollWriter();
appendToLogAndSync();

try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, oldPos,
try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, CONF, oldPos,
log, null, new MetricsSource("1"), fakeWalGroupId)) {
WAL.Entry entry = entryStream.next();
assertNotEquals(oldPos, entryStream.getPosition());
Expand All @@ -255,7 +282,7 @@ public void testLogrollWhileStreaming() throws Exception {
appendToLog("1");
appendToLog("2");// 2
try (WALEntryStream entryStream =
new WALEntryStream(logQueue, CONF, 0, log, null,
new WALEntryStreamWithRetries(logQueue, CONF, 0, log, null,
new MetricsSource("1"), fakeWalGroupId)) {
assertEquals("1", getRow(entryStream.next()));

Expand Down Expand Up @@ -530,7 +557,8 @@ public void testReplicationSourceWALReaderWrongPosition() throws Exception {

@Override
public boolean evaluate() throws Exception {
return fs.getFileStatus(walPath).getLen() > 0;
return fs.getFileStatus(walPath).getLen() > 0 &&
((AbstractFSWAL) log).getInflightWALCloseCount() == 0;
}

@Override
Expand All @@ -539,12 +567,13 @@ public String explainFailure() throws Exception {
}

});
long walLength = fs.getFileStatus(walPath).getLen();

ReplicationSourceWALReader reader = createReader(false, CONF);

WALEntryBatch entryBatch = reader.take();
assertEquals(walPath, entryBatch.getLastWalPath());

long walLength = fs.getFileStatus(walPath).getLen();
assertTrue("Position " + entryBatch.getLastWalPosition() + " is out of range, file length is " +
walLength, entryBatch.getLastWalPosition() <= walLength);
assertEquals(1, entryBatch.getNbEntries());
Expand Down Expand Up @@ -869,7 +898,7 @@ public void testSizeOfLogQueue() throws Exception {
*/
@Test
public void testCleanClosedWALs() throws Exception {
try (WALEntryStream entryStream = new WALEntryStream(
try (WALEntryStream entryStream = new WALEntryStreamWithRetries(
logQueue, CONF, 0, log, null, logQueue.getMetrics(), fakeWalGroupId)) {
assertEquals(0, logQueue.getMetrics().getUncleanlyClosedWALs());
appendToLogAndSync();
Expand Down

0 comments on commit b04c3c7

Please sign in to comment.