From 5b3d0c5c9a885e5e4b20c62e0a24daff69a33eda Mon Sep 17 00:00:00 2001 From: Bertrand Renuart Date: Tue, 5 Jan 2021 15:58:20 +0100 Subject: [PATCH 1/6] Reduce GC pressure by writing directly into the outpustream instead of returning a byte array Introduce a new (internal) StreamingEncoder interface to be implemented by Encoders that supports writing directly into the output stream instead of returning their results as a byte array. Update both the AbstractLogstashTcpSocketAppender and the CompositeJsonEncoder to support this new interface. This should hoppefully reduce the amount of short-lived byte arrays created for each log event. See #461 for more information. --- .../AbstractLogstashTcpSocketAppender.java | 29 ++- .../logback/encoder/CompositeJsonEncoder.java | 80 +++--- .../logback/encoder/StreamingEncoder.java | 27 ++ .../encoder/CompositeJsonEncoderTest.java | 232 ++++++++++++------ 4 files changed, 252 insertions(+), 116 deletions(-) create mode 100644 src/main/java/net/logstash/logback/encoder/StreamingEncoder.java diff --git a/src/main/java/net/logstash/logback/appender/AbstractLogstashTcpSocketAppender.java b/src/main/java/net/logstash/logback/appender/AbstractLogstashTcpSocketAppender.java index 6c23036b..c0d22014 100644 --- a/src/main/java/net/logstash/logback/appender/AbstractLogstashTcpSocketAppender.java +++ b/src/main/java/net/logstash/logback/appender/AbstractLogstashTcpSocketAppender.java @@ -48,6 +48,11 @@ import net.logstash.logback.appender.destination.PreferPrimaryDestinationConnectionStrategy; import net.logstash.logback.appender.listener.TcpAppenderListener; import net.logstash.logback.encoder.SeparatorParser; +import net.logstash.logback.encoder.StreamingEncoder; + +import com.lmax.disruptor.EventHandler; +import com.lmax.disruptor.LifecycleAware; +import com.lmax.disruptor.RingBuffer; import ch.qos.logback.core.encoder.Encoder; import ch.qos.logback.core.joran.spi.DefaultClass; @@ -59,10 +64,6 @@ import ch.qos.logback.core.util.CloseUtil; import ch.qos.logback.core.util.Duration; -import com.lmax.disruptor.EventHandler; -import com.lmax.disruptor.LifecycleAware; -import com.lmax.disruptor.RingBuffer; - /** * An {@link AsyncDisruptorAppender} appender that writes * events to a TCP {@link Socket} outputStream. @@ -594,8 +595,9 @@ private void writeEvent(Socket socket, OutputStream outputStream, LogEvent)encoder).encode(event, outputStream); + } + else { + byte[] data = encoder.encode(event); + if (data!=null) { + outputStream.write(data); + } + } + } + + private boolean hasKeepAliveDurationElapsed(long lastSentNanoTime, long currentNanoTime) { return isKeepAliveEnabled() && lastSentNanoTime + TimeUnit.MILLISECONDS.toNanos(keepAliveDuration.getMilliseconds()) < currentNanoTime; diff --git a/src/main/java/net/logstash/logback/encoder/CompositeJsonEncoder.java b/src/main/java/net/logstash/logback/encoder/CompositeJsonEncoder.java index 8385b888..e6266161 100644 --- a/src/main/java/net/logstash/logback/encoder/CompositeJsonEncoder.java +++ b/src/main/java/net/logstash/logback/encoder/CompositeJsonEncoder.java @@ -15,12 +15,14 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.OutputStream; import java.nio.charset.Charset; import net.logstash.logback.composite.CompositeJsonFormatter; import net.logstash.logback.composite.JsonProviders; import net.logstash.logback.decorate.JsonFactoryDecorator; import net.logstash.logback.decorate.JsonGeneratorDecorator; + import ch.qos.logback.core.encoder.Encoder; import ch.qos.logback.core.encoder.EncoderBase; import ch.qos.logback.core.encoder.LayoutWrappingEncoder; @@ -28,7 +30,7 @@ import ch.qos.logback.core.spi.DeferredProcessingAware; public abstract class CompositeJsonEncoder - extends EncoderBase { + extends EncoderBase implements StreamingEncoder { private static final byte[] EMPTY_BYTES = new byte[0]; @@ -60,54 +62,48 @@ public CompositeJsonEncoder() { protected abstract CompositeJsonFormatter createFormatter(); @Override - public byte[] encode(Event event) { + public void encode(Event event, OutputStream outputStream) throws IOException { if (!isStarted()) { throw new IllegalStateException("Encoder is not started"); } - byte[] prefixBytes = doEncodeWrappedToBytes(prefix, event); - byte[] suffixBytes = doEncodeWrappedToBytes(suffix, event); + if (!isStarted()) { + throw new IllegalStateException("Encoder is not started."); + } - ByteArrayOutputStream outputStream = new ByteArrayOutputStream( - minBufferSize - + (prefixBytes == null ? 0 : prefixBytes.length) - + (suffixBytes == null ? 0 : suffixBytes.length) - + lineSeparatorBytes.length); - try { - if (prefixBytes != null) { - outputStream.write(prefixBytes); - } - - formatter.writeEventToOutputStream(event, outputStream); - - if (suffixBytes != null) { - outputStream.write(suffixBytes); - } - - outputStream.write(lineSeparatorBytes); - + encode(prefix, event, outputStream); + formatter.writeEventToOutputStream(event, outputStream); + encode(suffix, event, outputStream); + + outputStream.write(lineSeparatorBytes); + } + + @Override + public byte[] encode(Event event) { + try(ByteArrayOutputStream outputStream = new ByteArrayOutputStream(getMinBufferSize())) { + encode(event, outputStream); return outputStream.toByteArray(); - } catch (IOException e) { - addWarn("Error encountered while encoding log event. " - + "Event: " + event, e); - return EMPTY_BYTES; - } finally { - try { - outputStream.close(); - } catch (IOException e) { - throw new RuntimeException(e); - } } + catch (IOException e) { + addWarn("Error encountered while encoding log event. Event: " + event, e); + return EMPTY_BYTES; + } } - private byte[] doEncodeWrappedToBytes(Encoder wrapped, Event event) { - if (wrapped != null) { - return wrapped.encode(event); + private void encode(Encoder encoder, Event event, OutputStream outputStream) throws IOException { + if (encoder!=null) { + byte[] data = encoder.encode(event); + if (data!=null) { + outputStream.write(data); + } } - return EMPTY_BYTES; } @Override public void start() { + if (isStarted()) { + return; + } + super.start(); formatter.setContext(getContext()); formatter.start(); @@ -154,14 +150,16 @@ private void startWrapped(Encoder wrapped) { @Override public void stop() { - super.stop(); - formatter.stop(); - stopWrapped(prefix); - stopWrapped(suffix); + if (isStarted()) { + super.stop(); + formatter.stop(); + stopWrapped(prefix); + stopWrapped(suffix); + } } private void stopWrapped(Encoder wrapped) { - if (wrapped != null && !wrapped.isStarted()) { + if (wrapped != null && wrapped.isStarted()) { wrapped.stop(); } } diff --git a/src/main/java/net/logstash/logback/encoder/StreamingEncoder.java b/src/main/java/net/logstash/logback/encoder/StreamingEncoder.java new file mode 100644 index 00000000..867dd8b4 --- /dev/null +++ b/src/main/java/net/logstash/logback/encoder/StreamingEncoder.java @@ -0,0 +1,27 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.logstash.logback.encoder; + +import java.io.IOException; +import java.io.OutputStream; + +/** + * Interface implemented by {@link Encoder} that supports writing directly into a {@link OutputStream} + * instead of returning a byte array. + */ +public interface StreamingEncoder { + + public void encode(Event event, OutputStream outputStream) throws IOException; + +} diff --git a/src/test/java/net/logstash/logback/encoder/CompositeJsonEncoderTest.java b/src/test/java/net/logstash/logback/encoder/CompositeJsonEncoderTest.java index 857fcac5..367c6665 100644 --- a/src/test/java/net/logstash/logback/encoder/CompositeJsonEncoderTest.java +++ b/src/test/java/net/logstash/logback/encoder/CompositeJsonEncoderTest.java @@ -14,21 +14,25 @@ package net.logstash.logback.encoder; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.assertj.core.api.Assertions.assertThatCode; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; import java.nio.charset.StandardCharsets; +import net.logstash.logback.TestJsonProvider; import net.logstash.logback.composite.CompositeJsonFormatter; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -36,9 +40,12 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import com.fasterxml.jackson.core.JsonEncoding; + import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.Context; import ch.qos.logback.core.encoder.Encoder; +import ch.qos.logback.core.encoder.EncoderBase; import ch.qos.logback.core.encoder.LayoutWrappingEncoder; import ch.qos.logback.core.status.StatusManager; import ch.qos.logback.core.status.WarnStatus; @@ -47,16 +54,10 @@ @ExtendWith(MockitoExtension.class) public class CompositeJsonEncoderTest { - private final CompositeJsonFormatter formatter = mock(CompositeJsonFormatter.class); - @InjectMocks - private final CompositeJsonEncoder encoder = new CompositeJsonEncoder() { - - @Override - protected CompositeJsonFormatter createFormatter() { - return formatter; - } - }; + private final CompositeJsonEncoder encoder = new TestCompositeJsonEncoder(); + + private CompositeJsonFormatter formatter; @Mock(lenient = true) private Context context; @@ -69,119 +70,212 @@ protected CompositeJsonFormatter createFormatter() { @BeforeEach public void setup() { - when(formatter.getEncoding()).thenReturn("UTF-8"); + // suppress line separator to make test platform independent + this.encoder.setLineSeparator(""); + this.formatter = encoder.getFormatter(); + when(context.getStatusManager()).thenReturn(statusManager); } + @Test - public void testNoPrefixNoSuffix_logback12OrLater() throws IOException { - - encoder.start(); + public void startStop() { + Encoder prefix = spy(new TestEncoder("prefix")); + encoder.setPrefix(prefix); - Assertions.assertTrue(encoder.isStarted()); + // stopped by default + assertThat(encoder.isStarted()).isFalse(); + assertThat(formatter.isStarted()).isFalse(); + assertThat(prefix.isStarted()).isFalse(); + // start encoder + encoder.start(); + assertThat(encoder.isStarted()).isTrue(); + assertThat(formatter.isStarted()).isTrue(); + assertThat(prefix.isStarted()).isTrue(); verify(formatter).setContext(context); - verify(formatter).start(); - byte[] encoded = encoder.encode(event); - - verify(formatter).writeEventToOutputStream(eq(event), any(OutputStream.class)); + // providers are not started a second time + encoder.start(); + verify(formatter, times(1)).start(); + verify(prefix, times(1)).start(); - assertThat(encoded).containsExactly(System.getProperty("line.separator").getBytes(StandardCharsets.UTF_8)); + // stop encoder + encoder.stop(); + assertThat(encoder.isStarted()).isFalse(); + assertThat(formatter.isStarted()).isFalse(); + assertThat(prefix.isStarted()).isFalse(); + // providers are not stopped a second time encoder.stop(); - Assertions.assertFalse(encoder.isStarted()); - verify(formatter).stop(); + verify(formatter, times(1)).stop(); + verify(prefix, times(1)).stop(); } + @Test - public void testPrefixAndSuffix_logback12OrLater() throws IOException { - - LayoutWrappingEncoder prefix = mock(LayoutWrappingEncoder.class); - Encoder suffix = mock(Encoder.class); + public void encode_noPrefixSuffix() { + encoder.start(); + assertThat(new String(encoder.encode(event))).isEqualTo("{}"); + } + + + /* + * Encode log event with prefix and suffix encoders. + * + * NOTE: Encoder#headerBytes and Encoder#footerBytes are ignored + */ + @Test + public void encode_withPrefixSuffix() { + encoder.setPrefix( new TestEncoder("prefix") ); + encoder.setSuffix( new TestEncoder("suffix") ); + encoder.start(); - when(prefix.encode(event)).thenReturn("prefix".getBytes(StandardCharsets.UTF_8)); - when(suffix.encode(event)).thenReturn("suffix".getBytes(StandardCharsets.UTF_8)); + assertThat(new String(encoder.encode(event))).isEqualTo("prefix/event{}suffix/event"); + } + + + /* + * Use a custom line separator + */ + @Test + public void encode_customLineSeparator() { + encoder.setLineSeparator("-"); + encoder.start(); + assertThat(new String(encoder.encode(event))).isEqualTo("{}-"); + } + + + /* + * Prefix/Suffix of type LayoutWrappingEncoder have their charset set to the same value + * as the Formatter used by the CompositeJsonEncoder + */ + @Test + public void charsetOnLayoutWrappingEncoder() { + formatter.setEncoding(JsonEncoding.UTF16_BE.getJavaName()); // use an encoding that is not likely to be + // the default to avoid false positives + LayoutWrappingEncoder prefix = mock(LayoutWrappingEncoder.class); encoder.setPrefix(prefix); - encoder.setSuffix(suffix); encoder.start(); - Assertions.assertTrue(encoder.isStarted()); - - verify(formatter).setContext(context); - verify(formatter).start(); - - verify(prefix).setCharset(StandardCharsets.UTF_8); - verify(prefix).start(); - verify(suffix).start(); - - byte[] encoded = encoder.encode(event); - - verify(prefix).encode(event); - verify(suffix).encode(event); - - verify(formatter).writeEventToOutputStream(eq(event), any(OutputStream.class)); + verify(prefix).setCharset(StandardCharsets.UTF_16BE); + } + + + /* + * Encode using the StreamingEncoder API + */ + @Test + public void streamingEncode() { + encoder.start(); - assertThat(encoded).containsExactly(("prefixsuffix" + System.getProperty("line.separator")).getBytes(StandardCharsets.UTF_8)); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + assertThatNoException().isThrownBy(() -> encoder.encode(event, bos)); - encoder.stop(); - Assertions.assertFalse(encoder.isStarted()); - verify(formatter).stop(); - verify(prefix).stop(); - verify(suffix).stop(); + assertThat(new String(bos.toByteArray())).isEqualTo("{}"); } + + /* + * Test decoding of special line separators + */ @Test public void testLineEndings() { + // Use a brand new default instance to get rid of configuration done by the #setup() method + TestCompositeJsonEncoder encoder = new TestCompositeJsonEncoder(); - Assertions.assertEquals(System.getProperty("line.separator"), encoder.getLineSeparator()); + assertThat(encoder.getLineSeparator()).isEqualTo(System.getProperty("line.separator")); encoder.setLineSeparator("UNIX"); - Assertions.assertEquals("\n", encoder.getLineSeparator()); + assertThat(encoder.getLineSeparator()).isEqualTo("\n"); encoder.setLineSeparator(null); - Assertions.assertEquals(null, encoder.getLineSeparator()); + assertThat(encoder.getLineSeparator()).isNull(); encoder.setLineSeparator("WINDOWS"); - Assertions.assertEquals("\r\n", encoder.getLineSeparator()); + assertThat(encoder.getLineSeparator()).isEqualTo("\r\n"); encoder.setLineSeparator("foo"); - Assertions.assertEquals("foo", encoder.getLineSeparator()); + assertThat(encoder.getLineSeparator()).isEqualTo("foo"); encoder.setLineSeparator("SYSTEM"); - Assertions.assertEquals(System.getProperty("line.separator"), encoder.getLineSeparator()); + assertThat(encoder.getLineSeparator()).isEqualTo(System.getProperty("line.separator")); encoder.setLineSeparator(""); - Assertions.assertEquals(null, encoder.getLineSeparator()); + assertThat(encoder.getLineSeparator()).isNull(); } + + /* + * Failure to encode log event should log an warning status + */ @Test - public void testIOException_logback12OrLater() throws IOException { - + public void testIOException() throws IOException { encoder.start(); - Assertions.assertTrue(encoder.isStarted()); - - verify(formatter).setContext(context); - verify(formatter).start(); - IOException exception = new IOException(); + doThrow(exception).when(formatter).writeEventToOutputStream(eq(event), any(OutputStream.class)); encoder.encode(event); - Assertions.assertTrue(encoder.isStarted()); - verify(statusManager).add(new WarnStatus("Error encountered while encoding log event. " + "Event: " + event, context, exception)); } + + /* + * StreamingEncoder re-throws the IOException to the caller and does not log any warning + */ @Test - public void notStarted() { + public void testIOException_streaming() throws IOException { + encoder.start(); + + IOException exception = new IOException(); + + OutputStream stream = mock(OutputStream.class); + doThrow(exception).when(stream).write(any(byte[].class), any(int.class), any(int.class)); + + assertThatCode(() -> encoder.encode(event, stream)).isInstanceOf(IOException.class); + + verify(statusManager, never()).add(new WarnStatus("Error encountered while encoding log event. " + + "Event: " + event, context, exception)); + } + + + // ---------------------------------------------------------------------------------------------------------------- + + + private static class TestCompositeJsonEncoder extends CompositeJsonEncoder { + @Override + protected CompositeJsonFormatter createFormatter() { + CompositeJsonFormatter formatter = spy(new CompositeJsonFormatter(this) {}); + formatter.getProviders().addProvider(new TestJsonProvider()); + return formatter; + } + } + + private static class TestEncoder extends EncoderBase { + private final String name; + + this.name = name; + } + + public byte[] encode(ILoggingEvent event) { + return getBytes(name+"/event"); + } assertThatExceptionOfType(IllegalStateException.class).isThrownBy(() -> encoder.encode(event)) + public byte[] footerBytes() { + return getBytes(name+"/footer"); + } .withMessage("Encoder is not started"); + public byte[] headerBytes() { + return getBytes(name+"/header"); + } + private byte[] getBytes(String s) { + return s.getBytes(); + } } - } From abadd8b38dbf8cf8e2d9714e53055bb38d660972 Mon Sep 17 00:00:00 2001 From: Bertrand Renuart Date: Tue, 5 Jan 2021 18:57:06 +0100 Subject: [PATCH 2/6] Make use of reusable thread-bound buffers instead of allocating a new ByteArrayOutputStream for every log event --- .../composite/CompositeJsonFormatter.java | 58 ++++--------- .../logback/encoder/CompositeJsonEncoder.java | 25 ++++-- .../logback/util/ThreadLocalBuffers.java | 87 +++++++++++++++++++ 3 files changed, 123 insertions(+), 47 deletions(-) create mode 100644 src/main/java/net/logstash/logback/util/ThreadLocalBuffers.java diff --git a/src/main/java/net/logstash/logback/composite/CompositeJsonFormatter.java b/src/main/java/net/logstash/logback/composite/CompositeJsonFormatter.java index af1ab6a6..75f47c89 100644 --- a/src/main/java/net/logstash/logback/composite/CompositeJsonFormatter.java +++ b/src/main/java/net/logstash/logback/composite/CompositeJsonFormatter.java @@ -16,29 +16,28 @@ import java.io.IOException; import java.io.OutputStream; import java.io.Writer; -import java.lang.ref.SoftReference; import java.util.ServiceConfigurationError; import net.logstash.logback.decorate.JsonFactoryDecorator; import net.logstash.logback.decorate.JsonGeneratorDecorator; import net.logstash.logback.decorate.NullJsonFactoryDecorator; import net.logstash.logback.decorate.NullJsonGeneratorDecorator; -import ch.qos.logback.access.spi.IAccessEvent; -import ch.qos.logback.classic.spi.ILoggingEvent; -import ch.qos.logback.core.spi.ContextAware; -import ch.qos.logback.core.spi.ContextAwareBase; -import ch.qos.logback.core.spi.DeferredProcessingAware; -import ch.qos.logback.core.spi.LifeCycle; +import net.logstash.logback.util.ThreadLocalBuffers; import com.fasterxml.jackson.core.JsonEncoding; import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.core.io.SegmentedStringWriter; -import com.fasterxml.jackson.core.util.BufferRecycler; -import com.fasterxml.jackson.core.util.ByteArrayBuilder; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; +import ch.qos.logback.access.spi.IAccessEvent; +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.spi.ContextAware; +import ch.qos.logback.core.spi.ContextAwareBase; +import ch.qos.logback.core.spi.DeferredProcessingAware; +import ch.qos.logback.core.spi.LifeCycle; + /** * Formats logstash Events as JSON using {@link JsonProvider}s. *

