Skip to content

Commit

Permalink
ZOOKEEPER-3174: Quorum TLS - support reloading trust/key store
Browse files Browse the repository at this point in the history
  • Loading branch information
ivmaykov committed Oct 25, 2018
1 parent f9fb9c6 commit 65edf69
Show file tree
Hide file tree
Showing 5 changed files with 530 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package org.apache.zookeeper.common;

import com.sun.nio.file.SensitivityWatchEventModifier;
import org.apache.zookeeper.server.ZooKeeperThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.file.ClosedWatchServiceException;
import java.nio.file.FileSystem;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.function.Consumer;

/**
* Instances of this class can be used to watch a directory for file changes. When a file is added to, deleted from,
* or is modified in the given directory, the callback provided by the user will be called from a background thread.
* Some things to keep in mind:
* <ul>
* <li>The callback should be thread-safe.</li>
* <li>Changes that happen around the time the thread is started may be missed.</li>
* <li>There is a delay between a file changing and the callback firing.</li>
* <li>The watch is not recursive - changes to subdirectories will not trigger a callback.</li>
* </ul>
*/
public final class FileChangeWatcher {
private static final Logger LOG = LoggerFactory.getLogger(FileChangeWatcher.class);

private final WatcherThread watcherThread;

/**
* Creates a watcher that watches <code>dirPath</code> and invokes <code>callback</code> on changes.
*
* @param dirPath the directory to watch.
* @param callback the callback to invoke with events. <code>event.kind()</code> will return the type of event,
* and <code>event.context()</code> will return the filename relative to <code>dirPath</code>.
* @throws IOException if there is an error creating the WatchService.
*/
public FileChangeWatcher(Path dirPath, Consumer<WatchEvent<?>> callback) throws IOException {
FileSystem fs = dirPath.getFileSystem();
WatchService watchService = fs.newWatchService();
if (LOG.isDebugEnabled()) {
LOG.debug("Registering with watch service: " + dirPath);
}
dirPath.register(
watchService,
new WatchEvent.Kind<?>[]{
StandardWatchEventKinds.ENTRY_CREATE,
StandardWatchEventKinds.ENTRY_DELETE,
StandardWatchEventKinds.ENTRY_MODIFY,
StandardWatchEventKinds.OVERFLOW},
SensitivityWatchEventModifier.HIGH);
this.watcherThread = new WatcherThread(watchService, callback);
this.watcherThread.setDaemon(true);
this.watcherThread.start();
}

/**
* Waits for the background thread to enter the main loop before returning. This method exists mostly to make
* the unit tests simpler, which is why it is package private.
*
* @throws InterruptedException if this thread is interrupted while waiting for the background thread to start.
*/
void waitForBackgroundThreadToStart() throws InterruptedException {
synchronized (watcherThread) {
while (!watcherThread.started) {
watcherThread.wait();
}
}
}

/**
* Tells the background thread to stop. Does not wait for it to exit.
*/
public void stop() {
watcherThread.shouldStop = true;
watcherThread.interrupt();
}

/**
* Tells the background thread to stop and waits for it to exit. Only used by unit tests, which is why it is package
* private.
*/
void stopAndJoinBackgroundThread() throws InterruptedException {
stop();
watcherThread.join();
}

/**
* Inner class that implements the watcher thread logic.
*/
private static class WatcherThread extends ZooKeeperThread {
private static final String THREAD_NAME = "FileChangeWatcher";

volatile boolean shouldStop;
volatile boolean started;
final WatchService watchService;
final Consumer<WatchEvent<?>> callback;

WatcherThread(WatchService watchService, Consumer<WatchEvent<?>> callback) {
super(THREAD_NAME);
this.shouldStop = this.started = false;
this.watchService = watchService;
this.callback = callback;
}

@Override
public void run() {
LOG.info(getName() + " thread started");
synchronized (this) {
started = true;
this.notifyAll();
}
try {
runLoop();
} finally {
try {
watchService.close();
} catch (IOException e) {
LOG.warn("Error closing watch service", e);
}
LOG.info(getName() + " thread finished");
}
}

private void runLoop() {
while (!shouldStop) {
WatchKey key;
try {
key = watchService.take();
} catch (InterruptedException|ClosedWatchServiceException e) {
if (LOG.isDebugEnabled()) {
LOG.debug(getName() + " was interrupted and is shutting down ...");
}
break;
}
for (WatchEvent<?> event : key.pollEvents()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Got file changed event: " + event.kind() + " with context: " + event.context());
}
try {
callback.accept(event);
} catch (Throwable e) {
LOG.error("Error from callback", e);
}
}
boolean isKeyValid = key.reset();
if (!isKeyValid) {
// This is likely a problem, it means that file reloading is broken, probably because the
// directory we are watching was deleted or otherwise became inaccessible (unmounted, permissions
// changed, ???).
// For now, we log an error and exit the watcher thread.
LOG.error("Watch key no longer valid, maybe the directory is inaccessible?");
break;
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
package org.apache.zookeeper.common;


import org.apache.zookeeper.common.X509Exception.KeyManagerException;
import org.apache.zookeeper.common.X509Exception.SSLContextException;
import org.apache.zookeeper.common.X509Exception.TrustManagerException;
import org.apache.zookeeper.util.PemReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -41,6 +44,10 @@
import java.io.IOException;
import java.net.Socket;
import java.security.InvalidAlgorithmParameterException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.security.GeneralSecurityException;
import java.security.KeyManagementException;
import java.security.KeyStore;
Expand All @@ -54,10 +61,7 @@
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.zookeeper.common.X509Exception.KeyManagerException;
import org.apache.zookeeper.common.X509Exception.SSLContextException;
import org.apache.zookeeper.common.X509Exception.TrustManagerException;
import java.util.function.Consumer;

/**
* Utility code for X509 handling
Expand Down Expand Up @@ -144,6 +148,8 @@ public static StoreFileType fromPropertyValue(String prop) {
private String[] cipherSuites;

private AtomicReference<SSLContext> defaultSSLContext = new AtomicReference<>(null);
private FileChangeWatcher keyStoreFileWatcher;
private FileChangeWatcher trustStoreFileWatcher;

public X509Util() {
String cipherSuitesInput = System.getProperty(cipherSuitesProperty);
Expand Down Expand Up @@ -223,6 +229,11 @@ public SSLContext getDefaultSSLContext() throws X509Exception.SSLContextExceptio
return result;
}

private void resetDefaultSSLContext() throws X509Exception.SSLContextException {
SSLContext newContext = createSSLContext();
defaultSSLContext.set(newContext);
}

private SSLContext createSSLContext() throws SSLContextException {
/*
* Since Configuration initializes the key store and trust store related
Expand Down Expand Up @@ -544,4 +555,109 @@ public static StoreFileType detectStoreFileTypeFromFileExtension(File filename)
}
throw new IOException("Unable to auto-detect store file type from file name: " + filename);
}

/**
* Enables automatic reloading of the trust store and key store files when they change on disk.
*
* @throws IOException if creating the FileChangeWatcher objects fails.
*/
public void enableCertFileReloading() throws IOException {
ZKConfig config = new ZKConfig();
String keyStoreLocation = config.getProperty(sslKeystoreLocationProperty);
if (keyStoreLocation != null && !keyStoreLocation.isEmpty()) {
final Path filePath = Paths.get(keyStoreLocation).toAbsolutePath();
FileChangeWatcher newKeyStoreFileWatcher = new FileChangeWatcher(
filePath.getParent(),
new Consumer<WatchEvent<?>>() {
@Override
public void accept(WatchEvent<?> watchEvent) {
handleWatchEvent(filePath, watchEvent);
}
});
// stop old watcher if there is one
if (keyStoreFileWatcher != null) {
keyStoreFileWatcher.stop();
keyStoreFileWatcher = newKeyStoreFileWatcher;
}
}
String trustStoreLocation = config.getProperty(sslTruststoreLocationProperty);
if (trustStoreLocation != null && !trustStoreLocation.isEmpty()) {
final Path filePath = Paths.get(trustStoreLocation).toAbsolutePath();
FileChangeWatcher newTrustStoreFileWatcher = new FileChangeWatcher(
filePath.getParent(),
new Consumer<WatchEvent<?>>() {
@Override
public void accept(WatchEvent<?> watchEvent) {
handleWatchEvent(filePath, watchEvent);
}
});
// stop old watcher if there is one
if (trustStoreFileWatcher != null) {
trustStoreFileWatcher.stop();
trustStoreFileWatcher = newTrustStoreFileWatcher;
}
}
}

/**
* Disables automatic reloading of the trust store and key store files when they change on disk.
* Stops background threads and closes WatchService instances.
*/
public void disableCertFileReloading() {
if (keyStoreFileWatcher != null) {
keyStoreFileWatcher.stop();
keyStoreFileWatcher = null;
}
if (trustStoreFileWatcher != null) {
trustStoreFileWatcher.stop();
trustStoreFileWatcher = null;
}
}

// Finalizer guardian object, see Effective Java item 7
@SuppressWarnings("unused")
private final Object finalizerGuardian = new Object() {
@Override
protected void finalize() {
disableCertFileReloading();
}
};

/**
* Handler for watch events that let us know a file we may care about has changed on disk.
*
* @param filePath the path to the file we are watching for changes.
* @param event the WatchEvent.
*/
private void handleWatchEvent(Path filePath, WatchEvent<?> event) {
boolean shouldResetContext = false;
Path dirPath = filePath.getParent();
if (event.kind().equals(StandardWatchEventKinds.OVERFLOW)) {
// If we get notified about possibly missed events, reload the key store / trust store just to be sure.
shouldResetContext = true;
} else if (event.kind().equals(StandardWatchEventKinds.ENTRY_MODIFY) ||
event.kind().equals(StandardWatchEventKinds.ENTRY_CREATE)) {
Path eventFilePath = dirPath.resolve((Path) event.context());
if (filePath.equals(eventFilePath)) {
shouldResetContext = true;
}
}
// Note: we don't care about delete events
if (shouldResetContext) {
if (LOG.isDebugEnabled()) {
LOG.debug("Attempting to reset default SSL context after receiving watch event: " +
event.kind() + " with context: " + event.context());
}
try {
this.resetDefaultSSLContext();
} catch (SSLContextException e) {
throw new RuntimeException(e);
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Ignoring watch event and keeping previous default SSL context. Event kind: " +
event.kind() + " with context: " + event.context());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public class QuorumPeerConfig {
protected InetSocketAddress secureClientPortAddress;
protected boolean sslQuorum = false;
protected boolean shouldUsePortUnification = false;
protected boolean sslQuorumReloadCertFiles = false;
protected File dataDir;
protected File dataLogDir;
protected String dynamicConfigFileStr = null;
Expand Down Expand Up @@ -317,6 +318,8 @@ public void parseProperties(Properties zkProp)
sslQuorum = Boolean.parseBoolean(value);
} else if (key.equals("portUnification")){
shouldUsePortUnification = Boolean.parseBoolean(value);
} else if (key.equals("sslQuorumReloadCertFiles")) {
sslQuorumReloadCertFiles = Boolean.parseBoolean(value);
} else if ((key.startsWith("server.") || key.startsWith("group") || key.startsWith("weight")) && zkProp.containsKey("dynamicConfigFile")) {
throw new ConfigException("parameter: " + key + " must be in a separate dynamic config file");
} else if (key.equals(QuorumAuth.QUORUM_SASL_AUTH_ENABLED)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,9 @@ public void runFromConfig(QuorumPeerConfig config)
quorumPeer.setLearnerType(config.getPeerType());
quorumPeer.setSyncEnabled(config.getSyncEnabled());
quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
if (config.sslQuorumReloadCertFiles) {
quorumPeer.getX509Util().enableCertFileReloading();
}

// sets quorum sasl authentication configurations
quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
Expand Down
Loading

0 comments on commit 65edf69

Please sign in to comment.