From d7936780bbbe1e9d4a6af3a0045077044bf31c62 Mon Sep 17 00:00:00 2001 From: Georgios Andrianakis Date: Thu, 3 Aug 2023 11:45:39 +0300 Subject: [PATCH] Reduce allocation cost reporting spans This is done by leveraging Vert.x's Buffer directly instead of relying on a ByteArrayOutputStream --- .../exporter/otlp/VertxGrpcExporter.java | 6 ++-- .../exporter/otlp/VertxHttpExporter.java | 9 +++--- .../runtime/SmallRyeHealthHandlerBase.java | 20 +------------ .../core/runtime/BufferOutputStream.java | 29 +++++++++++++++++++ 4 files changed, 37 insertions(+), 27 deletions(-) create mode 100644 extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/BufferOutputStream.java diff --git a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/VertxGrpcExporter.java b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/VertxGrpcExporter.java index 5377044d463fb..d80e966e8d8a3 100644 --- a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/VertxGrpcExporter.java +++ b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/VertxGrpcExporter.java @@ -20,6 +20,7 @@ import io.opentelemetry.sdk.internal.ThrottlingLogger; import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.export.SpanExporter; +import io.quarkus.vertx.core.runtime.BufferOutputStream; import io.vertx.core.Handler; import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; @@ -194,10 +195,9 @@ public void handle(GrpcClientRequest request) { try { int messageSize = marshaler.getBinarySerializedSize(); - var baos = new NonCopyingByteArrayOutputStream(messageSize); // TODO: we can probably use Vert.x / Netty buffering here - marshaler.writeBinaryTo(baos); Buffer buffer = Buffer.buffer(messageSize); - buffer.appendBytes(baos.toByteArray()); + var os = new BufferOutputStream(buffer); + marshaler.writeBinaryTo(os); request.send(buffer).onSuccess(new Handler<>() { @Override public void handle(GrpcClientResponse response) { diff --git a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/VertxHttpExporter.java b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/VertxHttpExporter.java index 0242aef5ae5f4..176ca444d8b93 100644 --- a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/VertxHttpExporter.java +++ b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/VertxHttpExporter.java @@ -2,7 +2,6 @@ import static io.quarkus.opentelemetry.runtime.exporter.otlp.OtlpExporterUtil.getPort; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; import java.net.URI; @@ -18,6 +17,7 @@ import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.export.SpanExporter; +import io.quarkus.vertx.core.runtime.BufferOutputStream; import io.vertx.core.AsyncResult; import io.vertx.core.Handler; import io.vertx.core.Vertx; @@ -129,9 +129,9 @@ public byte[] responseBody() { }) .putHeader("Content-Type", contentType); - ByteArrayOutputStream os; // TODO: we can probably use Vert.x / Netty buffering here + Buffer buffer = Buffer.buffer(contentLength); + OutputStream os = new BufferOutputStream(buffer); if (compressionEnabled) { - os = new ByteArrayOutputStream(contentLength); clientRequest.putHeader("Content-Encoding", "gzip"); try (var gzos = new GZIPOutputStream(os)) { marshaler.accept(gzos); @@ -139,7 +139,6 @@ public byte[] responseBody() { throw new IllegalStateException(e); } } else { - os = new NonCopyingByteArrayOutputStream(contentLength); marshaler.accept(os); } @@ -149,7 +148,7 @@ public byte[] responseBody() { } } - clientRequest.send(Buffer.buffer(os.toByteArray())); + clientRequest.send(buffer); } }) diff --git a/extensions/smallrye-health/runtime/src/main/java/io/quarkus/smallrye/health/runtime/SmallRyeHealthHandlerBase.java b/extensions/smallrye-health/runtime/src/main/java/io/quarkus/smallrye/health/runtime/SmallRyeHealthHandlerBase.java index 0724d7026625c..f4127cf7cf72b 100644 --- a/extensions/smallrye-health/runtime/src/main/java/io/quarkus/smallrye/health/runtime/SmallRyeHealthHandlerBase.java +++ b/extensions/smallrye-health/runtime/src/main/java/io/quarkus/smallrye/health/runtime/SmallRyeHealthHandlerBase.java @@ -1,7 +1,7 @@ package io.quarkus.smallrye.health.runtime; +import io.quarkus.vertx.core.runtime.BufferOutputStream; import java.io.IOException; -import java.io.OutputStream; import java.io.UncheckedIOException; import io.quarkus.arc.Arc; @@ -56,22 +56,4 @@ private void doHandle(RoutingContext ctx) { } } - private static class BufferOutputStream extends OutputStream { - - private final Buffer buffer; - - private BufferOutputStream(Buffer buffer) { - this.buffer = buffer; - } - - @Override - public void write(byte[] b, int off, int len) throws IOException { - buffer.appendBytes(b, off, len); - } - - @Override - public void write(int b) throws IOException { - buffer.appendInt(b); - } - } } diff --git a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/BufferOutputStream.java b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/BufferOutputStream.java new file mode 100644 index 0000000000000..51f02bc622050 --- /dev/null +++ b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/BufferOutputStream.java @@ -0,0 +1,29 @@ +package io.quarkus.vertx.core.runtime; + +import java.io.IOException; +import java.io.OutputStream; + +import io.vertx.core.buffer.Buffer; + +/** + * Simple {@link OutputStream} implementation that appends content + * written in given {@link Buffer} instance. + */ +public class BufferOutputStream extends OutputStream { + + private final Buffer buffer; + + public BufferOutputStream(Buffer buffer) { + this.buffer = buffer; + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + buffer.appendBytes(b, off, len); + } + + @Override + public void write(int b) throws IOException { + buffer.appendInt(b); + } +}