diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java index c6268674c5b8..e2109a79eb2f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java @@ -334,11 +334,22 @@ private HasNext tryAdvanceEntry() { boolean beingWritten = pair.getSecond(); LOG.trace("Reading WAL {}; result={}, currently open for write={}", this.currentPath, state, beingWritten); + // The below implementation needs to make sure that when beingWritten == true, we should not + // dequeue the current WAL file in logQueue. switch (state) { case NORMAL: // everything is fine, just return return HasNext.YES; case EOF_WITH_TRAILER: + // in readNextEntryAndRecordReaderPosition, we will acquire rollWriteLock, and we can only + // schedule a close writer task, in which we will write trailer, under the rollWriteLock, so + // typically if beingWritten == true, we should not reach here, as we need to reopen the + // reader after writing the trailer. The only possible way to reach here while beingWritten + // == true is due to the inflightWALClosures logic in AbstractFSWAL, as if the writer is + // still in this map, we will consider it as beingWritten, but actually, here we could make + // sure that the new WAL file has already been enqueued into the logQueue, so here dequeuing + // the current log file is safe. + assert !beingWritten || logQueue.getQueue(walGroupId).size() > 1; // we have reached the trailer, which means this WAL file has been closed cleanly and we // have finished reading it successfully, just move to the next WAL file and let the upper // layer start reading the next WAL file @@ -436,6 +447,16 @@ private void dequeueCurrentLog() { * Returns whether the file is opened for writing. */ private Pair readNextEntryAndRecordReaderPosition() { + // we must call this before actually reading from the reader, as this method will acquire the + // rollWriteLock. This is very important, as we will enqueue the new WAL file in postLogRoll, + // and before this happens, we could have already finished closing the previous WAL file. If we + // do not acquire the rollWriteLock and return whether the current file is being written to, we + // may finish reading the previous WAL file and start to read the next one, before it is + // enqueued into the logQueue, thus lead to an empty logQueue and make the shipper think the + // queue is already ended and quit. See HBASE-28114 and related issues for more details. + // in the future, if we want to optimize the logic here, for example, do not call this method + // every time, or do not acquire rollWriteLock in the implementation of this method, we need to + // carefully review the optimized implementation OptionalLong fileLength = walFileLengthProvider.getLogFileSizeIfBeingWritten(currentPath); WALTailingReader.Result readResult = reader.next(fileLength.orElse(-1)); long readerPos = readResult.getEntryEndPos();