From c58781af810ae7e064f42aed345ec9d7c3fb53ed Mon Sep 17 00:00:00 2001 From: Thomas Segismont Date: Fri, 25 Mar 2022 18:47:24 +0100 Subject: [PATCH] Reactive SQL Client Pool Event Loop Sizing When a new connection object is created, the pool assigns it an event loop. This change allows to configure the behavior of the pool as follows: - event-loop-size is set to a strictly positive value, the pool assigns as many event loops as specified, in a round-robin fashion - by default, the number of event loops configured or calculated by Quarkus is used - if event-loop-size is set to zero or a negative value, the pool assigns the current event loop to the new connection (the default behavior for a Vert.x app) --- .../DataSourceReactiveRuntimeConfig.java | 12 ++++++++++++ .../deployment/ReactiveDB2ClientProcessor.java | 8 ++++++-- .../db2/client/runtime/DB2PoolRecorder.java | 18 +++++++++++++++--- .../ReactiveMSSQLClientProcessor.java | 8 ++++++-- .../client/runtime/MSSQLPoolRecorder.java | 18 +++++++++++++++--- .../ReactiveMySQLClientProcessor.java | 8 ++++++-- .../client/runtime/MySQLPoolRecorder.java | 18 +++++++++++++++--- .../ReactiveOracleClientProcessor.java | 9 +++++++-- .../client/runtime/OraclePoolRecorder.java | 18 +++++++++++++++--- .../deployment/ReactivePgClientProcessor.java | 8 ++++++-- .../pg/client/runtime/PgPoolRecorder.java | 18 +++++++++++++++--- 11 files changed, 118 insertions(+), 25 deletions(-) diff --git a/extensions/reactive-datasource/runtime/src/main/java/io/quarkus/reactive/datasource/runtime/DataSourceReactiveRuntimeConfig.java b/extensions/reactive-datasource/runtime/src/main/java/io/quarkus/reactive/datasource/runtime/DataSourceReactiveRuntimeConfig.java index 22b0f5d14c278..6ee776bacaa8f 100644 --- a/extensions/reactive-datasource/runtime/src/main/java/io/quarkus/reactive/datasource/runtime/DataSourceReactiveRuntimeConfig.java +++ b/extensions/reactive-datasource/runtime/src/main/java/io/quarkus/reactive/datasource/runtime/DataSourceReactiveRuntimeConfig.java @@ -32,6 +32,18 @@ public class DataSourceReactiveRuntimeConfig { @ConfigItem public OptionalInt maxSize = OptionalInt.empty(); + /** + * When a new connection object is created, the pool assigns it an event loop. + *

+ * When {@code #event-loop-size} is set to a strictly positive value, the pool assigns as many event loops as specified, in + * a round-robin fashion. + * By default, the number of event loops configured or calculated by Quarkus is used. + * If {@code #event-loop-size} is set to zero or a negative value, the pool assigns the current event loop to the new + * connection. + */ + @ConfigItem + public OptionalInt eventLoopSize = OptionalInt.empty(); + /** * Whether all server certificates should be trusted. */ diff --git a/extensions/reactive-db2-client/deployment/src/main/java/io/quarkus/reactive/db2/client/deployment/ReactiveDB2ClientProcessor.java b/extensions/reactive-db2-client/deployment/src/main/java/io/quarkus/reactive/db2/client/deployment/ReactiveDB2ClientProcessor.java index 78ff2e7e3d487..cf1f721447f8e 100644 --- a/extensions/reactive-db2-client/deployment/src/main/java/io/quarkus/reactive/db2/client/deployment/ReactiveDB2ClientProcessor.java +++ b/extensions/reactive-db2-client/deployment/src/main/java/io/quarkus/reactive/db2/client/deployment/ReactiveDB2ClientProcessor.java @@ -36,6 +36,7 @@ import io.quarkus.reactive.db2.client.runtime.DataSourcesReactiveDB2Config; import io.quarkus.runtime.RuntimeValue; import io.quarkus.smallrye.health.deployment.spi.HealthBuildItem; +import io.quarkus.vertx.core.deployment.EventLoopCountBuildItem; import io.quarkus.vertx.deployment.VertxBuildItem; import io.vertx.db2client.DB2Pool; import io.vertx.sqlclient.Pool; @@ -49,6 +50,7 @@ ServiceStartBuildItem build(BuildProducer feature, BuildProducer vertxPool, DB2PoolRecorder recorder, VertxBuildItem vertx, + EventLoopCountBuildItem eventLoopCount, ShutdownContextBuildItem shutdown, BuildProducer syntheticBeans, BuildProducer sslNativeSupport, @@ -61,13 +63,13 @@ ServiceStartBuildItem build(BuildProducer feature, feature.produce(new FeatureBuildItem(Feature.REACTIVE_DB2_CLIENT)); - createPoolIfDefined(recorder, vertx, shutdown, db2Pool, vertxPool, syntheticBeans, + createPoolIfDefined(recorder, vertx, eventLoopCount, shutdown, db2Pool, vertxPool, syntheticBeans, DataSourceUtil.DEFAULT_DATASOURCE_NAME, dataSourcesBuildTimeConfig, dataSourcesRuntimeConfig, dataSourcesReactiveBuildTimeConfig, dataSourcesReactiveRuntimeConfig, dataSourcesReactiveDB2Config, defaultDataSourceDbKindBuildItems, curateOutcomeBuildItem); for (String dataSourceName : dataSourcesBuildTimeConfig.namedDataSources.keySet()) { - createPoolIfDefined(recorder, vertx, shutdown, db2Pool, vertxPool, syntheticBeans, dataSourceName, + createPoolIfDefined(recorder, vertx, eventLoopCount, shutdown, db2Pool, vertxPool, syntheticBeans, dataSourceName, dataSourcesBuildTimeConfig, dataSourcesRuntimeConfig, dataSourcesReactiveBuildTimeConfig, dataSourcesReactiveRuntimeConfig, dataSourcesReactiveDB2Config, defaultDataSourceDbKindBuildItems, curateOutcomeBuildItem); @@ -117,6 +119,7 @@ void addHealthCheck( private void createPoolIfDefined(DB2PoolRecorder recorder, VertxBuildItem vertx, + EventLoopCountBuildItem eventLoopCount, ShutdownContextBuildItem shutdown, BuildProducer db2Pool, BuildProducer vertxPool, @@ -136,6 +139,7 @@ private void createPoolIfDefined(DB2PoolRecorder recorder, } RuntimeValue pool = recorder.configureDB2Pool(vertx.getVertx(), + eventLoopCount.getEventLoopCount(), dataSourceName, dataSourcesRuntimeConfig, dataSourcesReactiveRuntimeConfig, diff --git a/extensions/reactive-db2-client/runtime/src/main/java/io/quarkus/reactive/db2/client/runtime/DB2PoolRecorder.java b/extensions/reactive-db2-client/runtime/src/main/java/io/quarkus/reactive/db2/client/runtime/DB2PoolRecorder.java index 289314266e605..56d7cafdffab0 100644 --- a/extensions/reactive-db2-client/runtime/src/main/java/io/quarkus/reactive/db2/client/runtime/DB2PoolRecorder.java +++ b/extensions/reactive-db2-client/runtime/src/main/java/io/quarkus/reactive/db2/client/runtime/DB2PoolRecorder.java @@ -11,6 +11,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import org.jboss.logging.Logger; @@ -34,6 +35,7 @@ public class DB2PoolRecorder { private static final Logger log = Logger.getLogger(DB2PoolRecorder.class); public RuntimeValue configureDB2Pool(RuntimeValue vertx, + Supplier eventLoopCount, String dataSourceName, DataSourcesRuntimeConfig dataSourcesRuntimeConfig, DataSourcesReactiveRuntimeConfig dataSourcesReactiveRuntimeConfig, @@ -41,6 +43,7 @@ public RuntimeValue configureDB2Pool(RuntimeValue vertx, ShutdownContext shutdown) { DB2Pool db2Pool = initialize(vertx.getValue(), + eventLoopCount.get(), dataSourcesRuntimeConfig.getDataSourceRuntimeConfig(dataSourceName), dataSourcesReactiveRuntimeConfig.getDataSourceReactiveRuntimeConfig(dataSourceName), dataSourcesReactiveDB2Config.getDataSourceReactiveRuntimeConfig(dataSourceName)); @@ -53,10 +56,12 @@ public RuntimeValue mutinyDB2Pool(RuntimeValu return new RuntimeValue<>(io.vertx.mutiny.db2client.DB2Pool.newInstance(db2Pool.getValue())); } - private DB2Pool initialize(Vertx vertx, DataSourceRuntimeConfig dataSourceRuntimeConfig, + private DB2Pool initialize(Vertx vertx, + Integer eventLoopCount, + DataSourceRuntimeConfig dataSourceRuntimeConfig, DataSourceReactiveRuntimeConfig dataSourceReactiveRuntimeConfig, DataSourceReactiveDB2Config dataSourceReactiveDB2Config) { - PoolOptions poolOptions = toPoolOptions(dataSourceRuntimeConfig, dataSourceReactiveRuntimeConfig, + PoolOptions poolOptions = toPoolOptions(eventLoopCount, dataSourceRuntimeConfig, dataSourceReactiveRuntimeConfig, dataSourceReactiveDB2Config); DB2ConnectOptions connectOptions = toConnectOptions(dataSourceRuntimeConfig, dataSourceReactiveRuntimeConfig, dataSourceReactiveDB2Config); @@ -67,7 +72,8 @@ private DB2Pool initialize(Vertx vertx, DataSourceRuntimeConfig dataSourceRuntim return DB2Pool.pool(vertx, connectOptions, poolOptions); } - private PoolOptions toPoolOptions(DataSourceRuntimeConfig dataSourceRuntimeConfig, + private PoolOptions toPoolOptions(Integer eventLoopCount, + DataSourceRuntimeConfig dataSourceRuntimeConfig, DataSourceReactiveRuntimeConfig dataSourceReactiveRuntimeConfig, DataSourceReactiveDB2Config dataSourceReactiveDB2Config) { PoolOptions poolOptions; @@ -89,6 +95,12 @@ private PoolOptions toPoolOptions(DataSourceRuntimeConfig dataSourceRuntimeConfi } } + if (dataSourceReactiveRuntimeConfig.eventLoopSize.isPresent()) { + poolOptions.setEventLoopSize(Math.max(0, dataSourceReactiveRuntimeConfig.eventLoopSize.getAsInt())); + } else if (eventLoopCount != null) { + poolOptions.setEventLoopSize(Math.max(0, eventLoopCount)); + } + return poolOptions; } diff --git a/extensions/reactive-mssql-client/deployment/src/main/java/io/quarkus/reactive/mssql/client/deployment/ReactiveMSSQLClientProcessor.java b/extensions/reactive-mssql-client/deployment/src/main/java/io/quarkus/reactive/mssql/client/deployment/ReactiveMSSQLClientProcessor.java index e888bf28d844f..02a98ae5f3e0d 100644 --- a/extensions/reactive-mssql-client/deployment/src/main/java/io/quarkus/reactive/mssql/client/deployment/ReactiveMSSQLClientProcessor.java +++ b/extensions/reactive-mssql-client/deployment/src/main/java/io/quarkus/reactive/mssql/client/deployment/ReactiveMSSQLClientProcessor.java @@ -36,6 +36,7 @@ import io.quarkus.reactive.mssql.client.runtime.MSSQLPoolRecorder; import io.quarkus.runtime.RuntimeValue; import io.quarkus.smallrye.health.deployment.spi.HealthBuildItem; +import io.quarkus.vertx.core.deployment.EventLoopCountBuildItem; import io.quarkus.vertx.deployment.VertxBuildItem; import io.vertx.mssqlclient.MSSQLPool; import io.vertx.sqlclient.Pool; @@ -49,6 +50,7 @@ ServiceStartBuildItem build(BuildProducer feature, BuildProducer vertxPool, MSSQLPoolRecorder recorder, VertxBuildItem vertx, + EventLoopCountBuildItem eventLoopCount, ShutdownContextBuildItem shutdown, BuildProducer syntheticBeans, BuildProducer sslNativeSupport, @@ -61,13 +63,13 @@ ServiceStartBuildItem build(BuildProducer feature, feature.produce(new FeatureBuildItem(Feature.REACTIVE_MSSQL_CLIENT)); - createPoolIfDefined(recorder, vertx, shutdown, msSQLPool, vertxPool, syntheticBeans, + createPoolIfDefined(recorder, vertx, eventLoopCount, shutdown, msSQLPool, vertxPool, syntheticBeans, DataSourceUtil.DEFAULT_DATASOURCE_NAME, dataSourcesBuildTimeConfig, dataSourcesRuntimeConfig, dataSourcesReactiveBuildTimeConfig, dataSourcesReactiveRuntimeConfig, dataSourcesReactiveMSSQLConfig, defaultDataSourceDbKindBuildItems, curateOutcomeBuildItem); for (String dataSourceName : dataSourcesBuildTimeConfig.namedDataSources.keySet()) { - createPoolIfDefined(recorder, vertx, shutdown, msSQLPool, vertxPool, syntheticBeans, dataSourceName, + createPoolIfDefined(recorder, vertx, eventLoopCount, shutdown, msSQLPool, vertxPool, syntheticBeans, dataSourceName, dataSourcesBuildTimeConfig, dataSourcesRuntimeConfig, dataSourcesReactiveBuildTimeConfig, dataSourcesReactiveRuntimeConfig, dataSourcesReactiveMSSQLConfig, defaultDataSourceDbKindBuildItems, curateOutcomeBuildItem); @@ -117,6 +119,7 @@ void addHealthCheck( private void createPoolIfDefined(MSSQLPoolRecorder recorder, VertxBuildItem vertx, + EventLoopCountBuildItem eventLoopCount, ShutdownContextBuildItem shutdown, BuildProducer msSQLPool, BuildProducer vertxPool, @@ -136,6 +139,7 @@ private void createPoolIfDefined(MSSQLPoolRecorder recorder, } RuntimeValue pool = recorder.configureMSSQLPool(vertx.getVertx(), + eventLoopCount.getEventLoopCount(), dataSourceName, dataSourcesRuntimeConfig, dataSourcesReactiveRuntimeConfig, diff --git a/extensions/reactive-mssql-client/runtime/src/main/java/io/quarkus/reactive/mssql/client/runtime/MSSQLPoolRecorder.java b/extensions/reactive-mssql-client/runtime/src/main/java/io/quarkus/reactive/mssql/client/runtime/MSSQLPoolRecorder.java index e1d670d5e6a72..46b15be6bc31a 100644 --- a/extensions/reactive-mssql-client/runtime/src/main/java/io/quarkus/reactive/mssql/client/runtime/MSSQLPoolRecorder.java +++ b/extensions/reactive-mssql-client/runtime/src/main/java/io/quarkus/reactive/mssql/client/runtime/MSSQLPoolRecorder.java @@ -11,6 +11,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import org.jboss.logging.Logger; @@ -35,6 +36,7 @@ public class MSSQLPoolRecorder { private static final Logger log = Logger.getLogger(MSSQLPoolRecorder.class); public RuntimeValue configureMSSQLPool(RuntimeValue vertx, + Supplier eventLoopCount, String dataSourceName, DataSourcesRuntimeConfig dataSourcesRuntimeConfig, DataSourcesReactiveRuntimeConfig dataSourcesReactiveRuntimeConfig, @@ -42,6 +44,7 @@ public RuntimeValue configureMSSQLPool(RuntimeValue vertx, ShutdownContext shutdown) { MSSQLPool mssqlPool = initialize(vertx.getValue(), + eventLoopCount.get(), dataSourcesRuntimeConfig.getDataSourceRuntimeConfig(dataSourceName), dataSourcesReactiveRuntimeConfig.getDataSourceReactiveRuntimeConfig(dataSourceName), dataSourcesReactiveMSSQLConfig.getDataSourceReactiveRuntimeConfig(dataSourceName)); @@ -54,10 +57,12 @@ public RuntimeValue mutinyMSSQLPool(Runti return new RuntimeValue<>(io.vertx.mutiny.mssqlclient.MSSQLPool.newInstance(mssqlPool.getValue())); } - private MSSQLPool initialize(Vertx vertx, DataSourceRuntimeConfig dataSourceRuntimeConfig, + private MSSQLPool initialize(Vertx vertx, + Integer eventLoopCount, + DataSourceRuntimeConfig dataSourceRuntimeConfig, DataSourceReactiveRuntimeConfig dataSourceReactiveRuntimeConfig, DataSourceReactiveMSSQLConfig dataSourceReactiveMSSQLConfig) { - PoolOptions poolOptions = toPoolOptions(dataSourceRuntimeConfig, dataSourceReactiveRuntimeConfig, + PoolOptions poolOptions = toPoolOptions(eventLoopCount, dataSourceRuntimeConfig, dataSourceReactiveRuntimeConfig, dataSourceReactiveMSSQLConfig); MSSQLConnectOptions mssqlConnectOptions = toMSSQLConnectOptions(dataSourceRuntimeConfig, dataSourceReactiveRuntimeConfig, dataSourceReactiveMSSQLConfig); @@ -68,7 +73,8 @@ private MSSQLPool initialize(Vertx vertx, DataSourceRuntimeConfig dataSourceRunt return MSSQLPool.pool(vertx, mssqlConnectOptions, poolOptions); } - private PoolOptions toPoolOptions(DataSourceRuntimeConfig dataSourceRuntimeConfig, + private PoolOptions toPoolOptions(Integer eventLoopCount, + DataSourceRuntimeConfig dataSourceRuntimeConfig, DataSourceReactiveRuntimeConfig dataSourceReactiveRuntimeConfig, DataSourceReactiveMSSQLConfig dataSourceReactiveMSSQLConfig) { PoolOptions poolOptions; @@ -90,6 +96,12 @@ private PoolOptions toPoolOptions(DataSourceRuntimeConfig dataSourceRuntimeConfi } } + if (dataSourceReactiveRuntimeConfig.eventLoopSize.isPresent()) { + poolOptions.setEventLoopSize(Math.max(0, dataSourceReactiveRuntimeConfig.eventLoopSize.getAsInt())); + } else if (eventLoopCount != null) { + poolOptions.setEventLoopSize(Math.max(0, eventLoopCount)); + } + return poolOptions; } diff --git a/extensions/reactive-mysql-client/deployment/src/main/java/io/quarkus/reactive/mysql/client/deployment/ReactiveMySQLClientProcessor.java b/extensions/reactive-mysql-client/deployment/src/main/java/io/quarkus/reactive/mysql/client/deployment/ReactiveMySQLClientProcessor.java index 13b282e330264..669f063c102c2 100644 --- a/extensions/reactive-mysql-client/deployment/src/main/java/io/quarkus/reactive/mysql/client/deployment/ReactiveMySQLClientProcessor.java +++ b/extensions/reactive-mysql-client/deployment/src/main/java/io/quarkus/reactive/mysql/client/deployment/ReactiveMySQLClientProcessor.java @@ -36,6 +36,7 @@ import io.quarkus.reactive.mysql.client.runtime.MySQLPoolRecorder; import io.quarkus.runtime.RuntimeValue; import io.quarkus.smallrye.health.deployment.spi.HealthBuildItem; +import io.quarkus.vertx.core.deployment.EventLoopCountBuildItem; import io.quarkus.vertx.deployment.VertxBuildItem; import io.vertx.mysqlclient.MySQLPool; import io.vertx.sqlclient.Pool; @@ -49,6 +50,7 @@ ServiceStartBuildItem build(BuildProducer feature, BuildProducer vertxPool, MySQLPoolRecorder recorder, VertxBuildItem vertx, + EventLoopCountBuildItem eventLoopCount, ShutdownContextBuildItem shutdown, BuildProducer syntheticBeans, BuildProducer sslNativeSupport, @@ -61,13 +63,13 @@ ServiceStartBuildItem build(BuildProducer feature, feature.produce(new FeatureBuildItem(Feature.REACTIVE_MYSQL_CLIENT)); - createPoolIfDefined(recorder, vertx, shutdown, mySQLPool, vertxPool, syntheticBeans, + createPoolIfDefined(recorder, vertx, eventLoopCount, shutdown, mySQLPool, vertxPool, syntheticBeans, DataSourceUtil.DEFAULT_DATASOURCE_NAME, dataSourcesBuildTimeConfig, dataSourcesRuntimeConfig, dataSourcesReactiveBuildTimeConfig, dataSourcesReactiveRuntimeConfig, dataSourcesReactiveMySQLConfig, defaultDataSourceDbKindBuildItems, curateOutcomeBuildItem); for (String dataSourceName : dataSourcesBuildTimeConfig.namedDataSources.keySet()) { - createPoolIfDefined(recorder, vertx, shutdown, mySQLPool, vertxPool, syntheticBeans, dataSourceName, + createPoolIfDefined(recorder, vertx, eventLoopCount, shutdown, mySQLPool, vertxPool, syntheticBeans, dataSourceName, dataSourcesBuildTimeConfig, dataSourcesRuntimeConfig, dataSourcesReactiveBuildTimeConfig, dataSourcesReactiveRuntimeConfig, dataSourcesReactiveMySQLConfig, defaultDataSourceDbKindBuildItems, curateOutcomeBuildItem); @@ -117,6 +119,7 @@ void addHealthCheck( private void createPoolIfDefined(MySQLPoolRecorder recorder, VertxBuildItem vertx, + EventLoopCountBuildItem eventLoopCount, ShutdownContextBuildItem shutdown, BuildProducer mySQLPool, BuildProducer vertxPool, @@ -136,6 +139,7 @@ private void createPoolIfDefined(MySQLPoolRecorder recorder, } RuntimeValue pool = recorder.configureMySQLPool(vertx.getVertx(), + eventLoopCount.getEventLoopCount(), dataSourceName, dataSourcesRuntimeConfig, dataSourcesReactiveRuntimeConfig, diff --git a/extensions/reactive-mysql-client/runtime/src/main/java/io/quarkus/reactive/mysql/client/runtime/MySQLPoolRecorder.java b/extensions/reactive-mysql-client/runtime/src/main/java/io/quarkus/reactive/mysql/client/runtime/MySQLPoolRecorder.java index 02a42e369e58c..75ab6bdd1e04b 100644 --- a/extensions/reactive-mysql-client/runtime/src/main/java/io/quarkus/reactive/mysql/client/runtime/MySQLPoolRecorder.java +++ b/extensions/reactive-mysql-client/runtime/src/main/java/io/quarkus/reactive/mysql/client/runtime/MySQLPoolRecorder.java @@ -11,6 +11,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import org.jboss.logging.Logger; @@ -36,6 +37,7 @@ public class MySQLPoolRecorder { private static final Logger log = Logger.getLogger(MySQLPoolRecorder.class); public RuntimeValue configureMySQLPool(RuntimeValue vertx, + Supplier eventLoopCount, String dataSourceName, DataSourcesRuntimeConfig dataSourcesRuntimeConfig, DataSourcesReactiveRuntimeConfig dataSourcesReactiveRuntimeConfig, @@ -43,6 +45,7 @@ public RuntimeValue configureMySQLPool(RuntimeValue vertx, ShutdownContext shutdown) { MySQLPool mysqlPool = initialize(vertx.getValue(), + eventLoopCount.get(), dataSourcesRuntimeConfig.getDataSourceRuntimeConfig(dataSourceName), dataSourcesReactiveRuntimeConfig.getDataSourceReactiveRuntimeConfig(dataSourceName), dataSourcesReactiveMySQLConfig.getDataSourceReactiveRuntimeConfig(dataSourceName)); @@ -55,10 +58,12 @@ public RuntimeValue mutinyMySQLPool(Runti return new RuntimeValue<>(io.vertx.mutiny.mysqlclient.MySQLPool.newInstance(mysqlPool.getValue())); } - private MySQLPool initialize(Vertx vertx, DataSourceRuntimeConfig dataSourceRuntimeConfig, + private MySQLPool initialize(Vertx vertx, + Integer eventLoopCount, + DataSourceRuntimeConfig dataSourceRuntimeConfig, DataSourceReactiveRuntimeConfig dataSourceReactiveRuntimeConfig, DataSourceReactiveMySQLConfig dataSourceReactiveMySQLConfig) { - PoolOptions poolOptions = toPoolOptions(dataSourceRuntimeConfig, dataSourceReactiveRuntimeConfig, + PoolOptions poolOptions = toPoolOptions(eventLoopCount, dataSourceRuntimeConfig, dataSourceReactiveRuntimeConfig, dataSourceReactiveMySQLConfig); MySQLConnectOptions mysqlConnectOptions = toMySQLConnectOptions(dataSourceRuntimeConfig, dataSourceReactiveRuntimeConfig, dataSourceReactiveMySQLConfig); @@ -69,7 +74,8 @@ private MySQLPool initialize(Vertx vertx, DataSourceRuntimeConfig dataSourceRunt return MySQLPool.pool(vertx, mysqlConnectOptions, poolOptions); } - private PoolOptions toPoolOptions(DataSourceRuntimeConfig dataSourceRuntimeConfig, + private PoolOptions toPoolOptions(Integer eventLoopCount, + DataSourceRuntimeConfig dataSourceRuntimeConfig, DataSourceReactiveRuntimeConfig dataSourceReactiveRuntimeConfig, DataSourceReactiveMySQLConfig dataSourceReactiveMySQLConfig) { PoolOptions poolOptions; @@ -91,6 +97,12 @@ private PoolOptions toPoolOptions(DataSourceRuntimeConfig dataSourceRuntimeConfi } } + if (dataSourceReactiveRuntimeConfig.eventLoopSize.isPresent()) { + poolOptions.setEventLoopSize(Math.max(0, dataSourceReactiveRuntimeConfig.eventLoopSize.getAsInt())); + } else if (eventLoopCount != null) { + poolOptions.setEventLoopSize(Math.max(0, eventLoopCount)); + } + return poolOptions; } diff --git a/extensions/reactive-oracle-client/deployment/src/main/java/io/quarkus/reactive/oracle/client/deployment/ReactiveOracleClientProcessor.java b/extensions/reactive-oracle-client/deployment/src/main/java/io/quarkus/reactive/oracle/client/deployment/ReactiveOracleClientProcessor.java index 18562fa090d3d..a2b4a784c1aac 100644 --- a/extensions/reactive-oracle-client/deployment/src/main/java/io/quarkus/reactive/oracle/client/deployment/ReactiveOracleClientProcessor.java +++ b/extensions/reactive-oracle-client/deployment/src/main/java/io/quarkus/reactive/oracle/client/deployment/ReactiveOracleClientProcessor.java @@ -36,6 +36,7 @@ import io.quarkus.reactive.oracle.client.runtime.OraclePoolRecorder; import io.quarkus.runtime.RuntimeValue; import io.quarkus.smallrye.health.deployment.spi.HealthBuildItem; +import io.quarkus.vertx.core.deployment.EventLoopCountBuildItem; import io.quarkus.vertx.deployment.VertxBuildItem; import io.vertx.oracleclient.OraclePool; import io.vertx.sqlclient.Pool; @@ -49,6 +50,7 @@ ServiceStartBuildItem build(BuildProducer feature, BuildProducer vertxPool, OraclePoolRecorder recorder, VertxBuildItem vertx, + EventLoopCountBuildItem eventLoopCount, ShutdownContextBuildItem shutdown, BuildProducer syntheticBeans, BuildProducer sslNativeSupport, @@ -61,13 +63,14 @@ ServiceStartBuildItem build(BuildProducer feature, feature.produce(new FeatureBuildItem(Feature.REACTIVE_ORACLE_CLIENT)); - createPoolIfDefined(recorder, vertx, shutdown, oraclePool, vertxPool, syntheticBeans, + createPoolIfDefined(recorder, vertx, eventLoopCount, shutdown, oraclePool, vertxPool, syntheticBeans, DataSourceUtil.DEFAULT_DATASOURCE_NAME, dataSourcesBuildTimeConfig, dataSourcesRuntimeConfig, dataSourcesReactiveBuildTimeConfig, dataSourcesReactiveRuntimeConfig, dataSourcesReactiveOracleConfig, defaultDataSourceDbKindBuildItems, curateOutcomeBuildItem); for (String dataSourceName : dataSourcesBuildTimeConfig.namedDataSources.keySet()) { - createPoolIfDefined(recorder, vertx, shutdown, oraclePool, vertxPool, syntheticBeans, dataSourceName, + createPoolIfDefined(recorder, vertx, eventLoopCount, shutdown, oraclePool, vertxPool, syntheticBeans, + dataSourceName, dataSourcesBuildTimeConfig, dataSourcesRuntimeConfig, dataSourcesReactiveBuildTimeConfig, dataSourcesReactiveRuntimeConfig, dataSourcesReactiveOracleConfig, defaultDataSourceDbKindBuildItems, curateOutcomeBuildItem); @@ -117,6 +120,7 @@ void addHealthCheck( private void createPoolIfDefined(OraclePoolRecorder recorder, VertxBuildItem vertx, + EventLoopCountBuildItem eventLoopCount, ShutdownContextBuildItem shutdown, BuildProducer oraclePool, BuildProducer vertxPool, @@ -136,6 +140,7 @@ private void createPoolIfDefined(OraclePoolRecorder recorder, } RuntimeValue pool = recorder.configureOraclePool(vertx.getVertx(), + eventLoopCount.getEventLoopCount(), dataSourceName, dataSourcesRuntimeConfig, dataSourcesReactiveRuntimeConfig, diff --git a/extensions/reactive-oracle-client/runtime/src/main/java/io/quarkus/reactive/oracle/client/runtime/OraclePoolRecorder.java b/extensions/reactive-oracle-client/runtime/src/main/java/io/quarkus/reactive/oracle/client/runtime/OraclePoolRecorder.java index b2d7c082ba0e6..357ff469bcc38 100644 --- a/extensions/reactive-oracle-client/runtime/src/main/java/io/quarkus/reactive/oracle/client/runtime/OraclePoolRecorder.java +++ b/extensions/reactive-oracle-client/runtime/src/main/java/io/quarkus/reactive/oracle/client/runtime/OraclePoolRecorder.java @@ -5,6 +5,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import org.jboss.logging.Logger; @@ -29,6 +30,7 @@ public class OraclePoolRecorder { private static final Logger log = Logger.getLogger(OraclePoolRecorder.class); public RuntimeValue configureOraclePool(RuntimeValue vertx, + Supplier eventLoopCount, String dataSourceName, DataSourcesRuntimeConfig dataSourcesRuntimeConfig, DataSourcesReactiveRuntimeConfig dataSourcesReactiveRuntimeConfig, @@ -36,6 +38,7 @@ public RuntimeValue configureOraclePool(RuntimeValue vertx, ShutdownContext shutdown) { OraclePool oraclePool = initialize(vertx.getValue(), + eventLoopCount.get(), dataSourcesRuntimeConfig.getDataSourceRuntimeConfig(dataSourceName), dataSourcesReactiveRuntimeConfig.getDataSourceReactiveRuntimeConfig(dataSourceName), dataSourcesReactiveOracleConfig.getDataSourceReactiveRuntimeConfig(dataSourceName)); @@ -48,10 +51,12 @@ public RuntimeValue mutinyOraclePool(Ru return new RuntimeValue<>(io.vertx.mutiny.oracleclient.OraclePool.newInstance(oraclePool.getValue())); } - private OraclePool initialize(Vertx vertx, DataSourceRuntimeConfig dataSourceRuntimeConfig, + private OraclePool initialize(Vertx vertx, + Integer eventLoopCount, + DataSourceRuntimeConfig dataSourceRuntimeConfig, DataSourceReactiveRuntimeConfig dataSourceReactiveRuntimeConfig, DataSourceReactiveOracleConfig dataSourceReactiveOracleConfig) { - PoolOptions poolOptions = toPoolOptions(dataSourceRuntimeConfig, dataSourceReactiveRuntimeConfig, + PoolOptions poolOptions = toPoolOptions(eventLoopCount, dataSourceRuntimeConfig, dataSourceReactiveRuntimeConfig, dataSourceReactiveOracleConfig); OracleConnectOptions oracleConnectOptions = toOracleConnectOptions(dataSourceRuntimeConfig, dataSourceReactiveRuntimeConfig, dataSourceReactiveOracleConfig); @@ -62,7 +67,8 @@ private OraclePool initialize(Vertx vertx, DataSourceRuntimeConfig dataSourceRun return OraclePool.pool(vertx, oracleConnectOptions, poolOptions); } - private PoolOptions toPoolOptions(DataSourceRuntimeConfig dataSourceRuntimeConfig, + private PoolOptions toPoolOptions(Integer eventLoopCount, + DataSourceRuntimeConfig dataSourceRuntimeConfig, DataSourceReactiveRuntimeConfig dataSourceReactiveRuntimeConfig, DataSourceReactiveOracleConfig dataSourceReactiveOracleConfig) { PoolOptions poolOptions; @@ -77,6 +83,12 @@ private PoolOptions toPoolOptions(DataSourceRuntimeConfig dataSourceRuntimeConfi poolOptions.setIdleTimeout(idleTimeout).setIdleTimeoutUnit(TimeUnit.MILLISECONDS); } + if (dataSourceReactiveRuntimeConfig.eventLoopSize.isPresent()) { + poolOptions.setEventLoopSize(Math.max(0, dataSourceReactiveRuntimeConfig.eventLoopSize.getAsInt())); + } else if (eventLoopCount != null) { + poolOptions.setEventLoopSize(Math.max(0, eventLoopCount)); + } + return poolOptions; } diff --git a/extensions/reactive-pg-client/deployment/src/main/java/io/quarkus/reactive/pg/client/deployment/ReactivePgClientProcessor.java b/extensions/reactive-pg-client/deployment/src/main/java/io/quarkus/reactive/pg/client/deployment/ReactivePgClientProcessor.java index b79c052d757ef..980ca33e3d122 100644 --- a/extensions/reactive-pg-client/deployment/src/main/java/io/quarkus/reactive/pg/client/deployment/ReactivePgClientProcessor.java +++ b/extensions/reactive-pg-client/deployment/src/main/java/io/quarkus/reactive/pg/client/deployment/ReactivePgClientProcessor.java @@ -37,6 +37,7 @@ import io.quarkus.reactive.pg.client.runtime.PgPoolRecorder; import io.quarkus.runtime.RuntimeValue; import io.quarkus.smallrye.health.deployment.spi.HealthBuildItem; +import io.quarkus.vertx.core.deployment.EventLoopCountBuildItem; import io.quarkus.vertx.deployment.VertxBuildItem; import io.vertx.pgclient.PgPool; import io.vertx.sqlclient.Pool; @@ -61,6 +62,7 @@ ServiceStartBuildItem build(BuildProducer feature, BuildProducer vertxPool, PgPoolRecorder recorder, VertxBuildItem vertx, + EventLoopCountBuildItem eventLoopCount, ShutdownContextBuildItem shutdown, BuildProducer syntheticBeans, BuildProducer sslNativeSupport, @@ -73,13 +75,13 @@ ServiceStartBuildItem build(BuildProducer feature, feature.produce(new FeatureBuildItem(Feature.REACTIVE_PG_CLIENT)); - createPoolIfDefined(recorder, vertx, shutdown, pgPool, vertxPool, syntheticBeans, + createPoolIfDefined(recorder, vertx, eventLoopCount, shutdown, pgPool, vertxPool, syntheticBeans, DataSourceUtil.DEFAULT_DATASOURCE_NAME, dataSourcesBuildTimeConfig, dataSourcesRuntimeConfig, dataSourcesReactiveBuildTimeConfig, dataSourcesReactiveRuntimeConfig, dataSourcesReactivePostgreSQLConfig, defaultDataSourceDbKindBuildItems, curateOutcomeBuildItem); for (String dataSourceName : dataSourcesBuildTimeConfig.namedDataSources.keySet()) { - createPoolIfDefined(recorder, vertx, shutdown, pgPool, vertxPool, syntheticBeans, dataSourceName, + createPoolIfDefined(recorder, vertx, eventLoopCount, shutdown, pgPool, vertxPool, syntheticBeans, dataSourceName, dataSourcesBuildTimeConfig, dataSourcesRuntimeConfig, dataSourcesReactiveBuildTimeConfig, dataSourcesReactiveRuntimeConfig, dataSourcesReactivePostgreSQLConfig, defaultDataSourceDbKindBuildItems, curateOutcomeBuildItem); @@ -124,6 +126,7 @@ void registerServiceBinding(BuildProducer dbKi private void createPoolIfDefined(PgPoolRecorder recorder, VertxBuildItem vertx, + EventLoopCountBuildItem eventLoopCount, ShutdownContextBuildItem shutdown, BuildProducer pgPool, BuildProducer vertxPool, @@ -143,6 +146,7 @@ private void createPoolIfDefined(PgPoolRecorder recorder, } RuntimeValue pool = recorder.configurePgPool(vertx.getVertx(), + eventLoopCount.getEventLoopCount(), dataSourceName, dataSourcesRuntimeConfig, dataSourcesReactiveRuntimeConfig, diff --git a/extensions/reactive-pg-client/runtime/src/main/java/io/quarkus/reactive/pg/client/runtime/PgPoolRecorder.java b/extensions/reactive-pg-client/runtime/src/main/java/io/quarkus/reactive/pg/client/runtime/PgPoolRecorder.java index 03606bc7f1a40..7a17c333749d3 100644 --- a/extensions/reactive-pg-client/runtime/src/main/java/io/quarkus/reactive/pg/client/runtime/PgPoolRecorder.java +++ b/extensions/reactive-pg-client/runtime/src/main/java/io/quarkus/reactive/pg/client/runtime/PgPoolRecorder.java @@ -11,6 +11,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import org.jboss.logging.Logger; @@ -36,6 +37,7 @@ public class PgPoolRecorder { private static final Logger log = Logger.getLogger(PgPoolRecorder.class); public RuntimeValue configurePgPool(RuntimeValue vertx, + Supplier eventLoopCount, String dataSourceName, DataSourcesRuntimeConfig dataSourcesRuntimeConfig, DataSourcesReactiveRuntimeConfig dataSourcesReactiveRuntimeConfig, @@ -43,6 +45,7 @@ public RuntimeValue configurePgPool(RuntimeValue vertx, ShutdownContext shutdown) { PgPool pgPool = initialize(vertx.getValue(), + eventLoopCount.get(), dataSourcesRuntimeConfig.getDataSourceRuntimeConfig(dataSourceName), dataSourcesReactiveRuntimeConfig.getDataSourceReactiveRuntimeConfig(dataSourceName), dataSourcesReactivePostgreSQLConfig.getDataSourceReactiveRuntimeConfig(dataSourceName)); @@ -55,10 +58,12 @@ public RuntimeValue mutinyPgPool(RuntimeValue

(io.vertx.mutiny.pgclient.PgPool.newInstance(pgPool.getValue())); } - private PgPool initialize(Vertx vertx, DataSourceRuntimeConfig dataSourceRuntimeConfig, + private PgPool initialize(Vertx vertx, + Integer eventLoopCount, + DataSourceRuntimeConfig dataSourceRuntimeConfig, DataSourceReactiveRuntimeConfig dataSourceReactiveRuntimeConfig, DataSourceReactivePostgreSQLConfig dataSourceReactivePostgreSQLConfig) { - PoolOptions poolOptions = toPoolOptions(dataSourceRuntimeConfig, dataSourceReactiveRuntimeConfig, + PoolOptions poolOptions = toPoolOptions(eventLoopCount, dataSourceRuntimeConfig, dataSourceReactiveRuntimeConfig, dataSourceReactivePostgreSQLConfig); PgConnectOptions pgConnectOptions = toPgConnectOptions(dataSourceRuntimeConfig, dataSourceReactiveRuntimeConfig, dataSourceReactivePostgreSQLConfig); @@ -69,7 +74,8 @@ private PgPool initialize(Vertx vertx, DataSourceRuntimeConfig dataSourceRuntime return PgPool.pool(vertx, pgConnectOptions, poolOptions); } - private PoolOptions toPoolOptions(DataSourceRuntimeConfig dataSourceRuntimeConfig, + private PoolOptions toPoolOptions(Integer eventLoopCount, + DataSourceRuntimeConfig dataSourceRuntimeConfig, DataSourceReactiveRuntimeConfig dataSourceReactiveRuntimeConfig, DataSourceReactivePostgreSQLConfig dataSourceReactivePostgreSQLConfig) { PoolOptions poolOptions; @@ -91,6 +97,12 @@ private PoolOptions toPoolOptions(DataSourceRuntimeConfig dataSourceRuntimeConfi } } + if (dataSourceReactiveRuntimeConfig.eventLoopSize.isPresent()) { + poolOptions.setEventLoopSize(Math.max(0, dataSourceReactiveRuntimeConfig.eventLoopSize.getAsInt())); + } else if (eventLoopCount != null) { + poolOptions.setEventLoopSize(Math.max(0, eventLoopCount)); + } + return poolOptions; }