From 66fd077c5c6d7ebbf1859769723989feed291c68 Mon Sep 17 00:00:00 2001 From: Bertrand Renuart Date: Thu, 22 Sep 2022 18:58:56 +0200 Subject: [PATCH] Implement initialSendDelay feature The initialSendDelay feature adds an optional wait time before a newly established connection can be used to write events. See #855 --- README.md | 20 +++ .../AbstractLogstashTcpSocketAppender.java | 74 ++++++++-- .../LogstashTcpSocketAppenderTest.java | 137 +++++++++++++++++- 3 files changed, 215 insertions(+), 16 deletions(-) diff --git a/README.md b/README.md index 0e15f1c7..bb01f2d6 100644 --- a/README.md +++ b/README.md @@ -29,6 +29,7 @@ The structure of the output, and the data it contains, is fully configurable. * [Connection Timeout](#connection-timeout) * [Write Buffer Size](#write-buffer-size) * [Write Timeout](#write-timeout) + * [Initial Send Delay](#initial-send-delay) * [SSL](#ssl) * [Async Appenders](#async-appenders) * [RingBuffer Size](#ringbuffer-size) @@ -600,6 +601,25 @@ The write timeout must be >0. A timeout of zero is interpreted as an infinite ti This setting accepts a Logback Duration value - see the section dedicated to [Duration Property](#duration-property) for more information about the valid values. + +#### Initial Send Delay + +The appender starts writing the events stored in the queue as soon as the connection is established. In some cases you may want to add an extra delay before sending the first events after the connection is established. This may come in handy in situations where the appender connects to an intermediate proxy that needs some time to establish a connection to the final destination. If the appender starts writing immediately, events may be lost in-flight if the proxy ultimately fails to connect to the final destination. + +To enable this feature, set the `initialSendDelay` to the desired delay before the first event is sent after the connection is established. If the connection is lost before the delay expires, no event will be lost. The default value is `0` which means no delay and start flusing pending events immediately. + +The following example configures a delay of 5 secondes before writing in the new connection: + +```xml + + ... + 5 secondes + +``` + +This setting accepts a Logback Duration value - see the section dedicated to [Duration Property](#duration-property) for more information about the valid values. + + #### SSL To use SSL, add an `` sub-element within the `` element for the `LogstashTcpSocketAppender` diff --git a/src/main/java/net/logstash/logback/appender/AbstractLogstashTcpSocketAppender.java b/src/main/java/net/logstash/logback/appender/AbstractLogstashTcpSocketAppender.java index def0746a..50ea839e 100644 --- a/src/main/java/net/logstash/logback/appender/AbstractLogstashTcpSocketAppender.java +++ b/src/main/java/net/logstash/logback/appender/AbstractLogstashTcpSocketAppender.java @@ -112,6 +112,11 @@ public abstract class AbstractLogstashTcpSocketAppender= 0"); + } + this.initialSendDelay = delay; + } + + public Duration getInitialSendDelay() { + return initialSendDelay; + } /** * Convenience method for setting {@link PreferPrimaryDestinationConnectionStrategy#setSecondaryConnectionTTL(Duration)}. diff --git a/src/test/java/net/logstash/logback/appender/LogstashTcpSocketAppenderTest.java b/src/test/java/net/logstash/logback/appender/LogstashTcpSocketAppenderTest.java index d76b029f..684bdab4 100644 --- a/src/test/java/net/logstash/logback/appender/LogstashTcpSocketAppenderTest.java +++ b/src/test/java/net/logstash/logback/appender/LogstashTcpSocketAppenderTest.java @@ -19,6 +19,7 @@ import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.awaitility.Awaitility.await; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; @@ -45,9 +46,12 @@ import java.net.SocketException; import java.net.SocketTimeoutException; import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.function.UnaryOperator; import javax.net.SocketFactory; @@ -60,6 +64,7 @@ import net.logstash.logback.test.AbstractLogbackTest; import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.Appender; import ch.qos.logback.core.encoder.Encoder; import ch.qos.logback.core.encoder.EncoderBase; import ch.qos.logback.core.status.Status; @@ -118,6 +123,18 @@ public void setup() throws Exception { when(encoder.encode(event1)).thenReturn("event1".getBytes(StandardCharsets.UTF_8)); appender.addListener(listener); + appender.addListener(new TcpAppenderListener() { + @Override + public void eventSent(Appender appender, Socket socket, ILoggingEvent event, + long durationInNanos) { + appender.addInfo("Event sent (nanos: " + durationInNanos + ")"); + } + + @Override + public void eventAppended(Appender appender, ILoggingEvent event, long durationInNanos) { + appender.addInfo("Event appended (nanos: " + durationInNanos + ")"); + } + }); appender.setContext(context); } @@ -237,8 +254,8 @@ public void testReconnectOnReadFailure() throws Exception { inOrder.verify(socket, async()).close(); inOrder.verify(socket, async()).connect(host("localhost", 10000), anyInt()); } - - + + /** * Scenario: * Two servers: localhost:10000 (primary), localhost:10001 (secondary) @@ -460,6 +477,121 @@ public void testReconnectWaitWhenExhausted() throws Exception { inOrder.verify(socket).connect(host("localhost", 10000), anyInt()); } + + /** + * Appender is stopped while waiting "reconnectionDelay" before attempting to re-establishing + * the connection. + * We expect the wait to be aborted immediately. + */ + @Test + public void testStopWhileWaitingToReconnect() throws Exception { + appender.addDestination("localhost:10000"); + appender.setReconnectionDelay(Duration.buildByMilliseconds(30000)); // configure a long delay between connection attempts + // so we can assert stop() does not wait for this delay to expire + appender.setShutdownGracePeriod(Duration.buildByMilliseconds(0)); // don't wait for the queue to be empty + + when(readableCallableFuture.isDone()) + /* + * First return true, so that the reconnect logic is executed + */ + .thenReturn(true) + /* + * Then return false so that the event can be written + */ + .thenReturn(false); + + appender.start(); + appender.append(event1); + + // Wait for a first connection to be established then closed + verify(socket, async()).close(); + + // Appender is now waiting "reconnectionDelay" before attempting to reconnect + // Stop the appender -> gracePeriod is set to 0 -> we should return before reconnectionDelay + // (Note: although it should happen "immediately" we allow for 2000ms delay to avoid flaky + // tests when host is overloaded) + // + appender.stop(); + verify(listener, timeout(2000).times(2)).connectionClosed(eq(appender), any()); + } + + + /** + * Assert that "initialSendDelay" is enforced + */ + @Test + public void testInitialSendDelay() throws Exception { + appender.addDestination("localhost:10000"); + appender.setInitialSendDelay(Duration.buildByMilliseconds(1000)); + + // Capture time events are sent + final Map tstamps = new HashMap<>(); + appender.addListener(new TcpAppenderListener() { + @Override + public void eventSent(Appender appender, Socket socket, ILoggingEvent event, long durationInNanos) { + tstamps.put(event, System.currentTimeMillis()); + } + }); + + appender.start(); // Note: will try to async establish the connection immediately + + /* + * Send two events and wait until they are received + */ + long now = System.currentTimeMillis(); + ILoggingEvent e1 = mock(ILoggingEvent.class); + ILoggingEvent e2 = mock(ILoggingEvent.class); + + appender.append(e1); + appender.append(e2); + + await().atMost(VERIFICATION_TIMEOUT, TimeUnit.MILLISECONDS).until(() -> tstamps.size() >= 2); + + /* + * First event should be received not earlier than the configured "initialSendDelay" + * Second event should be received immediately after + */ + long e1Tstamp = tstamps.get(e1); + long e2Tstamp = tstamps.get(e2); + + assertThat(e1Tstamp).isGreaterThan(now + appender.getInitialSendDelay().getMilliseconds()); + assertThat(e2Tstamp).isGreaterThanOrEqualTo(e1Tstamp); + } + + + /** + * Assert that waiting for initialSendDelay is aborted immediately when the appender is stopped. + */ + @Test + public void testStopWhileWaitingInitialSendDelay() throws Exception { + appender.addDestination("localhost:10000"); + appender.setInitialSendDelay(Duration.buildByMilliseconds(30000)); + appender.setShutdownGracePeriod(Duration.buildByMilliseconds(0)); + + appender.start(); + appender.append(event1); + + // Wait for the connection to be established + verify(listener, async()).connectionOpened(appender, socket); + + // Stop the appender + appender.stop(); + + // Assert that connection is closed before intiialSendDelay expires + // (Note: although it should happen "immediately" we allow for 2000ms delay to avoid flaky + // tests when host is overloaded) + verify(listener, timeout(2000).times(1)).connectionClosed(eq(appender), any()); + } + + + @Test + public void testInvalidInitialSendDelay() { + assertThatThrownBy(() -> appender.setInitialSendDelay(null)).isInstanceOf(IllegalArgumentException.class); + assertThatThrownBy(() -> appender.setInitialSendDelay(Duration.buildByMilliseconds(-1))).isInstanceOf(IllegalArgumentException.class); + + assertThatCode(() -> appender.setInitialSendDelay(Duration.buildByMilliseconds(0))).doesNotThrowAnyException(); + } + /** * Schedule keep alive and make sure we got the expected amount of messages @@ -568,7 +700,6 @@ public void testInvalidWriteTimeout() { assertThatThrownBy(() -> appender.setWriteTimeout(Duration.buildByMilliseconds(-1))).isInstanceOf(IllegalArgumentException.class); assertThatCode(() -> appender.setWriteTimeout(Duration.buildByMilliseconds(0))).doesNotThrowAnyException(); - assertThatCode(() -> appender.setWriteTimeout(Duration.buildByMilliseconds(0))).doesNotThrowAnyException(); }