Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove node from cluster when node locks broken #61400

Merged
merged 8 commits into from
Sep 22, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public class FsHealthService extends AbstractLifecycleComponent implements NodeH
private static final Logger logger = LogManager.getLogger(FsHealthService.class);
private final ThreadPool threadPool;
private volatile boolean enabled;
private volatile boolean brokenLock;
private final TimeValue refreshInterval;
private volatile TimeValue slowPathLoggingThreshold;
private final NodeEnvironment nodeEnv;
Expand Down Expand Up @@ -117,6 +118,8 @@ public StatusInfo getHealth() {
Set<Path> unhealthyPaths = this.unhealthyPaths;
if (enabled == false) {
statusInfo = new StatusInfo(HEALTHY, "health check disabled");
} else if (brokenLock) {
statusInfo = new StatusInfo(UNHEALTHY, "health check failed due to broken node lock");
} else if (unhealthyPaths == null) {
statusInfo = new StatusInfo(HEALTHY, "health check passed");
} else {
Expand Down Expand Up @@ -150,7 +153,16 @@ public void run() {

private void monitorFSHealth() {
Set<Path> currentUnhealthyPaths = null;
for (Path path : nodeEnv.nodeDataPaths()) {
Path[] paths = null;
try {
paths = nodeEnv.nodeDataPaths();
} catch (IllegalStateException e) {
logger.error("health check failed", e);
brokenLock = true;
return;
}

for (Path path : paths) {
long executionStartTime = currentTimeMillisSupplier.getAsLong();
try {
if (Files.exists(path)) {
Expand All @@ -176,6 +188,7 @@ private void monitorFSHealth() {
}
}
unhealthyPaths = currentUnhealthyPaths;
brokenLock = false;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@
import java.io.OutputStream;
import java.nio.channels.FileChannel;
import java.nio.file.FileSystem;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.OpenOption;
import java.nio.file.attribute.FileAttribute;
import java.util.Set;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -231,6 +231,36 @@ public void testFailsHealthOnSinglePathWriteFailure() throws IOException {
}
}

public void testFailsHealthOnUnexpectedLockFileSize() throws IOException {
FileSystem fileSystem = PathUtils.getDefaultFileSystem();
final Settings settings = Settings.EMPTY;
TestThreadPool testThreadPool = new TestThreadPool(getClass().getName(), settings);
FileSystemUnexpectedLockFileSizeProvider unexpectedLockFileSizeFileSystemProvider = new FileSystemUnexpectedLockFileSizeProvider(
fileSystem, 1, testThreadPool);
fileSystem = unexpectedLockFileSizeFileSystemProvider.getFileSystem(null);
PathUtilsForTesting.installMock(fileSystem);
final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
try (NodeEnvironment env = newNodeEnvironment()) {
FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env);
fsHealthService.new FsHealthMonitor().run();
assertEquals(HEALTHY, fsHealthService.getHealth().getStatus());
assertEquals("health check passed", fsHealthService.getHealth().getInfo());

// enabling unexpected file size injection
unexpectedLockFileSizeFileSystemProvider.injectUnexpectedFileSize.set(true);

fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env);
fsHealthService.new FsHealthMonitor().run();
assertEquals(UNHEALTHY, fsHealthService.getHealth().getStatus());
assertThat(fsHealthService.getHealth().getInfo(), is("health check failed due to broken node lock"));
assertEquals(1, unexpectedLockFileSizeFileSystemProvider.getInjectedPathCount());
} finally {
unexpectedLockFileSizeFileSystemProvider.injectUnexpectedFileSize.set(false);
PathUtilsForTesting.teardown();
ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS);
}
}

private static class FileSystemIOExceptionProvider extends FilterFileSystemProvider {

AtomicBoolean injectIOException = new AtomicBoolean();
Expand All @@ -254,7 +284,8 @@ public int getInjectedPathCount(){
public OutputStream newOutputStream(Path path, OpenOption... options) throws IOException {
if (injectIOException.get()){
assert pathPrefix != null : "must set pathPrefix before starting disruptions";
if (path.toString().startsWith(pathPrefix) && path.toString().endsWith(".es_temp_file")) {
if (path.toString().startsWith(pathPrefix) && path.toString().
endsWith(FsHealthService.FsHealthMonitor.TEMP_FILE_NAME)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

injectedPaths.incrementAndGet();
throw new IOException("fake IOException");
}
Expand Down Expand Up @@ -289,7 +320,8 @@ public FileChannel newFileChannel(Path path, Set<? extends OpenOption> options,
public void force(boolean metaData) throws IOException {
if (injectIOException.get()) {
assert pathPrefix != null : "must set pathPrefix before starting disruptions";
if (path.toString().startsWith(pathPrefix) && path.toString().endsWith(".es_temp_file")) {
if (path.toString().startsWith(pathPrefix) && path.toString().
endsWith(FsHealthService.FsHealthMonitor.TEMP_FILE_NAME)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

injectedPaths.incrementAndGet();
throw new IOException("fake IOException");
}
Expand Down Expand Up @@ -341,4 +373,39 @@ public void force(boolean metaData) throws IOException {
};
}
}

private static class FileSystemUnexpectedLockFileSizeProvider extends FilterFileSystemProvider {

AtomicBoolean injectUnexpectedFileSize = new AtomicBoolean();
AtomicInteger injectedPaths = new AtomicInteger();

private final long size;
private final ThreadPool threadPool;

FileSystemUnexpectedLockFileSizeProvider(FileSystem inner, long size, ThreadPool threadPool) {
super("disrupt_fs_health://", inner);
this.size = size;
this.threadPool = threadPool;
}

public int getInjectedPathCount(){
return injectedPaths.get();
}

@Override
public FileChannel newFileChannel(Path path, Set<? extends OpenOption> options, FileAttribute<?>... attrs) throws IOException {
return new FilterFileChannel(super.newFileChannel(path, options, attrs)) {
@Override
public long size() throws IOException {
if (injectUnexpectedFileSize.get()) {
if (path.getFileName().toString().equals(NodeEnvironment.NODE_LOCK_FILENAME)) {
injectedPaths.incrementAndGet();
return size;
}
}
return super.size();
}
};
}
}
}