Skip to content

Commit

Permalink
Reactive SQL Client Pool Event Loop Sizing
Browse files Browse the repository at this point in the history
When a new connection object is created, the pool assigns it an event loop.

This change allows to configure the behavior of the pool as follows:

- event-loop-size is set to a strictly positive value, the pool assigns as many event loops as specified, in a round-robin fashion
- by default, the number of event loops configured or calculated by Quarkus is used
- if event-loop-size is set to zero or a negative value, the pool assigns the current event loop to the new connection (the default behavior for a Vert.x app)
  • Loading branch information
tsegismont committed Mar 25, 2022
1 parent 851d236 commit c58781a
Show file tree
Hide file tree
Showing 11 changed files with 118 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,18 @@ public class DataSourceReactiveRuntimeConfig {
@ConfigItem
public OptionalInt maxSize = OptionalInt.empty();

/**
* When a new connection object is created, the pool assigns it an event loop.
* <p>
* When {@code #event-loop-size} is set to a strictly positive value, the pool assigns as many event loops as specified, in
* a round-robin fashion.
* By default, the number of event loops configured or calculated by Quarkus is used.
* If {@code #event-loop-size} is set to zero or a negative value, the pool assigns the current event loop to the new
* connection.
*/
@ConfigItem
public OptionalInt eventLoopSize = OptionalInt.empty();

/**
* Whether all server certificates should be trusted.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.quarkus.reactive.db2.client.runtime.DataSourcesReactiveDB2Config;
import io.quarkus.runtime.RuntimeValue;
import io.quarkus.smallrye.health.deployment.spi.HealthBuildItem;
import io.quarkus.vertx.core.deployment.EventLoopCountBuildItem;
import io.quarkus.vertx.deployment.VertxBuildItem;
import io.vertx.db2client.DB2Pool;
import io.vertx.sqlclient.Pool;
Expand All @@ -49,6 +50,7 @@ ServiceStartBuildItem build(BuildProducer<FeatureBuildItem> feature,
BuildProducer<VertxPoolBuildItem> vertxPool,
DB2PoolRecorder recorder,
VertxBuildItem vertx,
EventLoopCountBuildItem eventLoopCount,
ShutdownContextBuildItem shutdown,
BuildProducer<SyntheticBeanBuildItem> syntheticBeans,
BuildProducer<ExtensionSslNativeSupportBuildItem> sslNativeSupport,
Expand All @@ -61,13 +63,13 @@ ServiceStartBuildItem build(BuildProducer<FeatureBuildItem> feature,

feature.produce(new FeatureBuildItem(Feature.REACTIVE_DB2_CLIENT));

createPoolIfDefined(recorder, vertx, shutdown, db2Pool, vertxPool, syntheticBeans,
createPoolIfDefined(recorder, vertx, eventLoopCount, shutdown, db2Pool, vertxPool, syntheticBeans,
DataSourceUtil.DEFAULT_DATASOURCE_NAME, dataSourcesBuildTimeConfig,
dataSourcesRuntimeConfig, dataSourcesReactiveBuildTimeConfig, dataSourcesReactiveRuntimeConfig,
dataSourcesReactiveDB2Config, defaultDataSourceDbKindBuildItems, curateOutcomeBuildItem);

for (String dataSourceName : dataSourcesBuildTimeConfig.namedDataSources.keySet()) {
createPoolIfDefined(recorder, vertx, shutdown, db2Pool, vertxPool, syntheticBeans, dataSourceName,
createPoolIfDefined(recorder, vertx, eventLoopCount, shutdown, db2Pool, vertxPool, syntheticBeans, dataSourceName,
dataSourcesBuildTimeConfig, dataSourcesRuntimeConfig, dataSourcesReactiveBuildTimeConfig,
dataSourcesReactiveRuntimeConfig, dataSourcesReactiveDB2Config, defaultDataSourceDbKindBuildItems,
curateOutcomeBuildItem);
Expand Down Expand Up @@ -117,6 +119,7 @@ void addHealthCheck(

private void createPoolIfDefined(DB2PoolRecorder recorder,
VertxBuildItem vertx,
EventLoopCountBuildItem eventLoopCount,
ShutdownContextBuildItem shutdown,
BuildProducer<DB2PoolBuildItem> db2Pool,
BuildProducer<VertxPoolBuildItem> vertxPool,
Expand All @@ -136,6 +139,7 @@ private void createPoolIfDefined(DB2PoolRecorder recorder,
}

RuntimeValue<DB2Pool> pool = recorder.configureDB2Pool(vertx.getVertx(),
eventLoopCount.getEventLoopCount(),
dataSourceName,
dataSourcesRuntimeConfig,
dataSourcesReactiveRuntimeConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import org.jboss.logging.Logger;

Expand All @@ -34,13 +35,15 @@ public class DB2PoolRecorder {
private static final Logger log = Logger.getLogger(DB2PoolRecorder.class);

public RuntimeValue<DB2Pool> configureDB2Pool(RuntimeValue<Vertx> vertx,
Supplier<Integer> eventLoopCount,
String dataSourceName,
DataSourcesRuntimeConfig dataSourcesRuntimeConfig,
DataSourcesReactiveRuntimeConfig dataSourcesReactiveRuntimeConfig,
DataSourcesReactiveDB2Config dataSourcesReactiveDB2Config,
ShutdownContext shutdown) {

DB2Pool db2Pool = initialize(vertx.getValue(),
eventLoopCount.get(),
dataSourcesRuntimeConfig.getDataSourceRuntimeConfig(dataSourceName),
dataSourcesReactiveRuntimeConfig.getDataSourceReactiveRuntimeConfig(dataSourceName),
dataSourcesReactiveDB2Config.getDataSourceReactiveRuntimeConfig(dataSourceName));
Expand All @@ -53,10 +56,12 @@ public RuntimeValue<io.vertx.mutiny.db2client.DB2Pool> mutinyDB2Pool(RuntimeValu
return new RuntimeValue<>(io.vertx.mutiny.db2client.DB2Pool.newInstance(db2Pool.getValue()));
}

private DB2Pool initialize(Vertx vertx, DataSourceRuntimeConfig dataSourceRuntimeConfig,
private DB2Pool initialize(Vertx vertx,
Integer eventLoopCount,
DataSourceRuntimeConfig dataSourceRuntimeConfig,
DataSourceReactiveRuntimeConfig dataSourceReactiveRuntimeConfig,
DataSourceReactiveDB2Config dataSourceReactiveDB2Config) {
PoolOptions poolOptions = toPoolOptions(dataSourceRuntimeConfig, dataSourceReactiveRuntimeConfig,
PoolOptions poolOptions = toPoolOptions(eventLoopCount, dataSourceRuntimeConfig, dataSourceReactiveRuntimeConfig,
dataSourceReactiveDB2Config);
DB2ConnectOptions connectOptions = toConnectOptions(dataSourceRuntimeConfig, dataSourceReactiveRuntimeConfig,
dataSourceReactiveDB2Config);
Expand All @@ -67,7 +72,8 @@ private DB2Pool initialize(Vertx vertx, DataSourceRuntimeConfig dataSourceRuntim
return DB2Pool.pool(vertx, connectOptions, poolOptions);
}

private PoolOptions toPoolOptions(DataSourceRuntimeConfig dataSourceRuntimeConfig,
private PoolOptions toPoolOptions(Integer eventLoopCount,
DataSourceRuntimeConfig dataSourceRuntimeConfig,
DataSourceReactiveRuntimeConfig dataSourceReactiveRuntimeConfig,
DataSourceReactiveDB2Config dataSourceReactiveDB2Config) {
PoolOptions poolOptions;
Expand All @@ -89,6 +95,12 @@ private PoolOptions toPoolOptions(DataSourceRuntimeConfig dataSourceRuntimeConfi
}
}

if (dataSourceReactiveRuntimeConfig.eventLoopSize.isPresent()) {
poolOptions.setEventLoopSize(Math.max(0, dataSourceReactiveRuntimeConfig.eventLoopSize.getAsInt()));
} else if (eventLoopCount != null) {
poolOptions.setEventLoopSize(Math.max(0, eventLoopCount));
}

return poolOptions;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.quarkus.reactive.mssql.client.runtime.MSSQLPoolRecorder;
import io.quarkus.runtime.RuntimeValue;
import io.quarkus.smallrye.health.deployment.spi.HealthBuildItem;
import io.quarkus.vertx.core.deployment.EventLoopCountBuildItem;
import io.quarkus.vertx.deployment.VertxBuildItem;
import io.vertx.mssqlclient.MSSQLPool;
import io.vertx.sqlclient.Pool;
Expand All @@ -49,6 +50,7 @@ ServiceStartBuildItem build(BuildProducer<FeatureBuildItem> feature,
BuildProducer<VertxPoolBuildItem> vertxPool,
MSSQLPoolRecorder recorder,
VertxBuildItem vertx,
EventLoopCountBuildItem eventLoopCount,
ShutdownContextBuildItem shutdown,
BuildProducer<SyntheticBeanBuildItem> syntheticBeans,
BuildProducer<ExtensionSslNativeSupportBuildItem> sslNativeSupport,
Expand All @@ -61,13 +63,13 @@ ServiceStartBuildItem build(BuildProducer<FeatureBuildItem> feature,

feature.produce(new FeatureBuildItem(Feature.REACTIVE_MSSQL_CLIENT));

createPoolIfDefined(recorder, vertx, shutdown, msSQLPool, vertxPool, syntheticBeans,
createPoolIfDefined(recorder, vertx, eventLoopCount, shutdown, msSQLPool, vertxPool, syntheticBeans,
DataSourceUtil.DEFAULT_DATASOURCE_NAME, dataSourcesBuildTimeConfig,
dataSourcesRuntimeConfig, dataSourcesReactiveBuildTimeConfig, dataSourcesReactiveRuntimeConfig,
dataSourcesReactiveMSSQLConfig, defaultDataSourceDbKindBuildItems, curateOutcomeBuildItem);

for (String dataSourceName : dataSourcesBuildTimeConfig.namedDataSources.keySet()) {
createPoolIfDefined(recorder, vertx, shutdown, msSQLPool, vertxPool, syntheticBeans, dataSourceName,
createPoolIfDefined(recorder, vertx, eventLoopCount, shutdown, msSQLPool, vertxPool, syntheticBeans, dataSourceName,
dataSourcesBuildTimeConfig, dataSourcesRuntimeConfig, dataSourcesReactiveBuildTimeConfig,
dataSourcesReactiveRuntimeConfig, dataSourcesReactiveMSSQLConfig, defaultDataSourceDbKindBuildItems,
curateOutcomeBuildItem);
Expand Down Expand Up @@ -117,6 +119,7 @@ void addHealthCheck(

private void createPoolIfDefined(MSSQLPoolRecorder recorder,
VertxBuildItem vertx,
EventLoopCountBuildItem eventLoopCount,
ShutdownContextBuildItem shutdown,
BuildProducer<MSSQLPoolBuildItem> msSQLPool,
BuildProducer<VertxPoolBuildItem> vertxPool,
Expand All @@ -136,6 +139,7 @@ private void createPoolIfDefined(MSSQLPoolRecorder recorder,
}

RuntimeValue<MSSQLPool> pool = recorder.configureMSSQLPool(vertx.getVertx(),
eventLoopCount.getEventLoopCount(),
dataSourceName,
dataSourcesRuntimeConfig,
dataSourcesReactiveRuntimeConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import org.jboss.logging.Logger;

Expand All @@ -35,13 +36,15 @@ public class MSSQLPoolRecorder {
private static final Logger log = Logger.getLogger(MSSQLPoolRecorder.class);

public RuntimeValue<MSSQLPool> configureMSSQLPool(RuntimeValue<Vertx> vertx,
Supplier<Integer> eventLoopCount,
String dataSourceName,
DataSourcesRuntimeConfig dataSourcesRuntimeConfig,
DataSourcesReactiveRuntimeConfig dataSourcesReactiveRuntimeConfig,
DataSourcesReactiveMSSQLConfig dataSourcesReactiveMSSQLConfig,
ShutdownContext shutdown) {

MSSQLPool mssqlPool = initialize(vertx.getValue(),
eventLoopCount.get(),
dataSourcesRuntimeConfig.getDataSourceRuntimeConfig(dataSourceName),
dataSourcesReactiveRuntimeConfig.getDataSourceReactiveRuntimeConfig(dataSourceName),
dataSourcesReactiveMSSQLConfig.getDataSourceReactiveRuntimeConfig(dataSourceName));
Expand All @@ -54,10 +57,12 @@ public RuntimeValue<io.vertx.mutiny.mssqlclient.MSSQLPool> mutinyMSSQLPool(Runti
return new RuntimeValue<>(io.vertx.mutiny.mssqlclient.MSSQLPool.newInstance(mssqlPool.getValue()));
}

private MSSQLPool initialize(Vertx vertx, DataSourceRuntimeConfig dataSourceRuntimeConfig,
private MSSQLPool initialize(Vertx vertx,
Integer eventLoopCount,
DataSourceRuntimeConfig dataSourceRuntimeConfig,
DataSourceReactiveRuntimeConfig dataSourceReactiveRuntimeConfig,
DataSourceReactiveMSSQLConfig dataSourceReactiveMSSQLConfig) {
PoolOptions poolOptions = toPoolOptions(dataSourceRuntimeConfig, dataSourceReactiveRuntimeConfig,
PoolOptions poolOptions = toPoolOptions(eventLoopCount, dataSourceRuntimeConfig, dataSourceReactiveRuntimeConfig,
dataSourceReactiveMSSQLConfig);
MSSQLConnectOptions mssqlConnectOptions = toMSSQLConnectOptions(dataSourceRuntimeConfig,
dataSourceReactiveRuntimeConfig, dataSourceReactiveMSSQLConfig);
Expand All @@ -68,7 +73,8 @@ private MSSQLPool initialize(Vertx vertx, DataSourceRuntimeConfig dataSourceRunt
return MSSQLPool.pool(vertx, mssqlConnectOptions, poolOptions);
}

private PoolOptions toPoolOptions(DataSourceRuntimeConfig dataSourceRuntimeConfig,
private PoolOptions toPoolOptions(Integer eventLoopCount,
DataSourceRuntimeConfig dataSourceRuntimeConfig,
DataSourceReactiveRuntimeConfig dataSourceReactiveRuntimeConfig,
DataSourceReactiveMSSQLConfig dataSourceReactiveMSSQLConfig) {
PoolOptions poolOptions;
Expand All @@ -90,6 +96,12 @@ private PoolOptions toPoolOptions(DataSourceRuntimeConfig dataSourceRuntimeConfi
}
}

if (dataSourceReactiveRuntimeConfig.eventLoopSize.isPresent()) {
poolOptions.setEventLoopSize(Math.max(0, dataSourceReactiveRuntimeConfig.eventLoopSize.getAsInt()));
} else if (eventLoopCount != null) {
poolOptions.setEventLoopSize(Math.max(0, eventLoopCount));
}

return poolOptions;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.quarkus.reactive.mysql.client.runtime.MySQLPoolRecorder;
import io.quarkus.runtime.RuntimeValue;
import io.quarkus.smallrye.health.deployment.spi.HealthBuildItem;
import io.quarkus.vertx.core.deployment.EventLoopCountBuildItem;
import io.quarkus.vertx.deployment.VertxBuildItem;
import io.vertx.mysqlclient.MySQLPool;
import io.vertx.sqlclient.Pool;
Expand All @@ -49,6 +50,7 @@ ServiceStartBuildItem build(BuildProducer<FeatureBuildItem> feature,
BuildProducer<VertxPoolBuildItem> vertxPool,
MySQLPoolRecorder recorder,
VertxBuildItem vertx,
EventLoopCountBuildItem eventLoopCount,
ShutdownContextBuildItem shutdown,
BuildProducer<SyntheticBeanBuildItem> syntheticBeans,
BuildProducer<ExtensionSslNativeSupportBuildItem> sslNativeSupport,
Expand All @@ -61,13 +63,13 @@ ServiceStartBuildItem build(BuildProducer<FeatureBuildItem> feature,

feature.produce(new FeatureBuildItem(Feature.REACTIVE_MYSQL_CLIENT));

createPoolIfDefined(recorder, vertx, shutdown, mySQLPool, vertxPool, syntheticBeans,
createPoolIfDefined(recorder, vertx, eventLoopCount, shutdown, mySQLPool, vertxPool, syntheticBeans,
DataSourceUtil.DEFAULT_DATASOURCE_NAME, dataSourcesBuildTimeConfig,
dataSourcesRuntimeConfig, dataSourcesReactiveBuildTimeConfig, dataSourcesReactiveRuntimeConfig,
dataSourcesReactiveMySQLConfig, defaultDataSourceDbKindBuildItems, curateOutcomeBuildItem);

for (String dataSourceName : dataSourcesBuildTimeConfig.namedDataSources.keySet()) {
createPoolIfDefined(recorder, vertx, shutdown, mySQLPool, vertxPool, syntheticBeans, dataSourceName,
createPoolIfDefined(recorder, vertx, eventLoopCount, shutdown, mySQLPool, vertxPool, syntheticBeans, dataSourceName,
dataSourcesBuildTimeConfig, dataSourcesRuntimeConfig, dataSourcesReactiveBuildTimeConfig,
dataSourcesReactiveRuntimeConfig, dataSourcesReactiveMySQLConfig, defaultDataSourceDbKindBuildItems,
curateOutcomeBuildItem);
Expand Down Expand Up @@ -117,6 +119,7 @@ void addHealthCheck(

private void createPoolIfDefined(MySQLPoolRecorder recorder,
VertxBuildItem vertx,
EventLoopCountBuildItem eventLoopCount,
ShutdownContextBuildItem shutdown,
BuildProducer<MySQLPoolBuildItem> mySQLPool,
BuildProducer<VertxPoolBuildItem> vertxPool,
Expand All @@ -136,6 +139,7 @@ private void createPoolIfDefined(MySQLPoolRecorder recorder,
}

RuntimeValue<MySQLPool> pool = recorder.configureMySQLPool(vertx.getVertx(),
eventLoopCount.getEventLoopCount(),
dataSourceName,
dataSourcesRuntimeConfig,
dataSourcesReactiveRuntimeConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import org.jboss.logging.Logger;

Expand All @@ -36,13 +37,15 @@ public class MySQLPoolRecorder {
private static final Logger log = Logger.getLogger(MySQLPoolRecorder.class);

public RuntimeValue<MySQLPool> configureMySQLPool(RuntimeValue<Vertx> vertx,
Supplier<Integer> eventLoopCount,
String dataSourceName,
DataSourcesRuntimeConfig dataSourcesRuntimeConfig,
DataSourcesReactiveRuntimeConfig dataSourcesReactiveRuntimeConfig,
DataSourcesReactiveMySQLConfig dataSourcesReactiveMySQLConfig,
ShutdownContext shutdown) {

MySQLPool mysqlPool = initialize(vertx.getValue(),
eventLoopCount.get(),
dataSourcesRuntimeConfig.getDataSourceRuntimeConfig(dataSourceName),
dataSourcesReactiveRuntimeConfig.getDataSourceReactiveRuntimeConfig(dataSourceName),
dataSourcesReactiveMySQLConfig.getDataSourceReactiveRuntimeConfig(dataSourceName));
Expand All @@ -55,10 +58,12 @@ public RuntimeValue<io.vertx.mutiny.mysqlclient.MySQLPool> mutinyMySQLPool(Runti
return new RuntimeValue<>(io.vertx.mutiny.mysqlclient.MySQLPool.newInstance(mysqlPool.getValue()));
}

private MySQLPool initialize(Vertx vertx, DataSourceRuntimeConfig dataSourceRuntimeConfig,
private MySQLPool initialize(Vertx vertx,
Integer eventLoopCount,
DataSourceRuntimeConfig dataSourceRuntimeConfig,
DataSourceReactiveRuntimeConfig dataSourceReactiveRuntimeConfig,
DataSourceReactiveMySQLConfig dataSourceReactiveMySQLConfig) {
PoolOptions poolOptions = toPoolOptions(dataSourceRuntimeConfig, dataSourceReactiveRuntimeConfig,
PoolOptions poolOptions = toPoolOptions(eventLoopCount, dataSourceRuntimeConfig, dataSourceReactiveRuntimeConfig,
dataSourceReactiveMySQLConfig);
MySQLConnectOptions mysqlConnectOptions = toMySQLConnectOptions(dataSourceRuntimeConfig,
dataSourceReactiveRuntimeConfig, dataSourceReactiveMySQLConfig);
Expand All @@ -69,7 +74,8 @@ private MySQLPool initialize(Vertx vertx, DataSourceRuntimeConfig dataSourceRunt
return MySQLPool.pool(vertx, mysqlConnectOptions, poolOptions);
}

private PoolOptions toPoolOptions(DataSourceRuntimeConfig dataSourceRuntimeConfig,
private PoolOptions toPoolOptions(Integer eventLoopCount,
DataSourceRuntimeConfig dataSourceRuntimeConfig,
DataSourceReactiveRuntimeConfig dataSourceReactiveRuntimeConfig,
DataSourceReactiveMySQLConfig dataSourceReactiveMySQLConfig) {
PoolOptions poolOptions;
Expand All @@ -91,6 +97,12 @@ private PoolOptions toPoolOptions(DataSourceRuntimeConfig dataSourceRuntimeConfi
}
}

if (dataSourceReactiveRuntimeConfig.eventLoopSize.isPresent()) {
poolOptions.setEventLoopSize(Math.max(0, dataSourceReactiveRuntimeConfig.eventLoopSize.getAsInt()));
} else if (eventLoopCount != null) {
poolOptions.setEventLoopSize(Math.max(0, eventLoopCount));
}

return poolOptions;
}

Expand Down
Loading

0 comments on commit c58781a

Please sign in to comment.