Skip to content

Commit

Permalink
Reduce object allocations by use int[] instead of Collection<Integer>
Browse files Browse the repository at this point in the history
  • Loading branch information
mp911de committed Jan 30, 2020
1 parent 68e7817 commit 422ce09
Show file tree
Hide file tree
Showing 14 changed files with 135 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ private DefaultPortalNameSupplier() {

@Override
public String get() {
return String.format("B_%d", COUNTER.getAndIncrement());
return "B_%d" + COUNTER.getAndIncrement();
}

}
44 changes: 33 additions & 11 deletions src/main/java/io/r2dbc/postgresql/IndefiniteStatementCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,16 @@
import io.r2dbc.postgresql.client.ExtendedQueryMessageFlow;
import io.r2dbc.postgresql.util.Assert;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

import java.util.HashMap;
import java.util.List;
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

final class IndefiniteStatementCache implements StatementCache {

private final Map<Tuple2<String, List<Integer>>, Mono<String>> cache = new HashMap<>();
private final Map<String, Map<int[], Mono<String>>> cache = new ConcurrentHashMap<>();

private final Client client;

Expand All @@ -45,11 +44,35 @@ final class IndefiniteStatementCache implements StatementCache {
public Mono<String> getName(Binding binding, String sql) {
Assert.requireNonNull(binding, "binding must not be null");
Assert.requireNonNull(sql, "sql must not be null");
Map<int[], Mono<String>> typedMap = this.cache.computeIfAbsent(sql, ignore -> new TreeMap<>((o1, o2) -> {

synchronized (this.cache) {
return this.cache.computeIfAbsent(Tuples.of(sql, binding.getParameterTypes()),
tuple -> this.parse(tuple.getT1(), tuple.getT2()));
if (Arrays.equals(o1, o2)) {
return 0;
}

if (o1.length != o2.length) {
return o1.length - o2.length;
}

for (int i = 0; i < o1.length; i++) {

int cmp = Integer.compare(o1[i], o2[i]);

if (cmp != 0) {
return cmp;
}
}

return 0;
}));

Mono<String> mono = typedMap.get(binding.getParameterTypes());
if (mono == null) {
mono = this.parse(sql, binding.getParameterTypes());
typedMap.put(binding.getParameterTypes(), mono);
}

return mono;
}

@Override
Expand All @@ -61,8 +84,8 @@ public String toString() {
'}';
}

private Mono<String> parse(String sql, List<Integer> types) {
String name = String.format("S_%d", this.counter.getAndIncrement());
private Mono<String> parse(String sql, int[] types) {
String name = "S_" + this.counter.getAndIncrement();

ExceptionFactory factory = ExceptionFactory.withSql(name);
return ExtendedQueryMessageFlow
Expand All @@ -71,5 +94,4 @@ private Mono<String> parse(String sql, List<Integer> types) {
.then(Mono.just(name))
.cache();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public boolean equals(Object o) {

@Override
public Class<?> getJavaType() {
return codecs.preferredType(this.nativeType, this.format);
return this.codecs.preferredType(this.nativeType, this.format);
}

@Override
Expand Down Expand Up @@ -106,9 +106,6 @@ public String toString() {
}

static PostgresqlColumnMetadata toColumnMetadata(Codecs codecs, Field field) {
Assert.requireNonNull(codecs, "codecs must not be null");
Assert.requireNonNull(field, "field must not be null");

return new PostgresqlColumnMetadata(codecs, field);
}

Expand Down
47 changes: 31 additions & 16 deletions src/main/java/io/r2dbc/postgresql/PostgresqlResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.r2dbc.postgresql.message.backend.CommandComplete;
import io.r2dbc.postgresql.message.backend.DataRow;
import io.r2dbc.postgresql.message.backend.EmptyQueryResponse;
import io.r2dbc.postgresql.message.backend.ErrorResponse;
import io.r2dbc.postgresql.message.backend.PortalSuspended;
import io.r2dbc.postgresql.message.backend.RowDescription;
import io.r2dbc.postgresql.util.Assert;
Expand All @@ -32,6 +33,7 @@
import io.r2dbc.spi.RowMetadata;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SynchronousSink;

import java.util.function.BiFunction;
import java.util.function.Predicate;
Expand All @@ -55,39 +57,52 @@ final class PostgresqlResult extends AbstractReferenceCounted implements io.r2db

private volatile RowDescription rowDescription;

PostgresqlResult(Codecs codecs, Flux<BackendMessage> messages, ExceptionFactory factory) {
this.codecs = Assert.requireNonNull(codecs, "codecs must not be null");
this.messages = Assert.requireNonNull(messages, "messages must not be null");
this.factory = Assert.requireNonNull(factory, "factory must not be null");
private PostgresqlResult(Codecs codecs, Flux<BackendMessage> messages, ExceptionFactory factory) {
this.codecs = codecs;
this.messages = messages;
this.factory = factory;
}

@Override
@SuppressWarnings({"rawtypes", "unchecked"})
public Mono<Integer> getRowsUpdated() {

return this.messages
.handle(this.factory::handleErrorResponse)
.doOnNext(ReferenceCountUtil::release)
.ofType(CommandComplete.class)
.singleOrEmpty()
.handle((commandComplete, sink) -> {
Integer rowCount = commandComplete.getRows();
if (rowCount != null) {
sink.next(rowCount);
} else {
sink.complete();
.<Integer>handle((message, sink) -> {

if (message instanceof ErrorResponse) {
this.factory.handleErrorResponse(message, (SynchronousSink) sink);
return;
}
});

if (message instanceof DataRow) {
((DataRow) message).release();
}

if (message instanceof CommandComplete) {

Integer rowCount = ((CommandComplete) message).getRows();
if (rowCount != null) {
sink.next(rowCount);
}
}
}).singleOrEmpty();
}

@Override
@SuppressWarnings({"rawtypes", "unchecked"})
public <T> Flux<T> map(BiFunction<Row, RowMetadata, ? extends T> f) {
Assert.requireNonNull(f, "f must not be null");

return this.messages.takeUntil(TAKE_UNTIL)
.handle(this.factory::handleErrorResponse)
.handle((message, sink) -> {

try {
if (message instanceof ErrorResponse) {
this.factory.handleErrorResponse(message, (SynchronousSink) sink);
return;
}

if (message instanceof RowDescription) {
this.rowDescription = (RowDescription) message;
this.metadata = PostgresqlRowMetadata.toRowMetadata(this.codecs, (RowDescription) message);
Expand Down
43 changes: 38 additions & 5 deletions src/main/java/io/r2dbc/postgresql/client/Binding.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.r2dbc.postgresql.message.Format;
import io.r2dbc.postgresql.util.Assert;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.ArrayList;
Expand All @@ -39,6 +40,8 @@ public final class Binding {

private final List<Parameter> parameters;

private final int[] types;

/**
* Create a new instance.
*
Expand All @@ -47,6 +50,7 @@ public final class Binding {
public Binding(int expectedSize) {
this.expectedSize = expectedSize;
this.parameters = new ArrayList<>(Collections.nCopies(expectedSize, UNSPECIFIED));
this.types = new int[expectedSize];
}

/**
Expand All @@ -57,15 +61,15 @@ public Binding(int expectedSize) {
* @return this {@link Binding}
* @throws IllegalArgumentException if {@code index} or {@code parameter} is {@code null}
*/
public Binding add(Integer index, Parameter parameter) {
Assert.requireNonNull(index, "index must not be null");
public Binding add(int index, Parameter parameter) {
Assert.requireNonNull(parameter, "parameter must not be null");

if (index >= this.expectedSize) {
throw new IndexOutOfBoundsException(String.format("Binding index %d when only %d parameters are expected", index, this.expectedSize));
}

this.parameters.set(index, parameter);
this.types[index] = parameter.getType();

return this;
}
Expand Down Expand Up @@ -97,8 +101,15 @@ public List<Format> getParameterFormats() {
*
* @return the types of the parameters in the binding
*/
public List<Integer> getParameterTypes() {
return getTransformedParameters(Parameter::getType);
public int[] getParameterTypes() {

for (int i = 0; i < this.parameters.size(); i++) {
Parameter parameter = this.parameters.get(i);
if (parameter == UNSPECIFIED) {
throw new IllegalStateException(String.format("No parameter specified for index %d", i));
}
}
return this.types;
}

/**
Expand All @@ -110,6 +121,10 @@ public List<Publisher<? extends ByteBuf>> getParameterValues() {
return getTransformedParameters(Parameter::getValue);
}

Flux<Publisher<? extends ByteBuf>> parameterValues() {
return Flux.fromIterable(this.parameters).map(Parameter::getValue);
}

@Override
public int hashCode() {
return Objects.hash(this.parameters);
Expand All @@ -119,6 +134,10 @@ public boolean isEmpty() {
return this.parameters.isEmpty();
}

public int size() {
return this.parameters.size();
}

@Override
public String toString() {
return "Binding{" +
Expand All @@ -140,14 +159,28 @@ public void validate() {
}

private <T> List<T> getTransformedParameters(Function<Parameter, T> transformer) {
List<T> transformed = new ArrayList<>(this.parameters.size());

if (this.parameters.isEmpty()) {
return Collections.emptyList();
}

List<T> transformed = null;

for (int i = 0; i < this.parameters.size(); i++) {
Parameter parameter = this.parameters.get(i);
if (parameter == UNSPECIFIED) {
throw new IllegalStateException(String.format("No parameter specified for index %d", i));
}

if (transformed == null) {
if (this.parameters.size() == 1) {
return Collections.singletonList(transformer.apply(parameter));
}

transformed = new ArrayList<>(this.parameters.size());
}


transformed.add(transformer.apply(parameter));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.regex.Pattern;

import static io.r2dbc.postgresql.message.frontend.Execute.NO_LIMIT;
Expand Down Expand Up @@ -91,7 +90,7 @@ public static Flux<BackendMessage> execute(Publisher<Binding> bindings, Client c
* @return the messages received in response to this exchange
* @throws IllegalArgumentException if {@code client}, {@code name}, {@code query}, or {@code types} is {@code null}
*/
public static Flux<BackendMessage> parse(Client client, String name, String query, List<Integer> types) {
public static Flux<BackendMessage> parse(Client client, String name, String query, int[] types) {
Assert.requireNonNull(client, "client must not be null");
Assert.requireNonNull(name, "name must not be null");
Assert.requireNonNull(query, "query must not be null");
Expand Down Expand Up @@ -154,7 +153,7 @@ private static Flux<FrontendMessage> toBindFlow(Binding binding, PortalNameSuppl
.flatMapMany(values -> {
Bind bind = new Bind(portal, binding.getParameterFormats(), values, resultFormat(forceBinary), statementName);

return Flux.just(bind, new Describe(portal, PORTAL), new Execute(portal, NO_LIMIT), new Close(portal, PORTAL));
return Flux.<FrontendMessage>just(bind, new Describe(portal, PORTAL), new Execute(portal, NO_LIMIT), new Close(portal, PORTAL));
}).doOnSubscribe(ignore -> QueryLogger.logQuery(query));
}

Expand Down
16 changes: 9 additions & 7 deletions src/main/java/io/r2dbc/postgresql/message/frontend/Parse.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;

import java.util.List;
import java.util.Arrays;
import java.util.Objects;

import static io.r2dbc.postgresql.message.frontend.FrontendMessageUtils.writeByte;
Expand All @@ -49,7 +49,7 @@ public final class Parse implements FrontendMessage {

private final String name;

private final List<Integer> parameters;
private final int[] parameters;

private final String query;

Expand All @@ -63,7 +63,7 @@ public final class Parse implements FrontendMessage {
* @see #UNNAMED_STATEMENT
* @see #UNSPECIFIED
*/
public Parse(String name, List<Integer> parameters, String query) {
public Parse(String name, int[] parameters, String query) {
this.name = Assert.requireNonNull(name, "name must not be null");
this.parameters = Assert.requireNonNull(parameters, "parameters must not be null");
this.query = Assert.requireNonNull(query, "query must not be null");
Expand All @@ -81,8 +81,10 @@ public Publisher<ByteBuf> encode(ByteBufAllocator byteBufAllocator) {
writeCStringUTF8(out, this.name);
writeCStringUTF8(out, this.query);

writeShort(out, this.parameters.size());
this.parameters.forEach(parameter -> writeInt(out, parameter));
writeShort(out, this.parameters.length);
for (int parameter : this.parameters) {
writeInt(out, parameter);
}

return writeSize(out);
});
Expand All @@ -98,7 +100,7 @@ public boolean equals(Object o) {
}
Parse that = (Parse) o;
return Objects.equals(this.name, that.name) &&
Objects.equals(this.parameters, that.parameters) &&
Arrays.equals(this.parameters, that.parameters) &&
Objects.equals(this.query, that.query);
}

Expand All @@ -111,7 +113,7 @@ public int hashCode() {
public String toString() {
return "Parse{" +
"name='" + this.name + '\'' +
", parameters=" + this.parameters +
", parameters=" + Arrays.toString(this.parameters) +
", query='" + this.query + '\'' +
'}';
}
Expand Down
Loading

0 comments on commit 422ce09

Please sign in to comment.