From 3353381338c4a19825f41fefd103b5ad12be8b2a Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Thu, 13 Jul 2023 21:24:43 +0800 Subject: [PATCH] HBASE-27231 FSHLog should retry writing WAL entries when syncs to HDFS failed (#5317) Co-authored-by: Duo Zhang Co-authored-by: chenglei Signed-off-by: chenglei Signed-off-by: Duo Zhang --- .../hbase/regionserver/wal/AbstractFSWAL.java | 752 +++++++++++++- .../hbase/regionserver/wal/AsyncFSWAL.java | 651 +----------- .../hadoop/hbase/regionserver/wal/FSHLog.java | 977 ++++-------------- .../regionserver/TestFailedAppendAndSync.java | 18 +- .../hbase/regionserver/TestHRegion.java | 12 + .../hbase/regionserver/TestWALLockup.java | 454 -------- .../regionserver/wal/AbstractTestFSWAL.java | 2 +- .../hbase/regionserver/wal/TestFSHLog.java | 102 +- .../regionserver/wal/TestLogRollAbort.java | 53 +- .../regionserver/wal/TestLogRolling.java | 213 ++-- pom.xml | 2 +- 11 files changed, 1112 insertions(+), 2124 deletions(-) delete mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index 8df65487c676..b3445ab42423 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -19,13 +19,17 @@ import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.ERROR; import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.LOW_REPLICATION; +import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SIZE; import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SLOW_SYNC; import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.WAL_IMPL; +import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER; import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument; import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull; import com.lmax.disruptor.RingBuffer; +import com.lmax.disruptor.Sequence; +import com.lmax.disruptor.Sequencer; import io.opentelemetry.api.trace.Span; import java.io.FileNotFoundException; import java.io.IOException; @@ -33,14 +37,20 @@ import java.lang.management.MemoryType; import java.net.URLEncoder; import java.nio.charset.StandardCharsets; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; +import java.util.Deque; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.OptionalLong; import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; @@ -56,7 +66,10 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import 
java.util.function.Supplier; import org.apache.commons.lang3.mutable.MutableLong; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -95,6 +108,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; /** @@ -128,6 +142,9 @@ public abstract class AbstractFSWAL implements WAL { private static final Logger LOG = LoggerFactory.getLogger(AbstractFSWAL.class); + private static final Comparator SEQ_COMPARATOR = + Comparator.comparingLong(SyncFuture::getTxid).thenComparingInt(System::identityHashCode); + private static final String SURVIVED_TOO_LONG_SEC_KEY = "hbase.regionserver.wal.too.old.sec"; private static final int SURVIVED_TOO_LONG_SEC_DEFAULT = 900; /** Don't log blocking regions more frequently than this. */ @@ -157,6 +174,9 @@ public abstract class AbstractFSWAL implements WAL { public static final String WAL_SHUTDOWN_WAIT_TIMEOUT_MS = "hbase.wal.shutdown.wait.timeout.ms"; public static final int DEFAULT_WAL_SHUTDOWN_WAIT_TIMEOUT_MS = 15 * 1000; + public static final String WAL_BATCH_SIZE = "hbase.wal.batch.size"; + public static final long DEFAULT_WAL_BATCH_SIZE = 64L * 1024; + /** * file system instance */ @@ -368,6 +388,57 @@ private static final class WALProps { private final int archiveRetries; + protected ExecutorService consumeExecutor; + + private final Lock consumeLock = new ReentrantLock(); + + protected final Runnable consumer = this::consume; + + // check if there is already a consumer task in the event loop's task queue + protected Supplier hasConsumerTask; + + private static final int MAX_EPOCH = 0x3FFFFFFF; + // the lowest bit is waitingRoll, which means new writer is created and we are waiting for old + // writer to be closed. + // the second lowest bit is writerBroken which means the current writer is broken and rollWriter + // is needed. + // all other bits are the epoch number of the current writer, this is used to detect whether the + // writer is still the one when you issue the sync. + // notice that, modification to this field is only allowed under the protection of consumeLock. 
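The comment block above describes how epochAndState packs three facts into one volatile int: bit 0 is waitingRoll, bit 1 is writerBroken, and the remaining bits hold the epoch of the current writer. A minimal standalone sketch of that layout follows; the class and method names are illustrative, not the patch's API, and the real code only mutates the field under consumeLock.

// Sketch of the epochAndState bit layout: bit 0 = waitingRoll,
// bit 1 = writerBroken, bits 2..31 = epoch of the current writer.
public class EpochAndStateSketch {
  private static final int MAX_EPOCH = 0x3FFFFFFF;

  // in AbstractFSWAL writes to this field happen under consumeLock;
  // the lock is omitted here to keep the sketch short
  private volatile int epochAndState;

  static boolean waitingRoll(int epochAndState) {
    return (epochAndState & 1) != 0;
  }

  static boolean writerBroken(int epochAndState) {
    return ((epochAndState >>> 1) & 1) != 0;
  }

  static int epoch(int epochAndState) {
    return epochAndState >>> 2;
  }

  // mark the current writer broken, keeping epoch and waitingRoll untouched
  void markWriterBroken() {
    epochAndState |= 0b10;
  }

  // a new writer has been created; wait for the old one to be closed
  void markWaitingRoll() {
    epochAndState |= 1;
  }

  // roll to the next writer: bump the epoch and clear both state bits
  void advanceEpoch() {
    int nextEpoch = epoch(epochAndState) == MAX_EPOCH ? 0 : epoch(epochAndState) + 1;
    epochAndState = nextEpoch << 2;
  }
}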
+ private volatile int epochAndState; + + private boolean readyForRolling; + + private final Condition readyForRollingCond = consumeLock.newCondition(); + + private final RingBuffer waitingConsumePayloads; + + private final Sequence waitingConsumePayloadsGatingSequence; + + private final AtomicBoolean consumerScheduled = new AtomicBoolean(false); + + private final long batchSize; + + protected final Deque toWriteAppends = new ArrayDeque<>(); + + protected final Deque unackedAppends = new ArrayDeque<>(); + + protected final SortedSet syncFutures = new TreeSet<>(SEQ_COMPARATOR); + + // the highest txid of WAL entries being processed + protected long highestProcessedAppendTxid; + + // file length when we issue last sync request on the writer + private long fileLengthAtLastSync; + + private long highestProcessedAppendTxidAtLastSync; + + private int waitOnShutdownInSeconds; + + private String waitOnShutdownInSecondsConfigKey; + + protected boolean shouldShutDownConsumeExecutorWhenClose = true; + public long getFilenum() { return this.filenum.get(); } @@ -414,6 +485,23 @@ protected final int getPreallocatedEventCount() { return floor << 1; } + protected final void setWaitOnShutdownInSeconds(int waitOnShutdownInSeconds, + String waitOnShutdownInSecondsConfigKey) { + this.waitOnShutdownInSeconds = waitOnShutdownInSeconds; + this.waitOnShutdownInSecondsConfigKey = waitOnShutdownInSecondsConfigKey; + } + + protected final void createSingleThreadPoolConsumeExecutor(String walType, final Path rootDir, + final String prefix) { + ThreadPoolExecutor threadPool = + new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), + new ThreadFactoryBuilder().setNameFormat(walType + "-%d-" + rootDir.toString() + "-prefix:" + + (prefix == null ? "default" : prefix).replace("%", "%%")).setDaemon(true).build()); + hasConsumerTask = () -> threadPool.getQueue().peek() == consumer; + consumeExecutor = threadPool; + this.shouldShutDownConsumeExecutorWhenClose = true; + } + protected AbstractFSWAL(final FileSystem fs, final Path rootDir, final String logDir, final String archiveDir, final Configuration conf, final List listeners, final boolean failIfWALExists, final String prefix, final String suffix) @@ -527,6 +615,19 @@ public boolean accept(final Path fileName) { archiveRetries = this.conf.getInt("hbase.regionserver.walroll.archive.retries", 0); this.walShutdownTimeout = conf.getLong(WAL_SHUTDOWN_WAIT_TIMEOUT_MS, DEFAULT_WAL_SHUTDOWN_WAIT_TIMEOUT_MS); + + int preallocatedEventCount = + conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16); + waitingConsumePayloads = + RingBuffer.createMultiProducer(RingBufferTruck::new, preallocatedEventCount); + waitingConsumePayloadsGatingSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); + waitingConsumePayloads.addGatingSequences(waitingConsumePayloadsGatingSequence); + + // inrease the ringbuffer sequence so our txid is start from 1 + waitingConsumePayloads.publish(waitingConsumePayloads.next()); + waitingConsumePayloadsGatingSequence.set(waitingConsumePayloads.getCursor()); + + batchSize = conf.getLong(WAL_BATCH_SIZE, DEFAULT_WAL_BATCH_SIZE); } /** @@ -615,10 +716,6 @@ public final void sync(long txid, boolean forceSync) throws IOException { TraceUtil.trace(() -> doSync(txid, forceSync), () -> createSpan("WAL.sync")); } - protected abstract void doSync(boolean forceSync) throws IOException; - - protected abstract void doSync(long txid, boolean forceSync) throws IOException; - /** * This is a convenience method that computes a new 
filename with a given file-number. * @param filenum to use @@ -735,7 +832,7 @@ Map> findRegionsToForceFlush() throws IOException { /** * Mark this WAL file as closed and call cleanOldLogs to see if we can archive this file. */ - protected final void markClosedAndClean(Path path) { + private void markClosedAndClean(Path path) { WALProps props = walFile2Props.get(path); // typically this should not be null, but if there is no big issue if it is already null, so // let's make the code more robust @@ -1305,6 +1402,432 @@ public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits) throws () -> createSpan("WAL.appendMarker")); } + /** + * Helper that marks the future as DONE and offers it back to the cache. + */ + protected void markFutureDoneAndOffer(SyncFuture future, long txid, Throwable t) { + future.done(txid, t); + syncFutureCache.offer(future); + } + + private static boolean waitingRoll(int epochAndState) { + return (epochAndState & 1) != 0; + } + + private static boolean writerBroken(int epochAndState) { + return ((epochAndState >>> 1) & 1) != 0; + } + + private static int epoch(int epochAndState) { + return epochAndState >>> 2; + } + + // return whether we have successfully set readyForRolling to true. + private boolean trySetReadyForRolling() { + // Check without holding lock first. Usually we will just return here. + // waitingRoll is volatile and unacedEntries is only accessed inside event loop so it is safe to + // check them outside the consumeLock. + if (!waitingRoll(epochAndState) || !unackedAppends.isEmpty()) { + return false; + } + consumeLock.lock(); + try { + // 1. a roll is requested + // 2. all out-going entries have been acked(we have confirmed above). + if (waitingRoll(epochAndState)) { + readyForRolling = true; + readyForRollingCond.signalAll(); + return true; + } else { + return false; + } + } finally { + consumeLock.unlock(); + } + } + + private void syncFailed(long epochWhenSync, Throwable error) { + LOG.warn("sync failed", error); + this.onException(epochWhenSync, error); + } + + private void onException(long epochWhenSync, Throwable error) { + boolean shouldRequestLogRoll = true; + consumeLock.lock(); + try { + int currentEpochAndState = epochAndState; + if (epoch(currentEpochAndState) != epochWhenSync || writerBroken(currentEpochAndState)) { + // this is not the previous writer which means we have already rolled the writer. + // or this is still the current writer, but we have already marked it as broken and request + // a roll. + return; + } + this.epochAndState = currentEpochAndState | 0b10; + if (waitingRoll(currentEpochAndState)) { + readyForRolling = true; + readyForRollingCond.signalAll(); + // this means we have already in the middle of a rollWriter so just tell the roller thread + // that you can continue without requesting an extra log roll. + shouldRequestLogRoll = false; + } + } finally { + consumeLock.unlock(); + } + for (Iterator iter = unackedAppends.descendingIterator(); iter.hasNext();) { + toWriteAppends.addFirst(iter.next()); + } + highestUnsyncedTxid = highestSyncedTxid.get(); + if (shouldRequestLogRoll) { + // request a roll. + requestLogRoll(ERROR); + } + } + + private void syncCompleted(long epochWhenSync, W writer, long processedTxid, long startTimeNs) { + // Please see the last several comments on HBASE-22761, it is possible that we get a + // syncCompleted which acks a previous sync request after we received a syncFailed on the same + // writer. 
So here we will also check on the epoch and state, if the epoch has already been + // changed, i.e, we have already rolled the writer, or the writer is already broken, we should + // just skip here, to avoid mess up the state or accidentally release some WAL entries and + // cause data corruption. + // The syncCompleted call is on the critical write path so we should try our best to make it + // fast. So here we do not hold consumeLock, for increasing performance. It is safe because + // there are only 3 possible situations: + // 1. For normal case, the only place where we change epochAndState is when rolling the writer. + // Before rolling actually happen, we will only change the state to waitingRoll which is another + // bit than writerBroken, and when we actually change the epoch, we can make sure that there is + // no out going sync request. So we will always pass the check here and there is no problem. + // 2. The writer is broken, but we have not called syncFailed yet. In this case, since + // syncFailed and syncCompleted are executed in the same thread, we will just face the same + // situation with #1. + // 3. The writer is broken, and syncFailed has been called. Then when we arrive here, there are + // only 2 possible situations: + // a. we arrive before we actually roll the writer, then we will find out the writer is broken + // and give up. + // b. we arrive after we actually roll the writer, then we will find out the epoch is changed + // and give up. + // For both #a and #b, we do not need to hold the consumeLock as we will always update the + // epochAndState as a whole. + // So in general, for all the cases above, we do not need to hold the consumeLock. + int epochAndState = this.epochAndState; + if (epoch(epochAndState) != epochWhenSync || writerBroken(epochAndState)) { + LOG.warn("Got a sync complete call after the writer is broken, skip"); + return; + } + + if (processedTxid < highestSyncedTxid.get()) { + return; + } + highestSyncedTxid.set(processedTxid); + for (Iterator iter = unackedAppends.iterator(); iter.hasNext();) { + FSWALEntry entry = iter.next(); + if (entry.getTxid() <= processedTxid) { + entry.release(); + iter.remove(); + } else { + break; + } + } + postSync(System.nanoTime() - startTimeNs, finishSync()); + /** + * This method is used to be compatible with the original logic of {@link FSHLog}. + */ + checkSlowSyncCount(); + if (trySetReadyForRolling()) { + // we have just finished a roll, then do not need to check for log rolling, the writer will be + // closed soon. + return; + } + // If we haven't already requested a roll, check if we have exceeded logrollsize + if (!isLogRollRequested() && writer.getLength() > logrollsize) { + if (LOG.isDebugEnabled()) { + LOG.debug("Requesting log roll because of file size threshold; length=" + writer.getLength() + + ", logrollsize=" + logrollsize); + } + requestLogRoll(SIZE); + } + } + + // find all the sync futures between these two txids to see if we need to issue a hsync, if no + // sync futures then just use the default one. 
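When a sync fails, the onException path above marks the writer broken and pushes every unacked entry back onto the front of toWriteAppends so it is written again to the next writer, which is the retry behaviour HBASE-27231 brings to FSHLog. A small sketch of that re-queue step with an assumed Entry type standing in for FSWALEntry; note the real code keeps the entries in unackedAppends and relies on the append path to skip duplicates.

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

// Sketch of the re-queue step performed when a sync fails.
final class RequeueOnSyncFailureSketch {
  record Entry(long txid) {}

  final Deque<Entry> toWriteAppends = new ArrayDeque<>();
  final Deque<Entry> unackedAppends = new ArrayDeque<>();

  void requeueUnacked() {
    // iterate newest-to-oldest and addFirst each entry, which restores
    // ascending txid order at the head of toWriteAppends
    for (Iterator<Entry> iter = unackedAppends.descendingIterator(); iter.hasNext();) {
      toWriteAppends.addFirst(iter.next());
    }
    // the entries are intentionally left in unackedAppends as well;
    // appendAndSync later detects and skips the duplicates when re-appending
  }
}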
+ private boolean isHsync(long beginTxid, long endTxid) { + SortedSet futures = syncFutures.subSet(new SyncFuture().reset(beginTxid, false), + new SyncFuture().reset(endTxid + 1, false)); + if (futures.isEmpty()) { + return useHsync; + } + for (SyncFuture future : futures) { + if (future.isForceSync()) { + return true; + } + } + return false; + } + + private void sync(W writer) { + fileLengthAtLastSync = writer.getLength(); + long currentHighestProcessedAppendTxid = highestProcessedAppendTxid; + boolean shouldUseHsync = + isHsync(highestProcessedAppendTxidAtLastSync, currentHighestProcessedAppendTxid); + highestProcessedAppendTxidAtLastSync = currentHighestProcessedAppendTxid; + final long startTimeNs = System.nanoTime(); + final long epoch = (long) epochAndState >>> 2L; + addListener(doWriterSync(writer, shouldUseHsync, currentHighestProcessedAppendTxid), + (result, error) -> { + if (error != null) { + syncFailed(epoch, error); + } else { + long syncedTxid = getSyncedTxid(currentHighestProcessedAppendTxid, result); + syncCompleted(epoch, writer, syncedTxid, startTimeNs); + } + }, consumeExecutor); + } + + /** + * This method is to adapt {@link FSHLog} and {@link AsyncFSWAL}. For {@link AsyncFSWAL}, we use + * {@link AbstractFSWAL#highestProcessedAppendTxid} at the point we calling + * {@link AsyncFSWAL#doWriterSync} method as successful syncedTxid. For {@link FSHLog}, because we + * use multi-thread {@code SyncRunner}s, we used the result of {@link CompletableFuture} as + * successful syncedTxid. + */ + protected long getSyncedTxid(long processedTxid, long completableFutureResult) { + return processedTxid; + } + + protected abstract CompletableFuture doWriterSync(W writer, boolean shouldUseHsync, + long txidWhenSyn); + + private int finishSyncLowerThanTxid(long txid) { + int finished = 0; + for (Iterator iter = syncFutures.iterator(); iter.hasNext();) { + SyncFuture sync = iter.next(); + if (sync.getTxid() <= txid) { + markFutureDoneAndOffer(sync, txid, null); + iter.remove(); + finished++; + } else { + break; + } + } + return finished; + } + + // try advancing the highestSyncedTxid as much as possible + private int finishSync() { + if (unackedAppends.isEmpty()) { + // All outstanding appends have been acked. + if (toWriteAppends.isEmpty()) { + // Also no appends that wait to be written out, then just finished all pending syncs. + long maxSyncTxid = highestSyncedTxid.get(); + for (SyncFuture sync : syncFutures) { + maxSyncTxid = Math.max(maxSyncTxid, sync.getTxid()); + markFutureDoneAndOffer(sync, maxSyncTxid, null); + } + highestSyncedTxid.set(maxSyncTxid); + int finished = syncFutures.size(); + syncFutures.clear(); + return finished; + } else { + // There is no append between highestProcessedAppendTxid and lowestUnprocessedAppendTxid, so + // if highestSyncedTxid >= highestProcessedAppendTxid, then all syncs whose txid are between + // highestProcessedAppendTxid and lowestUnprocessedAppendTxid can be finished. + long lowestUnprocessedAppendTxid = toWriteAppends.peek().getTxid(); + assert lowestUnprocessedAppendTxid > highestProcessedAppendTxid; + long doneTxid = lowestUnprocessedAppendTxid - 1; + highestSyncedTxid.set(doneTxid); + return finishSyncLowerThanTxid(doneTxid); + } + } else { + // There are still unacked appends. So let's move the highestSyncedTxid to the txid of the + // first unacked append minus 1. 
+ long lowestUnackedAppendTxid = unackedAppends.peek().getTxid(); + long doneTxid = Math.max(lowestUnackedAppendTxid - 1, highestSyncedTxid.get()); + highestSyncedTxid.set(doneTxid); + return finishSyncLowerThanTxid(doneTxid); + } + } + + // confirm non-empty before calling + private static long getLastTxid(Deque queue) { + return queue.peekLast().getTxid(); + } + + private void appendAndSync() throws IOException { + final W writer = this.writer; + // maybe a sync request is not queued when we issue a sync, so check here to see if we could + // finish some. + finishSync(); + long newHighestProcessedAppendTxid = -1L; + // this is used to avoid calling peedLast every time on unackedAppends, appendAndAsync is single + // threaded, this could save us some cycles + boolean addedToUnackedAppends = false; + for (Iterator iter = toWriteAppends.iterator(); iter.hasNext();) { + FSWALEntry entry = iter.next(); + /** + * For {@link FSHog},here may throws IOException,but for {@link AsyncFSWAL}, here would not + * throw any IOException. + */ + boolean appended = appendEntry(writer, entry); + newHighestProcessedAppendTxid = entry.getTxid(); + iter.remove(); + if (appended) { + // This is possible, when we fail to sync, we will add the unackedAppends back to + // toWriteAppends, so here we may get an entry which is already in the unackedAppends. + if ( + addedToUnackedAppends || unackedAppends.isEmpty() + || getLastTxid(unackedAppends) < entry.getTxid() + ) { + unackedAppends.addLast(entry); + addedToUnackedAppends = true; + } + // See HBASE-25905, here we need to make sure that, we will always write all the entries in + // unackedAppends out. As the code in the consume method will assume that, the entries in + // unackedAppends have all been sent out so if there is roll request and unackedAppends is + // not empty, we could just return as later there will be a syncCompleted call to clear the + // unackedAppends, or a syncFailed to lead us to another state. + // There could be other ways to fix, such as changing the logic in the consume method, but + // it will break the assumption and then (may) lead to a big refactoring. So here let's use + // this way to fix first, can optimize later. + if ( + writer.getLength() - fileLengthAtLastSync >= batchSize + && (addedToUnackedAppends || entry.getTxid() >= getLastTxid(unackedAppends)) + ) { + break; + } + } + } + // if we have a newer transaction id, update it. + // otherwise, use the previous transaction id. + if (newHighestProcessedAppendTxid > 0) { + highestProcessedAppendTxid = newHighestProcessedAppendTxid; + } else { + newHighestProcessedAppendTxid = highestProcessedAppendTxid; + } + + if (writer.getLength() - fileLengthAtLastSync >= batchSize) { + // sync because buffer size limit. + sync(writer); + return; + } + if (writer.getLength() == fileLengthAtLastSync) { + // we haven't written anything out, just advance the highestSyncedSequence since we may only + // stamped some region sequence id. + if (unackedAppends.isEmpty()) { + highestSyncedTxid.set(highestProcessedAppendTxid); + finishSync(); + trySetReadyForRolling(); + } + return; + } + // reach here means that we have some unsynced data but haven't reached the batch size yet + // but we will not issue a sync directly here even if there are sync requests because we may + // have some new data in the ringbuffer, so let's just return here and delay the decision of + // whether to issue a sync in the caller method. 
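The tail of appendAndSync above makes a three-way choice: sync immediately once batchSize bytes have accumulated since the last sync, merely advance the synced txid when nothing was written out, or defer so the consume loop can decide later. A small sketch of that choice; the enum and parameter names are assumptions for illustration only.

// Sketch of the decision taken at the end of appendAndSync.
final class SyncDecisionSketch {
  enum Action { SYNC_NOW, ADVANCE_ONLY, DEFER }

  Action decide(long writerLength, long fileLengthAtLastSync, long batchSize,
    boolean unackedEmpty) {
    if (writerLength - fileLengthAtLastSync >= batchSize) {
      return Action.SYNC_NOW;   // buffer size limit reached
    }
    if (writerLength == fileLengthAtLastSync) {
      // nothing written out; only region sequence ids were stamped
      return unackedEmpty ? Action.ADVANCE_ONLY : Action.DEFER;
    }
    return Action.DEFER;        // let the consume loop decide about syncing
  }
}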
+ } + + private void consume() { + consumeLock.lock(); + try { + int currentEpochAndState = epochAndState; + if (writerBroken(currentEpochAndState)) { + return; + } + if (waitingRoll(currentEpochAndState)) { + if (writer.getLength() > fileLengthAtLastSync) { + // issue a sync + sync(writer); + } else { + if (unackedAppends.isEmpty()) { + readyForRolling = true; + readyForRollingCond.signalAll(); + } + } + return; + } + } finally { + consumeLock.unlock(); + } + long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1; + for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor + <= cursorBound; nextCursor++) { + if (!waitingConsumePayloads.isPublished(nextCursor)) { + break; + } + RingBufferTruck truck = waitingConsumePayloads.get(nextCursor); + switch (truck.type()) { + case APPEND: + toWriteAppends.addLast(truck.unloadAppend()); + break; + case SYNC: + syncFutures.add(truck.unloadSync()); + break; + default: + LOG.warn("RingBufferTruck with unexpected type: " + truck.type()); + break; + } + waitingConsumePayloadsGatingSequence.set(nextCursor); + } + + /** + * This method is used to be compatible with the original logic of {@link AsyncFSWAL}. + */ + preAppendAndSync(); + try { + appendAndSync(); + } catch (IOException exception) { + /** + * For {@link FSHog},here may catch IOException,but for {@link AsyncFSWAL}, the code doesn't + * go in here. + */ + LOG.error("appendAndSync throws IOException.", exception); + onAppendEntryFailed(exception); + return; + } + if (hasConsumerTask.get()) { + return; + } + if (toWriteAppends.isEmpty()) { + if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) { + consumerScheduled.set(false); + // recheck here since in append and sync we do not hold the consumeLock. Thing may + // happen like + // 1. we check cursor, no new entry + // 2. someone publishes a new entry to ringbuffer and the consumerScheduled is true and + // give up scheduling the consumer task. + // 3. we set consumerScheduled to false and also give up scheduling consumer task. + if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) { + // we will give up consuming so if there are some unsynced data we need to issue a sync. + if ( + writer.getLength() > fileLengthAtLastSync && !syncFutures.isEmpty() + && syncFutures.last().getTxid() > highestProcessedAppendTxidAtLastSync + ) { + // no new data in the ringbuffer and we have at least one sync request + sync(writer); + } + return; + } else { + // maybe someone has grabbed this before us + if (!consumerScheduled.compareAndSet(false, true)) { + return; + } + } + } + } + // reschedule if we still have something to write. + consumeExecutor.execute(consumer); + } + + protected void preAppendAndSync() { + } + + private boolean shouldScheduleConsumer() { + int currentEpochAndState = epochAndState; + if (writerBroken(currentEpochAndState) || waitingRoll(currentEpochAndState)) { + return false; + } + return consumerScheduled.compareAndSet(false, true); + } + /** * Append a set of edits to the WAL. *
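The consume() method above drains the LMAX Disruptor ring buffer up to the published cursor and moves a gating sequence forward so slots can be recycled, while producers (append and doSync) claim, fill and publish slots. A condensed standalone sketch of that pattern using the same Disruptor calls the patch relies on; the Slot event type and the drain handling are illustrative.

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.Sequencer;

// Sketch of the multi-producer ring buffer plus gating-sequence drain loop.
final class RingBufferDrainSketch {
  static final class Slot { long txid; }

  private final RingBuffer<Slot> ring =
    RingBuffer.createMultiProducer(Slot::new, 16 * 1024);
  private final Sequence gating = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);

  RingBufferDrainSketch() {
    ring.addGatingSequences(gating);
  }

  // producer side: claim a slot, fill it, then publish (mirrors append/doSync)
  long publish(long txid) {
    long seq = ring.next();
    try {
      ring.get(seq).txid = txid;
    } finally {
      ring.publish(seq);
    }
    return seq;
  }

  // consumer side: drain everything published so far (mirrors consume())
  void drain() {
    long next = gating.get() + 1;
    for (long bound = ring.getCursor(); next <= bound; next++) {
      if (!ring.isPublished(next)) {
        break; // a slot was claimed but not yet published
      }
      long txid = ring.get(next).txid;
      // hand txid to the append/sync bookkeeping here
      gating.set(next);
    }
  }
}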
@@ -1322,7 +1845,7 @@ public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits) throws * passed in WALKey walKey parameter. Be warned that the WriteEntry is not * immediately available on return from this method. It WILL be available subsequent to a sync of * this append; otherwise, you will just have to wait on the WriteEntry to get filled in. - * @param info the regioninfo associated with append + * @param hri the regioninfo associated with append * @param key Modified by this call; we add to it this edits region edit/sequence id. * @param edits Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit * sequence id that is after all currently appended edits. @@ -1335,14 +1858,95 @@ public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits) throws * @return Returns a 'transaction id' and key will have the region edit/sequence id * in it. */ - protected abstract long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore) - throws IOException; + protected long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore) + throws IOException { + precheckBeforeAppendWALEdit(hri, key, edits, inMemstore); + long txid = + stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, waitingConsumePayloads); + if (shouldScheduleConsumer()) { + consumeExecutor.execute(consumer); + } + return txid; + } - protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException; + protected void precheckBeforeAppendWALEdit(RegionInfo hri, WALKeyImpl key, WALEdit edits, + boolean inMemstore) throws IOException { + } + + protected void doSync(boolean forceSync) throws IOException { + long txid = waitingConsumePayloads.next(); + SyncFuture future; + try { + future = getSyncFuture(txid, forceSync); + RingBufferTruck truck = waitingConsumePayloads.get(txid); + truck.load(future); + } finally { + waitingConsumePayloads.publish(txid); + } + if (shouldScheduleConsumer()) { + consumeExecutor.execute(consumer); + } + blockOnSync(future); + } + + protected void doSync(long txid, boolean forceSync) throws IOException { + if (highestSyncedTxid.get() >= txid) { + return; + } + // here we do not use ring buffer sequence as txid + long sequence = waitingConsumePayloads.next(); + SyncFuture future; + try { + future = getSyncFuture(txid, forceSync); + RingBufferTruck truck = waitingConsumePayloads.get(sequence); + truck.load(future); + } finally { + waitingConsumePayloads.publish(sequence); + } + if (shouldScheduleConsumer()) { + consumeExecutor.execute(consumer); + } + blockOnSync(future); + } protected abstract W createWriterInstance(Path path) throws IOException, CommonFSUtils.StreamLacksCapabilityException; + protected final void waitForSafePoint() { + consumeLock.lock(); + try { + int currentEpochAndState = epochAndState; + if (writerBroken(currentEpochAndState) || this.writer == null) { + return; + } + consumerScheduled.set(true); + epochAndState = currentEpochAndState | 1; + readyForRolling = false; + consumeExecutor.execute(consumer); + while (!readyForRolling) { + readyForRollingCond.awaitUninterruptibly(); + } + } finally { + consumeLock.unlock(); + } + } + + protected final void closeWriter(W writer, Path path) { + inflightWALClosures.put(path.getName(), writer); + closeExecutor.execute(() -> { + try { + writer.close(); + } catch (IOException e) { + LOG.warn("close old writer failed", e); + } finally { + // call this even if the above close fails, as there is no other chance we can set closed to + // true, it will 
not cause big problems. + markClosedAndClean(path); + inflightWALClosures.remove(path.getName()); + } + }); + } + /** * Notice that you need to clear the {@link #rollRequested} flag in this method, as the new writer * will begin to work before returning from this method. If we clear the flag after returning from @@ -1350,13 +1954,127 @@ protected abstract W createWriterInstance(Path path) * clear the {@link #rollRequested} flag so we do not miss a roll request, typically before you * start writing to the new writer. */ - protected abstract void doReplaceWriter(Path oldPath, Path newPath, W nextWriter) - throws IOException; + protected void doReplaceWriter(Path oldPath, Path newPath, W nextWriter) throws IOException { + Preconditions.checkNotNull(nextWriter); + waitForSafePoint(); + /** + * For {@link FSHLog},here would shutdown {@link FSHLog.SyncRunner}. + */ + doCleanUpResources(); + // we will call rollWriter in init method, where we want to create the first writer and + // obviously the previous writer is null, so here we need this null check. And why we must call + // logRollAndSetupWalProps before closeWriter is that, we will call markClosedAndClean after + // closing the writer asynchronously, we need to make sure the WALProps is put into + // walFile2Props before we call markClosedAndClean + if (writer != null) { + long oldFileLen = writer.getLength(); + logRollAndSetupWalProps(oldPath, newPath, oldFileLen); + closeWriter(writer, oldPath); + } else { + logRollAndSetupWalProps(oldPath, newPath, 0); + } + this.writer = nextWriter; + /** + * Here is used for {@link AsyncFSWAL} and {@link FSHLog} to set the under layer filesystem + * output after writer is replaced. + */ + onWriterReplaced(nextWriter); + this.fileLengthAtLastSync = nextWriter.getLength(); + this.highestProcessedAppendTxidAtLastSync = 0L; + consumeLock.lock(); + try { + consumerScheduled.set(true); + int currentEpoch = epochAndState >>> 2; + int nextEpoch = currentEpoch == MAX_EPOCH ? 0 : currentEpoch + 1; + // set a new epoch and also clear waitingRoll and writerBroken + this.epochAndState = nextEpoch << 2; + // Reset rollRequested status + rollRequested.set(false); + consumeExecutor.execute(consumer); + } finally { + consumeLock.unlock(); + } + } - protected abstract void doShutdown() throws IOException; + protected abstract void onWriterReplaced(W nextWriter); + + protected void doShutdown() throws IOException { + waitForSafePoint(); + /** + * For {@link FSHLog},here would shutdown {@link FSHLog.SyncRunner}. + */ + doCleanUpResources(); + if (this.writer != null) { + closeWriter(this.writer, getOldPath()); + this.writer = null; + } + closeExecutor.shutdown(); + try { + if (!closeExecutor.awaitTermination(waitOnShutdownInSeconds, TimeUnit.SECONDS)) { + LOG.error("We have waited " + waitOnShutdownInSeconds + " seconds but" + + " the close of async writer doesn't complete." 
+ + "Please check the status of underlying filesystem" + + " or increase the wait time by the config \"" + this.waitOnShutdownInSecondsConfigKey + + "\""); + } + } catch (InterruptedException e) { + LOG.error("The wait for close of async writer is interrupted"); + Thread.currentThread().interrupt(); + } + IOException error = new IOException("WAL has been closed"); + long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1; + // drain all the pending sync requests + for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor + <= cursorBound; nextCursor++) { + if (!waitingConsumePayloads.isPublished(nextCursor)) { + break; + } + RingBufferTruck truck = waitingConsumePayloads.get(nextCursor); + switch (truck.type()) { + case SYNC: + syncFutures.add(truck.unloadSync()); + break; + default: + break; + } + } + // and fail them + syncFutures.forEach(f -> markFutureDoneAndOffer(f, f.getTxid(), error)); + if (this.shouldShutDownConsumeExecutorWhenClose) { + consumeExecutor.shutdown(); + } + } + + protected void doCleanUpResources() { + }; + + protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException; + + /** + * This method gets the pipeline for the current WAL. + */ + abstract DatanodeInfo[] getPipeline(); + + /** + * This method gets the datanode replication count for the current WAL. + */ + abstract int getLogReplication(); protected abstract boolean doCheckLogLowReplication(); + protected boolean isWriterBroken() { + return writerBroken(epochAndState); + } + + private void onAppendEntryFailed(IOException exception) { + LOG.warn("append entry failed", exception); + final long currentEpoch = (long) epochAndState >>> 2L; + this.onException(currentEpoch, exception); + } + + protected void checkSlowSyncCount() { + } + /** Returns true if we exceeded the slow sync roll threshold over the last check interval */ protected boolean doCheckSlowSync() { boolean result = false; @@ -1407,16 +2125,6 @@ public void checkLogLowReplication(long checkInterval) { } } - /** - * This method gets the pipeline for the current WAL. - */ - abstract DatanodeInfo[] getPipeline(); - - /** - * This method gets the datanode replication count for the current WAL. 
- */ - abstract int getLogReplication(); - private static void split(final Configuration conf, final Path p) throws IOException { FileSystem fs = CommonFSUtils.getWALFileSystem(conf); if (!fs.exists(p)) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java index 9be86077b525..5de9d4d6b8d9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java @@ -17,32 +17,12 @@ */ package org.apache.hadoop.hbase.regionserver.wal; -import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.ERROR; -import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SIZE; -import static org.apache.hadoop.hbase.util.FutureUtils.addListener; - -import com.lmax.disruptor.RingBuffer; -import com.lmax.disruptor.Sequence; -import com.lmax.disruptor.Sequencer; import java.io.IOException; import java.lang.reflect.Field; -import java.util.ArrayDeque; -import java.util.Comparator; -import java.util.Deque; import java.util.Iterator; import java.util.List; import java.util.Queue; -import java.util.SortedSet; -import java.util.TreeSet; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; -import java.util.function.Supplier; +import java.util.concurrent.CompletableFuture; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -60,10 +40,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; -import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hbase.thirdparty.io.netty.channel.Channel; -import org.apache.hbase.thirdparty.io.netty.channel.EventLoop; import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; import org.apache.hbase.thirdparty.io.netty.util.concurrent.SingleThreadEventExecutor; @@ -129,9 +106,6 @@ public class AsyncFSWAL extends AbstractFSWAL { private static final Logger LOG = LoggerFactory.getLogger(AsyncFSWAL.class); - private static final Comparator SEQ_COMPARATOR = - Comparator.comparingLong(SyncFuture::getTxid).thenComparingInt(System::identityHashCode); - public static final String WAL_BATCH_SIZE = "hbase.wal.batch.size"; public static final long DEFAULT_WAL_BATCH_SIZE = 64L * 1024; @@ -145,57 +119,10 @@ public class AsyncFSWAL extends AbstractFSWAL { private final EventLoopGroup eventLoopGroup; - private final ExecutorService consumeExecutor; - private final Class channelClass; - private final Lock consumeLock = new ReentrantLock(); - - private final Runnable consumer = this::consume; - - // check if there is already a consumer task in the event loop's task queue - private final Supplier hasConsumerTask; - - private static final int MAX_EPOCH = 0x3FFFFFFF; - // the lowest bit is waitingRoll, which means new writer is created and we are waiting for old - // writer to be closed. 
- // the second lowest bit is writerBroken which means the current writer is broken and rollWriter - // is needed. - // all other bits are the epoch number of the current writer, this is used to detect whether the - // writer is still the one when you issue the sync. - // notice that, modification to this field is only allowed under the protection of consumeLock. - private volatile int epochAndState; - - private boolean readyForRolling; - - private final Condition readyForRollingCond = consumeLock.newCondition(); - - private final RingBuffer waitingConsumePayloads; - - private final Sequence waitingConsumePayloadsGatingSequence; - - private final AtomicBoolean consumerScheduled = new AtomicBoolean(false); - - private final long batchSize; - private volatile AsyncFSOutput fsOut; - private final Deque toWriteAppends = new ArrayDeque<>(); - - private final Deque unackedAppends = new ArrayDeque<>(); - - private final SortedSet syncFutures = new TreeSet<>(SEQ_COMPARATOR); - - // the highest txid of WAL entries being processed - private long highestProcessedAppendTxid; - - // file length when we issue last sync request on the writer - private long fileLengthAtLastSync; - - private long highestProcessedAppendTxidAtLastSync; - - private final int waitOnShutdownInSeconds; - private final StreamSlowMonitor streamSlowMonitor; public AsyncFSWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir, @@ -216,345 +143,35 @@ public AsyncFSWAL(FileSystem fs, Abortable abortable, Path rootDir, String logDi this.eventLoopGroup = eventLoopGroup; this.channelClass = channelClass; this.streamSlowMonitor = monitor; - Supplier hasConsumerTask; if (conf.getBoolean(ASYNC_WAL_USE_SHARED_EVENT_LOOP, DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP)) { this.consumeExecutor = eventLoopGroup.next(); + this.shouldShutDownConsumeExecutorWhenClose = false; if (consumeExecutor instanceof SingleThreadEventExecutor) { try { Field field = SingleThreadEventExecutor.class.getDeclaredField("taskQueue"); field.setAccessible(true); Queue queue = (Queue) field.get(consumeExecutor); - hasConsumerTask = () -> queue.peek() == consumer; + this.hasConsumerTask = () -> queue.peek() == consumer; } catch (Exception e) { LOG.warn("Can not get task queue of " + consumeExecutor + ", this is not necessary, just give up", e); - hasConsumerTask = () -> false; - } - } else { - hasConsumerTask = () -> false; - } - } else { - ThreadPoolExecutor threadPool = - new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), - new ThreadFactoryBuilder().setNameFormat("AsyncFSWAL-%d-" + rootDir.toString() - + "-prefix:" + (prefix == null ? 
"default" : prefix).replace("%", "%%")).setDaemon(true) - .build()); - hasConsumerTask = () -> threadPool.getQueue().peek() == consumer; - this.consumeExecutor = threadPool; - } - - this.hasConsumerTask = hasConsumerTask; - int preallocatedEventCount = - conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16); - waitingConsumePayloads = - RingBuffer.createMultiProducer(RingBufferTruck::new, preallocatedEventCount); - waitingConsumePayloadsGatingSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); - waitingConsumePayloads.addGatingSequences(waitingConsumePayloadsGatingSequence); - - // inrease the ringbuffer sequence so our txid is start from 1 - waitingConsumePayloads.publish(waitingConsumePayloads.next()); - waitingConsumePayloadsGatingSequence.set(waitingConsumePayloads.getCursor()); - - batchSize = conf.getLong(WAL_BATCH_SIZE, DEFAULT_WAL_BATCH_SIZE); - waitOnShutdownInSeconds = conf.getInt(ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS, - DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS); - } - - /** - * Helper that marks the future as DONE and offers it back to the cache. - */ - private void markFutureDoneAndOffer(SyncFuture future, long txid, Throwable t) { - future.done(txid, t); - syncFutureCache.offer(future); - } - - private static boolean waitingRoll(int epochAndState) { - return (epochAndState & 1) != 0; - } - - private static boolean writerBroken(int epochAndState) { - return ((epochAndState >>> 1) & 1) != 0; - } - - private static int epoch(int epochAndState) { - return epochAndState >>> 2; - } - - // return whether we have successfully set readyForRolling to true. - private boolean trySetReadyForRolling() { - // Check without holding lock first. Usually we will just return here. - // waitingRoll is volatile and unacedEntries is only accessed inside event loop so it is safe to - // check them outside the consumeLock. - if (!waitingRoll(epochAndState) || !unackedAppends.isEmpty()) { - return false; - } - consumeLock.lock(); - try { - // 1. a roll is requested - // 2. all out-going entries have been acked(we have confirmed above). - if (waitingRoll(epochAndState)) { - readyForRolling = true; - readyForRollingCond.signalAll(); - return true; - } else { - return false; - } - } finally { - consumeLock.unlock(); - } - } - - private void syncFailed(long epochWhenSync, Throwable error) { - LOG.warn("sync failed", error); - boolean shouldRequestLogRoll = true; - consumeLock.lock(); - try { - int currentEpochAndState = epochAndState; - if (epoch(currentEpochAndState) != epochWhenSync || writerBroken(currentEpochAndState)) { - // this is not the previous writer which means we have already rolled the writer. - // or this is still the current writer, but we have already marked it as broken and request - // a roll. - return; - } - this.epochAndState = currentEpochAndState | 0b10; - if (waitingRoll(currentEpochAndState)) { - readyForRolling = true; - readyForRollingCond.signalAll(); - // this means we have already in the middle of a rollWriter so just tell the roller thread - // that you can continue without requesting an extra log roll. - shouldRequestLogRoll = false; - } - } finally { - consumeLock.unlock(); - } - for (Iterator iter = unackedAppends.descendingIterator(); iter.hasNext();) { - toWriteAppends.addFirst(iter.next()); - } - highestUnsyncedTxid = highestSyncedTxid.get(); - if (shouldRequestLogRoll) { - // request a roll. 
- requestLogRoll(ERROR); - } - } - - private void syncCompleted(long epochWhenSync, AsyncWriter writer, long processedTxid, - long startTimeNs) { - // Please see the last several comments on HBASE-22761, it is possible that we get a - // syncCompleted which acks a previous sync request after we received a syncFailed on the same - // writer. So here we will also check on the epoch and state, if the epoch has already been - // changed, i.e, we have already rolled the writer, or the writer is already broken, we should - // just skip here, to avoid mess up the state or accidentally release some WAL entries and - // cause data corruption. - // The syncCompleted call is on the critical write path so we should try our best to make it - // fast. So here we do not hold consumeLock, for increasing performance. It is safe because - // there are only 3 possible situations: - // 1. For normal case, the only place where we change epochAndState is when rolling the writer. - // Before rolling actually happen, we will only change the state to waitingRoll which is another - // bit than writerBroken, and when we actually change the epoch, we can make sure that there is - // no out going sync request. So we will always pass the check here and there is no problem. - // 2. The writer is broken, but we have not called syncFailed yet. In this case, since - // syncFailed and syncCompleted are executed in the same thread, we will just face the same - // situation with #1. - // 3. The writer is broken, and syncFailed has been called. Then when we arrive here, there are - // only 2 possible situations: - // a. we arrive before we actually roll the writer, then we will find out the writer is broken - // and give up. - // b. we arrive after we actually roll the writer, then we will find out the epoch is changed - // and give up. - // For both #a and #b, we do not need to hold the consumeLock as we will always update the - // epochAndState as a whole. - // So in general, for all the cases above, we do not need to hold the consumeLock. - int epochAndState = this.epochAndState; - if (epoch(epochAndState) != epochWhenSync || writerBroken(epochAndState)) { - LOG.warn("Got a sync complete call after the writer is broken, skip"); - return; - } - highestSyncedTxid.set(processedTxid); - for (Iterator iter = unackedAppends.iterator(); iter.hasNext();) { - FSWALEntry entry = iter.next(); - if (entry.getTxid() <= processedTxid) { - entry.release(); - iter.remove(); - } else { - break; - } - } - postSync(System.nanoTime() - startTimeNs, finishSync()); - if (trySetReadyForRolling()) { - // we have just finished a roll, then do not need to check for log rolling, the writer will be - // closed soon. - return; - } - // If we haven't already requested a roll, check if we have exceeded logrollsize - if (!isLogRollRequested() && writer.getLength() > logrollsize) { - if (LOG.isDebugEnabled()) { - LOG.debug("Requesting log roll because of file size threshold; length=" + writer.getLength() - + ", logrollsize=" + logrollsize); - } - requestLogRoll(SIZE); - } - } - - // find all the sync futures between these two txids to see if we need to issue a hsync, if no - // sync futures then just use the default one. 
- private boolean isHsync(long beginTxid, long endTxid) { - SortedSet futures = syncFutures.subSet(new SyncFuture().reset(beginTxid, false), - new SyncFuture().reset(endTxid + 1, false)); - if (futures.isEmpty()) { - return useHsync; - } - for (SyncFuture future : futures) { - if (future.isForceSync()) { - return true; - } - } - return false; - } - - private void sync(AsyncWriter writer) { - fileLengthAtLastSync = writer.getLength(); - long currentHighestProcessedAppendTxid = highestProcessedAppendTxid; - boolean shouldUseHsync = - isHsync(highestProcessedAppendTxidAtLastSync, currentHighestProcessedAppendTxid); - highestProcessedAppendTxidAtLastSync = currentHighestProcessedAppendTxid; - final long startTimeNs = System.nanoTime(); - final long epoch = (long) epochAndState >>> 2L; - addListener(writer.sync(shouldUseHsync), (result, error) -> { - if (error != null) { - syncFailed(epoch, error); - } else { - syncCompleted(epoch, writer, currentHighestProcessedAppendTxid, startTimeNs); - } - }, consumeExecutor); - } - - private int finishSyncLowerThanTxid(long txid) { - int finished = 0; - for (Iterator iter = syncFutures.iterator(); iter.hasNext();) { - SyncFuture sync = iter.next(); - if (sync.getTxid() <= txid) { - markFutureDoneAndOffer(sync, txid, null); - iter.remove(); - finished++; - } else { - break; - } - } - return finished; - } - - // try advancing the highestSyncedTxid as much as possible - private int finishSync() { - if (unackedAppends.isEmpty()) { - // All outstanding appends have been acked. - if (toWriteAppends.isEmpty()) { - // Also no appends that wait to be written out, then just finished all pending syncs. - long maxSyncTxid = highestSyncedTxid.get(); - for (SyncFuture sync : syncFutures) { - maxSyncTxid = Math.max(maxSyncTxid, sync.getTxid()); - markFutureDoneAndOffer(sync, maxSyncTxid, null); + this.hasConsumerTask = () -> false; } - highestSyncedTxid.set(maxSyncTxid); - int finished = syncFutures.size(); - syncFutures.clear(); - return finished; } else { - // There is no append between highestProcessedAppendTxid and lowestUnprocessedAppendTxid, so - // if highestSyncedTxid >= highestProcessedAppendTxid, then all syncs whose txid are between - // highestProcessedAppendTxid and lowestUnprocessedAppendTxid can be finished. - long lowestUnprocessedAppendTxid = toWriteAppends.peek().getTxid(); - assert lowestUnprocessedAppendTxid > highestProcessedAppendTxid; - long doneTxid = lowestUnprocessedAppendTxid - 1; - highestSyncedTxid.set(doneTxid); - return finishSyncLowerThanTxid(doneTxid); + this.hasConsumerTask = () -> false; } } else { - // There are still unacked appends. So let's move the highestSyncedTxid to the txid of the - // first unacked append minus 1. - long lowestUnackedAppendTxid = unackedAppends.peek().getTxid(); - long doneTxid = Math.max(lowestUnackedAppendTxid - 1, highestSyncedTxid.get()); - highestSyncedTxid.set(doneTxid); - return finishSyncLowerThanTxid(doneTxid); + this.createSingleThreadPoolConsumeExecutor("AsyncFSWAL", rootDir, prefix); } - } - // confirm non-empty before calling - private static long getLastTxid(Deque queue) { - return queue.peekLast().getTxid(); + this.setWaitOnShutdownInSeconds(conf.getInt(ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS, + DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS), ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS); } - private void appendAndSync() { - final AsyncWriter writer = this.writer; - // maybe a sync request is not queued when we issue a sync, so check here to see if we could - // finish some. 
- finishSync(); - long newHighestProcessedAppendTxid = -1L; - // this is used to avoid calling peedLast every time on unackedAppends, appendAndAsync is single - // threaded, this could save us some cycles - boolean addedToUnackedAppends = false; - for (Iterator iter = toWriteAppends.iterator(); iter.hasNext();) { - FSWALEntry entry = iter.next(); - boolean appended; - try { - appended = appendEntry(writer, entry); - } catch (IOException e) { - throw new AssertionError("should not happen", e); - } - newHighestProcessedAppendTxid = entry.getTxid(); - iter.remove(); - if (appended) { - // This is possible, when we fail to sync, we will add the unackedAppends back to - // toWriteAppends, so here we may get an entry which is already in the unackedAppends. - if ( - addedToUnackedAppends || unackedAppends.isEmpty() - || getLastTxid(unackedAppends) < entry.getTxid() - ) { - unackedAppends.addLast(entry); - addedToUnackedAppends = true; - } - // See HBASE-25905, here we need to make sure that, we will always write all the entries in - // unackedAppends out. As the code in the consume method will assume that, the entries in - // unackedAppends have all been sent out so if there is roll request and unackedAppends is - // not empty, we could just return as later there will be a syncCompleted call to clear the - // unackedAppends, or a syncFailed to lead us to another state. - // There could be other ways to fix, such as changing the logic in the consume method, but - // it will break the assumption and then (may) lead to a big refactoring. So here let's use - // this way to fix first, can optimize later. - if ( - writer.getLength() - fileLengthAtLastSync >= batchSize - && (addedToUnackedAppends || entry.getTxid() >= getLastTxid(unackedAppends)) - ) { - break; - } - } - } - // if we have a newer transaction id, update it. - // otherwise, use the previous transaction id. - if (newHighestProcessedAppendTxid > 0) { - highestProcessedAppendTxid = newHighestProcessedAppendTxid; - } else { - newHighestProcessedAppendTxid = highestProcessedAppendTxid; - } - - if (writer.getLength() - fileLengthAtLastSync >= batchSize) { - // sync because buffer size limit. - sync(writer); - return; - } - if (writer.getLength() == fileLengthAtLastSync) { - // we haven't written anything out, just advance the highestSyncedSequence since we may only - // stamped some region sequence id. - if (unackedAppends.isEmpty()) { - highestSyncedTxid.set(highestProcessedAppendTxid); - finishSync(); - trySetReadyForRolling(); - } - return; - } - // reach here means that we have some unsynced data but haven't reached the batch size yet - // but we will not issue a sync directly here even if there are sync requests because we may - // have some new data in the ringbuffer, so let's just return here and delay the decision of - // whether to issue a sync in the caller method. 
+ @Override + protected CompletableFuture doWriterSync(AsyncWriter writer, boolean shouldUseHsync, + long txidWhenSyn) { + return writer.sync(shouldUseHsync); } private void drainNonMarkerEditsAndFailSyncs() { @@ -599,92 +216,11 @@ private void drainNonMarkerEditsAndFailSyncs() { } } - private void consume() { - consumeLock.lock(); - try { - int currentEpochAndState = epochAndState; - if (writerBroken(currentEpochAndState)) { - return; - } - if (waitingRoll(currentEpochAndState)) { - if (writer.getLength() > fileLengthAtLastSync) { - // issue a sync - sync(writer); - } else { - if (unackedAppends.isEmpty()) { - readyForRolling = true; - readyForRollingCond.signalAll(); - } - } - return; - } - } finally { - consumeLock.unlock(); - } - long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1; - for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor - <= cursorBound; nextCursor++) { - if (!waitingConsumePayloads.isPublished(nextCursor)) { - break; - } - RingBufferTruck truck = waitingConsumePayloads.get(nextCursor); - switch (truck.type()) { - case APPEND: - toWriteAppends.addLast(truck.unloadAppend()); - break; - case SYNC: - syncFutures.add(truck.unloadSync()); - break; - default: - LOG.warn("RingBufferTruck with unexpected type: " + truck.type()); - break; - } - waitingConsumePayloadsGatingSequence.set(nextCursor); - } + @Override + protected void preAppendAndSync() { if (markerEditOnly()) { drainNonMarkerEditsAndFailSyncs(); } - appendAndSync(); - if (hasConsumerTask.get()) { - return; - } - if (toWriteAppends.isEmpty()) { - if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) { - consumerScheduled.set(false); - // recheck here since in append and sync we do not hold the consumeLock. Thing may - // happen like - // 1. we check cursor, no new entry - // 2. someone publishes a new entry to ringbuffer and the consumerScheduled is true and - // give up scheduling the consumer task. - // 3. we set consumerScheduled to false and also give up scheduling consumer task. - if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) { - // we will give up consuming so if there are some unsynced data we need to issue a sync. - if ( - writer.getLength() > fileLengthAtLastSync && !syncFutures.isEmpty() - && syncFutures.last().getTxid() > highestProcessedAppendTxidAtLastSync - ) { - // no new data in the ringbuffer and we have at least one sync request - sync(writer); - } - return; - } else { - // maybe someone has grabbed this before us - if (!consumerScheduled.compareAndSet(false, true)) { - return; - } - } - } - } - // reschedule if we still have something to write. 
- consumeExecutor.execute(consumer); - } - - private boolean shouldScheduleConsumer() { - int currentEpochAndState = epochAndState; - if (writerBroken(currentEpochAndState) || waitingRoll(currentEpochAndState)) { - return false; - } - return consumerScheduled.compareAndSet(false, true); } // This is used by sync replication, where we are going to close the wal soon after we reopen all @@ -694,55 +230,11 @@ protected boolean markerEditOnly() { } @Override - protected long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore) - throws IOException { + protected void precheckBeforeAppendWALEdit(RegionInfo hri, WALKeyImpl key, WALEdit edits, + boolean inMemstore) throws IOException { if (markerEditOnly() && !edits.isMetaEdit()) { throw new IOException("WAL is closing, only marker edit is allowed"); } - long txid = - stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, waitingConsumePayloads); - if (shouldScheduleConsumer()) { - consumeExecutor.execute(consumer); - } - return txid; - } - - @Override - protected void doSync(boolean forceSync) throws IOException { - long txid = waitingConsumePayloads.next(); - SyncFuture future; - try { - future = getSyncFuture(txid, forceSync); - RingBufferTruck truck = waitingConsumePayloads.get(txid); - truck.load(future); - } finally { - waitingConsumePayloads.publish(txid); - } - if (shouldScheduleConsumer()) { - consumeExecutor.execute(consumer); - } - blockOnSync(future); - } - - @Override - protected void doSync(long txid, boolean forceSync) throws IOException { - if (highestSyncedTxid.get() >= txid) { - return; - } - // here we do not use ring buffer sequence as txid - long sequence = waitingConsumePayloads.next(); - SyncFuture future; - try { - future = getSyncFuture(txid, forceSync); - RingBufferTruck truck = waitingConsumePayloads.get(sequence); - truck.load(future); - } finally { - waitingConsumePayloads.publish(sequence); - } - if (shouldScheduleConsumer()) { - consumeExecutor.execute(consumer); - } - blockOnSync(future); } protected final AsyncWriter createAsyncWriter(FileSystem fs, Path path) throws IOException { @@ -755,120 +247,11 @@ protected AsyncWriter createWriterInstance(Path path) throws IOException { return createAsyncWriter(fs, path); } - private void waitForSafePoint() { - consumeLock.lock(); - try { - int currentEpochAndState = epochAndState; - if (writerBroken(currentEpochAndState) || this.writer == null) { - return; - } - consumerScheduled.set(true); - epochAndState = currentEpochAndState | 1; - readyForRolling = false; - consumeExecutor.execute(consumer); - while (!readyForRolling) { - readyForRollingCond.awaitUninterruptibly(); - } - } finally { - consumeLock.unlock(); - } - } - - private void closeWriter(AsyncWriter writer, Path path) { - inflightWALClosures.put(path.getName(), writer); - closeExecutor.execute(() -> { - try { - writer.close(); - } catch (IOException e) { - LOG.warn("close old writer failed", e); - } finally { - // call this even if the above close fails, as there is no other chance we can set closed to - // true, it will not cause big problems. - markClosedAndClean(path); - inflightWALClosures.remove(path.getName()); - } - }); - } - @Override - protected void doReplaceWriter(Path oldPath, Path newPath, AsyncWriter nextWriter) - throws IOException { - Preconditions.checkNotNull(nextWriter); - waitForSafePoint(); - // we will call rollWriter in init method, where we want to create the first writer and - // obviously the previous writer is null, so here we need this null check. 
And why we must call - // logRollAndSetupWalProps before closeWriter is that, we will call markClosedAndClean after - // closing the writer asynchronously, we need to make sure the WALProps is put into - // walFile2Props before we call markClosedAndClean - if (writer != null) { - long oldFileLen = writer.getLength(); - logRollAndSetupWalProps(oldPath, newPath, oldFileLen); - closeWriter(writer, oldPath); - } else { - logRollAndSetupWalProps(oldPath, newPath, 0); - } - - this.writer = nextWriter; + protected void onWriterReplaced(AsyncWriter nextWriter) { if (nextWriter instanceof AsyncProtobufLogWriter) { this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput(); } - this.fileLengthAtLastSync = nextWriter.getLength(); - this.highestProcessedAppendTxidAtLastSync = 0L; - consumeLock.lock(); - try { - consumerScheduled.set(true); - int currentEpoch = epochAndState >>> 2; - int nextEpoch = currentEpoch == MAX_EPOCH ? 0 : currentEpoch + 1; - // set a new epoch and also clear waitingRoll and writerBroken - this.epochAndState = nextEpoch << 2; - // Reset rollRequested status - rollRequested.set(false); - consumeExecutor.execute(consumer); - } finally { - consumeLock.unlock(); - } - } - - @Override - protected void doShutdown() throws IOException { - waitForSafePoint(); - closeWriter(this.writer, getOldPath()); - this.writer = null; - closeExecutor.shutdown(); - try { - if (!closeExecutor.awaitTermination(waitOnShutdownInSeconds, TimeUnit.SECONDS)) { - LOG.error("We have waited " + waitOnShutdownInSeconds + " seconds but" - + " the close of async writer doesn't complete." - + "Please check the status of underlying filesystem" - + " or increase the wait time by the config \"" + ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS - + "\""); - } - } catch (InterruptedException e) { - LOG.error("The wait for close of async writer is interrupted"); - Thread.currentThread().interrupt(); - } - IOException error = new IOException("WAL has been closed"); - long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1; - // drain all the pending sync requests - for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor - <= cursorBound; nextCursor++) { - if (!waitingConsumePayloads.isPublished(nextCursor)) { - break; - } - RingBufferTruck truck = waitingConsumePayloads.get(nextCursor); - switch (truck.type()) { - case SYNC: - syncFutures.add(truck.unloadSync()); - break; - default: - break; - } - } - // and fail them - syncFutures.forEach(f -> markFutureDoneAndOffer(f, f.getTxid(), error)); - if (!(consumeExecutor instanceof EventLoop)) { - consumeExecutor.shutdown(); - } } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index a1b07baadf67..28e6a460316a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -17,27 +17,15 @@ */ package org.apache.hadoop.hbase.regionserver.wal; -import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.ERROR; -import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.LOW_REPLICATION; -import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SIZE; import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SLOW_SYNC; -import com.lmax.disruptor.BlockingWaitStrategy; -import 
com.lmax.disruptor.EventHandler; -import com.lmax.disruptor.ExceptionHandler; -import com.lmax.disruptor.LifecycleAware; -import com.lmax.disruptor.TimeoutException; -import com.lmax.disruptor.dsl.Disruptor; -import com.lmax.disruptor.dsl.ProducerType; -import io.opentelemetry.api.trace.Span; import java.io.IOException; import java.io.OutputStream; import java.util.Arrays; import java.util.List; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; @@ -45,14 +33,10 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.CommonFSUtils; -import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.FSHLogProvider; -import org.apache.hadoop.hbase.wal.WALEdit; -import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hbase.wal.WALProvider.Writer; import org.apache.hadoop.hdfs.DFSOutputStream; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; @@ -61,8 +45,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; - /** * The original implementation of FSWAL. */ @@ -110,8 +92,6 @@ public class FSHLog extends AbstractFSWAL { private static final String LOW_REPLICATION_ROLL_LIMIT = "hbase.regionserver.hlog.lowreplication.rolllimit"; private static final int DEFAULT_LOW_REPLICATION_ROLL_LIMIT = 5; - private static final String ROLL_ERRORS_TOLERATED = "hbase.regionserver.logroll.errors.tolerated"; - private static final int DEFAULT_ROLL_ERRORS_TOLERATED = 2; private static final String SYNCER_COUNT = "hbase.regionserver.hlog.syncer.count"; private static final int DEFAULT_SYNCER_COUNT = 5; private static final String MAX_BATCH_COUNT = "hbase.regionserver.wal.sync.batch.count"; @@ -121,22 +101,10 @@ public class FSHLog extends AbstractFSWAL { "hbase.wal.fshlog.wait.on.shutdown.seconds"; private static final int DEFAULT_FSHLOG_WAIT_ON_SHUTDOWN_IN_SECONDS = 5; - /** - * The nexus at which all incoming handlers meet. Does appends and sync with an ordering. Appends - * and syncs are each put on the ring which means handlers need to smash up against the ring twice - * (can we make it once only? ... maybe not since time to append is so different from time to sync - * and sometimes we don't want to sync or we want to async the sync). The ring is where we make - * sure of our ordering and it is also where we do batching up of handler sync calls. - */ - private final Disruptor disruptor; - - /** - * This fellow is run by the above appendExecutor service but it is all about batching up appends - * and syncs; it may shutdown without cleaning out the last few appends or syncs. To guard against - * this, keep a reference to this handler and do explicit close on way out to make sure all - * flushed out before we exit. 
- */ - private final RingBufferEventHandler ringBufferEventHandler; + private static final IOException WITER_REPLACED_EXCEPTION = + new IOException("Writer was replaced!"); + private static final IOException WITER_BROKEN_EXCEPTION = new IOException("Wirter was broken!"); + private static final IOException WAL_CLOSE_EXCEPTION = new IOException("WAL was closed!"); /** * FSDataOutputStream associated with the current SequenceFile.writer @@ -161,37 +129,15 @@ public class FSHLog extends AbstractFSWAL { // Enable it if the replications recover. private volatile boolean lowReplicationRollEnabled = true; - /** Number of log close errors tolerated before we abort */ - private final int closeErrorsTolerated; - - private final AtomicInteger closeErrorCount = new AtomicInteger(); - - private final int waitOnShutdownInSeconds; + private final int syncerCount; + private final int maxSyncRequestCount; /** - * Exception handler to pass the disruptor ringbuffer. Same as native implementation only it logs - * using our logger instead of java native logger. + * Which syncrunner to use next. */ - static class RingBufferExceptionHandler implements ExceptionHandler { + private int syncRunnerIndex = 0; - @Override - public void handleEventException(Throwable ex, long sequence, RingBufferTruck event) { - LOG.error("Sequence=" + sequence + ", event=" + event, ex); - throw new RuntimeException(ex); - } - - @Override - public void handleOnStartException(Throwable ex) { - LOG.error(ex.toString(), ex); - throw new RuntimeException(ex); - } - - @Override - public void handleOnShutdownException(Throwable ex) { - LOG.error(ex.toString(), ex); - throw new RuntimeException(ex); - } - } + private SyncRunner[] syncRunners = null; /** * Constructor. @@ -246,29 +192,34 @@ public FSHLog(final FileSystem fs, final Abortable abortable, final Path rootDir conf.getInt(TOLERABLE_LOW_REPLICATION, CommonFSUtils.getDefaultReplication(fs, this.walDir)); this.lowReplicationRollLimit = conf.getInt(LOW_REPLICATION_ROLL_LIMIT, DEFAULT_LOW_REPLICATION_ROLL_LIMIT); - this.closeErrorsTolerated = conf.getInt(ROLL_ERRORS_TOLERATED, DEFAULT_ROLL_ERRORS_TOLERATED); - this.waitOnShutdownInSeconds = - conf.getInt(FSHLOG_WAIT_ON_SHUTDOWN_IN_SECONDS, DEFAULT_FSHLOG_WAIT_ON_SHUTDOWN_IN_SECONDS); - // This is the 'writer' -- a single threaded executor. This single thread 'consumes' what is - // put on the ring buffer. - String hostingThreadName = Thread.currentThread().getName(); - // Using BlockingWaitStrategy. Stuff that is going on here takes so long it makes no sense - // spinning as other strategies do. - this.disruptor = new Disruptor<>(RingBufferTruck::new, getPreallocatedEventCount(), - new ThreadFactoryBuilder().setNameFormat(hostingThreadName + ".append-pool-%d") - .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(), - ProducerType.MULTI, new BlockingWaitStrategy()); + // Advance the ring buffer sequence so that it starts from 1 instead of 0, // because SyncFuture.NOT_DONE = 0. 
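A note on the shared IOException constants introduced in this hunk: reusing a single pre-built instance per failure kind presumably avoids allocating a new exception (and capturing a stack trace) for every sync request that must be failed when the writer is replaced, the writer is broken, or the WAL is closed; the cost is that the recorded stack trace points at class initialization rather than the failing call. A small illustration of the pattern, with FailFast and its names invented for this sketch:

    import java.io.IOException;
    import java.util.concurrent.CompletableFuture;

    // Sketch only: fail a batch of pending sync futures with one shared exception instance.
    class FailFast {
      private static final IOException WRITER_REPLACED = new IOException("Writer was replaced!");

      static void failAll(Iterable<CompletableFuture<Long>> pendingSyncs) {
        for (CompletableFuture<Long> f : pendingSyncs) {
          // Cheap: no new exception, no new stack trace per request.
          f.completeExceptionally(WRITER_REPLACED);
        }
      }
    }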
- this.disruptor.getRingBuffer().next(); - int syncerCount = conf.getInt(SYNCER_COUNT, DEFAULT_SYNCER_COUNT); - int maxBatchCount = conf.getInt(MAX_BATCH_COUNT, + + this.syncerCount = conf.getInt(SYNCER_COUNT, DEFAULT_SYNCER_COUNT); + this.maxSyncRequestCount = conf.getInt(MAX_BATCH_COUNT, conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, DEFAULT_MAX_BATCH_COUNT)); - this.ringBufferEventHandler = new RingBufferEventHandler(syncerCount, maxBatchCount); - this.disruptor.setDefaultExceptionHandler(new RingBufferExceptionHandler()); - this.disruptor.handleEventsWith(new RingBufferEventHandler[] { this.ringBufferEventHandler }); - // Starting up threads in constructor is a no no; Interface should have an init call. - this.disruptor.start(); + + this.createSingleThreadPoolConsumeExecutor("FSHLog", rootDir, prefix); + + this.setWaitOnShutdownInSeconds( + conf.getInt(FSHLOG_WAIT_ON_SHUTDOWN_IN_SECONDS, DEFAULT_FSHLOG_WAIT_ON_SHUTDOWN_IN_SECONDS), + FSHLOG_WAIT_ON_SHUTDOWN_IN_SECONDS); + } + + @Override + public void init() throws IOException { + super.init(); + this.createSyncRunnersAndStart(); + } + + private void createSyncRunnersAndStart() { + this.syncRunnerIndex = 0; + this.syncRunners = new SyncRunner[syncerCount]; + for (int i = 0; i < syncerCount; i++) { + this.syncRunners[i] = new SyncRunner("sync." + i, maxSyncRequestCount); + this.syncRunners[i].start(); + } } /** @@ -311,207 +262,70 @@ protected Writer createWriterInstance(final Path path) throws IOException { return writer; } - /** - * Used to manufacture race condition reliably. For testing only. - * @see #beforeWaitOnSafePoint() - */ - protected void afterCreatingZigZagLatch() { - } - - /** - * @see #afterCreatingZigZagLatch() - */ - protected void beforeWaitOnSafePoint() { - } - @Override protected void doAppend(Writer writer, FSWALEntry entry) throws IOException { writer.append(entry); } @Override - protected void doReplaceWriter(Path oldPath, Path newPath, Writer nextWriter) throws IOException { - // Ask the ring buffer writer to pause at a safe point. Once we do this, the writer - // thread will eventually pause. An error hereafter needs to release the writer thread - // regardless -- hence the finally block below. Note, this method is called from the FSHLog - // constructor BEFORE the ring buffer is set running so it is null on first time through - // here; allow for that. - SyncFuture syncFuture = null; - SafePointZigZagLatch zigzagLatch = null; - long sequence = -1L; - if (this.writer != null && this.ringBufferEventHandler != null) { - // Get sequence first to avoid dead lock when ring buffer is full - // Considering below sequence - // 1. replaceWriter is called and zigzagLatch is initialized - // 2. ringBufferEventHandler#onEvent is called and arrives at #attainSafePoint(long) then wait - // on safePointReleasedLatch - // 3. Since ring buffer is full, if we get sequence when publish sync, the replaceWriter - // thread will wait for the ring buffer to be consumed, but the only consumer is waiting - // replaceWriter thread to release safePointReleasedLatch, which causes a deadlock - sequence = getSequenceOnRingBuffer(); - zigzagLatch = this.ringBufferEventHandler.attainSafePoint(); - } - afterCreatingZigZagLatch(); - try { - // Wait on the safe point to be achieved. Send in a sync in case nothing has hit the - // ring buffer between the above notification of writer that we want it to go to - // 'safe point' and then here where we are waiting on it to attain safe point. 
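createSyncRunnersAndStart() above builds a fresh set of syncer threads (and is invoked again from onWriterReplaced later in this file), sized by hbase.regionserver.hlog.syncer.count with a bounded request queue per runner. A rough, self-contained model of that setup, using only JDK types and made-up names:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    // Model of a fixed pool of syncer threads, each draining its own bounded queue.
    class SyncerPool {
      private final List<Thread> runners = new ArrayList<>();
      private final List<BlockingQueue<Runnable>> queues = new ArrayList<>();

      SyncerPool(int syncerCount, int maxQueuedPerRunner) {
        for (int i = 0; i < syncerCount; i++) {
          BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(maxQueuedPerRunner);
          Thread runner = new Thread(() -> {
            try {
              while (!Thread.currentThread().isInterrupted()) {
                queue.take().run(); // in FSHLog this is where the writer sync happens
              }
            } catch (InterruptedException e) {
              // interrupted during shutdown; exit the loop
            }
          }, "sync." + i);
          runner.setDaemon(true);
          queues.add(queue);
          runners.add(runner);
          runner.start();
        }
      }

      boolean offerTo(int index, Runnable work) {
        return queues.get(index).offer(work);
      }
    }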
Use - // 'sendSync' instead of 'sync' because we do not want this thread to block waiting on it - // to come back. Cleanup this syncFuture down below after we are ready to run again. - try { - if (zigzagLatch != null) { - // use assert to make sure no change breaks the logic that - // sequence and zigzagLatch will be set together - assert sequence > 0L : "Failed to get sequence from ring buffer"; - syncFuture = zigzagLatch.waitSafePoint(publishSyncOnRingBuffer(sequence, false)); - } - } catch (FailedSyncBeforeLogCloseException e) { - // If unflushed/unsynced entries on close, it is reason to abort. - if (isUnflushedEntries()) { - throw e; - } - LOG.warn( - "Failed sync-before-close but no outstanding appends; closing WAL" + e.getMessage()); - } - // It is at the safe point. Swap out writer from under the blocked writer thread. - // we will call rollWriter in init method, where we want to create the first writer and - // obviously the previous writer is null, so here we need this null check. And why we must - // call logRollAndSetupWalProps before closeWriter is that, we will call markClosedAndClean - // after closing the writer asynchronously, we need to make sure the WALProps is put into - // walFile2Props before we call markClosedAndClean - if (this.writer != null) { - long oldFileLen = this.writer.getLength(); - logRollAndSetupWalProps(oldPath, newPath, oldFileLen); - // In case of having unflushed entries or we already reached the - // closeErrorsTolerated count, call the closeWriter inline rather than in async - // way so that in case of an IOE we will throw it back and abort RS. - inflightWALClosures.put(oldPath.getName(), writer); - if (isUnflushedEntries() || closeErrorCount.get() >= this.closeErrorsTolerated) { - try { - closeWriter(this.writer, oldPath, true); - } finally { - inflightWALClosures.remove(oldPath.getName()); - } - } else { - Writer localWriter = this.writer; - closeExecutor.execute(() -> { - try { - closeWriter(localWriter, oldPath, false); - } catch (IOException e) { - LOG.warn("close old writer failed", e); - } finally { - // call this even if the above close fails, as there is no other chance we can set - // closed to true, it will not cause big problems. - markClosedAndClean(oldPath); - inflightWALClosures.remove(oldPath.getName()); - } - }); - } - } else { - logRollAndSetupWalProps(oldPath, newPath, 0); - } - - this.writer = nextWriter; - if (nextWriter != null && nextWriter instanceof ProtobufLogWriter) { - this.hdfs_out = ((ProtobufLogWriter) nextWriter).getStream(); - } else { - this.hdfs_out = null; - } - } catch (InterruptedException ie) { - // Perpetuate the interrupt - Thread.currentThread().interrupt(); - } catch (IOException e) { - long count = getUnflushedEntriesCount(); - LOG.error("Failed close of WAL writer " + oldPath + ", unflushedEntries=" + count, e); - throw new FailedLogCloseException(oldPath + ", unflushedEntries=" + count, e); - } finally { - // Let the writer thread go regardless, whether error or not. - if (zigzagLatch != null) { - // Reset rollRequested status - rollRequested.set(false); - zigzagLatch.releaseSafePoint(); - // syncFuture will be null if we failed our wait on safe point above. Otherwise, if - // latch was obtained successfully, the sync we threw in either trigger the latch or it - // got stamped with an exception because the WAL was damaged and we could not sync. Now - // the write pipeline has been opened up again by releasing the safe point, process the - // syncFuture we got above. 
This is probably a noop but it may be stale exception from - // when old WAL was in place. Catch it if so. - if (syncFuture != null) { - try { - blockOnSync(syncFuture); - } catch (IOException ioe) { - if (LOG.isTraceEnabled()) { - LOG.trace("Stale sync exception", ioe); - } - } - } - } + protected void onWriterReplaced(Writer nextWriter) { + if (nextWriter != null && nextWriter instanceof ProtobufLogWriter) { + this.hdfs_out = ((ProtobufLogWriter) nextWriter).getStream(); + } else { + this.hdfs_out = null; } + this.createSyncRunnersAndStart(); } - private void closeWriter(Writer writer, Path path, boolean syncCloseCall) throws IOException { - Span span = Span.current(); - try { - span.addEvent("closing writer"); - writer.close(); - span.addEvent("writer closed"); - } catch (IOException ioe) { - int errors = closeErrorCount.incrementAndGet(); - boolean hasUnflushedEntries = isUnflushedEntries(); - if (syncCloseCall && (hasUnflushedEntries || (errors > this.closeErrorsTolerated))) { - LOG.error("Close of WAL " + path + " failed. Cause=\"" + ioe.getMessage() + "\", errors=" - + errors + ", hasUnflushedEntries=" + hasUnflushedEntries); - throw ioe; + @Override + protected void doCleanUpResources() { + this.shutDownSyncRunners(); + }; + + private void shutDownSyncRunners() { + SyncRunner[] syncRunnersToUse = this.syncRunners; + if (syncRunnersToUse != null) { + for (SyncRunner syncRunner : syncRunnersToUse) { + syncRunner.shutDown(); } - LOG.warn("Riding over failed WAL close of " + path - + "; THIS FILE WAS NOT CLOSED BUT ALL EDITS SYNCED SO SHOULD BE OK", ioe); } + this.syncRunners = null; } @Override - protected void doShutdown() throws IOException { - // Shutdown the disruptor. Will stop after all entries have been processed. Make sure we - // have stopped incoming appends before calling this else it will not shutdown. We are - // conservative below waiting a long time and if not elapsed, then halting. - if (this.disruptor != null) { - long timeoutms = conf.getLong("hbase.wal.disruptor.shutdown.timeout.ms", 60000); - try { - this.disruptor.shutdown(timeoutms, TimeUnit.MILLISECONDS); - } catch (TimeoutException e) { - LOG.warn("Timed out bringing down disruptor after " + timeoutms + "ms; forcing halt " - + "(It is a problem if this is NOT an ABORT! -- DATALOSS!!!!)"); - this.disruptor.halt(); - this.disruptor.shutdown(); - } - } + protected CompletableFuture doWriterSync(Writer writer, boolean shouldUseHSync, + long txidWhenSync) { + CompletableFuture future = new CompletableFuture<>(); + SyncRequest syncRequest = new SyncRequest(writer, shouldUseHSync, txidWhenSync, future); + this.offerSyncRequest(syncRequest); + return future; + } - if (LOG.isDebugEnabled()) { - LOG.debug("Closing WAL writer in " + CommonFSUtils.getPath(walDir)); - } - if (this.writer != null) { - this.writer.close(); - this.writer = null; - } - closeExecutor.shutdown(); - try { - if (!closeExecutor.awaitTermination(waitOnShutdownInSeconds, TimeUnit.SECONDS)) { - LOG.error( - "We have waited {} seconds but the close of writer(s) doesn't complete." 
- + "Please check the status of underlying filesystem" - + " or increase the wait time by the config \"{}\"", - this.waitOnShutdownInSeconds, FSHLOG_WAIT_ON_SHUTDOWN_IN_SECONDS); + private void offerSyncRequest(SyncRequest syncRequest) { + for (int i = 0; i < this.syncRunners.length; i++) { + this.syncRunnerIndex = (this.syncRunnerIndex + 1) % this.syncRunners.length; + if (this.syncRunners[this.syncRunnerIndex].offer(syncRequest)) { + return; } - } catch (InterruptedException e) { - LOG.error("The wait for termination of FSHLog writer(s) is interrupted"); - Thread.currentThread().interrupt(); } + syncRequest.completableFuture + .completeExceptionally(new IOException("There is no available syncRunner.")); } - @Override - protected long append(final RegionInfo hri, final WALKeyImpl key, final WALEdit edits, - final boolean inMemstore) throws IOException { - return stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, - disruptor.getRingBuffer()); + static class SyncRequest { + private final Writer writer; + private final boolean shouldUseHSync; + private final long sequenceWhenSync; + private final CompletableFuture completableFuture; + + public SyncRequest(Writer writer, boolean shouldUseHSync, long txidWhenSync, + CompletableFuture completableFuture) { + this.writer = writer; + this.shouldUseHSync = shouldUseHSync; + this.sequenceWhenSync = txidWhenSync; + this.completableFuture = completableFuture; + } + } /** @@ -531,10 +345,9 @@ protected long append(final RegionInfo hri, final WALKeyImpl key, final WALEdit * completes. */ private class SyncRunner extends Thread { - private volatile long sequence; // Keep around last exception thrown. Clear on successful sync. - private final BlockingQueue syncFutures; - private volatile SyncFuture takeSyncFuture = null; + private final BlockingQueue syncRequests; + private volatile boolean shutDown = false; SyncRunner(final String name, final int maxHandlersCount) { super(name); @@ -551,183 +364,154 @@ private class SyncRunner extends Thread { // the meta table when succesful (i.e. sync), closing handlers -- etc. These are usually // much fewer in number than the user-space handlers so Q-size should be user handlers plus // some space for these other handlers. Lets multiply by 3 for good-measure. - this.syncFutures = new LinkedBlockingQueue<>(maxHandlersCount * 3); + this.syncRequests = new LinkedBlockingQueue<>(maxHandlersCount * 3); } - void offer(final long sequence, final SyncFuture[] syncFutures, final int syncFutureCount) { - // Set sequence first because the add to the queue will wake the thread if sleeping. - this.sequence = sequence; - for (int i = 0; i < syncFutureCount; ++i) { - this.syncFutures.add(syncFutures[i]); + boolean offer(SyncRequest syncRequest) { + if (this.shutDown) { + return false; } - } - /** - * Release the passed syncFuture - * @return Returns 1. - */ - private int releaseSyncFuture(final SyncFuture syncFuture, final long currentSequence, - final Throwable t) { - if (!syncFuture.done(currentSequence, t)) { - throw new IllegalStateException(); + if (!this.syncRequests.offer(syncRequest)) { + return false; } - // This function releases one sync future only. - return 1; + // recheck + if (this.shutDown) { + if (this.syncRequests.remove(syncRequest)) { + return false; + } + } + return true; } - /** - * Release all SyncFutures whose sequence is <= currentSequence. - * @param t May be non-null if we are processing SyncFutures because an exception was thrown. - * @return Count of SyncFutures we let go. 
- */ - private int releaseSyncFutures(final long currentSequence, final Throwable t) { - int syncCount = 0; - for (SyncFuture syncFuture; (syncFuture = this.syncFutures.peek()) != null;) { - if (syncFuture.getTxid() > currentSequence) { + private void completeSyncRequests(SyncRequest syncRequest, long syncedSequenceId) { + if (syncRequest != null) { + syncRequest.completableFuture.complete(syncedSequenceId); + } + while (true) { + SyncRequest head = this.syncRequests.peek(); + if (head == null) { break; } - releaseSyncFuture(syncFuture, currentSequence, t); - if (!this.syncFutures.remove(syncFuture)) { - throw new IllegalStateException(syncFuture.toString()); + if (head.sequenceWhenSync > syncedSequenceId) { + break; } - syncCount++; + head.completableFuture.complete(syncedSequenceId); + this.syncRequests.poll(); } - return syncCount; } - /** - * @param sequence The sequence we ran the filesystem sync against. - * @return Current highest synced sequence. - */ - private long updateHighestSyncedSequence(long sequence) { - long currentHighestSyncedSequence; - // Set the highestSyncedSequence IFF our current sequence id is the 'highest'. - do { - currentHighestSyncedSequence = highestSyncedTxid.get(); - if (currentHighestSyncedSequence >= sequence) { - // Set the sync number to current highwater mark; might be able to let go more - // queued sync futures - sequence = currentHighestSyncedSequence; + private void completeExceptionallySyncRequests(SyncRequest syncRequest, Exception exception) { + if (syncRequest != null) { + syncRequest.completableFuture.completeExceptionally(exception); + } + while (true) { + SyncRequest head = this.syncRequests.peek(); + if (head == null) { break; } - } while (!highestSyncedTxid.compareAndSet(currentHighestSyncedSequence, sequence)); - return sequence; + if (head.writer != syncRequest.writer) { + break; + } + head.completableFuture.completeExceptionally(exception); + this.syncRequests.poll(); + } } - boolean areSyncFuturesReleased() { - // check whether there is no sync futures offered, and no in-flight sync futures that is being - // processed. - return syncFutures.size() <= 0 && takeSyncFuture == null; + private SyncRequest takeSyncRequest() throws InterruptedException { + while (true) { + // We have to process what we 'take' from the queue + SyncRequest syncRequest = this.syncRequests.take(); + // See if we can process any syncfutures BEFORE we go sync. + long currentHighestSyncedSequence = highestSyncedTxid.get(); + if (syncRequest.sequenceWhenSync < currentHighestSyncedSequence) { + syncRequest.completableFuture.complete(currentHighestSyncedSequence); + continue; + } + return syncRequest; + } } @Override public void run() { - long currentSequence; - while (!isInterrupted()) { - int syncCount = 0; - + while (!this.shutDown) { try { - // Make a local copy of takeSyncFuture after we get it. We've been running into NPEs - // 2020-03-22 16:54:32,180 WARN [sync.1] wal.FSHLog$SyncRunner(589): UNEXPECTED - // java.lang.NullPointerException - // at org.apache.hadoop.hbase.regionserver.wal.FSHLog$SyncRunner.run(FSHLog.java:582) - // at java.lang.Thread.run(Thread.java:748) - SyncFuture sf; - while (true) { - takeSyncFuture = null; - // We have to process what we 'take' from the queue - takeSyncFuture = this.syncFutures.take(); - // Make local copy. 
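completeSyncRequests() and takeSyncRequest() above lean on the same observation: once a sync has made the file durable up to sequence S, every queued request at or below S is already satisfied and can be completed without touching the filesystem again. That sweep in isolation, over a queue assumed to be ordered by sequence (Request and completeUpTo are invented for this sketch):

    import java.util.Queue;
    import java.util.concurrent.CompletableFuture;

    // Sketch of the completion sweep over a queue ordered by sequence number.
    class CompletionSweep {
      static final class Request {
        final long sequence;
        final CompletableFuture<Long> future = new CompletableFuture<>();

        Request(long sequence) {
          this.sequence = sequence;
        }
      }

      static void completeUpTo(Queue<Request> queued, long syncedSequence) {
        for (Request head; (head = queued.peek()) != null; ) {
          if (head.sequence > syncedSequence) {
            break; // everything beyond this point still needs a future sync
          }
          head.future.complete(syncedSequence);
          queued.poll();
        }
      }
    }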
- sf = takeSyncFuture; - currentSequence = this.sequence; - long syncFutureSequence = sf.getTxid(); - if (syncFutureSequence > currentSequence) { - throw new IllegalStateException("currentSequence=" + currentSequence - + ", syncFutureSequence=" + syncFutureSequence); - } - // See if we can process any syncfutures BEFORE we go sync. - long currentHighestSyncedSequence = highestSyncedTxid.get(); - if (currentSequence < currentHighestSyncedSequence) { - syncCount += releaseSyncFuture(sf, currentHighestSyncedSequence, null); - // Done with the 'take'. Go around again and do a new 'take'. - continue; - } - break; - } + SyncRequest syncRequest = this.takeSyncRequest(); // I got something. Lets run. Save off current sequence number in case it changes // while we run. - long start = System.nanoTime(); - Throwable lastException = null; + long currentSequenceToUse = syncRequest.sequenceWhenSync; + boolean writerBroken = isWriterBroken(); + long currentHighestProcessedAppendTxid = highestProcessedAppendTxid; + Writer currentWriter = writer; + if (currentWriter != syncRequest.writer) { + syncRequest.completableFuture.completeExceptionally(WITER_REPLACED_EXCEPTION); + continue; + } + if (writerBroken) { + syncRequest.completableFuture.completeExceptionally(WITER_BROKEN_EXCEPTION); + continue; + } + if (currentHighestProcessedAppendTxid > currentSequenceToUse) { + currentSequenceToUse = currentHighestProcessedAppendTxid; + } + Exception lastException = null; try { - long unSyncedFlushSeq = highestUnsyncedTxid; - writer.sync(sf.isForceSync()); - if (unSyncedFlushSeq > currentSequence) { - currentSequence = unSyncedFlushSeq; - } - currentSequence = updateHighestSyncedSequence(currentSequence); + writer.sync(syncRequest.shouldUseHSync); } catch (IOException e) { - LOG.error("Error syncing, request close of WAL", e); + LOG.error("Error syncing", e); lastException = e; } catch (Exception e) { LOG.warn("UNEXPECTED", e); lastException = e; } finally { - // First release what we 'took' from the queue. - syncCount += releaseSyncFuture(takeSyncFuture, currentSequence, lastException); - // Can we release other syncs? - syncCount += releaseSyncFutures(currentSequence, lastException); if (lastException != null) { - requestLogRoll(ERROR); + this.completeExceptionallySyncRequests(syncRequest, lastException); } else { - checkLogRoll(); + this.completeSyncRequests(syncRequest, currentSequenceToUse); } } - postSync(System.nanoTime() - start, syncCount); } catch (InterruptedException e) { // Presume legit interrupt. - Thread.currentThread().interrupt(); + LOG.info("interrupted"); } catch (Throwable t) { LOG.warn("UNEXPECTED, continuing", t); } } + this.clearSyncRequestsWhenShutDown(); + } + + private void clearSyncRequestsWhenShutDown() { + while (true) { + SyncRequest syncRequest = this.syncRequests.poll(); + if (syncRequest == null) { + break; + } + syncRequest.completableFuture.completeExceptionally(WAL_CLOSE_EXCEPTION); + } + } + + void shutDown() { + try { + this.shutDown = true; + this.interrupt(); + this.join(); + } catch (InterruptedException e) { + LOG.warn("interrupted", e); + Thread.currentThread().interrupt(); + } } } - /** - * Schedule a log roll if needed. - */ - private boolean checkLogRoll() { - // If we have already requested a roll, do nothing + @Override + protected void checkSlowSyncCount() { if (isLogRollRequested()) { - return false; - } - // Will return immediately if we are in the middle of a WAL log roll currently. 
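The shutdown path added above (flip the flag, interrupt the blocked take(), join the thread, then drain and fail whatever is still queued) matters because skipping the final drain would leave callers blocked forever on futures nobody will complete. A stripped-down model, with StoppableWorker as an invented name:

    import java.io.IOException;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.LinkedBlockingQueue;

    // Model of the SyncRunner shutdown sequence; the "work" here is just completing a future.
    class StoppableWorker extends Thread {
      private final BlockingQueue<CompletableFuture<Long>> requests = new LinkedBlockingQueue<>();
      private volatile boolean shutDown = false;

      @Override
      public void run() {
        while (!shutDown) {
          try {
            requests.take().complete(0L); // the real runner would sync the writer here
          } catch (InterruptedException e) {
            // interrupted by shutDownAndJoin(); loop re-tests the flag and exits
          }
        }
        // Drain after the loop so late offers fail fast instead of being silently dropped.
        for (CompletableFuture<Long> f; (f = requests.poll()) != null; ) {
          f.completeExceptionally(new IOException("WAL was closed!"));
        }
      }

      boolean offer(CompletableFuture<Long> request) {
        if (shutDown || !requests.offer(request)) {
          return false;
        }
        // Recheck, mirroring SyncRunner.offer(): the worker may have shut down and drained
        // between the offer above and now; if so, take the request back and report failure.
        if (shutDown && requests.remove(request)) {
          return false;
        }
        return true;
      }

      void shutDownAndJoin() throws InterruptedException {
        shutDown = true;
        interrupt();
        join();
      }
    }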
- if (!rollWriterLock.tryLock()) { - return false; + return; } - try { - if (doCheckLogLowReplication()) { - LOG.warn("Requesting log roll because of low replication, current pipeline: " - + Arrays.toString(getPipeline())); - requestLogRoll(LOW_REPLICATION); - return true; - } else if (writer != null && writer.getLength() > logrollsize) { - if (LOG.isDebugEnabled()) { - LOG.debug("Requesting log roll because of file size threshold; length=" - + writer.getLength() + ", logrollsize=" + logrollsize); - } - requestLogRoll(SIZE); - return true; - } else if (doCheckSlowSync()) { - // We log this already in checkSlowSync - requestLogRoll(SLOW_SYNC); - return true; - } - } finally { - rollWriterLock.unlock(); + if (doCheckSlowSync()) { + // We log this already in checkSlowSync + requestLogRoll(SLOW_SYNC); } - return false; } /** Returns true if number of replicas for the WAL is lower than threshold */ @@ -777,33 +561,6 @@ protected boolean doCheckLogLowReplication() { return logRollNeeded; } - protected long getSequenceOnRingBuffer() { - return this.disruptor.getRingBuffer().next(); - } - - private SyncFuture publishSyncOnRingBuffer(boolean forceSync) { - long sequence = getSequenceOnRingBuffer(); - return publishSyncOnRingBuffer(sequence, forceSync); - } - - protected SyncFuture publishSyncOnRingBuffer(long sequence, boolean forceSync) { - // here we use ring buffer sequence as transaction id - SyncFuture syncFuture = getSyncFuture(sequence, forceSync); - try { - RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence); - truck.load(syncFuture); - } finally { - this.disruptor.getRingBuffer().publish(sequence); - } - return syncFuture; - } - - // Sync all known transactions - private void publishSyncThenBlockOnCompletion(boolean forceSync) throws IOException { - SyncFuture syncFuture = publishSyncOnRingBuffer(forceSync); - blockOnSync(syncFuture); - } - /** * {@inheritDoc} *
<p>
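Stepping back to the checkSlowSyncCount() override added earlier in this hunk: with size- and low-replication-triggered rolls handled elsewhere, the override only asks doCheckSlowSync() whether enough slow syncs have accumulated to justify a SLOW_SYNC roll. doCheckSlowSync() itself is not shown here, so the following is only a rough model of that kind of accounting, not the HBase implementation:

    import java.util.concurrent.atomic.AtomicInteger;

    // Rough model: count syncs slower than a threshold and request one roll per window.
    class SlowSyncTracker {
      private final long slowSyncNanos;
      private final int rollThreshold;
      private final AtomicInteger slowSyncCount = new AtomicInteger();

      SlowSyncTracker(long slowSyncNanos, int rollThreshold) {
        this.slowSyncNanos = slowSyncNanos;
        this.rollThreshold = rollThreshold;
      }

      void recordSync(long durationNanos) {
        if (durationNanos > slowSyncNanos) {
          slowSyncCount.incrementAndGet();
        }
      }

      boolean shouldRequestRoll() {
        // Reset on read so a burst of slow syncs produces a single roll request.
        return slowSyncCount.getAndSet(0) >= rollThreshold;
      }
    }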
@@ -824,20 +581,6 @@ int getLogReplication() { return 0; } - @Override - protected void doSync(boolean forceSync) throws IOException { - publishSyncThenBlockOnCompletion(forceSync); - } - - @Override - protected void doSync(long txid, boolean forceSync) throws IOException { - if (this.highestSyncedTxid.get() >= txid) { - // Already sync'd. - return; - } - publishSyncThenBlockOnCompletion(forceSync); - } - boolean isLowReplicationRollEnabled() { return lowReplicationRollEnabled; } @@ -846,361 +589,6 @@ boolean isLowReplicationRollEnabled() { ClassSize.align(ClassSize.OBJECT + (5 * ClassSize.REFERENCE) + (2 * ClassSize.ATOMIC_INTEGER) + (3 * Bytes.SIZEOF_INT) + (4 * Bytes.SIZEOF_LONG)); - /** - * This class is used coordinating two threads holding one thread at a 'safe point' while the - * orchestrating thread does some work that requires the first thread paused: e.g. holding the WAL - * writer while its WAL is swapped out from under it by another thread. - *
<p>
- * Thread A signals Thread B to hold when it gets to a 'safe point'. Thread A wait until Thread B - * gets there. When the 'safe point' has been attained, Thread B signals Thread A. Thread B then - * holds at the 'safe point'. Thread A on notification that Thread B is paused, goes ahead and - * does the work it needs to do while Thread B is holding. When Thread A is done, it flags B and - * then Thread A and Thread B continue along on their merry way. Pause and signalling 'zigzags' - * between the two participating threads. We use two latches -- one the inverse of the other -- - * pausing and signaling when states are achieved. - *
<p>
- * To start up the drama, Thread A creates an instance of this class each time it would do this - * zigzag dance and passes it to Thread B (these classes use Latches so it is one shot only). - * Thread B notices the new instance (via reading a volatile reference or how ever) and it starts - * to work toward the 'safe point'. Thread A calls {@link #waitSafePoint(SyncFuture)} when it - * cannot proceed until the Thread B 'safe point' is attained. Thread A will be held inside in - * {@link #waitSafePoint(SyncFuture)} until Thread B reaches the 'safe point'. Once there, Thread - * B frees Thread A by calling {@link #safePointAttained()}. Thread A now knows Thread B is at the - * 'safe point' and that it is holding there (When Thread B calls {@link #safePointAttained()} it - * blocks here until Thread A calls {@link #releaseSafePoint()}). Thread A proceeds to do what it - * needs to do while Thread B is paused. When finished, it lets Thread B lose by calling - * {@link #releaseSafePoint()} and away go both Threads again. - */ - static class SafePointZigZagLatch { - /** - * Count down this latch when safe point attained. - */ - private volatile CountDownLatch safePointAttainedLatch = new CountDownLatch(1); - /** - * Latch to wait on. Will be released when we can proceed. - */ - private volatile CountDownLatch safePointReleasedLatch = new CountDownLatch(1); - - private void checkIfSyncFailed(SyncFuture syncFuture) throws FailedSyncBeforeLogCloseException { - Throwable t = syncFuture.getThrowable(); - if (t != null) { - throw new FailedSyncBeforeLogCloseException(t); - } - } - - /** - * For Thread A to call when it is ready to wait on the 'safe point' to be attained. Thread A - * will be held in here until Thread B calls {@link #safePointAttained()} - * @param syncFuture We need this as barometer on outstanding syncs. If it comes home with an - * exception, then something is up w/ our syncing. - * @return The passed syncFuture - */ - SyncFuture waitSafePoint(SyncFuture syncFuture) - throws InterruptedException, FailedSyncBeforeLogCloseException { - while (!this.safePointAttainedLatch.await(1, TimeUnit.MILLISECONDS)) { - checkIfSyncFailed(syncFuture); - } - checkIfSyncFailed(syncFuture); - return syncFuture; - } - - /** Returns if the safepoint has been attained. */ - @InterfaceAudience.Private - boolean isSafePointAttained() { - return this.safePointAttainedLatch.getCount() == 0; - } - - /** - * Called by Thread B when it attains the 'safe point'. In this method, Thread B signals Thread - * A it can proceed. Thread B will be held in here until {@link #releaseSafePoint()} is called - * by Thread A. - */ - void safePointAttained() throws InterruptedException { - this.safePointAttainedLatch.countDown(); - this.safePointReleasedLatch.await(); - } - - /** - * Called by Thread A when it is done with the work it needs to do while Thread B is halted. - * This will release the Thread B held in a call to {@link #safePointAttained()} - */ - void releaseSafePoint() { - this.safePointReleasedLatch.countDown(); - } - - /** Returns True is this is a 'cocked', fresh instance, and not one that has already fired. */ - boolean isCocked() { - return this.safePointAttainedLatch.getCount() > 0 - && this.safePointReleasedLatch.getCount() > 0; - } - } - - /** - * Handler that is run by the disruptor ringbuffer consumer. Consumer is a SINGLE - * 'writer/appender' thread. Appends edits and starts up sync runs. Tries its best to batch up - * syncs. 
There is no discernible benefit batching appends so we just append as they come in - * because it simplifies the below implementation. See metrics for batching effectiveness (In - * measurement, at 100 concurrent handlers writing 1k, we are batching > 10 appends and 10 handler - * sync invocations for every actual dfsclient sync call; at 10 concurrent handlers, YMMV). - *
<p>
- * Herein, we have an array into which we store the sync futures as they come in. When we have a - * 'batch', we'll then pass what we have collected to a SyncRunner thread to do the filesystem - * sync. When it completes, it will then call {@link SyncFuture#done(long, Throwable)} on each of - * SyncFutures in the batch to release blocked Handler threads. - *
<p>
- * I've tried various effects to try and make latencies low while keeping throughput high. I've - * tried keeping a single Queue of SyncFutures in this class appending to its tail as the syncs - * coming and having sync runner threads poll off the head to 'finish' completed SyncFutures. I've - * tried linkedlist, and various from concurrent utils whether LinkedBlockingQueue or - * ArrayBlockingQueue, etc. The more points of synchronization, the more 'work' (according to - * 'perf stats') that has to be done; small increases in stall percentages seem to have a big - * impact on throughput/latencies. The below model where we have an array into which we stash the - * syncs and then hand them off to the sync thread seemed like a decent compromise. See HBASE-8755 - * for more detail. - */ - class RingBufferEventHandler implements EventHandler, LifecycleAware { - private final SyncRunner[] syncRunners; - private final SyncFuture[] syncFutures; - // Had 'interesting' issues when this was non-volatile. On occasion, we'd not pass all - // syncFutures to the next sync'ing thread. - private AtomicInteger syncFuturesCount = new AtomicInteger(); - private volatile SafePointZigZagLatch zigzagLatch; - /** - * Set if we get an exception appending or syncing so that all subsequence appends and syncs on - * this WAL fail until WAL is replaced. - */ - private Exception exception = null; - /** - * Object to block on while waiting on safe point. - */ - private final Object safePointWaiter = new Object(); - private volatile boolean shutdown = false; - - /** - * Which syncrunner to use next. - */ - private int syncRunnerIndex; - - RingBufferEventHandler(final int syncRunnerCount, final int maxBatchCount) { - this.syncFutures = new SyncFuture[maxBatchCount]; - this.syncRunners = new SyncRunner[syncRunnerCount]; - for (int i = 0; i < syncRunnerCount; i++) { - this.syncRunners[i] = new SyncRunner("sync." + i, maxBatchCount); - } - } - - private void cleanupOutstandingSyncsOnException(final long sequence, final Exception e) { - // There could be handler-count syncFutures outstanding. - for (int i = 0; i < this.syncFuturesCount.get(); i++) { - this.syncFutures[i].done(sequence, e); - } - offerDoneSyncsBackToCache(); - } - - /** - * Offers the finished syncs back to the cache for reuse. - */ - private void offerDoneSyncsBackToCache() { - for (int i = 0; i < this.syncFuturesCount.get(); i++) { - syncFutureCache.offer(syncFutures[i]); - } - this.syncFuturesCount.set(0); - } - - /** Returns True if outstanding sync futures still */ - private boolean isOutstandingSyncs() { - // Look at SyncFutures in the EventHandler - for (int i = 0; i < this.syncFuturesCount.get(); i++) { - if (!this.syncFutures[i].isDone()) { - return true; - } - } - - return false; - } - - private boolean isOutstandingSyncsFromRunners() { - // Look at SyncFutures in the SyncRunners - for (SyncRunner syncRunner : syncRunners) { - if (syncRunner.isAlive() && !syncRunner.areSyncFuturesReleased()) { - return true; - } - } - return false; - } - - @Override - // We can set endOfBatch in the below method if at end of our this.syncFutures array - public void onEvent(final RingBufferTruck truck, final long sequence, boolean endOfBatch) - throws Exception { - // Appends and syncs are coming in order off the ringbuffer. We depend on this fact. We'll - // add appends to dfsclient as they come in. Batching appends doesn't give any significant - // benefit on measurement. Handler sync calls we will batch up. 
If we get an exception - // appending an edit, we fail all subsequent appends and syncs with the same exception until - // the WAL is reset. It is important that we not short-circuit and exit early this method. - // It is important that we always go through the attainSafePoint on the end. Another thread, - // the log roller may be waiting on a signal from us here and will just hang without it. - - try { - if (truck.type() == RingBufferTruck.Type.SYNC) { - this.syncFutures[this.syncFuturesCount.getAndIncrement()] = truck.unloadSync(); - // Force flush of syncs if we are carrying a full complement of syncFutures. - if (this.syncFuturesCount.get() == this.syncFutures.length) { - endOfBatch = true; - } - } else if (truck.type() == RingBufferTruck.Type.APPEND) { - FSWALEntry entry = truck.unloadAppend(); - try { - if (this.exception != null) { - // Return to keep processing events coming off the ringbuffer - return; - } - append(entry); - } catch (Exception e) { - // Failed append. Record the exception. - this.exception = e; - // invoking cleanupOutstandingSyncsOnException when append failed with exception, - // it will cleanup existing sync requests recorded in syncFutures but not offered to - // SyncRunner yet, - // so there won't be any sync future left over if no further truck published to - // disruptor. - cleanupOutstandingSyncsOnException(sequence, - this.exception instanceof DamagedWALException - ? this.exception - : new DamagedWALException("On sync", this.exception)); - // Return to keep processing events coming off the ringbuffer - return; - } finally { - entry.release(); - } - } else { - // What is this if not an append or sync. Fail all up to this!!! - cleanupOutstandingSyncsOnException(sequence, - new IllegalStateException("Neither append nor sync")); - // Return to keep processing. - return; - } - - // TODO: Check size and if big go ahead and call a sync if we have enough data. - // This is a sync. If existing exception, fall through. Else look to see if batch. - if (this.exception == null) { - // If not a batch, return to consume more events from the ring buffer before proceeding; - // we want to get up a batch of syncs and appends before we go do a filesystem sync. - if (!endOfBatch || this.syncFuturesCount.get() <= 0) { - return; - } - // syncRunnerIndex is bound to the range [0, Integer.MAX_INT - 1] as follows: - // * The maximum value possible for syncRunners.length is Integer.MAX_INT - // * syncRunnerIndex starts at 0 and is incremented only here - // * after the increment, the value is bounded by the '%' operator to - // [0, syncRunners.length), presuming the value was positive prior to - // the '%' operator. - // * after being bound to [0, Integer.MAX_INT - 1], the new value is stored in - // syncRunnerIndex ensuring that it can't grow without bound and overflow. - // * note that the value after the increment must be positive, because the most it - // could have been prior was Integer.MAX_INT - 1 and we only increment by 1. - this.syncRunnerIndex = (this.syncRunnerIndex + 1) % this.syncRunners.length; - try { - // Below expects that the offer 'transfers' responsibility for the outstanding syncs to - // the syncRunner. We should never get an exception in here. - this.syncRunners[this.syncRunnerIndex].offer(sequence, this.syncFutures, - this.syncFuturesCount.get()); - } catch (Exception e) { - // Should NEVER get here. 
- requestLogRoll(ERROR); - this.exception = new DamagedWALException("Failed offering sync", e); - } - } - // We may have picked up an exception above trying to offer sync - if (this.exception != null) { - cleanupOutstandingSyncsOnException(sequence, - this.exception instanceof DamagedWALException - ? this.exception - : new DamagedWALException("On sync", this.exception)); - } - attainSafePoint(sequence); - // It is critical that we offer the futures back to the cache for reuse here after the - // safe point is attained and all the clean up has been done. There have been - // issues with reusing sync futures early causing WAL lockups, see HBASE-25984. - offerDoneSyncsBackToCache(); - } catch (Throwable t) { - LOG.error("UNEXPECTED!!! syncFutures.length=" + this.syncFutures.length, t); - } - } - - SafePointZigZagLatch attainSafePoint() { - this.zigzagLatch = new SafePointZigZagLatch(); - return this.zigzagLatch; - } - - /** - * Check if we should attain safe point. If so, go there and then wait till signalled before we - * proceeding. - */ - private void attainSafePoint(final long currentSequence) { - if (this.zigzagLatch == null || !this.zigzagLatch.isCocked()) { - return; - } - // If here, another thread is waiting on us to get to safe point. Don't leave it hanging. - beforeWaitOnSafePoint(); - try { - // Wait on outstanding syncers; wait for them to finish syncing (unless we've been - // shutdown or unless our latch has been thrown because we have been aborted or unless - // this WAL is broken and we can't get a sync/append to complete). - while ( - (!this.shutdown && this.zigzagLatch.isCocked() - && highestSyncedTxid.get() < currentSequence && - // We could be in here and all syncs are failing or failed. Check for this. Otherwise - // we'll just be stuck here for ever. In other words, ensure there syncs running. - isOutstandingSyncs()) - // Wait for all SyncRunners to finish their work so that we can replace the writer - || isOutstandingSyncsFromRunners() - ) { - synchronized (this.safePointWaiter) { - this.safePointWaiter.wait(0, 1); - } - } - // Tell waiting thread we've attained safe point. Can clear this.throwable if set here - // because we know that next event through the ringbuffer will be going to a new WAL - // after we do the zigzaglatch dance. - this.exception = null; - this.zigzagLatch.safePointAttained(); - } catch (InterruptedException e) { - LOG.warn("Interrupted ", e); - Thread.currentThread().interrupt(); - } - } - - /** - * Append to the WAL. Does all CP and WAL listener calls. - */ - void append(final FSWALEntry entry) throws Exception { - try { - FSHLog.this.appendEntry(writer, entry); - } catch (Exception e) { - String msg = - "Append sequenceId=" + entry.getKey().getSequenceId() + ", requesting roll of WAL"; - LOG.warn(msg, e); - requestLogRoll(ERROR); - throw new DamagedWALException(msg, e); - } - } - - @Override - public void onStart() { - for (SyncRunner syncRunner : this.syncRunners) { - syncRunner.start(); - } - } - - @Override - public void onShutdown() { - for (SyncRunner syncRunner : this.syncRunners) { - syncRunner.interrupt(); - } - } - } - /** * This method gets the pipeline for the current WAL. 
*/ @@ -1221,4 +609,5 @@ Writer getWriter() { void setWriter(Writer writer) { this.writer = writer; } + } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java index 476e3bd330bc..333ec4b78b1a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; import org.apache.hadoop.hbase.regionserver.wal.FSHLog; import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException; import org.apache.hadoop.hbase.testclassification.SmallTests; @@ -93,6 +94,7 @@ public void setup() throws IOException { CONF = TEST_UTIL.getConfiguration(); // Disable block cache. CONF.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f); + CONF.setLong(AbstractFSWAL.WAL_SYNC_TIMEOUT_MS, 10000); dir = TEST_UTIL.getDataTestDir("TestHRegion").toString(); tableName = TableName.valueOf(name.getMethodName()); } @@ -258,22 +260,16 @@ public void testLockupAroundBadAssignSync() throws IOException { dodgyWAL.throwSyncException = true; Put put = new Put(value); put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("2"), value); + region.rsServices = services; region.put(put); } catch (IOException ioe) { threwOnSync = true; } - // An append in the WAL but the sync failed is a server abort condition. That is our - // current semantic. Verify. It takes a while for abort to be called. Just hang here till it - // happens. If it don't we'll timeout the whole test. That is fine. - while (true) { - try { - verify(services, atLeast(1)).abort(anyString(), any(Throwable.class)); - break; - } catch (WantedButNotInvoked t) { - Threads.sleep(1); - } - } + region.rsServices = null; + // An append in the WAL but the sync failed is a server abort condition. That is our + // current semantic. Verify. 
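One orientation note on the test changes in this hunk, not part of the patch: setup() now pins AbstractFSWAL.WAL_SYNC_TIMEOUT_MS to 10 seconds, presumably so that a sync which can no longer succeed surfaces as an exception within the test's budget instead of stalling it, which in turn lets the abort be verified directly below rather than in the old retry loop. The effect of such a bound on a caller, shown with a hypothetical helper (not HBase API):

    import java.io.IOException;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    // Hypothetical helper: wait for a sync future with a bound, turning a hang into an error.
    class BoundedSyncWait {
      static long waitForSync(CompletableFuture<Long> syncFuture, long timeoutMs) throws IOException {
        try {
          return syncFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
          throw new IOException("sync timed out after " + timeoutMs + " ms", e);
        } catch (ExecutionException e) {
          throw new IOException("sync failed", e.getCause());
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IOException("interrupted while waiting for sync", e);
        }
      }
    }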
+ verify(services, atLeast(1)).abort(anyString(), any()); try { dodgyWAL.throwAppendException = false; dodgyWAL.throwSyncException = false; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index f7e2de16c5d7..03bbbbe47ae6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -147,6 +147,8 @@ import org.apache.hadoop.hbase.regionserver.Region.RowLock; import org.apache.hadoop.hbase.regionserver.TestHStore.FaultyFileSystem; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl; +import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; +import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL; import org.apache.hadoop.hbase.regionserver.wal.FSHLog; import org.apache.hadoop.hbase.regionserver.wal.MetricsWALSource; import org.apache.hadoop.hbase.regionserver.wal.WALUtil; @@ -178,6 +180,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.ClassRule; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -260,6 +263,7 @@ public void setup() throws IOException { method = name.getMethodName(); tableName = TableName.valueOf(method); CONF.set(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, String.valueOf(0.09)); + CONF.setLong(AbstractFSWAL.WAL_SYNC_TIMEOUT_MS, 10000); } @After @@ -5415,7 +5419,14 @@ public void testPutWithMemStoreFlush() throws Exception { assertArrayEquals(Bytes.toBytes("value1"), CellUtil.cloneValue(kvs.get(0))); } + /** + * For this test,the spied {@link AsyncFSWAL} can not work properly because of a Mockito defect + * that can not deal with classes which have a field of an inner class. See discussions in + * HBASE-15536.When we reuse the code of {@link AsyncFSWAL} for {@link FSHLog}, this test could + * not work for {@link FSHLog} also. + */ @Test + @Ignore public void testDurability() throws Exception { // there are 5 x 5 cases: // table durability(SYNC,FSYNC,ASYC,SKIP,USE_DEFAULT) x mutation @@ -5469,6 +5480,7 @@ private void durabilityTest(String method, Durability tableDurability, Durability mutationDurability, long timeout, boolean expectAppend, final boolean expectSync, final boolean expectSyncFromLogSyncer) throws Exception { Configuration conf = HBaseConfiguration.create(CONF); + conf.setLong(AbstractFSWAL.WAL_SHUTDOWN_WAIT_TIMEOUT_MS, 60 * 60 * 1000); method = method + "_" + tableDurability.name() + "_" + mutationDurability.name(); byte[] family = Bytes.toBytes("family"); Path logDir = new Path(new Path(dir + method), "log"); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java deleted file mode 100644 index 75f6c4868994..000000000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java +++ /dev/null @@ -1,454 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver; - -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.util.NavigableMap; -import java.util.TreeMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.CellScanner; -import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HBaseTestingUtil; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Durability; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.regionserver.wal.FSHLog; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.testclassification.RegionServerTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; -import org.apache.hadoop.hbase.util.Threads; -import org.apache.hadoop.hbase.wal.WAL; -import org.apache.hadoop.hbase.wal.WALEdit; -import org.apache.hadoop.hbase.wal.WALKeyImpl; -import org.apache.hadoop.hbase.wal.WALProvider.Writer; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.ClassRule; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; -import org.mockito.Mockito; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hbase.thirdparty.com.google.common.io.Closeables; - -/** - * Testing for lock up of FSHLog. - */ -@Category({ RegionServerTests.class, MediumTests.class }) -public class TestWALLockup { - - @ClassRule - public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestWALLockup.class); - - private static final Logger LOG = LoggerFactory.getLogger(TestWALLockup.class); - - @Rule - public TestName name = new TestName(); - - private static final String COLUMN_FAMILY = "MyCF"; - private static final byte[] COLUMN_FAMILY_BYTES = Bytes.toBytes(COLUMN_FAMILY); - - private static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); - private static Configuration CONF; - private String dir; - - // Test names - protected TableName tableName; - - @Before - public void setup() throws IOException { - CONF = TEST_UTIL.getConfiguration(); - // Disable block cache. - CONF.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f); - dir = TEST_UTIL.getDataTestDir("TestHRegion").toString(); - tableName = TableName.valueOf(name.getMethodName()); - } - - @After - public void tearDown() throws Exception { - EnvironmentEdgeManagerTestHelper.reset(); - LOG.info("Cleaning test directory: " + TEST_UTIL.getDataTestDir()); - TEST_UTIL.cleanupTestDir(); - } - - private String getName() { - return name.getMethodName(); - } - - // A WAL that we can have throw exceptions when a flag is set. 
- private static final class DodgyFSLog extends FSHLog { - // Set this when want the WAL to start throwing exceptions. - volatile boolean throwException = false; - - // Latch to hold up processing until after another operation has had time to run. - CountDownLatch latch = new CountDownLatch(1); - - public DodgyFSLog(FileSystem fs, Path root, String logDir, Configuration conf) - throws IOException { - super(fs, root, logDir, conf); - } - - @Override - protected void afterCreatingZigZagLatch() { - // If throwException set, then append will throw an exception causing the WAL to be - // rolled. We'll come in here. Hold up processing until a sync can get in before - // the zigzag has time to complete its setup and get its own sync in. This is what causes - // the lock up we've seen in production. - if (throwException) { - try { - LOG.info("LATCHED"); - // So, timing can have it that the test can run and the bad flush below happens - // before we get here. In this case, we'll be stuck waiting on this latch but there - // is nothing in the WAL pipeline to get us to the below beforeWaitOnSafePoint... - // because all WALs have rolled. In this case, just give up on test. - if (!this.latch.await(5, TimeUnit.SECONDS)) { - LOG.warn("GIVE UP! Failed waiting on latch...Test is ABORTED!"); - } - } catch (InterruptedException e) { - } - } - } - - @Override - protected void beforeWaitOnSafePoint() { - if (throwException) { - LOG.info("COUNTDOWN"); - // Don't countdown latch until someone waiting on it otherwise, the above - // afterCreatingZigZagLatch will get to the latch and no one will ever free it and we'll - // be stuck; test won't go down - while (this.latch.getCount() <= 0) - Threads.sleep(1); - this.latch.countDown(); - } - } - - @Override - protected Writer createWriterInstance(Path path) throws IOException { - final Writer w = super.createWriterInstance(path); - return new Writer() { - @Override - public void close() throws IOException { - w.close(); - } - - @Override - public void sync(boolean forceSync) throws IOException { - if (throwException) { - throw new IOException("FAKE! Failed to replace a bad datanode...SYNC"); - } - w.sync(forceSync); - } - - @Override - public void append(Entry entry) throws IOException { - if (throwException) { - throw new IOException("FAKE! Failed to replace a bad datanode...APPEND"); - } - w.append(entry); - } - - @Override - public long getLength() { - return w.getLength(); - } - - @Override - public long getSyncedLength() { - return w.getSyncedLength(); - } - }; - } - } - - /** - * Reproduce locking up that happens when we get an inopportune sync during setup for zigzaglatch - * wait. See HBASE-14317. If below is broken, we will see this test timeout because it is locked - * up. - *

- * First I need to set up some mocks for Server and RegionServerServices. I also need to set up a - * dodgy WAL that will throw an exception when we go to append to it. - */ - @Test - public void testLockupWhenSyncInMiddleOfZigZagSetup() throws IOException { - // Mocked up server and regionserver services. Needed below. - RegionServerServices services = Mockito.mock(RegionServerServices.class); - Mockito.when(services.getConfiguration()).thenReturn(CONF); - Mockito.when(services.isStopped()).thenReturn(false); - Mockito.when(services.isAborted()).thenReturn(false); - - // OK. Now I have my mocked up Server & RegionServerServices and dodgy WAL, go ahead with test. - FileSystem fs = FileSystem.get(CONF); - Path rootDir = new Path(dir + getName()); - DodgyFSLog dodgyWAL = new DodgyFSLog(fs, rootDir, getName(), CONF); - dodgyWAL.init(); - Path originalWAL = dodgyWAL.getCurrentFileName(); - // I need a log roller running. - LogRoller logRoller = new LogRoller(services); - logRoller.addWAL(dodgyWAL); - // There is no 'stop' once a logRoller is running.. it just dies. - logRoller.start(); - // Now get a region and start adding in edits. - final HRegion region = initHRegion(tableName, null, null, CONF, dodgyWAL); - byte[] bytes = Bytes.toBytes(getName()); - NavigableMap scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); - scopes.put(COLUMN_FAMILY_BYTES, 0); - MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); - try { - // First get something into memstore. Make a Put and then pull the Cell out of it. Will - // manage append and sync carefully in below to manufacture hang. We keep adding same - // edit. WAL subsystem doesn't care. - Put put = new Put(bytes); - put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes); - WALKeyImpl key = new WALKeyImpl(region.getRegionInfo().getEncodedNameAsBytes(), - TableName.META_TABLE_NAME, EnvironmentEdgeManager.currentTime(), mvcc, scopes); - WALEdit edit = new WALEdit(); - CellScanner CellScanner = put.cellScanner(); - assertTrue(CellScanner.advance()); - edit.add(CellScanner.current()); - // Put something in memstore and out in the WAL. Do a big number of appends so we push - // out other side of the ringbuffer. If small numbers, stuff doesn't make it to WAL - for (int i = 0; i < 1000; i++) { - region.put(put); - } - // Set it so we start throwing exceptions. - LOG.info("SET throwing of exception on append"); - dodgyWAL.throwException = true; - // This append provokes a WAL roll request - dodgyWAL.appendData(region.getRegionInfo(), key, edit); - boolean exception = false; - try { - dodgyWAL.sync(false); - } catch (Exception e) { - exception = true; - } - assertTrue("Did not get sync exception", exception); - - // Get a memstore flush going too so we have same hung profile as up in the issue over - // in HBASE-14317. Flush hangs trying to get sequenceid because the ringbuffer is held up - // by the zigzaglatch waiting on syncs to come home. - Thread t = new Thread("Flusher") { - @Override - public void run() { - try { - if (region.getMemStoreDataSize() <= 0) { - throw new IOException("memstore size=" + region.getMemStoreDataSize()); - } - region.flush(false); - } catch (IOException e) { - // Can fail trying to flush in middle of a roll. Not a failure. Will succeed later - // when roll completes. 
- LOG.info("In flush", e); - } - LOG.info("Exiting"); - } - }; - t.setDaemon(true); - t.start(); - // Wait until - while (dodgyWAL.latch.getCount() > 0) { - Threads.sleep(1); - } - // Now assert I got a new WAL file put in place even though loads of errors above. - assertTrue(originalWAL != dodgyWAL.getCurrentFileName()); - // Can I append to it? - dodgyWAL.throwException = false; - try { - region.put(put); - } catch (Exception e) { - LOG.info("In the put", e); - } - } finally { - // To stop logRoller, its server has to say it is stopped. - Mockito.when(services.isStopped()).thenReturn(true); - Closeables.close(logRoller, true); - try { - if (region != null) { - region.close(); - } - if (dodgyWAL != null) { - dodgyWAL.close(); - } - } catch (Exception e) { - LOG.info("On way out", e); - } - } - } - - /** - * If below is broken, we will see this test timeout because RingBufferEventHandler was stuck in - * attainSafePoint. Everyone will wait for sync to finish forever. See HBASE-14317. - */ - @Test - public void testRingBufferEventHandlerStuckWhenSyncFailed() - throws IOException, InterruptedException { - - // A WAL that we can have throw exceptions and slow FSHLog.replaceWriter down - class DodgyFSLog extends FSHLog { - - private volatile boolean zigZagCreated = false; - - public DodgyFSLog(FileSystem fs, Path root, String logDir, Configuration conf) - throws IOException { - super(fs, root, logDir, conf); - } - - @Override - protected void afterCreatingZigZagLatch() { - zigZagCreated = true; - // Sleep a while to wait for RingBufferEventHandler to get stuck first. - try { - Thread.sleep(3000); - } catch (InterruptedException ignore) { - } - } - - @Override - protected long getSequenceOnRingBuffer() { - return super.getSequenceOnRingBuffer(); - } - - protected void publishSyncOnRingBufferAndBlock(long sequence) { - try { - super.blockOnSync(super.publishSyncOnRingBuffer(sequence, false)); - Assert.fail("Expect an IOException here."); - } catch (IOException ignore) { - // Here, we will get an IOException. - } - } - - @Override - protected Writer createWriterInstance(Path path) throws IOException { - final Writer w = super.createWriterInstance(path); - return new Writer() { - @Override - public void close() throws IOException { - w.close(); - } - - @Override - public void sync(boolean forceSync) throws IOException { - throw new IOException("FAKE! Failed to replace a bad datanode...SYNC"); - } - - @Override - public void append(Entry entry) throws IOException { - w.append(entry); - } - - @Override - public long getLength() { - return w.getLength(); - } - - @Override - public long getSyncedLength() { - return w.getSyncedLength(); - } - }; - } - } - - // Mocked up server and regionserver services. Needed below. - RegionServerServices services = Mockito.mock(RegionServerServices.class); - Mockito.when(services.getConfiguration()).thenReturn(CONF); - Mockito.when(services.isStopped()).thenReturn(false); - Mockito.when(services.isAborted()).thenReturn(false); - - // OK. Now I have my mocked up Server & RegionServerServices and dodgy WAL, go ahead with test. - FileSystem fs = FileSystem.get(CONF); - Path rootDir = new Path(dir + getName()); - final DodgyFSLog dodgyWAL = new DodgyFSLog(fs, rootDir, getName(), CONF); - // I need a log roller running. - LogRoller logRoller = new LogRoller(services); - logRoller.addWAL(dodgyWAL); - // There is no 'stop' once a logRoller is running.. it just dies. 
- logRoller.start(); - - try { - final long seqForSync = dodgyWAL.getSequenceOnRingBuffer(); - - // This call provokes a WAL roll, and we will get a new RingBufferEventHandler.ZigZagLatch - // in LogRoller. - // After creating ZigZagLatch, RingBufferEventHandler would get stuck due to sync event, - // as long as HBASE-14317 hasn't be fixed. - LOG.info("Trigger log roll for creating a ZigZagLatch."); - logRoller.requestRollAll(); - - while (!dodgyWAL.zigZagCreated) { - Thread.sleep(10); - } - - // Send a sync event for RingBufferEventHandler, - // and it gets blocked in RingBufferEventHandler.attainSafePoint - LOG.info("Send sync for RingBufferEventHandler"); - Thread syncThread = new Thread() { - @Override - public void run() { - dodgyWAL.publishSyncOnRingBufferAndBlock(seqForSync); - } - }; - // Sync in another thread to avoid reset SyncFuture again. - syncThread.start(); - syncThread.join(); - - try { - LOG.info("Call sync for testing whether RingBufferEventHandler is hanging."); - dodgyWAL.sync(false); // Should not get a hang here, otherwise we will see timeout in this - // test. - Assert.fail("Expect an IOException here."); - } catch (IOException ignore) { - } - - } finally { - // To stop logRoller, its server has to say it is stopped. - Mockito.when(services.isStopped()).thenReturn(true); - if (logRoller != null) { - logRoller.close(); - } - if (dodgyWAL != null) { - dodgyWAL.close(); - } - } - } - - /** - * @return A region on which you must call {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} - * when done. - */ - private static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey, - Configuration conf, WAL wal) throws IOException { - ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null, - MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); - return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, conf, false, - Durability.SYNC_WAL, wal, COLUMN_FAMILY_BYTES); - } -} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java index ecd518631b0d..2e7c97ef4de4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java @@ -545,7 +545,7 @@ private AbstractFSWAL createHoldingWAL(String testName, AtomicBoolean startHo CountDownLatch holdAppend) throws IOException { AbstractFSWAL wal = newWAL(FS, CommonFSUtils.getRootDir(CONF), testName, HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null); - wal.init(); + // newWAL has already called wal.init() wal.registerWALActionsListener(new WALActionsListener() { @Override public void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, WALEdit logEdit) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java index 5b3cbfa3b1a3..07a97a1e0e97 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java @@ -18,8 +18,6 @@ package org.apache.hadoop.hbase.regionserver.wal; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; import java.io.IOException; import 
java.lang.reflect.Field; @@ -29,7 +27,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -37,7 +34,6 @@ import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RegionInfo; @@ -53,10 +49,8 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.Threads; -import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALKey; -import org.apache.hadoop.hbase.wal.WALProvider; import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; @@ -113,14 +107,9 @@ public void testSyncRunnerIndexOverflow() throws IOException, NoSuchFieldExcepti HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null); log.init(); try { - Field ringBufferEventHandlerField = FSHLog.class.getDeclaredField("ringBufferEventHandler"); - ringBufferEventHandlerField.setAccessible(true); - FSHLog.RingBufferEventHandler ringBufferEventHandler = - (FSHLog.RingBufferEventHandler) ringBufferEventHandlerField.get(log); - Field syncRunnerIndexField = - FSHLog.RingBufferEventHandler.class.getDeclaredField("syncRunnerIndex"); + Field syncRunnerIndexField = FSHLog.class.getDeclaredField("syncRunnerIndex"); syncRunnerIndexField.setAccessible(true); - syncRunnerIndexField.set(ringBufferEventHandler, Integer.MAX_VALUE - 1); + syncRunnerIndexField.set(log, Integer.MAX_VALUE - 1); TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(this.name.getMethodName())) .setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build(); @@ -138,93 +127,6 @@ public void testSyncRunnerIndexOverflow() throws IOException, NoSuchFieldExcepti } } - /** - * Test for WAL stall due to sync future overwrites. See HBASE-25984. - */ - @Test - public void testDeadlockWithSyncOverwrites() throws Exception { - final CountDownLatch blockBeforeSafePoint = new CountDownLatch(1); - - class FailingWriter implements WALProvider.Writer { - @Override - public void sync(boolean forceSync) throws IOException { - throw new IOException("Injected failure.."); - } - - @Override - public void append(WAL.Entry entry) throws IOException { - } - - @Override - public long getLength() { - return 0; - } - - @Override - public long getSyncedLength() { - return 0; - } - - @Override - public void close() throws IOException { - } - } - - /* - * Custom FSHLog implementation with a conditional wait before attaining safe point. 
- */ - class CustomFSHLog extends FSHLog { - public CustomFSHLog(FileSystem fs, Path rootDir, String logDir, String archiveDir, - Configuration conf, List listeners, boolean failIfWALExists, - String prefix, String suffix) throws IOException { - super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix); - } - - @Override - protected void beforeWaitOnSafePoint() { - try { - assertTrue(blockBeforeSafePoint.await(TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS)); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - - public SyncFuture publishSyncOnRingBuffer() { - long sequence = getSequenceOnRingBuffer(); - return publishSyncOnRingBuffer(sequence, false); - } - } - - final String name = this.name.getMethodName(); - try (CustomFSHLog log = new CustomFSHLog(FS, CommonFSUtils.getRootDir(CONF), name, - HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null)) { - log.setWriter(new FailingWriter()); - Field ringBufferEventHandlerField = FSHLog.class.getDeclaredField("ringBufferEventHandler"); - ringBufferEventHandlerField.setAccessible(true); - FSHLog.RingBufferEventHandler ringBufferEventHandler = - (FSHLog.RingBufferEventHandler) ringBufferEventHandlerField.get(log); - // Force a safe point - FSHLog.SafePointZigZagLatch latch = ringBufferEventHandler.attainSafePoint(); - try { - SyncFuture future0 = log.publishSyncOnRingBuffer(); - // Wait for the sync to be done. - Waiter.waitFor(CONF, TEST_TIMEOUT_MS, future0::isDone); - // Publish another sync from the same thread, this should not overwrite the done sync. - SyncFuture future1 = log.publishSyncOnRingBuffer(); - assertFalse(future1.isDone()); - // Unblock the safe point trigger.. - blockBeforeSafePoint.countDown(); - // Wait for the safe point to be reached. - // With the deadlock in HBASE-25984, this is never possible, thus blocking the sync - // pipeline. - Waiter.waitFor(CONF, TEST_TIMEOUT_MS, latch::isSafePointAttained); - } finally { - // Force release the safe point, for the clean up. 
- latch.releaseSafePoint(); - } - } - } - /** * Test case for https://issues.apache.org/jira/browse/HBASE-16721 */ diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java index 09c19dde65f8..90f595003cb1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java @@ -17,8 +17,13 @@ */ package org.apache.hadoop.hbase.regionserver.wal; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + import java.io.FileNotFoundException; import java.io.IOException; +import java.util.Arrays; +import java.util.List; import java.util.NavigableMap; import java.util.TreeMap; import org.apache.hadoop.conf.Configuration; @@ -52,21 +57,29 @@ import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALKeyImpl; +import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.hadoop.hbase.wal.WALSplitter; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.ipc.RemoteException; import org.junit.After; -import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.base.Throwables; + /** * Tests for conditions that should trigger RegionServer aborts when rolling the current WAL fails. 
*/ +@RunWith(Parameterized.class) @Category({ RegionServerTests.class, MediumTests.class }) public class TestLogRollAbort { @@ -103,14 +116,23 @@ public static void setUpBeforeClass() throws Exception { // the namenode might still try to choose the recently-dead datanode // for a pipeline, so try to a new pipeline multiple times TEST_UTIL.getConfiguration().setInt("dfs.client.block.write.retries", 10); - TEST_UTIL.getConfiguration().set(WALFactory.WAL_PROVIDER, "filesystem"); + TEST_UTIL.getConfiguration().set(WALFactory.WAL_PROVIDER, "asyncfs"); + } + + @Parameters(name = "{index}: walProvider={0}") + public static List params() { + return Arrays.asList(new Object[] { "filesystem" }, new Object[] { "asyncfs" }); } private Configuration conf; private FileSystem fs; + @Parameter + public String walProvider; + @Before public void setUp() throws Exception { + TEST_UTIL.getConfiguration().set(WALFactory.WAL_PROVIDER, walProvider); TEST_UTIL.startMiniCluster(2); cluster = TEST_UTIL.getHBaseCluster(); @@ -211,7 +233,7 @@ public void testLogRollAfterSplitStart() throws IOException { } // Send the data to HDFS datanodes and close the HDFS writer log.sync(); - ((AbstractFSWAL) log).replaceWriter(((FSHLog) log).getOldPath(), null, null); + closeWriter((AbstractFSWAL) log); // code taken from MasterFileSystem.getLogDirs(), which is called from // MasterFileSystem.splitLog() handles RS shutdowns (as observed by the splitting process) @@ -226,16 +248,13 @@ public void testLogRollAfterSplitStart() throws IOException { WALSplitter.split(HBASELOGDIR, rsSplitDir, OLDLOGDIR, fs, conf, wals); LOG.debug("Trying to roll the WAL."); - try { - log.rollWriter(); - Assert.fail("rollWriter() did not throw any exception."); - } catch (IOException ioe) { - if (ioe.getCause() instanceof FileNotFoundException) { - LOG.info("Got the expected exception: ", ioe.getCause()); - } else { - Assert.fail("Unexpected exception: " + ioe); - } + IOException error = assertThrows(IOException.class, () -> log.rollWriter()); + if (error instanceof RemoteException) { + error = ((RemoteException) error).unwrapRemoteException(); } + assertTrue("unexpected error: " + Throwables.getStackTraceAsString(error), + error instanceof FileNotFoundException + || error.getCause() instanceof FileNotFoundException); } finally { wals.close(); if (fs.exists(thisTestsDir)) { @@ -243,4 +262,14 @@ public void testLogRollAfterSplitStart() throws IOException { } } } + + private void closeWriter(AbstractFSWAL wal) { + wal.waitForSafePoint(); + long oldFileLen = wal.writer.getLength(); + wal.closeWriter(wal.writer, wal.getOldPath()); + wal.logRollAndSetupWalProps(wal.getOldPath(), null, oldFileLen); + wal.writer = null; + wal.onWriterReplaced(null); + wal.rollRequested.set(false); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java index c098140fbe93..f07a02cb25d1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java @@ -319,108 +319,131 @@ void batchWriteAndWait(Table table, final FSHLog log, int start, boolean expect, */ @Test public void testLogRollOnDatanodeDeath() throws Exception { - TEST_UTIL.ensureSomeRegionServersAvailable(2); - assertTrue("This test requires WAL file replication set to 2.", - fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) == 2); - 
LOG.info("Replication=" + fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS())); - this.server = cluster.getRegionServer(0); + Long oldValue = TEST_UTIL.getConfiguration() + .getLong("hbase.regionserver.hlog.check.lowreplication.interval", -1); - // Create the test table and open it - TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName())) - .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build(); + try { + /** + * When we reuse the code of AsyncFSWAL to FSHLog, the low replication is only checked by + * {@link LogRoller#checkLowReplication},so in order to make this test spend less time,we + * should minimize following config which is maximized by + * {@link AbstractTestLogRolling#setUpBeforeClass} + */ + TEST_UTIL.getConfiguration().setLong("hbase.regionserver.hlog.check.lowreplication.interval", + 1000); + this.tearDown(); + this.setUp(); + + TEST_UTIL.ensureSomeRegionServersAvailable(2); + assertTrue("This test requires WAL file replication set to 2.", + fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) == 2); + LOG.info("Replication=" + fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS())); - admin.createTable(desc); - Table table = TEST_UTIL.getConnection().getTable(desc.getTableName()); + this.server = cluster.getRegionServer(0); + + // Create the test table and open it + TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName())) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build(); + + admin.createTable(desc); + Table table = TEST_UTIL.getConnection().getTable(desc.getTableName()); - server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName()); - RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo(); - final FSHLog log = (FSHLog) server.getWAL(region); - final AtomicBoolean lowReplicationHookCalled = new AtomicBoolean(false); - - log.registerWALActionsListener(new WALActionsListener() { - @Override - public void logRollRequested(WALActionsListener.RollRequestReason reason) { - switch (reason) { - case LOW_REPLICATION: - lowReplicationHookCalled.lazySet(true); - break; - default: - break; + server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName()); + RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo(); + final FSHLog log = (FSHLog) server.getWAL(region); + final AtomicBoolean lowReplicationHookCalled = new AtomicBoolean(false); + + log.registerWALActionsListener(new WALActionsListener() { + @Override + public void logRollRequested(WALActionsListener.RollRequestReason reason) { + switch (reason) { + case LOW_REPLICATION: + lowReplicationHookCalled.lazySet(true); + break; + default: + break; + } + } + }); + + // add up the datanode count, to ensure proper replication when we kill 1 + // This function is synchronous; when it returns, the dfs cluster is active + // We start 3 servers and then stop 2 to avoid a directory naming conflict + // when we stop/start a namenode later, as mentioned in HBASE-5163 + List existingNodes = dfsCluster.getDataNodes(); + int numDataNodes = 3; + TEST_UTIL.getConfiguration().setLong("hbase.regionserver.hlog.check.lowreplication.interval", + 1000); + dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), numDataNodes, true, null, null); + List allNodes = dfsCluster.getDataNodes(); + for (int i = allNodes.size() - 1; i >= 0; i--) { + if (existingNodes.contains(allNodes.get(i))) { + dfsCluster.stopDataNode(i); } } - }); - - // add up the 
datanode count, to ensure proper replication when we kill 1 - // This function is synchronous; when it returns, the dfs cluster is active - // We start 3 servers and then stop 2 to avoid a directory naming conflict - // when we stop/start a namenode later, as mentioned in HBASE-5163 - List existingNodes = dfsCluster.getDataNodes(); - int numDataNodes = 3; - dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), numDataNodes, true, null, null); - List allNodes = dfsCluster.getDataNodes(); - for (int i = allNodes.size() - 1; i >= 0; i--) { - if (existingNodes.contains(allNodes.get(i))) { - dfsCluster.stopDataNode(i); - } - } - assertTrue( - "DataNodes " + dfsCluster.getDataNodes().size() + " default replication " - + fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()), - dfsCluster.getDataNodes().size() - >= fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) + 1); - - writeData(table, 2); - - long curTime = EnvironmentEdgeManager.currentTime(); - LOG.info("log.getCurrentFileName(): " + log.getCurrentFileName()); - long oldFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log); - assertTrue("Log should have a timestamp older than now", - curTime > oldFilenum && oldFilenum != -1); - - assertTrue("The log shouldn't have rolled yet", - oldFilenum == AbstractFSWALProvider.extractFileNumFromWAL(log)); - final DatanodeInfo[] pipeline = log.getPipeline(); - assertTrue(pipeline.length == fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS())); - - // kill a datanode in the pipeline to force a log roll on the next sync() - // This function is synchronous, when it returns the node is killed. - assertTrue(dfsCluster.stopDataNode(pipeline[0].getName()) != null); - - // this write should succeed, but trigger a log roll - writeData(table, 2); - long newFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log); - - assertTrue("Missing datanode should've triggered a log roll", - newFilenum > oldFilenum && newFilenum > curTime); - - assertTrue("The log rolling hook should have been called with the low replication flag", - lowReplicationHookCalled.get()); - - // write some more log data (this should use a new hdfs_out) - writeData(table, 3); - assertTrue("The log should not roll again.", - AbstractFSWALProvider.extractFileNumFromWAL(log) == newFilenum); - // kill another datanode in the pipeline, so the replicas will be lower than - // the configured value 2. - assertTrue(dfsCluster.stopDataNode(pipeline[1].getName()) != null); - - batchWriteAndWait(table, log, 3, false, 14000); - int replication = log.getLogReplication(); - assertTrue("LowReplication Roller should've been disabled, current replication=" + replication, - !log.isLowReplicationRollEnabled()); - - dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), 1, true, null, null); - - // Force roll writer. The new log file will have the default replications, - // and the LowReplication Roller will be enabled. 
- log.rollWriter(true); - batchWriteAndWait(table, log, 13, true, 10000); - replication = log.getLogReplication(); - assertTrue("New log file should have the default replication instead of " + replication, - replication == fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS())); - assertTrue("LowReplication Roller should've been enabled", log.isLowReplicationRollEnabled()); + assertTrue( + "DataNodes " + dfsCluster.getDataNodes().size() + " default replication " + + fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()), + dfsCluster.getDataNodes().size() + >= fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) + 1); + + writeData(table, 2); + + long curTime = EnvironmentEdgeManager.currentTime(); + LOG.info("log.getCurrentFileName(): " + log.getCurrentFileName()); + long oldFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log); + assertTrue("Log should have a timestamp older than now", + curTime > oldFilenum && oldFilenum != -1); + + assertTrue("The log shouldn't have rolled yet", + oldFilenum == AbstractFSWALProvider.extractFileNumFromWAL(log)); + final DatanodeInfo[] pipeline = log.getPipeline(); + assertTrue(pipeline.length == fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS())); + + // kill a datanode in the pipeline to force a log roll on the next sync() + // This function is synchronous, when it returns the node is killed. + assertTrue(dfsCluster.stopDataNode(pipeline[0].getName()) != null); + + // this write should succeed, but trigger a log roll + writeData(table, 2); + + TEST_UTIL.waitFor(10000, 100, () -> { + long newFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log); + return newFilenum > oldFilenum && newFilenum > curTime && lowReplicationHookCalled.get(); + }); + + long newFilenum = AbstractFSWALProvider.extractFileNumFromWAL(log); + + // write some more log data (this should use a new hdfs_out) + writeData(table, 3); + assertTrue("The log should not roll again.", + AbstractFSWALProvider.extractFileNumFromWAL(log) == newFilenum); + // kill another datanode in the pipeline, so the replicas will be lower than + // the configured value 2. + assertTrue(dfsCluster.stopDataNode(pipeline[1].getName()) != null); + + batchWriteAndWait(table, log, 3, false, 14000); + int replication = log.getLogReplication(); + assertTrue( + "LowReplication Roller should've been disabled, current replication=" + replication, + !log.isLowReplicationRollEnabled()); + + dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), 1, true, null, null); + + // Force roll writer. The new log file will have the default replications, + // and the LowReplication Roller will be enabled. + log.rollWriter(true); + batchWriteAndWait(table, log, 13, true, 10000); + replication = log.getLogReplication(); + assertTrue("New log file should have the default replication instead of " + replication, + replication == fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS())); + assertTrue("LowReplication Roller should've been enabled", log.isLowReplicationRollEnabled()); + } finally { + TEST_UTIL.getConfiguration().setLong("hbase.regionserver.hlog.check.lowreplication.interval", + oldValue); + } } /** diff --git a/pom.xml b/pom.xml index 7958e5c221f2..0382e2ecf4fa 100644 --- a/pom.xml +++ b/pom.xml @@ -819,7 +819,7 @@ 2.11.0 3.9 3.6.1 - 3.4.2 + 3.4.4 4.5.13 4.4.13 3.2.6
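
Editor's note on the assertion style used in the TestLogRollAbort change above: the test now unwraps Hadoop's RemoteException before matching on FileNotFoundException, because a failure raised on the NameNode reaches the client wrapped rather than as the original exception type. The following is a minimal, illustrative sketch of that unwrap-then-match pattern, not part of this patch; the class and method names are invented for illustration, and it assumes only hadoop-common and JUnit 4.13+ (for assertThrows) on the classpath.

import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.hadoop.ipc.RemoteException;

// Illustrative helper (not part of this patch): normalizes an IOException thrown by an
// HDFS call so a test can assert on the underlying cause regardless of whether the
// failure happened locally or was marshalled back from the NameNode as a RemoteException.
public final class RemoteErrorMatcher {

  private RemoteErrorMatcher() {
  }

  /** Unwrap a RemoteException into the server-side exception type, if applicable. */
  public static IOException unwrap(IOException error) {
    return error instanceof RemoteException
      ? ((RemoteException) error).unwrapRemoteException()
      : error;
  }

  /** True if the error, once unwrapped, is or is caused by a FileNotFoundException. */
  public static boolean isFileNotFound(IOException error) {
    IOException unwrapped = unwrap(error);
    return unwrapped instanceof FileNotFoundException
      || unwrapped.getCause() instanceof FileNotFoundException;
  }
}

A test would then use it roughly as TestLogRollAbort does: IOException error = assertThrows(IOException.class, () -> log.rollWriter()); assertTrue(RemoteErrorMatcher.isFileNotFound(error));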