Skip to content

Commit

Permalink
Reduce object allocations by use int[] instead of Collection<Integer>
Browse files Browse the repository at this point in the history
  • Loading branch information
mp911de committed Jan 30, 2020
1 parent 08ddf59 commit 1b2a141
Show file tree
Hide file tree
Showing 17 changed files with 188 additions and 140 deletions.
55 changes: 45 additions & 10 deletions src/main/java/io/r2dbc/postgresql/BoundedStatementCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,9 @@
import io.r2dbc.postgresql.util.Assert;
import reactor.core.publisher.Mono;
import reactor.util.annotation.Nullable;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
Expand All @@ -38,7 +37,7 @@
*/
final class BoundedStatementCache implements StatementCache {

private final Map<Tuple2<String, List<Integer>>, String> cache = new LinkedHashMap<>(16, 0.75f, true);
private final Map<CacheKey, String> cache = new LinkedHashMap<>(16, 0.75f, true);

private final Client client;

Expand All @@ -58,7 +57,7 @@ public BoundedStatementCache(Client client, int limit) {
public Mono<String> getName(Binding binding, String sql) {
Assert.requireNonNull(binding, "binding must not be null");
Assert.requireNonNull(sql, "sql must not be null");
Tuple2<String, List<Integer>> key = Tuples.of(sql, binding.getParameterTypes());
CacheKey key = new CacheKey(sql, binding.getParameterTypes());
String name = get(key);
if (name != null) {
return Mono.just(name);
Expand All @@ -76,7 +75,7 @@ public Mono<String> getName(Binding binding, String sql) {
.then();
});

return closeLastStatement.then(this.parse(sql, binding.getParameterTypes()))
return closeLastStatement.then(parse(sql, binding.getParameterTypes()))
.doOnNext(preparedName -> put(key, preparedName));
}

Expand All @@ -101,7 +100,7 @@ Collection<String> getCachedStatementNames() {
* @return statement name by key
*/
@Nullable
private String get(Tuple2<String, List<Integer>> key) {
private String get(CacheKey key) {
synchronized (this.cache) {
return this.cache.get(key);
}
Expand All @@ -114,7 +113,7 @@ private String get(Tuple2<String, List<Integer>> key) {
*/
private String getAndRemoveEldest() {
synchronized (this.cache) {
Iterator<Map.Entry<Tuple2<String, List<Integer>>, String>> iterator = this.cache.entrySet().iterator();
Iterator<Map.Entry<CacheKey, String>> iterator = this.cache.entrySet().iterator();
String entry = iterator.next().getValue();
iterator.remove();
return entry;
Expand All @@ -124,7 +123,7 @@ private String getAndRemoveEldest() {
/**
* Synchronized cache access: Store prepared statement.
*/
private void put(Tuple2<String, List<Integer>> key, String preparedName) {
private void put(CacheKey key, String preparedName) {
synchronized (this.cache) {
this.cache.put(key, preparedName);
}
Expand All @@ -151,8 +150,8 @@ public String toString() {
'}';
}

private Mono<String> parse(String sql, List<Integer> types) {
String name = String.format("S_%d", this.counter.getAndIncrement());
private Mono<String> parse(String sql, int[] types) {
String name = "S_" + this.counter.getAndIncrement();

ExceptionFactory factory = ExceptionFactory.withSql(name);
return ExtendedQueryMessageFlow
Expand All @@ -161,4 +160,40 @@ private Mono<String> parse(String sql, List<Integer> types) {
.then(Mono.just(name))
.cache();
}

static class CacheKey {

String sql;

int[] parameterTypes;

public CacheKey(String sql, int[] parameterTypes) {
this.sql = sql;
this.parameterTypes = parameterTypes;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof CacheKey)) {
return false;
}

CacheKey cacheKey = (CacheKey) o;

if (this.sql != null ? !this.sql.equals(cacheKey.sql) : cacheKey.sql != null) {
return false;
}
return Arrays.equals(this.parameterTypes, cacheKey.parameterTypes);
}

@Override
public int hashCode() {
int result = this.sql != null ? this.sql.hashCode() : 0;
result = 31 * result + Arrays.hashCode(this.parameterTypes);
return result;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ private DefaultPortalNameSupplier() {

@Override
public String get() {
return String.format("B_%d", COUNTER.getAndIncrement());
return "B_%d" + COUNTER.getAndIncrement();
}

}
44 changes: 33 additions & 11 deletions src/main/java/io/r2dbc/postgresql/IndefiniteStatementCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,16 @@
import io.r2dbc.postgresql.client.ExtendedQueryMessageFlow;
import io.r2dbc.postgresql.util.Assert;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

import java.util.HashMap;
import java.util.List;
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

final class IndefiniteStatementCache implements StatementCache {

private final Map<Tuple2<String, List<Integer>>, Mono<String>> cache = new HashMap<>();
private final Map<String, Map<int[], Mono<String>>> cache = new ConcurrentHashMap<>();

private final Client client;

Expand All @@ -45,11 +44,35 @@ final class IndefiniteStatementCache implements StatementCache {
public Mono<String> getName(Binding binding, String sql) {
Assert.requireNonNull(binding, "binding must not be null");
Assert.requireNonNull(sql, "sql must not be null");
Map<int[], Mono<String>> typedMap = this.cache.computeIfAbsent(sql, ignore -> new TreeMap<>((o1, o2) -> {

synchronized (this.cache) {
return this.cache.computeIfAbsent(Tuples.of(sql, binding.getParameterTypes()),
tuple -> this.parse(tuple.getT1(), tuple.getT2()));
if (Arrays.equals(o1, o2)) {
return 0;
}

if (o1.length != o2.length) {
return o1.length - o2.length;
}

for (int i = 0; i < o1.length; i++) {

int cmp = Integer.compare(o1[i], o2[i]);

if (cmp != 0) {
return cmp;
}
}

return 0;
}));

Mono<String> mono = typedMap.get(binding.getParameterTypes());
if (mono == null) {
mono = parse(sql, binding.getParameterTypes());
typedMap.put(binding.getParameterTypes(), mono);
}

return mono;
}

@Override
Expand All @@ -61,8 +84,8 @@ public String toString() {
'}';
}

private Mono<String> parse(String sql, List<Integer> types) {
String name = String.format("S_%d", this.counter.getAndIncrement());
private Mono<String> parse(String sql, int[] types) {
String name = "S_" + this.counter.getAndIncrement();

ExceptionFactory factory = ExceptionFactory.withSql(name);
return ExtendedQueryMessageFlow
Expand All @@ -71,5 +94,4 @@ private Mono<String> parse(String sql, List<Integer> types) {
.then(Mono.just(name))
.cache();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public boolean equals(Object o) {

@Override
public Class<?> getJavaType() {
return codecs.preferredType(this.nativeType, this.format);
return this.codecs.preferredType(this.nativeType, this.format);
}

@Override
Expand Down Expand Up @@ -106,9 +106,6 @@ public String toString() {
}

static PostgresqlColumnMetadata toColumnMetadata(Codecs codecs, Field field) {
Assert.requireNonNull(codecs, "codecs must not be null");
Assert.requireNonNull(field, "field must not be null");

return new PostgresqlColumnMetadata(codecs, field);
}

Expand Down
49 changes: 31 additions & 18 deletions src/main/java/io/r2dbc/postgresql/PostgresqlResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.r2dbc.postgresql.message.backend.CommandComplete;
import io.r2dbc.postgresql.message.backend.DataRow;
import io.r2dbc.postgresql.message.backend.EmptyQueryResponse;
import io.r2dbc.postgresql.message.backend.ErrorResponse;
import io.r2dbc.postgresql.message.backend.PortalSuspended;
import io.r2dbc.postgresql.message.backend.RowDescription;
import io.r2dbc.postgresql.util.Assert;
Expand All @@ -31,6 +32,7 @@
import io.r2dbc.spi.RowMetadata;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SynchronousSink;

import java.util.function.BiFunction;
import java.util.function.Predicate;
Expand All @@ -54,39 +56,52 @@ final class PostgresqlResult extends AbstractReferenceCounted implements io.r2db

private volatile RowDescription rowDescription;

PostgresqlResult(ConnectionContext context, Flux<BackendMessage> messages, ExceptionFactory factory) {
this.context = Assert.requireNonNull(context, "context must not be null");
this.messages = Assert.requireNonNull(messages, "messages must not be null");
this.factory = Assert.requireNonNull(factory, "factory must not be null");
private PostgresqlResult(ConnectionContext context, Flux<BackendMessage> messages, ExceptionFactory factory) {
this.context = context;
this.messages = messages;
this.factory = factory;
}

@Override
@SuppressWarnings({"rawtypes", "unchecked"})
public Mono<Integer> getRowsUpdated() {

return this.messages
.handle(this.factory::handleErrorResponse)
.doOnNext(ReferenceCountUtil::release)
.ofType(CommandComplete.class)
.singleOrEmpty()
.handle((commandComplete, sink) -> {
Integer rowCount = commandComplete.getRows();
if (rowCount != null) {
sink.next(rowCount);
} else {
sink.complete();
.<Integer>handle((message, sink) -> {

if (message instanceof ErrorResponse) {
this.factory.handleErrorResponse(message, (SynchronousSink) sink);
return;
}
});

if (message instanceof DataRow) {
((DataRow) message).release();
}

if (message instanceof CommandComplete) {

Integer rowCount = ((CommandComplete) message).getRows();
if (rowCount != null) {
sink.next(rowCount);
}
}
}).singleOrEmpty();
}

@Override
@SuppressWarnings({"rawtypes", "unchecked"})
public <T> Flux<T> map(BiFunction<Row, RowMetadata, ? extends T> f) {
Assert.requireNonNull(f, "f must not be null");

return this.messages.takeUntil(TAKE_UNTIL)
.handle(this.factory::handleErrorResponse)
.handle((message, sink) -> {

try {
if (message instanceof ErrorResponse) {
this.factory.handleErrorResponse(message, (SynchronousSink) sink);
return;
}

if (message instanceof RowDescription) {
this.rowDescription = (RowDescription) message;
this.metadata = PostgresqlRowMetadata.toRowMetadata(this.context.getCodecs(), (RowDescription) message);
Expand All @@ -95,8 +110,6 @@ public <T> Flux<T> map(BiFunction<Row, RowMetadata, ? extends T> f) {

if (message instanceof DataRow) {
PostgresqlRow row = PostgresqlRow.toRow(this.context, (DataRow) message, this.rowDescription);


sink.next(f.apply(row, this.metadata));
}

Expand Down
Loading

0 comments on commit 1b2a141

Please sign in to comment.