Skip to content

Commit

Permalink
Merge pull request #13977 from Sanne/ReactiveHealth
Browse files Browse the repository at this point in the history
Reactive Connection Pools: always use TLOCAL, HealthCheck refactoring
  • Loading branch information
gsmet authored Dec 22, 2020
2 parents 8c5859f + 6b33c18 commit a698de8
Show file tree
Hide file tree
Showing 10 changed files with 189 additions and 205 deletions.
6 changes: 6 additions & 0 deletions extensions/reactive-datasource/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-vertx-core</artifactId>
</dependency>
<!-- Add the health extension as optional as we will produce the health check only if it's included -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-health</artifactId>
<optional>true</optional>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public class DataSourceReactiveRuntimeConfig {

/**
* The datasource pool maximum size.
* Note that a separate pool instance is started for each thread using it: the size limits each individual pool instance.
*/
@ConfigItem
public OptionalInt maxSize = OptionalInt.empty();
Expand Down Expand Up @@ -87,9 +88,13 @@ public class DataSourceReactiveRuntimeConfig {
public PfxConfiguration keyCertificatePfx = new PfxConfiguration();

/**
* Experimental: use one connection pool per thread.
* Deprecated: this will be removed with no replacement.
* We always return a threadsafe pool now, using a separate Pool instance for each Thread.
*
* @Deprecated
*/
@ConfigItem
@Deprecated
public Optional<Boolean> threadLocal = Optional.empty();

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package io.quarkus.reactive.datasource.runtime;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import javax.enterprise.inject.spi.Bean;

import org.eclipse.microprofile.health.HealthCheck;
import org.eclipse.microprofile.health.HealthCheckResponse;
import org.eclipse.microprofile.health.HealthCheckResponseBuilder;
import org.jboss.logging.Logger;

import io.quarkus.datasource.common.runtime.DataSourceUtil;
import io.quarkus.reactive.datasource.ReactiveDataSource;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Vertx;
import io.vertx.sqlclient.Pool;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowSet;

public abstract class ReactiveDatasourceHealthCheck implements HealthCheck {

private static final Logger log = Logger.getLogger(ReactiveDatasourceHealthCheck.class);

private final Map<String, Pool> pools = new ConcurrentHashMap<>();
private final String healthCheckResponseName;
private final String healthCheckSQL;

protected ReactiveDatasourceHealthCheck(String healthCheckResponseName, String healthCheckSQL) {
this.healthCheckResponseName = healthCheckResponseName;
this.healthCheckSQL = healthCheckSQL;
}

protected void addPool(String name, Pool p) {
final Pool previous = pools.put(name, p);
if (previous != null) {
throw new IllegalStateException("Duplicate pool name: " + name);
}
}

@Override
public HealthCheckResponse call() {
HealthCheckResponseBuilder builder = HealthCheckResponse.named(healthCheckResponseName);
builder.up();

for (Map.Entry<String, Pool> pgPoolEntry : pools.entrySet()) {
final String dataSourceName = pgPoolEntry.getKey();
final Pool pgPool = pgPoolEntry.getValue();
try {
CompletableFuture<Void> databaseConnectionAttempt = new CompletableFuture<>();
Context context = Vertx.currentContext();
if (context != null) {
log.debug("Run health check on the current Vert.x context");
context.runOnContext(v -> {
pgPool.query(healthCheckSQL)
.execute(ar -> {
checkFailure(ar, builder, dataSourceName);
databaseConnectionAttempt.complete(null);
});
});
} else {
log.warn("Vert.x context unavailable to perform healthcheck of reactive datasource `" + dataSourceName
+ "`. This is unlikely to work correctly.");
pgPool.query(healthCheckSQL)
.execute(ar -> {
checkFailure(ar, builder, dataSourceName);
databaseConnectionAttempt.complete(null);
});
}

//20 seconds is rather high, but using just 10 is often not enough on slow CI
//systems, especially if the connections have to be established for the first time.
databaseConnectionAttempt.get(20, TimeUnit.SECONDS);
builder.withData(dataSourceName, "up");
} catch (RuntimeException | ExecutionException exception) {
operationsError(dataSourceName, exception);
builder.down();
builder.withData(dataSourceName, "down - connection failed: " + exception.getMessage());
} catch (InterruptedException e) {
log.warn("Interrupted while obtaining database connection for healthcheck of datasource " + dataSourceName);
Thread.currentThread().interrupt();
return builder.build();
} catch (TimeoutException e) {
log.warn("Timed out while waiting for an available connection to perform healthcheck of datasource "
+ dataSourceName);
builder.down();
builder.withData(dataSourceName, "timed out, unable to obtain connection to perform healthcheck of datasource");
}
}

return builder.build();
}

private void operationsError(final String datasourceName, final Throwable cause) {
log.warn("Error obtaining database connection for healthcheck of datasource '" + datasourceName + '\'', cause);
}

private void checkFailure(AsyncResult<RowSet<Row>> ar, HealthCheckResponseBuilder builder, String dataSourceName) {
if (ar.failed()) {
operationsError(dataSourceName, ar.cause());
builder.down();
builder.withData(dataSourceName, "down - connection failed: " + ar.cause().getMessage());
}
}

protected String getPoolName(Bean<?> bean) {
for (Object qualifier : bean.getQualifiers()) {
if (qualifier instanceof ReactiveDataSource) {
return ((ReactiveDataSource) qualifier).value();
}
}
return DataSourceUtil.DEFAULT_DATASOURCE_NAME;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.StampedLock;

import org.jboss.logging.Logger;

import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
Expand All @@ -21,30 +17,43 @@

public abstract class ThreadLocalPool<PoolType extends Pool> implements Pool {

private static final Logger log = Logger.getLogger(ThreadLocalPool.class);
//List of all opened pools. Access requires synchronization on the list instance.
private final List<Pool> threadLocalPools = new ArrayList<>();

private final AtomicReference<ThreadLocalPoolSet> poolset = new AtomicReference<>(new ThreadLocalPoolSet());
//The pool instance for the current thread
private final ThreadLocal<PoolType> threadLocal = new ThreadLocal<>();

//Used by subclasses to create new pool instances
protected final PoolOptions poolOptions;

//Used by subclasses to create new pool instances
protected final Vertx vertx;

private volatile boolean closed = false;

public ThreadLocalPool(Vertx vertx, PoolOptions poolOptions) {
this.vertx = vertx;
this.poolOptions = poolOptions;
}

private PoolType pool() {
//We re-try to be nice on an extremely unlikely race condition.
//3 attempts should be more than enough:
//especially consider that if this race is triggered, then someone is trying to use the pool on shutdown,
//which is inherently a broken plan.
for (int i = 0; i < 3; i++) {
final ThreadLocalPoolSet currentConnections = poolset.get();
PoolType p = currentConnections.getPool();
if (p != null)
return p;
checkPoolIsOpen();
PoolType pool = threadLocal.get();
if (pool == null) {
synchronized (threadLocalPools) {
checkPoolIsOpen();
pool = createThreadLocalPool();
threadLocalPools.add(pool);
threadLocal.set(pool);
}
}
return pool;
}

private void checkPoolIsOpen() {
if (closed) {
throw new IllegalStateException("This Pool has been closed");
}
throw new IllegalStateException("Multiple attempts to reopen a new pool on a closed instance: aborting");
}

protected abstract PoolType createThreadLocalPool();
Expand All @@ -69,80 +78,12 @@ public void begin(Handler<AsyncResult<Transaction>> handler) {
pool().begin(handler);
}

/**
* This is a bit weird because it works on all ThreadLocal pools, but it's only
* called from a single thread, when doing shutdown, and needs to close all the
* pools and reinitialise the thread local so that all newly created pools after
* the restart will start with an empty thread local instead of a closed one.
* N.B. while we take care of the pool to behave as best as we can,
* it's responsibility of the user of the returned pools to not use them
* while a close is being requested.
*/
@Override
public void close() {
// close all the thread-local pools, then discard the current ThreadLocal pool.
// Atomically set a new pool to be used: useful for live-reloading.
final ThreadLocalPoolSet previousPool = poolset.getAndSet(new ThreadLocalPoolSet());
previousPool.close();
}

private class ThreadLocalPoolSet {
final List<Pool> threadLocalPools = new ArrayList<>();
final ThreadLocal<PoolType> threadLocal = new ThreadLocal<>();
final StampedLock stampedLock = new StampedLock();
boolean isOpen = true;

public PoolType getPool() {
final long optimisticRead = stampedLock.tryOptimisticRead();
if (isOpen == false) {
//Let the caller re-try on a different instance
return null;
}
PoolType ret = threadLocal.get();
if (ret != null) {
if (stampedLock.validate(optimisticRead)) {
return ret;
} else {
//On invalid optimisticRead stamp, it means this pool instance was closed:
//let the caller re-try on a different instance
return null;
}
} else {
//Now acquire an exclusive readlock:
final long readLock = stampedLock.tryConvertToReadLock(optimisticRead);
//Again, on failure the pool was closed, return null in such case.
if (readLock == 0)
return null;
//else, we own the exclusive read lock and can now enter our slow path:
try {
log.debugf("Making pool for thread: %s", Thread.currentThread());
ret = createThreadLocalPool();
synchronized (threadLocalPools) {
threadLocalPools.add(ret);
}
threadLocal.set(ret);
return ret;
} finally {
stampedLock.unlockRead(readLock);
}
}
}

public void close() {
final long lock = stampedLock.writeLock();
try {
isOpen = false;
//While this synchronized block might take a while as we have to close all
//pool instances, it shouldn't block the getPool method as contention is
//prevented by the exclusive stamped lock.
synchronized (threadLocalPools) {
for (Pool pool : threadLocalPools) {
log.debugf("Closing pool: %s", pool);
pool.close();
}
}
} finally {
stampedLock.unlockWrite(lock);
synchronized (threadLocalPools) {
this.closed = true;
for (Pool threadLocalPool : threadLocalPools) {
threadLocalPool.close();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,11 @@ private DB2Pool initialize(Vertx vertx, DataSourceRuntimeConfig dataSourceRuntim
dataSourceReactiveDB2Config);
DB2ConnectOptions connectOptions = toConnectOptions(dataSourceRuntimeConfig, dataSourceReactiveRuntimeConfig,
dataSourceReactiveDB2Config);
if (dataSourceReactiveRuntimeConfig.threadLocal.isPresent() &&
dataSourceReactiveRuntimeConfig.threadLocal.get()) {
return new ThreadLocalDB2Pool(vertx, connectOptions, poolOptions);
if (dataSourceReactiveRuntimeConfig.threadLocal.isPresent()) {
log.warn(
"Configuration element 'thread-local' on Reactive datasource connections is deprecated and will be ignored. The started pool will always be based on a per-thread separate pool now.");
}
return DB2Pool.pool(vertx, connectOptions, poolOptions);
return new ThreadLocalDB2Pool(vertx, connectOptions, poolOptions);
}

private PoolOptions toPoolOptions(DataSourceRuntimeConfig dataSourceRuntimeConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@

@Readiness
@ApplicationScoped
/**
* Implementation note: this healthcheck doesn't extend ReactiveDatasourceHealthCheck
* as a DB2Pool is based on Mutiny: does not extend io.vertx.sqlclient.Pool
*/
class ReactiveDB2DataSourcesHealthCheck implements HealthCheck {

private Map<String, DB2Pool> db2Pools = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,11 @@ private MySQLPool initialize(Vertx vertx, DataSourceRuntimeConfig dataSourceRunt
dataSourceReactiveMySQLConfig);
MySQLConnectOptions mysqlConnectOptions = toMySQLConnectOptions(dataSourceRuntimeConfig,
dataSourceReactiveRuntimeConfig, dataSourceReactiveMySQLConfig);
if (dataSourceReactiveRuntimeConfig.threadLocal.isPresent() &&
dataSourceReactiveRuntimeConfig.threadLocal.get()) {
return new ThreadLocalMySQLPool(vertx, mysqlConnectOptions, poolOptions);
if (dataSourceReactiveRuntimeConfig.threadLocal.isPresent()) {
log.warn(
"Configuration element 'thread-local' on Reactive datasource connections is deprecated and will be ignored. The started pool will always be based on a per-thread separate pool now.");
}
return MySQLPool.pool(vertx, mysqlConnectOptions, poolOptions);
return new ThreadLocalMySQLPool(vertx, mysqlConnectOptions, poolOptions);
}

private PoolOptions toPoolOptions(DataSourceRuntimeConfig dataSourceRuntimeConfig,
Expand Down
Loading

0 comments on commit a698de8

Please sign in to comment.