diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/common/FileChangeWatcher.java b/zookeeper-server/src/main/java/org/apache/zookeeper/common/FileChangeWatcher.java new file mode 100644 index 00000000000..a09267f449d --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/common/FileChangeWatcher.java @@ -0,0 +1,162 @@ +package org.apache.zookeeper.common; + +import com.sun.nio.file.SensitivityWatchEventModifier; +import org.apache.zookeeper.server.ZooKeeperThread; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.file.ClosedWatchServiceException; +import java.nio.file.FileSystem; +import java.nio.file.Path; +import java.nio.file.StandardWatchEventKinds; +import java.nio.file.WatchEvent; +import java.nio.file.WatchKey; +import java.nio.file.WatchService; +import java.util.function.Consumer; + +/** + * Instances of this class can be used to watch a directory for file changes. When a file is added to, deleted from, + * or is modified in the given directory, the callback provided by the user will be called from a background thread. + * Some things to keep in mind: + * + */ +public final class FileChangeWatcher { + private static final Logger LOG = LoggerFactory.getLogger(FileChangeWatcher.class); + + private final WatcherThread watcherThread; + + /** + * Creates a watcher that watches dirPath and invokes callback on changes. + * + * @param dirPath the directory to watch. + * @param callback the callback to invoke with events. event.kind() will return the type of event, + * and event.context() will return the filename relative to dirPath. + * @throws IOException if there is an error creating the WatchService. + */ + public FileChangeWatcher(Path dirPath, Consumer> callback) throws IOException { + FileSystem fs = dirPath.getFileSystem(); + WatchService watchService = fs.newWatchService(); + if (LOG.isDebugEnabled()) { + LOG.debug("Registering with watch service: " + dirPath); + } + dirPath.register( + watchService, + new WatchEvent.Kind[]{ + StandardWatchEventKinds.ENTRY_CREATE, + StandardWatchEventKinds.ENTRY_DELETE, + StandardWatchEventKinds.ENTRY_MODIFY, + StandardWatchEventKinds.OVERFLOW}, + SensitivityWatchEventModifier.HIGH); + this.watcherThread = new WatcherThread(watchService, callback); + this.watcherThread.setDaemon(true); + this.watcherThread.start(); + } + + /** + * Waits for the background thread to enter the main loop before returning. This method exists mostly to make + * the unit tests simpler, which is why it is package private. + * + * @throws InterruptedException if this thread is interrupted while waiting for the background thread to start. + */ + void waitForBackgroundThreadToStart() throws InterruptedException { + synchronized (watcherThread) { + while (!watcherThread.started) { + watcherThread.wait(); + } + } + } + + /** + * Tells the background thread to stop. Does not wait for it to exit. + */ + public void stop() { + watcherThread.shouldStop = true; + watcherThread.interrupt(); + } + + /** + * Tells the background thread to stop and waits for it to exit. Only used by unit tests, which is why it is package + * private. + */ + void stopAndJoinBackgroundThread() throws InterruptedException { + stop(); + watcherThread.join(); + } + + /** + * Inner class that implements the watcher thread logic. + */ + private static class WatcherThread extends ZooKeeperThread { + private static final String THREAD_NAME = "FileChangeWatcher"; + + volatile boolean shouldStop; + volatile boolean started; + final WatchService watchService; + final Consumer> callback; + + WatcherThread(WatchService watchService, Consumer> callback) { + super(THREAD_NAME); + this.shouldStop = this.started = false; + this.watchService = watchService; + this.callback = callback; + } + + @Override + public void run() { + LOG.info(getName() + " thread started"); + synchronized (this) { + started = true; + this.notifyAll(); + } + try { + runLoop(); + } finally { + try { + watchService.close(); + } catch (IOException e) { + LOG.warn("Error closing watch service", e); + } + LOG.info(getName() + " thread finished"); + } + } + + private void runLoop() { + while (!shouldStop) { + WatchKey key; + try { + key = watchService.take(); + } catch (InterruptedException|ClosedWatchServiceException e) { + if (LOG.isDebugEnabled()) { + LOG.debug(getName() + " was interrupted and is shutting down ..."); + } + break; + } + for (WatchEvent event : key.pollEvents()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Got file changed event: " + event.kind() + " with context: " + event.context()); + } + try { + callback.accept(event); + } catch (Throwable e) { + LOG.error("Error from callback", e); + } + } + boolean isKeyValid = key.reset(); + if (!isKeyValid) { + // This is likely a problem, it means that file reloading is broken, probably because the + // directory we are watching was deleted or otherwise became inaccessible (unmounted, permissions + // changed, ???). + // For now, we log an error and exit the watcher thread. + LOG.error("Watch key no longer valid, maybe the directory is inaccessible?"); + break; + } + } + } + } +} diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/common/X509Util.java b/zookeeper-server/src/main/java/org/apache/zookeeper/common/X509Util.java index ac65b7d303a..12d556f5cf0 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/common/X509Util.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/common/X509Util.java @@ -18,6 +18,9 @@ package org.apache.zookeeper.common; +import org.apache.zookeeper.common.X509Exception.KeyManagerException; +import org.apache.zookeeper.common.X509Exception.SSLContextException; +import org.apache.zookeeper.common.X509Exception.TrustManagerException; import org.apache.zookeeper.util.PemReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,6 +44,10 @@ import java.io.IOException; import java.net.Socket; import java.security.InvalidAlgorithmParameterException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardWatchEventKinds; +import java.nio.file.WatchEvent; import java.security.GeneralSecurityException; import java.security.KeyManagementException; import java.security.KeyStore; @@ -54,10 +61,7 @@ import java.util.Arrays; import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; - -import org.apache.zookeeper.common.X509Exception.KeyManagerException; -import org.apache.zookeeper.common.X509Exception.SSLContextException; -import org.apache.zookeeper.common.X509Exception.TrustManagerException; +import java.util.function.Consumer; /** * Utility code for X509 handling @@ -144,6 +148,8 @@ public static StoreFileType fromPropertyValue(String prop) { private String[] cipherSuites; private AtomicReference defaultSSLContext = new AtomicReference<>(null); + private FileChangeWatcher keyStoreFileWatcher; + private FileChangeWatcher trustStoreFileWatcher; public X509Util() { String cipherSuitesInput = System.getProperty(cipherSuitesProperty); @@ -223,6 +229,11 @@ public SSLContext getDefaultSSLContext() throws X509Exception.SSLContextExceptio return result; } + private void resetDefaultSSLContext() throws X509Exception.SSLContextException { + SSLContext newContext = createSSLContext(); + defaultSSLContext.set(newContext); + } + private SSLContext createSSLContext() throws SSLContextException { /* * Since Configuration initializes the key store and trust store related @@ -544,4 +555,109 @@ public static StoreFileType detectStoreFileTypeFromFileExtension(File filename) } throw new IOException("Unable to auto-detect store file type from file name: " + filename); } + + /** + * Enables automatic reloading of the trust store and key store files when they change on disk. + * + * @throws IOException if creating the FileChangeWatcher objects fails. + */ + public void enableCertFileReloading() throws IOException { + ZKConfig config = new ZKConfig(); + String keyStoreLocation = config.getProperty(sslKeystoreLocationProperty); + if (keyStoreLocation != null && !keyStoreLocation.isEmpty()) { + final Path filePath = Paths.get(keyStoreLocation).toAbsolutePath(); + FileChangeWatcher newKeyStoreFileWatcher = new FileChangeWatcher( + filePath.getParent(), + new Consumer>() { + @Override + public void accept(WatchEvent watchEvent) { + handleWatchEvent(filePath, watchEvent); + } + }); + // stop old watcher if there is one + if (keyStoreFileWatcher != null) { + keyStoreFileWatcher.stop(); + keyStoreFileWatcher = newKeyStoreFileWatcher; + } + } + String trustStoreLocation = config.getProperty(sslTruststoreLocationProperty); + if (trustStoreLocation != null && !trustStoreLocation.isEmpty()) { + final Path filePath = Paths.get(trustStoreLocation).toAbsolutePath(); + FileChangeWatcher newTrustStoreFileWatcher = new FileChangeWatcher( + filePath.getParent(), + new Consumer>() { + @Override + public void accept(WatchEvent watchEvent) { + handleWatchEvent(filePath, watchEvent); + } + }); + // stop old watcher if there is one + if (trustStoreFileWatcher != null) { + trustStoreFileWatcher.stop(); + trustStoreFileWatcher = newTrustStoreFileWatcher; + } + } + } + + /** + * Disables automatic reloading of the trust store and key store files when they change on disk. + * Stops background threads and closes WatchService instances. + */ + public void disableCertFileReloading() { + if (keyStoreFileWatcher != null) { + keyStoreFileWatcher.stop(); + keyStoreFileWatcher = null; + } + if (trustStoreFileWatcher != null) { + trustStoreFileWatcher.stop(); + trustStoreFileWatcher = null; + } + } + + // Finalizer guardian object, see Effective Java item 7 + @SuppressWarnings("unused") + private final Object finalizerGuardian = new Object() { + @Override + protected void finalize() { + disableCertFileReloading(); + } + }; + + /** + * Handler for watch events that let us know a file we may care about has changed on disk. + * + * @param filePath the path to the file we are watching for changes. + * @param event the WatchEvent. + */ + private void handleWatchEvent(Path filePath, WatchEvent event) { + boolean shouldResetContext = false; + Path dirPath = filePath.getParent(); + if (event.kind().equals(StandardWatchEventKinds.OVERFLOW)) { + // If we get notified about possibly missed events, reload the key store / trust store just to be sure. + shouldResetContext = true; + } else if (event.kind().equals(StandardWatchEventKinds.ENTRY_MODIFY) || + event.kind().equals(StandardWatchEventKinds.ENTRY_CREATE)) { + Path eventFilePath = dirPath.resolve((Path) event.context()); + if (filePath.equals(eventFilePath)) { + shouldResetContext = true; + } + } + // Note: we don't care about delete events + if (shouldResetContext) { + if (LOG.isDebugEnabled()) { + LOG.debug("Attempting to reset default SSL context after receiving watch event: " + + event.kind() + " with context: " + event.context()); + } + try { + this.resetDefaultSSLContext(); + } catch (SSLContextException e) { + throw new RuntimeException(e); + } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Ignoring watch event and keeping previous default SSL context. Event kind: " + + event.kind() + " with context: " + event.context()); + } + } + } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java index aee5efcd68b..b061768edd6 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java @@ -71,6 +71,7 @@ public class QuorumPeerConfig { protected InetSocketAddress secureClientPortAddress; protected boolean sslQuorum = false; protected boolean shouldUsePortUnification = false; + protected boolean sslQuorumReloadCertFiles = false; protected File dataDir; protected File dataLogDir; protected String dynamicConfigFileStr = null; @@ -317,6 +318,8 @@ public void parseProperties(Properties zkProp) sslQuorum = Boolean.parseBoolean(value); } else if (key.equals("portUnification")){ shouldUsePortUnification = Boolean.parseBoolean(value); + } else if (key.equals("sslQuorumReloadCertFiles")) { + sslQuorumReloadCertFiles = Boolean.parseBoolean(value); } else if ((key.startsWith("server.") || key.startsWith("group") || key.startsWith("weight")) && zkProp.containsKey("dynamicConfigFile")) { throw new ConfigException("parameter: " + key + " must be in a separate dynamic config file"); } else if (key.equals(QuorumAuth.QUORUM_SASL_AUTH_ENABLED)) { diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerMain.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerMain.java index d2a02b20303..a73b4162a2d 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerMain.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerMain.java @@ -202,6 +202,9 @@ public void runFromConfig(QuorumPeerConfig config) quorumPeer.setLearnerType(config.getPeerType()); quorumPeer.setSyncEnabled(config.getSyncEnabled()); quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs()); + if (config.sslQuorumReloadCertFiles) { + quorumPeer.getX509Util().enableCertFileReloading(); + } // sets quorum sasl authentication configurations quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/common/FileChangeWatcherTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/common/FileChangeWatcherTest.java new file mode 100644 index 00000000000..d615f192700 --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/common/FileChangeWatcherTest.java @@ -0,0 +1,242 @@ +package org.apache.zookeeper.common; + +import org.apache.commons.io.FileUtils; +import org.apache.zookeeper.ZKTestCase; +import org.apache.zookeeper.test.ClientBase; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.StandardWatchEventKinds; +import java.nio.file.WatchEvent; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +public class FileChangeWatcherTest extends ZKTestCase { + private static File tempDir; + private static File tempFile; + + private static final Logger LOG = LoggerFactory.getLogger(FileChangeWatcherTest.class); + + @BeforeClass + public static void createTempFile() throws IOException { + tempDir = ClientBase.createEmptyTestDir(); + tempFile = File.createTempFile("zk_test_", "", tempDir); + tempFile.deleteOnExit(); + } + + @Test + public void testCallbackWorksOnFileChanges() throws IOException, InterruptedException { + FileChangeWatcher watcher = null; + try { + final List> events = new ArrayList<>(); + watcher = new FileChangeWatcher( + tempDir.toPath(), + new Consumer>() { + @Override + public void accept(WatchEvent event) { + LOG.info("Got an update: " + event.kind() + " " + event.context()); + synchronized (events) { + events.add(event); + events.notifyAll(); + } + } + }); + watcher.waitForBackgroundThreadToStart(); + Thread.sleep(1000L); // XXX hack + for (int i = 0; i < 3; i++) { + LOG.info("Modifying file, attempt " + (i + 1)); + FileUtils.writeStringToFile( + tempFile, + "Hello world " + i + "\n", + StandardCharsets.UTF_8, + true); + synchronized (events) { + if (events.size() < i + 1) { + events.wait(3000L); + } + assertEquals("Wrong number of events", i + 1, events.size()); + WatchEvent event = events.get(i); + assertEquals(StandardWatchEventKinds.ENTRY_MODIFY, event.kind()); + assertEquals(tempFile.getName(), event.context().toString()); + } + } + } finally { + if (watcher != null) { + watcher.stopAndJoinBackgroundThread(); + } + } + } + + @Test + public void testCallbackWorksOnFileTouched() throws IOException, InterruptedException { + FileChangeWatcher watcher = null; + try { + final List> events = new ArrayList<>(); + watcher = new FileChangeWatcher( + tempDir.toPath(), + new Consumer>() { + @Override + public void accept(WatchEvent event) { + LOG.info("Got an update: " + event.kind() + " " + event.context()); + synchronized (events) { + events.add(event); + events.notifyAll(); + } + } + }); + watcher.waitForBackgroundThreadToStart(); + Thread.sleep(1000L); // XXX hack + LOG.info("Touching file"); + FileUtils.touch(tempFile); + synchronized (events) { + if (events.isEmpty()) { + events.wait(3000L); + } + assertFalse(events.isEmpty()); + WatchEvent event = events.get(0); + assertEquals(StandardWatchEventKinds.ENTRY_MODIFY, event.kind()); + assertEquals(tempFile.getName(), event.context().toString()); + } + } finally { + if (watcher != null) { + watcher.stopAndJoinBackgroundThread(); + } + } + } + + @Test + public void testCallbackWorksOnFileAdded() throws IOException, InterruptedException { + FileChangeWatcher watcher = null; + try { + final List> events = new ArrayList<>(); + watcher = new FileChangeWatcher( + tempDir.toPath(), + new Consumer>() { + @Override + public void accept(WatchEvent event) { + LOG.info("Got an update: " + event.kind() + " " + event.context()); + synchronized (events) { + events.add(event); + events.notifyAll(); + } + } + }); + watcher.waitForBackgroundThreadToStart(); + Thread.sleep(1000L); // XXX hack + File tempFile2 = File.createTempFile("zk_test_", "", tempDir); + tempFile2.deleteOnExit(); + synchronized (events) { + if (events.isEmpty()) { + events.wait(3000L); + } + assertFalse(events.isEmpty()); + WatchEvent event = events.get(0); + assertEquals(StandardWatchEventKinds.ENTRY_CREATE, event.kind()); + assertEquals(tempFile2.getName(), event.context().toString()); + } + } finally { + if (watcher != null) { + watcher.stopAndJoinBackgroundThread(); + } + } + } + + @Test + public void testCallbackWorksOnFileDeleted() throws IOException, InterruptedException { + FileChangeWatcher watcher = null; + try { + final List> events = new ArrayList<>(); + watcher = new FileChangeWatcher( + tempDir.toPath(), + new Consumer>() { + @Override + public void accept(WatchEvent event) { + LOG.info("Got an update: " + event.kind() + " " + event.context()); + synchronized (events) { + events.add(event); + events.notifyAll(); + } + } + }); + watcher.waitForBackgroundThreadToStart(); + Thread.sleep(1000L); // XXX hack + tempFile.delete(); + synchronized (events) { + if (events.isEmpty()) { + events.wait(3000L); + } + assertFalse(events.isEmpty()); + WatchEvent event = events.get(0); + assertEquals(StandardWatchEventKinds.ENTRY_DELETE, event.kind()); + assertEquals(tempFile.getName(), event.context().toString()); + } + } finally { + if (watcher != null) { + watcher.stopAndJoinBackgroundThread(); + } + } + } + + @Test + public void testCallbackErrorDoesNotCrashWatcherThread() throws IOException, InterruptedException { + FileChangeWatcher watcher = null; + try { + final List> events = new ArrayList<>(); + final AtomicInteger callCount = new AtomicInteger(0); + watcher = new FileChangeWatcher( + tempDir.toPath(), + new Consumer>() { + @Override + public void accept(WatchEvent event) { + LOG.info("Got an update: " + event.kind() + " " + event.context()); + synchronized (callCount) { + if (callCount.getAndIncrement() == 0) { + callCount.notifyAll(); + throw new RuntimeException("This error should not crash the watcher thread"); + } + } + synchronized (events) { + events.add(event); + events.notifyAll(); + } + } + }); + watcher.waitForBackgroundThreadToStart(); + Thread.sleep(1000L); // XXX hack + LOG.info("Modifying file"); + FileUtils.writeStringToFile(tempFile, "Hello world\n", StandardCharsets.UTF_8, true); + synchronized (callCount) { + while (callCount.get() == 0) { + callCount.wait(3000L); + } + } + LOG.info("Modifying file again"); + FileUtils.writeStringToFile(tempFile, "Hello world again\n", StandardCharsets.UTF_8, true); + synchronized (events) { + if (events.isEmpty()) { + events.wait(3000L); + } + assertEquals(2, callCount.get()); + assertFalse(events.isEmpty()); + WatchEvent event = events.get(0); + assertEquals(StandardWatchEventKinds.ENTRY_MODIFY, event.kind()); + assertEquals(tempFile.getName(), event.context().toString()); + } + } finally { + if (watcher != null) { + watcher.stopAndJoinBackgroundThread(); + } + } + + } +}