Skip to content

Commit

Permalink
ZOOKEEPER-3174: Quorum TLS - support reloading trust/key store
Browse files Browse the repository at this point in the history
  • Loading branch information
ivmaykov committed Nov 1, 2018
1 parent 06b1249 commit 232232e
Show file tree
Hide file tree
Showing 5 changed files with 574 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.zookeeper.common;

import com.sun.nio.file.SensitivityWatchEventModifier;
import org.apache.zookeeper.server.ZooKeeperThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.file.ClosedWatchServiceException;
import java.nio.file.FileSystem;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.function.Consumer;

/**
* Instances of this class can be used to watch a directory for file changes. When a file is added to, deleted from,
* or is modified in the given directory, the callback provided by the user will be called from a background thread.
* Some things to keep in mind:
* <ul>
* <li>The callback should be thread-safe.</li>
* <li>Changes that happen around the time the thread is started may be missed.</li>
* <li>There is a delay between a file changing and the callback firing.</li>
* <li>The watch is not recursive - changes to subdirectories will not trigger a callback.</li>
* </ul>
*/
public final class FileChangeWatcher {
private static final Logger LOG = LoggerFactory.getLogger(FileChangeWatcher.class);

private final WatcherThread watcherThread;

/**
* Creates a watcher that watches <code>dirPath</code> and invokes <code>callback</code> on changes.
*
* @param dirPath the directory to watch.
* @param callback the callback to invoke with events. <code>event.kind()</code> will return the type of event,
* and <code>event.context()</code> will return the filename relative to <code>dirPath</code>.
* @throws IOException if there is an error creating the WatchService.
*/
public FileChangeWatcher(Path dirPath, Consumer<WatchEvent<?>> callback) throws IOException {
FileSystem fs = dirPath.getFileSystem();
WatchService watchService = fs.newWatchService();
if (LOG.isDebugEnabled()) {
LOG.debug("Registering with watch service: " + dirPath);
}
dirPath.register(
watchService,
new WatchEvent.Kind<?>[]{
StandardWatchEventKinds.ENTRY_CREATE,
StandardWatchEventKinds.ENTRY_DELETE,
StandardWatchEventKinds.ENTRY_MODIFY,
StandardWatchEventKinds.OVERFLOW},
SensitivityWatchEventModifier.HIGH);
this.watcherThread = new WatcherThread(watchService, callback);
this.watcherThread.setDaemon(true);
this.watcherThread.start();
}

/**
* Waits for the background thread to enter the main loop before returning. This method exists mostly to make
* the unit tests simpler, which is why it is package private.
*
* @throws InterruptedException if this thread is interrupted while waiting for the background thread to start.
*/
void waitForBackgroundThreadToStart() throws InterruptedException {
synchronized (watcherThread) {
while (!watcherThread.started) {
watcherThread.wait();
}
}
}

/**
* Tells the background thread to stop. Does not wait for it to exit.
*/
public void stop() {
watcherThread.shouldStop = true;
watcherThread.interrupt();
}

/**
* Tells the background thread to stop and waits for it to exit. Only used by unit tests, which is why it is package
* private.
*/
void stopAndJoinBackgroundThread() throws InterruptedException {
stop();
watcherThread.join();
}

/**
* Inner class that implements the watcher thread logic.
*/
private static class WatcherThread extends ZooKeeperThread {
private static final String THREAD_NAME = "FileChangeWatcher";

volatile boolean shouldStop;
volatile boolean started;
final WatchService watchService;
final Consumer<WatchEvent<?>> callback;

WatcherThread(WatchService watchService, Consumer<WatchEvent<?>> callback) {
super(THREAD_NAME);
this.shouldStop = this.started = false;
this.watchService = watchService;
this.callback = callback;
}

@Override
public void run() {
LOG.info(getName() + " thread started");
synchronized (this) {
started = true;
this.notifyAll();
}
try {
runLoop();
} finally {
try {
watchService.close();
} catch (IOException e) {
LOG.warn("Error closing watch service", e);
}
LOG.info(getName() + " thread finished");
}
}

private void runLoop() {
while (!shouldStop) {
WatchKey key;
try {
key = watchService.take();
} catch (InterruptedException|ClosedWatchServiceException e) {
if (LOG.isDebugEnabled()) {
LOG.debug(getName() + " was interrupted and is shutting down ...");
}
break;
}
for (WatchEvent<?> event : key.pollEvents()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Got file changed event: " + event.kind() + " with context: " + event.context());
}
try {
callback.accept(event);
} catch (Throwable e) {
LOG.error("Error from callback", e);
}
}
boolean isKeyValid = key.reset();
if (!isKeyValid) {
// This is likely a problem, it means that file reloading is broken, probably because the
// directory we are watching was deleted or otherwise became inaccessible (unmounted, permissions
// changed, ???).
// For now, we log an error and exit the watcher thread.
LOG.error("Watch key no longer valid, maybe the directory is inaccessible?");
break;
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@
import java.io.FileInputStream;
import java.io.IOException;
import java.net.Socket;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.security.GeneralSecurityException;
import java.security.KeyManagementException;
import java.security.KeyStore;
Expand All @@ -31,6 +35,7 @@
import java.security.cert.X509CertSelector;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import javax.net.ssl.CertPathTrustManagerParameters;
import javax.net.ssl.KeyManager;
Expand Down Expand Up @@ -94,6 +99,8 @@ public abstract class X509Util {
private String[] cipherSuites;

private AtomicReference<SSLContext> defaultSSLContext = new AtomicReference<>(null);
private FileChangeWatcher keyStoreFileWatcher;
private FileChangeWatcher trustStoreFileWatcher;

public X509Util() {
String cipherSuitesInput = System.getProperty(cipherSuitesProperty);
Expand Down Expand Up @@ -173,6 +180,11 @@ public SSLContext getDefaultSSLContext() throws X509Exception.SSLContextExceptio
return result;
}

private void resetDefaultSSLContext() throws X509Exception.SSLContextException {
SSLContext newContext = createSSLContext();
defaultSSLContext.set(newContext);
}

private SSLContext createSSLContext() throws SSLContextException {
/*
* Since Configuration initializes the key store and trust store related
Expand Down Expand Up @@ -465,4 +477,110 @@ private String[] getDefaultCipherSuites() {
LOG.debug("Using Java8-optimized cipher suites for Java version {}", javaVersion);
return DEFAULT_CIPHERS_JAVA8;
}

/**
* Enables automatic reloading of the trust store and key store files when they change on disk.
*
* @throws IOException if creating the FileChangeWatcher objects fails.
*/
public void enableCertFileReloading() throws IOException {
LOG.info("enabling cert file reloading");
ZKConfig config = new ZKConfig();
String keyStoreLocation = config.getProperty(sslKeystoreLocationProperty);

This comment has been minimized.

Copy link
@eolivelli

eolivelli Nov 1, 2018

Contributor

This sounds weird. It seems that we are simply using a default value.

This comment has been minimized.

Copy link
@ivmaykov

ivmaykov Nov 2, 2018

Author Contributor

ZKConfig will get the value from system properties. In the next PR this is changed - you can create a X509Util with a ZKConfig parameter, in which case it will be used here.

This comment has been minimized.

Copy link
@eolivelli

eolivelli Nov 2, 2018

Contributor

sorry.
I left this comment before starting the review of the next commit.

if (keyStoreLocation != null && !keyStoreLocation.isEmpty()) {
final Path filePath = Paths.get(keyStoreLocation).toAbsolutePath();
FileChangeWatcher newKeyStoreFileWatcher = new FileChangeWatcher(
filePath.getParent(),
new Consumer<WatchEvent<?>>() {
@Override
public void accept(WatchEvent<?> watchEvent) {
handleWatchEvent(filePath, watchEvent);
}
});
// stop old watcher if there is one
if (keyStoreFileWatcher != null) {
keyStoreFileWatcher.stop();
}
keyStoreFileWatcher = newKeyStoreFileWatcher;
}
String trustStoreLocation = config.getProperty(sslTruststoreLocationProperty);
if (trustStoreLocation != null && !trustStoreLocation.isEmpty()) {
final Path filePath = Paths.get(trustStoreLocation).toAbsolutePath();
FileChangeWatcher newTrustStoreFileWatcher = new FileChangeWatcher(
filePath.getParent(),
new Consumer<WatchEvent<?>>() {
@Override
public void accept(WatchEvent<?> watchEvent) {
handleWatchEvent(filePath, watchEvent);
}
});
// stop old watcher if there is one
if (trustStoreFileWatcher != null) {
trustStoreFileWatcher.stop();
}
trustStoreFileWatcher = newTrustStoreFileWatcher;
}
}

/**
* Disables automatic reloading of the trust store and key store files when they change on disk.
* Stops background threads and closes WatchService instances.
*/
public void disableCertFileReloading() {
if (keyStoreFileWatcher != null) {
keyStoreFileWatcher.stop();
keyStoreFileWatcher = null;
}
if (trustStoreFileWatcher != null) {
trustStoreFileWatcher.stop();
trustStoreFileWatcher = null;
}
}

// Finalizer guardian object, see Effective Java item 7
@SuppressWarnings("unused")
private final Object finalizerGuardian = new Object() {
@Override
protected void finalize() {

This comment has been minimized.

Copy link
@eolivelli

eolivelli Nov 1, 2018

Contributor

I suggest drop any 'finalize'.
It has been deprecated since Java 10.

This comment has been minimized.

Copy link
@ivmaykov

ivmaykov Nov 2, 2018

Author Contributor

Is there another way in Java 10 to make sure the background reloader threads are shut down when an X509Util is eligible for GC?

This comment has been minimized.

Copy link
@eolivelli

eolivelli Nov 2, 2018

Contributor

The best is to have complete control over this.
We should run disableCertFileReloading in an explicit "shutdown"/"close" method.

We cannot count on Java finalization, I suggest to drop this "finalizerGuardian", we have full control on both server and client stopping/close procedures so I think we can safely rely on:

  • server shutdown (the process will die....)
  • client calls ZooKeeper#close (it it does not so the application is already buggy)

This comment has been minimized.

Copy link
@ivmaykov

ivmaykov Nov 2, 2018

Author Contributor

That seems a bit fragile and could potentially introduce bugs, since we will need an explicit shutdown method where none existed before. Since we are currently targeting Java 8, can I keep the finalizer in this PR and fix it in a later PR + JIRA?

This comment has been minimized.

Copy link
@eolivelli

eolivelli Nov 2, 2018

Contributor

To me is okay a new JIRA.

When building on JDK10 you wlil have a "deprecation" warning, I think we should add a SuppressWarnings annotation.

ZK is building and running fine on JDK10/JDK11, please check

disableCertFileReloading();
}
};

/**
* Handler for watch events that let us know a file we may care about has changed on disk.
*
* @param filePath the path to the file we are watching for changes.
* @param event the WatchEvent.
*/
private void handleWatchEvent(Path filePath, WatchEvent<?> event) {
boolean shouldResetContext = false;
Path dirPath = filePath.getParent();
if (event.kind().equals(StandardWatchEventKinds.OVERFLOW)) {
// If we get notified about possibly missed events, reload the key store / trust store just to be sure.
shouldResetContext = true;
} else if (event.kind().equals(StandardWatchEventKinds.ENTRY_MODIFY) ||
event.kind().equals(StandardWatchEventKinds.ENTRY_CREATE)) {
Path eventFilePath = dirPath.resolve((Path) event.context());
if (filePath.equals(eventFilePath)) {
shouldResetContext = true;
}
}
// Note: we don't care about delete events
if (shouldResetContext) {
if (LOG.isDebugEnabled()) {
LOG.debug("Attempting to reset default SSL context after receiving watch event: " +
event.kind() + " with context: " + event.context());
}
try {
this.resetDefaultSSLContext();
} catch (SSLContextException e) {
throw new RuntimeException(e);
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Ignoring watch event and keeping previous default SSL context. Event kind: " +
event.kind() + " with context: " + event.context());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public class QuorumPeerConfig {
protected InetSocketAddress secureClientPortAddress;
protected boolean sslQuorum = false;
protected boolean shouldUsePortUnification = false;
protected boolean sslQuorumReloadCertFiles = false;
protected File dataDir;
protected File dataLogDir;
protected String dynamicConfigFileStr = null;
Expand Down Expand Up @@ -317,6 +318,8 @@ public void parseProperties(Properties zkProp)
sslQuorum = Boolean.parseBoolean(value);
} else if (key.equals("portUnification")){
shouldUsePortUnification = Boolean.parseBoolean(value);
} else if (key.equals("sslQuorumReloadCertFiles")) {
sslQuorumReloadCertFiles = Boolean.parseBoolean(value);
} else if ((key.startsWith("server.") || key.startsWith("group") || key.startsWith("weight")) && zkProp.containsKey("dynamicConfigFile")) {
throw new ConfigException("parameter: " + key + " must be in a separate dynamic config file");
} else if (key.equals(QuorumAuth.QUORUM_SASL_AUTH_ENABLED)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,9 @@ public void runFromConfig(QuorumPeerConfig config)
quorumPeer.setLearnerType(config.getPeerType());
quorumPeer.setSyncEnabled(config.getSyncEnabled());
quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
if (config.sslQuorumReloadCertFiles) {
quorumPeer.getX509Util().enableCertFileReloading();
}

// sets quorum sasl authentication configurations
quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
Expand Down
Loading

0 comments on commit 232232e

Please sign in to comment.