Skip to content

Commit

Permalink
Backport native TLS to hubspot-2.5 (#49)
Browse files Browse the repository at this point in the history
* HBASE-27185 Rewrite NettyRpcServer to decode rpc request with netty handler (apache#4624)
* HBASE-27185 Addendum fix TestShadeSaslAuthenticationProvider
* HBASE-27271 BufferCallBeforeInitHandler should ignore the flush request (apache#4676)
* HBASE-26666 Add native TLS encryption support to RPC server/client (apache#4666)
* HBASE-27278 Improve TestTlsIPC to reuse existing IPC test code (apache#4682)
* HBASE-27279 Make SslHandler work with SaslWrapHandler/SaslUnwrapHandler (apache#4705)
* HBASE-27342 Use Hadoop Credentials API to retrieve passwords of TLS key/trust stores (apache#4751)
* HBASE-27346 Autodetect key/truststore file type from file extension (apache#4757)
* HBASE-27280 Add mutual authentication support to TLS (apache#4796)
* HBASE-27673 Fix mTLS client hostname verification (apache#5066)
* HBASE-27347 Port FileWatcher from ZK to autodetect keystore/truststore changes in TLS connections (branch-2) (apache#4897)
* HBASE-27779 Make X509Util config constants public
* HBASE-27578 Upgrade hbase-thirdparty to 4.1.4 (apache#4985)
  • Loading branch information
bbeaudreault authored Apr 7, 2023
1 parent b01e8d5 commit 51bdd16
Show file tree
Hide file tree
Showing 76 changed files with 8,049 additions and 944 deletions.
3 changes: 3 additions & 0 deletions bin/hbase
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ show_usage() {
echo " cellcounter Run CellCounter tool"
echo " pre-upgrade Run Pre-Upgrade validator tool"
echo " hbtop Run HBTop tool"
echo " credential Run the Hadoop Credential Shell"
echo " CLASSNAME Run the class named CLASSNAME"
}

Expand Down Expand Up @@ -759,6 +760,8 @@ elif [ "$COMMAND" = "hbtop" ] ; then
HBASE_HBTOP_OPTS="${HBASE_HBTOP_OPTS} -Dlog4j2.configurationFile=file:${HBASE_HOME}/conf/log4j2-hbtop.properties"
fi
HBASE_OPTS="${HBASE_OPTS} ${HBASE_HBTOP_OPTS}"
elif [ "$COMMAND" = "credential" ] ; then
CLASS='org.apache.hadoop.security.alias.CredentialShell'
else
CLASS=$COMMAND
if [[ "$CLASS" =~ .*IntegrationTest.* ]] ; then
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,4 +188,13 @@ public static UserGroupInformation loginAndReturnUGI(Configuration conf, String
UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keyTabFileLocation);
return ugi;
}

public static UserGroupInformation loginKerberosPrincipal(String krbKeytab, String krbPrincipal)
throws Exception {
Configuration conf = new Configuration();
conf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
UserGroupInformation.setConfiguration(conf);
UserGroupInformation.loginUserFromKeytab(krbPrincipal, krbKeytab);
return UserGroupInformation.getLoginUser();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ public static String getDefaultCodec(final Configuration c) {
* Encapsulate the ugly casting and RuntimeException conversion in private method.
* @return Codec to use on this client.
*/
Codec getCodec() {
protected Codec getCodec() {
// For NO CODEC, "hbase.client.rpc.codec" must be configured with empty string AND
// "hbase.client.default.rpc.codec" also -- because default is to do cell block encoding.
String className = conf.get(HConstants.RPC_CODEC_CONF_KEY, getDefaultCodec(this.conf));
Expand All @@ -251,7 +251,7 @@ public boolean hasCellBlockSupport() {
}

// for writing tests that want to throw exception when connecting.
boolean isTcpNoDelay() {
protected boolean isTcpNoDelay() {
return tcpNoDelay;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@

import java.io.IOException;
import java.net.SocketAddress;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.exceptions.X509Exception;
import org.apache.hadoop.hbase.io.FileChangeWatcher;
import org.apache.hadoop.hbase.io.crypto.tls.X509Util;
import org.apache.hadoop.hbase.util.NettyFutureUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
Expand All @@ -31,6 +35,7 @@
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.hbase.thirdparty.io.netty.handler.ssl.SslContext;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.DefaultThreadFactory;

/**
Expand All @@ -45,6 +50,9 @@ public class NettyRpcClient extends AbstractRpcClient<NettyRpcConnection> {
final Class<? extends Channel> channelClass;

private final boolean shutdownGroupWhenClose;
private final AtomicReference<SslContext> sslContextForClient = new AtomicReference<>();
private final AtomicReference<FileChangeWatcher> keyStoreWatcher = new AtomicReference<>();
private final AtomicReference<FileChangeWatcher> trustStoreWatcher = new AtomicReference<>();

public NettyRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress,
MetricsConnection metrics) {
Expand All @@ -67,7 +75,7 @@ public NettyRpcClient(Configuration configuration, String clusterId, SocketAddre
}

/** Used in test only. */
NettyRpcClient(Configuration configuration) {
public NettyRpcClient(Configuration configuration) {
this(configuration, HConstants.CLUSTER_ID_DEFAULT, null, null);
}

Expand All @@ -81,5 +89,31 @@ protected void closeInternal() {
if (shutdownGroupWhenClose) {
NettyFutureUtils.consume(group.shutdownGracefully());
}
FileChangeWatcher ks = keyStoreWatcher.getAndSet(null);
if (ks != null) {
ks.stop();
}
FileChangeWatcher ts = trustStoreWatcher.getAndSet(null);
if (ts != null) {
ts.stop();
}
}

SslContext getSslContext() throws X509Exception, IOException {
SslContext result = sslContextForClient.get();
if (result == null) {
result = X509Util.createSslContextForClient(conf);
if (!sslContextForClient.compareAndSet(null, result)) {
// lost the race, another thread already set the value
result = sslContextForClient.get();
} else if (
keyStoreWatcher.get() == null && trustStoreWatcher.get() == null
&& conf.getBoolean(X509Util.TLS_CERT_RELOAD, false)
) {
X509Util.enableCertFileReloading(conf, keyStoreWatcher, trustStoreWatcher,
() -> sslContextForClient.set(null));
}
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.io.crypto.tls.X509Util;
import org.apache.hadoop.hbase.ipc.BufferCallBeforeInitHandler.BufferCallEvent;
import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
import org.apache.hadoop.hbase.security.NettyHBaseRpcConnectionHeaderHandler;
Expand All @@ -56,6 +57,8 @@
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.apache.hbase.thirdparty.io.netty.handler.ssl.SslContext;
import org.apache.hbase.thirdparty.io.netty.handler.ssl.SslHandler;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.ReadTimeoutHandler;
import org.apache.hbase.thirdparty.io.netty.util.ReferenceCountUtil;
Expand Down Expand Up @@ -219,16 +222,14 @@ private void saslNegotiate(final Channel ch) {
return;
}
ch.pipeline().addBefore(BufferCallBeforeInitHandler.NAME, null, new SaslChallengeDecoder())
.addBefore(BufferCallBeforeInitHandler.NAME, null, saslHandler);
.addBefore(BufferCallBeforeInitHandler.NAME, NettyHBaseSaslRpcClientHandler.HANDLER_NAME,
saslHandler);
NettyFutureUtils.addListener(saslPromise, new FutureListener<Boolean>() {

@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (future.isSuccess()) {
ChannelPipeline p = ch.pipeline();
p.remove(SaslChallengeDecoder.class);
p.remove(NettyHBaseSaslRpcClientHandler.class);

// check if negotiate with server for connection header is necessary
if (saslHandler.isNeedProcessConnectionHeader()) {
Promise<Boolean> connectionHeaderPromise = ch.eventLoop().newPromise();
Expand All @@ -250,7 +251,7 @@ public void operationComplete(Future<Boolean> future) throws Exception {
ChannelPipeline p = ch.pipeline();
p.remove(readTimeoutHandlerName);
p.remove(NettyHBaseRpcConnectionHeaderHandler.class);
// don't send connection header, NettyHBaseRpcConnectionHeaderHandler
// don't send connection header, NettyHbaseRpcConnectionHeaderHandler
// sent it already
established(ch);
} else {
Expand Down Expand Up @@ -283,26 +284,26 @@ private void connect() throws UnknownHostException {
.option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, rpcClient.connectTO)
.handler(new ChannelInitializer<Channel>() {

@Override
protected void initChannel(Channel ch) throws Exception {
if (conf.getBoolean(X509Util.HBASE_CLIENT_NETTY_TLS_ENABLED, false)) {
SslContext sslContext = rpcClient.getSslContext();
SslHandler sslHandler = sslContext.newHandler(ch.alloc(),
remoteId.address.getHostName(), remoteId.address.getPort());
sslHandler.setHandshakeTimeoutMillis(
conf.getInt(X509Util.HBASE_CLIENT_NETTY_TLS_HANDSHAKETIMEOUT,
X509Util.DEFAULT_HANDSHAKE_DETECTION_TIMEOUT_MILLIS));
ch.pipeline().addFirst(sslHandler);
LOG.info("SSL handler added with handshake timeout {} ms",
sslHandler.getHandshakeTimeoutMillis());
}
ch.pipeline().addLast(BufferCallBeforeInitHandler.NAME,
new BufferCallBeforeInitHandler());
}
}).localAddress(rpcClient.localAddr).remoteAddress(remoteAddr).connect()
.addListener(new ChannelFutureListener() {

@Override
public void operationComplete(ChannelFuture future) throws Exception {
Channel ch = future.channel();
if (!future.isSuccess()) {
IOException ex = toIOE(future.cause());
LOG.warn(
"Exception encountered while connecting to the server " + remoteId.getAddress(), ex);
failInit(ch, ex);
rpcClient.failedServers.addToFailedServers(remoteId.getAddress(), future.cause());
return;
}
private void succeed(Channel ch) throws IOException {
NettyFutureUtils.safeWriteAndFlush(ch, connectionHeaderPreamble.retainedDuplicate());
if (useSasl) {
saslNegotiate(ch);
Expand All @@ -312,6 +313,35 @@ public void operationComplete(ChannelFuture future) throws Exception {
established(ch);
}
}

private void fail(Channel ch, Throwable error) {
IOException ex = toIOE(error);
LOG.warn("Exception encountered while connecting to the server " + remoteId.getAddress(),
ex);
failInit(ch, ex);
rpcClient.failedServers.addToFailedServers(remoteId.getAddress(), error);
}

@Override
public void operationComplete(ChannelFuture future) throws Exception {
Channel ch = future.channel();
if (!future.isSuccess()) {
fail(ch, future.cause());
return;
}
SslHandler sslHandler = ch.pipeline().get(SslHandler.class);
if (sslHandler != null) {
NettyFutureUtils.addListener(sslHandler.handshakeFuture(), f -> {
if (f.isSuccess()) {
succeed(ch);
} else {
fail(ch, f.cause());
}
});
} else {
succeed(ch);
}
}
}).channel();
}

Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;

import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
Expand Down Expand Up @@ -92,10 +91,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
* Remove handlers for sasl encryption and add handlers for Crypto AES encryption
*/
private void setupCryptoAESHandler(ChannelPipeline p, CryptoAES cryptoAES) {
p.remove(SaslWrapHandler.class);
p.remove(SaslUnwrapHandler.class);
String lengthDecoder = p.context(LengthFieldBasedFrameDecoder.class).name();
p.addAfter(lengthDecoder, null, new CryptoAESUnwrapHandler(cryptoAES));
p.addAfter(lengthDecoder, null, new CryptoAESWrapHandler(cryptoAES));
p.replace(SaslWrapHandler.class, null, new SaslWrapHandler(cryptoAES::wrap));
p.replace(SaslUnwrapHandler.class, null, new SaslUnwrapHandler(cryptoAES::unwrap));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,16 @@ public NettyHBaseSaslRpcClient(Configuration conf, SaslClientAuthenticationProvi
super(conf, provider, token, serverAddr, securityInfo, fallbackAllowed, rpcProtection);
}

public void setupSaslHandler(ChannelPipeline p) {
public void setupSaslHandler(ChannelPipeline p, String addAfter) {
String qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP);
LOG.trace("SASL client context established. Negotiated QoP {}", qop);
if (qop == null || "auth".equalsIgnoreCase(qop)) {
return;
}
// add wrap and unwrap handlers to pipeline.
p.addFirst(new SaslWrapHandler(saslClient),
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4),
new SaslUnwrapHandler(saslClient));
p.addAfter(addAfter, null, new SaslUnwrapHandler(saslClient::unwrap))
.addAfter(addAfter, null, new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
.addAfter(addAfter, null, new SaslWrapHandler(saslClient::wrap));
}

public String getSaslQOP() {
Expand Down
Loading

0 comments on commit 51bdd16

Please sign in to comment.