Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drop event when Encoder fails to encode it before it becomes a "poison" event #649

Merged
merged 10 commits into from
Sep 30, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -289,12 +289,6 @@ private class TcpSendingEventHandler implements EventHandler<LogEvent<Event>>, L
*/
private static final int MAX_REPEAT_CONNECTION_ERROR_LOG = 5;

/**
* Number of times we try to write an event before it is discarded.
* Between each attempt, the socket will be reconnected.
*/
private static final int MAX_REPEAT_WRITE_ATTEMPTS = 5;

/**
* The destination socket to which to send events.
*/
Expand Down Expand Up @@ -516,8 +510,7 @@ public void run() {
@Override
public void onEvent(LogEvent<Event> logEvent, long sequence, boolean endOfBatch) throws Exception {

Exception sendFailureException = null;
for (int i = 0; i < MAX_REPEAT_WRITE_ATTEMPTS; i++) {
while (true) {
/*
* Save local references to the outputStream and socket
* in case the WriteTimeoutRunnable closes the socket.
Expand All @@ -531,56 +524,68 @@ public void onEvent(LogEvent<Event> logEvent, long sequence, boolean endOfBatch)
*
* This will occur if shutdown occurred during reopen()
*/
sendFailureException = SHUTDOWN_IN_PROGRESS_EXCEPTION;
fireEventSendFailure(logEvent.event, SHUTDOWN_IN_PROGRESS_EXCEPTION);
break;
}

/*
* If readerFuture.isDone(), then the destination has shut down its output (our input),
* and the destination is probably no longer listening to its input (our output).
* This will be the case for Amazon's Elastic Load Balancers (ELB)
* when an instance behind the ELB becomes unhealthy while we're connected to it.
*
* If socket == null here, it means that a write timed out,
* and the socket was closed by the WriteTimeoutRunnable.
*
* Therefore, attempt reconnection.
*/
Future<?> readerFuture = this.readerFuture; // volatile read
if (readerFuture.isDone() || socket == null) {
/*
* If readerFuture.isDone(), then the destination has shut down its output (our input),
* and the destination is probably no longer listening to its input (our output).
* This will be the case for Amazon's Elastic Load Balancers (ELB)
* when an instance behind the ELB becomes unhealthy while we're connected to it.
*
* If socket == null here, it means that a write timed out,
* and the socket was closed by the WriteTimeoutRunnable.
*
* Therefore, attempt reconnection.
*/
addInfo(peerId + "destination terminated the connection. Reconnecting.");
reopenSocket();
try {
readerFuture.get();
sendFailureException = NOT_CONNECTED_EXCEPTION;
} catch (Exception e) {
sendFailureException = e;
}

// TODO readerFuture refers to the "old" future and a new one has been started by reopenSocket...
// Why wait for the old to complete? Is it to reuse its thread and avoid creating/wasting a new one?
// In this case better to readerFuture.get() before reopenSocket()...
//
// try {
// readerFuture.get();
// sendFailureException = NOT_CONNECTED_EXCEPTION;
// } catch (Exception e) {
// sendFailureException = e;
// }
philsttr marked this conversation as resolved.
Show resolved Hide resolved
continue;
}

/*
* Write the event in the output stream.
* Drop event if encoder throws an exception.
* Reconnect if an exception is thrown by the connection stream itself.
*/
try {
writeEvent(socket, outputStream, logEvent, endOfBatch);
return;

} catch (EncoderException e) {
/*
* Encoding threw an exception. Warn and drop event before it becomes a "poison".
*/
addWarn(peerId + "Encoder failed to encode event: " + e.getMessage() + ". Dropping event.", e);
fireEventSendFailure(logEvent.event, e);
break;
brenuart marked this conversation as resolved.
Show resolved Hide resolved

} catch (Exception e) {
sendFailureException = e;
addWarn(peerId + "unable to send event: " + e.getMessage() + " Reconnecting.", e);
/*
* Need to re-open the socket in case of IOExceptions.
*
* Reopening the socket probably won't help other exceptions
* (like NullPointerExceptions),
* but we're doing so anyway, just in case.
* Any other exception is thrown by the socket stream (or bug in the code).
* Re-open the socket and get a fresh new stream.
*/
addWarn(peerId + "unable to send event: " + e.getMessage() + " Reconnecting.", e);
reopenSocket();
}
philsttr marked this conversation as resolved.
Show resolved Hide resolved
}

if (logEvent.event != null) {
fireEventSendFailure(logEvent.event, sendFailureException);
}
}

private void writeEvent(Socket socket, OutputStream outputStream, LogEvent<Event> logEvent, boolean endOfBatch) throws IOException {
private void writeEvent(Socket socket, OutputStream outputStream, LogEvent<Event> logEvent, boolean endOfBatch) throws IOException, EncoderException {

long startWallTime = System.currentTimeMillis();
long startNanoTime = System.nanoTime();
Expand Down Expand Up @@ -624,20 +629,29 @@ private void writeEvent(Socket socket, OutputStream outputStream, LogEvent<Event


@SuppressWarnings("unchecked")
private void encode(Event event, OutputStream outputStream) throws IOException {
private void encode(Event event, OutputStream outputStream) throws IOException, EncoderException {
if (encoder instanceof StreamingEncoder) {
/*
* Generate content in a temporary buffer to avoid writing "partial" content in the output
* Use a temporary buffer to avoid writing "partial" content in the output
* stream if the Encoder throws an exception.
*/
try {
((StreamingEncoder<Event>) encoder).encode(event, buffer);
try {
((StreamingEncoder<Event>) encoder).encode(event, buffer);
} catch (Exception e) {
throw new EncoderException(e);
}
buffer.writeTo(outputStream);
} finally {
buffer.reset();
}
} else {
byte[] data = encoder.encode(event);
byte[] data;
try {
data = encoder.encode(event);
} catch (Exception e) {
throw new EncoderException(e);
}
if (data != null) {
outputStream.write(data);
}
Expand Down Expand Up @@ -890,6 +904,17 @@ private synchronized void unscheduleWriteTimeout() {
}
}
}


/**
* Wrap exceptions thrown by {@link Encoder}
*/
@SuppressWarnings("serial")
private static class EncoderException extends Exception {
EncoderException(Throwable cause) {
super(cause);
}
}

/**
* An extension of logback's {@link ConfigurableSSLSocketFactory}
Expand Down Expand Up @@ -1044,11 +1069,15 @@ protected Future<?> scheduleReaderCallable(Callable<Void> readerCallable) {
}

protected void fireEventSent(Socket socket, Event event, long durationInNanos) {
safelyFireEvent(l -> l.eventSent(this, socket, event, durationInNanos));
if (event != null) {
safelyFireEvent(l -> l.eventSent(this, socket, event, durationInNanos));
}
}

protected void fireEventSendFailure(Event event, Throwable reason) {
safelyFireEvent(l -> l.eventSendFailure(this, event, reason));
if (event != null) {
safelyFireEvent(l -> l.eventSendFailure(this, event, reason));
}
philsttr marked this conversation as resolved.
Show resolved Hide resolved
}

protected void fireConnectionOpened(Socket socket) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
Expand Down Expand Up @@ -56,11 +57,13 @@
import net.logstash.logback.appender.destination.RoundRobinDestinationConnectionStrategy;
import net.logstash.logback.appender.listener.TcpAppenderListener;
import net.logstash.logback.encoder.SeparatorParser;
import net.logstash.logback.encoder.StreamingEncoder;

import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.BasicStatusManager;
import ch.qos.logback.core.encoder.Encoder;
import ch.qos.logback.core.encoder.EncoderBase;
import ch.qos.logback.core.status.OnConsoleStatusListener;
import ch.qos.logback.core.status.Status;
import ch.qos.logback.core.status.StatusManager;
Expand Down Expand Up @@ -625,6 +628,108 @@ public void testReconnectToSecondaryOnKeepAlive() throws Exception {
}


/**
* Assert that nothing is written in the socket output stream when a *non* {@link StreamingEncoder}
* throws an exception.
*/
@Test
public void testEncoderThrowsException() throws Exception {
// Use a ByteArrayOutputStream to capture actual output
ByteArrayOutputStream bos = new ByteArrayOutputStream();
when(socket.getOutputStream())
.thenReturn(bos);

// Encoder throws an exception
when(encoder.encode(event1)).thenThrow(new RuntimeException("Exception thrown by the Encoder"));

// Configure and start appender
appender.addDestination("localhost:10000");
appender.start();


// This event will cause the encoder to throw an exception
appender.append(event1);

// Event dropped
verify(listener, async()).eventSendFailure(eq(appender), eq(event1), any());

// Nothing written in the socket output stream
assertThat(bos.size()).isZero();

// A warn status is emitted
assertThat(statusManager.getCopyOfStatusList()).anySatisfy(status -> {
assertThat(status.getLevel()).isEqualTo(Status.WARN);
assertThat(status.getMessage()).contains("Encoder failed to encode event:");
});
}


/**
* Assert that nothing is written in the socket output stream when a {@link StreamingEncoder} throws
* an exception after having written a few bytes.
*
* Also assert that the StreamingEncoder interface is used instead of the legacy Encoder.
*/
@Test
public void testStreamingEncoderThrowsException() throws Exception {
// Use a ByteArrayOutputStream to capture actual output
ByteArrayOutputStream bos = new ByteArrayOutputStream();
when(socket.getOutputStream())
.thenReturn(bos);

// StreamingEncoder throwing an exception
BadStreamingEncoder badEncoder = spy(new BadStreamingEncoder());
appender.setEncoder(badEncoder);

// Configure and start appender
appender.addDestination("localhost:10000");
appender.start();


// This event will cause the encoder to throw an exception
appender.append(event1);

// Event dropped
verify(listener, async()).eventSendFailure(eq(appender), eq(event1), any());

// Streaming interface used instead of standard Encoder
verify(badEncoder, times(1)).encode(eq(event1), any(OutputStream.class));
verify(badEncoder, never()).encode(any());

// Nothing written in the socket output stream
assertThat(bos.size()).isZero();

// A warn status is emitted
assertThat(statusManager.getCopyOfStatusList()).anySatisfy(status -> {
assertThat(status.getLevel()).isEqualTo(Status.WARN);
assertThat(status.getMessage()).contains("Encoder failed to encode event:");
});
}

private static class BadStreamingEncoder extends EncoderBase<ILoggingEvent> implements StreamingEncoder<ILoggingEvent> {
@Override
public byte[] headerBytes() {
return null;
}

@Override
public byte[] encode(ILoggingEvent event) {
return null;
}

@Override
public byte[] footerBytes() {
return null;
}

@Override
public void encode(ILoggingEvent event, OutputStream outputStream) throws IOException {
outputStream.write("First few bytes".getBytes());
throw new IOException("Exception thrown after some bytes are written");
}
}


/**
* At least one valid destination must be configured.
* The appender refuses to start in case of error.
Expand Down