Skip to content

Commit

Permalink
Merge pull request #472 from brenuart/gh461
Browse files Browse the repository at this point in the history
Reduce memory allocations by writing directly into the output stream (#461)
  • Loading branch information
philsttr authored Jul 5, 2021
2 parents ef9dced + 513da38 commit fa81678
Show file tree
Hide file tree
Showing 12 changed files with 895 additions and 260 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1801,6 +1801,10 @@ The logstash-logback-encoder library contains many providers out-of-the-box,
and you can even plug-in your own by extending `JsonProvider`.
Each provider has its own configuration options to further customize it.

These encoders/layouts make use of an internal buffer to hold the JSON output during the rendering process.
The size of this buffer is set to `1024` bytes by default. A different size can be configured by setting the `minBufferSize` property to the desired value.
The buffer automatically grows above the `minBufferSize` when needed to accommodate with larger events. However, only the first `minBufferSize` bytes will be reused by subsequent invocations. It is therefore strongly advised to set the minimum size at least equal to the average size of the encoded events to reduce unnecessary memory allocations and reduce pressure on the garbage collector.


#### Providers for LoggingEvents

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@
import net.logstash.logback.appender.destination.PreferPrimaryDestinationConnectionStrategy;
import net.logstash.logback.appender.listener.TcpAppenderListener;
import net.logstash.logback.encoder.SeparatorParser;
import net.logstash.logback.encoder.StreamingEncoder;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.LifecycleAware;
import com.lmax.disruptor.RingBuffer;

import ch.qos.logback.core.encoder.Encoder;
import ch.qos.logback.core.joran.spi.DefaultClass;
Expand All @@ -59,10 +64,6 @@
import ch.qos.logback.core.util.CloseUtil;
import ch.qos.logback.core.util.Duration;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.LifecycleAware;
import com.lmax.disruptor.RingBuffer;

/**
* An {@link AsyncDisruptorAppender} appender that writes
* events to a TCP {@link Socket} outputStream.
Expand Down Expand Up @@ -584,7 +585,7 @@ private void writeEvent(Socket socket, OutputStream outputStream, LogEvent<Event
* This is a standard (non-keepAlive) event.
* Therefore, we need to send the event.
*/
outputStream.write(encoder.encode(logEvent.event));
encode(logEvent.event, outputStream);
} else if (hasKeepAliveDurationElapsed(lastSendEndNanoTime, startNanoTime)) {
/*
* This is a keep alive event, and the keepAliveDuration has passed,
Expand Down Expand Up @@ -612,6 +613,20 @@ private void writeEvent(Socket socket, OutputStream outputStream, LogEvent<Event
}
}


@SuppressWarnings("unchecked")
private void encode(Event event, OutputStream outputStream) throws IOException {
if (encoder instanceof StreamingEncoder) {
((StreamingEncoder<Event>) encoder).encode(event, outputStream);
} else {
byte[] data = encoder.encode(event);
if (data != null) {
outputStream.write(data);
}
}
}


private boolean hasKeepAliveDurationElapsed(long lastSentNanoTime, long currentNanoTime) {
return isKeepAliveEnabled()
&& lastSentNanoTime + TimeUnit.MILLISECONDS.toNanos(keepAliveDuration.getMilliseconds()) < currentNanoTime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,26 @@
import java.io.IOException;
import java.io.OutputStream;
import java.io.Writer;
import java.lang.ref.SoftReference;
import java.util.ServiceConfigurationError;

import net.logstash.logback.decorate.JsonFactoryDecorator;
import net.logstash.logback.decorate.JsonGeneratorDecorator;
import net.logstash.logback.decorate.NullJsonFactoryDecorator;
import net.logstash.logback.decorate.NullJsonGeneratorDecorator;
import ch.qos.logback.access.spi.IAccessEvent;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.spi.ContextAware;
import ch.qos.logback.core.spi.ContextAwareBase;
import ch.qos.logback.core.spi.DeferredProcessingAware;
import ch.qos.logback.core.spi.LifeCycle;

import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.io.SegmentedStringWriter;
import com.fasterxml.jackson.core.util.BufferRecycler;
import com.fasterxml.jackson.core.util.ByteArrayBuilder;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;

import ch.qos.logback.access.spi.IAccessEvent;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.spi.ContextAware;
import ch.qos.logback.core.spi.ContextAwareBase;
import ch.qos.logback.core.spi.DeferredProcessingAware;
import ch.qos.logback.core.spi.LifeCycle;

/**
* Formats logstash Events as JSON using {@link JsonProvider}s.
* <p>
Expand All @@ -52,18 +49,6 @@
public abstract class CompositeJsonFormatter<Event extends DeferredProcessingAware>
extends ContextAwareBase implements LifeCycle {

/**
* This <code>ThreadLocal</code> contains a {@link java.lang.ref.SoftReference}
* to a {@link BufferRecycler} used to provide a low-cost
* buffer recycling between writer instances.
*/
private final ThreadLocal<SoftReference<BufferRecycler>> recycler = new ThreadLocal<SoftReference<BufferRecycler>>() {
protected SoftReference<BufferRecycler> initialValue() {
final BufferRecycler bufferRecycler = new BufferRecycler();
return new SoftReference<BufferRecycler>(bufferRecycler);
}
};

/**
* Used to create the necessary {@link JsonGenerator}s for generating JSON.
*/
Expand Down Expand Up @@ -134,35 +119,7 @@ private JsonFactory createJsonFactory() {
}
}

