-
Notifications
You must be signed in to change notification settings - Fork 24.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Remove node from cluster when node locks broken #61400
Changes from 1 commit
9478e2b
3ce8839
eeb51ba
c43b6fb
841a8b1
4a207fb
9caac47
fd9d292
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -58,6 +58,7 @@ public class FsHealthService extends AbstractLifecycleComponent implements NodeH | |
private static final Logger logger = LogManager.getLogger(FsHealthService.class); | ||
private final ThreadPool threadPool; | ||
private volatile boolean enabled; | ||
private volatile boolean brokenLock; | ||
private final TimeValue refreshInterval; | ||
private volatile TimeValue slowPathLoggingThreshold; | ||
private final NodeEnvironment nodeEnv; | ||
|
@@ -117,6 +118,8 @@ public StatusInfo getHealth() { | |
Set<Path> unhealthyPaths = this.unhealthyPaths; | ||
if (enabled == false) { | ||
statusInfo = new StatusInfo(HEALTHY, "health check disabled"); | ||
} else if (brokenLock == true) { | ||
statusInfo = new StatusInfo(UNHEALTHY, "health check failed on node due to broken locks"); | ||
} else if (unhealthyPaths == null) { | ||
statusInfo = new StatusInfo(HEALTHY, "health check passed"); | ||
} else { | ||
|
@@ -150,32 +153,39 @@ public void run() { | |
|
||
private void monitorFSHealth() { | ||
Set<Path> currentUnhealthyPaths = null; | ||
for (Path path : nodeEnv.nodeDataPaths()) { | ||
long executionStartTime = currentTimeMillisSupplier.getAsLong(); | ||
try { | ||
if (Files.exists(path)) { | ||
Path tempDataPath = path.resolve(TEMP_FILE_NAME); | ||
Files.deleteIfExists(tempDataPath); | ||
try (OutputStream os = Files.newOutputStream(tempDataPath, StandardOpenOption.CREATE_NEW)) { | ||
os.write(byteToWrite); | ||
IOUtils.fsync(tempDataPath, false); | ||
boolean lockAssertionFailed = false; | ||
try { | ||
for (Path path : nodeEnv.nodeDataPaths()) { | ||
long executionStartTime = currentTimeMillisSupplier.getAsLong(); | ||
try { | ||
if (Files.exists(path)) { | ||
Path tempDataPath = path.resolve(TEMP_FILE_NAME); | ||
Files.deleteIfExists(tempDataPath); | ||
try (OutputStream os = Files.newOutputStream(tempDataPath, StandardOpenOption.CREATE_NEW)) { | ||
os.write(byteToWrite); | ||
IOUtils.fsync(tempDataPath, false); | ||
} | ||
Files.delete(tempDataPath); | ||
final long elapsedTime = currentTimeMillisSupplier.getAsLong() - executionStartTime; | ||
if (elapsedTime > slowPathLoggingThreshold.millis()) { | ||
logger.warn("health check of [{}] took [{}ms] which is above the warn threshold of [{}]", | ||
path, elapsedTime, slowPathLoggingThreshold); | ||
} | ||
} | ||
Files.delete(tempDataPath); | ||
final long elapsedTime = currentTimeMillisSupplier.getAsLong() - executionStartTime; | ||
if (elapsedTime > slowPathLoggingThreshold.millis()) { | ||
logger.warn("health check of [{}] took [{}ms] which is above the warn threshold of [{}]", | ||
path, elapsedTime, slowPathLoggingThreshold); | ||
} catch (Exception ex) { | ||
logger.error(new ParameterizedMessage("health check of [{}] failed", path), ex); | ||
if (currentUnhealthyPaths == null) { | ||
currentUnhealthyPaths = new HashSet<>(1); | ||
} | ||
currentUnhealthyPaths.add(path); | ||
} | ||
} catch (Exception ex) { | ||
logger.error(new ParameterizedMessage("health check of [{}] failed", path), ex); | ||
if (currentUnhealthyPaths == null) { | ||
currentUnhealthyPaths = new HashSet<>(1); | ||
} | ||
currentUnhealthyPaths.add(path); | ||
} | ||
} catch (IllegalStateException e) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd prefer the |
||
logger.error("Lock assertions failed due to", e); | ||
lockAssertionFailed = true; | ||
} | ||
unhealthyPaths = currentUnhealthyPaths; | ||
brokenLock = lockAssertionFailed; | ||
} | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -42,8 +42,11 @@ | |
import java.io.OutputStream; | ||
import java.nio.channels.FileChannel; | ||
import java.nio.file.FileSystem; | ||
import java.nio.file.OpenOption; | ||
import java.nio.file.Path; | ||
import java.nio.file.Files; | ||
import java.nio.file.LinkOption; | ||
import java.nio.file.OpenOption; | ||
import java.nio.file.attribute.BasicFileAttributes; | ||
import java.nio.file.attribute.FileAttribute; | ||
import java.util.Set; | ||
import java.util.concurrent.TimeUnit; | ||
|
@@ -231,6 +234,132 @@ public void testFailsHealthOnSinglePathWriteFailure() throws IOException { | |
} | ||
} | ||
|
||
public void testFailsHealthOnMissingLockFile() throws IOException { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thorough tests 😄 However they're not really testing anything in the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks, i am thinking to keep two of them where one throws an There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, one is all we need here.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok, Got it. |
||
final Settings settings = Settings.EMPTY; | ||
final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); | ||
TestThreadPool testThreadPool = new TestThreadPool(getClass().getName(), settings); | ||
try (NodeEnvironment env = newNodeEnvironment()) { | ||
FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env); | ||
fsHealthService.new FsHealthMonitor().run(); | ||
assertEquals(HEALTHY, fsHealthService.getHealth().getStatus()); | ||
assertEquals("health check passed", fsHealthService.getHealth().getInfo()); | ||
|
||
//Deleting Lock file | ||
Path[] paths = env.nodeDataPaths(); | ||
Path deletedPath = randomFrom(paths); | ||
deletedPath = deletedPath.resolve(NodeEnvironment.NODE_LOCK_FILENAME); | ||
Files.deleteIfExists(deletedPath); | ||
|
||
fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env); | ||
fsHealthService.new FsHealthMonitor().run(); | ||
assertEquals(UNHEALTHY, fsHealthService.getHealth().getStatus()); | ||
assertThat(fsHealthService.getHealth().getInfo(), is("health check failed on node due to broken locks")); | ||
} finally { | ||
PathUtilsForTesting.teardown(); | ||
ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS); | ||
} | ||
} | ||
|
||
public void testFailsHealthOnMissingLockFileWithDisruptedPath() throws IOException { | ||
FileSystem fileSystem = PathUtils.getDefaultFileSystem(); | ||
FileSystemIOExceptionProvider disruptWritesFileSystemProvider = new FileSystemIOExceptionProvider(fileSystem); | ||
fileSystem = disruptWritesFileSystemProvider.getFileSystem(null); | ||
PathUtilsForTesting.installMock(fileSystem); | ||
final Settings settings = Settings.EMPTY; | ||
final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); | ||
TestThreadPool testThreadPool = new TestThreadPool(getClass().getName(), settings); | ||
try (NodeEnvironment env = newNodeEnvironment()) { | ||
Path[] paths = env.nodeDataPaths(); | ||
FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env); | ||
fsHealthService.new FsHealthMonitor().run(); | ||
assertEquals(HEALTHY, fsHealthService.getHealth().getStatus()); | ||
assertEquals("health check passed", fsHealthService.getHealth().getInfo()); | ||
|
||
//disrupt file system writes on single path | ||
disruptWritesFileSystemProvider.injectIOException.set(true); | ||
String disruptedPath = randomFrom(paths).toString(); | ||
disruptWritesFileSystemProvider.restrictPathPrefix(disruptedPath); | ||
|
||
//Deleting Lock file | ||
Path deletedPath = randomFrom(paths); | ||
deletedPath = deletedPath.resolve(NodeEnvironment.NODE_LOCK_FILENAME); | ||
Files.deleteIfExists(deletedPath); | ||
|
||
fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env); | ||
fsHealthService.new FsHealthMonitor().run(); | ||
assertEquals(UNHEALTHY, fsHealthService.getHealth().getStatus()); | ||
assertThat(fsHealthService.getHealth().getInfo(), is("health check failed on node due to broken locks")); | ||
assertEquals(0, disruptWritesFileSystemProvider.getInjectedPathCount()); | ||
|
||
} finally { | ||
disruptWritesFileSystemProvider.injectIOException.set(false); | ||
PathUtilsForTesting.teardown(); | ||
ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS); | ||
} | ||
} | ||
|
||
public void testFailsHealthOnUnexpectedLockFileSize() throws IOException { | ||
FileSystem fileSystem = PathUtils.getDefaultFileSystem(); | ||
final Settings settings = Settings.EMPTY; | ||
TestThreadPool testThreadPool = new TestThreadPool(getClass().getName(), settings); | ||
FileSystemUnexpectedLockFileSizeProvider unexpectedLockFileSizeFileSystemProvider = new FileSystemUnexpectedLockFileSizeProvider( | ||
fileSystem, 1, testThreadPool); | ||
fileSystem = unexpectedLockFileSizeFileSystemProvider.getFileSystem(null); | ||
PathUtilsForTesting.installMock(fileSystem); | ||
final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); | ||
try (NodeEnvironment env = newNodeEnvironment()) { | ||
FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env); | ||
fsHealthService.new FsHealthMonitor().run(); | ||
assertEquals(HEALTHY, fsHealthService.getHealth().getStatus()); | ||
assertEquals("health check passed", fsHealthService.getHealth().getInfo()); | ||
|
||
// enabling unexpected file size injection | ||
unexpectedLockFileSizeFileSystemProvider.injectUnexpectedFileSize.set(true); | ||
|
||
fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env); | ||
fsHealthService.new FsHealthMonitor().run(); | ||
assertEquals(UNHEALTHY, fsHealthService.getHealth().getStatus()); | ||
assertThat(fsHealthService.getHealth().getInfo(), is("health check failed on node due to broken locks")); | ||
assertEquals(1, unexpectedLockFileSizeFileSystemProvider.getInjectedPathCount()); | ||
|
||
} finally { | ||
unexpectedLockFileSizeFileSystemProvider.injectUnexpectedFileSize.set(false); | ||
PathUtilsForTesting.teardown(); | ||
ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS); | ||
} | ||
} | ||
|
||
public void testFailsHealthOnIncorrectCreationTime() throws IOException { | ||
FileSystem fileSystem = PathUtils.getDefaultFileSystem(); | ||
final Settings settings = Settings.EMPTY; | ||
TestThreadPool testThreadPool = new TestThreadPool(getClass().getName(), settings); | ||
FileSystemIncorrectCreationTimeProvider incorrectCreationTimeFileSystemProvider = new FileSystemIncorrectCreationTimeProvider( | ||
fileSystem, System.currentTimeMillis()-100000, testThreadPool); | ||
fileSystem = incorrectCreationTimeFileSystemProvider.getFileSystem(null); | ||
PathUtilsForTesting.installMock(fileSystem); | ||
final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); | ||
try (NodeEnvironment env = newNodeEnvironment()) { | ||
FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env); | ||
fsHealthService.new FsHealthMonitor().run(); | ||
assertEquals(HEALTHY, fsHealthService.getHealth().getStatus()); | ||
assertEquals("health check passed", fsHealthService.getHealth().getInfo()); | ||
|
||
// enabling incorrect file creation time injection | ||
incorrectCreationTimeFileSystemProvider.injectIncorrectCreationTime.set(true); | ||
|
||
fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env); | ||
fsHealthService.new FsHealthMonitor().run(); | ||
assertEquals(UNHEALTHY, fsHealthService.getHealth().getStatus()); | ||
assertThat(fsHealthService.getHealth().getInfo(), is("health check failed on node due to broken locks")); | ||
assertEquals(1, incorrectCreationTimeFileSystemProvider.getInjectedPathCount()); | ||
|
||
} finally { | ||
incorrectCreationTimeFileSystemProvider.injectIncorrectCreationTime.set(false); | ||
PathUtilsForTesting.teardown(); | ||
ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS); | ||
} | ||
} | ||
|
||
private static class FileSystemIOExceptionProvider extends FilterFileSystemProvider { | ||
|
||
AtomicBoolean injectIOException = new AtomicBoolean(); | ||
|
@@ -254,7 +383,8 @@ public int getInjectedPathCount(){ | |
public OutputStream newOutputStream(Path path, OpenOption... options) throws IOException { | ||
if (injectIOException.get()){ | ||
assert pathPrefix != null : "must set pathPrefix before starting disruptions"; | ||
if (path.toString().startsWith(pathPrefix) && path.toString().endsWith(".es_temp_file")) { | ||
if (path.toString().startsWith(pathPrefix) && path.toString(). | ||
endsWith(FsHealthService.FsHealthMonitor.TEMP_FILE_NAME)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
injectedPaths.incrementAndGet(); | ||
throw new IOException("fake IOException"); | ||
} | ||
|
@@ -289,7 +419,8 @@ public FileChannel newFileChannel(Path path, Set<? extends OpenOption> options, | |
public void force(boolean metaData) throws IOException { | ||
if (injectIOException.get()) { | ||
assert pathPrefix != null : "must set pathPrefix before starting disruptions"; | ||
if (path.toString().startsWith(pathPrefix) && path.toString().endsWith(".es_temp_file")) { | ||
if (path.toString().startsWith(pathPrefix) && path.toString(). | ||
endsWith(FsHealthService.FsHealthMonitor.TEMP_FILE_NAME)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
injectedPaths.incrementAndGet(); | ||
throw new IOException("fake IOException"); | ||
} | ||
|
@@ -341,4 +472,69 @@ public void force(boolean metaData) throws IOException { | |
}; | ||
} | ||
} | ||
|
||
private static class FileSystemUnexpectedLockFileSizeProvider extends FilterFileSystemProvider { | ||
|
||
AtomicBoolean injectUnexpectedFileSize = new AtomicBoolean(); | ||
AtomicInteger injectedPaths = new AtomicInteger(); | ||
|
||
private final long size; | ||
private final ThreadPool threadPool; | ||
|
||
FileSystemUnexpectedLockFileSizeProvider(FileSystem inner, long size, ThreadPool threadPool) { | ||
super("disrupt_fs_health://", inner); | ||
this.size = size; | ||
this.threadPool = threadPool; | ||
} | ||
|
||
public int getInjectedPathCount(){ | ||
return injectedPaths.get(); | ||
} | ||
|
||
@Override | ||
public FileChannel newFileChannel(Path path, Set<? extends OpenOption> options, FileAttribute<?>... attrs) throws IOException { | ||
return new FilterFileChannel(super.newFileChannel(path, options, attrs)) { | ||
@Override | ||
public long size() throws IOException { | ||
if (injectUnexpectedFileSize.get()) { | ||
if (path.getFileName().toString().equals(NodeEnvironment.NODE_LOCK_FILENAME)) { | ||
injectedPaths.incrementAndGet(); | ||
return size; | ||
} | ||
} | ||
return super.size(); | ||
} | ||
}; | ||
} | ||
} | ||
|
||
private static class FileSystemIncorrectCreationTimeProvider extends FilterFileSystemProvider { | ||
|
||
AtomicBoolean injectIncorrectCreationTime = new AtomicBoolean(); | ||
AtomicInteger injectedPaths = new AtomicInteger(); | ||
|
||
private final long time; | ||
private final ThreadPool threadPool; | ||
|
||
FileSystemIncorrectCreationTimeProvider(FileSystem inner, long time, ThreadPool threadPool) { | ||
super("disrupt_fs_health://", inner); | ||
this.time = time; | ||
this.threadPool = threadPool; | ||
} | ||
|
||
public int getInjectedPathCount(){ | ||
return injectedPaths.get(); | ||
} | ||
|
||
@Override | ||
public <A extends BasicFileAttributes> A readAttributes(Path path, Class<A> type, LinkOption... options) throws IOException { | ||
if (injectIncorrectCreationTime.get()) { | ||
if (path.getFileName().toString().equals(NodeEnvironment.NODE_LOCK_FILENAME)) { | ||
injectedPaths.incrementAndGet(); | ||
path.toFile().setLastModified(time); | ||
} | ||
} | ||
return super.readAttributes(path, type, options); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor wording nit: specify which lock was broken (and remove redundant
on node
), suggest this: