Skip to content

Commit

Permalink
Implement initialSendDelay feature
Browse files Browse the repository at this point in the history
The initialSendDelay feature adds an optional wait time before a newly established connection can be used to write events.

See #855
  • Loading branch information
brenuart committed Sep 22, 2022
1 parent f6ea70f commit 66fd077
Show file tree
Hide file tree
Showing 3 changed files with 215 additions and 16 deletions.
20 changes: 20 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ The structure of the output, and the data it contains, is fully configurable.
* [Connection Timeout](#connection-timeout)
* [Write Buffer Size](#write-buffer-size)
* [Write Timeout](#write-timeout)
* [Initial Send Delay](#initial-send-delay)
* [SSL](#ssl)
* [Async Appenders](#async-appenders)
* [RingBuffer Size](#ringbuffer-size)
Expand Down Expand Up @@ -600,6 +601,25 @@ The write timeout must be >0. A timeout of zero is interpreted as an infinite ti
This setting accepts a Logback Duration value - see the section dedicated to [Duration Property](#duration-property) for more information about the valid values.



#### Initial Send Delay

The appender starts writing the events stored in the queue as soon as the connection is established. In some cases you may want to add an extra delay before sending the first events after the connection is established. This may come in handy in situations where the appender connects to an intermediate proxy that needs some time to establish a connection to the final destination. If the appender starts writing immediately, events may be lost in-flight if the proxy ultimately fails to connect to the final destination.

To enable this feature, set the `initialSendDelay` to the desired delay before the first event is sent after the connection is established. If the connection is lost before the delay expires, no event will be lost. The default value is `0` which means no delay and start flusing pending events immediately.

The following example configures a delay of 5 secondes before writing in the new connection:

```xml
<appender name="stash" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
...
<initialSendDelay>5 secondes</initialSendDelay>
</appender>
```

This setting accepts a Logback Duration value - see the section dedicated to [Duration Property](#duration-property) for more information about the valid values.


#### SSL

To use SSL, add an `<ssl>` sub-element within the `<appender>` element for the `LogstashTcpSocketAppender`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ public abstract class AbstractLogstashTcpSocketAppender<Event extends DeferredPr
*/
public static final int DEFAULT_WRITE_TIMEOUT = 0;

/**
* The default delay before sending data into a newly established connection
*/
public static final int DEFAULT_INITIALSEND_DELAY = 0;

/**
* Default size of the queue used to hold logging events that are destined
* for the remote peer.
Expand Down Expand Up @@ -187,6 +192,11 @@ public abstract class AbstractLogstashTcpSocketAppender<Event extends DeferredPr
*/
private Duration connectionTimeout = new Duration(DEFAULT_CONNECTION_TIMEOUT);

/**
* The amount of time to wait before sending data into a newly established connection.
*/
private Duration initialSendDelay = new Duration(DEFAULT_INITIALSEND_DELAY);

/**
* Human readable identifier of the client (used for logback status messages)
*/
Expand Down Expand Up @@ -732,16 +742,9 @@ private synchronized void openSocket() {
if (errorCount < MAX_REPEAT_CONNECTION_ERROR_LOG * destinations.size()) {
addWarn(peerId + "Waiting " + sleepTime + "ms before attempting reconnection.");
}
try {
shutdownLatch.await(sleepTime, TimeUnit.MILLISECONDS);
if (!isStarted()) {
return;
}
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
addWarn(peerId + "connection interrupted. Will no longer attempt reconnection.");
return;
}

sleepUnlessStopped(sleepTime);

// reset the start time to be after the wait period.
startWallTime = System.currentTimeMillis();
}
Expand Down Expand Up @@ -798,9 +801,25 @@ private synchronized void openSocket() {

fireConnectionOpened(this.socket);

/*
* wait for initialSendDelay before returning and start sending data in the newly
* established connection
*/
sleepUnlessStopped(initialSendDelay.getMilliseconds());

return;

} catch (Exception e) {
}
/*
* We have been interrupted (appender stopped) while waiting between connection attempts
* or during initialSendDelay
*/
catch (InterruptedException ie) {
CloseUtil.closeQuietly(tempOutputStream);
CloseUtil.closeQuietly(tempSocket);

Thread.currentThread().interrupt();
}
catch (Exception e) {
CloseUtil.closeQuietly(tempOutputStream);
CloseUtil.closeQuietly(tempSocket);

Expand All @@ -816,7 +835,21 @@ private synchronized void openSocket() {
}
}
}



/**
* Sleep for the given amount of time and throws an {@link InterruptedException} if appender
* is stopped while waiting.
*
* @param millis the amount of time to wait
* @throws InterruptedException thrown if the appender is stopped while waiting
*/
private void sleepUnlessStopped(long millis) throws InterruptedException {
if (shutdownLatch.await(millis, TimeUnit.MILLISECONDS) || !isStarted()) {
throw new InterruptedException();
}
}

private synchronized void closeSocket() {
connectedDestination = null;
CloseUtil.closeQuietly(outputStream);
Expand Down Expand Up @@ -1226,6 +1259,21 @@ public Duration getReconnectionDelay() {
return reconnectionDelay;
}

/**
* Time period to wait before sending data into a newly established connection.
*
* @param delay the time to wait before sending the first data
*/
public void setInitialSendDelay(Duration delay) {
if (delay == null || delay.getMilliseconds() < 0) {
throw new IllegalArgumentException("initialSendDelay must be >= 0");
}
this.initialSendDelay = delay;
}

public Duration getInitialSendDelay() {
return initialSendDelay;
}

/**
* Convenience method for setting {@link PreferPrimaryDestinationConnectionStrategy#setSecondaryConnectionTTL(Duration)}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
Expand All @@ -45,9 +46,12 @@
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.UnaryOperator;

import javax.net.SocketFactory;
Expand All @@ -60,6 +64,7 @@
import net.logstash.logback.test.AbstractLogbackTest;

import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.Appender;
import ch.qos.logback.core.encoder.Encoder;
import ch.qos.logback.core.encoder.EncoderBase;
import ch.qos.logback.core.status.Status;
Expand Down Expand Up @@ -118,6 +123,18 @@ public void setup() throws Exception {
when(encoder.encode(event1)).thenReturn("event1".getBytes(StandardCharsets.UTF_8));

appender.addListener(listener);
appender.addListener(new TcpAppenderListener<ILoggingEvent>() {
@Override
public void eventSent(Appender<ILoggingEvent> appender, Socket socket, ILoggingEvent event,
long durationInNanos) {
appender.addInfo("Event sent (nanos: " + durationInNanos + ")");
}

@Override
public void eventAppended(Appender<ILoggingEvent> appender, ILoggingEvent event, long durationInNanos) {
appender.addInfo("Event appended (nanos: " + durationInNanos + ")");
}
});
appender.setContext(context);
}

Expand Down Expand Up @@ -237,8 +254,8 @@ public void testReconnectOnReadFailure() throws Exception {
inOrder.verify(socket, async()).close();
inOrder.verify(socket, async()).connect(host("localhost", 10000), anyInt());
}


/**
* Scenario:
* Two servers: localhost:10000 (primary), localhost:10001 (secondary)
Expand Down Expand Up @@ -460,6 +477,121 @@ public void testReconnectWaitWhenExhausted() throws Exception {
inOrder.verify(socket).connect(host("localhost", 10000), anyInt());
}


/**
* Appender is stopped while waiting "reconnectionDelay" before attempting to re-establishing
* the connection.
* We expect the wait to be aborted immediately.
*/
@Test
public void testStopWhileWaitingToReconnect() throws Exception {
appender.addDestination("localhost:10000");
appender.setReconnectionDelay(Duration.buildByMilliseconds(30000)); // configure a long delay between connection attempts
// so we can assert stop() does not wait for this delay to expire
appender.setShutdownGracePeriod(Duration.buildByMilliseconds(0)); // don't wait for the queue to be empty

when(readableCallableFuture.isDone())
/*
* First return true, so that the reconnect logic is executed
*/
.thenReturn(true)
/*
* Then return false so that the event can be written
*/
.thenReturn(false);

appender.start();
appender.append(event1);

// Wait for a first connection to be established then closed
verify(socket, async()).close();

// Appender is now waiting "reconnectionDelay" before attempting to reconnect
// Stop the appender -> gracePeriod is set to 0 -> we should return before reconnectionDelay
// (Note: although it should happen "immediately" we allow for 2000ms delay to avoid flaky
// tests when host is overloaded)
//
appender.stop();
verify(listener, timeout(2000).times(2)).connectionClosed(eq(appender), any());
}


/**
* Assert that "initialSendDelay" is enforced
*/
@Test
public void testInitialSendDelay() throws Exception {
appender.addDestination("localhost:10000");
appender.setInitialSendDelay(Duration.buildByMilliseconds(1000));

// Capture time events are sent
final Map<ILoggingEvent, Long> tstamps = new HashMap<>();
appender.addListener(new TcpAppenderListener<ILoggingEvent>() {
@Override
public void eventSent(Appender<ILoggingEvent> appender, Socket socket, ILoggingEvent event, long durationInNanos) {
tstamps.put(event, System.currentTimeMillis());
}
});

appender.start(); // Note: will try to async establish the connection immediately

/*
* Send two events and wait until they are received
*/
long now = System.currentTimeMillis();
ILoggingEvent e1 = mock(ILoggingEvent.class);
ILoggingEvent e2 = mock(ILoggingEvent.class);

appender.append(e1);
appender.append(e2);

await().atMost(VERIFICATION_TIMEOUT, TimeUnit.MILLISECONDS).until(() -> tstamps.size() >= 2);

/*
* First event should be received not earlier than the configured "initialSendDelay"
* Second event should be received immediately after
*/
long e1Tstamp = tstamps.get(e1);
long e2Tstamp = tstamps.get(e2);

assertThat(e1Tstamp).isGreaterThan(now + appender.getInitialSendDelay().getMilliseconds());
assertThat(e2Tstamp).isGreaterThanOrEqualTo(e1Tstamp);
}


/**
* Assert that waiting for initialSendDelay is aborted immediately when the appender is stopped.
*/
@Test
public void testStopWhileWaitingInitialSendDelay() throws Exception {
appender.addDestination("localhost:10000");
appender.setInitialSendDelay(Duration.buildByMilliseconds(30000));
appender.setShutdownGracePeriod(Duration.buildByMilliseconds(0));

appender.start();
appender.append(event1);

// Wait for the connection to be established
verify(listener, async()).connectionOpened(appender, socket);

// Stop the appender
appender.stop();

// Assert that connection is closed before intiialSendDelay expires
// (Note: although it should happen "immediately" we allow for 2000ms delay to avoid flaky
// tests when host is overloaded)
verify(listener, timeout(2000).times(1)).connectionClosed(eq(appender), any());
}


@Test
public void testInvalidInitialSendDelay() {
assertThatThrownBy(() -> appender.setInitialSendDelay(null)).isInstanceOf(IllegalArgumentException.class);
assertThatThrownBy(() -> appender.setInitialSendDelay(Duration.buildByMilliseconds(-1))).isInstanceOf(IllegalArgumentException.class);

assertThatCode(() -> appender.setInitialSendDelay(Duration.buildByMilliseconds(0))).doesNotThrowAnyException();
}


/**
* Schedule keep alive and make sure we got the expected amount of messages
Expand Down Expand Up @@ -568,7 +700,6 @@ public void testInvalidWriteTimeout() {
assertThatThrownBy(() -> appender.setWriteTimeout(Duration.buildByMilliseconds(-1))).isInstanceOf(IllegalArgumentException.class);

assertThatCode(() -> appender.setWriteTimeout(Duration.buildByMilliseconds(0))).doesNotThrowAnyException();
assertThatCode(() -> appender.setWriteTimeout(Duration.buildByMilliseconds(0))).doesNotThrowAnyException();
}


Expand Down

0 comments on commit 66fd077

Please sign in to comment.