JsonFactory jsonFactory = objectMapper
.getFactory()
/*
* When generators are flushed, don't flush the underlying outputStream.
*
* This allows some streaming optimizations when using an encoder.
*
* The encoder generally determines when the stream should be flushed
* by an 'immediateFlush' property.
*
* The 'immediateFlush' property of the encoder can be set to false
* when the appender performs the flushes at appropriate times
* (such as the end of a batch in the AbstractLogstashTcpSocketAppender).
*/
.disable(JsonGenerator.Feature.FLUSH_PASSED_TO_STREAM);

return this.jsonFactoryDecorator.decorate(jsonFactory);
}

public byte[] writeEventAsBytes(Event event) throws IOException {
ByteArrayBuilder outputStream = new ByteArrayBuilder(getBufferRecycler());

try {
writeEventToOutputStream(event, outputStream);
outputStream.flush();
return outputStream.toByteArray();
} finally {
outputStream.release();
}
return this.jsonFactoryDecorator.decorate(objectMapper.getFactory());
}

public void writeEventToOutputStream(Event event, OutputStream outputStream) throws IOException {
Expand All @@ -177,13 +134,9 @@ public void writeEventToOutputStream(Event event, OutputStream outputStream) thr
*/
}

public String writeEventAsString(Event event) throws IOException {
SegmentedStringWriter writer = new SegmentedStringWriter(getBufferRecycler());

public void writeEventToWriter(Event event, Writer writer) throws IOException {
try (JsonGenerator generator = createGenerator(writer)) {
writeEventToGenerator(generator, event);
writer.flush();
return writer.getAndClear();
}
}

Expand All @@ -203,23 +156,36 @@ protected void prepareForDeferredProcessing(Event event) {
}

private JsonGenerator createGenerator(OutputStream outputStream) throws IOException {
return this.jsonGeneratorDecorator.decorate(jsonFactory.createGenerator(outputStream, encoding));
return decorateGenerator(jsonFactory.createGenerator(outputStream, encoding));
}

private JsonGenerator createGenerator(Writer writer) throws IOException {
return this.jsonGeneratorDecorator.decorate(jsonFactory.createGenerator(writer));
}

private BufferRecycler getBufferRecycler() {
SoftReference<BufferRecycler> bufferRecyclerReference = recycler.get();
BufferRecycler bufferRecycler = bufferRecyclerReference.get();
if (bufferRecycler == null) {
recycler.remove();
return getBufferRecycler();
}
return bufferRecycler;
}

return decorateGenerator(jsonFactory.createGenerator(writer));
}

private JsonGenerator decorateGenerator(JsonGenerator generator) {
return this.jsonGeneratorDecorator.decorate(generator)
/*
* When generators are flushed, don't flush the underlying outputStream.
*
* This allows some streaming optimizations when using an encoder.
*
* The encoder generally determines when the stream should be flushed
* by an 'immediateFlush' property.
*
* The 'immediateFlush' property of the encoder can be set to false
* when the appender performs the flushes at appropriate times
* (such as the end of a batch in the AbstractLogstashTcpSocketAppender).
*/
.disable(JsonGenerator.Feature.FLUSH_PASSED_TO_STREAM)

/*
* Don't let the json generator close the underlying outputStream and let the
* encoder managed it.
*/
.disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET);
}

public JsonFactory getJsonFactory() {
return jsonFactory;
}
Expand Down
Loading

0 comments on commit fa81678

Please sign in to comment.