Skip to content

Commit

Permalink
Set specific keepalive options by default on supported platforms (#59278
Browse files Browse the repository at this point in the history
)

keepalives tell any intermediate devices that the connection remains alive, which helps with overzealous firewalls that are
killing idle connections. keepalives are enabled by default in Elasticsearch, but use system defaults for their
configuration, which often times do not have reasonable defaults (e.g. 7200s for TCP_KEEP_IDLE) in the context of
distributed systems such as Elasticsearch.

This PR sets the socket-level keep_alive options for network.tcp.{keep_idle,keep_interval} to 5 minutes on configurations
that support it (>= Java 11 & (MacOS || Linux)) and where the system defaults are set to something higher than 5
minutes. This helps keep the connections alive while not interfering with system defaults or user-specified settings
unless they are deemed to be set too high by providing better out-of-the-box defaults.
  • Loading branch information
ywelsch committed Jul 28, 2020
1 parent 981e436 commit ffe114b
Show file tree
Hide file tree
Showing 11 changed files with 269 additions and 64 deletions.
37 changes: 18 additions & 19 deletions docs/reference/modules/transport.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -80,31 +80,30 @@ this node connects to other nodes in the cluster.
The following parameters can be configured on each transport profile, as in the
example above:

* `port`: The port to bind to
* `bind_host`: The host to bind
* `publish_host`: The host which is published in informational APIs
* `tcp.no_delay`: Configures the `TCP_NO_DELAY` option for this socket
* `tcp.keep_alive`: Configures the `SO_KEEPALIVE` option for this socket
* `port`: The port to which to bind.
* `bind_host`: The host to which to bind.
* `publish_host`: The host which is published in informational APIs.
* `tcp.no_delay`: Configures the `TCP_NO_DELAY` option for this socket.
* `tcp.keep_alive`: Configures the `SO_KEEPALIVE` option for this socket, which
determines whether it sends TCP keepalive probes.
* `tcp.keep_idle`: Configures the `TCP_KEEPIDLE` option for this socket, which
determines the time in seconds that a connection must be idle before
starting to send TCP keepalive probes.
Only applicable on Linux and Mac, and requires JDK 11 or newer.
Defaults to -1, which does not set this option at the socket level, but
uses default system configuration instead.
starting to send TCP keepalive probes. Defaults to `-1` which means to use
the smaller of 300 or the system default. May not be greater than 300. Only
applicable on Linux and macOS, and requires Java 11 or newer.
* `tcp.keep_interval`: Configures the `TCP_KEEPINTVL` option for this socket,
which determines the time in seconds between sending TCP keepalive probes.
Only applicable on Linux and Mac, and requires JDK 11 or newer.
Defaults to -1, which does not set this option at the socket level, but
uses default system configuration instead.
Defaults to `-1` which means to use the smaller of 300 or the system
default. May not be greater than 300. Only applicable on Linux and macOS,
and requires Java 11 or newer.
* `tcp.keep_count`: Configures the `TCP_KEEPCNT` option for this socket, which
determines the number of unacknowledged TCP keepalive probes that may be
sent on a connection before it is dropped.
Only applicable on Linux and Mac, and requires JDK 11 or newer.
Defaults to -1, which does not set this option at the socket level, but
uses default system configuration instead.
* `tcp.reuse_address`: Configures the `SO_REUSEADDR` option for this socket
* `tcp.send_buffer_size`: Configures the send buffer size of the socket
* `tcp.receive_buffer_size`: Configures the receive buffer size of the socket
sent on a connection before it is dropped. Defaults to `-1` which means to
use the system default. Only applicable on Linux and macOS, and requires
Java 11 or newer.
* `tcp.reuse_address`: Configures the `SO_REUSEADDR` option for this socket.
* `tcp.send_buffer_size`: Configures the send buffer size of the socket.
* `tcp.receive_buffer_size`: Configures the receive buffer size of the socket.

[[long-lived-connections]]
===== Long-lived idle connections
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@

package org.elasticsearch.core.internal.net;

