[ALLUXIO-2408] Fix potential journal checkpoint failure #4306

Merged 10 commits on Nov 8, 2016
12 changes: 12 additions & 0 deletions core/common/src/main/java/alluxio/underfs/UnderFileSystem.java
@@ -143,6 +143,10 @@ UnderFileSystem get(String path, Object ufsConf) {
}
return cachedFs;
}

void clear() {
mUnderFileSystemMap.clear();
}
}

/**
@@ -182,6 +186,7 @@ public String toString() {
return mScheme + "://" + mAuthority;
}
}

/**
* Gets the UnderFileSystem instance according to its scheme.
*
@@ -205,6 +210,13 @@ public static UnderFileSystem get(String path, Object ufsConf) {
return UFS_CACHE.get(path, ufsConf);
}

/**
* Clears the under file system cache.
*/
public static void clearCache() {
UFS_CACHE.clear();
}

/**
* Returns the name of the under filesystem implementation.
*
19 changes: 19 additions & 0 deletions core/common/src/main/java/alluxio/util/UnderFileSystemUtils.java
@@ -13,6 +13,8 @@

import alluxio.underfs.UnderFileSystem;

import com.google.common.base.Throwables;

import java.io.IOException;
import java.io.OutputStream;

@@ -67,5 +69,22 @@ public static void touch(final String path) throws IOException {
os.close();
}

/**
* Deletes the specified path from the specified under file system if it exists. This will not
* delete nonempty directories.
*
* @param ufs the under file system to delete from
* @param path the path to delete
*/
public static void deleteIfExists(UnderFileSystem ufs, String path) {
try {
if (ufs.exists(path)) {
ufs.delete(path, false);
}
} catch (IOException e) {
throw Throwables.propagate(e);
Contributor
Which exceptions will be propagated? Technically, even if you check for exists before you delete, there is no guarantee it will still exist at the time of the delete. Therefore, there is currently no way to do this atomically.

So, when should this method not throw an error, and when should it? What is the purpose/goal for this method?

Contributor Author
The javadoc for UnderFileSystem just says @throws IOException if a non-Alluxio error occurs for both exists and delete. This method is never expected to throw an exception.

I don't think the lack of atomicity is a problem. The contract is that if the file path exists, it gets deleted. If it gets deleted between the exists and the delete, the delete will return false and the result is the same (no more file)
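
The contract argued for above (whatever happens between the existence check and the delete, the path ends up absent) can be illustrated with a minimal sketch. It runs against the local filesystem through `java.nio.file` rather than Alluxio's `UnderFileSystem`, and the class and method names are hypothetical, chosen only for this illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical illustration on the local filesystem; not part of the Alluxio code base. */
final class DeleteIfExistsSketch {

  /**
   * Check-then-delete, mirroring the shape of the helper under review.
   * Returns true if the path is absent afterwards.
   */
  static boolean deleteIfExists(Path path) {
    try {
      if (Files.exists(path)) {
        // If another actor removed the file after the check, this call reports
        // false instead of throwing, so the end state is the same: no file.
        Files.deleteIfExists(path);
      }
      return !Files.exists(path);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Runs the helper on an existing file, then again on the now-missing file. */
  static boolean demo() {
    try {
      Path file = Files.createTempFile("delete-if-exists", ".tmp");
      boolean first = deleteIfExists(file);  // the file existed and is removed
      boolean second = deleteIfExists(file); // the file is already gone; still fine
      return first && second;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

The second call stands in for the race discussed in the thread: the file disappears before the delete, and the caller still observes the outcome it asked for.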

}
}

private UnderFileSystemUtils() {} // prevent instantiation
}
2 changes: 2 additions & 0 deletions core/server/src/main/java/alluxio/master/AbstractMaster.java
@@ -118,6 +118,8 @@ public void start(boolean isLeader) throws IOException {
* concurrent access to the master during these phases.
*/

mJournalWriter.recoverCheckpoint();
Contributor
Should this call be mentioned in the preceding comment about the steps/phases?

Contributor Author
done


// Phase 1: Mark all logs as complete, including the current log. After this call, the current
// log should not exist, and all the log files will be complete.
mJournalWriter.completeAllLogs();
62 changes: 51 additions & 11 deletions core/server/src/main/java/alluxio/master/journal/JournalWriter.java
@@ -17,8 +17,10 @@
import alluxio.exception.ExceptionMessage;
import alluxio.proto.journal.Journal.JournalEntry;
import alluxio.underfs.UnderFileSystem;
import alluxio.util.UnderFileSystemUtils;

import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,8 +52,14 @@ public final class JournalWriter {
private final String mJournalDirectory;
/** Absolute path to the directory storing all completed logs. */
private final String mCompletedDirectory;
/** Absolute path to the checkpoint file. */
private final String mCheckpointPath;
/** Absolute path to the temporary checkpoint file. */
private final String mTempCheckpointPath;
/** Absolute path to the backup checkpoint file. */
private final String mBackupCheckpointPath;
/** Absolute path to the temporary backup checkpoint file. */
private final String mTempBackupCheckpointPath;
Contributor
Hmm, maybe there should be a javadoc somewhere that describes this process. There are temp, backup, and temp-backup files, which can be confusing.

Contributor Author
I've added more detailed javadocs for each of these

/** The UFS where the journal is being written to. */
private final UnderFileSystem mUfs;
private final long mMaxLogSize;
@@ -76,11 +84,36 @@ public final class JournalWriter {
mJournal = Preconditions.checkNotNull(journal);
mJournalDirectory = mJournal.getDirectory();
mCompletedDirectory = mJournal.getCompletedDirectory();
mCheckpointPath = mJournal.getCheckpointFilePath();
mTempCheckpointPath = mJournal.getCheckpointFilePath() + ".tmp";
mTempBackupCheckpointPath = mJournal.getCheckpointFilePath() + ".backup.tmp";
mBackupCheckpointPath = mJournal.getCheckpointFilePath() + ".backup";
mUfs = UnderFileSystem.get(mJournalDirectory);
mMaxLogSize = Configuration.getBytes(PropertyKey.MASTER_JOURNAL_LOG_SIZE_BYTES_MAX);
}

/**
* Recovers the checkpoint file in case the master crashed while updating it previously.
*/
public synchronized void recoverCheckpoint() {
try {
if (mUfs.exists(mBackupCheckpointPath)) {
if (mUfs.exists(mJournal.getCheckpointFilePath())) {
Contributor
Should mJournal.getCheckpointFilePath() be mCheckpointPath?

Contributor Author
fixed, thanks

// We must have crashed while cleaning up the completed logs directory and backup
// checkpoint, so we finish these steps now.
deleteCompletedLogs();
mUfs.delete(mBackupCheckpointPath, false);
} else {
// We must have crashed before writing the checkpoint file, restore the checkpoint from
Contributor
I'm not sure when this would happen, so if there is a javadoc for this class that describes the checkpointing process, that would be helpful.

Contributor Author
I've added more docs here

// backup.
mUfs.rename(mBackupCheckpointPath, mJournal.getCheckpointFilePath());
}
}
} catch (IOException e) {
throw Throwables.propagate(e);
}
}
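
The two recovery branches in `recoverCheckpoint()` can be sketched on the local filesystem as follows. This is a simplified illustration under stated assumptions: it uses `java.nio.file` instead of the UFS API, it omits the deletion of completed logs, and `CheckpointRecoverySketch`, `Action`, and `recover` are hypothetical names that do not appear in the PR.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical local-filesystem sketch of checkpoint recovery; not Alluxio code. */
final class CheckpointRecoverySketch {
  enum Action { NONE, FINISH_CLEANUP, RESTORE_FROM_BACKUP }

  /** Decides what to do based on which of the checkpoint and backup files exist. */
  static Action recover(Path checkpoint) throws IOException {
    Path backup = checkpoint.resolveSibling(checkpoint.getFileName().toString() + ".backup");
    if (!Files.exists(backup)) {
      return Action.NONE; // no interrupted update to repair
    }
    if (Files.exists(checkpoint)) {
      // The new checkpoint was already in place; only cleanup was interrupted.
      // (The real code also deletes the completed logs at this point.)
      Files.delete(backup);
      return Action.FINISH_CLEANUP;
    }
    // The new checkpoint never made it into place; fall back to the old one.
    Files.move(backup, checkpoint);
    return Action.RESTORE_FROM_BACKUP;
  }

  private static String read(Path path) throws IOException {
    return new String(Files.readAllBytes(path), StandardCharsets.UTF_8);
  }

  /** Simulates both crash states and reports the action taken and the surviving contents. */
  static String demo() {
    try {
      Path dir = Files.createTempDirectory("checkpoint-recovery");
      Path checkpoint = dir.resolve("checkpoint.data");
      Path backup = dir.resolve("checkpoint.data.backup");

      // Crash state A: both the new checkpoint and the old backup exist.
      Files.write(checkpoint, "new".getBytes(StandardCharsets.UTF_8));
      Files.write(backup, "old".getBytes(StandardCharsets.UTF_8));
      Action a = recover(checkpoint);
      String afterA = read(checkpoint);

      // Crash state B: only the backup exists.
      Files.delete(checkpoint);
      Files.write(backup, "old".getBytes(StandardCharsets.UTF_8));
      Action b = recover(checkpoint);
      String afterB = read(checkpoint);

      return a + ":" + afterA + "," + b + ":" + afterB;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

In both states a readable checkpoint exists after recovery, which is the property the new integration tests in this PR check from the outside.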

/**
* Marks all logs as completed.
*
@@ -120,6 +153,7 @@ public synchronized JournalOutputStream getCheckpointOutputStream(long latestSeq
mNextEntrySequenceNumber = latestSequenceNumber + 1;
LOG.info("Latest journal sequence number: {} Next journal sequence number: {}",
latestSequenceNumber, mNextEntrySequenceNumber);
UnderFileSystemUtils.deleteIfExists(mUfs, mTempCheckpointPath);
mCheckpointOutputStream =
new CheckpointOutputStream(new DataOutputStream(mUfs.create(mTempCheckpointPath)));
}
@@ -179,16 +213,15 @@ private synchronized OutputStream openCurrentLog() throws IOException {
*/
private synchronized void deleteCompletedLogs() throws IOException {
LOG.info("Deleting all completed log files...");
- // Loop over all complete logs starting from the beginning.
- // TODO(gpang): should the deletes start from the end?
+ // Loop over all complete logs starting from the end.
long logNumber = Journal.FIRST_COMPLETED_LOG_NUMBER;
- String logFilename = mJournal.getCompletedLogFilePath(logNumber);
- while (mUfs.exists(logFilename)) {
+ while (mUfs.exists(mJournal.getCompletedLogFilePath(logNumber))) {
+ logNumber++;
+ }
+ for (long i = logNumber - 1; i >= 0; i--) {
+ String logFilename = mJournal.getCompletedLogFilePath(i);
LOG.info("Deleting completed log: {}", logFilename);
mUfs.delete(logFilename, true);
- logNumber++;
- // generate the next completed log filename in the sequence.
- logFilename = mJournal.getCompletedLogFilePath(logNumber);
}
LOG.info("Finished deleting all completed log files.");
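
The PR does not spell out why the deletion order was reversed. One plausible reading (an assumption, not something stated by the author) is that the scan finds logs by probing consecutive numbers from the first one, so deleting from the end keeps the survivors discoverable if the process dies partway through. The sketch below models the log directory as an in-memory set; the class name, method names, and the starting number 1 are all hypothetical.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical in-memory model of completed-log deletion order; not Alluxio code. */
final class ReverseLogDeletionSketch {
  static final long FIRST_LOG_NUMBER = 1; // assumed starting number, for illustration only

  /** Counts the logs reachable by probing consecutive numbers from the first one. */
  static long countDiscoverable(Set<Long> logs) {
    long next = FIRST_LOG_NUMBER;
    while (logs.contains(next)) {
      next++;
    }
    return next - FIRST_LOG_NUMBER;
  }

  /** Deletes highest-numbered logs first, stopping after maxDeletes to simulate a crash. */
  static void deleteFromEnd(Set<Long> logs, int maxDeletes) {
    long next = FIRST_LOG_NUMBER;
    while (logs.contains(next)) {
      next++;
    }
    int deleted = 0;
    for (long i = next - 1; i >= FIRST_LOG_NUMBER && deleted < maxDeletes; i--) {
      logs.remove(i);
      deleted++;
    }
  }

  /** Deletes lowest-numbered logs first, stopping after maxDeletes to simulate a crash. */
  static void deleteFromStart(Set<Long> logs, int maxDeletes) {
    long i = FIRST_LOG_NUMBER;
    int deleted = 0;
    while (logs.contains(i) && deleted < maxDeletes) {
      logs.remove(i);
      deleted++;
      i++;
    }
  }

  /** Reports "discoverable/remaining" for each strategy after an interrupted deletion. */
  static String demo() {
    Set<Long> fromEnd = new HashSet<>(Arrays.asList(1L, 2L, 3L, 4L));
    deleteFromEnd(fromEnd, 2);
    Set<Long> fromStart = new HashSet<>(Arrays.asList(1L, 2L, 3L, 4L));
    deleteFromStart(fromStart, 2);
    return countDiscoverable(fromEnd) + "/" + fromEnd.size() + " "
        + countDiscoverable(fromStart) + "/" + fromStart.size();
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

With deletion from the end, every remaining log is still found by the scan; with deletion from the start, the remaining logs become invisible to it and a retry would leave them behind.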

@@ -274,16 +307,23 @@ public synchronized void close() throws IOException {
mOutputStream.close();

LOG.info("Successfully created tmp checkpoint file: {}", mTempCheckpointPath);
- mUfs.delete(mJournal.getCheckpointFilePath(), false);
- // TODO(gpang): the real checkpoint should not be overwritten here, but after all operations.
- mUfs.rename(mTempCheckpointPath, mJournal.getCheckpointFilePath());
- mUfs.delete(mTempCheckpointPath, false);
if (mUfs.exists(mCheckpointPath)) {
UnderFileSystemUtils.deleteIfExists(mUfs, mTempBackupCheckpointPath);
UnderFileSystemUtils.deleteIfExists(mUfs, mBackupCheckpointPath);
// Rename in two steps so that we never have identical mCheckpointPath and
// mBackupCheckpointPath. This is a concern since UFS may implement rename as copy + delete.
Contributor
This seems like an invariant we want to maintain. Maybe this invariant should be explicitly mentioned in a class javadoc?

Contributor Author
Added more docs and centralized the code in one place to make it easier to see all the invariants
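
The checkpoint swap discussed in this thread can be sketched end to end on the local filesystem. This is a minimal illustration under stated assumptions: it uses `java.nio.file` instead of the UFS API, it only notes where completed logs would be deleted, and `CheckpointSwapSketch` and `writeCheckpoint` are hypothetical names. The file suffixes follow the ones set in the `JournalWriter` constructor in this diff.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical local-filesystem sketch of the checkpoint swap; not Alluxio code. */
final class CheckpointSwapSketch {

  /** Replaces the checkpoint with newContents, keeping a backup until the swap completes. */
  static void writeCheckpoint(Path checkpoint, String newContents) throws IOException {
    String name = checkpoint.getFileName().toString();
    Path temp = checkpoint.resolveSibling(name + ".tmp");
    Path backup = checkpoint.resolveSibling(name + ".backup");
    Path tempBackup = checkpoint.resolveSibling(name + ".backup.tmp");

    // Step 1: write the new checkpoint under a temporary name.
    Files.deleteIfExists(temp);
    Files.write(temp, newContents.getBytes(StandardCharsets.UTF_8));

    if (Files.exists(checkpoint)) {
      Files.deleteIfExists(tempBackup);
      Files.deleteIfExists(backup);
      // Step 2: move the old checkpoint aside in two renames, so that the backup name
      // never coexists with an identical checkpoint even if a rename is copy + delete.
      Files.move(checkpoint, tempBackup);
      Files.move(tempBackup, backup);
    }

    // Step 3: promote the new checkpoint.
    Files.move(temp, checkpoint);

    // Step 4: the real code deletes completed logs here; afterwards the backup is dropped.
    Files.deleteIfExists(backup);
  }

  /** Writes two checkpoints in a row and reports the final contents and backup state. */
  static String demo() {
    try {
      Path dir = Files.createTempDirectory("checkpoint-swap");
      Path checkpoint = dir.resolve("checkpoint.data");
      writeCheckpoint(checkpoint, "v1"); // no previous checkpoint exists
      writeCheckpoint(checkpoint, "v2"); // previous checkpoint goes through the backup path
      String contents = new String(Files.readAllBytes(checkpoint), StandardCharsets.UTF_8);
      boolean backupLeft = Files.exists(dir.resolve("checkpoint.data.backup"));
      return contents + ":" + backupLeft;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

A crash between any two steps leaves either the old checkpoint, the backup, or the new checkpoint intact, which is what lets a recovery pass choose between finishing cleanup and restoring from the backup.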

mUfs.rename(mCheckpointPath, mTempBackupCheckpointPath);
mUfs.rename(mTempBackupCheckpointPath, mBackupCheckpointPath);
}
mUfs.rename(mTempCheckpointPath, mCheckpointPath);
LOG.info("Renamed checkpoint file {} to {}", mTempCheckpointPath,
mJournal.getCheckpointFilePath());

// The checkpoint already reflects the information in the completed logs.
deleteCompletedLogs();

UnderFileSystemUtils.deleteIfExists(mUfs, mBackupCheckpointPath);

// Consider the current log to be complete.
completeCurrentLog();

77 changes: 77 additions & 0 deletions tests/src/test/java/alluxio/UnderFileSystemSpy.java
@@ -0,0 +1,77 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio;

import alluxio.underfs.UnderFileSystem;
import alluxio.underfs.UnderFileSystemFactory;
import alluxio.underfs.UnderFileSystemRegistry;

import org.mockito.Mockito;

import java.io.Closeable;
import java.io.IOException;

/**
* {@link UnderFileSystemSpy} replaces an {@link UnderFileSystem} in the under file system
* registry and acts identically, but with the ability to mock ufs methods via {@link #get()}.
*
* <pre>
* UnderFileSystemSpy spyUfs =
*     new UnderFileSystemSpy("/", new LocalUnderFileSystem(new AlluxioURI("/")));
* doThrow(new RuntimeException()).when(spyUfs.get()).rename(anyString(), anyString());
* </pre>
*
* Objects of this class must be closed after use so that they unregister themselves from the
* global under file system registry.
*/
public final class UnderFileSystemSpy implements Closeable {
private final UnderFileSystemFactory mFactory;
private final UnderFileSystem mUfsSpy;

/**
* Creates a new {@link UnderFileSystemSpy}.
*
* @param prefix the path prefix to intercept UFS calls on
* @param ufs the under file system to spy
*/
public UnderFileSystemSpy(final String prefix, UnderFileSystem ufs) {
mUfsSpy = Mockito.spy(ufs);
mFactory = new UnderFileSystemFactory() {
@Override
public UnderFileSystem create(String path, Object ufsConf) {
return mUfsSpy;
}

@Override
public boolean supportsPath(String path) {
return path.startsWith(prefix);
}
};
UnderFileSystemRegistry.register(mFactory);
UnderFileSystem.clearCache();
}

/**
* Returns the spy object for the under file system being spied. This is the object to mock in
* tests.
*
* @return the underlying spy object
*/
public UnderFileSystem get() {
return mUfsSpy;
}

@Override
public void close() throws IOException {
UnderFileSystemRegistry.unregister(mFactory);
UnderFileSystem.clearCache();
}
}
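
The register-on-construct, unregister-on-close lifecycle that `UnderFileSystemSpy` relies on can be shown without Mockito or Alluxio. In this sketch the registry is a plain list and every name (`ScopedRegistrationSketch`, `REGISTRY`, `ScopedFactory`) is a hypothetical stand-in, not the real `UnderFileSystemRegistry` API.

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of scoped registration in a global registry; not Alluxio code. */
final class ScopedRegistrationSketch {
  /** Stand-in for a global factory registry. */
  static final List<String> REGISTRY = new ArrayList<>();

  /** Registers itself on construction and unregisters itself on close. */
  static final class ScopedFactory implements Closeable {
    private final String mName;

    ScopedFactory(String name) {
      mName = name;
      REGISTRY.add(name);
    }

    @Override
    public void close() {
      REGISTRY.remove(mName);
    }
  }

  /** Captures the registry contents inside and after the try-with-resources block. */
  static String demo() {
    String during;
    try (ScopedFactory factory = new ScopedFactory("spy")) {
      during = REGISTRY.toString();
    }
    return during + REGISTRY;
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

Using try-with-resources, as the tests in this PR do, guarantees the global state is restored even when the body throws, so one test's spy cannot leak into the next.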
79 changes: 79 additions & 0 deletions tests/src/test/java/alluxio/master/JournalIntegrationTest.java
@@ -11,11 +11,16 @@

package alluxio.master;

import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doThrow;

import alluxio.AlluxioURI;
import alluxio.Configuration;
import alluxio.Constants;
import alluxio.LocalAlluxioClusterResource;
import alluxio.PropertyKey;
import alluxio.UnderFileSystemSpy;
import alluxio.client.WriteType;
import alluxio.client.file.FileOutStream;
import alluxio.client.file.FileSystem;
@@ -35,6 +40,7 @@
import alluxio.security.authentication.AuthenticatedClientUser;
import alluxio.security.group.GroupMappingService;
import alluxio.underfs.UnderFileSystem;
import alluxio.underfs.local.LocalUnderFileSystem;
import alluxio.util.CommonUtils;
import alluxio.util.IdUtils;
import alluxio.util.io.PathUtils;
@@ -48,6 +54,7 @@
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;

import java.io.IOException;
import java.util.HashMap;
@@ -588,6 +595,78 @@ public void setAcl() throws Exception {
aclTestUtil(status, user);
}

@Test
public void failDuringCheckpointRename() throws Exception {
AlluxioURI file = new AlluxioURI("/file");
mFileSystem.createFile(file).close();
// Restart the master once so that it creates a checkpoint file.
mLocalAlluxioCluster.stopFS();
createFsMasterFromJournal();
try (UnderFileSystemSpy ufsSpy =
new UnderFileSystemSpy("/", new LocalUnderFileSystem(new AlluxioURI("/")))) {
doThrow(new RuntimeException("Failed to rename")).when(ufsSpy.get())
.rename(Mockito.contains("FileSystemMaster/checkpoint.data.tmp"), anyString());
try {
// Restart the master again, but with renaming the checkpoint file failing.
mLocalAlluxioCluster.stopFS();
createFsMasterFromJournal();
Assert.fail("Should have failed during rename");
} catch (RuntimeException e) {
Assert.assertEquals("Failed to rename", e.getMessage());
}
}
// We shouldn't lose track of the fact that the file is loaded into memory.
FileSystemMaster fsMaster = createFsMasterFromJournal();
Assert.assertTrue(fsMaster.getInMemoryFiles().contains(file));
}

@Test
public void failDuringCheckpointDelete() throws Exception {
AlluxioURI file = new AlluxioURI("/file");
mFileSystem.createFile(file).close();
// Restart the master once so that it creates a checkpoint file.
mLocalAlluxioCluster.stopFS();
createFsMasterFromJournal();
try (UnderFileSystemSpy ufsSpy =
new UnderFileSystemSpy("/", new LocalUnderFileSystem(new AlluxioURI("/")))) {
doThrow(new RuntimeException("Failed to delete")).when(ufsSpy.get())
.delete(Mockito.contains("FileSystemMaster/checkpoint.data"), anyBoolean());
try {
// Restart the master again, but with deleting the checkpoint file failing.
mLocalAlluxioCluster.stopFS();
createFsMasterFromJournal();
Assert.fail("Should have failed during delete");
} catch (RuntimeException e) {
Assert.assertEquals("Failed to delete", e.getMessage());
}
}
// We shouldn't lose track of the fact that the file is loaded into memory.
FileSystemMaster fsMaster = createFsMasterFromJournal();
Assert.assertTrue(fsMaster.getInMemoryFiles().contains(file));
}

@Test
public void failWhileDeletingCompletedLogs() throws Exception {
AlluxioURI file = new AlluxioURI("/file");
mFileSystem.createFile(file).close();
try (UnderFileSystemSpy ufsSpy =
new UnderFileSystemSpy("/", new LocalUnderFileSystem(new AlluxioURI("/")))) {
doThrow(new RuntimeException("Failed to delete completed log")).when(ufsSpy.get())
.delete(Mockito.contains("FileSystemMaster/completed"), anyBoolean());
try {
// Restart the master, but with deleting the completed logs failing.
mLocalAlluxioCluster.stopFS();
createFsMasterFromJournal();
Assert.fail("Should have failed during delete");
} catch (RuntimeException e) {
Assert.assertEquals("Failed to delete completed log", e.getMessage());
}
}
// We shouldn't lose track of the fact that the file is loaded into memory.
FileSystemMaster fsMaster = createFsMasterFromJournal();
Assert.assertTrue(fsMaster.getInMemoryFiles().contains(file));
}

private void aclTestUtil(URIStatus status, String user) throws Exception {
FileSystemMaster fsMaster = createFsMasterFromJournal();