@@ -53,16 +52,10 @@ public abstract class CompositeJsonFormatterThreadLocal contains a {@link java.lang.ref.SoftReference} - * to a {@link BufferRecycler} used to provide a low-cost - * buffer recycling between writer instances. + * Provides thread local buffers */ - private final ThreadLocal> recycler = new ThreadLocal>() { - protected SoftReference initialValue() { - final BufferRecycler bufferRecycler = new BufferRecycler(); - return new SoftReference(bufferRecycler); - } - }; + private final ThreadLocalBuffers buffers = new ThreadLocalBuffers(); + /** * Used to create the necessary {@link JsonGenerator}s for generating JSON. @@ -153,18 +146,6 @@ private JsonFactory createJsonFactory() { return this.jsonFactoryDecorator.decorate(jsonFactory); } - public byte[] writeEventAsBytes(Event event) throws IOException { - ByteArrayBuilder outputStream = new ByteArrayBuilder(getBufferRecycler()); - - try { - writeEventToOutputStream(event, outputStream); - outputStream.flush(); - return outputStream.toByteArray(); - } finally { - outputStream.release(); - } - } - public void writeEventToOutputStream(Event event, OutputStream outputStream) throws IOException { try (JsonGenerator generator = createGenerator(outputStream)) { writeEventToGenerator(generator, event); @@ -178,13 +159,16 @@ public void writeEventToOutputStream(Event event, OutputStream outputStream) thr } public String writeEventAsString(Event event) throws IOException { - SegmentedStringWriter writer = new SegmentedStringWriter(getBufferRecycler()); + SegmentedStringWriter writer = this.buffers.getStringWriter(); try (JsonGenerator generator = createGenerator(writer)) { writeEventToGenerator(generator, event); - writer.flush(); return writer.getAndClear(); } + finally { + // make sure the buffer is released! + writer.getAndClear(); + } } protected void writeEventToGenerator(JsonGenerator generator, Event event) throws IOException { @@ -209,16 +193,6 @@ private JsonGenerator createGenerator(OutputStream outputStream) throws IOExcept private JsonGenerator createGenerator(Writer writer) throws IOException { return this.jsonGeneratorDecorator.decorate(jsonFactory.createGenerator(writer)); } - - private BufferRecycler getBufferRecycler() { - SoftReference bufferRecyclerReference = recycler.get(); - BufferRecycler bufferRecycler = bufferRecyclerReference.get(); - if (bufferRecycler == null) { - recycler.remove(); - return getBufferRecycler(); - } - return bufferRecycler; - } public JsonFactory getJsonFactory() { return jsonFactory; diff --git a/src/main/java/net/logstash/logback/encoder/CompositeJsonEncoder.java b/src/main/java/net/logstash/logback/encoder/CompositeJsonEncoder.java index e6266161..27eb5f76 100644 --- a/src/main/java/net/logstash/logback/encoder/CompositeJsonEncoder.java +++ b/src/main/java/net/logstash/logback/encoder/CompositeJsonEncoder.java @@ -13,7 +13,6 @@ */ package net.logstash.logback.encoder; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; import java.nio.charset.Charset; @@ -22,6 +21,9 @@ import net.logstash.logback.composite.JsonProviders; import net.logstash.logback.decorate.JsonFactoryDecorator; import net.logstash.logback.decorate.JsonGeneratorDecorator; +import net.logstash.logback.util.ThreadLocalBuffers; + +import com.fasterxml.jackson.core.util.ByteArrayBuilder; import ch.qos.logback.core.encoder.Encoder; import ch.qos.logback.core.encoder.EncoderBase; @@ -43,6 +45,11 @@ public abstract class CompositeJsonEncoder prefix; private Encoder suffix; @@ -54,6 +61,7 @@ public abstract class CompositeJsonEncoder encoder, Event event, OutputStream outputStream) throws IOException { diff --git a/src/main/java/net/logstash/logback/util/ThreadLocalBuffers.java b/src/main/java/net/logstash/logback/util/ThreadLocalBuffers.java new file mode 100644 index 00000000..48296b13 --- /dev/null +++ b/src/main/java/net/logstash/logback/util/ThreadLocalBuffers.java @@ -0,0 +1,87 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.logstash.logback.util; + +import java.lang.ref.SoftReference; + +import com.fasterxml.jackson.core.io.SegmentedStringWriter; +import com.fasterxml.jackson.core.util.BufferRecycler; +import com.fasterxml.jackson.core.util.ByteArrayBuilder; + +/** + * Utility to maintain thread-bound buffers. + */ +public class ThreadLocalBuffers { + + /** + * This ThreadLocal contains a {@link java.lang.ref.SoftReference} + * to a {@link BufferRecycler} used to provide a low-cost + * buffer recycling between writer instances. + */ + private final ThreadLocal> recycler = new ThreadLocal>() { + protected SoftReference initialValue() { + final BufferRecycler bufferRecycler = new BufferRecycler(); + return new SoftReference(bufferRecycler); + } + }; + + + private BufferRecycler getBufferRecycler() { + SoftReference bufferRecyclerReference = recycler.get(); + BufferRecycler bufferRecycler = bufferRecyclerReference.get(); + if (bufferRecycler == null) { + recycler.remove(); + return getBufferRecycler(); + } + return bufferRecycler; + } + + + /** + * Get a thread-bound {@link ByteArrayBuilder} with a default initial size. + * + *

Do not forget to release the buffer by calling {@link ByteArrayBuilder#release()} + * when done otherwise the memory allocated by the buffer will not be recycled and new + * memory allocations will occur for every subsequent buffers. + * + * @return a thread-bound {@link ByteArrayBuilder} + */ + public ByteArrayBuilder getByteBuffer() { + return new ByteArrayBuilder(getBufferRecycler()); + } + + /** + * Get a thread-bound {@link ByteArrayBuilder} with the specified initial size. + * A buffer of the default size is allocated with the initial size is less than the default. + * + *

Do not forget to release the buffer by calling {@link ByteArrayBuilder#release()} + * when done otherwise the memory allocated by the buffer will not be recycled and new + * memory allocations will occur for every subsequent buffers. + * + * @param initialSize the initial size of the buffer + * @return a thread-bound {@link ByteArrayBuilder} + */ + public ByteArrayBuilder getByteBuffer(int initialSize) { + return new ByteArrayBuilder(getBufferRecycler(), initialSize); + } + + /** + * Get a thread-bound {@link SegmentedStringWriter}. + * + * @return a thread-bound {@link SegmentedStringWriter} + */ + public SegmentedStringWriter getStringWriter() { + return new SegmentedStringWriter(getBufferRecycler()); + } +} From 774d0d664b608b794d2f70067ee1aa4c4e3f83a1 Mon Sep 17 00:00:00 2001 From: Bertrand Renuart Date: Sat, 9 Jan 2021 23:51:08 +0100 Subject: [PATCH 3/6] Assert encoder reports it is not started when encode() is called --- .../encoder/CompositeJsonEncoderTest.java | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/src/test/java/net/logstash/logback/encoder/CompositeJsonEncoderTest.java b/src/test/java/net/logstash/logback/encoder/CompositeJsonEncoderTest.java index 367c6665..00dd2fbf 100644 --- a/src/test/java/net/logstash/logback/encoder/CompositeJsonEncoderTest.java +++ b/src/test/java/net/logstash/logback/encoder/CompositeJsonEncoderTest.java @@ -15,6 +15,8 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.assertj.core.api.Assertions.assertThatNoException; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doThrow; @@ -113,6 +115,13 @@ public void startStop() { } + @Test + public void notStarted() { + assertThatExceptionOfType(IllegalStateException.class).isThrownBy(() -> encoder.encode(event)) + .withMessage("Encoder is not started"); + } + + @Test public void encode_noPrefixSuffix() { encoder.start(); @@ -260,22 +269,24 @@ protected CompositeJsonFormatter createFormatter() { private static class TestEncoder extends EncoderBase { private final String name; + public TestEncoder(String name) { this.name = name; } public byte[] encode(ILoggingEvent event) { return getBytes(name+"/event"); } - assertThatExceptionOfType(IllegalStateException.class).isThrownBy(() -> encoder.encode(event)) + public byte[] footerBytes() { return getBytes(name+"/footer"); } - .withMessage("Encoder is not started"); + public byte[] headerBytes() { return getBytes(name+"/header"); - } + } + private byte[] getBytes(String s) { return s.getBytes(); } } -} +} \ No newline at end of file From 7b128141ff2fbb49342a004d1a2ad7d2811b975b Mon Sep 17 00:00:00 2001 From: Bertrand Renuart Date: Mon, 5 Jul 2021 17:29:56 +0200 Subject: [PATCH 4/6] Alternate strategy to reuse buffers --- .../composite/CompositeJsonFormatter.java | 29 +-- .../logback/encoder/CompositeJsonEncoder.java | 58 +++-- .../logback/layout/CompositeJsonLayout.java | 111 +++++--- .../logback/util/ReusableByteBuffer.java | 245 ++++++++++++++++++ .../logback/util/ReusableByteBuffers.java | 99 +++++++ .../logback/util/ThreadLocalBuffers.java | 87 ------- ...oggingEventCompositeJsonFormatterTest.java | 6 +- .../layout/CompositeJsonLayoutTest.java | 78 +++--- .../logback/util/ReusableBufferTests.java | 103 ++++++++ 9 files changed, 621 insertions(+), 195 deletions(-) create mode 100644 src/main/java/net/logstash/logback/util/ReusableByteBuffer.java create mode 100644 src/main/java/net/logstash/logback/util/ReusableByteBuffers.java delete mode 100644 src/main/java/net/logstash/logback/util/ThreadLocalBuffers.java create mode 100644 src/test/java/net/logstash/logback/util/ReusableBufferTests.java diff --git a/src/main/java/net/logstash/logback/composite/CompositeJsonFormatter.java b/src/main/java/net/logstash/logback/composite/CompositeJsonFormatter.java index 75f47c89..c077ff48 100644 --- a/src/main/java/net/logstash/logback/composite/CompositeJsonFormatter.java +++ b/src/main/java/net/logstash/logback/composite/CompositeJsonFormatter.java @@ -22,12 +22,10 @@ import net.logstash.logback.decorate.JsonGeneratorDecorator; import net.logstash.logback.decorate.NullJsonFactoryDecorator; import net.logstash.logback.decorate.NullJsonGeneratorDecorator; -import net.logstash.logback.util.ThreadLocalBuffers; import com.fasterxml.jackson.core.JsonEncoding; import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.core.io.SegmentedStringWriter; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; @@ -50,12 +48,6 @@ */ public abstract class CompositeJsonFormatter extends ContextAwareBase implements LifeCycle { - - /** - * Provides thread local buffers - */ - private final ThreadLocalBuffers buffers = new ThreadLocalBuffers(); - /** * Used to create the necessary {@link JsonGenerator}s for generating JSON. @@ -141,8 +133,16 @@ private JsonFactory createJsonFactory() { * when the appender performs the flushes at appropriate times * (such as the end of a batch in the AbstractLogstashTcpSocketAppender). */ - .disable(JsonGenerator.Feature.FLUSH_PASSED_TO_STREAM); + .disable(JsonGenerator.Feature.FLUSH_PASSED_TO_STREAM) + /* + * Don't let the json generator close the underlying outputStream and let the + * encoder managed it. + */ + .disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET); + + //FIXME these settings are mandatory - should we instead apply them *after* jsonGeneratorDecorator is applied (ie. in createGenerator()) ? + return this.jsonFactoryDecorator.decorate(jsonFactory); } @@ -158,16 +158,9 @@ public void writeEventToOutputStream(Event event, OutputStream outputStream) thr */ } - public String writeEventAsString(Event event) throws IOException { - SegmentedStringWriter writer = this.buffers.getStringWriter(); - + public void writeEventToWriter(Event event, Writer writer) throws IOException { try (JsonGenerator generator = createGenerator(writer)) { writeEventToGenerator(generator, event); - return writer.getAndClear(); - } - finally { - // make sure the buffer is released! - writer.getAndClear(); } } @@ -189,7 +182,7 @@ protected void prepareForDeferredProcessing(Event event) { private JsonGenerator createGenerator(OutputStream outputStream) throws IOException { return this.jsonGeneratorDecorator.decorate(jsonFactory.createGenerator(outputStream, encoding)); } - + private JsonGenerator createGenerator(Writer writer) throws IOException { return this.jsonGeneratorDecorator.decorate(jsonFactory.createGenerator(writer)); } diff --git a/src/main/java/net/logstash/logback/encoder/CompositeJsonEncoder.java b/src/main/java/net/logstash/logback/encoder/CompositeJsonEncoder.java index 27eb5f76..a25cf811 100644 --- a/src/main/java/net/logstash/logback/encoder/CompositeJsonEncoder.java +++ b/src/main/java/net/logstash/logback/encoder/CompositeJsonEncoder.java @@ -21,9 +21,8 @@ import net.logstash.logback.composite.JsonProviders; import net.logstash.logback.decorate.JsonFactoryDecorator; import net.logstash.logback.decorate.JsonGeneratorDecorator; -import net.logstash.logback.util.ThreadLocalBuffers; - -import com.fasterxml.jackson.core.util.ByteArrayBuilder; +import net.logstash.logback.util.ReusableByteBuffer; +import net.logstash.logback.util.ReusableByteBuffers; import ch.qos.logback.core.encoder.Encoder; import ch.qos.logback.core.encoder.EncoderBase; @@ -37,19 +36,23 @@ public abstract class CompositeJsonEncoderThe buffer automatically grows above the {@code #minBufferSize} when needed to + * accommodate with larger events. However, only the first {@code minBufferSize} bytes + * will be reused by subsequent invocations. It is therefore strongly advised to set + * the minimum size at least equal to the average size of the encoded events to reduce + * unnecessary memory allocations and reduce pressure on the garbage collector. */ private int minBufferSize = 1024; /** - * Provides thread local buffers + * Provides reusable byte buffers (initialized when the encoder is started). */ - private final ThreadLocalBuffers buffers = new ThreadLocalBuffers(); - + private ReusableByteBuffers bufferPool; + private Encoder prefix; private Encoder suffix; @@ -74,10 +77,7 @@ public void encode(Event event, OutputStream outputStream) throws IOException { if (!isStarted()) { throw new IllegalStateException("Encoder is not started"); } - if (!isStarted()) { - throw new IllegalStateException("Encoder is not started."); - } - + encode(prefix, event, outputStream); formatter.writeEventToOutputStream(event, outputStream); encode(suffix, event, outputStream); @@ -87,20 +87,21 @@ public void encode(Event event, OutputStream outputStream) throws IOException { @Override public byte[] encode(Event event) { - ByteArrayBuilder buffer = null; + if (!isStarted()) { + throw new IllegalStateException("Encoder is not started"); + } + + ReusableByteBuffer buffer = bufferPool.getBuffer(); try { - buffer = this.buffers.getByteBuffer(getMinBufferSize()); encode(event, buffer); return buffer.toByteArray(); } - catch (IOException e) { + catch(IOException e) { addWarn("Error encountered while encoding log event. Event: " + event, e); return EMPTY_BYTES; } finally { - if (buffer!=null) { - buffer.release(); - } + bufferPool.releaseBuffer(buffer); } } @@ -120,6 +121,7 @@ public void start() { } super.start(); + this.bufferPool = new ReusableByteBuffers(this.minBufferSize); formatter.setContext(getContext()); formatter.start(); charset = Charset.forName(formatter.getEncoding()); @@ -254,12 +256,20 @@ public void setLineSeparator(String lineSeparator) { public int getMinBufferSize() { return minBufferSize; } + /** - * Sets the minimum size of the byte array buffer used when - * encoding events in logback versions greater than or equal to 1.2.0. + * The minimum size of the byte buffer used when encoding events in logback versions + * greater than or equal to 1.2.0. The buffer is reused by subsequent invocations of + * the encoder. + * + *