import java.io.IOException;
import java.lang.reflect.Field;
import java.net.SocketOption;
import java.net.StandardSocketOptions;
import java.nio.channels.NetworkChannel;
import java.util.Arrays;

/**
* Utilities for network-related methods.
Expand Down Expand Up @@ -59,4 +63,47 @@ private static <T> SocketOption<T> getExtendedSocketOptionOrNull(String fieldNam
return null;
}
}

/**
* If SO_KEEPALIVE is enabled (default), this method ensures sane default values for the extended socket options
* TCP_KEEPIDLE and TCP_KEEPINTERVAL. The default value for TCP_KEEPIDLE is system dependent, but is typically 2 hours.
* Such a high value can result in firewalls eagerly closing these connections. To tell any intermediate devices that
* the connection remains alive, we explicitly set these options to 5 minutes if the defaults are higher than that.
*/
public static void tryEnsureReasonableKeepAliveConfig(NetworkChannel socketChannel) {
assert socketChannel != null;
try {
if (socketChannel.supportedOptions().contains(StandardSocketOptions.SO_KEEPALIVE)) {
final Boolean keepalive = socketChannel.getOption(StandardSocketOptions.SO_KEEPALIVE);
assert keepalive != null;
if (keepalive.booleanValue()) {
for (SocketOption<Integer> option : Arrays.asList(
NetUtils.getTcpKeepIdleSocketOptionOrNull(),
NetUtils.getTcpKeepIntervalSocketOptionOrNull())) {
setMinValueForSocketOption(socketChannel, option, 300);
}
}
}
} catch (Exception e) {
// Getting an exception here should be ok when concurrently closing the channel
// An UnsupportedOperationException or IllegalArgumentException, however, should not happen
assert e instanceof IOException : e;
}
}

