Skip to content

Commit

Permalink
Pre-encode static messages
Browse files Browse the repository at this point in the history
  • Loading branch information
mp911de committed Jan 30, 2020
1 parent 4617f8c commit 08ddf59
Show file tree
Hide file tree
Showing 11 changed files with 197 additions and 70 deletions.
59 changes: 35 additions & 24 deletions src/main/java/io/r2dbc/postgresql/message/frontend/Bind.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
/**
* The Bind message.
*/
public final class Bind implements FrontendMessage {
public final class Bind implements FrontendMessage, FrontendMessage.DirectEncoder {

/**
* A marker indicating a {@code NULL} value.
Expand Down Expand Up @@ -95,30 +95,41 @@ public Publisher<ByteBuf> encode(ByteBufAllocator byteBufAllocator) {
return Mono.fromSupplier(() -> {
ByteBuf out = byteBufAllocator.ioBuffer();

writeByte(out, 'B');
writeLengthPlaceholder(out);
writeCStringUTF8(out, this.name);
writeCStringUTF8(out, this.source);

writeShort(out, this.parameterFormats.size());
this.parameterFormats.forEach(format -> writeShort(out, format.getDiscriminator()));

writeShort(out, this.parameters.size());
this.parameters.forEach(parameters -> {
if (parameters == NULL_VALUE) {
writeInt(out, NULL);
} else {
writeInt(out, parameters.readableBytes());
writeBytes(out, parameters);
parameters.release();
}
});

writeShort(out, this.resultFormats.size());
this.resultFormats.forEach(format -> writeShort(out, format.getDiscriminator()));

return writeSize(out);
encode(out);

return out;
});
}

@Override
public void encode(ByteBuf byteBuf) {

writeByte(byteBuf, 'B');

int writerIndex = byteBuf.writerIndex();

writeLengthPlaceholder(byteBuf);
writeCStringUTF8(byteBuf, this.name);
writeCStringUTF8(byteBuf, this.source);

writeShort(byteBuf, this.parameterFormats.size());
this.parameterFormats.forEach(format -> writeShort(byteBuf, format.getDiscriminator()));

writeShort(byteBuf, this.parameters.size());
this.parameters.forEach(parameters -> {
if (parameters == NULL_VALUE) {
writeInt(byteBuf, NULL);
} else {
writeInt(byteBuf, parameters.readableBytes());
writeBytes(byteBuf, parameters);
parameters.release();
}
});

writeShort(byteBuf, this.resultFormats.size());
this.resultFormats.forEach(format -> writeShort(byteBuf, format.getDiscriminator()));

writeSize(byteBuf, writerIndex);
}

@Override
Expand Down
22 changes: 16 additions & 6 deletions src/main/java/io/r2dbc/postgresql/message/frontend/Close.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
/**
* The Close message.
*/
public final class Close implements FrontendMessage {
public final class Close implements FrontendMessage, FrontendMessage.DirectEncoder {

/**
* The unnamed statement or portal.
Expand Down Expand Up @@ -63,15 +63,25 @@ public Publisher<ByteBuf> encode(ByteBufAllocator byteBufAllocator) {
return Mono.fromSupplier(() -> {
ByteBuf out = byteBufAllocator.ioBuffer();

writeByte(out, 'C');
writeLengthPlaceholder(out);
writeByte(out, this.type.getDiscriminator());
writeCStringUTF8(out, this.name);
encode(out);

return writeSize(out);
return out;
});
}

@Override
public void encode(ByteBuf byteBuf) {

writeByte(byteBuf, 'C');

int writerIndex = byteBuf.writerIndex();

writeLengthPlaceholder(byteBuf);
writeByte(byteBuf, this.type.getDiscriminator());
writeCStringUTF8(byteBuf, this.name);
writeSize(byteBuf, writerIndex);
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
22 changes: 17 additions & 5 deletions src/main/java/io/r2dbc/postgresql/message/frontend/CopyDone.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,29 @@
import reactor.core.publisher.Mono;

import static io.r2dbc.postgresql.message.frontend.FrontendMessageUtils.MESSAGE_OVERHEAD;
import static io.r2dbc.postgresql.message.frontend.FrontendMessageUtils.writeArray;
import static io.r2dbc.postgresql.message.frontend.FrontendMessageUtils.writeByte;
import static io.r2dbc.postgresql.message.frontend.FrontendMessageUtils.writeLengthPlaceholder;
import static io.r2dbc.postgresql.message.frontend.FrontendMessageUtils.writeSize;

/**
* The CopyDone message.
*/
public final class CopyDone implements FrontendMessage {
public final class CopyDone implements FrontendMessage, FrontendMessage.DirectEncoder {

/**
* A static singleton instance that should always be used.
*/
public static final CopyDone INSTANCE = new CopyDone();

private final byte[] message = writeArray(buffer -> {

writeByte(buffer, 'c');
writeLengthPlaceholder(buffer);

return writeSize(buffer);
});

private CopyDone() {
}

Expand All @@ -47,13 +56,16 @@ public Publisher<ByteBuf> encode(ByteBufAllocator byteBufAllocator) {
return Mono.fromSupplier(() -> {
ByteBuf out = byteBufAllocator.ioBuffer(MESSAGE_OVERHEAD);

writeByte(out, 'c');
writeLengthPlaceholder(out);

return writeSize(out);
encode(out);
return out;
});
}

@Override
public void encode(ByteBuf byteBuf) {
byteBuf.writeBytes(this.message);
}

@Override
public String toString() {
return "CopyDone{}";
Expand Down
23 changes: 16 additions & 7 deletions src/main/java/io/r2dbc/postgresql/message/frontend/Describe.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
/**
* The Describe message.
*/
public final class Describe implements FrontendMessage {
public final class Describe implements FrontendMessage, FrontendMessage.DirectEncoder {

/**
* The unnamed statement or portal.
Expand Down Expand Up @@ -63,15 +63,24 @@ public Publisher<ByteBuf> encode(ByteBufAllocator byteBufAllocator) {
return Mono.fromSupplier(() -> {
ByteBuf out = byteBufAllocator.ioBuffer();

writeByte(out, 'D');
writeLengthPlaceholder(out);
writeByte(out, this.type.getDiscriminator());
writeCStringUTF8(out, this.name);

return writeSize(out);
encode(out);
return out;
});
}

@Override
public void encode(ByteBuf byteBuf) {

writeByte(byteBuf, 'D');

int writerIndex = byteBuf.writerIndex();

writeLengthPlaceholder(byteBuf);
writeByte(byteBuf, this.type.getDiscriminator());
writeCStringUTF8(byteBuf, this.name);
writeSize(byteBuf, writerIndex);
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
22 changes: 15 additions & 7 deletions src/main/java/io/r2dbc/postgresql/message/frontend/Execute.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
/**
* The Execute message.
*/
public final class Execute implements FrontendMessage {
public final class Execute implements FrontendMessage, FrontendMessage.DirectEncoder {

/**
* No limit on returned rows.
Expand Down Expand Up @@ -69,14 +69,22 @@ public Publisher<ByteBuf> encode(ByteBufAllocator byteBufAllocator) {

return Mono.fromSupplier(() -> {
ByteBuf out = byteBufAllocator.ioBuffer();
encode(out);
return out;
});
}

@Override
public void encode(ByteBuf byteBuf) {

writeByte(out, 'E');
writeLengthPlaceholder(out);
writeCStringUTF8(out, this.name);
writeInt(out, this.rows);
writeByte(byteBuf, 'E');

return writeSize(out);
});
int writerIndex = byteBuf.writerIndex();

writeLengthPlaceholder(byteBuf);
writeCStringUTF8(byteBuf, this.name);
writeInt(byteBuf, this.rows);
writeSize(byteBuf, writerIndex);
}

@Override
Expand Down
21 changes: 17 additions & 4 deletions src/main/java/io/r2dbc/postgresql/message/frontend/Flush.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,29 @@
import reactor.core.publisher.Mono;

import static io.r2dbc.postgresql.message.frontend.FrontendMessageUtils.MESSAGE_OVERHEAD;
import static io.r2dbc.postgresql.message.frontend.FrontendMessageUtils.writeArray;
import static io.r2dbc.postgresql.message.frontend.FrontendMessageUtils.writeByte;
import static io.r2dbc.postgresql.message.frontend.FrontendMessageUtils.writeLengthPlaceholder;
import static io.r2dbc.postgresql.message.frontend.FrontendMessageUtils.writeSize;

/**
* The Flush message.
*/
public final class Flush implements FrontendMessage {
public final class Flush implements FrontendMessage, FrontendMessage.DirectEncoder {

/**
* A static singleton instance that should always be used.
*/
public static final Flush INSTANCE = new Flush();

private final byte[] message = writeArray(buffer -> {

writeByte(buffer, 'H');
writeLengthPlaceholder(buffer);

return writeSize(buffer);
});

private Flush() {
}

Expand All @@ -47,13 +56,17 @@ public Publisher<ByteBuf> encode(ByteBufAllocator byteBufAllocator) {
return Mono.fromSupplier(() -> {
ByteBuf out = byteBufAllocator.ioBuffer(MESSAGE_OVERHEAD);

writeByte(out, 'H');
writeLengthPlaceholder(out);
encode(out);

return writeSize(out);
return out;
});
}

@Override
public void encode(ByteBuf byteBuf) {
byteBuf.writeBytes(this.message);
}

@Override
public String toString() {
return "Flush{}";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,18 @@ public interface FrontendMessage {
*/
Publisher<ByteBuf> encode(ByteBufAllocator byteBufAllocator);


/**
* Interface for messages that can be directly encoded without producing a {@link Publisher} first.
*/
interface DirectEncoder {

/**
* Encode a message directly by writing its content to a {@link ByteBuf}.
*
* @param byteBuf the target {@link ByteBuf} to write into
*/
void encode(ByteBuf byteBuf);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
package io.r2dbc.postgresql.message.frontend;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.r2dbc.postgresql.util.Assert;

import java.nio.ByteBuffer;
import java.util.function.Function;

import static io.netty.util.CharsetUtil.UTF_8;

Expand Down Expand Up @@ -135,4 +137,16 @@ static ByteBuf writeSize(ByteBuf out, int startIndex) {
return out;
}

static byte[] writeArray(Function<ByteBuf, ByteBuf> writeFunction) {

ByteBuf buffer = UnpooledByteBufAllocator.DEFAULT.heapBuffer();
ByteBuf result = writeFunction.apply(buffer);

byte[] bytes = new byte[result.readableBytes()];
result.readBytes(bytes);
result.release();

return bytes;
}

}
Loading

0 comments on commit 08ddf59

Please sign in to comment.