diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/common/FileChangeWatcher.java b/zookeeper-server/src/main/java/org/apache/zookeeper/common/FileChangeWatcher.java new file mode 100644 index 00000000000..701020bbf3d --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/common/FileChangeWatcher.java @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.common; + +import com.sun.nio.file.SensitivityWatchEventModifier; +import org.apache.zookeeper.server.ZooKeeperThread; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.file.ClosedWatchServiceException; +import java.nio.file.FileSystem; +import java.nio.file.Path; +import java.nio.file.StandardWatchEventKinds; +import java.nio.file.WatchEvent; +import java.nio.file.WatchKey; +import java.nio.file.WatchService; +import java.util.function.Consumer; + +/** + * Instances of this class can be used to watch a directory for file changes. When a file is added to, deleted from, + * or is modified in the given directory, the callback provided by the user will be called from a background thread. + * Some things to keep in mind: + * + */ +public final class FileChangeWatcher { + private static final Logger LOG = LoggerFactory.getLogger(FileChangeWatcher.class); + + private final WatcherThread watcherThread; + + /** + * Creates a watcher that watches dirPath and invokes callback on changes. + * + * @param dirPath the directory to watch. + * @param callback the callback to invoke with events. event.kind() will return the type of event, + * and event.context() will return the filename relative to dirPath. + * @throws IOException if there is an error creating the WatchService. + */ + public FileChangeWatcher(Path dirPath, Consumer> callback) throws IOException { + FileSystem fs = dirPath.getFileSystem(); + WatchService watchService = fs.newWatchService(); + if (LOG.isDebugEnabled()) { + LOG.debug("Registering with watch service: " + dirPath); + } + dirPath.register( + watchService, + new WatchEvent.Kind[]{ + StandardWatchEventKinds.ENTRY_CREATE, + StandardWatchEventKinds.ENTRY_DELETE, + StandardWatchEventKinds.ENTRY_MODIFY, + StandardWatchEventKinds.OVERFLOW}, + SensitivityWatchEventModifier.HIGH); + this.watcherThread = new WatcherThread(watchService, callback); + this.watcherThread.setDaemon(true); + this.watcherThread.start(); + } + + /** + * Waits for the background thread to enter the main loop before returning. This method exists mostly to make + * the unit tests simpler, which is why it is package private. + * + * @throws InterruptedException if this thread is interrupted while waiting for the background thread to start. + */ + void waitForBackgroundThreadToStart() throws InterruptedException { + synchronized (watcherThread) { + while (!watcherThread.started) { + watcherThread.wait(); + } + } + } + + /** + * Tells the background thread to stop. Does not wait for it to exit. + */ + public void stop() { + watcherThread.shouldStop = true; + watcherThread.interrupt(); + } + + /** + * Tells the background thread to stop and waits for it to exit. Only used by unit tests, which is why it is package + * private. + */ + void stopAndJoinBackgroundThread() throws InterruptedException { + stop(); + watcherThread.join(); + } + + /** + * Inner class that implements the watcher thread logic. + */ + private static class WatcherThread extends ZooKeeperThread { + private static final String THREAD_NAME = "FileChangeWatcher"; + + volatile boolean shouldStop; + volatile boolean started; + final WatchService watchService; + final Consumer> callback; + + WatcherThread(WatchService watchService, Consumer> callback) { + super(THREAD_NAME); + this.shouldStop = this.started = false; + this.watchService = watchService; + this.callback = callback; + } + + @Override + public void run() { + LOG.info(getName() + " thread started"); + synchronized (this) { + started = true; + this.notifyAll(); + } + try { + runLoop(); + } finally { + try { + watchService.close(); + } catch (IOException e) { + LOG.warn("Error closing watch service", e); + } + LOG.info(getName() + " thread finished"); + } + } + + private void runLoop() { + while (!shouldStop) { + WatchKey key; + try { + key = watchService.take(); + } catch (InterruptedException|ClosedWatchServiceException e) { + if (LOG.isDebugEnabled()) { + LOG.debug(getName() + " was interrupted and is shutting down ..."); + } + break; + } + for (WatchEvent event : key.pollEvents()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Got file changed event: " + event.kind() + " with context: " + event.context()); + } + try { + callback.accept(event); + } catch (Throwable e) { + LOG.error("Error from callback", e); + } + } + boolean isKeyValid = key.reset(); + if (!isKeyValid) { + // This is likely a problem, it means that file reloading is broken, probably because the + // directory we are watching was deleted or otherwise became inaccessible (unmounted, permissions + // changed, ???). + // For now, we log an error and exit the watcher thread. + LOG.error("Watch key no longer valid, maybe the directory is inaccessible?"); + break; + } + } + } + } +} diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/common/X509Util.java b/zookeeper-server/src/main/java/org/apache/zookeeper/common/X509Util.java index e3625a51c60..82d0a5050af 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/common/X509Util.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/common/X509Util.java @@ -21,6 +21,10 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.net.Socket; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardWatchEventKinds; +import java.nio.file.WatchEvent; import java.security.GeneralSecurityException; import java.security.KeyManagementException; import java.security.KeyStore; @@ -30,6 +34,7 @@ import java.security.cert.X509CertSelector; import java.util.Arrays; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import javax.net.ssl.CertPathTrustManagerParameters; import javax.net.ssl.KeyManager; @@ -93,6 +98,8 @@ public abstract class X509Util { private String[] cipherSuites; private AtomicReference defaultSSLContext = new AtomicReference<>(null); + private FileChangeWatcher keyStoreFileWatcher; + private FileChangeWatcher trustStoreFileWatcher; public X509Util() { String cipherSuitesInput = System.getProperty(cipherSuitesProperty); @@ -172,6 +179,11 @@ public SSLContext getDefaultSSLContext() throws X509Exception.SSLContextExceptio return result; } + private void resetDefaultSSLContext() throws X509Exception.SSLContextException { + SSLContext newContext = createSSLContext(); + defaultSSLContext.set(newContext); + } + private SSLContext createSSLContext() throws SSLContextException { /* * Since Configuration initializes the key store and trust store related @@ -446,4 +458,117 @@ private String[] getDefaultCipherSuites() { LOG.debug("Using Java8-optimized cipher suites for Java version {}", javaVersion); return DEFAULT_CIPHERS_JAVA8; } + + /** + * Enables automatic reloading of the trust store and key store files when they change on disk. + * + * @throws IOException if creating the FileChangeWatcher objects fails. + */ + public void enableCertFileReloading() throws IOException { + LOG.info("enabling cert file reloading"); + ZKConfig config = new ZKConfig(); + String keyStoreLocation = config.getProperty(sslKeystoreLocationProperty); + if (keyStoreLocation != null && !keyStoreLocation.isEmpty()) { + final Path filePath = Paths.get(keyStoreLocation).toAbsolutePath(); + Path parentPath = filePath.getParent(); + if (parentPath == null) { + throw new IOException( + "Key store path does not have a parent: " + filePath); + } + FileChangeWatcher newKeyStoreFileWatcher = new FileChangeWatcher( + parentPath, + watchEvent -> { + handleWatchEvent(filePath, watchEvent); + }); + // stop old watcher if there is one + if (keyStoreFileWatcher != null) { + keyStoreFileWatcher.stop(); + } + keyStoreFileWatcher = newKeyStoreFileWatcher; + } + String trustStoreLocation = config.getProperty(sslTruststoreLocationProperty); + if (trustStoreLocation != null && !trustStoreLocation.isEmpty()) { + final Path filePath = Paths.get(trustStoreLocation).toAbsolutePath(); + Path parentPath = filePath.getParent(); + if (parentPath == null) { + throw new IOException( + "Trust store path does not have a parent: " + filePath); + } + FileChangeWatcher newTrustStoreFileWatcher = new FileChangeWatcher( + parentPath, + watchEvent -> { + handleWatchEvent(filePath, watchEvent); + }); + // stop old watcher if there is one + if (trustStoreFileWatcher != null) { + trustStoreFileWatcher.stop(); + } + trustStoreFileWatcher = newTrustStoreFileWatcher; + } + } + + /** + * Disables automatic reloading of the trust store and key store files when they change on disk. + * Stops background threads and closes WatchService instances. + */ + public void disableCertFileReloading() { + if (keyStoreFileWatcher != null) { + keyStoreFileWatcher.stop(); + keyStoreFileWatcher = null; + } + if (trustStoreFileWatcher != null) { + trustStoreFileWatcher.stop(); + trustStoreFileWatcher = null; + } + } + + // Finalizer guardian object, see Effective Java item 7 + // TODO: finalize() is deprecated starting with Java 10. This needs to be + // replaced with an explicit shutdown call. + @SuppressWarnings("unused") + private final Object finalizerGuardian = new Object() { + @SuppressWarnings("deprecation") + @Override + protected void finalize() { + disableCertFileReloading(); + } + }; + + /** + * Handler for watch events that let us know a file we may care about has changed on disk. + * + * @param filePath the path to the file we are watching for changes. + * @param event the WatchEvent. + */ + private void handleWatchEvent(Path filePath, WatchEvent event) { + boolean shouldResetContext = false; + Path dirPath = filePath.getParent(); + if (event.kind().equals(StandardWatchEventKinds.OVERFLOW)) { + // If we get notified about possibly missed events, reload the key store / trust store just to be sure. + shouldResetContext = true; + } else if (event.kind().equals(StandardWatchEventKinds.ENTRY_MODIFY) || + event.kind().equals(StandardWatchEventKinds.ENTRY_CREATE)) { + Path eventFilePath = dirPath.resolve((Path) event.context()); + if (filePath.equals(eventFilePath)) { + shouldResetContext = true; + } + } + // Note: we don't care about delete events + if (shouldResetContext) { + if (LOG.isDebugEnabled()) { + LOG.debug("Attempting to reset default SSL context after receiving watch event: " + + event.kind() + " with context: " + event.context()); + } + try { + this.resetDefaultSSLContext(); + } catch (SSLContextException e) { + throw new RuntimeException(e); + } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Ignoring watch event and keeping previous default SSL context. Event kind: " + + event.kind() + " with context: " + event.context()); + } + } + } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java index aee5efcd68b..b061768edd6 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java @@ -71,6 +71,7 @@ public class QuorumPeerConfig { protected InetSocketAddress secureClientPortAddress; protected boolean sslQuorum = false; protected boolean shouldUsePortUnification = false; + protected boolean sslQuorumReloadCertFiles = false; protected File dataDir; protected File dataLogDir; protected String dynamicConfigFileStr = null; @@ -317,6 +318,8 @@ public void parseProperties(Properties zkProp) sslQuorum = Boolean.parseBoolean(value); } else if (key.equals("portUnification")){ shouldUsePortUnification = Boolean.parseBoolean(value); + } else if (key.equals("sslQuorumReloadCertFiles")) { + sslQuorumReloadCertFiles = Boolean.parseBoolean(value); } else if ((key.startsWith("server.") || key.startsWith("group") || key.startsWith("weight")) && zkProp.containsKey("dynamicConfigFile")) { throw new ConfigException("parameter: " + key + " must be in a separate dynamic config file"); } else if (key.equals(QuorumAuth.QUORUM_SASL_AUTH_ENABLED)) { diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerMain.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerMain.java index d2a02b20303..a73b4162a2d 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerMain.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerMain.java @@ -202,6 +202,9 @@ public void runFromConfig(QuorumPeerConfig config) quorumPeer.setLearnerType(config.getPeerType()); quorumPeer.setSyncEnabled(config.getSyncEnabled()); quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs()); + if (config.sslQuorumReloadCertFiles) { + quorumPeer.getX509Util().enableCertFileReloading(); + } // sets quorum sasl authentication configurations quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/common/FileChangeWatcherTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/common/FileChangeWatcherTest.java new file mode 100644 index 00000000000..56a6da1278c --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/common/FileChangeWatcherTest.java @@ -0,0 +1,254 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.common; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.StandardWatchEventKinds; +import java.nio.file.WatchEvent; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.io.FileUtils; +import org.apache.zookeeper.ZKTestCase; +import org.apache.zookeeper.test.ClientBase; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +public class FileChangeWatcherTest extends ZKTestCase { + private static File tempDir; + private static File tempFile; + + private static final Logger LOG = LoggerFactory.getLogger(FileChangeWatcherTest.class); + + @BeforeClass + public static void createTempFile() throws IOException { + tempDir = ClientBase.createEmptyTestDir(); + tempFile = File.createTempFile("zk_test_", "", tempDir); + tempFile.deleteOnExit(); + } + + @AfterClass + public static void cleanupTempDir() { + try { + FileUtils.deleteDirectory(tempDir); + } catch (IOException e) { + // ignore + } + } + + @Test + public void testCallbackWorksOnFileChanges() throws IOException, InterruptedException { + FileChangeWatcher watcher = null; + try { + final List> events = new ArrayList<>(); + watcher = new FileChangeWatcher( + tempDir.toPath(), + event -> { + LOG.info("Got an update: " + event.kind() + " " + event.context()); + synchronized (events) { + events.add(event); + events.notifyAll(); + } + }); + watcher.waitForBackgroundThreadToStart(); + Thread.sleep(1000L); // XXX hack + for (int i = 0; i < 3; i++) { + LOG.info("Modifying file, attempt " + (i + 1)); + FileUtils.writeStringToFile( + tempFile, + "Hello world " + i + "\n", + StandardCharsets.UTF_8, + true); + synchronized (events) { + if (events.size() < i + 1) { + events.wait(3000L); + } + assertEquals("Wrong number of events", i + 1, events.size()); + WatchEvent event = events.get(i); + assertEquals(StandardWatchEventKinds.ENTRY_MODIFY, event.kind()); + assertEquals(tempFile.getName(), event.context().toString()); + } + } + } finally { + if (watcher != null) { + watcher.stopAndJoinBackgroundThread(); + } + } + } + + @Test + public void testCallbackWorksOnFileTouched() throws IOException, InterruptedException { + FileChangeWatcher watcher = null; + try { + final List> events = new ArrayList<>(); + watcher = new FileChangeWatcher( + tempDir.toPath(), + event -> { + LOG.info("Got an update: " + event.kind() + " " + event.context()); + synchronized (events) { + events.add(event); + events.notifyAll(); + } + }); + watcher.waitForBackgroundThreadToStart(); + Thread.sleep(1000L); // XXX hack + LOG.info("Touching file"); + FileUtils.touch(tempFile); + synchronized (events) { + if (events.isEmpty()) { + events.wait(3000L); + } + assertFalse(events.isEmpty()); + WatchEvent event = events.get(0); + assertEquals(StandardWatchEventKinds.ENTRY_MODIFY, event.kind()); + assertEquals(tempFile.getName(), event.context().toString()); + } + } finally { + if (watcher != null) { + watcher.stopAndJoinBackgroundThread(); + } + } + } + + @Test + public void testCallbackWorksOnFileAdded() throws IOException, InterruptedException { + FileChangeWatcher watcher = null; + try { + final List> events = new ArrayList<>(); + watcher = new FileChangeWatcher( + tempDir.toPath(), + event -> { + LOG.info("Got an update: " + event.kind() + " " + event.context()); + synchronized (events) { + events.add(event); + events.notifyAll(); + } + }); + watcher.waitForBackgroundThreadToStart(); + Thread.sleep(1000L); // XXX hack + File tempFile2 = File.createTempFile("zk_test_", "", tempDir); + tempFile2.deleteOnExit(); + synchronized (events) { + if (events.isEmpty()) { + events.wait(3000L); + } + assertFalse(events.isEmpty()); + WatchEvent event = events.get(0); + assertEquals(StandardWatchEventKinds.ENTRY_CREATE, event.kind()); + assertEquals(tempFile2.getName(), event.context().toString()); + } + } finally { + if (watcher != null) { + watcher.stopAndJoinBackgroundThread(); + } + } + } + + @Test + public void testCallbackWorksOnFileDeleted() throws IOException, InterruptedException { + FileChangeWatcher watcher = null; + try { + final List> events = new ArrayList<>(); + watcher = new FileChangeWatcher( + tempDir.toPath(), + event -> { + LOG.info("Got an update: " + event.kind() + " " + event.context()); + synchronized (events) { + events.add(event); + events.notifyAll(); + } + }); + watcher.waitForBackgroundThreadToStart(); + Thread.sleep(1000L); // XXX hack + tempFile.delete(); + synchronized (events) { + if (events.isEmpty()) { + events.wait(3000L); + } + assertFalse(events.isEmpty()); + WatchEvent event = events.get(0); + assertEquals(StandardWatchEventKinds.ENTRY_DELETE, event.kind()); + assertEquals(tempFile.getName(), event.context().toString()); + } + } finally { + if (watcher != null) { + watcher.stopAndJoinBackgroundThread(); + } + } + } + + @Test + public void testCallbackErrorDoesNotCrashWatcherThread() throws IOException, InterruptedException { + FileChangeWatcher watcher = null; + try { + final List> events = new ArrayList<>(); + final AtomicInteger callCount = new AtomicInteger(0); + watcher = new FileChangeWatcher( + tempDir.toPath(), + event -> { + LOG.info("Got an update: " + event.kind() + " " + event.context()); + synchronized (callCount) { + if (callCount.getAndIncrement() == 0) { + callCount.notifyAll(); + throw new RuntimeException("This error should not crash the watcher thread"); + } + } + synchronized (events) { + events.add(event); + events.notifyAll(); + } + }); + watcher.waitForBackgroundThreadToStart(); + Thread.sleep(1000L); // XXX hack + LOG.info("Modifying file"); + FileUtils.writeStringToFile(tempFile, "Hello world\n", StandardCharsets.UTF_8, true); + synchronized (callCount) { + while (callCount.get() == 0) { + callCount.wait(3000L); + } + } + LOG.info("Modifying file again"); + FileUtils.writeStringToFile(tempFile, "Hello world again\n", StandardCharsets.UTF_8, true); + synchronized (events) { + if (events.isEmpty()) { + events.wait(3000L); + } + assertEquals(2, callCount.get()); + assertFalse(events.isEmpty()); + WatchEvent event = events.get(0); + assertEquals(StandardWatchEventKinds.ENTRY_MODIFY, event.kind()); + assertEquals(tempFile.getName(), event.context().toString()); + } + } finally { + if (watcher != null) { + watcher.stopAndJoinBackgroundThread(); + } + } + + } +}