diff --git a/src/main/java/org/logstash/beats/Server.java b/src/main/java/org/logstash/beats/Server.java index 492ae297..af7a9217 100644 --- a/src/main/java/org/logstash/beats/Server.java +++ b/src/main/java/org/logstash/beats/Server.java @@ -3,19 +3,27 @@ import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; +import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.ChannelGroupFuture; +import io.netty.channel.group.ChannelMatchers; +import io.netty.channel.group.DefaultChannelGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.concurrent.DefaultEventExecutorGroup; import io.netty.util.concurrent.EventExecutorGroup; +import io.netty.util.concurrent.GlobalEventExecutor; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.logstash.netty.SslHandlerProvider; +import java.util.concurrent.TimeUnit; + public class Server { private final static Logger logger = LogManager.getLogger(Server.class); @@ -84,12 +92,13 @@ public void stop() { private void shutdown() { try { + if (beatsInitializer != null) { + beatsInitializer.shutdownConnectionsAndExecutors(); + } if (workGroup != null) { + logger.debug("Shutting down worker group"); workGroup.shutdownGracefully().sync(); } - if (beatsInitializer != null) { - beatsInitializer.shutdownEventExecutor(); - } } catch (InterruptedException e) { throw new IllegalStateException(e); } @@ -121,6 +130,7 @@ private class BeatsInitializer extends ChannelInitializer { private final static int DEFAULT_IDLESTATEHANDLER_THREAD = 4; private final static int IDLESTATE_WRITER_IDLE_TIME_SECONDS = 5; + private final ChannelGroup activeChannelGroup; private final EventExecutorGroup idleExecutorGroup; private final EventExecutorGroup beatsHandlerExecutorGroup; private final IMessageListener localMessageListener; @@ -132,11 +142,27 @@ private class BeatsInitializer extends ChannelInitializer { this.localClientInactivityTimeoutSeconds = clientInactivityTimeoutSeconds; idleExecutorGroup = new DefaultEventExecutorGroup(DEFAULT_IDLESTATEHANDLER_THREAD); beatsHandlerExecutorGroup = new DefaultEventExecutorGroup(beatsHandlerThread); + activeChannelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); } public void initChannel(SocketChannel socket) { + activeChannelGroup.add(socket); ChannelPipeline pipeline = socket.pipeline(); + pipeline.addLast(new ChannelInboundHandlerAdapter(){ + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + activeChannelGroup.add(ctx.channel()); + super.channelActive(ctx); + } + + @Override + public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { + activeChannelGroup.remove(ctx.channel()); + super.channelUnregistered(ctx); + } + }); + if (isSslEnabled()) { pipeline.addLast(SSL_HANDLER, sslHandlerProvider.sslHandlerForChannel(socket)); } @@ -157,8 +183,22 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E } } - public void shutdownEventExecutor() { + public void shutdownConnectionsAndExecutors() { try { + logger.debug("Shutting down server channel"); + activeChannelGroup.close(ChannelMatchers.isServerChannel()).sync(); + + ChannelGroupFuture channelsClosedFuture = activeChannelGroup.newCloseFuture(ChannelMatchers.isNonServerChannel()); + if (!channelsClosedFuture.isDone()) { + // if we have inbound connections still, give them a brief + // moment to finish what they have already begun before closing them + if (!channelsClosedFuture.await(3, TimeUnit.SECONDS)) { + logger.debug("Forcibly closing client connections"); + activeChannelGroup.close().sync(); + } + } + + logger.debug("Shutting down executors"); idleExecutorGroup.shutdownGracefully().sync(); beatsHandlerExecutorGroup.shutdownGracefully().sync(); } catch (InterruptedException e) {