Skip to content

Commit

Permalink
ZOOKEEPER-3174: Quorum TLS - support reloading trust/key store
Browse files Browse the repository at this point in the history
  • Loading branch information
ivmaykov committed Nov 2, 2018
1 parent 65abe0d commit cdc84f2
Show file tree
Hide file tree
Showing 5 changed files with 565 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.zookeeper.common;

import com.sun.nio.file.SensitivityWatchEventModifier;
import org.apache.zookeeper.server.ZooKeeperThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.file.ClosedWatchServiceException;
import java.nio.file.FileSystem;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.function.Consumer;

/**
* Instances of this class can be used to watch a directory for file changes. When a file is added to, deleted from,
* or is modified in the given directory, the callback provided by the user will be called from a background thread.
* Some things to keep in mind:
* <ul>
* <li>The callback should be thread-safe.</li>
* <li>Changes that happen around the time the thread is started may be missed.</li>
* <li>There is a delay between a file changing and the callback firing.</li>
* <li>The watch is not recursive - changes to subdirectories will not trigger a callback.</li>
* </ul>
*/
public final class FileChangeWatcher {
private static final Logger LOG = LoggerFactory.getLogger(FileChangeWatcher.class);

private final WatcherThread watcherThread;

/**
* Creates a watcher that watches <code>dirPath</code> and invokes <code>callback</code> on changes.
*
* @param dirPath the directory to watch.
* @param callback the callback to invoke with events. <code>event.kind()</code> will return the type of event,
* and <code>event.context()</code> will return the filename relative to <code>dirPath</code>.
* @throws IOException if there is an error creating the WatchService.
*/
public FileChangeWatcher(Path dirPath, Consumer<WatchEvent<?>> callback) throws IOException {
FileSystem fs = dirPath.getFileSystem();
WatchService watchService = fs.newWatchService();
if (LOG.isDebugEnabled()) {
LOG.debug("Registering with watch service: " + dirPath);
}
dirPath.register(
watchService,
new WatchEvent.Kind<?>[]{
StandardWatchEventKinds.ENTRY_CREATE,
StandardWatchEventKinds.ENTRY_DELETE,
StandardWatchEventKinds.ENTRY_MODIFY,
StandardWatchEventKinds.OVERFLOW},
SensitivityWatchEventModifier.HIGH);
this.watcherThread = new WatcherThread(watchService, callback);
this.watcherThread.setDaemon(true);
this.watcherThread.start();
}

/**
* Waits for the background thread to enter the main loop before returning. This method exists mostly to make
* the unit tests simpler, which is why it is package private.
*
* @throws InterruptedException if this thread is interrupted while waiting for the background thread to start.
*/
void waitForBackgroundThreadToStart() throws InterruptedException {
synchronized (watcherThread) {
while (!watcherThread.started) {
watcherThread.wait();
}
}
}

/**
* Tells the background thread to stop. Does not wait for it to exit.
*/
public void stop() {
watcherThread.shouldStop = true;
watcherThread.interrupt();
}

/**
* Tells the background thread to stop and waits for it to exit. Only used by unit tests, which is why it is package
* private.
*/
void stopAndJoinBackgroundThread() throws InterruptedException {
stop();
watcherThread.join();
}

/**
* Inner class that implements the watcher thread logic.
*/
private static class WatcherThread extends ZooKeeperThread {
private static final String THREAD_NAME = "FileChangeWatcher";

volatile boolean shouldStop;
volatile boolean started;
final WatchService watchService;
final Consumer<WatchEvent<?>> callback;

WatcherThread(WatchService watchService, Consumer<WatchEvent<?>> callback) {
super(THREAD_NAME);
this.shouldStop = this.started = false;
this.watchService = watchService;
this.callback = callback;
}

@Override
public void run() {
LOG.info(getName() + " thread started");
synchronized (this) {
started = true;
this.notifyAll();
}
try {
runLoop();
} finally {
try {
watchService.close();
} catch (IOException e) {
LOG.warn("Error closing watch service", e);
}
LOG.info(getName() + " thread finished");
}
}

private void runLoop() {
while (!shouldStop) {
WatchKey key;
try {
key = watchService.take();
} catch (InterruptedException|ClosedWatchServiceException e) {
if (LOG.isDebugEnabled()) {
LOG.debug(getName() + " was interrupted and is shutting down ...");
}
break;
}
for (WatchEvent<?> event : key.pollEvents()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Got file changed event: " + event.kind() + " with context: " + event.context());
}
try {
callback.accept(event);
} catch (Throwable e) {
LOG.error("Error from callback", e);
}
}
boolean isKeyValid = key.reset();
if (!isKeyValid) {
// This is likely a problem, it means that file reloading is broken, probably because the
// directory we are watching was deleted or otherwise became inaccessible (unmounted, permissions
// changed, ???).
// For now, we log an error and exit the watcher thread.
LOG.error("Watch key no longer valid, maybe the directory is inaccessible?");
break;
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.Socket;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.security.GeneralSecurityException;
import java.security.KeyManagementException;
import java.security.KeyStore;
Expand All @@ -30,6 +34,7 @@
import java.security.cert.X509CertSelector;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import javax.net.ssl.CertPathTrustManagerParameters;
import javax.net.ssl.KeyManager;
Expand Down Expand Up @@ -93,6 +98,8 @@ public abstract class X509Util {
private String[] cipherSuites;

private AtomicReference<SSLContext> defaultSSLContext = new AtomicReference<>(null);
private FileChangeWatcher keyStoreFileWatcher;
private FileChangeWatcher trustStoreFileWatcher;

public X509Util() {
String cipherSuitesInput = System.getProperty(cipherSuitesProperty);
Expand Down Expand Up @@ -172,6 +179,11 @@ public SSLContext getDefaultSSLContext() throws X509Exception.SSLContextExceptio
return result;
}

private void resetDefaultSSLContext() throws X509Exception.SSLContextException {
SSLContext newContext = createSSLContext();
defaultSSLContext.set(newContext);
}

private SSLContext createSSLContext() throws SSLContextException {
/*
* Since Configuration initializes the key store and trust store related
Expand Down Expand Up @@ -446,4 +458,117 @@ private String[] getDefaultCipherSuites() {
LOG.debug("Using Java8-optimized cipher suites for Java version {}", javaVersion);
return DEFAULT_CIPHERS_JAVA8;
}

/**
* Enables automatic reloading of the trust store and key store files when they change on disk.
*
* @throws IOException if creating the FileChangeWatcher objects fails.
*/
public void enableCertFileReloading() throws IOException {
LOG.info("enabling cert file reloading");
ZKConfig config = new ZKConfig();
String keyStoreLocation = config.getProperty(sslKeystoreLocationProperty);
if (keyStoreLocation != null && !keyStoreLocation.isEmpty()) {
final Path filePath = Paths.get(keyStoreLocation).toAbsolutePath();
Path parentPath = filePath.getParent();
if (parentPath == null) {
throw new IOException(
"Key store path does not have a parent: " + filePath);
}
FileChangeWatcher newKeyStoreFileWatcher = new FileChangeWatcher(
parentPath,
watchEvent -> {
handleWatchEvent(filePath, watchEvent);
});
// stop old watcher if there is one
if (keyStoreFileWatcher != null) {
keyStoreFileWatcher.stop();
}
keyStoreFileWatcher = newKeyStoreFileWatcher;
}
String trustStoreLocation = config.getProperty(sslTruststoreLocationProperty);
if (trustStoreLocation != null && !trustStoreLocation.isEmpty()) {
final Path filePath = Paths.get(trustStoreLocation).toAbsolutePath();
Path parentPath = filePath.getParent();
if (parentPath == null) {
throw new IOException(
"Trust store path does not have a parent: " + filePath);
}
FileChangeWatcher newTrustStoreFileWatcher = new FileChangeWatcher(
parentPath,
watchEvent -> {
handleWatchEvent(filePath, watchEvent);
});
// stop old watcher if there is one
if (trustStoreFileWatcher != null) {
trustStoreFileWatcher.stop();
}
trustStoreFileWatcher = newTrustStoreFileWatcher;
}
}

/**
* Disables automatic reloading of the trust store and key store files when they change on disk.
* Stops background threads and closes WatchService instances.
*/
public void disableCertFileReloading() {
if (keyStoreFileWatcher != null) {
keyStoreFileWatcher.stop();
keyStoreFileWatcher = null;
}
if (trustStoreFileWatcher != null) {
trustStoreFileWatcher.stop();
trustStoreFileWatcher = null;
}
}

// Finalizer guardian object, see Effective Java item 7
// TODO: finalize() is deprecated starting with Java 10. This needs to be
// replaced with an explicit shutdown call.
@SuppressWarnings("unused")
private final Object finalizerGuardian = new Object() {
@SuppressWarnings("deprecation")
@Override
protected void finalize() {
disableCertFileReloading();
}
};

/**
* Handler for watch events that let us know a file we may care about has changed on disk.
*
* @param filePath the path to the file we are watching for changes.
* @param event the WatchEvent.
*/
private void handleWatchEvent(Path filePath, WatchEvent<?> event) {
boolean shouldResetContext = false;
Path dirPath = filePath.getParent();
if (event.kind().equals(StandardWatchEventKinds.OVERFLOW)) {
// If we get notified about possibly missed events, reload the key store / trust store just to be sure.
shouldResetContext = true;
} else if (event.kind().equals(StandardWatchEventKinds.ENTRY_MODIFY) ||
event.kind().equals(StandardWatchEventKinds.ENTRY_CREATE)) {
Path eventFilePath = dirPath.resolve((Path) event.context());
if (filePath.equals(eventFilePath)) {
shouldResetContext = true;
}
}
// Note: we don't care about delete events
if (shouldResetContext) {
if (LOG.isDebugEnabled()) {
LOG.debug("Attempting to reset default SSL context after receiving watch event: " +
event.kind() + " with context: " + event.context());
}
try {
this.resetDefaultSSLContext();
} catch (SSLContextException e) {
throw new RuntimeException(e);
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Ignoring watch event and keeping previous default SSL context. Event kind: " +
event.kind() + " with context: " + event.context());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public class QuorumPeerConfig {
protected InetSocketAddress secureClientPortAddress;
protected boolean sslQuorum = false;
protected boolean shouldUsePortUnification = false;
protected boolean sslQuorumReloadCertFiles = false;
protected File dataDir;
protected File dataLogDir;
protected String dynamicConfigFileStr = null;
Expand Down Expand Up @@ -317,6 +318,8 @@ public void parseProperties(Properties zkProp)
sslQuorum = Boolean.parseBoolean(value);
} else if (key.equals("portUnification")){
shouldUsePortUnification = Boolean.parseBoolean(value);
} else if (key.equals("sslQuorumReloadCertFiles")) {
sslQuorumReloadCertFiles = Boolean.parseBoolean(value);
} else if ((key.startsWith("server.") || key.startsWith("group") || key.startsWith("weight")) && zkProp.containsKey("dynamicConfigFile")) {
throw new ConfigException("parameter: " + key + " must be in a separate dynamic config file");
} else if (key.equals(QuorumAuth.QUORUM_SASL_AUTH_ENABLED)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,9 @@ public void runFromConfig(QuorumPeerConfig config)
quorumPeer.setLearnerType(config.getPeerType());
quorumPeer.setSyncEnabled(config.getSyncEnabled());
quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
if (config.sslQuorumReloadCertFiles) {
quorumPeer.getX509Util().enableCertFileReloading();
}

// sets quorum sasl authentication configurations
quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
Expand Down
Loading

0 comments on commit cdc84f2

Please sign in to comment.