Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement initialSendDelay feature #860

Merged
merged 1 commit into from
Sep 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ The structure of the output, and the data it contains, is fully configurable.
* [Connection Timeout](#connection-timeout)
* [Write Buffer Size](#write-buffer-size)
* [Write Timeout](#write-timeout)
* [Initial Send Delay](#initial-send-delay)
* [SSL](#ssl)
* [Async Appenders](#async-appenders)
* [RingBuffer Size](#ringbuffer-size)
Expand Down Expand Up @@ -600,6 +601,25 @@ The write timeout must be >0. A timeout of zero is interpreted as an infinite ti
This setting accepts a Logback Duration value - see the section dedicated to [Duration Property](#duration-property) for more information about the valid values.



#### Initial Send Delay

The appender starts writing the events stored in the queue as soon as the connection is established. In some cases you may want to add an extra delay before sending the first events after the connection is established. This may come in handy in situations where the appender connects to an intermediate proxy that needs some time to establish a connection to the final destination. If the appender starts writing immediately, events may be lost in-flight if the proxy ultimately fails to connect to the final destination.

To enable this feature, set the `initialSendDelay` to the desired delay before the first event is sent after the connection is established. If the connection is lost before the delay expires, no event will be lost. The default value is `0` which means no delay and start flusing pending events immediately.

The following example configures a delay of 5 secondes before writing in the new connection:

```xml
<appender name="stash" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
...
<initialSendDelay>5 secondes</initialSendDelay>
</appender>
```

This setting accepts a Logback Duration value - see the section dedicated to [Duration Property](#duration-property) for more information about the valid values.


#### SSL

To use SSL, add an `<ssl>` sub-element within the `<appender>` element for the `LogstashTcpSocketAppender`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ public abstract class AbstractLogstashTcpSocketAppender<Event extends DeferredPr
*/
public static final int DEFAULT_WRITE_TIMEOUT = 0;

/**
* The default delay before sending data into a newly established connection
*/
public static final int DEFAULT_INITIALSEND_DELAY = 0;

/**
* Default size of the queue used to hold logging events that are destined
* for the remote peer.
Expand Down Expand Up @@ -187,6 +192,11 @@ public abstract class AbstractLogstashTcpSocketAppender<Event extends DeferredPr
*/
private Duration connectionTimeout = new Duration(DEFAULT_CONNECTION_TIMEOUT);

/**
* The amount of time to wait before sending data into a newly established connection.
*/
private Duration initialSendDelay = new Duration(DEFAULT_INITIALSEND_DELAY);

/**
* Human readable identifier of the client (used for logback status messages)
*/
Expand Down Expand Up @@ -732,16 +742,9 @@ private synchronized void openSocket() {
if (errorCount < MAX_REPEAT_CONNECTION_ERROR_LOG * destinations.size()) {
addWarn(peerId + "Waiting " + sleepTime + "ms before attempting reconnection.");
}
try {
shutdownLatch.await(sleepTime, TimeUnit.MILLISECONDS);
if (!isStarted()) {
return;
}
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
addWarn(peerId + "connection interrupted. Will no longer attempt reconnection.");
return;
}

sleepUnlessStopped(sleepTime);

// reset the start time to be after the wait period.
startWallTime = System.currentTimeMillis();
}
Expand Down Expand Up @@ -798,9 +801,25 @@ private synchronized void openSocket() {

fireConnectionOpened(this.socket);

/*
* wait for initialSendDelay before returning and start sending data in the newly
* established connection
*/
sleepUnlessStopped(initialSendDelay.getMilliseconds());

return;

} catch (Exception e) {
}
/*
* We have been interrupted (appender stopped) while waiting between connection attempts
* or during initialSendDelay
*/
catch (InterruptedException ie) {
CloseUtil.closeQuietly(tempOutputStream);
CloseUtil.closeQuietly(tempSocket);

Thread.currentThread().interrupt();
}
catch (Exception e) {
CloseUtil.closeQuietly(tempOutputStream);
CloseUtil.closeQuietly(tempSocket);

Expand All @@ -816,7 +835,21 @@ private synchronized void openSocket() {
}
}
}



/**
* Sleep for the given amount of time and throws an {@link InterruptedException} if appender
* is stopped while waiting.
*
* @param millis the amount of time to wait
* @throws InterruptedException thrown if the appender is stopped while waiting
*/
private void sleepUnlessStopped(long millis) throws InterruptedException {
if (shutdownLatch.await(millis, TimeUnit.MILLISECONDS) || !isStarted()) {
throw new InterruptedException();
}
}

private synchronized void closeSocket() {
connectedDestination = null;
CloseUtil.closeQuietly(outputStream);
Expand Down Expand Up @@ -1226,6 +1259,21 @@ public Duration getReconnectionDelay() {
return reconnectionDelay;
}

/**
* Time period to wait before sending data into a newly established connection.
*
* @param delay the time to wait before sending the first data
*/
public void setInitialSendDelay(Duration delay) {
if (delay == null || delay.getMilliseconds() < 0) {
throw new IllegalArgumentException("initialSendDelay must be >= 0");
}
this.initialSendDelay = delay;
}

public Duration getInitialSendDelay() {
return initialSendDelay;
}

/**
* Convenience method for setting {@link PreferPrimaryDestinationConnectionStrategy#setSecondaryConnectionTTL(Duration)}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
Expand All @@ -45,9 +46,12 @@
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.UnaryOperator;

import javax.net.SocketFactory;
Expand All @@ -60,6 +64,7 @@
import net.logstash.logback.test.AbstractLogbackTest;

import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.Appender;
import ch.qos.logback.core.encoder.Encoder;
import ch.qos.logback.core.encoder.EncoderBase;
import ch.qos.logback.core.status.Status;
Expand Down Expand Up @@ -118,6 +123,18 @@ public void setup() throws Exception {
when(encoder.encode(event1)).thenReturn("event1".getBytes(StandardCharsets.UTF_8));

appender.addListener(listener);
appender.addListener(new TcpAppenderListener<ILoggingEvent>() {
@Override
public void eventSent(Appender<ILoggingEvent> appender, Socket socket, ILoggingEvent event,
long durationInNanos) {
appender.addInfo("Event sent (nanos: " + durationInNanos + ")");
}

@Override
public void eventAppended(Appender<ILoggingEvent> appender, ILoggingEvent event, long durationInNanos) {
appender.addInfo("Event appended (nanos: " + durationInNanos + ")");
}
});
appender.setContext(context);
}

Expand Down Expand Up @@ -237,8 +254,8 @@ public void testReconnectOnReadFailure() throws Exception {
inOrder.verify(socket, async()).close();
inOrder.verify(socket, async()).connect(host("localhost", 10000), anyInt());
}


/**
* Scenario:
* Two servers: localhost:10000 (primary), localhost:10001 (secondary)
Expand Down Expand Up @@ -460,6 +477,121 @@ public void testReconnectWaitWhenExhausted() throws Exception {
inOrder.verify(socket).connect(host("localhost", 10000), anyInt());
}


/**
* Appender is stopped while waiting "reconnectionDelay" before attempting to re-establishing
* the connection.
* We expect the wait to be aborted immediately.
*/
@Test
public void testStopWhileWaitingToReconnect() throws Exception {
appender.addDestination("localhost:10000");
appender.setReconnectionDelay(Duration.buildByMilliseconds(30000)); // configure a long delay between connection attempts
// so we can assert stop() does not wait for this delay to expire
appender.setShutdownGracePeriod(Duration.buildByMilliseconds(0)); // don't wait for the queue to be empty

when(readableCallableFuture.isDone())
/*
* First return true, so that the reconnect logic is executed
*/
.thenReturn(true)
/*
* Then return false so that the event can be written
*/
.thenReturn(false);

appender.start();
appender.append(event1);

// Wait for a first connection to be established then closed
verify(socket, async()).close();

// Appender is now waiting "reconnectionDelay" before attempting to reconnect
// Stop the appender -> gracePeriod is set to 0 -> we should return before reconnectionDelay
// (Note: although it should happen "immediately" we allow for 2000ms delay to avoid flaky
// tests when host is overloaded)
//
appender.stop();
verify(listener, timeout(2000).times(2)).connectionClosed(eq(appender), any());
}


/**
* Assert that "initialSendDelay" is enforced
*/
@Test
public void testInitialSendDelay() throws Exception {
appender.addDestination("localhost:10000");
appender.setInitialSendDelay(Duration.buildByMilliseconds(1000));

// Capture time events are sent
final Map<ILoggingEvent, Long> tstamps = new HashMap<>();
appender.addListener(new TcpAppenderListener<ILoggingEvent>() {
@Override
public void eventSent(Appender<ILoggingEvent> appender, Socket socket, ILoggingEvent event, long durationInNanos) {
tstamps.put(event, System.currentTimeMillis());
}
});

appender.start(); // Note: will try to async establish the connection immediately

/*
* Send two events and wait until they are received
*/
long now = System.currentTimeMillis();
ILoggingEvent e1 = mock(ILoggingEvent.class);
ILoggingEvent e2 = mock(ILoggingEvent.class);

appender.append(e1);
appender.append(e2);

await().atMost(VERIFICATION_TIMEOUT, TimeUnit.MILLISECONDS).until(() -> tstamps.size() >= 2);

/*
* First event should be received not earlier than the configured "initialSendDelay"
* Second event should be received immediately after
*/
long e1Tstamp = tstamps.get(e1);
long e2Tstamp = tstamps.get(e2);

assertThat(e1Tstamp).isGreaterThan(now + appender.getInitialSendDelay().getMilliseconds());
assertThat(e2Tstamp).isGreaterThanOrEqualTo(e1Tstamp);
}


/**
* Assert that waiting for initialSendDelay is aborted immediately when the appender is stopped.
*/
@Test
public void testStopWhileWaitingInitialSendDelay() throws Exception {
appender.addDestination("localhost:10000");
appender.setInitialSendDelay(Duration.buildByMilliseconds(30000));
appender.setShutdownGracePeriod(Duration.buildByMilliseconds(0));

appender.start();
appender.append(event1);

// Wait for the connection to be established
verify(listener, async()).connectionOpened(appender, socket);

// Stop the appender
appender.stop();

// Assert that connection is closed before intiialSendDelay expires
// (Note: although it should happen "immediately" we allow for 2000ms delay to avoid flaky
// tests when host is overloaded)
verify(listener, timeout(2000).times(1)).connectionClosed(eq(appender), any());
}


@Test
public void testInvalidInitialSendDelay() {
assertThatThrownBy(() -> appender.setInitialSendDelay(null)).isInstanceOf(IllegalArgumentException.class);
assertThatThrownBy(() -> appender.setInitialSendDelay(Duration.buildByMilliseconds(-1))).isInstanceOf(IllegalArgumentException.class);

assertThatCode(() -> appender.setInitialSendDelay(Duration.buildByMilliseconds(0))).doesNotThrowAnyException();
}


/**
* Schedule keep alive and make sure we got the expected amount of messages
Expand Down Expand Up @@ -568,7 +700,6 @@ public void testInvalidWriteTimeout() {
assertThatThrownBy(() -> appender.setWriteTimeout(Duration.buildByMilliseconds(-1))).isInstanceOf(IllegalArgumentException.class);

assertThatCode(() -> appender.setWriteTimeout(Duration.buildByMilliseconds(0))).doesNotThrowAnyException();
assertThatCode(() -> appender.setWriteTimeout(Duration.buildByMilliseconds(0))).doesNotThrowAnyException();
}


Expand Down