Skip to content

Commit

Permalink
Add nio http transport to security plugin (#32018)
Browse files Browse the repository at this point in the history
This is related to #27260. It adds the SecurityNioHttpServerTransport
to the security plugin. It randomly uses the nio http transport in
security integration tests.
  • Loading branch information
Tim-Brooks authored Jul 13, 2018
1 parent 3679d00 commit 305bfea
Show file tree
Hide file tree
Showing 23 changed files with 750 additions and 156 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public void sendMessage(BytesReference reference, ActionListener<Void> listener)
}
}

public Channel getLowLevelChannel() {
public Channel getNettyChannel() {
return channel;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ public class HttpReadWriteHandler implements ReadWriteHandler {
private final NioHttpChannel nioHttpChannel;
private final NioHttpServerTransport transport;

HttpReadWriteHandler(NioHttpChannel nioHttpChannel, NioHttpServerTransport transport, HttpHandlingSettings settings,
NioCorsConfig corsConfig) {
public HttpReadWriteHandler(NioHttpChannel nioHttpChannel, NioHttpServerTransport transport, HttpHandlingSettings settings,
NioCorsConfig corsConfig) {
this.nioHttpChannel = nioHttpChannel;
this.transport = transport;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

public class NioHttpChannel extends NioSocketChannel implements HttpChannel {

NioHttpChannel(SocketChannel socketChannel) {
public NioHttpChannel(SocketChannel socketChannel) {
super(socketChannel);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,11 @@
import org.elasticsearch.http.HttpServerChannel;
import org.elasticsearch.nio.NioServerSocketChannel;

import java.io.IOException;
import java.nio.channels.ServerSocketChannel;

public class NioHttpServerChannel extends NioServerSocketChannel implements HttpServerChannel {

NioHttpServerChannel(ServerSocketChannel serverSocketChannel) throws IOException {
public NioHttpServerChannel(ServerSocketChannel serverSocketChannel) {
super(serverSocketChannel);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.elasticsearch.http.AbstractHttpServerTransport;
import org.elasticsearch.http.HttpChannel;
import org.elasticsearch.http.HttpServerChannel;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.http.nio.cors.NioCorsConfig;
import org.elasticsearch.http.nio.cors.NioCorsConfigBuilder;
import org.elasticsearch.nio.BytesChannelContext;
Expand Down Expand Up @@ -87,21 +86,21 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport {
(s) -> Integer.toString(EsExecutors.numberOfProcessors(s) * 2),
(s) -> Setting.parseInt(s, 1, "http.nio.worker_count"), Setting.Property.NodeScope);

private final PageCacheRecycler pageCacheRecycler;
protected final PageCacheRecycler pageCacheRecycler;
protected final NioCorsConfig corsConfig;

private final boolean tcpNoDelay;
private final boolean tcpKeepAlive;
private final boolean reuseAddress;
private final int tcpSendBufferSize;
private final int tcpReceiveBufferSize;
protected final boolean tcpNoDelay;
protected final boolean tcpKeepAlive;
protected final boolean reuseAddress;
protected final int tcpSendBufferSize;
protected final int tcpReceiveBufferSize;

private NioGroup nioGroup;
private HttpChannelFactory channelFactory;
private final NioCorsConfig corsConfig;
private ChannelFactory<NioHttpServerChannel, NioHttpChannel> channelFactory;

public NioHttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays,
PageCacheRecycler pageCacheRecycler, ThreadPool threadPool, NamedXContentRegistry xContentRegistry,
HttpServerTransport.Dispatcher dispatcher) {
Dispatcher dispatcher) {
super(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher);
this.pageCacheRecycler = pageCacheRecycler;

Expand Down Expand Up @@ -136,7 +135,7 @@ protected void doStart() {
nioGroup = new NioGroup(daemonThreadFactory(this.settings, HTTP_SERVER_ACCEPTOR_THREAD_NAME_PREFIX), acceptorCount,
daemonThreadFactory(this.settings, HTTP_SERVER_WORKER_THREAD_NAME_PREFIX), workerCount,
(s) -> new EventHandler(this::onNonChannelException, s));
channelFactory = new HttpChannelFactory();
channelFactory = channelFactory();
bindServer();
success = true;
} catch (IOException e) {
Expand All @@ -162,6 +161,10 @@ protected HttpServerChannel bind(InetSocketAddress socketAddress) throws IOExcep
return nioGroup.bindServerChannel(socketAddress, channelFactory);
}

protected ChannelFactory<NioHttpServerChannel, NioHttpChannel> channelFactory() {
return new HttpChannelFactory();
}

static NioCorsConfig buildCorsConfig(Settings settings) {
if (SETTING_CORS_ENABLED.get(settings) == false) {
return NioCorsConfigBuilder.forOrigins().disable().build();
Expand Down Expand Up @@ -194,7 +197,7 @@ static NioCorsConfig buildCorsConfig(Settings settings) {
.build();
}

private void acceptChannel(NioSocketChannel socketChannel) {
protected void acceptChannel(NioSocketChannel socketChannel) {
super.serverAcceptedChannel((HttpChannel) socketChannel);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,11 +200,13 @@
import org.elasticsearch.xpack.security.rest.action.user.RestPutUserAction;
import org.elasticsearch.xpack.security.rest.action.user.RestSetEnabledAction;
import org.elasticsearch.xpack.security.support.SecurityIndexManager;
import org.elasticsearch.xpack.security.transport.SecurityHttpSettings;
import org.elasticsearch.xpack.security.transport.SecurityServerTransportInterceptor;
import org.elasticsearch.xpack.security.transport.filter.IPFilter;
import org.elasticsearch.xpack.security.transport.netty4.SecurityNetty4HttpServerTransport;
import org.elasticsearch.xpack.security.transport.netty4.SecurityNetty4ServerTransport;
import org.elasticsearch.xpack.core.template.TemplateUtils;
import org.elasticsearch.xpack.security.transport.nio.SecurityNioHttpServerTransport;
import org.elasticsearch.xpack.security.transport.nio.SecurityNioTransport;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
Expand Down Expand Up @@ -511,21 +513,22 @@ static Settings additionalSettings(final Settings settings, final boolean enable

if (NetworkModule.HTTP_TYPE_SETTING.exists(settings)) {
final String httpType = NetworkModule.HTTP_TYPE_SETTING.get(settings);
if (httpType.equals(SecurityField.NAME4)) {
SecurityNetty4HttpServerTransport.overrideSettings(builder, settings);
if (httpType.equals(SecurityField.NAME4) || httpType.equals(SecurityField.NIO)) {
SecurityHttpSettings.overrideSettings(builder, settings);
} else {
final String message = String.format(
Locale.ROOT,
"http type setting [%s] must be [%s] but is [%s]",
"http type setting [%s] must be [%s] or [%s] but is [%s]",
NetworkModule.HTTP_TYPE_KEY,
SecurityField.NAME4,
SecurityField.NIO,
httpType);
throw new IllegalArgumentException(message);
}
} else {
// default to security4
builder.put(NetworkModule.HTTP_TYPE_KEY, SecurityField.NAME4);
SecurityNetty4HttpServerTransport.overrideSettings(builder, settings);
SecurityHttpSettings.overrideSettings(builder, settings);
}
builder.put(SecuritySettings.addUserSettings(settings));
return builder.build();
Expand Down Expand Up @@ -869,8 +872,14 @@ public Map<String, Supplier<HttpServerTransport>> getHttpTransports(Settings set
if (enabled == false) { // don't register anything if we are not enabled
return Collections.emptyMap();
}
return Collections.singletonMap(SecurityField.NAME4, () -> new SecurityNetty4HttpServerTransport(settings,
networkService, bigArrays, ipFilter.get(), getSslService(), threadPool, xContentRegistry, dispatcher));

Map<String, Supplier<HttpServerTransport>> httpTransports = new HashMap<>();
httpTransports.put(SecurityField.NAME4, () -> new SecurityNetty4HttpServerTransport(settings, networkService, bigArrays,
ipFilter.get(), getSslService(), threadPool, xContentRegistry, dispatcher));
httpTransports.put(SecurityField.NIO, () -> new SecurityNioHttpServerTransport(settings, networkService, bigArrays,
pageCacheRecycler, threadPool, xContentRegistry, dispatcher, ipFilter.get(), getSslService()));

return httpTransports;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
*/
package org.elasticsearch.xpack.security.rest;

import io.netty.channel.Channel;
import io.netty.handler.ssl.SslHandler;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
Expand All @@ -15,7 +13,6 @@
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.http.HttpChannel;
import org.elasticsearch.http.netty4.Netty4HttpChannel;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestChannel;
Expand All @@ -24,7 +21,7 @@
import org.elasticsearch.rest.RestRequest.Method;
import org.elasticsearch.xpack.core.security.rest.RestRequestFilter;
import org.elasticsearch.xpack.security.authc.AuthenticationService;
import org.elasticsearch.xpack.security.transport.ServerTransportFilter;
import org.elasticsearch.xpack.security.transport.SSLEngineUtils;

import java.io.IOException;

Expand Down Expand Up @@ -53,10 +50,7 @@ public void handleRequest(RestRequest request, RestChannel channel, NodeClient c
// CORS - allow for preflight unauthenticated OPTIONS request
if (extractClientCertificate) {
HttpChannel httpChannel = request.getHttpChannel();
Channel nettyChannel = ((Netty4HttpChannel) httpChannel).getNettyChannel();
SslHandler handler = nettyChannel.pipeline().get(SslHandler.class);
assert handler != null;
ServerTransportFilter.extractClientCertificates(logger, threadContext, handler.engine(), nettyChannel);
SSLEngineUtils.extractClientCertificates(logger, threadContext, httpChannel);
}
service.authenticate(maybeWrapRestRequest(request), ActionListener.wrap(
authentication -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.security.transport;

import io.netty.channel.Channel;
import io.netty.handler.ssl.SslHandler;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.http.HttpChannel;
import org.elasticsearch.http.netty4.Netty4HttpChannel;
import org.elasticsearch.http.nio.NioHttpChannel;
import org.elasticsearch.nio.SocketChannelContext;
import org.elasticsearch.transport.TcpChannel;
import org.elasticsearch.transport.netty4.Netty4TcpChannel;
import org.elasticsearch.transport.nio.NioTcpChannel;
import org.elasticsearch.xpack.security.authc.pki.PkiRealm;
import org.elasticsearch.xpack.security.transport.nio.SSLChannelContext;

import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLPeerUnverifiedException;
import java.security.cert.Certificate;
import java.security.cert.X509Certificate;

public class SSLEngineUtils {

private SSLEngineUtils() {}

public static void extractClientCertificates(Logger logger, ThreadContext threadContext, HttpChannel httpChannel) {
SSLEngine sslEngine = getSSLEngine(httpChannel);
extract(logger, threadContext, sslEngine, httpChannel);
}

public static void extractClientCertificates(Logger logger, ThreadContext threadContext, TcpChannel tcpChannel) {
SSLEngine sslEngine = getSSLEngine(tcpChannel);
extract(logger, threadContext, sslEngine, tcpChannel);
}

public static SSLEngine getSSLEngine(HttpChannel httpChannel) {
if (httpChannel instanceof Netty4HttpChannel) {
Channel nettyChannel = ((Netty4HttpChannel) httpChannel).getNettyChannel();
SslHandler handler = nettyChannel.pipeline().get(SslHandler.class);
assert handler != null : "Must have SslHandler";
return handler.engine();
} else if (httpChannel instanceof NioHttpChannel) {
SocketChannelContext context = ((NioHttpChannel) httpChannel).getContext();
assert context instanceof SSLChannelContext : "Must be SSLChannelContext.class, found: " + context.getClass();
return ((SSLChannelContext) context).getSSLEngine();
} else {
throw new AssertionError("Unknown channel class type: " + httpChannel.getClass());
}
}

public static SSLEngine getSSLEngine(TcpChannel tcpChannel) {
if (tcpChannel instanceof Netty4TcpChannel) {
Channel nettyChannel = ((Netty4TcpChannel) tcpChannel).getNettyChannel();
SslHandler handler = nettyChannel.pipeline().get(SslHandler.class);
assert handler != null : "Must have SslHandler";
return handler.engine();
} else if (tcpChannel instanceof NioTcpChannel) {
SocketChannelContext context = ((NioTcpChannel) tcpChannel).getContext();
assert context instanceof SSLChannelContext : "Must be SSLChannelContext.class, found: " + context.getClass();
return ((SSLChannelContext) context).getSSLEngine();
} else {
throw new AssertionError("Unknown channel class type: " + tcpChannel.getClass());
}
}

private static void extract(Logger logger, ThreadContext threadContext, SSLEngine sslEngine, Object channel) {
try {
Certificate[] certs = sslEngine.getSession().getPeerCertificates();
if (certs instanceof X509Certificate[]) {
threadContext.putTransient(PkiRealm.PKI_CERT_HEADER_NAME, certs);
}
} catch (SSLPeerUnverifiedException e) {
// this happens when client authentication is optional and the client does not provide credentials. If client
// authentication was required then this connection should be closed before ever getting into this class
assert sslEngine.getNeedClientAuth() == false;
assert sslEngine.getWantClientAuth();
if (logger.isTraceEnabled()) {
logger.trace(
(Supplier<?>) () -> new ParameterizedMessage(
"SSL Peer did not present a certificate on channel [{}]", channel), e);
} else if (logger.isDebugEnabled()) {
logger.debug("SSL Peer did not present a certificate on channel [{}]", channel);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.security.transport;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.network.CloseableChannel;
import org.elasticsearch.http.HttpChannel;

import java.util.function.BiConsumer;

import static org.elasticsearch.xpack.core.security.transport.SSLExceptionHelper.isCloseDuringHandshakeException;
import static org.elasticsearch.xpack.core.security.transport.SSLExceptionHelper.isNotSslRecordException;
import static org.elasticsearch.xpack.core.security.transport.SSLExceptionHelper.isReceivedCertificateUnknownException;

public final class SecurityHttpExceptionHandler implements BiConsumer<HttpChannel, Exception> {

private final Lifecycle lifecycle;
private final Logger logger;
private final BiConsumer<HttpChannel, Exception> fallback;

public SecurityHttpExceptionHandler(Logger logger, Lifecycle lifecycle, BiConsumer<HttpChannel, Exception> fallback) {
this.lifecycle = lifecycle;
this.logger = logger;
this.fallback = fallback;
}

public void accept(HttpChannel channel, Exception e) {
if (!lifecycle.started()) {
return;
}

if (isNotSslRecordException(e)) {
if (logger.isTraceEnabled()) {
logger.trace(new ParameterizedMessage("received plaintext http traffic on a https channel, closing connection {}",
channel), e);
} else {
logger.warn("received plaintext http traffic on a https channel, closing connection {}", channel);
}
CloseableChannel.closeChannel(channel);
} else if (isCloseDuringHandshakeException(e)) {
if (logger.isTraceEnabled()) {
logger.trace(new ParameterizedMessage("connection {} closed during ssl handshake", channel), e);
} else {
logger.warn("connection {} closed during ssl handshake", channel);
}
CloseableChannel.closeChannel(channel);
} else if (isReceivedCertificateUnknownException(e)) {
if (logger.isTraceEnabled()) {
logger.trace(new ParameterizedMessage("http client did not trust server's certificate, closing connection {}",
channel), e);
} else {
logger.warn("http client did not trust this server's certificate, closing connection {}", channel);
}
CloseableChannel.closeChannel(channel);
} else {
fallback.accept(channel, e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.security.transport;

import org.elasticsearch.common.settings.Settings;

import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_COMPRESSION;
import static org.elasticsearch.xpack.core.XPackSettings.HTTP_SSL_ENABLED;

public final class SecurityHttpSettings {

private SecurityHttpSettings() {}

public static void overrideSettings(Settings.Builder settingsBuilder, Settings settings) {
if (HTTP_SSL_ENABLED.get(settings) && SETTING_HTTP_COMPRESSION.exists(settings) == false) {
settingsBuilder.put(SETTING_HTTP_COMPRESSION.getKey(), false);
}
}
}
Loading

0 comments on commit 305bfea

Please sign in to comment.