[ALLUXIO-2408] Fix potential journal checkpoint failure #4306
@@ -102,27 +102,33 @@ public void start(boolean isLeader) throws IOException {
   /**
    * The sequence for dealing with the journal before starting as the leader:
    *
-   * Phase 1. Mark all the logs as completed. Since this master is the leader, it is allowed to
+   * Phase 1. Recover from a backup checkpoint if the last startup failed while writing the
+   * checkpoint.
+   *
+   * Phase 2. Mark all the logs as completed. Since this master is the leader, it is allowed to
    * write the journal, so it can mark the current log as completed. After this step, the
    * current log file will not exist, and all logs will be complete.
    *
-   * Phase 2. Reconstruct the state from the journal. This uses the JournalTailer to process all
+   * Phase 3. Reconstruct the state from the journal. This uses the JournalTailer to process all
    * of the checkpoint and the complete log files. Since all logs are complete, after this step,
    * the master will reflect the state of all of the journal entries.
    *
-   * Phase 3. Write out the checkpoint file. Since this master is completely up-to-date, it
+   * Phase 4. Write out the checkpoint file. Since this master is completely up-to-date, it
    * writes out the checkpoint file. When the checkpoint file is closed, it will then delete the
    * complete log files.
    *
    * Since this method is called before the master RPC server starts serving, there is no
    * concurrent access to the master during these phases.
    */
-      // Phase 1: Mark all logs as complete, including the current log. After this call, the current
+      // Phase 1: Recover from a backup checkpoint if necessary.
+      mJournalWriter.recoverCheckpoint();
+
+      // Phase 2: Mark all logs as complete, including the current log. After this call, the current
       // log should not exist, and all the log files will be complete.
       mJournalWriter.completeAllLogs();

-      // Phase 2: Replay all the state of the checkpoint and the completed log files.
+      // Phase 3: Replay all the state of the checkpoint and the completed log files.
       JournalTailer catchupTailer;
       if (mStandbyJournalTailer != null && mStandbyJournalTailer.getLatestJournalTailer() != null
           && mStandbyJournalTailer.getLatestJournalTailer().isValid()) {

Review comment on mJournalWriter.recoverCheckpoint(): Should this be commented on in the previous comments about the steps/phases?
Reply: done

@@ -146,7 +152,7 @@ public void start(boolean isLeader) throws IOException {
       }
       long latestSequenceNumber = catchupTailer.getLatestSequenceNumber();

-      // Phase 3: initialize the journal and write out the checkpoint file (the state of all
+      // Phase 4: initialize the journal and write out the checkpoint file (the state of all
       // completed logs).
       JournalOutputStream checkpointStream =
           mJournalWriter.getCheckpointOutputStream(latestSequenceNumber);
@@ -0,0 +1,152 @@
/*
 * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
 * (the "License"). You may not use this work except in compliance with the License, which is
 * available at www.apache.org/licenses/LICENSE-2.0
 *
 * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
 * either express or implied, as more fully set forth in the License.
 *
 * See the NOTICE file distributed with this work for information regarding copyright ownership.
 */

package alluxio.master.journal;

import alluxio.Constants;
import alluxio.underfs.UnderFileSystem;
import alluxio.util.UnderFileSystemUtils;

import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
/**
 * A class which manages the checkpoint for a journal. The {@link #updateCheckpoint(String)} method
 * will update the journal's checkpoint file to a specified checkpoint, and
 * {@link #recoverCheckpoint()} will recover from any failures that may occur during
 * {@link #updateCheckpoint(String)}.
 *
 * The checkpoint updating process goes:
 * <pre>
 * 1. Write a new checkpoint named checkpoint.data.tmp
 * 2. Rename checkpoint.data to checkpoint.data.backup.tmp
 * 3. Rename checkpoint.data.backup.tmp to checkpoint.data.backup
 * 4. Rename checkpoint.data.tmp to checkpoint.data
 * 5. Delete completed logs
 * 6. Delete checkpoint.data.backup
 * </pre>
 */
public final class CheckpointManager {
  private static final Logger LOG = LoggerFactory.getLogger(Constants.LOGGER_TYPE);

  /** The UFS where the journal is being written to. */
  private final UnderFileSystem mUfs;
  /**
   * Absolute path to the checkpoint file. This is where the latest checkpoint is stored. During
   * normal operation (when not writing a new checkpoint file), this is the only checkpoint file
   * that exists. If this file exists and there is no {@link #mTempBackupCheckpointPath}, it plus
   * all completed logs, plus the active log, should represent the full state of the master.
   */
  private final String mCheckpointPath;
  /**
   * Absolute path to the backup checkpoint file. The latest checkpoint is saved here while renaming
   * the temporary checkpoint so that we can recover in case the rename fails. If this file and
   * {@link #mCheckpointPath} both exist, {@link #mCheckpointPath} is the most up to date checkpoint
   * and {@link #mBackupCheckpointPath} should be deleted.
   */
  private final String mBackupCheckpointPath;
  /**
   * Absolute path to the temporary backup checkpoint file. This path is used as an intermediate
   * rename step when backing up {@link #mCheckpointPath} to {@link #mBackupCheckpointPath}. As long
   * as this file exists, it supersedes mCheckpointPath as the most up to date checkpoint file.
   */
  private final String mTempBackupCheckpointPath;
  /**
   * A journal writer through which this checkpoint manager can delete completed logs when the
   * checkpoint is updated.
   */
  private final JournalWriter mWriter;

  /**
   * Creates a new instance of {@link CheckpointManager}.
   *
   * @param ufs the under file system holding the journal
   * @param checkpointPath the path to the checkpoint file
   * @param writer a journal writer which can be used to delete completed logs
   */
  public CheckpointManager(UnderFileSystem ufs, String checkpointPath, JournalWriter writer) {
    mUfs = ufs;
    mCheckpointPath = checkpointPath;
    mBackupCheckpointPath = mCheckpointPath + ".backup";
    mTempBackupCheckpointPath = mBackupCheckpointPath + ".tmp";
    mWriter = writer;
  }
  /**
   * Recovers the checkpoint file in case the master crashed while updating it previously.
   *
   * After this method has completed, the checkpoint at {@link #mCheckpointPath} plus any completed
   * logs will fully represent the master's state, and there will be no files at
   * {@link #mBackupCheckpointPath} or {@link #mTempBackupCheckpointPath}.
   */
  public void recoverCheckpoint() {

Review comment: Could you document the expected state after this method is called?
Reply: done
    try {
      boolean checkpointExists = mUfs.exists(mCheckpointPath);
      boolean backupCheckpointExists = mUfs.exists(mBackupCheckpointPath);
      boolean tempBackupCheckpointExists = mUfs.exists(mTempBackupCheckpointPath);
      Preconditions.checkState(
          !(checkpointExists && backupCheckpointExists && tempBackupCheckpointExists),
          "checkpoint, temp backup checkpoint, and backup checkpoint should never all exist");
      if (tempBackupCheckpointExists) {
        // If mCheckpointPath also exists, step 2 must have implemented rename as copy + delete, and
        // failed during the delete.
        UnderFileSystemUtils.deleteIfExists(mUfs, mCheckpointPath);
        mUfs.rename(mTempBackupCheckpointPath, mCheckpointPath);
      }
      if (backupCheckpointExists) {
        // We must have crashed after step 3
        if (checkpointExists) {
          // We crashed after step 4, so we can finish steps 5 and 6.
          mWriter.deleteCompletedLogs();
          mUfs.delete(mBackupCheckpointPath, false);
        } else {
          // We crashed before step 4, so we roll back to the backup checkpoint.
          mUfs.rename(mBackupCheckpointPath, mCheckpointPath);
        }
      }
    } catch (IOException e) {
      throw Throwables.propagate(e);
    }
  }
  /**
   * Updates the checkpoint file to the checkpoint at the specified path. The update is done in such
   * a way that {@link #recoverCheckpoint()} will recover from any failures that occur during the
   * update.
   *
   * @param newCheckpointPath the path to the new checkpoint file
   */
  public void updateCheckpoint(String newCheckpointPath) {
    try {
      if (mUfs.exists(mCheckpointPath)) {
        UnderFileSystemUtils.deleteIfExists(mUfs, mTempBackupCheckpointPath);
        UnderFileSystemUtils.deleteIfExists(mUfs, mBackupCheckpointPath);
        // Rename in two steps so that we never have identical mCheckpointPath and
        // mBackupCheckpointPath. This is a concern since UFS may implement rename as copy + delete.
        mUfs.rename(mCheckpointPath, mTempBackupCheckpointPath);
        mUfs.rename(mTempBackupCheckpointPath, mBackupCheckpointPath);

Review comment: NIT: maybe add a log message after this that the backup of the checkpoint was completed?
Reply: done
        LOG.info("Backed up the checkpoint file to {}", mBackupCheckpointPath);
      }
      mUfs.rename(newCheckpointPath, mCheckpointPath);
      LOG.info("Renamed the checkpoint file from {} to {}", newCheckpointPath, mCheckpointPath);

      // The checkpoint already reflects the information in the completed logs.
      mWriter.deleteCompletedLogs();
      UnderFileSystemUtils.deleteIfExists(mUfs, mBackupCheckpointPath);
    } catch (IOException e) {
      throw Throwables.propagate(e);
    }
  }
}
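To make the six-step update protocol and the recovery rules above concrete, here is a minimal, self-contained simulation using java.nio.file on a local directory. It is illustrative only: the file names mirror the class javadoc, but the recover() method below is a simplified restatement of recoverCheckpoint() (it skips the completed-log deletion of step 5) and is not Alluxio code.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Toy simulation of the checkpoint update/recovery protocol (not Alluxio code). */
public class CheckpointProtocolDemo {
  public static void main(String[] args) throws IOException {
    Path dir = Files.createTempDirectory("journal-demo");
    Path checkpoint = dir.resolve("checkpoint.data");
    Path backup = dir.resolve("checkpoint.data.backup");
    Path tempBackup = dir.resolve("checkpoint.data.backup.tmp");
    Path newCheckpoint = dir.resolve("checkpoint.data.tmp");

    // An existing checkpoint from a previous run, plus a freshly written new checkpoint (step 1).
    Files.write(checkpoint, "old state".getBytes());
    Files.write(newCheckpoint, "new state".getBytes());

    // Steps 2-3: back up the old checkpoint via the intermediate rename.
    Files.move(checkpoint, tempBackup);
    Files.move(tempBackup, backup);

    // Simulate a crash before step 4: checkpoint.data is missing, only the backup exists.
    recover(checkpoint, backup, tempBackup);
    System.out.println("After recovery: " + new String(Files.readAllBytes(checkpoint)));
  }

  /** Simplified restatement of the recovery rules described in the class javadoc. */
  static void recover(Path checkpoint, Path backup, Path tempBackup) throws IOException {
    if (Files.exists(tempBackup)) {
      // The rename in step 2 or 3 was interrupted; the temp backup is the most up to date copy.
      Files.deleteIfExists(checkpoint);
      Files.move(tempBackup, checkpoint);
    }
    if (Files.exists(backup)) {
      if (Files.exists(checkpoint)) {
        // Crashed after step 4: checkpoint.data is already the new one, so drop the stale backup.
        Files.delete(backup);
      } else {
        // Crashed before step 4: roll back to the backup.
        Files.move(backup, checkpoint);
      }
    }
  }
}

Running this prints "old state": a crash between backing up the old checkpoint and installing the new one rolls back cleanly, so either the old or the new checkpoint always survives.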
@@ -17,6 +17,7 @@
 import alluxio.exception.ExceptionMessage;
 import alluxio.proto.journal.Journal.JournalEntry;
 import alluxio.underfs.UnderFileSystem;
+import alluxio.util.UnderFileSystemUtils;

 import com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -50,7 +51,10 @@ public final class JournalWriter {
   private final String mJournalDirectory;
   /** Absolute path to the directory storing all completed logs. */
   private final String mCompletedDirectory;
-  /** Absolute path to the temporary checkpoint file. */
+  /**
+   * Absolute path to the temporary checkpoint file. This is where a new checkpoint file is fully
+   * written before being renamed to {@link #mCheckpointPath}.
+   */
   private final String mTempCheckpointPath;
   /** The UFS where the journal is being written to. */
   private final UnderFileSystem mUfs;
@@ -67,6 +71,9 @@ public final class JournalWriter {
   /** The sequence number for the next entry in the log. */
   private long mNextEntrySequenceNumber = 1;

+  /** Checkpoint manager for updating and recovering the checkpoint file. */
+  private CheckpointManager mCheckpointManager;
+
   /**
    * Creates a new instance of {@link JournalWriter}.
    *

@@ -79,6 +86,7 @@ public final class JournalWriter {
     mTempCheckpointPath = mJournal.getCheckpointFilePath() + ".tmp";
     mUfs = UnderFileSystem.get(mJournalDirectory);
     mMaxLogSize = Configuration.getBytes(PropertyKey.MASTER_JOURNAL_LOG_SIZE_BYTES_MAX);
+    mCheckpointManager = new CheckpointManager(mUfs, mJournal.getCheckpointFilePath(), this);
   }

   /**
@@ -112,6 +120,7 @@ public synchronized void completeAllLogs() throws IOException {
   public synchronized JournalOutputStream getCheckpointOutputStream(long latestSequenceNumber)
       throws IOException {
     if (mCheckpointOutputStream == null) {
+      mCheckpointManager.recoverCheckpoint();
       LOG.info("Creating tmp checkpoint file: {}", mTempCheckpointPath);
       if (!mUfs.exists(mJournalDirectory)) {
         LOG.info("Creating journal folder: {}", mJournalDirectory);

@@ -120,6 +129,7 @@ public synchronized JournalOutputStream getCheckpointOutputStream(long latestSeq
       mNextEntrySequenceNumber = latestSequenceNumber + 1;
       LOG.info("Latest journal sequence number: {} Next journal sequence number: {}",
           latestSequenceNumber, mNextEntrySequenceNumber);
+      UnderFileSystemUtils.deleteIfExists(mUfs, mTempCheckpointPath);
       mCheckpointOutputStream =
           new CheckpointOutputStream(new DataOutputStream(mUfs.create(mTempCheckpointPath)));
     }
@@ -172,23 +182,29 @@ private synchronized OutputStream openCurrentLog() throws IOException {
     return os;
   }

Review comment: Do you think this is still relevant? I think I put in this comment since we do not delete completed logs "atomically". However, with your change, it seems like the deletion of completed logs can look atomic now? If something bad happens during the deletion, it will continue the deletion on the next startup, right?
Reply: This method finds the logs by seeing how high it can count before the log of that number doesn't exist. If we have 10 logs and only delete log #1, the next time we try to delete the logs we won't delete logs 2-10, because we see that log 1 doesn't exist.
Reply: Ahhh, I see. I guess the TODO was correct. Thanks for fixing it.
+  /**
+   * Recovers the checkpoint file in case the master crashed while updating it previously.
+   */
+  public void recoverCheckpoint() {
+    mCheckpointManager.recoverCheckpoint();
+  }
+
   /**
    * Deletes all of the logs in the completed folder.
    *
    * @throws IOException if an I/O error occurs
    */
-  private synchronized void deleteCompletedLogs() throws IOException {
+  public synchronized void deleteCompletedLogs() throws IOException {
     LOG.info("Deleting all completed log files...");
-    // Loop over all complete logs starting from the beginning.
-    // TODO(gpang): should the deletes start from the end?
+    // Loop over all complete logs starting from the end.
     long logNumber = Journal.FIRST_COMPLETED_LOG_NUMBER;
-    String logFilename = mJournal.getCompletedLogFilePath(logNumber);
-    while (mUfs.exists(logFilename)) {
+    while (mUfs.exists(mJournal.getCompletedLogFilePath(logNumber))) {
+      logNumber++;
+    }
+    for (long i = logNumber - 1; i >= 0; i--) {
+      String logFilename = mJournal.getCompletedLogFilePath(i);
       LOG.info("Deleting completed log: {}", logFilename);
       mUfs.delete(logFilename, true);
-      logNumber++;
-      // generate the next completed log filename in the sequence.
-      logFilename = mJournal.getCompletedLogFilePath(logNumber);
     }
     LOG.info("Finished deleting all completed log files.");
@@ -274,15 +290,8 @@ public synchronized void close() throws IOException {
       mOutputStream.close();

       LOG.info("Successfully created tmp checkpoint file: {}", mTempCheckpointPath);
-      mUfs.delete(mJournal.getCheckpointFilePath(), false);
-      // TODO(gpang): the real checkpoint should not be overwritten here, but after all operations.
-      mUfs.rename(mTempCheckpointPath, mJournal.getCheckpointFilePath());
-      mUfs.delete(mTempCheckpointPath, false);
-      LOG.info("Renamed checkpoint file {} to {}", mTempCheckpointPath,
-          mJournal.getCheckpointFilePath());
-
-      // The checkpoint already reflects the information in the completed logs.
-      deleteCompletedLogs();
-
+      mCheckpointManager.updateCheckpoint(mTempCheckpointPath);

       // Consider the current log to be complete.
       completeCurrentLog();
Review comment: Which exceptions will be propagated? Technically, even if you check for exists before you delete, there is no guarantee it will still exist at the time of the delete. Therefore, there is currently no way to do this atomically. So, when should this method not throw an error, and when should it? What is the purpose/goal for this method?
Reply: The javadoc for UnderFileSystem just says "@throws IOException if a non-Alluxio error occurs" for both exists and delete. This method is never expected to throw an exception. I don't think the lack of atomicity is a problem. The contract is that if the file path exists, it gets deleted. If it gets deleted between the exists and the delete, the delete will return false and the result is the same (no more file).