Skip to content

Commit

Permalink
Make file watcher startable on any node (elastic#101572)
Browse files Browse the repository at this point in the history
The file settings service must only be started on the master node.
However, the abstract super class for watching any file should be usable
on any node type. This commit reworks the abstract file watcher to work
on any node by moving the cluster state based logic to detect the master
node directly into the file settings service. Additionally, if the file
watcher cannot be started because the grandparent directory of the
watched file does not exist, a warning is emitted.

relates ES-6783
  • Loading branch information
rjernst authored Oct 31, 2023
1 parent 553d2d4 commit 93b69a9
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,6 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.reservedstate.service.FileChangedListener;
Expand All @@ -27,8 +22,6 @@
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.FileTime;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
Expand All @@ -52,11 +45,10 @@
* An implementation class should override {@link #processFileChanges()} to define
* the correct behavior.</p>
*/
public abstract class AbstractFileWatchingService extends AbstractLifecycleComponent implements ClusterStateListener {
public abstract class AbstractFileWatchingService extends AbstractLifecycleComponent {

private static final Logger logger = LogManager.getLogger(AbstractFileWatchingService.class);
private static final int REGISTER_RETRY_COUNT = 5;
private final ClusterService clusterService;
private final Path watchedFileDir;
private final Path watchedFile;
private final List<FileChangedListener> eventListeners;
Expand All @@ -65,10 +57,8 @@ public abstract class AbstractFileWatchingService extends AbstractLifecycleCompo
private FileUpdateState fileUpdateState;
private WatchKey settingsDirWatchKey;
private WatchKey configDirWatchKey;
private volatile boolean active = false;

public AbstractFileWatchingService(ClusterService clusterService, Path watchedFile) {
this.clusterService = clusterService;
public AbstractFileWatchingService(Path watchedFile) {
this.watchedFile = watchedFile;
this.watchedFileDir = watchedFile.getParent();
this.eventListeners = new CopyOnWriteArrayList<>();
Expand All @@ -84,40 +74,6 @@ public AbstractFileWatchingService(ClusterService clusterService, Path watchedFi
*/
protected abstract void processFileChanges() throws InterruptedException, ExecutionException, IOException;

/**
* Hook that lets a subclass signal, based on cluster state, that the watched file
* should be re-processed: for example, after cluster state has been restored
* from a snapshot.
* <p>
* This default implementation ignores {@code clusterState} and always answers
* {@code false}; override it if different behavior is desired.
*
* @param clusterState State of the cluster
* @return {@code false}, by default
*/
protected boolean shouldRefreshFileState(ClusterState clusterState) {
return false;
}

/**
 * "Touches" the watched file, i.e. sets its last-modified time to the current instant,
 * so that the file watcher will re-process it.
 * <p>
 * Nothing is done unless the watcher is running, {@code shouldRefreshFileState} asks for a
 * refresh, and the watched file exists.
 * <p>
 * File processing is asynchronous, so the cluster state or the file must already be updated
 * such that the version information in the file is newer than what is saved as processed in
 * the cluster state. For snapshot restores the snapshot must be restored first and the refresh
 * forced afterwards, since the cluster state metadata version must be reset to 0 and saved in
 * the cluster state.
 *
 * @param clusterState cluster state passed on to {@code shouldRefreshFileState}
 */
private void refreshExistingFileStateIfNeeded(ClusterState clusterState) {
    if (watching() == false) {
        return;
    }
    if (shouldRefreshFileState(clusterState) == false || Files.exists(watchedFile()) == false) {
        return;
    }
    try {
        Files.setLastModifiedTime(watchedFile(), FileTime.from(Instant.now()));
    } catch (IOException e) {
        // a failed touch is logged and otherwise ignored
        logger.warn("encountered I/O error trying to update file settings timestamp", e);
    }
}

/**
* Registers a listener by appending it to this service's list of file-changed listeners.
*
* @param listener the listener to add
*/
public final void addFileChangedListener(FileChangedListener listener) {
eventListeners.add(listener);
}
Expand All @@ -131,33 +87,12 @@ public final Path watchedFile() {
}

@Override
public final void clusterChanged(ClusterChangedEvent event) {
ClusterState clusterState = event.state();
if (clusterState.nodes().isLocalNodeElectedMaster()) {
startWatcher(clusterState);
} else if (event.previousState().nodes().isLocalNodeElectedMaster()) {
stopWatcher();
}
protected void doStart() {
startWatcher();
}

@Override
protected final void doStart() {
// We start the file watcher when we know we are master from a cluster state change notification.
// We need the additional active flag, since cluster state can change after we've shutdown the service
// causing the watcher to start again.
this.active = Files.exists(watchedFileDir().getParent());
if (active == false) {
// we don't have a config directory, we can't possibly launch the file settings service
return;
}
if (DiscoveryNode.isMasterNode(clusterService.getSettings())) {
clusterService.addListener(this);
}
}

@Override
protected final void doStop() {
this.active = false;
protected void doStop() {
logger.debug("Stopping file watching service");
stopWatcher();
}
Expand All @@ -184,10 +119,9 @@ final boolean watchedFileChanged(Path path) throws IOException {
return (previousUpdateState == null || previousUpdateState.equals(fileUpdateState) == false);
}

private synchronized void startWatcher(ClusterState clusterState) {
if (watching() || active == false) {
refreshExistingFileStateIfNeeded(clusterState);

protected final synchronized void startWatcher() {
if (Files.exists(watchedFileDir.getParent()) == false) {
logger.warn("File watcher for [{}] cannot start because grandparent directory does not exist", watchedFile);
return;
}

Expand Down Expand Up @@ -295,7 +229,7 @@ protected final void watcherThread() {
}
}

final synchronized void stopWatcher() {
protected final synchronized void stopWatcher() {
if (watching()) {
logger.debug("stopping watcher ...");
// make sure watch service is closed whatever
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ReservedStateMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.file.AbstractFileWatchingService;
import org.elasticsearch.env.Environment;
Expand All @@ -22,6 +25,8 @@
import java.io.BufferedInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.attribute.FileTime;
import java.time.Instant;
import java.util.concurrent.ExecutionException;

import static org.elasticsearch.xcontent.XContentType.JSON;
Expand All @@ -37,14 +42,16 @@
* the service as a listener to cluster state changes, so that we can enable the file watcher thread when this
* node becomes a master node.
*/
public class FileSettingsService extends AbstractFileWatchingService {
public class FileSettingsService extends AbstractFileWatchingService implements ClusterStateListener {

private static final Logger logger = LogManager.getLogger(FileSettingsService.class);

public static final String SETTINGS_FILE_NAME = "settings.json";
public static final String NAMESPACE = "file_settings";
public static final String OPERATOR_DIRECTORY = "operator";
private final ClusterService clusterService;
private final ReservedClusterStateService stateService;
private volatile boolean active = false;

/**
* Constructs the {@link FileSettingsService}
Expand All @@ -54,10 +61,70 @@ public class FileSettingsService extends AbstractFileWatchingService {
* @param environment we need the environment to pull the location of the config and operator directories
*/
public FileSettingsService(ClusterService clusterService, ReservedClusterStateService stateService, Environment environment) {
super(clusterService, environment.configFile().toAbsolutePath().resolve(OPERATOR_DIRECTORY).resolve(SETTINGS_FILE_NAME));
super(environment.configFile().toAbsolutePath().resolve(OPERATOR_DIRECTORY).resolve(SETTINGS_FILE_NAME));
this.clusterService = clusterService;
this.stateService = stateService;
}

/**
 * Marks the service active when the parent of the watched directory exists and, on
 * master-eligible nodes, registers this service as a cluster state listener.
 * <p>
 * The watcher itself is not started here; it is started from a cluster state change
 * notification once this node is known to be the elected master.
 */
@Override
protected void doStart() {
    // The active flag guards against cluster state changes that arrive after the service
    // has been shut down, which would otherwise start the watcher again. Without a config
    // directory the file settings service cannot possibly be launched, so it stays inactive.
    this.active = Files.exists(watchedFileDir().getParent());
    if (active && DiscoveryNode.isMasterNode(clusterService.getSettings())) {
        clusterService.addListener(this);
    }
}

/**
* Deactivates the service and then delegates to the superclass stop logic.
*/
@Override
protected void doStop() {
// clear the flag first so that clusterChanged will not start the watcher again
this.active = false;
super.doStop();
}

/**
 * Reacts to a cluster state change by starting or stopping the file watcher.
 * <p>
 * If the local node is the elected master, the watcher is started unless it is already
 * running or the service is inactive; in those two cases a refresh of the existing file
 * state is attempted instead. If the local node is not the elected master but was in the
 * previous state, the watcher is stopped.
 *
 * @param event the cluster change event carrying the new and the previous state
 */
@Override
public final void clusterChanged(ClusterChangedEvent event) {
    final ClusterState currentState = event.state();
    if (currentState.nodes().isLocalNodeElectedMaster() == false) {
        if (event.previousState().nodes().isLocalNodeElectedMaster()) {
            stopWatcher();
        }
        return;
    }
    synchronized (this) {
        if (watching() == false && active) {
            startWatcher();
        } else {
            refreshExistingFileStateIfNeeded(currentState);
        }
    }
}

/**
 * "Touches" the settings file by setting its last-modified time to the current instant,
 * so that the file watcher will re-process it.
 * <p>
 * The touch only happens when the watcher is running, {@code shouldRefreshFileState}
 * returns true for the given state, and the settings file exists.
 * <p>
 * Because file processing is asynchronous, the cluster state or the file must already be
 * updated such that the version information in the file is newer than what is saved as
 * processed in the cluster state. For snapshot restores, the snapshot is restored first and
 * the refresh is forced afterwards, since the cluster state metadata version must be reset
 * to 0 and saved in the cluster state.
 *
 * @param clusterState cluster state passed on to {@code shouldRefreshFileState}
 */
private void refreshExistingFileStateIfNeeded(ClusterState clusterState) {
    final boolean refreshWanted = watching() && shouldRefreshFileState(clusterState) && Files.exists(watchedFile());
    if (refreshWanted == false) {
        return;
    }
    try {
        Files.setLastModifiedTime(watchedFile(), FileTime.from(Instant.now()));
    } catch (IOException e) {
        // the failure is logged and otherwise ignored
        logger.warn("encountered I/O error trying to update file settings timestamp", e);
    }
}

/**
* Used by snapshot restore service {@link org.elasticsearch.snapshots.RestoreService} to prepare the reserved
* state of the snapshot for the current cluster.
Expand Down Expand Up @@ -95,8 +162,7 @@ public void handleSnapshotRestore(ClusterState clusterState, Metadata.Builder md
* @param clusterState State of the cluster
* @return true if file settings metadata version is exactly 0, false otherwise.
*/
@Override
protected boolean shouldRefreshFileState(ClusterState clusterState) {
private boolean shouldRefreshFileState(ClusterState clusterState) {
// We check if the version was reset to 0, and force an update if a file exists. This can happen in situations
// like snapshot restores.
ReservedStateMetadata fileSettingsMetadata = clusterState.metadata().reservedStateMetadata().get(NAMESPACE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.elasticsearch.common.file;

import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -58,13 +57,13 @@ class TestFileWatchingService extends AbstractFileWatchingService {

private final CountDownLatch countDownLatch;

TestFileWatchingService(ClusterService clusterService, Path watchedFile) {
super(clusterService, watchedFile);
TestFileWatchingService(Path watchedFile) {
super(watchedFile);
this.countDownLatch = null;
}

TestFileWatchingService(ClusterService clusterService, Path watchedFile, CountDownLatch countDownLatch) {
super(clusterService, watchedFile);
TestFileWatchingService(Path watchedFile, CountDownLatch countDownLatch) {
super(watchedFile);
this.countDownLatch = countDownLatch;
}

Expand Down Expand Up @@ -99,7 +98,7 @@ public void setUp() throws Exception {

Files.createDirectories(env.configFile());

fileWatchingService = new TestFileWatchingService(clusterService, getWatchedFilePath(env));
fileWatchingService = new TestFileWatchingService(getWatchedFilePath(env));
}

@After
Expand All @@ -110,7 +109,6 @@ public void tearDown() throws Exception {

public void testStartStop() {
fileWatchingService.start();
fileWatchingService.clusterChanged(new ClusterChangedEvent("test", clusterService.state(), ClusterState.EMPTY_STATE));
assertTrue(fileWatchingService.watching());
fileWatchingService.stop();
assertFalse(fileWatchingService.watching());
Expand Down Expand Up @@ -143,10 +141,9 @@ public void testWatchedFile() throws Exception {
public void testCallsProcessing() throws Exception {
CountDownLatch processFileLatch = new CountDownLatch(1);

AbstractFileWatchingService service = new TestFileWatchingService(clusterService, getWatchedFilePath(env), processFileLatch);
AbstractFileWatchingService service = new TestFileWatchingService(getWatchedFilePath(env), processFileLatch);

service.start();
service.clusterChanged(new ClusterChangedEvent("test", clusterService.state(), ClusterState.EMPTY_STATE));
assertTrue(service.watching());

Files.createDirectories(service.watchedFileDir());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,16 @@ public void tearDown() throws Exception {
threadpool.shutdownNow();
}

// Checks the start/stop lifecycle: the watcher must not run after start() alone, must run
// after a cluster state change notification, and must stop again after stop().
public void testStartStop() {
fileSettingsService.start();
// starting the service only prepares it; no watcher is running yet
assertFalse(fileSettingsService.watching());
// NOTE(review): presumably clusterService.state() has the local node as elected master — confirm against setUp
fileSettingsService.clusterChanged(new ClusterChangedEvent("test", clusterService.state(), ClusterState.EMPTY_STATE));
assertTrue(fileSettingsService.watching());
fileSettingsService.stop();
assertFalse(fileSettingsService.watching());
fileSettingsService.close();
}

public void testOperatorDirName() {
Path operatorPath = fileSettingsService.watchedFileDir();
assertTrue(operatorPath.startsWith(env.configFile()));
Expand Down

0 comments on commit 93b69a9

Please sign in to comment.