private static void setMinValueForSocketOption(NetworkChannel socketChannel, SocketOption<Integer> option, int minValue) {
if (option != null && socketChannel.supportedOptions().contains(option)) {
try {
final Integer currentIdleVal = socketChannel.getOption(option);
assert currentIdleVal != null;
if (currentIdleVal.intValue() > minValue) {
socketChannel.setOption(option, minValue);
}
} catch (Exception e) {
// Getting an exception here should be ok when concurrently closing the channel
// An UnsupportedOperationException or IllegalArgumentException, however, should not happen
assert e instanceof IOException : e;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,7 @@ private void configureSocket(Socket socket, boolean isConnectComplete) throws IO
}
}
}
NetUtils.tryEnsureReasonableKeepAliveConfig(socket.getChannel());
socket.setTcpNoDelay(socketConfig.tcpNoDelay());
int tcpSendBufferSize = socketConfig.tcpSendBufferSize();
if (tcpSendBufferSize > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
* local buffer with a defined size.
*/
@SuppressForbidden(reason = "Channel#write")
public class CopyBytesSocketChannel extends NioSocketChannel {
public class CopyBytesSocketChannel extends Netty4NioSocketChannel {

private static final int MAX_BYTES_PER_WRITE = StrictMath.toIntExact(ByteSizeValue.parseBytesSizeValue(
System.getProperty("es.transport.buffer.size", "1m"), "es.transport.buffer.size").getBytes());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.transport;

import io.netty.channel.Channel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.nio.channels.SocketChannel;

/**
* Helper class to expose {@link #javaChannel()} method
*/
public class Netty4NioSocketChannel extends NioSocketChannel {

public Netty4NioSocketChannel() {
super();
}

public Netty4NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
}

@Override
public SocketChannel javaChannel() {
return super.javaChannel();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import io.netty.channel.Channel;
import io.netty.channel.ServerChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.monitor.jvm.JvmInfo;

Expand Down Expand Up @@ -68,7 +67,7 @@ public static Class<? extends Channel> getChannelType() {
if (ALLOCATOR instanceof NoDirectBuffers) {
return CopyBytesSocketChannel.class;
} else {
return NioSocketChannel.class;
return Netty4NioSocketChannel.class;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.core.internal.net.NetUtils;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.SharedGroupFactory;
import org.elasticsearch.transport.Netty4NioSocketChannel;
import org.elasticsearch.transport.NettyAllocator;
import org.elasticsearch.transport.SharedGroupFactory;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.TransportSettings;

Expand Down Expand Up @@ -143,31 +143,30 @@ private Bootstrap createClientBootstrap(SharedGroupFactory.SharedGroup sharedGro
bootstrap.group(sharedGroup.getLowLevelGroup());

// NettyAllocator will return the channel type designed to work with the configured allocator
assert Netty4NioSocketChannel.class.isAssignableFrom(NettyAllocator.getChannelType());
bootstrap.channel(NettyAllocator.getChannelType());
bootstrap.option(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator());

bootstrap.option(ChannelOption.TCP_NODELAY, TransportSettings.TCP_NO_DELAY.get(settings));
bootstrap.option(ChannelOption.SO_KEEPALIVE, TransportSettings.TCP_KEEP_ALIVE.get(settings));
if (TransportSettings.TCP_KEEP_ALIVE.get(settings)) {
// Netty logs a warning if it can't set the option, so try this only on supported platforms
if (IOUtils.LINUX || IOUtils.MAC_OS_X) {
if (TransportSettings.TCP_KEEP_IDLE.get(settings) >= 0) {
final SocketOption<Integer> keepIdleOption = NetUtils.getTcpKeepIdleSocketOptionOrNull();
if (keepIdleOption != null) {
bootstrap.option(NioChannelOption.of(keepIdleOption), TransportSettings.TCP_KEEP_IDLE.get(settings));
}
// Note that Netty logs a warning if it can't set the option
if (TransportSettings.TCP_KEEP_IDLE.get(settings) >= 0) {
final SocketOption<Integer> keepIdleOption = NetUtils.getTcpKeepIdleSocketOptionOrNull();
if (keepIdleOption != null) {
bootstrap.option(NioChannelOption.of(keepIdleOption), TransportSettings.TCP_KEEP_IDLE.get(settings));
}
if (TransportSettings.TCP_KEEP_INTERVAL.get(settings) >= 0) {
final SocketOption<Integer> keepIntervalOption = NetUtils.getTcpKeepIntervalSocketOptionOrNull();
if (keepIntervalOption != null) {
bootstrap.option(NioChannelOption.of(keepIntervalOption), TransportSettings.TCP_KEEP_INTERVAL.get(settings));
}
}
if (TransportSettings.TCP_KEEP_INTERVAL.get(settings) >= 0) {
final SocketOption<Integer> keepIntervalOption = NetUtils.getTcpKeepIntervalSocketOptionOrNull();
if (keepIntervalOption != null) {
bootstrap.option(NioChannelOption.of(keepIntervalOption), TransportSettings.TCP_KEEP_INTERVAL.get(settings));
}
if (TransportSettings.TCP_KEEP_COUNT.get(settings) >= 0) {
final SocketOption<Integer> keepCountOption = NetUtils.getTcpKeepCountSocketOptionOrNull();
if (keepCountOption != null) {
bootstrap.option(NioChannelOption.of(keepCountOption), TransportSettings.TCP_KEEP_COUNT.get(settings));
}
}
if (TransportSettings.TCP_KEEP_COUNT.get(settings) >= 0) {
final SocketOption<Integer> keepCountOption = NetUtils.getTcpKeepCountSocketOptionOrNull();
if (keepCountOption != null) {
bootstrap.option(NioChannelOption.of(keepCountOption), TransportSettings.TCP_KEEP_COUNT.get(settings));
}
}
}
Expand Down Expand Up @@ -215,26 +214,24 @@ private void createServerBootstrap(ProfileSettings profileSettings, SharedGroupF
serverBootstrap.childOption(ChannelOption.TCP_NODELAY, profileSettings.tcpNoDelay);
serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, profileSettings.tcpKeepAlive);
if (profileSettings.tcpKeepAlive) {
// Netty logs a warning if it can't set the option, so try this only on supported platforms
if (IOUtils.LINUX || IOUtils.MAC_OS_X) {
if (profileSettings.tcpKeepIdle >= 0) {
final SocketOption<Integer> keepIdleOption = NetUtils.getTcpKeepIdleSocketOptionOrNull();
if (keepIdleOption != null) {
serverBootstrap.childOption(NioChannelOption.of(keepIdleOption), profileSettings.tcpKeepIdle);
}
// Note that Netty logs a warning if it can't set the option
if (profileSettings.tcpKeepIdle >= 0) {
final SocketOption<Integer> keepIdleOption = NetUtils.getTcpKeepIdleSocketOptionOrNull();
if (keepIdleOption != null) {
serverBootstrap.childOption(NioChannelOption.of(keepIdleOption), profileSettings.tcpKeepIdle);
}
if (profileSettings.tcpKeepInterval >= 0) {
final SocketOption<Integer> keepIntervalOption = NetUtils.getTcpKeepIntervalSocketOptionOrNull();
if (keepIntervalOption != null) {
serverBootstrap.childOption(NioChannelOption.of(keepIntervalOption), profileSettings.tcpKeepInterval);
}

}
if (profileSettings.tcpKeepInterval >= 0) {
final SocketOption<Integer> keepIntervalOption = NetUtils.getTcpKeepIntervalSocketOptionOrNull();
if (keepIntervalOption != null) {
serverBootstrap.childOption(NioChannelOption.of(keepIntervalOption), profileSettings.tcpKeepInterval);
}
if (profileSettings.tcpKeepCount >= 0) {
final SocketOption<Integer> keepCountOption = NetUtils.getTcpKeepCountSocketOptionOrNull();
if (keepCountOption != null) {
serverBootstrap.childOption(NioChannelOption.of(keepCountOption), profileSettings.tcpKeepCount);
}

}
if (profileSettings.tcpKeepCount >= 0) {
final SocketOption<Integer> keepCountOption = NetUtils.getTcpKeepCountSocketOptionOrNull();
if (keepCountOption != null) {
serverBootstrap.childOption(NioChannelOption.of(keepCountOption), profileSettings.tcpKeepCount);
}
}
}
Expand Down Expand Up @@ -281,7 +278,6 @@ protected Netty4TcpChannel initiateChannel(DiscoveryNode node) throws IOExceptio
ExceptionsHelper.maybeDieOnAnotherThread(connectFuture.cause());
throw new IOException(connectFuture.cause());
}
addClosedExceptionLogger(channel);

Netty4TcpChannel nettyChannel = new Netty4TcpChannel(channel, false, "default", connectFuture);
channel.attr(CHANNEL_KEY).set(nettyChannel);
Expand Down Expand Up @@ -311,6 +307,9 @@ protected class ClientChannelInitializer extends ChannelInitializer<Channel> {

@Override
protected void initChannel(Channel ch) throws Exception {
addClosedExceptionLogger(ch);
assert ch instanceof Netty4NioSocketChannel;
NetUtils.tryEnsureReasonableKeepAliveConfig(((Netty4NioSocketChannel) ch).javaChannel());
ch.pipeline().addLast("logging", new ESLoggingHandler());
// using a dot as a prefix means this cannot come from any settings parsed
ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(pageCacheRecycler, Netty4Transport.this));
Expand All @@ -334,6 +333,8 @@ protected ServerChannelInitializer(String name) {
@Override
protected void initChannel(Channel ch) throws Exception {
addClosedExceptionLogger(ch);
assert ch instanceof Netty4NioSocketChannel;
NetUtils.tryEnsureReasonableKeepAliveConfig(((Netty4NioSocketChannel) ch).javaChannel());
Netty4TcpChannel nettyTcpChannel = new Netty4TcpChannel(ch, true, name, ch.newSucceededFuture());
ch.attr(CHANNEL_KEY).set(nettyTcpChannel);
ch.pipeline().addLast("logging", new ESLoggingHandler());
Expand Down
Loading

0 comments on commit ffe114b

Please sign in to comment.