Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow ConfigWatcher to watch several files (#236) #237

Merged
merged 1 commit into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import com.intellij.openapi.diagnostic.Logger;
import io.fabric8.kubernetes.api.model.Config;
import io.fabric8.kubernetes.client.internal.KubeConfigUtils;
import org.jetbrains.annotations.NotNull;

import java.io.IOException;
import java.nio.file.FileSystems;
Expand All @@ -24,14 +23,19 @@
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;

public class ConfigWatcher implements Runnable {

private static final Logger LOG = Logger.getInstance(ConfigWatcher.class);

private final Path config;
protected Listener listener;
private final List<Path> configs;
protected final Listener listener;
private final HighSensitivityRegistrar registrar;
private WatchService service;

public interface Listener {
void onUpdate(ConfigWatcher source, Config config);
Expand All @@ -42,94 +46,146 @@ public ConfigWatcher(String config, Listener listener) {
}

public ConfigWatcher(Path config, Listener listener) {
this.config = config;
this(List.of(config), listener);
}

public ConfigWatcher(List<Path> configs, Listener listener) {
this(configs, listener, new HighSensitivityRegistrar());
}

public ConfigWatcher(List<Path> configs, Listener listener, HighSensitivityRegistrar registrar) {
this.configs = configs;
this.listener = listener;
this.registrar = registrar;
}

@Override
public void run() {
runOnConfigChange((Config config) -> {
listener.onUpdate(this, config);
});
watch((Config config) -> listener.onUpdate(this, config));
}

protected Config loadConfig() {
try {
return ConfigHelper.loadKubeConfig(config.toAbsolutePath().toString());
} catch (IOException e) {
return null;
public void close() throws IOException {
if (service != null) {
service.close();
}
}

private void runOnConfigChange(Consumer<Config> consumer) {
try (WatchService service = newWatchService()) {
registerWatchService(service);
WatchKey key;
while ((key = service.take()) != null) {
key.pollEvents().stream()
.forEach((event) -> consumer.accept(loadConfig(getPath(event))));
key.reset();
}
} catch (IOException | InterruptedException e) {
private void watch(Consumer<Config> consumer) {
try (WatchService service = createWatchService()) {
Collection<Path> watchedDirectories = getWatchedDirectories();
watchedDirectories.forEach(directory ->
new ConfigDirectoryWatch(directory, consumer, service, registrar).start()
);
} catch (IOException e) {
String configPaths = configs.stream()
.map(path -> path.toAbsolutePath().toString())
.collect(Collectors.joining());
Logger.getInstance(ConfigWatcher.class).warn(
"Could not watch kubernetes config file at " + config.toAbsolutePath(), e);
"Could not watch kubernetes config file at " + configPaths, e);
}
}

protected WatchService newWatchService() throws IOException {
return FileSystems.getDefault().newWatchService();
protected WatchService createWatchService() throws IOException {
return this.service = FileSystems.getDefault().newWatchService();
}

@NotNull
private void registerWatchService(WatchService service) throws IOException {
HighSensitivityRegistrar modifier = new HighSensitivityRegistrar();
modifier.registerService(getWatchedPath(),
new WatchEvent.Kind[]{
StandardWatchEventKinds.ENTRY_CREATE,
StandardWatchEventKinds.ENTRY_MODIFY,
StandardWatchEventKinds.ENTRY_DELETE},
service);
private Collection<Path> getWatchedDirectories() {
return configs.stream()
.filter(this::isFileInDirectory)
.map(Path::getParent)
.collect(Collectors.toSet());
}

protected boolean isConfigPath(Path path) {
return path.equals(config);
protected boolean isFileInDirectory(Path path) {
return path != null
&& Files.isRegularFile(path)
&& Files.isDirectory(path.getParent());
}

/**
* Returns {@link Config} for the given path if the kube config file
* <ul>
* <li>exists and</li>
* <li>is not empty and</li>
* <li>is valid yaml</li>
* </ul>
* Returns {@code null} otherwise.
*
* @param path the path to the kube config
* @return returns true if the kube config that the event points to exists, is not empty and is valid yaml
*/
private Config loadConfig(Path path) {
if (path == null) {
return null;
private class ConfigDirectoryWatch {
private final Path directory;
private final WatchService service;
private final HighSensitivityRegistrar registrar;
private final Consumer<Config> consumer;

private ConfigDirectoryWatch(Path directory, Consumer<Config> consumer, WatchService service, HighSensitivityRegistrar registrar) {
this.directory = directory;
this.consumer = consumer;
this.service = service;
this.registrar = registrar;
}
try {
if (Files.exists(path)
&& isConfigPath(path)
&& Files.size(path) > 0) {
return KubeConfigUtils.parseConfig(path.toFile());

private void start() {
try {
register(directory, service, registrar);
watch(consumer, service);
} catch (InterruptedException e) {
LOG.warn("Watching " + directory + " was interrupted", e);
} catch (IOException e) {
LOG.warn("Could not watch " + directory, e);
}
} catch (Exception e) {
// only catch
LOG.warn("Could not load kube config at " + path.toAbsolutePath(), e);
}
return null;
}

private Path getPath(WatchEvent<?> event) {
return getWatchedPath().resolve((Path) event.context());
}
private void register(Path path, WatchService service, HighSensitivityRegistrar registrar) throws IOException {
registrar.registerService(path,
new WatchEvent.Kind[]{
StandardWatchEventKinds.ENTRY_CREATE,
StandardWatchEventKinds.ENTRY_MODIFY,
StandardWatchEventKinds.ENTRY_DELETE
},
service);
}

private void watch(Consumer<Config> consumer, WatchService service) throws InterruptedException {
for (WatchKey key = service.take(); key != null; key = service.take()) {
key.pollEvents().forEach((event) -> {
Path changed = getAbsolutePath(directory, (Path) event.context());
if (isConfigPath(changed)) {
consumer.accept(loadConfig(changed));
}
});
key.reset();
}
}

protected boolean isConfigPath(Path path) {
return configs != null
&& configs.contains(path);
}

/**
* Returns {@link Config} for the given path if the kube config file
* <ul>
* <li>exists and</li>
* <li>is not empty and</li>
* <li>is valid yaml</li>
* </ul>
* Returns {@code null} otherwise.
*
* @param path the path to the kube config
* @return returns true if the kube config that the event points to exists, is not empty and is valid yaml
*/
private Config loadConfig(Path path) {
// TODO: replace by Config#getKubeConfigFiles once kubernetes-client 7.0 is available
if (path == null) {
return null;
}
try {
if (Files.exists(path)
&& Files.size(path) > 0) {
return KubeConfigUtils.parseConfig(path.toFile());
}
} catch (Exception e) {
// only catch
LOG.warn("Could not load kube config at " + path.toAbsolutePath(), e);
}
return null;
}

private Path getAbsolutePath(Path directory, Path relativePath) {
return directory.resolve(relativePath);
}

private Path getWatchedPath() {
return config.getParent();
}

}
Loading
Loading