Skip to content

Commit

Permalink
Explore acquisition scheduler offloading.
Browse files Browse the repository at this point in the history
[resolves #217]

Signed-off-by: Mark Paluch <[email protected]>
  • Loading branch information
mp911de committed Oct 16, 2024
1 parent 5783652 commit fd0ca3c
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 53 deletions.
84 changes: 55 additions & 29 deletions src/main/java/io/r2dbc/pool/ConnectionPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,14 @@

package io.r2dbc.pool;

import io.r2dbc.spi.Closeable;
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactoryMetadata;
import io.r2dbc.spi.Lifecycle;
import io.r2dbc.spi.R2dbcTimeoutException;
import io.r2dbc.spi.Wrapped;
import io.r2dbc.spi.*;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.pool.InstrumentedPool;
import reactor.pool.PoolBuilder;
import reactor.pool.PoolConfig;
import reactor.pool.PoolMetricsRecorder;
import reactor.pool.PooledRefMetadata;
import reactor.pool.*;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.annotation.Nullable;
Expand All @@ -47,6 +38,7 @@
import java.util.Hashtable;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
Expand Down Expand Up @@ -114,31 +106,34 @@ public ConnectionPool(ConnectionPoolConfiguration configuration) {
Mono<Connection> mono = this.connectionPool.acquire()
.flatMap(ref -> {

if (logger.isDebugEnabled()) {
logger.debug("Obtaining new connection from the pool");
}
Connection connection = ref.poolable();
Scheduler scheduler = null;
Executor executor = null;
Mono<Connection> conn;

Mono<Void> prepare = null;
if (ref.poolable() instanceof Lifecycle) {
prepare = Mono.from(((Lifecycle) ref.poolable()).postAllocate());
}
if (connection instanceof Wrapped<?>) {

if (configuration.getPostAllocate() != null) {
Wrapped<?> wrapped = (Wrapped<?>) connection;

Mono<Void> postAllocate = Mono.defer(() -> Mono.from(configuration.getPostAllocate().apply(ref.poolable())));
prepare = prepare == null ? postAllocate : prepare.then(postAllocate);
scheduler = wrapped.unwrap(Scheduler.class);

if (scheduler == null) {
executor = wrapped.unwrap(Executor.class);
}

if (executor != null) {
scheduler = Schedulers.fromExecutor(executor);
}
}

PooledConnection connection = new PooledConnection(ref, this.preRelease);
Mono<Connection> conn;
if (prepare == null) {
conn = getValidConnection(allocateValidation, connection);
if (scheduler != null) {
conn = Mono.just(connection).publishOn(scheduler).flatMap(it -> {
return prepareConnection(configuration, ref, connection, allocateValidation);
});
} else {
conn = prepare.then(getValidConnection(allocateValidation, connection));
conn = prepareConnection(configuration, ref, connection, allocateValidation);
}

conn = conn.onErrorResume(throwable -> ref.invalidate().then(Mono.error(throwable)));

return Operators.discardOnCancel(conn, () -> {
ref.release().subscribe();
return false;
Expand Down Expand Up @@ -172,6 +167,37 @@ public ConnectionPool(ConnectionPoolConfiguration configuration) {
this.create = configuration.getAcquireRetry() > 0 ? create.retry(configuration.getAcquireRetry()) : create;
}

private Mono<Connection> prepareConnection(ConnectionPoolConfiguration configuration, PooledRef<Connection> ref, Connection connection, Function<Connection, Mono<Void>> allocateValidation) {

Mono<Void> prepare = null;

if (logger.isDebugEnabled()) {
logger.debug("Obtaining new connection from the pool");
}


if (connection instanceof Lifecycle) {
prepare = Mono.from(((Lifecycle) connection).postAllocate());
}

if (configuration.getPostAllocate() != null) {

Mono<Void> postAllocate = Mono.defer(() -> Mono.from(configuration.getPostAllocate().apply(connection)));
prepare = prepare == null ? postAllocate : prepare.then(postAllocate);
}

PooledConnection pooledConnection = new PooledConnection(ref, this.preRelease);
Mono<Connection> conn;
if (prepare == null) {
conn = getValidConnection(allocateValidation, pooledConnection);
} else {
conn = prepare.then(getValidConnection(allocateValidation, pooledConnection));
}

conn = conn.onErrorResume(throwable -> ref.invalidate().then(Mono.error(throwable)));
return conn;
}

private Mono<Connection> getValidConnection(Function<Connection, Mono<Void>> allocateValidation, Connection connection) {
return allocateValidation.apply(connection).thenReturn(connection);
}
Expand Down
56 changes: 32 additions & 24 deletions src/test/java/io/r2dbc/pool/ConnectionPoolUnitTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,7 @@

package io.r2dbc.pool;

import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactoryMetadata;
import io.r2dbc.spi.Lifecycle;
import io.r2dbc.spi.R2dbcTimeoutException;
import io.r2dbc.spi.ValidationDepth;
import io.r2dbc.spi.Wrapped;
import io.r2dbc.spi.*;
import io.r2dbc.spi.test.MockConnection;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
Expand All @@ -32,41 +26,29 @@
import org.springframework.util.ReflectionUtils;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;

import javax.management.MBeanServer;
import javax.management.ObjectName;
import java.lang.management.ManagementFactory;
import java.lang.reflect.Field;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import static java.lang.String.format;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.*;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.*;

/**
* Unit tests for {@link ConnectionPool}.
Expand Down Expand Up @@ -127,12 +109,38 @@ void shouldCreateConnection() {

assertThat(actual).isInstanceOf(PooledConnection.class);
assertThat(((Wrapped) actual).unwrap()).isSameAs(connectionMock);
assertThat(Thread.currentThread().getName()).startsWith("single-");

}).verifyComplete();

verify(connectionFactoryMock).create();
}

@Test
@SuppressWarnings("unchecked")
void shouldCreateConnectionAndeOffloadPreparation() {

ConnectionFactory connectionFactoryMock = mock(ConnectionFactory.class);
Connection connectionMock = mock(Connection.class, withSettings().extraInterfaces(Wrapped.class));
Wrapped<?> wrapped = (Wrapped<?>) connectionMock;

when(connectionFactoryMock.create()).thenReturn((Publisher) Mono.just(connectionMock));
when(connectionMock.validate(any())).thenReturn(Mono.empty());
when(wrapped.unwrap(Scheduler.class)).thenReturn(Schedulers.parallel());

ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactoryMock).build();
ConnectionPool pool = new ConnectionPool(configuration);

pool.create().as(StepVerifier::create).consumeNextWith(actual -> {

assertThat(actual).isInstanceOf(PooledConnection.class);
assertThat(((Wrapped) actual).unwrap()).isSameAs(connectionMock);
assertThat(Thread.currentThread().getName()).startsWith("parallel-");
}).verifyComplete();

verify(connectionFactoryMock).create();
}

@Test
@SuppressWarnings("unchecked")
void shouldConsiderInitialSize() {
Expand Down

0 comments on commit fd0ca3c

Please sign in to comment.