The buffer automatically grows above the {@code #minBufferSize} when needed to + * accommodate with larger events. However, only the first {@code minBufferSize} bytes + * will be reused by subsequent invocations. It is therefore strongly advised to set + * the minimum size at least equal to the average size of the encoded events to reduce + * unnecessary memory allocations and reduce pressure on the garbage collector. * - * The actual buffer size will be the {@link #minBufferSize} - * plus the prefix, suffix, and line separators sizes. + *

Note: changes to the buffer size will not be taken into account after the encoder + * is started. */ public void setMinBufferSize(int minBufferSize) { this.minBufferSize = minBufferSize; diff --git a/src/main/java/net/logstash/logback/layout/CompositeJsonLayout.java b/src/main/java/net/logstash/logback/layout/CompositeJsonLayout.java index 625d24b0..e1d44420 100644 --- a/src/main/java/net/logstash/logback/layout/CompositeJsonLayout.java +++ b/src/main/java/net/logstash/logback/layout/CompositeJsonLayout.java @@ -14,17 +14,22 @@ package net.logstash.logback.layout; import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.Writer; import net.logstash.logback.composite.CompositeJsonFormatter; import net.logstash.logback.composite.JsonProviders; import net.logstash.logback.decorate.JsonFactoryDecorator; import net.logstash.logback.decorate.JsonGeneratorDecorator; +import net.logstash.logback.encoder.CompositeJsonEncoder; +import net.logstash.logback.encoder.SeparatorParser; +import net.logstash.logback.util.ReusableByteBuffer; +import net.logstash.logback.util.ReusableByteBuffers; + import ch.qos.logback.core.Layout; import ch.qos.logback.core.LayoutBase; import ch.qos.logback.core.pattern.PatternLayoutBase; import ch.qos.logback.core.spi.DeferredProcessingAware; -import net.logstash.logback.encoder.CompositeJsonEncoder; -import net.logstash.logback.encoder.SeparatorParser; public abstract class CompositeJsonLayout extends LayoutBase { @@ -43,6 +48,25 @@ public abstract class CompositeJsonLayout */ private String lineSeparator; + /** + * The minimum size of the byte buffer used when encoding events in logback versions + * greater than or equal to 1.2.0. The buffer is reused by subsequent invocations of + * the encoder. + * + *

The buffer automatically grows above the {@code #minBufferSize} when needed to + * accommodate with larger events. However, only the first {@code minBufferSize} bytes + * will be reused by subsequent invocations. It is therefore strongly advised to set + * the minimum size at least equal to the average size of the encoded events to reduce + * unnecessary memory allocations and reduce pressure on the garbage collector. + */ + private int minBufferSize = 1024; + + /** + * Provides reusable byte buffers (initialized when layout is started) + */ + private ReusableByteBuffers bufferPool; + + private final CompositeJsonFormatter formatter; public CompositeJsonLayout() { @@ -52,51 +76,55 @@ public CompositeJsonLayout() { protected abstract CompositeJsonFormatter createFormatter(); + @Override public String doLayout(Event event) { if (!isStarted()) { throw new IllegalStateException("Layout is not started"); } - final String result; - try { - result = formatter.writeEventAsString(event); - } catch (IOException e) { + + ReusableByteBuffer buffer = this.bufferPool.getBuffer(); + try(OutputStreamWriter writer = new OutputStreamWriter(buffer)) { + writeLayout( prefix, writer, event); + writeFormatter( writer, event); + writeLayout( suffix, writer, event); + + if (lineSeparator!=null) { + writer.write(lineSeparator); + } + writer.flush(); + + return new String(buffer.toByteArray()); + } + catch (IOException e) { addWarn("Error formatting logging event", e); return null; } - - if (prefix == null && suffix == null && lineSeparator == null) { - return result; + finally { + bufferPool.releaseBuffer(buffer); } - - String prefixResult = doLayoutWrapped(prefix, event); - String suffixResult = doLayoutWrapped(suffix, event); - - int size = result.length() - + (prefixResult == null ? 0 : prefixResult.length()) - + (suffixResult == null ? 0 : suffixResult.length()) - + (lineSeparator == null ? 0 : lineSeparator.length()); - - StringBuilder stringBuilder = new StringBuilder(size); - if (prefixResult != null) { - stringBuilder.append(prefixResult); - } - stringBuilder.append(result); - if (suffixResult != null) { - stringBuilder.append(suffixResult); + } + + + private void writeLayout(Layout wrapped, Writer writer, Event event) throws IOException { + if (wrapped==null) { + return; } - if (lineSeparator != null) { - stringBuilder.append(lineSeparator); + + String str = wrapped.doLayout(event); + if (str!=null) { + writer.write(str); } - return stringBuilder.toString(); } - - private String doLayoutWrapped(Layout wrapped, Event event) { - return wrapped == null ? null : wrapped.doLayout(event); + + private void writeFormatter(Writer writer, Event event) throws IOException { + this.formatter.writeEventToWriter(event, writer); } + @Override public void start() { super.start(); + this.bufferPool = new ReusableByteBuffers(this.minBufferSize); formatter.setContext(getContext()); formatter.start(); startWrapped(prefix); @@ -212,4 +240,25 @@ public void setLineSeparator(String lineSeparator) { this.lineSeparator = SeparatorParser.parseSeparator(lineSeparator); } + public int getMinBufferSize() { + return minBufferSize; + } + + /** + * The minimum size of the byte buffer used when encoding events in logback versions + * greater than or equal to 1.2.0. The buffer is reused by subsequent invocations of + * the encoder. + * + *

The buffer automatically grows above the {@code #minBufferSize} when needed to + * accommodate with larger events. However, only the first {@code minBufferSize} bytes + * will be reused by subsequent invocations. It is therefore strongly advised to set + * the minimum size at least equal to the average size of the encoded events to reduce + * unnecessary memory allocations and reduce pressure on the garbage collector. + * + *

Note: changes to the buffer size will not be taken into account after the encoder + * is started. + */ + public void setMinBufferSize(int minBufferSize) { + this.minBufferSize = minBufferSize; + } } diff --git a/src/main/java/net/logstash/logback/util/ReusableByteBuffer.java b/src/main/java/net/logstash/logback/util/ReusableByteBuffer.java new file mode 100644 index 00000000..f249b312 --- /dev/null +++ b/src/main/java/net/logstash/logback/util/ReusableByteBuffer.java @@ -0,0 +1,245 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * + */ +package net.logstash.logback.util; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +/** + * A speedy alternative to {@link java.io.ByteArrayOutputStream}. + * + *

Unlike {@link java.io.ByteArrayOutputStream}, this implementation is backed by an {@link ArrayList} + * of {@code byte[]} instead of 1 constantly resizing {@code byte[]} array. It does not copy buffers when + * it gets expanded. + * + *

The initial buffer is only created when the stream is first written. + * There is also no copying of the internal buffer if its contents is extracted with the + * {@link #writeTo(OutputStream)} method. + * + *

The {@link #reset()} method clears the content and resets the buffer to its initial state. + * Buffers are disposed except the initial buffer which is reused by subsequent usage. + * + * @author brenuart + * + */ +public class ReusableByteBuffer extends OutputStream { + + /** + * The default size of the initial buffer + */ + public static final int INITIAL_SIZE = 1024; + + /** + * Constant with an empty byte array + */ + private static final byte[] EMPTY_BYTES = new byte[0]; + + /** + * The buffers used to store the content bytes + */ + private final List buffers = new ArrayList<>(); + + /** + * The number of bytes already written in previous buffers (other than tail). + */ + private int alreadyBufferedSize = 0; + + /** + * The write index in the tail buffer + */ + private int index = 0; + + /** + * Is the stream closed? + */ + private boolean closed = false; + + + /** + * Create a new {@link ReusableByteBuffer} + * with the default initial capacity of 1024 bytes. + */ + public ReusableByteBuffer() { + this(INITIAL_SIZE); + } + + /** + * Create a new {@link ReusableByteBuffer} + * with the specified initial capacity. + * + * @param initialBlockSize the initial buffer size in bytes + */ + public ReusableByteBuffer(int initialBlockSize) { + if (initialBlockSize <= 0) { + throw new IllegalArgumentException("Initial block size must be greater than 0"); + } + this.buffers.add(new byte[initialBlockSize]); + } + + + @Override + public void write(int datum) throws IOException { + if (this.closed) { + throw new IOException("Stream closed"); + } + + growIfNeeded(); + getTailBuffer()[this.index++] = (byte) datum; + } + + + @Override + public void write(byte[] data, int offset, int length) throws IOException { + if (data == null) { + throw new NullPointerException(); + } + if (offset < 0 || offset + length > data.length || length < 0) { + throw new IndexOutOfBoundsException(); + } + if (this.closed) { + throw new IOException("Stream closed"); + } + + + while(length > 0) { + byte[] buffer = getTailBuffer(); + int freeSpace = buffer.length - this.index; + + if (freeSpace > 0) { + int toCopy = Math.min(freeSpace, length); + System.arraycopy(data, offset, buffer, this.index, toCopy); + offset += toCopy; + this.index += toCopy; + length -= toCopy; + } + + if (length>0) { + growIfNeeded(); + } + } + } + + + @Override + public void close() { + this.closed = true; + } + + + /** + * Return the number of bytes stored in this {@link ReusableByteBuffer}. + */ + public int size() { + return (this.alreadyBufferedSize + this.index); + } + + + /** + * Reset the contents of this {@link ReusableByteBuffer}. + *

All currently accumulated output in the output stream is discarded. + * The output stream can be used again. + */ + public void reset() { + // Clear allocated buffers but keep the first one + byte[] initialBuffer = this.buffers.get(0); + this.buffers.clear(); + this.buffers.add(initialBuffer); + + //this.nextBlockSize = this.initialBlockSize; + this.closed = false; + this.index = 0; + this.alreadyBufferedSize = 0; + } + + + /** + * Write the buffers content to the given OutputStream. + * + * @param out the OutputStream to write to + */ + public void writeTo(OutputStream out) throws IOException { + Iterator it = this.buffers.iterator(); + while (it.hasNext()) { + byte[] buffer = it.next(); + if (it.hasNext()) { + out.write(buffer, 0, buffer.length); + } + else { + out.write(buffer, 0, this.index); + } + } + } + + + /** + * Creates a newly allocated byte array. + *

Its size is the current + * size of this output stream and the valid contents of the buffer + * have been copied into it.

+ * + * @return the current contents of this output stream, as a byte array. + * @see #size() + * @see #toByteArrayUnsafe() + */ + public byte[] toByteArray() { + int totalSize = size(); + if (totalSize == 0) { + return EMPTY_BYTES; + } + + byte[] result = new byte[totalSize]; + + int offset = 0; + Iterator it = this.buffers.iterator(); + while (it.hasNext()) { + byte[] buffer = it.next(); + if (it.hasNext()) { + System.arraycopy(buffer, 0, result, offset, buffer.length); + offset += buffer.length; + } + else { + System.arraycopy(buffer, 0, result, offset, this.index); + } + } + + return result; + } + + + /** + * Allocate a new chunk if needed + */ + private void growIfNeeded() { + if (getTailBuffer().length == this.index) { + this.alreadyBufferedSize += this.index; + this.buffers.add(new byte[this.index * 2]); // block size doubles each time + this.index = 0; + } + } + + /** + * Convenience method to get the tail buffer (the one to write into) + * + * @return the tail buffer + */ + private byte[] getTailBuffer() { + return this.buffers.get(this.buffers.size()-1); + } +} diff --git a/src/main/java/net/logstash/logback/util/ReusableByteBuffers.java b/src/main/java/net/logstash/logback/util/ReusableByteBuffers.java new file mode 100644 index 00000000..61d66cfa --- /dev/null +++ b/src/main/java/net/logstash/logback/util/ReusableByteBuffers.java @@ -0,0 +1,99 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.logstash.logback.util; + +import java.lang.ref.Reference; +import java.lang.ref.SoftReference; +import java.util.Deque; +import java.util.concurrent.ConcurrentLinkedDeque; + +/** + * A pool of {@link ReusableByteBuffer}. + * + *

The pool is unbounded and can hold as many buffers as needed. Buffers are kept in the pool + * using weak references so they can be garbage collected by the JVM when running low in memory. + * + * @author brenuart + */ +public class ReusableByteBuffers { + + /** + * Pool of reusable buffers. + */ + private final Deque> buffers = new ConcurrentLinkedDeque<>(); + + /** + * The size (in bytes) of the initial buffer that is reused across consecutive usages. + */ + private final int initialSize; + + /** + * Create a new buffer pool holding buffers with an initial capacity of {@code initialSize} bytes. + * + * @param initialSize the initial capacity of buffers created by this pool. + */ + public ReusableByteBuffers(int initialSize) { + this.initialSize = initialSize; + } + + /** + * Create a new buffer pool holding buffers with a default initial capacity. + */ + public ReusableByteBuffers() { + this(ReusableByteBuffer.INITIAL_SIZE); + } + + /** + * Create a new buffer with an initial size of {@link #initialSize} bytes. + * + * @return a new buffer instance + */ + private ReusableByteBuffer createBuffer() { + return new ReusableByteBuffer(initialSize); + } + + /** + * Get a buffer from the pool or create a new one if none is available. + * The buffer must be returned to the pool after usage by a call to {@link #releaseBuffer(ReusableByteBuffer)}. + * + * @return a reusable byte buffer + */ + public ReusableByteBuffer getBuffer() { + ReusableByteBuffer buffer=null; + + while(buffer==null) { + Reference ref = buffers.poll(); + if (ref==null) { + break; + } + buffer = ref.get(); + } + + if (buffer==null) { + buffer = createBuffer(); + } + + return buffer; + } + + /** + * Return a buffer to the pool after usage. + * + * @param buffer the buffer to return to the pool. + */ + public void releaseBuffer(ReusableByteBuffer buffer) { + buffer.reset(); + this.buffers.add(new SoftReference<>(buffer)); + } +} \ No newline at end of file diff --git a/src/main/java/net/logstash/logback/util/ThreadLocalBuffers.java b/src/main/java/net/logstash/logback/util/ThreadLocalBuffers.java deleted file mode 100644 index 48296b13..00000000 --- a/src/main/java/net/logstash/logback/util/ThreadLocalBuffers.java +++ /dev/null @@ -1,87 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package net.logstash.logback.util; - -import java.lang.ref.SoftReference; - -import com.fasterxml.jackson.core.io.SegmentedStringWriter; -import com.fasterxml.jackson.core.util.BufferRecycler; -import com.fasterxml.jackson.core.util.ByteArrayBuilder; - -/** - * Utility to maintain thread-bound buffers. - */ -public class ThreadLocalBuffers { - - /** - * This ThreadLocal contains a {@link java.lang.ref.SoftReference} - * to a {@link BufferRecycler} used to provide a low-cost - * buffer recycling between writer instances. - */ - private final ThreadLocal> recycler = new ThreadLocal>() { - protected SoftReference initialValue() { - final BufferRecycler bufferRecycler = new BufferRecycler(); - return new SoftReference(bufferRecycler); - } - }; - - - private BufferRecycler getBufferRecycler() { - SoftReference bufferRecyclerReference = recycler.get(); - BufferRecycler bufferRecycler = bufferRecyclerReference.get(); - if (bufferRecycler == null) { - recycler.remove(); - return getBufferRecycler(); - } - return bufferRecycler; - } - - - /** - * Get a thread-bound {@link ByteArrayBuilder} with a default initial size. - * - *

Do not forget to release the buffer by calling {@link ByteArrayBuilder#release()} - * when done otherwise the memory allocated by the buffer will not be recycled and new - * memory allocations will occur for every subsequent buffers. - * - * @return a thread-bound {@link ByteArrayBuilder} - */ - public ByteArrayBuilder getByteBuffer() { - return new ByteArrayBuilder(getBufferRecycler()); - } - - /** - * Get a thread-bound {@link ByteArrayBuilder} with the specified initial size. - * A buffer of the default size is allocated with the initial size is less than the default. - * - *

Do not forget to release the buffer by calling {@link ByteArrayBuilder#release()} - * when done otherwise the memory allocated by the buffer will not be recycled and new - * memory allocations will occur for every subsequent buffers. - * - * @param initialSize the initial size of the buffer - * @return a thread-bound {@link ByteArrayBuilder} - */ - public ByteArrayBuilder getByteBuffer(int initialSize) { - return new ByteArrayBuilder(getBufferRecycler(), initialSize); - } - - /** - * Get a thread-bound {@link SegmentedStringWriter}. - * - * @return a thread-bound {@link SegmentedStringWriter} - */ - public SegmentedStringWriter getStringWriter() { - return new SegmentedStringWriter(getBufferRecycler()); - } -} diff --git a/src/test/java/net/logstash/logback/composite/loggingevent/LoggingEventCompositeJsonFormatterTest.java b/src/test/java/net/logstash/logback/composite/loggingevent/LoggingEventCompositeJsonFormatterTest.java index ad6a0b5a..dfec6ed2 100644 --- a/src/test/java/net/logstash/logback/composite/loggingevent/LoggingEventCompositeJsonFormatterTest.java +++ b/src/test/java/net/logstash/logback/composite/loggingevent/LoggingEventCompositeJsonFormatterTest.java @@ -13,8 +13,10 @@ */ package net.logstash.logback.composite.loggingevent; +import static org.assertj.core.api.Assertions.assertThatCode; import static org.mockito.Mockito.when; +import java.io.ByteArrayOutputStream; import java.io.IOException; import net.logstash.logback.argument.StructuredArguments; @@ -48,7 +50,9 @@ public void testDoesNotFailOnEmptyBeans() throws IOException { /* * This should not throw an exception, since SerializationFeature.FAIL_ON_EMPTY_BEANS is disabled */ - formatter.writeEventAsString(event); + try(ByteArrayOutputStream bos = new ByteArrayOutputStream()) { + assertThatCode(() -> formatter.writeEventToOutputStream(event, bos)).doesNotThrowAnyException(); + } } } diff --git a/src/test/java/net/logstash/logback/layout/CompositeJsonLayoutTest.java b/src/test/java/net/logstash/logback/layout/CompositeJsonLayoutTest.java index 05435209..452ffc47 100644 --- a/src/test/java/net/logstash/logback/layout/CompositeJsonLayoutTest.java +++ b/src/test/java/net/logstash/logback/layout/CompositeJsonLayoutTest.java @@ -15,118 +15,128 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; -import static org.mockito.Mockito.when; import java.io.IOException; -import ch.qos.logback.core.Layout; -import ch.qos.logback.core.spi.DeferredProcessingAware; +import net.logstash.logback.composite.AbstractJsonProvider; import net.logstash.logback.composite.CompositeJsonFormatter; -import org.junit.jupiter.api.BeforeEach; + import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import com.fasterxml.jackson.core.JsonGenerator; + +import ch.qos.logback.core.Layout; +import ch.qos.logback.core.LayoutBase; +import ch.qos.logback.core.spi.DeferredProcessingAware; + @ExtendWith(MockitoExtension.class) public class CompositeJsonLayoutTest { /** * create an implementation of the composite layout that format teh event using simply toString() */ - static class TesterCompositeJsonLayout extends CompositeJsonLayout { + static class TesterCompositeJsonLayout extends CompositeJsonLayout { @Override - protected CompositeJsonFormatter createFormatter() { - return new CompositeJsonFormatter(this){ + protected CompositeJsonFormatter createFormatter() { + CompositeJsonFormatter formatter = new CompositeJsonFormatter(this) {}; + formatter.getProviders().addProvider(new AbstractJsonProvider() { @Override - public String writeEventAsString(DeferredProcessingAware deferredProcessingAware) throws IOException { - return deferredProcessingAware.toString(); + public void writeTo(JsonGenerator generator, DeferredProcessingAware event) throws IOException { + generator.writeRaw("event"); } - }; + @Override + public void prepareForDeferredProcessing(DeferredProcessingAware event) { + super.prepareForDeferredProcessing(event); + } + }); + return formatter; } } - @Mock(lenient = true) - DeferredProcessingAware event; + private Layout prefixLayout = new LayoutBase() { + @Override + public String doLayout(DeferredProcessingAware event) { + return "prefix:"; + } + }; - @Mock(lenient = true) - Layout prefixLayout; + private Layout suffixLayout = new LayoutBase() { + public String doLayout(DeferredProcessingAware event) { + return ":suffix"; + }; + }; @Mock(lenient = true) - Layout suffixLayout; - - @BeforeEach - public void setup() { - when(event.toString()).thenReturn("event"); - when(prefixLayout.doLayout(event)).thenReturn("prefix:"); - when(suffixLayout.doLayout(event)).thenReturn(":suffix"); - } + private DeferredProcessingAware event; + @Test public void testDoLayoutWithoutPrefixSuffix() { - - CompositeJsonLayout layout = new TesterCompositeJsonLayout(); + CompositeJsonLayout layout = new TesterCompositeJsonLayout(); layout.start(); String layoutResult = layout.doLayout(event); - assertThat(layoutResult).isEqualTo("event"); + assertThat(layoutResult).isEqualTo("{event}"); } @Test public void testDoLayoutWithPrefixWithoutSuffix() { - CompositeJsonLayout layout = new TesterCompositeJsonLayout(); + CompositeJsonLayout layout = new TesterCompositeJsonLayout(); layout.setPrefix(prefixLayout); layout.start(); String layoutResult = layout.doLayout(event); - assertThat(layoutResult).isEqualTo("prefix:event"); + assertThat(layoutResult).isEqualTo("prefix:{event}"); } @Test public void testDoLayoutWithoutPrefixWithSuffix() { - CompositeJsonLayout layout = new TesterCompositeJsonLayout(); + CompositeJsonLayout layout = new TesterCompositeJsonLayout(); layout.setSuffix(suffixLayout); layout.start(); String layoutResult = layout.doLayout(event); - assertThat(layoutResult).isEqualTo("event:suffix"); + assertThat(layoutResult).isEqualTo("{event}:suffix"); } @Test public void testDoLayoutWithPrefixWithSuffix() { - CompositeJsonLayout layout = new TesterCompositeJsonLayout(); + CompositeJsonLayout layout = new TesterCompositeJsonLayout(); layout.setPrefix(prefixLayout); layout.setSuffix(suffixLayout); layout.start(); String layoutResult = layout.doLayout(event); - assertThat(layoutResult).isEqualTo("prefix:event:suffix"); + assertThat(layoutResult).isEqualTo("prefix:{event}:suffix"); } @Test public void testDoLayoutWithPrefixWithLineSeparator() { - CompositeJsonLayout layout = new TesterCompositeJsonLayout(); + CompositeJsonLayout layout = new TesterCompositeJsonLayout(); layout.setLineSeparator("SYSTEM"); layout.start(); String layoutResult = layout.doLayout(event); - assertThat(layoutResult).isEqualTo("event" + System.lineSeparator()); + assertThat(layoutResult).isEqualTo("{event}" + System.lineSeparator()); } @Test public void notStarted() { - CompositeJsonLayout layout = new TesterCompositeJsonLayout(); + CompositeJsonLayout layout = new TesterCompositeJsonLayout(); assertThatExceptionOfType(IllegalStateException.class).isThrownBy(() -> layout.doLayout(event)) .withMessage("Layout is not started"); } diff --git a/src/test/java/net/logstash/logback/util/ReusableBufferTests.java b/src/test/java/net/logstash/logback/util/ReusableBufferTests.java new file mode 100644 index 00000000..c32dbd74 --- /dev/null +++ b/src/test/java/net/logstash/logback/util/ReusableBufferTests.java @@ -0,0 +1,103 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.logstash.logback.util; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.assertj.core.api.Assertions.assertThatIOException; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +import org.junit.jupiter.api.Test; + +/** + * @author brenuart + * + */ +@SuppressWarnings("resource") +public class ReusableBufferTests { + + private final byte[] helloBytes = "0123456789".getBytes(StandardCharsets.UTF_8); + + + @Test + public void invalidInitialSize() { + assertThatExceptionOfType(IllegalArgumentException.class).isThrownBy(() -> new ReusableByteBuffer(0)); + assertThatExceptionOfType(IllegalArgumentException.class).isThrownBy(() -> new ReusableByteBuffer(-1)); + } + + + @Test + public void size() throws IOException { + ReusableByteBuffer buffer = new ReusableByteBuffer(); + + buffer.write(helloBytes); + assertThat(buffer.size()).isEqualTo(helloBytes.length); + } + + + @Test + public void autoGrow() throws IOException { + ReusableByteBuffer buffer = new ReusableByteBuffer(1); + + for (int i=0; i<10; i++) { + buffer.write(1); + } + + assertThat(buffer.size()).isEqualTo(10); + assertThat(buffer.toByteArray()).containsExactly(new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 1, 1}); + } + + + @Test + public void reset() throws IOException { + ReusableByteBuffer buffer = new ReusableByteBuffer(); + + buffer.write(helloBytes); + assertThat(buffer.toByteArray()).containsExactly(helloBytes); + + buffer.reset(); + assertThat(buffer.size()).isZero(); + + buffer.write(helloBytes); + assertThat(buffer.toByteArray()).containsExactly(helloBytes); + } + + + @Test + public void close() { + ReusableByteBuffer buffer = new ReusableByteBuffer(); + + buffer.close(); + assertThatIOException().isThrownBy(() -> buffer.write(this.helloBytes)); + + buffer.reset(); + assertThatCode(() -> buffer.write(this.helloBytes)).doesNotThrowAnyException(); + } + + + @Test + public void writeTo() throws IOException { + ReusableByteBuffer buffer = new ReusableByteBuffer(); + + buffer.write(this.helloBytes); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + buffer.writeTo(baos); + assertThat(baos.toByteArray()).isEqualTo(this.helloBytes); + } +} From 0d8ad26a7a0f9e67ec55df4ae97636aba907bebc Mon Sep 17 00:00:00 2001 From: Bertrand Renuart Date: Mon, 5 Jul 2021 23:24:27 +0200 Subject: [PATCH 5/6] Misc changes to take review comments into account --- README.md | 4 ++ .../composite/CompositeJsonFormatter.java | 55 +++++++++---------- .../logback/encoder/CompositeJsonEncoder.java | 12 ++-- .../logback/layout/CompositeJsonLayout.java | 12 ++-- .../logback/util/ReusableByteBuffer.java | 31 +++++------ .../logback/util/ReusableByteBuffers.java | 30 +++++----- 6 files changed, 71 insertions(+), 73 deletions(-) diff --git a/README.md b/README.md index 913cc2f7..e8d3c470 100644 --- a/README.md +++ b/README.md @@ -1801,6 +1801,10 @@ The logstash-logback-encoder library contains many providers out-of-the-box, and you can even plug-in your own by extending `JsonProvider`. Each provider has its own configuration options to further customize it. +These encoders/layouts make use of an internal buffer to hold the JSON output during the rendering process. +The size of this buffer is set to `1024` bytes by default. A different size can be configured by setting the `minBufferSize` property to the desired value. +The buffer automatically grows above the `minBufferSize` when needed to accommodate with larger events. However, only the first `minBufferSize` bytes will be reused by subsequent invocations. It is therefore strongly advised to set the minimum size at least equal to the average size of the encoded events to reduce unnecessary memory allocations and reduce pressure on the garbage collector. + #### Providers for LoggingEvents diff --git a/src/main/java/net/logstash/logback/composite/CompositeJsonFormatter.java b/src/main/java/net/logstash/logback/composite/CompositeJsonFormatter.java index d0773130..8d4df851 100644 --- a/src/main/java/net/logstash/logback/composite/CompositeJsonFormatter.java +++ b/src/main/java/net/logstash/logback/composite/CompositeJsonFormatter.java @@ -119,31 +119,7 @@ private JsonFactory createJsonFactory() { } } - JsonFactory jsonFactory = objectMapper - .getFactory() - /* - * When generators are flushed, don't flush the underlying outputStream. - * - * This allows some streaming optimizations when using an encoder. - * - * The encoder generally determines when the stream should be flushed - * by an 'immediateFlush' property. - * - * The 'immediateFlush' property of the encoder can be set to false - * when the appender performs the flushes at appropriate times - * (such as the end of a batch in the AbstractLogstashTcpSocketAppender). - */ - .disable(JsonGenerator.Feature.FLUSH_PASSED_TO_STREAM) - - /* - * Don't let the json generator close the underlying outputStream and let the - * encoder managed it. - */ - .disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET); - - //FIXME these settings are mandatory - should we instead apply them *after* jsonGeneratorDecorator is applied (ie. in createGenerator()) ? - - return this.jsonFactoryDecorator.decorate(jsonFactory); + return this.jsonFactoryDecorator.decorate(objectMapper.getFactory()); } public void writeEventToOutputStream(Event event, OutputStream outputStream) throws IOException { @@ -180,13 +156,36 @@ protected void prepareForDeferredProcessing(Event event) { } private JsonGenerator createGenerator(OutputStream outputStream) throws IOException { - return this.jsonGeneratorDecorator.decorate(jsonFactory.createGenerator(outputStream, encoding)); + return decorateGenerator(jsonFactory.createGenerator(outputStream, encoding)); } private JsonGenerator createGenerator(Writer writer) throws IOException { - return this.jsonGeneratorDecorator.decorate(jsonFactory.createGenerator(writer)); + return decorateGenerator(jsonFactory.createGenerator(writer)); + } + + private JsonGenerator decorateGenerator(JsonGenerator generator) { + return this.jsonGeneratorDecorator.decorate(generator) + /* + * When generators are flushed, don't flush the underlying outputStream. + * + * This allows some streaming optimizations when using an encoder. + * + * The encoder generally determines when the stream should be flushed + * by an 'immediateFlush' property. + * + * The 'immediateFlush' property of the encoder can be set to false + * when the appender performs the flushes at appropriate times + * (such as the end of a batch in the AbstractLogstashTcpSocketAppender). + */ + .disable(JsonGenerator.Feature.FLUSH_PASSED_TO_STREAM) + + /* + * Don't let the json generator close the underlying outputStream and let the + * encoder managed it. + */ + .disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET); } - + public JsonFactory getJsonFactory() { return jsonFactory; } diff --git a/src/main/java/net/logstash/logback/encoder/CompositeJsonEncoder.java b/src/main/java/net/logstash/logback/encoder/CompositeJsonEncoder.java index d75a896d..7bfe3be0 100644 --- a/src/main/java/net/logstash/logback/encoder/CompositeJsonEncoder.java +++ b/src/main/java/net/logstash/logback/encoder/CompositeJsonEncoder.java @@ -35,9 +35,7 @@ public abstract class CompositeJsonEncoderThe buffer automatically grows above the {@code #minBufferSize} when needed to * accommodate with larger events. However, only the first {@code minBufferSize} bytes @@ -89,7 +87,7 @@ public byte[] encode(Event event) { throw new IllegalStateException("Encoder is not started"); } - ReusableByteBuffer buffer = bufferPool.getBuffer(); + ReusableByteBuffer buffer = bufferPool.acquire(); try { encode(event, buffer); return buffer.toByteArray(); @@ -97,7 +95,7 @@ public byte[] encode(Event event) { addWarn("Error encountered while encoding log event. Event: " + event, e); return EMPTY_BYTES; } finally { - bufferPool.releaseBuffer(buffer); + bufferPool.release(buffer); } } @@ -254,9 +252,7 @@ public int getMinBufferSize() { } /** - * The minimum size of the byte buffer used when encoding events in logback versions - * greater than or equal to 1.2.0. The buffer is reused by subsequent invocations of - * the encoder. + * The minimum size of the byte buffer used when encoding events. * *

The buffer automatically grows above the {@code #minBufferSize} when needed to * accommodate with larger events. However, only the first {@code minBufferSize} bytes diff --git a/src/main/java/net/logstash/logback/layout/CompositeJsonLayout.java b/src/main/java/net/logstash/logback/layout/CompositeJsonLayout.java index 1de09f6e..e56a1e5c 100644 --- a/src/main/java/net/logstash/logback/layout/CompositeJsonLayout.java +++ b/src/main/java/net/logstash/logback/layout/CompositeJsonLayout.java @@ -49,9 +49,7 @@ public abstract class CompositeJsonLayout private String lineSeparator; /** - * The minimum size of the byte buffer used when encoding events in logback versions - * greater than or equal to 1.2.0. The buffer is reused by subsequent invocations of - * the encoder. + * The minimum size of the byte buffer used when encoding events. * *

The buffer automatically grows above the {@code #minBufferSize} when needed to * accommodate with larger events. However, only the first {@code minBufferSize} bytes @@ -82,7 +80,7 @@ public String doLayout(Event event) { throw new IllegalStateException("Layout is not started"); } - ReusableByteBuffer buffer = this.bufferPool.getBuffer(); + ReusableByteBuffer buffer = this.bufferPool.acquire(); try (OutputStreamWriter writer = new OutputStreamWriter(buffer)) { writeLayout(prefix, writer, event); writeFormatter(writer, event); @@ -98,7 +96,7 @@ public String doLayout(Event event) { addWarn("Error formatting logging event", e); return null; } finally { - bufferPool.releaseBuffer(buffer); + bufferPool.release(buffer); } } @@ -243,9 +241,7 @@ public int getMinBufferSize() { } /** - * The minimum size of the byte buffer used when encoding events in logback versions - * greater than or equal to 1.2.0. The buffer is reused by subsequent invocations of - * the encoder. + * The minimum size of the byte buffer used when encoding events. * *

The buffer automatically grows above the {@code #minBufferSize} when needed to * accommodate with larger events. However, only the first {@code minBufferSize} bytes diff --git a/src/main/java/net/logstash/logback/util/ReusableByteBuffer.java b/src/main/java/net/logstash/logback/util/ReusableByteBuffer.java index 2e433b9f..6ca2ff43 100644 --- a/src/main/java/net/logstash/logback/util/ReusableByteBuffer.java +++ b/src/main/java/net/logstash/logback/util/ReusableByteBuffer.java @@ -18,6 +18,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.Objects; /** * A speedy alternative to {@link java.io.ByteArrayOutputStream}. @@ -41,7 +42,7 @@ public class ReusableByteBuffer extends OutputStream { /** * The default size of the initial buffer */ - public static final int INITIAL_SIZE = 1024; + static final int DEFAULT_INITIAL_CAPACITY = 1024; /** * Constant with an empty byte array @@ -70,24 +71,22 @@ public class ReusableByteBuffer extends OutputStream { /** - * Create a new {@link ReusableByteBuffer} - * with the default initial capacity of 1024 bytes. + * Create a new {@link ReusableByteBuffer} with the default initial capacity of 1024 bytes. */ public ReusableByteBuffer() { - this(INITIAL_SIZE); + this(DEFAULT_INITIAL_CAPACITY); } /** - * Create a new {@link ReusableByteBuffer} - * with the specified initial capacity. + * Create a new {@link ReusableByteBuffer} with the specified initial capacity. * - * @param initialBlockSize the initial buffer size in bytes + * @param initialCapacity the initial buffer size in bytes */ - public ReusableByteBuffer(int initialBlockSize) { - if (initialBlockSize <= 0) { - throw new IllegalArgumentException("Initial block size must be greater than 0"); + public ReusableByteBuffer(int initialCapacity) { + if (initialCapacity <= 0) { + throw new IllegalArgumentException("initialCapacity must be greater than 0"); } - this.buffers.add(new byte[initialBlockSize]); + this.buffers.add(new byte[initialCapacity]); } @@ -104,9 +103,7 @@ public void write(int datum) throws IOException { @Override public void write(byte[] data, int offset, int length) throws IOException { - if (data == null) { - throw new NullPointerException(); - } + Objects.requireNonNull(data, "data must not be null"); if (offset < 0 || offset + length > data.length || length < 0) { throw new IndexOutOfBoundsException(); } @@ -141,10 +138,12 @@ public void close() { /** - * Return the number of bytes stored in this {@link ReusableByteBuffer}. + * Return the current size of the buffer. + * + * @return the current size of the buffer. */ public int size() { - return (this.alreadyBufferedSize + this.index); + return this.alreadyBufferedSize + this.index; } diff --git a/src/main/java/net/logstash/logback/util/ReusableByteBuffers.java b/src/main/java/net/logstash/logback/util/ReusableByteBuffers.java index 6328783a..e898f3b9 100644 --- a/src/main/java/net/logstash/logback/util/ReusableByteBuffers.java +++ b/src/main/java/net/logstash/logback/util/ReusableByteBuffers.java @@ -21,8 +21,9 @@ /** * A pool of {@link ReusableByteBuffer}. * - *

The pool is unbounded and can hold as many buffers as needed. Buffers are kept in the pool - * using weak references so they can be garbage collected by the JVM when running low in memory. + *

The pool is technically unbounded but will never hold more buffers than the number of concurrent + * threads accessing it. Buffers are kept in the pool using weak references so they can be garbage + * collected by the JVM when running low in memory. * * @author brenuart */ @@ -34,42 +35,45 @@ public class ReusableByteBuffers { private final Deque> buffers = new ConcurrentLinkedDeque<>(); /** - * The size (in bytes) of the initial buffer that is reused across consecutive usages. + * The capacity (in bytes) of the initial buffer that is reused across consecutive usages. */ - private final int initialSize; + private final int initialCapacity; /** * Create a new buffer pool holding buffers with an initial capacity of {@code initialSize} bytes. * - * @param initialSize the initial capacity of buffers created by this pool. + * @param intialCapacity the initial capacity of buffers created by this pool. */ - public ReusableByteBuffers(int initialSize) { - this.initialSize = initialSize; + public ReusableByteBuffers(int intialCapacity) { + if (intialCapacity <= 0) { + throw new IllegalArgumentException("initialCapacity must be greater than 0"); + } + this.initialCapacity = intialCapacity; } /** * Create a new buffer pool holding buffers with a default initial capacity. */ public ReusableByteBuffers() { - this(ReusableByteBuffer.INITIAL_SIZE); + this(ReusableByteBuffer.DEFAULT_INITIAL_CAPACITY); } /** - * Create a new buffer with an initial size of {@link #initialSize} bytes. + * Create a new buffer with an initial size of {@link #initialCapacity} bytes. * * @return a new buffer instance */ private ReusableByteBuffer createBuffer() { - return new ReusableByteBuffer(initialSize); + return new ReusableByteBuffer(initialCapacity); } /** * Get a buffer from the pool or create a new one if none is available. - * The buffer must be returned to the pool after usage by a call to {@link #releaseBuffer(ReusableByteBuffer)}. + * The buffer is automatically returned to the pooled when closed. * * @return a reusable byte buffer */ - public ReusableByteBuffer getBuffer() { + public ReusableByteBuffer acquire() { ReusableByteBuffer buffer = null; while (buffer == null) { @@ -92,7 +96,7 @@ public ReusableByteBuffer getBuffer() { * * @param buffer the buffer to return to the pool. */ - public void releaseBuffer(ReusableByteBuffer buffer) { + public void release(ReusableByteBuffer buffer) { buffer.reset(); this.buffers.add(new SoftReference<>(buffer)); } From 513da38bb70912604899a2f721c7be70e7de3565 Mon Sep 17 00:00:00 2001 From: Bertrand Renuart Date: Mon, 5 Jul 2021 23:40:40 +0200 Subject: [PATCH 6/6] Rename ReusableByteBuffers into ReusableByteBufferPool --- .../logback/encoder/CompositeJsonEncoder.java | 6 ++-- .../logback/layout/CompositeJsonLayout.java | 6 ++-- .../logback/util/ReusableByteBuffer.java | 29 ++++++++++--------- ...ffers.java => ReusableByteBufferPool.java} | 10 +++---- 4 files changed, 26 insertions(+), 25 deletions(-) rename src/main/java/net/logstash/logback/util/{ReusableByteBuffers.java => ReusableByteBufferPool.java} (90%) diff --git a/src/main/java/net/logstash/logback/encoder/CompositeJsonEncoder.java b/src/main/java/net/logstash/logback/encoder/CompositeJsonEncoder.java index 7bfe3be0..44968855 100644 --- a/src/main/java/net/logstash/logback/encoder/CompositeJsonEncoder.java +++ b/src/main/java/net/logstash/logback/encoder/CompositeJsonEncoder.java @@ -22,7 +22,7 @@ import net.logstash.logback.decorate.JsonFactoryDecorator; import net.logstash.logback.decorate.JsonGeneratorDecorator; import net.logstash.logback.util.ReusableByteBuffer; -import net.logstash.logback.util.ReusableByteBuffers; +import net.logstash.logback.util.ReusableByteBufferPool; import ch.qos.logback.core.encoder.Encoder; import ch.qos.logback.core.encoder.EncoderBase; @@ -48,7 +48,7 @@ public abstract class CompositeJsonEncoder prefix; private Encoder suffix; @@ -115,7 +115,7 @@ public void start() { } super.start(); - this.bufferPool = new ReusableByteBuffers(this.minBufferSize); + this.bufferPool = new ReusableByteBufferPool(this.minBufferSize); formatter.setContext(getContext()); formatter.start(); charset = Charset.forName(formatter.getEncoding()); diff --git a/src/main/java/net/logstash/logback/layout/CompositeJsonLayout.java b/src/main/java/net/logstash/logback/layout/CompositeJsonLayout.java index e56a1e5c..29b1b49c 100644 --- a/src/main/java/net/logstash/logback/layout/CompositeJsonLayout.java +++ b/src/main/java/net/logstash/logback/layout/CompositeJsonLayout.java @@ -24,7 +24,7 @@ import net.logstash.logback.encoder.CompositeJsonEncoder; import net.logstash.logback.encoder.SeparatorParser; import net.logstash.logback.util.ReusableByteBuffer; -import net.logstash.logback.util.ReusableByteBuffers; +import net.logstash.logback.util.ReusableByteBufferPool; import ch.qos.logback.core.Layout; import ch.qos.logback.core.LayoutBase; @@ -62,7 +62,7 @@ public abstract class CompositeJsonLayout /** * Provides reusable byte buffers (initialized when layout is started) */ - private ReusableByteBuffers bufferPool; + private ReusableByteBufferPool bufferPool; private final CompositeJsonFormatter formatter; @@ -120,7 +120,7 @@ private void writeFormatter(Writer writer, Event event) throws IOException { @Override public void start() { super.start(); - this.bufferPool = new ReusableByteBuffers(this.minBufferSize); + this.bufferPool = new ReusableByteBufferPool(this.minBufferSize); formatter.setContext(getContext()); formatter.start(); startWrapped(prefix); diff --git a/src/main/java/net/logstash/logback/util/ReusableByteBuffer.java b/src/main/java/net/logstash/logback/util/ReusableByteBuffer.java index 6ca2ff43..5e18884f 100644 --- a/src/main/java/net/logstash/logback/util/ReusableByteBuffer.java +++ b/src/main/java/net/logstash/logback/util/ReusableByteBuffer.java @@ -34,6 +34,8 @@ *

The {@link #reset()} method clears the content and resets the buffer to its initial state. * Buffers are disposed except the initial buffer which is reused by subsequent usage. * + *

This class is *not* thread-safe! + * * @author brenuart * */ @@ -62,7 +64,7 @@ public class ReusableByteBuffer extends OutputStream { /** * The write index in the tail buffer */ - private int index = 0; + private int tailWriteIndex = 0; /** * Is the stream closed? @@ -97,7 +99,7 @@ public void write(int datum) throws IOException { } growIfNeeded(); - getTailBuffer()[this.index++] = (byte) datum; + getTailBuffer()[this.tailWriteIndex++] = (byte) datum; } @@ -114,13 +116,13 @@ public void write(byte[] data, int offset, int length) throws IOException { while (length > 0) { byte[] buffer = getTailBuffer(); - int freeSpace = buffer.length - this.index; + int freeSpace = buffer.length - this.tailWriteIndex; if (freeSpace > 0) { int toCopy = Math.min(freeSpace, length); - System.arraycopy(data, offset, buffer, this.index, toCopy); + System.arraycopy(data, offset, buffer, this.tailWriteIndex, toCopy); offset += toCopy; - this.index += toCopy; + this.tailWriteIndex += toCopy; length -= toCopy; } @@ -143,7 +145,7 @@ public void close() { * @return the current size of the buffer. */ public int size() { - return this.alreadyBufferedSize + this.index; + return this.alreadyBufferedSize + this.tailWriteIndex; } @@ -158,9 +160,8 @@ public void reset() { this.buffers.clear(); this.buffers.add(initialBuffer); - //this.nextBlockSize = this.initialBlockSize; this.closed = false; - this.index = 0; + this.tailWriteIndex = 0; this.alreadyBufferedSize = 0; } @@ -177,7 +178,7 @@ public void writeTo(OutputStream out) throws IOException { if (it.hasNext()) { out.write(buffer, 0, buffer.length); } else { - out.write(buffer, 0, this.index); + out.write(buffer, 0, this.tailWriteIndex); } } } @@ -209,7 +210,7 @@ public byte[] toByteArray() { System.arraycopy(buffer, 0, result, offset, buffer.length); offset += buffer.length; } else { - System.arraycopy(buffer, 0, result, offset, this.index); + System.arraycopy(buffer, 0, result, offset, this.tailWriteIndex); } } @@ -221,10 +222,10 @@ public byte[] toByteArray() { * Allocate a new chunk if needed */ private void growIfNeeded() { - if (getTailBuffer().length == this.index) { - this.alreadyBufferedSize += this.index; - this.buffers.add(new byte[this.index * 2]); // block size doubles each time - this.index = 0; + if (getTailBuffer().length == this.tailWriteIndex) { + this.alreadyBufferedSize += this.tailWriteIndex; + this.buffers.add(new byte[this.tailWriteIndex * 2]); // block size doubles each time + this.tailWriteIndex = 0; } } diff --git a/src/main/java/net/logstash/logback/util/ReusableByteBuffers.java b/src/main/java/net/logstash/logback/util/ReusableByteBufferPool.java similarity index 90% rename from src/main/java/net/logstash/logback/util/ReusableByteBuffers.java rename to src/main/java/net/logstash/logback/util/ReusableByteBufferPool.java index e898f3b9..6a8281c8 100644 --- a/src/main/java/net/logstash/logback/util/ReusableByteBuffers.java +++ b/src/main/java/net/logstash/logback/util/ReusableByteBufferPool.java @@ -22,12 +22,12 @@ * A pool of {@link ReusableByteBuffer}. * *

The pool is technically unbounded but will never hold more buffers than the number of concurrent - * threads accessing it. Buffers are kept in the pool using weak references so they can be garbage + * threads accessing it. Buffers are kept in the pool using soft references so they can be garbage * collected by the JVM when running low in memory. * * @author brenuart */ -public class ReusableByteBuffers { +public class ReusableByteBufferPool { /** * Pool of reusable buffers. @@ -44,7 +44,7 @@ public class ReusableByteBuffers { * * @param intialCapacity the initial capacity of buffers created by this pool. */ - public ReusableByteBuffers(int intialCapacity) { + public ReusableByteBufferPool(int intialCapacity) { if (intialCapacity <= 0) { throw new IllegalArgumentException("initialCapacity must be greater than 0"); } @@ -54,7 +54,7 @@ public ReusableByteBuffers(int intialCapacity) { /** * Create a new buffer pool holding buffers with a default initial capacity. */ - public ReusableByteBuffers() { + public ReusableByteBufferPool() { this(ReusableByteBuffer.DEFAULT_INITIAL_CAPACITY); } @@ -69,7 +69,7 @@ private ReusableByteBuffer createBuffer() { /** * Get a buffer from the pool or create a new one if none is available. - * The buffer is automatically returned to the pooled when closed. + * The buffer must be returned to the pool after usage by a call to {@link #release(ReusableByteBuffer)}. * * @return a reusable byte buffer */