Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-25932: Ensure replication reads the trailer bytes from WAL. #3332

Merged
merged 2 commits into from
Jun 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
Expand Down Expand Up @@ -192,6 +193,9 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
/** Listeners that are called on WAL events. */
protected final List<WALActionsListener> listeners = new CopyOnWriteArrayList<>();

/** Tracks the logs in the process of being closed. */
protected final Map<String, W> inflightWALClosures = new ConcurrentHashMap<>();

/**
* Class that does accounting of sequenceids in WAL subsystem. Holds oldest outstanding sequence
* id as yet not flushed as well as the most recent edit sequence id appended to the WAL. Has
Expand Down Expand Up @@ -1027,6 +1031,13 @@ public void close() throws IOException {
LOG.info("Closed WAL: " + toString());
}

/**
* Reports how many WAL writers are currently registered in {@code inflightWALClosures}, the map
* that tracks the logs in the process of being closed.
* @return number of WALs currently in the process of closing.
*/
public int getInflightWALCloseCount() {
return inflightWALClosures.size();
}

/**
* updates the sequence number of a specific store. depending on the flag: replaces current seq
* number if the given seq id is bigger, or even if it is lower than existing one
Expand Down Expand Up @@ -1190,9 +1201,18 @@ public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
try {
Path currentPath = getOldPath();
if (path.equals(currentPath)) {
// Currently active path.
W writer = this.writer;
return writer != null ? OptionalLong.of(writer.getSyncedLength()) : OptionalLong.empty();
} else {
W temp = inflightWALClosures.get(path.getName());
if (temp != null) {
anoopsjohn marked this conversation as resolved.
Show resolved Hide resolved
// In the process of being closed, trailer bytes may or may not be flushed.
// Ensuring that we read all the bytes in a file is critical for correctness of tailing
// use cases like replication, see HBASE-25924/HBASE-25932.
return OptionalLong.of(temp.getSyncedLength());
}
// Log rolled successfully.
return OptionalLong.empty();
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.OptionalLong;
import java.util.Queue;
import java.util.SortedSet;
import java.util.TreeSet;
Expand Down Expand Up @@ -712,14 +713,17 @@ private void waitForSafePoint() {
}
}

protected final long closeWriter(AsyncWriter writer) {
protected final long closeWriter(AsyncWriter writer, Path path) {
if (writer != null) {
inflightWALClosures.put(path.getName(), writer);
long fileLength = writer.getLength();
closeExecutor.execute(() -> {
try {
writer.close();
} catch (IOException e) {
LOG.warn("close old writer failed", e);
} finally {
inflightWALClosures.remove(path.getName());
}
});
return fileLength;
Expand All @@ -733,7 +737,7 @@ protected void doReplaceWriter(Path oldPath, Path newPath, AsyncWriter nextWrite
throws IOException {
Preconditions.checkNotNull(nextWriter);
waitForSafePoint();
long oldFileLen = closeWriter(this.writer);
long oldFileLen = closeWriter(this.writer, oldPath);
logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
this.writer = nextWriter;
if (nextWriter instanceof AsyncProtobufLogWriter) {
Expand All @@ -759,7 +763,7 @@ protected void doReplaceWriter(Path oldPath, Path newPath, AsyncWriter nextWrite
@Override
protected void doShutdown() throws IOException {
waitForSafePoint();
closeWriter(this.writer);
closeWriter(this.writer, getOldPath());
this.writer = null;
closeExecutor.shutdown();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
* The default implementation of FSWAL.
* The original implementation of FSWAL.
*/
@InterfaceAudience.Private
public class FSHLog extends AbstractFSWAL<Writer> {
Expand Down Expand Up @@ -380,6 +380,7 @@ protected void doReplaceWriter(Path oldPath, Path newPath, Writer nextWriter) th
// In case of having unflushed entries or we already reached the
// closeErrorsTolerated count, call the closeWriter inline rather than in async
// way so that in case of an IOE we will throw it back and abort RS.
inflightWALClosures.put(oldPath.getName(), writer);
if (isUnflushedEntries() || closeErrorCount.get() >= this.closeErrorsTolerated) {
closeWriter(this.writer, oldPath, true);
} else {
Expand Down Expand Up @@ -448,6 +449,8 @@ private void closeWriter(Writer writer, Path path, boolean syncCloseCall) throws
}
LOG.warn("Riding over failed WAL close of " + path
+ "; THIS FILE WAS NOT CLOSED BUT ALL EDITS SYNCED SO SHOULD BE OK", ioe);
} finally {
inflightWALClosures.remove(path.getName());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,10 +223,9 @@ private boolean checkAllBytesParsed() throws IOException {
if (trailerSize < 0) {
if (currentPositionOfReader < stat.getLen()) {
final long skippedBytes = stat.getLen() - currentPositionOfReader;
LOG.debug(
"Reached the end of WAL {}. It was not closed cleanly," +
" so we did not parse {} bytes of data. This is normally ok.",
currentPath, skippedBytes);
// See the commits in HBASE-25924/HBASE-25932 for context.
LOG.warn("Reached the end of WAL {}. It was not closed cleanly," +
" so we did not parse {} bytes of data.", currentPath, skippedBytes);
metrics.incrUncleanlyClosedWALs();
metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;

import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.FSHLogProvider;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;

/**
* TestWALEntryStream with {@link org.apache.hadoop.hbase.wal.FSHLogProvider} as the WAL provider.
* <p>
* This class declares no test methods of its own; it extends {@link TestWALEntryStream} and only
* supplies the class-level setup that selects the WAL provider.
*/
@Category({ ReplicationTests.class, LargeTests.class })
public class TestFSHLogWALEntryStream extends TestWALEntryStream {

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestFSHLogWALEntryStream.class);

/**
* Initializes the static fixtures ({@code TEST_UTIL}, {@code CONF}, {@code cluster}, {@code fs})
* with a configuration that uses {@link FSHLogProvider} as the WAL provider.
* @throws Exception if the testing utility or the mini DFS cluster cannot be set up
*/
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL = new HBaseTestingUtility();
CONF = TEST_UTIL.getConfiguration();
// Select the FSHLog-based provider for every WAL created from this configuration.
CONF.setClass(WALFactory.WAL_PROVIDER, FSHLogProvider.class, AbstractFSWALProvider.class);
// NOTE(review): presumably a short retry sleep to keep the tests fast -- confirm unit (ms?).
CONF.setLong("replication.source.sleepforretries", 10);
// NOTE(review): the argument 3 is presumably the number of datanodes -- confirm.
TEST_UTIL.startMiniDFSCluster(3);
cluster = TEST_UTIL.getDFSCluster();
fs = cluster.getFileSystem();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,14 @@
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
Expand Down Expand Up @@ -91,10 +94,11 @@ public class TestWALEntryStream {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestWALEntryStream.class);

private static HBaseTestingUtility TEST_UTIL;
private static Configuration CONF;
private static FileSystem fs;
private static MiniDFSCluster cluster;
private static final long TEST_TIMEOUT_MS = 5000;
protected static HBaseTestingUtility TEST_UTIL;
protected static Configuration CONF;
protected static FileSystem fs;
protected static MiniDFSCluster cluster;
private static final TableName tableName = TableName.valueOf("tablename");
private static final byte[] family = Bytes.toBytes("column");
private static final byte[] qualifier = Bytes.toBytes("qualifier");
Expand All @@ -103,6 +107,27 @@ public class TestWALEntryStream {
private static final NavigableMap<byte[], Integer> scopes = getScopes();
private final String fakeWalGroupId = "fake-wal-group-id";

/**
* Test helper that waits until a non-null entry is available in the stream next or times out.
*/
private static class WALEntryStreamWithRetries extends WALEntryStream {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you also please explain, as code comments, why we use this special subclass here for the tests? That will make it easier to read and digest later.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in #3344

// Class member to be able to set a non-final from within a lambda.
private Entry result;
virajjasani marked this conversation as resolved.
Show resolved Hide resolved

/**
* Creates the retrying stream. Every argument is passed through unchanged to the
* {@link WALEntryStream} constructor.
* @throws IOException if the superclass constructor throws it
*/
public WALEntryStreamWithRetries(ReplicationSourceLogQueue logQueue, Configuration conf,
long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName,
MetricsSource metrics, String walGroupId) throws IOException {
super(logQueue, conf, startPosition, walFileLengthProvider, serverName, metrics, walGroupId);
}

/**
* Waits, via {@code Waiter.waitFor} bounded by {@code TEST_TIMEOUT_MS}, until the superclass
* {@code next()} yields a non-null entry, then returns that entry.
* @return the value stored by the most recent call to the superclass {@code next()}
*/
@Override
public Entry next() {
// The predicate stores each attempt in the 'result' field (a lambda cannot assign to a local)
// and is satisfied once that attempt is non-null.
// NOTE(review): on timeout Waiter.waitFor presumably fails the test before the return below
// is reached -- confirm against the Waiter javadoc.
Waiter.waitFor(CONF, TEST_TIMEOUT_MS, () -> (
result = WALEntryStreamWithRetries.super.next()) != null);
return result;
}
}

private static NavigableMap<byte[], Integer> getScopes() {
NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
scopes.put(family, 1);
Expand Down Expand Up @@ -148,7 +173,9 @@ public void setUp() throws Exception {

@After
public void tearDown() throws Exception {
log.close();
if (log != null) {
log.close();
}
}

// Try out different combinations of row count and KeyValue count
Expand Down Expand Up @@ -215,7 +242,7 @@ public void testAppendsWithRolls() throws Exception {

appendToLogAndSync();

try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, oldPos,
try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, CONF, oldPos,
log, null, new MetricsSource("1"), fakeWalGroupId)) {
// Read the newly added entry, make sure we made progress
WAL.Entry entry = entryStream.next();
Expand All @@ -229,7 +256,7 @@ log, null, new MetricsSource("1"), fakeWalGroupId)) {
log.rollWriter();
appendToLogAndSync();

try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, oldPos,
try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, CONF, oldPos,
log, null, new MetricsSource("1"), fakeWalGroupId)) {
WAL.Entry entry = entryStream.next();
assertNotEquals(oldPos, entryStream.getPosition());
Expand All @@ -255,7 +282,7 @@ public void testLogrollWhileStreaming() throws Exception {
appendToLog("1");
appendToLog("2");// 2
try (WALEntryStream entryStream =
new WALEntryStream(logQueue, CONF, 0, log, null,
new WALEntryStreamWithRetries(logQueue, CONF, 0, log, null,
new MetricsSource("1"), fakeWalGroupId)) {
assertEquals("1", getRow(entryStream.next()));

Expand Down Expand Up @@ -530,7 +557,8 @@ public void testReplicationSourceWALReaderWrongPosition() throws Exception {

@Override
public boolean evaluate() throws Exception {
return fs.getFileStatus(walPath).getLen() > 0;
return fs.getFileStatus(walPath).getLen() > 0 &&
((AbstractFSWAL) log).getInflightWALCloseCount() == 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We wait for 5 sec and check. Should we wait for getInflightWALCloseCount to be zero before coming here? Or would that be a better guarantee that the WAL close has happened by the time we come here for the FS.getLen() check?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't wait for 5s and then check. We wait for the condition to pass within 5s or time out; this is the javadoc. I think the test is doing what you are saying (unless I misunderstood what you are saying).

/**
   * Waits up to the duration equal to the specified timeout multiplied by the
   * {@link #getWaitForRatio(Configuration)} for the given {@link Predicate} to become
   * <code>true</code>, failing the test if the timeout is reached and the Predicate is still
   * <code>false</code>.
   * <p/>
   * @param conf the configuration
   * @param timeout the timeout in milliseconds to wait for the predicate.
   * @param predicate the predicate to evaluate.
   * @return the effective wait, in milli-seconds until the predicate becomes <code>true</code> or
   *         wait is interrupted otherwise <code>-1</code> when times out
   */

}

@Override
Expand All @@ -539,12 +567,13 @@ public String explainFailure() throws Exception {
}

});
long walLength = fs.getFileStatus(walPath).getLen();

ReplicationSourceWALReader reader = createReader(false, CONF);

WALEntryBatch entryBatch = reader.take();
assertEquals(walPath, entryBatch.getLastWalPath());

long walLength = fs.getFileStatus(walPath).getLen();
assertTrue("Position " + entryBatch.getLastWalPosition() + " is out of range, file length is " +
walLength, entryBatch.getLastWalPosition() <= walLength);
assertEquals(1, entryBatch.getNbEntries());
Expand Down Expand Up @@ -869,7 +898,7 @@ public void testSizeOfLogQueue() throws Exception {
*/
@Test
public void testCleanClosedWALs() throws Exception {
try (WALEntryStream entryStream = new WALEntryStream(
try (WALEntryStream entryStream = new WALEntryStreamWithRetries(
logQueue, CONF, 0, log, null, logQueue.getMetrics(), fakeWalGroupId)) {
assertEquals(0, logQueue.getMetrics().getUncleanlyClosedWALs());
appendToLogAndSync();
Expand Down