Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-24340][Core] Clean up non-shuffle disk block manager files following executor exits on a Standalone cluster #21390

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@

package org.apache.spark.network.util;

import java.io.Closeable;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.*;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -91,11 +88,24 @@ public static String bytesToString(ByteBuffer b) {
* @throws IOException if deletion is unsuccessful
*/
public static void deleteRecursively(File file) throws IOException {
deleteRecursively(file, null);
}

/**
* Delete a file or directory and its contents recursively.
* Don't follow directories if they are symlinks.
*
* @param file Input file / dir to be deleted
* @param filter A filename filter that make sure only files / dirs with the satisfied filenames
* are deleted.
* @throws IOException if deletion is unsuccessful
*/
public static void deleteRecursively(File file, FilenameFilter filter) throws IOException {
if (file == null) { return; }

// On Unix systems, use operating system command to run faster
// If that does not work out, fallback to the Java IO way
if (SystemUtils.IS_OS_UNIX) {
if (SystemUtils.IS_OS_UNIX && filter == null) {
try {
deleteRecursivelyUsingUnixNative(file);
return;
Expand All @@ -105,15 +115,17 @@ public static void deleteRecursively(File file) throws IOException {
}
}

deleteRecursivelyUsingJavaIO(file);
deleteRecursivelyUsingJavaIO(file, filter);
}

private static void deleteRecursivelyUsingJavaIO(File file) throws IOException {
private static void deleteRecursivelyUsingJavaIO(
File file,
FilenameFilter filter) throws IOException {
if (file.isDirectory() && !isSymlink(file)) {
IOException savedIOException = null;
for (File child : listFilesSafely(file)) {
for (File child : listFilesSafely(file, filter)) {
try {
deleteRecursively(child);
deleteRecursively(child, filter);
} catch (IOException e) {
// In case of multiple exceptions, only last one will be thrown
savedIOException = e;
Expand All @@ -124,10 +136,13 @@ private static void deleteRecursivelyUsingJavaIO(File file) throws IOException {
}
}

boolean deleted = file.delete();
// Delete can also fail if the file simply did not exist.
if (!deleted && file.exists()) {
throw new IOException("Failed to delete: " + file.getAbsolutePath());
// Delete file only when it's a normal file or an empty directory.
if (file.isFile() || (file.isDirectory() && listFilesSafely(file, null).length == 0)) {
boolean deleted = file.delete();
// Delete can also fail if the file simply did not exist.
if (!deleted && file.exists()) {
throw new IOException("Failed to delete: " + file.getAbsolutePath());
}
}
}

Expand Down Expand Up @@ -157,10 +172,10 @@ private static void deleteRecursivelyUsingUnixNative(File file) throws IOExcepti
}
}

private static File[] listFilesSafely(File file) throws IOException {
private static File[] listFilesSafely(File file, FilenameFilter filter) throws IOException {
if (file.exists()) {
File[] files = file.listFiles();
if (files == null) {
File[] files = file.listFiles(filter);
if (files == null && filter == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure that lisFiles(FileFilter filter) (with a non-null filter) returns a null when there are no hits? The documentation isn't clear, but if it was to be consistent with the zero-parameter listFiles(), it would return an empty array. null should be reserved for errors.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, null is reserved for errors, and it's possible that listFiles(filter) returns null when the list operation fail.

throw new IOException("Failed to list files for dir: " + file);
}
return files;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,13 @@ public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
blockManager.applicationRemoved(appId, cleanupLocalDirs);
}

/**
* Clean up any non-shuffle files in any local directories associated with an finished executor.
*/
public void executorRemoved(String executorId, String appId) {
blockManager.executorRemoved(executorId, appId);
}

/**
* Register an (application, executor) with the given shuffle info.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,26 @@ public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
}
}

/**
* Removes all the non-shuffle files in any local directories associated with the finished
* executor.
*/
public void executorRemoved(String executorId, String appId) {
logger.info("Clean up non-shuffle files associated with the finished executor {}", executorId);
AppExecId fullId = new AppExecId(appId, executorId);
final ExecutorShuffleInfo executor = executors.get(fullId);
if (executor == null) {
// Executor not registered, skip clean up of the local directories.
logger.info("Executor is not registered (appId={}, execId={})", appId, executorId);
} else {
logger.info("Cleaning up non-shuffle files in executor {}'s {} local dirs", fullId,
executor.localDirs.length);

// Execute the actual deletion in a different thread, as it may take some time.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 , thanks for doing this in a separate thread. This indeed could take a very long time under certain rare conditions.

directoryCleaner.execute(() -> deleteNonShuffleFiles(executor.localDirs));
}
}

/**
* Synchronously deletes each directory one at a time.
* Should be executed in its own thread, as this may take a long time.
Expand All @@ -226,6 +246,29 @@ private void deleteExecutorDirs(String[] dirs) {
}
}

private FilenameFilter filter = new FilenameFilter() {
Copy link
Contributor

@JoshRosen JoshRosen May 22, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it feasible to define this as a local variable inside of deleteNonShuffleFiles in order to reduce its scope to be as small as possible and make it clear that this is only used within deleteNonShuffleFiles?

@Override
public boolean accept(File dir, String name) {
// Don't delete shuffle data or shuffle index files.
return !name.endsWith(".index") && !name.endsWith(".data");
}
};

/**
* Synchronously deletes non-shuffle files in each directory recursively.
* Should be executed in its own thread, as this may take a long time.
*/
private void deleteNonShuffleFiles(String[] dirs) {
for (String localDir : dirs) {
try {
JavaUtils.deleteRecursively(new File(localDir), filter);
logger.debug("Successfully cleaned up non-shuffle files in directory: {}", localDir);
} catch (Exception e) {
logger.error("Failed to delete non-shuffle files in directory: " + localDir, e);
}
}
}

/**
* Sort-based shuffle data uses an index called "shuffle_ShuffleId_MapId_0.index" into a data file
* called "shuffle_ShuffleId_MapId_0.data". This logic is from IndexShuffleBlockResolver,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.shuffle;

import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Random;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

import com.google.common.util.concurrent.MoreExecutors;
import org.junit.Test;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;

public class NonShuffleFilesCleanupSuite {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need a test to check that we preserve the old behavior in case the new configuration is set to false? An end-to-end test will likely be prone to flakiness, so instead maybe we could somehow test that shuffleService. executorRemoved() is not called if the configuration is false. One way to do that would be to move the construction of new ExternalShuffleService from the default constructor of Worker into its public constructor and then inject it in the new Worker call. This, in turn, would let you inject either a mock or spy in order to verify call counts. Do you know if we have this style of test for other Worker functionality? Is this a ton of work or is it relatively simple to do?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestion, Let me try it later. :)


// Same-thread Executor used to ensure cleanup happens synchronously in test thread.
private Executor sameThreadExecutor = MoreExecutors.sameThreadExecutor();
private TransportConf conf = new TransportConf("shuffle", MapConfigProvider.EMPTY);
private static final String SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager";

@Test
public void cleanupOnRemovedExecutorWithShuffleFiles() throws IOException {
cleanupOnRemovedExecutor(true);
}

@Test
public void cleanupOnRemovedExecutorWithoutShuffleFiles() throws IOException {
cleanupOnRemovedExecutor(false);
}

private void cleanupOnRemovedExecutor(boolean withShuffleFiles) throws IOException {
TestShuffleDataContext dataContext = initDataContext(withShuffleFiles);

ExternalShuffleBlockResolver resolver =
new ExternalShuffleBlockResolver(conf, null, sameThreadExecutor);
resolver.registerExecutor("app", "exec0", dataContext.createExecutorInfo(SORT_MANAGER));
resolver.executorRemoved("exec0", "app");

assertCleanedUp(dataContext);
}

@Test
public void cleanupUsesExecutorWithShuffleFiles() throws IOException {
cleanupUsesExecutor(true);
}

@Test
public void cleanupUsesExecutorWithoutShuffleFiles() throws IOException {
cleanupUsesExecutor(false);
}

private void cleanupUsesExecutor(boolean withShuffleFiles) throws IOException {
TestShuffleDataContext dataContext = initDataContext(withShuffleFiles);

AtomicBoolean cleanupCalled = new AtomicBoolean(false);

// Executor which does nothing to ensure we're actually using it.
Executor noThreadExecutor = runnable -> cleanupCalled.set(true);

ExternalShuffleBlockResolver manager =
new ExternalShuffleBlockResolver(conf, null, noThreadExecutor);

manager.registerExecutor("app", "exec0", dataContext.createExecutorInfo(SORT_MANAGER));
manager.executorRemoved("exec0", "app");

assertTrue(cleanupCalled.get());
assertStillThere(dataContext);
}

@Test
public void cleanupOnlyRemovedExecutorWithShuffleFiles() throws IOException {
cleanupOnlyRemovedExecutor(true);
}

@Test
public void cleanupOnlyRemovedExecutorWithoutShuffleFiles() throws IOException {
cleanupOnlyRemovedExecutor(false);
}

private void cleanupOnlyRemovedExecutor(boolean withShuffleFiles) throws IOException {
TestShuffleDataContext dataContext0 = initDataContext(withShuffleFiles);
TestShuffleDataContext dataContext1 = initDataContext(withShuffleFiles);

ExternalShuffleBlockResolver resolver =
new ExternalShuffleBlockResolver(conf, null, sameThreadExecutor);
resolver.registerExecutor("app", "exec0", dataContext0.createExecutorInfo(SORT_MANAGER));
resolver.registerExecutor("app", "exec1", dataContext1.createExecutorInfo(SORT_MANAGER));


resolver.executorRemoved("exec-nonexistent", "app");
assertStillThere(dataContext0);
assertStillThere(dataContext1);

resolver.executorRemoved("exec0", "app");
assertCleanedUp(dataContext0);
assertStillThere(dataContext1);

resolver.executorRemoved("exec1", "app");
assertCleanedUp(dataContext0);
assertCleanedUp(dataContext1);

// Make sure it's not an error to cleanup multiple times
resolver.executorRemoved("exec1", "app");
assertCleanedUp(dataContext0);
assertCleanedUp(dataContext1);
}

@Test
public void cleanupOnlyRegisteredExecutorWithShuffleFiles() throws IOException {
cleanupOnlyRegisteredExecutor(true);
}

@Test
public void cleanupOnlyRegisteredExecutorWithoutShuffleFiles() throws IOException {
cleanupOnlyRegisteredExecutor(false);
}

private void cleanupOnlyRegisteredExecutor(boolean withShuffleFiles) throws IOException {
TestShuffleDataContext dataContext = initDataContext(withShuffleFiles);

ExternalShuffleBlockResolver resolver =
new ExternalShuffleBlockResolver(conf, null, sameThreadExecutor);
resolver.registerExecutor("app", "exec0", dataContext.createExecutorInfo(SORT_MANAGER));

resolver.executorRemoved("exec1", "app");
assertStillThere(dataContext);

resolver.executorRemoved("exec0", "app");
assertCleanedUp(dataContext);
}

private static void assertStillThere(TestShuffleDataContext dataContext) {
for (String localDir : dataContext.localDirs) {
assertTrue(localDir + " was cleaned up prematurely", new File(localDir).exists());
}
}

private static FilenameFilter filter = new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
// Don't delete shuffle data or shuffle index files.
return !name.endsWith(".index") && !name.endsWith(".data");
}
};

private static boolean assertOnlyShuffleDataInDir(File[] dirs) {
for (File dir : dirs) {
assertTrue(dir.getName() + " wasn't cleaned up", !dir.exists() ||
dir.listFiles(filter).length == 0 || assertOnlyShuffleDataInDir(dir.listFiles()));
}
return true;
}

private static void assertCleanedUp(TestShuffleDataContext dataContext) {
for (String localDir : dataContext.localDirs) {
File[] dirs = new File[] {new File(localDir)};
assertOnlyShuffleDataInDir(dirs);
}
}

private static TestShuffleDataContext initDataContext(boolean withShuffleFiles)
throws IOException {
if (withShuffleFiles) {
return initDataContextWithShuffleFiles();
} else {
return initDataContextWithoutShuffleFiles();
}
}

private static TestShuffleDataContext initDataContextWithShuffleFiles() throws IOException {
TestShuffleDataContext dataContext = createDataContext();
createShuffleFiles(dataContext);
createNonShuffleFiles(dataContext);
return dataContext;
}

private static TestShuffleDataContext initDataContextWithoutShuffleFiles() throws IOException {
TestShuffleDataContext dataContext = createDataContext();
createNonShuffleFiles(dataContext);
return dataContext;
}

private static TestShuffleDataContext createDataContext() {
TestShuffleDataContext dataContext = new TestShuffleDataContext(10, 5);
dataContext.create();
return dataContext;
}

private static void createShuffleFiles(TestShuffleDataContext dataContext) throws IOException {
Random rand = new Random(123);
dataContext.insertSortShuffleData(rand.nextInt(1000), rand.nextInt(1000), new byte[][] {
"ABC".getBytes(StandardCharsets.UTF_8),
"DEF".getBytes(StandardCharsets.UTF_8)});
}

private static void createNonShuffleFiles(TestShuffleDataContext dataContext) throws IOException {
// Create spill file(s)
dataContext.insertSpillData();
}
}
Loading