HBASE-27715 Refactoring the long tryAdvanceEntry method in WALEntryStream (#5105)

Signed-off-by: Liangjun He <[email protected]>
Apache9 authored Mar 15, 2023
1 parent c8bee23 commit 1f2e1f5
Showing 1 changed file with 120 additions and 98 deletions.
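At a glance, the change splits the old monolithic tryAdvanceEntry() into three pieces: prepareReader() opens the next WAL file or resets the current reader, lastAttempt() performs one final re-read of a rolled file before it is dequeued, and tryAdvanceEntry() itself becomes a small dispatcher over the reader state. Below is a condensed, self-contained toy model of that shape, not the real HBase class: the method names and enum constants mirror the diff that follows, while the stub bodies and the simplified State enum are placeholder assumptions.

// Toy model of the refactored control flow; see the diff below for the real method bodies.
public class TryAdvanceEntryOutline {

  enum HasNext { YES, NO, RETRY, RETRY_IMMEDIATELY }

  // Simplified stand-in for WALTailingReader.State (the real enum has more members).
  enum State { NORMAL, EOF_AND_RESET, EOF_AND_RESET_COMPRESSION, ERROR_AND_RESET,
    ERROR_AND_RESET_COMPRESSION }

  private State state = State.NORMAL;
  // In the real code this flag is reported by readNextEntryAndRecordReaderPosition().
  private boolean beingWritten;

  // Opens the next WAL file, or resets the current reader after an error/EOF (stub body).
  private HasNext prepareReader() { return HasNext.YES; }

  // One last re-read of a rolled file before dequeueing it, see HBASE-6758 (stub body).
  private HasNext lastAttempt() { return HasNext.RETRY_IMMEDIATELY; }

  // Stub for the real read attempt, which also records the reader position.
  private State readNextEntryAndRecordReaderPosition() { return State.NORMAL; }

  HasNext tryAdvanceEntry() {
    HasNext prepared = prepareReader();
    if (prepared != HasNext.YES) {
      return prepared;
    }
    state = readNextEntryAndRecordReaderPosition();
    switch (state) {
      case NORMAL:
        return HasNext.YES;
      case EOF_AND_RESET:
      case EOF_AND_RESET_COMPRESSION:
        // keep retrying while the file is still being written, otherwise read one last time
        return beingWritten ? HasNext.RETRY : lastAttempt();
      default:
        // error states: sleep a bit and retry
        return HasNext.RETRY;
    }
  }
}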
@@ -206,73 +206,127 @@ private void setCurrentPath(Path path) {

  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DCN_NULLPOINTER_EXCEPTION",
    justification = "HDFS-4380")
  private HasNext prepareReader() {
    if (reader != null) {
      if (state != null && state != WALTailingReader.State.NORMAL) {
        // reset before reading
        LOG.debug("Reset reader {} to pos {}, reset compression={}", currentPath,
          currentPositionOfEntry, state.resetCompression());
        try {
          if (currentPositionOfEntry > 0) {
            reader.resetTo(currentPositionOfEntry, state.resetCompression());
          } else {
            // we will read from the beginning so we should always clear the compression context
            reader.resetTo(-1, true);
          }
        } catch (IOException e) {
          LOG.warn("Failed to reset reader {} to pos {}, reset compression={}", currentPath,
            currentPositionOfEntry, state.resetCompression(), e);
          // just leave the state as is, and try resetting next time
          return HasNext.RETRY;
        }
      }
      return HasNext.YES;
    }
    // try to open the next WAL file
    PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
    Path nextPath = queue.peek();
    if (nextPath == null) {
      LOG.debug("No more WAL files in queue");
      // no more files in queue, this could happen for a recovered queue, or for a wal group of a
      // sync replication peer which has already been transited to DA or S.
      setCurrentPath(null);
      return HasNext.NO;
    }
    setCurrentPath(nextPath);
    // we need to test this prior to creating the reader. If not, it is possible that, while
    // opening the file, the file is still being written so its header is incomplete and we get
    // a header EOF; but then, while we test whether it is still being written, the data has
    // already been flushed out, so we consider it is not being written and just skip over the
    // file, losing the data written after opening...
    boolean beingWritten = walFileLengthProvider.getLogFileSizeIfBeingWritten(nextPath).isPresent();
    LOG.debug("Creating new reader {}, startPosition={}, beingWritten={}", nextPath,
      currentPositionOfEntry, beingWritten);
    try {
      reader = WALFactory.createTailingReader(fs, nextPath, conf,
        currentPositionOfEntry > 0 ? currentPositionOfEntry : -1);
      return HasNext.YES;
    } catch (WALHeaderEOFException e) {
      if (!eofAutoRecovery) {
        // if we do not enable EOF auto recovery, just let the upper layer retry;
        // replication will usually be stuck and needs to be fixed manually
        return HasNext.RETRY;
      }
      // we hit EOF while reading the WAL header; usually this means we can just skip over this
      // file, but we need to be careful about whether the file is still being written, and if so
      // we should retry instead of skipping.
      LOG.warn("EOF while trying to open WAL reader for path: {}, startPosition={}", nextPath,
        currentPositionOfEntry, e);
      if (beingWritten) {
        // just retry as the file is still being written, maybe next time we could read
        // something
        return HasNext.RETRY;
      } else {
        // the file is not being written so we are safe to just skip over it
        dequeueCurrentLog();
        return HasNext.RETRY_IMMEDIATELY;
      }
    } catch (LeaseNotRecoveredException e) {
      // HBASE-15019 the WAL was not closed due to some hiccup.
      LOG.warn("Try to recover the WAL lease " + nextPath, e);
      AbstractFSWALProvider.recoverLease(conf, nextPath);
      return HasNext.RETRY;
    } catch (IOException | NullPointerException e) {
      // for why we need to catch NPE here, see HDFS-4380 for more details
      LOG.warn("Failed to open WAL reader for path: {}", nextPath, e);
      return HasNext.RETRY;
    }
  }

private HasNext lastAttempt() {
LOG.debug("Reset reader {} for the last time to pos {}, reset compression={}", currentPath,
currentPositionOfEntry, state.resetCompression());
try {
reader.resetTo(currentPositionOfEntry, state.resetCompression());
} catch (IOException e) {
LOG.warn("Failed to reset reader {} to pos {}, reset compression={}", currentPath,
currentPositionOfEntry, state.resetCompression(), e);
      // just leave the state as is; next time we will try to reset it again. There is a nasty
      // problem though: we will still reach here eventually and try the reset again to see if
      // the log has been fully replicated, which is redundant and can be optimized later
return HasNext.RETRY;
}
Pair<WALTailingReader.State, Boolean> pair = readNextEntryAndRecordReaderPosition();
state = pair.getFirst();
// should not be written
assert !pair.getSecond();
if (!state.eof()) {
      // we still have something to read after reopen, so return YES; or something went wrong
      // and we need to retry
return state == WALTailingReader.State.NORMAL ? HasNext.YES : HasNext.RETRY;
}
// No data available after reopen
if (checkAllBytesParsed()) {
// move to the next wal file and read
dequeueCurrentLog();
return HasNext.RETRY_IMMEDIATELY;
} else {
      // see HBASE-15983: if checkAllBytesParsed returns false, we need to try reading from the
      // beginning again. Here we set position to 0 and state to ERROR_AND_RESET_COMPRESSION,
      // so when calling tryAdvanceEntry next time we will reset the reader to the beginning
      // and read.
currentPositionOfEntry = 0;
currentPositionOfReader = 0;
state = WALTailingReader.State.ERROR_AND_RESET_COMPRESSION;
return HasNext.RETRY;
}
}

private HasNext tryAdvanceEntry() {
HasNext prepared = prepareReader();
if (prepared != HasNext.YES) {
return prepared;
}

Pair<WALTailingReader.State, Boolean> pair = readNextEntryAndRecordReaderPosition();
@@ -292,46 +346,16 @@ private HasNext tryAdvanceEntry() {
return HasNext.RETRY_IMMEDIATELY;
case EOF_AND_RESET:
case EOF_AND_RESET_COMPRESSION:
        if (beingWritten) {
          // just sleep a bit and retry to see if there are new entries coming since the file is
          // still being written
          return HasNext.RETRY;
        }
        // no more entries in this log file, and the file is already closed, i.e., rolled.
        // Before dequeuing, we should always get one more attempt at reading.
        // This is in case more entries came in after we opened the reader, and the log is rolled
        // while we were reading. See HBASE-6758
        return lastAttempt();
case ERROR_AND_RESET:
case ERROR_AND_RESET_COMPRESSION:
        // we have met an error, just sleep a bit and retry again
@@ -393,10 +417,8 @@ private boolean checkAllBytesParsed() {
return false;
}
}
    LOG.debug("Reached the end of {} and length of the file is {}", currentPath,
      stat == null ? "N/A" : stat.getLen());
metrics.incrCompletedWAL();
return true;
}
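For context on how the four HasNext values above are meant to be consumed, here is a hypothetical caller loop: YES hands out an entry, RETRY_IMMEDIATELY polls again right away (for example after the current log was dequeued), RETRY backs off first (the file is still being written, or a reset failed), and NO ends the stream. Only the enum constants come from the diff; the EntryStream interface, the sleep interval, and the println handler are illustrative assumptions.

import java.util.concurrent.TimeUnit;

public class HasNextPollerSketch {

  enum HasNext { YES, NO, RETRY, RETRY_IMMEDIATELY }

  interface EntryStream {
    HasNext tryAdvanceEntry();   // mirrors WALEntryStream's internal method
    Object takeEntry();          // placeholder for handing out the entry just read
  }

  static void drain(EntryStream stream) throws InterruptedException {
    while (true) {
      switch (stream.tryAdvanceEntry()) {
        case YES:
          System.out.println("got entry: " + stream.takeEntry());
          break;
        case RETRY_IMMEDIATELY:
          // e.g. the current WAL was fully parsed and dequeued; try the next file at once
          break;
        case RETRY:
          // transient condition (file still being written, reset failed, ...); back off first
          TimeUnit.MILLISECONDS.sleep(100);
          break;
        case NO:
          // queue is empty (recovered queue finished, or sync replication peer transited)
          return;
      }
    }
  }
}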
