diff --git a/src/main/java/org/logstash/beats/BeatsHandler.java b/src/main/java/org/logstash/beats/BeatsHandler.java index 8120aa0d..db1022f7 100644 --- a/src/main/java/org/logstash/beats/BeatsHandler.java +++ b/src/main/java/org/logstash/beats/BeatsHandler.java @@ -1,5 +1,9 @@ package org.logstash.beats; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.AttributeKey; @@ -10,7 +14,8 @@ import javax.net.ssl.SSLHandshakeException; public class BeatsHandler extends SimpleChannelInboundHandler { - private final static Logger logger = LogManager.getLogger(BeatsHandler.class); + + private final static Logger logger = LogManager.getLogger(); private final IMessageListener messageListener; private ChannelHandlerContext context; @@ -22,9 +27,7 @@ public BeatsHandler(IMessageListener listener) { @Override public void channelActive(final ChannelHandlerContext ctx) throws Exception { context = ctx; - if (logger.isTraceEnabled()){ - logger.trace(format("Channel Active")); - } + logger.trace("{}", () -> format("Channel Active")); super.channelActive(ctx); messageListener.onNewConnection(ctx); } @@ -32,23 +35,17 @@ public void channelActive(final ChannelHandlerContext ctx) throws Exception { @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { super.channelInactive(ctx); - if (logger.isTraceEnabled()){ - logger.trace(format("Channel Inactive")); - } + logger.trace("{}", () -> format("Channel Inactive")); messageListener.onConnectionClose(ctx); } @Override public void channelRead0(ChannelHandlerContext ctx, Batch batch) throws Exception { - if(logger.isDebugEnabled()) { - logger.debug(format("Received a new payload")); - } + logger.debug("{}", () -> format("Received a new payload")); try { for (Message message : batch) { - if (logger.isDebugEnabled()) { - logger.debug(format("Sending a new message for the listener, sequence: " + message.getSequence())); - } + logger.debug("{}", () -> format("Sending a new message for the listener, sequence: " + message.getSequence())); messageListener.onNewMessage(ctx, message); if (needAck(message)) { @@ -58,9 +55,9 @@ public void channelRead0(ChannelHandlerContext ctx, Batch batch) throws Exceptio }finally{ //this channel is done processing this payload, instruct the connection handler to stop sending TCP keep alive ctx.channel().attr(ConnectionHandler.CHANNEL_SEND_KEEP_ALIVE).get().set(false); - if (logger.isDebugEnabled()) { - logger.debug("{}: batches pending: {}", ctx.channel().id().asShortText(),ctx.channel().attr(ConnectionHandler.CHANNEL_SEND_KEEP_ALIVE).get().get()); - } + logger.debug("{}: batches pending: {}", + () -> ctx.channel().id().asShortText(), + () -> ctx.channel().attr(ConnectionHandler.CHANNEL_SEND_KEEP_ALIVE).get().get()); batch.release(); ctx.flush(); } @@ -83,11 +80,9 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E } String causeMessage = cause.getMessage() == null ? cause.getClass().toString() : cause.getMessage(); - if (logger.isDebugEnabled()){ - logger.debug(format("Handling exception: " + causeMessage), cause); - } - logger.info(format("Handling exception: " + causeMessage)); - } finally{ + logger.info("{}", () -> format("Handling exception: " + causeMessage)); + logger.catching(Level.DEBUG, cause); + } finally { super.exceptionCaught(ctx, cause); ctx.flush(); ctx.close(); @@ -99,9 +94,7 @@ private boolean needAck(Message message) { } private void ack(ChannelHandlerContext ctx, Message message) { - if (logger.isTraceEnabled()){ - logger.trace(format("Acking message number " + message.getSequence())); - } + logger.trace("{}", () -> format("Acking message number " + message.getSequence())); writeAck(ctx, message.getBatch().getProtocol(), message.getSequence()); } diff --git a/src/main/java/org/logstash/beats/BeatsParser.java b/src/main/java/org/logstash/beats/BeatsParser.java index 3e4b6bfe..a9f9a0a0 100644 --- a/src/main/java/org/logstash/beats/BeatsParser.java +++ b/src/main/java/org/logstash/beats/BeatsParser.java @@ -18,7 +18,8 @@ public class BeatsParser extends ByteToMessageDecoder { - private final static Logger logger = LogManager.getLogger(BeatsParser.class); + + private static final Logger logger = LogManager.getLogger(); private Batch batch; @@ -195,9 +196,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) t logger.trace("Running: READ_JSON"); ((V2Batch)batch).addMessage(sequence, in, requiredBytes); if(batch.isComplete()) { - if(logger.isTraceEnabled()) { - logger.trace("Sending batch size: " + this.batch.size() + ", windowSize: " + batch.getBatchSize() + " , seq: " + sequence); - } + logger.trace("Sending batch size: {}, windowSize: {}, seq: {}", () -> batch.size(), () -> batch.getBatchSize() , () -> sequence); out.add(batch); batchComplete(); } @@ -217,9 +216,7 @@ private void transition(States next) { } private void transition(States nextState, int requiredBytes) { - if(logger.isTraceEnabled()) { - logger.trace("Transition, from: " + currentState + ", to: " + nextState + ", requiring " + requiredBytes + " bytes"); - } + logger.trace("{}", () -> "Transition, from: " + currentState + ", to: " + nextState + ", requiring " + requiredBytes + " bytes"); this.currentState = nextState; this.requiredBytes = requiredBytes; } diff --git a/src/main/java/org/logstash/beats/ConnectionHandler.java b/src/main/java/org/logstash/beats/ConnectionHandler.java index ee25c8c3..23994eac 100644 --- a/src/main/java/org/logstash/beats/ConnectionHandler.java +++ b/src/main/java/org/logstash/beats/ConnectionHandler.java @@ -16,16 +16,15 @@ * Manages the connection state to the beats client. */ public class ConnectionHandler extends ChannelDuplexHandler { - private final static Logger logger = LogManager.getLogger(ConnectionHandler.class); + + private final static Logger logger = LogManager.getLogger(); public static AttributeKey CHANNEL_SEND_KEEP_ALIVE = AttributeKey.valueOf("channel-send-keep-alive"); @Override public void channelActive(final ChannelHandlerContext ctx) throws Exception { ctx.channel().attr(CHANNEL_SEND_KEEP_ALIVE).set(new AtomicBoolean(false)); - if (logger.isTraceEnabled()) { - logger.trace("{}: channel activated", ctx.channel().id().asShortText()); - } + logger.trace("{}: channel activated", () -> ctx.channel().id().asShortText()); super.channelActive(ctx); } @@ -37,9 +36,9 @@ public void channelActive(final ChannelHandlerContext ctx) throws Exception { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ctx.channel().attr(CHANNEL_SEND_KEEP_ALIVE).get().set(true); - if (logger.isDebugEnabled()) { - logger.debug("{}: batches pending: {}", ctx.channel().id().asShortText(),ctx.channel().attr(CHANNEL_SEND_KEEP_ALIVE).get().get()); - } + logger.debug("{}: batches pending: {}", + () -> ctx.channel().id().asShortText(), + () -> ctx.channel().attr(CHANNEL_SEND_KEEP_ALIVE).get().get()); super.channelRead(ctx, msg); } @@ -80,7 +79,8 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc } } } else if (e.state() == IdleState.ALL_IDLE) { - logger.debug("{}: reader and writer are idle, closing remote connection", ctx.channel().id().asShortText()); + logger.debug("{}: reader and writer are idle, closing remote connection", + () -> ctx.channel().id().asShortText()); ctx.flush(); ChannelFuture f = ctx.close(); if (logger.isTraceEnabled()) { diff --git a/src/main/java/org/logstash/beats/MessageListener.java b/src/main/java/org/logstash/beats/MessageListener.java index 2a0b2c14..a64d4554 100644 --- a/src/main/java/org/logstash/beats/MessageListener.java +++ b/src/main/java/org/logstash/beats/MessageListener.java @@ -12,7 +12,8 @@ */ // This need to be implemented in Ruby public class MessageListener implements IMessageListener { - private final static Logger logger = LogManager.getLogger(MessageListener.class); + + private static final Logger logger = LogManager.getLogger(); /** diff --git a/src/main/java/org/logstash/beats/Runner.java b/src/main/java/org/logstash/beats/Runner.java index a5eb6fd3..5ac26af4 100644 --- a/src/main/java/org/logstash/beats/Runner.java +++ b/src/main/java/org/logstash/beats/Runner.java @@ -8,9 +8,7 @@ public class Runner { private static final int DEFAULT_PORT = 5044; - private final static Logger logger = LogManager.getLogger(Runner.class); - - + private static final Logger logger = LogManager.getLogger(); static public void main(String[] args) throws Exception { logger.info("Starting Beats Bulk"); diff --git a/src/main/java/org/logstash/beats/Server.java b/src/main/java/org/logstash/beats/Server.java index bfaec368..92f09649 100644 --- a/src/main/java/org/logstash/beats/Server.java +++ b/src/main/java/org/logstash/beats/Server.java @@ -18,7 +18,8 @@ import java.security.cert.CertificateException; public class Server { - private final static Logger logger = LogManager.getLogger(Server.class); + + private static final Logger logger = LogManager.getLogger(); private final int port; private final String host; @@ -52,7 +53,7 @@ public Server listen() throws InterruptedException { } workGroup = new NioEventLoopGroup(); try { - logger.info("Starting server on port: {}", this.port); + logger.info("Starting server on port: {}", port); beatsInitializer = new BeatsInitializer(isSslEnable(), messageListener, clientInactivityTimeoutSeconds, beatsHeandlerThreadCount); diff --git a/src/main/java/org/logstash/netty/SslSimpleBuilder.java b/src/main/java/org/logstash/netty/SslSimpleBuilder.java index f38f80d8..f12ed77c 100644 --- a/src/main/java/org/logstash/netty/SslSimpleBuilder.java +++ b/src/main/java/org/logstash/netty/SslSimpleBuilder.java @@ -30,7 +30,7 @@ public static enum SslClientVerifyMode { VERIFY_PEER, FORCE_PEER, } - private final static Logger logger = LogManager.getLogger(SslSimpleBuilder.class); + private static final Logger logger = LogManager.getLogger(); private File sslKeyFile; @@ -77,7 +77,7 @@ public SslSimpleBuilder setCipherSuites(String[] ciphersSuite) throws IllegalArg if(!OpenSsl.isCipherSuiteAvailable(cipher)) { throw new IllegalArgumentException("Cipher `" + cipher + "` is not available"); } else { - logger.debug("Cipher is supported: " + cipher); + logger.debug("Cipher is supported: {}", cipher); } } @@ -111,25 +111,21 @@ public File getSslCertificateFile() { public SslHandler build(ByteBufAllocator bufferAllocator) throws IOException, NoSuchAlgorithmException, CertificateException { SslContextBuilder builder = SslContextBuilder.forServer(sslCertificateFile, sslKeyFile, passPhrase); - if(logger.isDebugEnabled()) - logger.debug("Available ciphers:" + Arrays.toString(OpenSsl.availableOpenSslCipherSuites().toArray())); - logger.debug("Ciphers: " + Arrays.toString(ciphers)); + logger.debug("Available ciphers: {}", () ->Arrays.toString(OpenSsl.availableOpenSslCipherSuites().toArray())); + logger.debug("Ciphers: {}", () -> Arrays.toString(ciphers)); builder.ciphers(Arrays.asList(ciphers)); - if(requireClientAuth()) { - if (logger.isDebugEnabled()) - logger.debug("Certificate Authorities: " + Arrays.toString(certificateAuthorities)); - + if (requireClientAuth()) { + logger.debug("Certificate Authorities: {}", () -> Arrays.toString(certificateAuthorities)); builder.trustManager(loadCertificateCollection(certificateAuthorities)); } SslContext context = builder.build(); SslHandler sslHandler = context.newHandler(bufferAllocator); - if(logger.isDebugEnabled()) - logger.debug("TLS: " + Arrays.toString(protocols)); + logger.debug("TLS: {}", () -> Arrays.toString(protocols)); SSLEngine engine = sslHandler.engine(); engine.setEnabledProtocols(protocols); @@ -162,7 +158,7 @@ private X509Certificate[] loadCertificateCollection(String[] certificates) throw for(int i = 0; i < certificates.length; i++) { String certificate = certificates[i]; - logger.debug("Loading certificates from file " + certificate); + logger.debug("Loading certificates from file {}", ()-> certificate); try(InputStream in = new FileInputStream(certificate)) { List certificatesChains = (List) certificateFactory.generateCertificates(in); diff --git a/src/test/resources/log4j2.xml b/src/test/resources/log4j2.xml new file mode 100644 index 00000000..d62aba20 --- /dev/null +++ b/src/test/resources/log4j2.xml @@ -0,0 +1,14 @@ + + + + + + + + + + + + + +