From 08ddf590e41f85ae8a23f2091a5643953e689a6d Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Wed, 23 Oct 2019 11:10:18 +0200 Subject: [PATCH] Pre-encode static messages [#138] --- .../postgresql/message/frontend/Bind.java | 59 +++++++++++-------- .../postgresql/message/frontend/Close.java | 22 +++++-- .../postgresql/message/frontend/CopyDone.java | 22 +++++-- .../postgresql/message/frontend/Describe.java | 23 +++++--- .../postgresql/message/frontend/Execute.java | 22 ++++--- .../postgresql/message/frontend/Flush.java | 21 +++++-- .../message/frontend/FrontendMessage.java | 14 +++++ .../frontend/FrontendMessageUtils.java | 14 +++++ .../message/frontend/SSLRequest.java | 25 ++++++-- .../postgresql/message/frontend/Sync.java | 23 ++++++-- .../message/frontend/Terminate.java | 22 +++++-- 11 files changed, 197 insertions(+), 70 deletions(-) diff --git a/src/main/java/io/r2dbc/postgresql/message/frontend/Bind.java b/src/main/java/io/r2dbc/postgresql/message/frontend/Bind.java index 128ca642..226c6738 100644 --- a/src/main/java/io/r2dbc/postgresql/message/frontend/Bind.java +++ b/src/main/java/io/r2dbc/postgresql/message/frontend/Bind.java @@ -39,7 +39,7 @@ /** * The Bind message. */ -public final class Bind implements FrontendMessage { +public final class Bind implements FrontendMessage, FrontendMessage.DirectEncoder { /** * A marker indicating a {@code NULL} value. @@ -95,30 +95,41 @@ public Publisher encode(ByteBufAllocator byteBufAllocator) { return Mono.fromSupplier(() -> { ByteBuf out = byteBufAllocator.ioBuffer(); - writeByte(out, 'B'); - writeLengthPlaceholder(out); - writeCStringUTF8(out, this.name); - writeCStringUTF8(out, this.source); - - writeShort(out, this.parameterFormats.size()); - this.parameterFormats.forEach(format -> writeShort(out, format.getDiscriminator())); - - writeShort(out, this.parameters.size()); - this.parameters.forEach(parameters -> { - if (parameters == NULL_VALUE) { - writeInt(out, NULL); - } else { - writeInt(out, parameters.readableBytes()); - writeBytes(out, parameters); - parameters.release(); - } - }); - - writeShort(out, this.resultFormats.size()); - this.resultFormats.forEach(format -> writeShort(out, format.getDiscriminator())); - - return writeSize(out); + encode(out); + + return out; + }); + } + + @Override + public void encode(ByteBuf byteBuf) { + + writeByte(byteBuf, 'B'); + + int writerIndex = byteBuf.writerIndex(); + + writeLengthPlaceholder(byteBuf); + writeCStringUTF8(byteBuf, this.name); + writeCStringUTF8(byteBuf, this.source); + + writeShort(byteBuf, this.parameterFormats.size()); + this.parameterFormats.forEach(format -> writeShort(byteBuf, format.getDiscriminator())); + + writeShort(byteBuf, this.parameters.size()); + this.parameters.forEach(parameters -> { + if (parameters == NULL_VALUE) { + writeInt(byteBuf, NULL); + } else { + writeInt(byteBuf, parameters.readableBytes()); + writeBytes(byteBuf, parameters); + parameters.release(); + } }); + + writeShort(byteBuf, this.resultFormats.size()); + this.resultFormats.forEach(format -> writeShort(byteBuf, format.getDiscriminator())); + + writeSize(byteBuf, writerIndex); } @Override diff --git a/src/main/java/io/r2dbc/postgresql/message/frontend/Close.java b/src/main/java/io/r2dbc/postgresql/message/frontend/Close.java index 6659046c..6ae57fbc 100644 --- a/src/main/java/io/r2dbc/postgresql/message/frontend/Close.java +++ b/src/main/java/io/r2dbc/postgresql/message/frontend/Close.java @@ -32,7 +32,7 @@ /** * The Close message. */ -public final class Close implements FrontendMessage { +public final class Close implements FrontendMessage, FrontendMessage.DirectEncoder { /** * The unnamed statement or portal. @@ -63,15 +63,25 @@ public Publisher encode(ByteBufAllocator byteBufAllocator) { return Mono.fromSupplier(() -> { ByteBuf out = byteBufAllocator.ioBuffer(); - writeByte(out, 'C'); - writeLengthPlaceholder(out); - writeByte(out, this.type.getDiscriminator()); - writeCStringUTF8(out, this.name); + encode(out); - return writeSize(out); + return out; }); } + @Override + public void encode(ByteBuf byteBuf) { + + writeByte(byteBuf, 'C'); + + int writerIndex = byteBuf.writerIndex(); + + writeLengthPlaceholder(byteBuf); + writeByte(byteBuf, this.type.getDiscriminator()); + writeCStringUTF8(byteBuf, this.name); + writeSize(byteBuf, writerIndex); + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/src/main/java/io/r2dbc/postgresql/message/frontend/CopyDone.java b/src/main/java/io/r2dbc/postgresql/message/frontend/CopyDone.java index c0069223..48cac717 100644 --- a/src/main/java/io/r2dbc/postgresql/message/frontend/CopyDone.java +++ b/src/main/java/io/r2dbc/postgresql/message/frontend/CopyDone.java @@ -23,6 +23,7 @@ import reactor.core.publisher.Mono; import static io.r2dbc.postgresql.message.frontend.FrontendMessageUtils.MESSAGE_OVERHEAD; +import static io.r2dbc.postgresql.message.frontend.FrontendMessageUtils.writeArray; import static io.r2dbc.postgresql.message.frontend.FrontendMessageUtils.writeByte; import static io.r2dbc.postgresql.message.frontend.FrontendMessageUtils.writeLengthPlaceholder; import static io.r2dbc.postgresql.message.frontend.FrontendMessageUtils.writeSize; @@ -30,13 +31,21 @@ /** * The CopyDone message. */ -public final class CopyDone implements FrontendMessage { +public final class CopyDone implements FrontendMessage, FrontendMessage.DirectEncoder { /** * A static singleton instance that should always be used. */ public static final CopyDone INSTANCE = new CopyDone(); + private final byte[] message = writeArray(buffer -> { + + writeByte(buffer, 'c'); + writeLengthPlaceholder(buffer); + + return writeSize(buffer); + }); + private CopyDone() { } @@ -47,13 +56,16 @@ public Publisher encode(ByteBufAllocator byteBufAllocator) { return Mono.fromSupplier(() -> { ByteBuf out = byteBufAllocator.ioBuffer(MESSAGE_OVERHEAD); - writeByte(out, 'c'); - writeLengthPlaceholder(out); - - return writeSize(out); + encode(out); + return out; }); } + @Override + public void encode(ByteBuf byteBuf) { + byteBuf.writeBytes(this.message); + } + @Override public String toString() { return "CopyDone{}"; diff --git a/src/main/java/io/r2dbc/postgresql/message/frontend/Describe.java b/src/main/java/io/r2dbc/postgresql/message/frontend/Describe.java index 7a760f5f..a8c09abf 100644 --- a/src/main/java/io/r2dbc/postgresql/message/frontend/Describe.java +++ b/src/main/java/io/r2dbc/postgresql/message/frontend/Describe.java @@ -32,7 +32,7 @@ /** * The Describe message. */ -public final class Describe implements FrontendMessage { +public final class Describe implements FrontendMessage, FrontendMessage.DirectEncoder { /** * The unnamed statement or portal. @@ -63,15 +63,24 @@ public Publisher encode(ByteBufAllocator byteBufAllocator) { return Mono.fromSupplier(() -> { ByteBuf out = byteBufAllocator.ioBuffer(); - writeByte(out, 'D'); - writeLengthPlaceholder(out); - writeByte(out, this.type.getDiscriminator()); - writeCStringUTF8(out, this.name); - - return writeSize(out); + encode(out); + return out; }); } + @Override + public void encode(ByteBuf byteBuf) { + + writeByte(byteBuf, 'D'); + + int writerIndex = byteBuf.writerIndex(); + + writeLengthPlaceholder(byteBuf); + writeByte(byteBuf, this.type.getDiscriminator()); + writeCStringUTF8(byteBuf, this.name); + writeSize(byteBuf, writerIndex); + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/src/main/java/io/r2dbc/postgresql/message/frontend/Execute.java b/src/main/java/io/r2dbc/postgresql/message/frontend/Execute.java index dce429ed..c713e289 100644 --- a/src/main/java/io/r2dbc/postgresql/message/frontend/Execute.java +++ b/src/main/java/io/r2dbc/postgresql/message/frontend/Execute.java @@ -33,7 +33,7 @@ /** * The Execute message. */ -public final class Execute implements FrontendMessage { +public final class Execute implements FrontendMessage, FrontendMessage.DirectEncoder { /** * No limit on returned rows. @@ -69,14 +69,22 @@ public Publisher encode(ByteBufAllocator byteBufAllocator) { return Mono.fromSupplier(() -> { ByteBuf out = byteBufAllocator.ioBuffer(); + encode(out); + return out; + }); + } + + @Override + public void encode(ByteBuf byteBuf) { - writeByte(out, 'E'); - writeLengthPlaceholder(out); - writeCStringUTF8(out, this.name); - writeInt(out, this.rows); + writeByte(byteBuf, 'E'); - return writeSize(out); - }); + int writerIndex = byteBuf.writerIndex(); + + writeLengthPlaceholder(byteBuf); + writeCStringUTF8(byteBuf, this.name); + writeInt(byteBuf, this.rows); + writeSize(byteBuf, writerIndex); } @Override diff --git a/src/main/java/io/r2dbc/postgresql/message/frontend/Flush.java b/src/main/java/io/r2dbc/postgresql/message/frontend/Flush.java index b411ac91..92227489 100644 --- a/src/main/java/io/r2dbc/postgresql/message/frontend/Flush.java +++ b/src/main/java/io/r2dbc/postgresql/message/frontend/Flush.java @@ -23,6 +23,7 @@ import reactor.core.publisher.Mono; import static io.r2dbc.postgresql.message.frontend.FrontendMessageUtils.MESSAGE_OVERHEAD; +import static io.r2dbc.postgresql.message.frontend.FrontendMessageUtils.writeArray; import static io.r2dbc.postgresql.message.frontend.FrontendMessageUtils.writeByte; import static io.r2dbc.postgresql.message.frontend.FrontendMessageUtils.writeLengthPlaceholder; import static io.r2dbc.postgresql.message.frontend.FrontendMessageUtils.writeSize; @@ -30,13 +31,21 @@ /** * The Flush message. */ -public final class Flush implements FrontendMessage { +public final class Flush implements FrontendMessage, FrontendMessage.DirectEncoder { /** * A static singleton instance that should always be used. */ public static final Flush INSTANCE = new Flush(); + private final byte[] message = writeArray(buffer -> { + + writeByte(buffer, 'H'); + writeLengthPlaceholder(buffer); + + return writeSize(buffer); + }); + private Flush() { } @@ -47,13 +56,17 @@ public Publisher encode(ByteBufAllocator byteBufAllocator) { return Mono.fromSupplier(() -> { ByteBuf out = byteBufAllocator.ioBuffer(MESSAGE_OVERHEAD); - writeByte(out, 'H'); - writeLengthPlaceholder(out); + encode(out); - return writeSize(out); + return out; }); } + @Override + public void encode(ByteBuf byteBuf) { + byteBuf.writeBytes(this.message); + } + @Override public String toString() { return "Flush{}"; diff --git a/src/main/java/io/r2dbc/postgresql/message/frontend/FrontendMessage.java b/src/main/java/io/r2dbc/postgresql/message/frontend/FrontendMessage.java index d2b2a973..66bec1f8 100644 --- a/src/main/java/io/r2dbc/postgresql/message/frontend/FrontendMessage.java +++ b/src/main/java/io/r2dbc/postgresql/message/frontend/FrontendMessage.java @@ -34,4 +34,18 @@ public interface FrontendMessage { */ Publisher encode(ByteBufAllocator byteBufAllocator); + + /** + * Interface for messages that can be directly encoded without producing a {@link Publisher} first. + */ + interface DirectEncoder { + + /** + * Encode a message directly by writing its content to a {@link ByteBuf}. + * + * @param byteBuf the target {@link ByteBuf} to write into + */ + void encode(ByteBuf byteBuf); + } + } diff --git a/src/main/java/io/r2dbc/postgresql/message/frontend/FrontendMessageUtils.java b/src/main/java/io/r2dbc/postgresql/message/frontend/FrontendMessageUtils.java index d92c6376..077cd42c 100644 --- a/src/main/java/io/r2dbc/postgresql/message/frontend/FrontendMessageUtils.java +++ b/src/main/java/io/r2dbc/postgresql/message/frontend/FrontendMessageUtils.java @@ -17,9 +17,11 @@ package io.r2dbc.postgresql.message.frontend; import io.netty.buffer.ByteBuf; +import io.netty.buffer.UnpooledByteBufAllocator; import io.r2dbc.postgresql.util.Assert; import java.nio.ByteBuffer; +import java.util.function.Function; import static io.netty.util.CharsetUtil.UTF_8; @@ -135,4 +137,16 @@ static ByteBuf writeSize(ByteBuf out, int startIndex) { return out; } + static byte[] writeArray(Function writeFunction) { + + ByteBuf buffer = UnpooledByteBufAllocator.DEFAULT.heapBuffer(); + ByteBuf result = writeFunction.apply(buffer); + + byte[] bytes = new byte[result.readableBytes()]; + result.readBytes(bytes); + result.release(); + + return bytes; + } + } diff --git a/src/main/java/io/r2dbc/postgresql/message/frontend/SSLRequest.java b/src/main/java/io/r2dbc/postgresql/message/frontend/SSLRequest.java index 492ee84a..b10f5787 100644 --- a/src/main/java/io/r2dbc/postgresql/message/frontend/SSLRequest.java +++ b/src/main/java/io/r2dbc/postgresql/message/frontend/SSLRequest.java @@ -22,6 +22,7 @@ import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; +import static io.r2dbc.postgresql.message.frontend.FrontendMessageUtils.writeArray; import static io.r2dbc.postgresql.message.frontend.FrontendMessageUtils.writeInt; import static io.r2dbc.postgresql.message.frontend.FrontendMessageUtils.writeLengthPlaceholder; import static io.r2dbc.postgresql.message.frontend.FrontendMessageUtils.writeSize; @@ -29,7 +30,7 @@ /** * The SSLRequest message. */ -public final class SSLRequest implements FrontendMessage { +public final class SSLRequest implements FrontendMessage, FrontendMessage.DirectEncoder { /** * A static singleton instance that should always be used. @@ -38,6 +39,15 @@ public final class SSLRequest implements FrontendMessage { private static final int REQUEST_CODE = 80877103; + private final byte[] message = writeArray(buffer -> { + + writeLengthPlaceholder(buffer); + writeInt(buffer, REQUEST_CODE); + + return writeSize(buffer, 0); + + }); + private SSLRequest() { } @@ -46,15 +56,18 @@ public Publisher encode(ByteBufAllocator byteBufAllocator) { Assert.requireNonNull(byteBufAllocator, "byteBufAllocator must not be null"); return Mono.fromSupplier(() -> { - ByteBuf out = byteBufAllocator.ioBuffer(8); + ByteBuf out = byteBufAllocator.ioBuffer(); - writeLengthPlaceholder(out); - writeInt(out, REQUEST_CODE); - - return writeSize(out, 0); + encode(out); + return out; }); } + @Override + public void encode(ByteBuf byteBuf) { + byteBuf.writeBytes(this.message); + } + @Override public String toString() { return "SSLRequest{}"; diff --git a/src/main/java/io/r2dbc/postgresql/message/frontend/Sync.java b/src/main/java/io/r2dbc/postgresql/message/frontend/Sync.java index 342016d3..b37f5638 100644 --- a/src/main/java/io/r2dbc/postgresql/message/frontend/Sync.java +++ b/src/main/java/io/r2dbc/postgresql/message/frontend/Sync.java @@ -23,6 +23,7 @@ import reactor.core.publisher.Mono; import static io.r2dbc.postgresql.message.frontend.FrontendMessageUtils.MESSAGE_OVERHEAD; +import static io.r2dbc.postgresql.message.frontend.FrontendMessageUtils.writeArray; import static io.r2dbc.postgresql.message.frontend.FrontendMessageUtils.writeByte; import static io.r2dbc.postgresql.message.frontend.FrontendMessageUtils.writeLengthPlaceholder; import static io.r2dbc.postgresql.message.frontend.FrontendMessageUtils.writeSize; @@ -30,13 +31,21 @@ /** * The Sync message. */ -public final class Sync implements FrontendMessage { +public final class Sync implements FrontendMessage, FrontendMessage.DirectEncoder { /** * A static singleton instance that should always be used. */ public static final Sync INSTANCE = new Sync(); + private final byte[] message = writeArray(buffer -> { + + writeByte(buffer, 'S'); + writeLengthPlaceholder(buffer); + + return writeSize(buffer); + }); + private Sync() { } @@ -46,14 +55,16 @@ public Publisher encode(ByteBufAllocator byteBufAllocator) { return Mono.fromSupplier(() -> { ByteBuf out = byteBufAllocator.ioBuffer(MESSAGE_OVERHEAD); - - writeByte(out, 'S'); - writeLengthPlaceholder(out); - - return writeSize(out); + encode(out); + return out; }); } + @Override + public void encode(ByteBuf byteBuf) { + byteBuf.writeBytes(this.message); + } + @Override public String toString() { return "Sync{}"; diff --git a/src/main/java/io/r2dbc/postgresql/message/frontend/Terminate.java b/src/main/java/io/r2dbc/postgresql/message/frontend/Terminate.java index 27cd9d19..b3d1d7da 100644 --- a/src/main/java/io/r2dbc/postgresql/message/frontend/Terminate.java +++ b/src/main/java/io/r2dbc/postgresql/message/frontend/Terminate.java @@ -23,6 +23,7 @@ import reactor.core.publisher.Mono; import static io.r2dbc.postgresql.message.frontend.FrontendMessageUtils.MESSAGE_OVERHEAD; +import static io.r2dbc.postgresql.message.frontend.FrontendMessageUtils.writeArray; import static io.r2dbc.postgresql.message.frontend.FrontendMessageUtils.writeByte; import static io.r2dbc.postgresql.message.frontend.FrontendMessageUtils.writeLengthPlaceholder; import static io.r2dbc.postgresql.message.frontend.FrontendMessageUtils.writeSize; @@ -30,13 +31,21 @@ /** * The Termination message. */ -public final class Terminate implements FrontendMessage { +public final class Terminate implements FrontendMessage, FrontendMessage.DirectEncoder { /** * A static singleton instance that should always be used. */ public static final Terminate INSTANCE = new Terminate(); + private final byte[] message = writeArray(buffer -> { + + writeByte(buffer, 'X'); + writeLengthPlaceholder(buffer); + + return writeSize(buffer); + }); + private Terminate() { } @@ -47,13 +56,16 @@ public Publisher encode(ByteBufAllocator byteBufAllocator) { return Mono.fromSupplier(() -> { ByteBuf out = byteBufAllocator.ioBuffer(MESSAGE_OVERHEAD); - writeByte(out, 'X'); - writeLengthPlaceholder(out); - - return writeSize(out); + encode(out); + return out; }); } + @Override + public void encode(ByteBuf byteBuf) { + byteBuf.writeBytes(this.message); + } + @Override public String toString() { return "Terminate{}";