From 93b69a95846753de4cb4db76e4a91a7724613cf3 Mon Sep 17 00:00:00 2001
From: Ryan Ernst
- * The file processing is asynchronous, the cluster state or the file must be already updated such that - * the version information in the file is newer than what's already saved as processed in the - * cluster state. - * - * For snapshot restores we first must restore the snapshot and then force a refresh, since the cluster state - * metadata version must be reset to 0 and saved in the cluster state. - */ - private void refreshExistingFileStateIfNeeded(ClusterState clusterState) { - if (watching()) { - if (shouldRefreshFileState(clusterState) && Files.exists(watchedFile())) { - try { - Files.setLastModifiedTime(watchedFile(), FileTime.from(Instant.now())); - } catch (IOException e) { - logger.warn("encountered I/O error trying to update file settings timestamp", e); - } - } - } - } - public final void addFileChangedListener(FileChangedListener listener) { eventListeners.add(listener); } @@ -131,33 +87,12 @@ public final Path watchedFile() { } @Override - public final void clusterChanged(ClusterChangedEvent event) { - ClusterState clusterState = event.state(); - if (clusterState.nodes().isLocalNodeElectedMaster()) { - startWatcher(clusterState); - } else if (event.previousState().nodes().isLocalNodeElectedMaster()) { - stopWatcher(); - } + protected void doStart() { + startWatcher(); } @Override - protected final void doStart() { - // We start the file watcher when we know we are master from a cluster state change notification. - // We need the additional active flag, since cluster state can change after we've shutdown the service - // causing the watcher to start again. - this.active = Files.exists(watchedFileDir().getParent()); - if (active == false) { - // we don't have a config directory, we can't possibly launch the file settings service - return; - } - if (DiscoveryNode.isMasterNode(clusterService.getSettings())) { - clusterService.addListener(this); - } - } - - @Override - protected final void doStop() { - this.active = false; + protected void doStop() { logger.debug("Stopping file watching service"); stopWatcher(); } @@ -184,10 +119,9 @@ final boolean watchedFileChanged(Path path) throws IOException { return (previousUpdateState == null || previousUpdateState.equals(fileUpdateState) == false); } - private synchronized void startWatcher(ClusterState clusterState) { - if (watching() || active == false) { - refreshExistingFileStateIfNeeded(clusterState); - + protected final synchronized void startWatcher() { + if (Files.exists(watchedFileDir.getParent()) == false) { + logger.warn("File watcher for [{}] cannot start because grandparent directory does not exist", watchedFile); return; } @@ -295,7 +229,7 @@ protected final void watcherThread() { } } - final synchronized void stopWatcher() { + protected final synchronized void stopWatcher() { if (watching()) { logger.debug("stopping watcher ..."); // make sure watch service is closed whatever diff --git a/server/src/main/java/org/elasticsearch/reservedstate/service/FileSettingsService.java b/server/src/main/java/org/elasticsearch/reservedstate/service/FileSettingsService.java index e780b36568190..e1f6b4e80977f 100644 --- a/server/src/main/java/org/elasticsearch/reservedstate/service/FileSettingsService.java +++ b/server/src/main/java/org/elasticsearch/reservedstate/service/FileSettingsService.java @@ -11,9 +11,12 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.ReservedStateMetadata; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.file.AbstractFileWatchingService; import org.elasticsearch.env.Environment; @@ -22,6 +25,8 @@ import java.io.BufferedInputStream; import java.io.IOException; import java.nio.file.Files; +import java.nio.file.attribute.FileTime; +import java.time.Instant; import java.util.concurrent.ExecutionException; import static org.elasticsearch.xcontent.XContentType.JSON; @@ -37,14 +42,16 @@ * the service as a listener to cluster state changes, so that we can enable the file watcher thread when this * node becomes a master node. */ -public class FileSettingsService extends AbstractFileWatchingService { +public class FileSettingsService extends AbstractFileWatchingService implements ClusterStateListener { private static final Logger logger = LogManager.getLogger(FileSettingsService.class); public static final String SETTINGS_FILE_NAME = "settings.json"; public static final String NAMESPACE = "file_settings"; public static final String OPERATOR_DIRECTORY = "operator"; + private final ClusterService clusterService; private final ReservedClusterStateService stateService; + private volatile boolean active = false; /** * Constructs the {@link FileSettingsService} @@ -54,10 +61,70 @@ public class FileSettingsService extends AbstractFileWatchingService { * @param environment we need the environment to pull the location of the config and operator directories */ public FileSettingsService(ClusterService clusterService, ReservedClusterStateService stateService, Environment environment) { - super(clusterService, environment.configFile().toAbsolutePath().resolve(OPERATOR_DIRECTORY).resolve(SETTINGS_FILE_NAME)); + super(environment.configFile().toAbsolutePath().resolve(OPERATOR_DIRECTORY).resolve(SETTINGS_FILE_NAME)); + this.clusterService = clusterService; this.stateService = stateService; } + @Override + protected void doStart() { + // We start the file watcher when we know we are master from a cluster state change notification. + // We need the additional active flag, since cluster state can change after we've shutdown the service + // causing the watcher to start again. + this.active = Files.exists(watchedFileDir().getParent()); + if (active == false) { + // we don't have a config directory, we can't possibly launch the file settings service + return; + } + if (DiscoveryNode.isMasterNode(clusterService.getSettings())) { + clusterService.addListener(this); + } + } + + @Override + protected void doStop() { + this.active = false; + super.doStop(); + } + + @Override + public final void clusterChanged(ClusterChangedEvent event) { + ClusterState clusterState = event.state(); + if (clusterState.nodes().isLocalNodeElectedMaster()) { + synchronized (this) { + if (watching() || active == false) { + refreshExistingFileStateIfNeeded(clusterState); + return; + } + startWatcher(); + } + } else if (event.previousState().nodes().isLocalNodeElectedMaster()) { + stopWatcher(); + } + } + + /** + * 'Touches' the settings file so the file watcher will re-processes it. + *
+ * The file processing is asynchronous, the cluster state or the file must be already updated such that + * the version information in the file is newer than what's already saved as processed in the + * cluster state. + * + * For snapshot restores we first must restore the snapshot and then force a refresh, since the cluster state + * metadata version must be reset to 0 and saved in the cluster state. + */ + private void refreshExistingFileStateIfNeeded(ClusterState clusterState) { + if (watching()) { + if (shouldRefreshFileState(clusterState) && Files.exists(watchedFile())) { + try { + Files.setLastModifiedTime(watchedFile(), FileTime.from(Instant.now())); + } catch (IOException e) { + logger.warn("encountered I/O error trying to update file settings timestamp", e); + } + } + } + } + /** * Used by snapshot restore service {@link org.elasticsearch.snapshots.RestoreService} to prepare the reserved * state of the snapshot for the current cluster. @@ -95,8 +162,7 @@ public void handleSnapshotRestore(ClusterState clusterState, Metadata.Builder md * @param clusterState State of the cluster * @return true if file settings metadata version is exactly 0, false otherwise. */ - @Override - protected boolean shouldRefreshFileState(ClusterState clusterState) { + private boolean shouldRefreshFileState(ClusterState clusterState) { // We check if the version was reset to 0, and force an update if a file exists. This can happen in situations // like snapshot restores. ReservedStateMetadata fileSettingsMetadata = clusterState.metadata().reservedStateMetadata().get(NAMESPACE); diff --git a/server/src/test/java/org/elasticsearch/common/file/AbstractFileWatchingServiceTests.java b/server/src/test/java/org/elasticsearch/common/file/AbstractFileWatchingServiceTests.java index 7f271a21cd08b..e12312844e571 100644 --- a/server/src/test/java/org/elasticsearch/common/file/AbstractFileWatchingServiceTests.java +++ b/server/src/test/java/org/elasticsearch/common/file/AbstractFileWatchingServiceTests.java @@ -8,7 +8,6 @@ package org.elasticsearch.common.file; -import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -58,13 +57,13 @@ class TestFileWatchingService extends AbstractFileWatchingService { private final CountDownLatch countDownLatch; - TestFileWatchingService(ClusterService clusterService, Path watchedFile) { - super(clusterService, watchedFile); + TestFileWatchingService(Path watchedFile) { + super(watchedFile); this.countDownLatch = null; } - TestFileWatchingService(ClusterService clusterService, Path watchedFile, CountDownLatch countDownLatch) { - super(clusterService, watchedFile); + TestFileWatchingService(Path watchedFile, CountDownLatch countDownLatch) { + super(watchedFile); this.countDownLatch = countDownLatch; } @@ -99,7 +98,7 @@ public void setUp() throws Exception { Files.createDirectories(env.configFile()); - fileWatchingService = new TestFileWatchingService(clusterService, getWatchedFilePath(env)); + fileWatchingService = new TestFileWatchingService(getWatchedFilePath(env)); } @After @@ -110,7 +109,6 @@ public void tearDown() throws Exception { public void testStartStop() { fileWatchingService.start(); - fileWatchingService.clusterChanged(new ClusterChangedEvent("test", clusterService.state(), ClusterState.EMPTY_STATE)); assertTrue(fileWatchingService.watching()); fileWatchingService.stop(); assertFalse(fileWatchingService.watching()); @@ -143,10 +141,9 @@ public void testWatchedFile() throws Exception { public void testCallsProcessing() throws Exception { CountDownLatch processFileLatch = new CountDownLatch(1); - AbstractFileWatchingService service = new TestFileWatchingService(clusterService, getWatchedFilePath(env), processFileLatch); + AbstractFileWatchingService service = new TestFileWatchingService(getWatchedFilePath(env), processFileLatch); service.start(); - service.clusterChanged(new ClusterChangedEvent("test", clusterService.state(), ClusterState.EMPTY_STATE)); assertTrue(service.watching()); Files.createDirectories(service.watchedFileDir()); diff --git a/server/src/test/java/org/elasticsearch/reservedstate/service/FileSettingsServiceTests.java b/server/src/test/java/org/elasticsearch/reservedstate/service/FileSettingsServiceTests.java index 130dd8b0025e1..c25f6dd7e97c2 100644 --- a/server/src/test/java/org/elasticsearch/reservedstate/service/FileSettingsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/reservedstate/service/FileSettingsServiceTests.java @@ -112,6 +112,16 @@ public void tearDown() throws Exception { threadpool.shutdownNow(); } + public void testStartStop() { + fileSettingsService.start(); + assertFalse(fileSettingsService.watching()); + fileSettingsService.clusterChanged(new ClusterChangedEvent("test", clusterService.state(), ClusterState.EMPTY_STATE)); + assertTrue(fileSettingsService.watching()); + fileSettingsService.stop(); + assertFalse(fileSettingsService.watching()); + fileSettingsService.close(); + } + public void testOperatorDirName() { Path operatorPath = fileSettingsService.watchedFileDir(); assertTrue(operatorPath.startsWith(env.configFile()));