diff --git a/src/main/java/net/logstash/logback/appender/AbstractLogstashTcpSocketAppender.java b/src/main/java/net/logstash/logback/appender/AbstractLogstashTcpSocketAppender.java index cfeb437a..97d38419 100644 --- a/src/main/java/net/logstash/logback/appender/AbstractLogstashTcpSocketAppender.java +++ b/src/main/java/net/logstash/logback/appender/AbstractLogstashTcpSocketAppender.java @@ -36,7 +36,9 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.net.SocketFactory; @@ -273,6 +275,11 @@ public abstract class AbstractLogstashTcpSocketAppender { + executorService.submit(() -> { /* * https://github.com/logstash/logstash-logback-encoder/issues/341 * @@ -805,7 +812,7 @@ private synchronized void scheduleKeepAlive(long basedOnNanoTime) { } long delay = TimeUnit.MILLISECONDS.toNanos(keepAliveDuration.getMilliseconds()) - (System.nanoTime() - basedOnNanoTime); try { - keepAliveFuture = getExecutorService().schedule( + keepAliveFuture = executorService.schedule( keepAliveRunnable, delay, TimeUnit.NANOSECONDS); @@ -837,7 +844,7 @@ private synchronized void scheduleWriteTimeout() { } long delay = writeTimeout.getMilliseconds(); try { - writeTimeoutFuture = getExecutorService().scheduleWithFixedDelay( + writeTimeoutFuture = executorService.scheduleWithFixedDelay( writeTimeoutRunnable, delay, delay, @@ -950,16 +957,17 @@ public synchronized void start() { } if (errorCount == 0) { - + encoder.setContext(getContext()); if (!encoder.isStarted()) { encoder.start(); } /* - * Increase the core size to handle the reader thread + * Start with an initial core size of 1 to handle the Reader thread */ - int threadPoolCoreSize = getThreadPoolCoreSize() + 1; + int threadPoolCoreSize = 1; + /* * Increase the core size to handle the keep alive thread */ @@ -972,7 +980,15 @@ public synchronized void start() { if (isWriteTimeoutEnabled()) { threadPoolCoreSize++; } - setThreadPoolCoreSize(threadPoolCoreSize); + this.executorService = new ScheduledThreadPoolExecutor( + threadPoolCoreSize, + getThreadFactory()); + + /* + * This ensures that cancelled tasks do not hold up shutdown. + */ + this.executorService.setRemoveOnCancelPolicy(true); + this.shutdownLatch = new CountDownLatch(1); super.start(); } @@ -983,15 +999,29 @@ public synchronized void stop() { if (!isStarted()) { return; } + + super.stop(); + /* * Stop waiting to reconnect (if reconnect logic is currently waiting) */ this.shutdownLatch.countDown(); - super.stop(); + + /* + * Stop executor service + */ + this.executorService.shutdown(); + try { + if (!this.executorService.awaitTermination(1, TimeUnit.MINUTES)) { + addWarn("Some queued events have not been logged due to requested shutdown"); + } + } catch (InterruptedException e) { + addWarn("Some queued events have not been logged due to requested shutdown", e); + } } protected Future scheduleReaderCallable(Callable readerCallable) { - return getExecutorService().submit(readerCallable); + return executorService.submit(readerCallable); } protected void fireEventSent(Socket socket, Event event, long durationInNanos) { diff --git a/src/main/java/net/logstash/logback/appender/AsyncDisruptorAppender.java b/src/main/java/net/logstash/logback/appender/AsyncDisruptorAppender.java index 60a5ecf7..f4504e2f 100644 --- a/src/main/java/net/logstash/logback/appender/AsyncDisruptorAppender.java +++ b/src/main/java/net/logstash/logback/appender/AsyncDisruptorAppender.java @@ -21,8 +21,6 @@ import java.util.List; import java.util.Objects; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -203,16 +201,6 @@ public abstract class AsyncDisruptorAppender>( this.eventFactory, this.ringBufferSize, - this.executorService, + this.threadFactory, this.producerType, this.waitStrategy); @@ -504,23 +481,6 @@ public void stop() { if (!isRingBufferEmpty()) { addWarn("Some queued events have not been logged due to requested shutdown"); } - - - /* - * Shutdown executor service - */ - this.executorService.shutdown(); - - try { - this.executorService.awaitTermination(deadline - System.currentTimeMillis(), TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - // ignored - } - - - /* - * Notify listeners - */ fireAppenderStopped(); } @@ -645,21 +605,10 @@ protected void setEventTranslator(EventTranslatorOneArg, Event> this.eventTranslator = eventTranslator; } - protected ScheduledExecutorService getExecutorService() { - return executorService; - } - protected Disruptor> getDisruptor() { return disruptor; } - protected int getThreadPoolCoreSize() { - return threadPoolCoreSize; - } - protected void setThreadPoolCoreSize(int threadPoolCoreSize) { - this.threadPoolCoreSize = threadPoolCoreSize; - } - public String getThreadNameFormat() { return threadNameFormat; } diff --git a/src/test/java/net/logstash/logback/appender/LogstashTcpSocketAppenderTest.java b/src/test/java/net/logstash/logback/appender/LogstashTcpSocketAppenderTest.java index 2da576bd..90219c5e 100644 --- a/src/test/java/net/logstash/logback/appender/LogstashTcpSocketAppenderTest.java +++ b/src/test/java/net/logstash/logback/appender/LogstashTcpSocketAppenderTest.java @@ -22,7 +22,6 @@ import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.inOrder; @@ -70,6 +69,7 @@ import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.verification.VerificationWithTimeout; @ExtendWith(MockitoExtension.class) public class LogstashTcpSocketAppenderTest { @@ -123,7 +123,7 @@ public void setup() throws IOException { when(socket.getOutputStream()).thenReturn(outputStream); when(encoder.encode(event1)).thenReturn("event1".getBytes(StandardCharsets.UTF_8)); appender.addListener(listener); - + appender.setContext(context); } @AfterEach @@ -144,13 +144,13 @@ public void testEncoderCalled_logback12OrLater() { verify(event1).getCallerData(); - verify(encoder, timeout(VERIFICATION_TIMEOUT)).encode(event1); + verify(encoder, async()).encode(event1); } @Test public void testReconnectOnOpen() throws Exception { appender.addDestination("localhost:10000"); - appender.setReconnectionDelay(new Duration(100)); + appender.setReconnectionDelay(Duration.buildByMilliseconds(100)); reset(socketFactory); SocketTimeoutException exception = new SocketTimeoutException(); @@ -164,7 +164,7 @@ public void testReconnectOnOpen() throws Exception { appender.append(event1); - verify(encoder, timeout(VERIFICATION_TIMEOUT)).encode(event1); + verify(encoder, async()).encode(event1); assertThat(appender.getConnectedDestination()).isPresent(); @@ -176,32 +176,51 @@ public void testReconnectOnOpen() throws Exception { appender.stop(); - verify(listener, timeout(VERIFICATION_TIMEOUT)).connectionClosed(appender, socket); - + verify(listener, async()).connectionClosed(appender, socket); + assertThat(appender.getConnectedDestination()).isNotPresent(); } + + /** + * Scenario: + * Failure to write in the connection output stream + * + * Assert the appender closes the connection and reconnects + */ @Test - public void testReconnectOnWrite() { + public void testReconnectOnWrite() throws Exception { appender.addDestination("localhost:10000"); - appender.setReconnectionDelay(new Duration(100)); + appender.setReconnectionDelay(Duration.buildByMilliseconds(100)); - appender.start(); + // Throw exception at first attempt to write into the connection - this should + // trigger a reconnect + doThrow(IOException.class) + .doNothing() + .when(outputStream).write(any(byte[].class), anyInt(), anyInt()); + appender.start(); verify(encoder).start(); - doThrow(new RuntimeException()).doReturn("event1".getBytes(StandardCharsets.UTF_8)).when(encoder).encode(event1); - appender.append(event1); - verify(encoder, timeout(VERIFICATION_TIMEOUT).times(2)).encode(event1); + InOrder inOrder = inOrder(socket); + inOrder.verify(socket, async()).connect(host("localhost", 10000), anyInt()); + inOrder.verify(socket, async()).close(); + inOrder.verify(socket, async()).connect(host("localhost", 10000), anyInt()); } + + /** + * Scenario: + * Destination closes the connection (detected by the readableCallableFuture). + * + * Assert that the appender closes the socket and attempt to reconnect. + */ @Test - public void testReconnectOnReadFailure() { - + public void testReconnectOnReadFailure() throws Exception { appender.addDestination("localhost:10000"); - appender.setReconnectionDelay(new Duration(100)); + appender.setReconnectionDelay(Duration.buildByMilliseconds(100)); when(readableCallableFuture.isDone()) /* @@ -214,12 +233,14 @@ public void testReconnectOnReadFailure() { .thenReturn(false); appender.start(); - verify(encoder).start(); appender.append(event1); - verify(encoder, timeout(VERIFICATION_TIMEOUT)).encode(event1); + InOrder inOrder = inOrder(socket); + inOrder.verify(socket, async()).connect(host("localhost", 10000), anyInt()); + inOrder.verify(socket, async()).close(); + inOrder.verify(socket, async()).connect(host("localhost", 10000), anyInt()); } @@ -227,7 +248,7 @@ public void testReconnectOnReadFailure() { * Scenario: * Two servers: localhost:10000 (primary), localhost:10001 (secondary) * Primary is available at startup - * Appender should connect to PRIMARY and not any secondaries + * Appender should connect to PRIMARY and not to any secondaries */ @Test public void testConnectOnPrimary() throws Exception { @@ -238,14 +259,14 @@ public void testConnectOnPrimary() throws Exception { verify(encoder).start(); // Wait for the connection process to be fully completed - verify(listener, timeout(VERIFICATION_TIMEOUT)).connectionOpened(appender, socket); + verify(listener, async()).connectionOpened(appender, socket); // Only one socket should have been created verify(socket, times(1)).connect(any(SocketAddress.class), anyInt()); // The only socket should be connected to primary verify(socket).connect(host("localhost", 10000), anyInt()); - + assertThat(appender.getConnectedDestination()).isPresent(); } @@ -261,7 +282,7 @@ public void testReconnectToSecondaryOnOpen() throws Exception { appender.addDestination("localhost:10000"); appender.addDestination("localhost:10001"); - // Make it failed to connect to primary + // Make it fail to connect to primary doThrow(SocketTimeoutException.class) .when(socket).connect(host("localhost", 10000), anyInt()); @@ -271,16 +292,17 @@ public void testReconnectToSecondaryOnOpen() throws Exception { verify(encoder).start(); // TWO connection attempts must have been made (without delay) - verify(socket, timeout(VERIFICATION_TIMEOUT).times(2)).connect(any(SocketAddress.class), anyInt()); - InOrder inOrder = inOrder(socket); + verify(socket, async().times(2)).connect(any(), anyInt()); + InOrder inOrder = inOrder(socket, listener); // 1) First attempt on PRIMARY: failure inOrder.verify(socket).connect(host("localhost", 10000), anyInt()); - + // 2) Second attempt on SECONDARY inOrder.verify(socket).connect(host("localhost", 10001), anyInt()); } + @Test public void testRandomDestinationAndReconnectToSecondaryOnOpen() throws Exception { appender.addDestination("localhost:10000"); @@ -289,7 +311,7 @@ public void testRandomDestinationAndReconnectToSecondaryOnOpen() throws Exceptio doReturn(random).when(strategy).getRandom(); appender.setConnectionStrategy(strategy); - // Make it failed to connect to second destination + // Make it fail to connect to second destination doThrow(SocketTimeoutException.class) .when(socket).connect(host("localhost", 10001), anyInt()); @@ -297,18 +319,18 @@ public void testRandomDestinationAndReconnectToSecondaryOnOpen() throws Exceptio when(random.nextInt(appender.getDestinations().size())).thenReturn(1).thenReturn(0); // Start the appender and verify it is actually started. - // It should try to connect to second destination by random destination, fail then retry on first destination. + // It should try to connect to the second destination, fail then retry on first destination. appender.start(); verify(encoder).start(); // TWO connection attempts must have been made (without delay) - verify(socket, timeout(VERIFICATION_TIMEOUT).times(2)).connect(any(SocketAddress.class), anyInt()); + verify(socket, async().times(2)).connect(any(), anyInt()); InOrder inOrder = inOrder(socket); - // 1) First attempt on second destination: failure + // 1) First attempt on SECONDARY: failure inOrder.verify(socket).connect(host("localhost", 10001), anyInt()); - - // 2) Second attempt on first destination + + // 2) Second attempt on PRIMARY inOrder.verify(socket).connect(host("localhost", 10000), anyInt()); } @@ -324,22 +346,12 @@ public void testReconnectToSecondaryOnWrite() throws Exception { appender.addDestination("localhost:10000"); appender.addDestination("localhost:10001"); - // Primary accepts first connection attempt then refuses - doNothing() - .doThrow(SocketTimeoutException.class) - .when(socket).connect(host("localhost", 10000), anyInt()); - - // Secondary refuses all attempts - doThrow(SocketTimeoutException.class) - .when(socket).connect(host("localhost", 10001), anyInt()); - - // First attempt of sending the event throws an exception while subsequent - // attempts will succeed. This should force the appender to close the connection + // First attempt at sending the event throws an exception while subsequent + // attempts succeed. This should force the appender to close the connection // and attempt to reconnect - doThrow(new RuntimeException()) - .doReturn("event1".getBytes(StandardCharsets.UTF_8)) - .when(encoder).encode(event1); - + doThrow(IOException.class) + .doNothing() + .when(outputStream).write(any(byte[].class), anyInt(), anyInt()); // Start the appender and verify it is actually started // At this point, it should be connected to primary. @@ -350,14 +362,17 @@ public void testReconnectToSecondaryOnWrite() throws Exception { // TWO connection attempts must have been made in total - verify(socket, timeout(VERIFICATION_TIMEOUT).times(2)).connect(any(SocketAddress.class), anyInt()); + verify(socket, async().times(2)).connect(any(), anyInt()); InOrder inOrder = inOrder(socket); // 1) connected to primary at startup inOrder.verify(socket).connect(host("localhost", 10000), anyInt()); - + inOrder.verify(socket).close(); + // 2) retry on secondary after failed attempt to send event inOrder.verify(socket).connect(host("localhost", 10001), anyInt()); + + verify(encoder, times(2)).encode(event1); } @@ -368,7 +383,7 @@ public void testReconnectToSecondaryOnWrite() throws Exception { public void testReconnectToPrimaryWhileOnSecondary() throws Exception { appender.addDestination("localhost:10000"); appender.addDestination("localhost:10001"); - appender.setReconnectionDelay(new Duration(1)); + appender.setReconnectionDelay(Duration.buildByMilliseconds(1)); appender.setSecondaryConnectionTTL(Duration.buildByMilliseconds(100)); // Primary refuses first connection to force the appender to go on the secondary. @@ -378,23 +393,23 @@ public void testReconnectToPrimaryWhileOnSecondary() throws Exception { // Start the appender and verify it is actually started - // At this point, it should be connected to primary. + // At this point, it should be connected to secondary. appender.start(); verify(encoder).start(); // The appender is supposed to be on the secondary. - // Wait until after the appender is supposed to reattempt to connect to primary, then + // Wait until after the appender is supposed to re-attempt to connect to primary, then // send an event (requires some activity to trigger the reconnection process). Thread.sleep(appender.getSecondaryConnectionTTL().getMilliseconds() + 50); appender.append(event1); // THREE connection attempts must have been made in total - verify(socket, timeout(VERIFICATION_TIMEOUT).times(3)).connect(any(SocketAddress.class), anyInt()); + verify(socket, async().times(3)).connect(any(), anyInt()); InOrder inOrder = inOrder(socket, encoder); - // 1) fail to connect on primary at startup + // 1) failed to connect on primary at startup inOrder.verify(socket).connect(host("localhost", 10000), anyInt()); // 2) connect to secondary @@ -403,7 +418,8 @@ public void testReconnectToPrimaryWhileOnSecondary() throws Exception { // 3) send the event inOrder.verify(encoder).encode(event1); - // 4) connect to primary + // 4) disconnect from secondary and reconnect to primary + inOrder.verify(socket).close(); inOrder.verify(socket).connect(host("localhost", 10000), anyInt()); } @@ -482,12 +498,13 @@ public void testKeepAlive() throws Exception { Assertions.assertArrayEquals(expectedKeepAlivesBytes, bos.toByteArray()); } + @Test public void testWriteTimeout() throws Exception { appender.addDestination("localhost"); - appender.setWriteTimeout(new Duration(100)); - appender.setReconnectionDelay(new Duration(1)); + appender.setWriteTimeout(Duration.buildByMilliseconds(100)); + appender.setReconnectionDelay(Duration.buildByMilliseconds(1)); appender.setWriteBufferSize(0); Socket badSocket = mock(Socket.class, "badSocket"); @@ -525,9 +542,9 @@ public void testWriteTimeout() throws Exception { verify(goodOutputStream, timeout(1000)).flush(); } + /** - * Make sure keep alive messages trigger reconnect to another host upon failure. - * + * Make sure keep alive messages trigger a reconnect to another host upon failure. */ @Test public void testReconnectToSecondaryOnKeepAlive() throws Exception { @@ -539,18 +556,10 @@ public void testReconnectToSecondaryOnKeepAlive() throws Exception { appender.setKeepAliveMessage("UNIX"); appender.setKeepAliveDuration(Duration.buildByMilliseconds(100)); - // Primary accepts first connection then refuse subsequent attempts - doNothing() - .doThrow(SocketTimeoutException.class) - .when(socket).connect(host("localhost", 10000), anyInt()); - - // Secondary refuses all attempts - doThrow(SocketTimeoutException.class) - .when(socket).connect(host("localhost", 10001), anyInt()); - - // Throw an exception the first time the the appender attempts to write in the output stream. + // Throw an exception the first time the the appender attempts to write in the output + // stream - this will be the keep alive message sent while on the primary destination. // This should cause the appender to initiate the reconnect sequence. - doThrow(SocketException.class) + doThrow(IOException.class) .doNothing() .when(outputStream).write(any(byte[].class), anyInt(), anyInt()); @@ -561,7 +570,7 @@ public void testReconnectToSecondaryOnKeepAlive() throws Exception { // Wait for a bit more than a single keep alive message. // TWO connection attempts must have been made in total: - verify(socket, timeout(VERIFICATION_TIMEOUT).times(2)).connect(any(SocketAddress.class), anyInt()); + verify(socket, async().times(2)).connect(any(), anyInt()); InOrder inOrder = inOrder(socket); // 1) connected to primary at startup @@ -570,7 +579,7 @@ public void testReconnectToSecondaryOnKeepAlive() throws Exception { // 2) retry on secondary after failed attempt to send event inOrder.verify(socket).connect(host("localhost", 10001), anyInt()); } - + /** * At least one valid destination must be configured. @@ -600,18 +609,20 @@ public void testRoundRobin() throws Exception { Thread.sleep(strategy.getConnectionTTL().getMilliseconds() + 50); appender.append(event1); - verify(socket, timeout(VERIFICATION_TIMEOUT).times(2)).connect(any(SocketAddress.class), anyInt()); + verify(socket, async().times(2)).connect(any(), anyInt()); InOrder inOrder = inOrder(socket); // 1) connected to primary at startup inOrder.verify(socket).connect(host("localhost", 10000), anyInt()); - + inOrder.verify(socket).close(); + // 2) connected to next destination by round-robin inOrder.verify(socket).connect(host("localhost", 10001), anyInt()); - - } + + // -------------------------------------------------------------------------------------------- + private SocketAddress host(final String host, final int port) { return argThat(hasHostAndPort(host, port)); } @@ -631,4 +642,8 @@ public String toString() { } }; } + + private static VerificationWithTimeout async() { + return timeout(VERIFICATION_TIMEOUT); + } }