[ALLUXIO-2408] Fix potential journal checkpoint failure #4306

Merged
merged 10 commits on Nov 8, 2016
core/server/src/main/java/alluxio/master/journal/CheckpointManager.java (new file)
@@ -0,0 +1,145 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.master.journal;

import alluxio.Constants;
import alluxio.underfs.UnderFileSystem;
import alluxio.util.UnderFileSystemUtils;

import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
* A class which manages the checkpoint for a journal. The {@link #updateCheckpoint(String)} method
* will update the journal's checkpoint file to a specified checkpoint, and
 * {@link #recoverCheckpoint()} will recover from any failures that may occur during
* {@link #updateCheckpoint(String)}.
*
* The checkpoint updating process goes
* <pre>
* 1. Write a new checkpoint named checkpoint.data.tmp
* 2. Rename checkpoint.data to checkpoint.data.backup.tmp
* 3. Rename checkpoint.data.backup.tmp to checkpoint.data.backup
* 4. Rename checkpoint.data.tmp to checkpoint.data
* 5. Delete completed logs
* 6. Delete checkpoint.data.backup
* </pre>
*/
public final class CheckpointManager {
private static final Logger LOG = LoggerFactory.getLogger(Constants.LOGGER_TYPE);

/** The UFS where the journal is being written to. */
private final UnderFileSystem mUfs;
/**
* Absolute path to the checkpoint file. This is where the latest checkpoint is stored. During
* normal operation (when not writing a new checkpoint file), this is the only checkpoint file
* that exists. If this file exists and there is no {@link #mTempBackupCheckpointPath}, it plus
* all completed logs, plus the active log, should represent the full state of the master.
*/
private final String mCheckpointPath;
/**
* Absolute path to the backup checkpoint file. The latest checkpoint is saved here while renaming
* the temporary checkpoint so that we can recover in case the rename fails. If this file and
* {@link #mCheckpointPath} both exist, {@link #mCheckpointPath} is the most up to date checkpoint
* and {@link #mBackupCheckpointPath} should be deleted.
*/
private final String mBackupCheckpointPath;
/**
* Absolute path to the temporary backup checkpoint file. This path is used as an intermediate
* rename step when backing up {@link #mCheckpointPath} to {@link #mBackupCheckpointPath}. As long
 * as this file exists, it supersedes mCheckpointPath as the most up to date checkpoint file.
*/
private final String mTempBackupCheckpointPath;
/**
* A journal writer through which this checkpoint manager can delete completed logs when the
* checkpoint is updated.
*/
private final JournalWriter mWriter;

/**
* Creates a new instance of {@link CheckpointManager}.
*
* @param ufs the under file system holding the journal
* @param the directory for the journal
Contributor: NIT: the javadoc params seem out of date with the actual parameters of the constructor.

Contributor Author: fixed

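A sketch of what the fixed javadoc could look like, inferred from the constructor signature just below (the follow-up commit itself is not shown in this revision):

 * @param ufs the under file system holding the journal
 * @param checkpointPath the path of the checkpoint file
 * @param writer the journal writer used to delete completed logs once the checkpoint is updated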
*/
public CheckpointManager(UnderFileSystem ufs, String checkpointPath, JournalWriter writer) {
mUfs = ufs;
mCheckpointPath = checkpointPath;
mBackupCheckpointPath = mCheckpointPath + ".backup";
mTempBackupCheckpointPath = mBackupCheckpointPath + ".tmp";
mWriter = writer;
}

/**
* Recovers the checkpoint file in case the master crashed while updating it previously.
*/
public void recoverCheckpoint() {
Contributor: Could you document the expected state after this method is called? i.e.
CheckpointPath is always valid, CheckpointPath + editlogs -> up to date master state?

Contributor Author: done

try {
Preconditions.checkState(
!(mUfs.exists(mCheckpointPath) && mUfs.exists(mTempBackupCheckpointPath)
Contributor: Will there be duplicate exists calls for these files? Maybe add a TODO to remove the duplicate calls?

Contributor: Since we cannot guarantee atomicity between exists & next operation, I think we should just cache the values for the duration of the recovery, unless we expect an update.

Contributor Author: done

&& mUfs.exists(mBackupCheckpointPath)),
"checkpoint, temp backup checkpoint, and backup checkpoint should never exist "
+ "simultaneously");
if (mUfs.exists(mTempBackupCheckpointPath)) {
// If mCheckpointPath exists, step 2 must have implemented rename as copy + delete, and
// failed during the delete.
UnderFileSystemUtils.deleteIfExists(mUfs, mCheckpointPath);
mUfs.rename(mTempBackupCheckpointPath, mCheckpointPath);
}
if (mUfs.exists(mBackupCheckpointPath)) {
// We must have crashed after step 3
if (mUfs.exists(mCheckpointPath)) {
// We crashed after step 4, so we can finish steps 5 and 6.
mWriter.deleteCompletedLogs();
mUfs.delete(mBackupCheckpointPath, false);
} else {
// We crashed before step 4, so we roll back to backup checkpoint.
mUfs.rename(mBackupCheckpointPath, mCheckpointPath);
}
}
} catch (IOException e) {
throw Throwables.propagate(e);
}
}
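
Following the exchange above about duplicate exists() calls, a sketch of the suggested caching (an assumption about the follow-up change, not the committed code) could look like this, preserving the behavior of the method as written:

  public void recoverCheckpoint() {
    try {
      // Query each path once and reuse the results for the duration of the recovery.
      boolean checkpointExists = mUfs.exists(mCheckpointPath);
      boolean tempBackupExists = mUfs.exists(mTempBackupCheckpointPath);
      boolean backupExists = mUfs.exists(mBackupCheckpointPath);
      Preconditions.checkState(!(checkpointExists && tempBackupExists && backupExists),
          "checkpoint, temp backup checkpoint, and backup checkpoint should never exist "
              + "simultaneously");
      if (tempBackupExists) {
        // The temp backup supersedes any existing checkpoint (rename may be copy + delete).
        UnderFileSystemUtils.deleteIfExists(mUfs, mCheckpointPath);
        mUfs.rename(mTempBackupCheckpointPath, mCheckpointPath);
        checkpointExists = true;
      }
      if (backupExists) {
        if (checkpointExists) {
          // Crashed after step 4: finish steps 5 and 6.
          mWriter.deleteCompletedLogs();
          mUfs.delete(mBackupCheckpointPath, false);
        } else {
          // Crashed before step 4: roll back to the backup checkpoint.
          mUfs.rename(mBackupCheckpointPath, mCheckpointPath);
        }
      }
    } catch (IOException e) {
      throw Throwables.propagate(e);
    }
  }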

/**
* Updates the checkpoint file to the checkpoint at the specified path. The update is done in such
* a way that {@link #recoverCheckpoint()} will recover from any failures that occur during the
* update.
*
* @param newCheckpointPath the path to the new checkpoint file
*/
public void updateCheckpoint(String newCheckpointPath) {
try {
if (mUfs.exists(mCheckpointPath)) {
UnderFileSystemUtils.deleteIfExists(mUfs, mTempBackupCheckpointPath);
UnderFileSystemUtils.deleteIfExists(mUfs, mBackupCheckpointPath);
// Rename in two steps so that we never have identical mCheckpointPath and
// mBackupCheckpointPath. This is a concern since UFS may implement rename as copy + delete.
mUfs.rename(mCheckpointPath, mTempBackupCheckpointPath);
mUfs.rename(mTempBackupCheckpointPath, mBackupCheckpointPath);
Contributor: NIT: maybe add a log message after this that the backup of the checkpoint was completed?

Contributor Author: done

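The log line added in response is not shown in this revision; a hypothetical version might be:

      LOG.info("Backed up checkpoint file {} to {}", mCheckpointPath, mBackupCheckpointPath);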
}
mUfs.rename(newCheckpointPath, mCheckpointPath);
LOG.info("Renamed checkpoint file {} to {}", newCheckpointPath, mCheckpointPath);

// The checkpoint already reflects the information in the completed logs.
mWriter.deleteCompletedLogs();
UnderFileSystemUtils.deleteIfExists(mUfs, mBackupCheckpointPath);
} catch (IOException e) {
throw Throwables.propagate(e);
}
}
}
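
To see the six-step protocol from the class javadoc and the matching recovery rule outside of Alluxio, here is a minimal local-filesystem sketch. It uses plain java.nio.file in place of an UnderFileSystem, the class and variable names are illustrative, and it exercises only one crash point (between steps 3 and 4):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public final class CheckpointProtocolSketch {
  public static void main(String[] args) throws IOException {
    Path dir = Files.createTempDirectory("journal");
    Path checkpoint = dir.resolve("checkpoint.data");
    Path tmp = dir.resolve("checkpoint.data.tmp");
    Path backup = dir.resolve("checkpoint.data.backup");
    Path tempBackup = dir.resolve("checkpoint.data.backup.tmp");

    Files.write(checkpoint, "old checkpoint".getBytes()); // the checkpoint in use before the update
    Files.write(tmp, "new checkpoint".getBytes());        // step 1: new checkpoint fully written

    Files.move(checkpoint, tempBackup);                   // step 2
    Files.move(tempBackup, backup);                       // step 3
    // Simulated crash here: step 4 (rename tmp -> checkpoint) never runs.

    // Recovery, mirroring recoverCheckpoint(): the backup exists but the checkpoint does not,
    // so roll back to the backup checkpoint.
    if (Files.exists(backup) && !Files.exists(checkpoint)) {
      Files.move(backup, checkpoint);
    }

    // Prints "old checkpoint": the pre-update checkpoint is restored intact, and the unpromoted
    // new checkpoint is still sitting at checkpoint.data.tmp for a later attempt.
    System.out.println(new String(Files.readAllBytes(checkpoint)));
  }
}

The intermediate checkpoint.data.backup.tmp step mirrors the comment in updateCheckpoint: because a UFS may implement rename as copy + delete, renaming in two steps ensures mCheckpointPath and mBackupCheckpointPath are never identical copies of each other.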
86 changes: 14 additions & 72 deletions core/server/src/main/java/alluxio/master/journal/JournalWriter.java
@@ -20,7 +20,6 @@
import alluxio.util.UnderFileSystemUtils;

import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,27 +51,11 @@ public final class JournalWriter {
private final String mJournalDirectory;
/** Absolute path to the directory storing all completed logs. */
private final String mCompletedDirectory;
/**
* Absolute path to the checkpoint file. This is where the latest checkpoint is stored. During
* normal operation (when not writing a new checkpoint file), this is the only checkpoint file
* that exists.
*/
private final String mCheckpointPath;
/**
* Absolute path to the temporary checkpoint file. This is where a new checkpoint file is fully
* written before being renamed to {@link #mCheckpointPath}
*/
private final String mTempCheckpointPath;
/**
* Absolute path to the backup checkpoint file. The latest checkpoint is saved here while renaming
* the temporary checkpoint so that we can recover in case the rename fails.
*/
private final String mBackupCheckpointPath;
/**
* Absolute path to the temporary backup checkpoint file. This path is used as an intermediate
* rename step when backing up mCheckpointPath to mBackupCheckpointPath.
*/
private final String mTempBackupCheckpointPath;
/** The UFS where the journal is being written to. */
private final UnderFileSystem mUfs;
private final long mMaxLogSize;
@@ -88,6 +71,9 @@ public final class JournalWriter {
/** The sequence number for the next entry in the log. */
private long mNextEntrySequenceNumber = 1;

/** Checkpoint manager for updating and recovering the checkpoint file. */
private CheckpointManager mCheckpointManager;

/**
* Creates a new instance of {@link JournalWriter}.
*
@@ -97,49 +83,10 @@
mJournal = Preconditions.checkNotNull(journal);
mJournalDirectory = mJournal.getDirectory();
mCompletedDirectory = mJournal.getCompletedDirectory();
mCheckpointPath = mJournal.getCheckpointFilePath();
mTempCheckpointPath = mJournal.getCheckpointFilePath() + ".tmp";
mTempBackupCheckpointPath = mJournal.getCheckpointFilePath() + ".backup.tmp";
mBackupCheckpointPath = mJournal.getCheckpointFilePath() + ".backup";
mUfs = UnderFileSystem.get(mJournalDirectory);
mMaxLogSize = Configuration.getBytes(PropertyKey.MASTER_JOURNAL_LOG_SIZE_BYTES_MAX);
}

/**
* Recovers the checkpoint file in case the master crashed while updating it previously.
*
* The checkpointing process goes
* <pre>
* 1. Write mTempCheckpointPath based on all completed logs
* 2. Rename mCheckpointPath to mTempBackupCheckpointPath
* 3. Rename mTempBackupCheckpointPath to mBackupCheckpointPath
* 4. Rename mTempCheckpointPath to mCheckpointPath
* 5. Delete completed logs
* 6. Delete mBackupCheckpointPath
* </pre>
*/
public synchronized void recoverCheckpoint() {
try {
if (mUfs.exists(mTempBackupCheckpointPath)) {
// If mCheckpointPath exists, step 2 must have implemented rename as copy + delete, and
// failed during the delete.
UnderFileSystemUtils.deleteIfExists(mUfs, mCheckpointPath);
mUfs.rename(mTempBackupCheckpointPath, mCheckpointPath);
}
if (mUfs.exists(mBackupCheckpointPath)) {
// We must have crashed after step 3
if (mUfs.exists(mCheckpointPath)) {
// We crashed after step 4, so we can finish steps 5 and 6.
deleteCompletedLogs();
mUfs.delete(mBackupCheckpointPath, false);
} else {
// We crashed before step 4, so we roll back to backup checkpoint.
mUfs.rename(mBackupCheckpointPath, mCheckpointPath);
}
}
} catch (IOException e) {
throw Throwables.propagate(e);
}
mCheckpointManager = new CheckpointManager(mUfs, mJournal.getCheckpointFilePath(), this);
}

/**
@@ -173,6 +120,7 @@ public synchronized void completeAllLogs() throws IOException {
public synchronized JournalOutputStream getCheckpointOutputStream(long latestSequenceNumber)
throws IOException {
if (mCheckpointOutputStream == null) {
mCheckpointManager.recoverCheckpoint();
LOG.info("Creating tmp checkpoint file: {}", mTempCheckpointPath);
if (!mUfs.exists(mJournalDirectory)) {
LOG.info("Creating journal folder: {}", mJournalDirectory);
@@ -234,12 +182,19 @@ private synchronized OutputStream openCurrentLog() throws IOException {
return os;
}

/**
* Recovers the checkpoint file in case the master crashed while updating it previously.
*/
public void recoverCheckpoint() {
mCheckpointManager.recoverCheckpoint();
}

/**
* Deletes all of the logs in the completed folder.
*
* @throws IOException if an I/O error occurs
*/
private synchronized void deleteCompletedLogs() throws IOException {
public synchronized void deleteCompletedLogs() throws IOException {
LOG.info("Deleting all completed log files...");
// Loop over all complete logs starting from the end.
long logNumber = Journal.FIRST_COMPLETED_LOG_NUMBER;
@@ -335,21 +290,8 @@ public synchronized void close() throws IOException {
mOutputStream.close();

LOG.info("Successfully created tmp checkpoint file: {}", mTempCheckpointPath);
if (mUfs.exists(mCheckpointPath)) {
UnderFileSystemUtils.deleteIfExists(mUfs, mTempBackupCheckpointPath);
UnderFileSystemUtils.deleteIfExists(mUfs, mBackupCheckpointPath);
// Rename in two steps so that we never have identical mCheckpointPath and
// mBackupCheckpointPath. This is a concern since UFS may implement rename as copy + delete.
mUfs.rename(mCheckpointPath, mTempBackupCheckpointPath);
mUfs.rename(mTempBackupCheckpointPath, mBackupCheckpointPath);
}
mUfs.rename(mTempCheckpointPath, mCheckpointPath);
LOG.info("Renamed checkpoint file {} to {}", mTempCheckpointPath, mCheckpointPath);

// The checkpoint already reflects the information in the completed logs.
deleteCompletedLogs();

UnderFileSystemUtils.deleteIfExists(mUfs, mBackupCheckpointPath);
mCheckpointManager.updateCheckpoint(mTempCheckpointPath);

// Consider the current log to be complete.
completeCurrentLog();