diff --git a/extensions/reactive-mysql-client/deployment/pom.xml b/extensions/reactive-mysql-client/deployment/pom.xml
index 19c3517b65487..65a588d4c143d 100644
--- a/extensions/reactive-mysql-client/deployment/pom.xml
+++ b/extensions/reactive-mysql-client/deployment/pom.xml
@@ -209,6 +209,7 @@
${project.basedir}/custom-mariadbconfig:/etc/mysql/conf.d${volume.access.modifier}
+ ${project.basedir}/src/test/resources/setup.sql:/docker-entrypoint-initdb.d/setup.sql
diff --git a/extensions/reactive-mysql-client/deployment/src/test/java/io/quarkus/reactive/mysql/client/ChangingCredentialsProvider.java b/extensions/reactive-mysql-client/deployment/src/test/java/io/quarkus/reactive/mysql/client/ChangingCredentialsProvider.java
new file mode 100644
index 0000000000000..528ae82c493d6
--- /dev/null
+++ b/extensions/reactive-mysql-client/deployment/src/test/java/io/quarkus/reactive/mysql/client/ChangingCredentialsProvider.java
@@ -0,0 +1,36 @@
+package io.quarkus.reactive.mysql.client;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import jakarta.enterprise.context.ApplicationScoped;
+
+import org.jboss.logging.Logger;
+
+import io.quarkus.credentials.CredentialsProvider;
+
+@ApplicationScoped
+public class ChangingCredentialsProvider implements CredentialsProvider {
+
+ private static final Logger log = Logger.getLogger(ChangingCredentialsProvider.class.getName());
+
+ private volatile Map properties;
+
+ public ChangingCredentialsProvider() {
+ properties = new HashMap<>();
+ properties.put(USER_PROPERTY_NAME, "hibernate_orm_test");
+ properties.put(PASSWORD_PROPERTY_NAME, "hibernate_orm_test");
+ }
+
+ public void changeProperties() {
+ properties = new HashMap<>();
+ properties.put(USER_PROPERTY_NAME, "user2");
+ properties.put(PASSWORD_PROPERTY_NAME, "user2");
+ }
+
+ @Override
+ public Map getCredentials(String credentialsProviderName) {
+ log.info("credentials provider returning " + properties);
+ return properties;
+ }
+}
diff --git a/extensions/reactive-mysql-client/deployment/src/test/java/io/quarkus/reactive/mysql/client/ChangingCredentialsTest.java b/extensions/reactive-mysql-client/deployment/src/test/java/io/quarkus/reactive/mysql/client/ChangingCredentialsTest.java
new file mode 100644
index 0000000000000..f992585ca3d9c
--- /dev/null
+++ b/extensions/reactive-mysql-client/deployment/src/test/java/io/quarkus/reactive/mysql/client/ChangingCredentialsTest.java
@@ -0,0 +1,38 @@
+package io.quarkus.reactive.mysql.client;
+
+import static io.restassured.RestAssured.given;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import org.hamcrest.CoreMatchers;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import io.quarkus.test.QuarkusUnitTest;
+
+public class ChangingCredentialsTest {
+
+ @RegisterExtension
+ static final QuarkusUnitTest config = new QuarkusUnitTest()
+ .withApplicationRoot((jar) -> jar
+ .addClass(ChangingCredentialsProvider.class)
+ .addClass(ChangingCredentialsTestResource.class)
+ .addAsResource("application-changing-credentials.properties", "application.properties"));
+
+ @Test
+ public void testConnect() throws Exception {
+ given()
+ .when().get("/test")
+ .then()
+ .statusCode(200)
+ .body(CoreMatchers.equalTo("hibernate_orm_test@%"));
+
+ SECONDS.sleep(2); // sleep longer than pool idle connection timeout
+
+ given()
+ .when().get("/test")
+ .then()
+ .statusCode(200)
+ .body(CoreMatchers.equalTo("user2@%"));
+ }
+
+}
diff --git a/extensions/reactive-mysql-client/deployment/src/test/java/io/quarkus/reactive/mysql/client/ChangingCredentialsTestResource.java b/extensions/reactive-mysql-client/deployment/src/test/java/io/quarkus/reactive/mysql/client/ChangingCredentialsTestResource.java
new file mode 100644
index 0000000000000..491885db6cac8
--- /dev/null
+++ b/extensions/reactive-mysql-client/deployment/src/test/java/io/quarkus/reactive/mysql/client/ChangingCredentialsTestResource.java
@@ -0,0 +1,34 @@
+package io.quarkus.reactive.mysql.client;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import jakarta.inject.Inject;
+import jakarta.ws.rs.GET;
+import jakarta.ws.rs.Path;
+import jakarta.ws.rs.Produces;
+import jakarta.ws.rs.core.MediaType;
+import jakarta.ws.rs.core.Response;
+
+import io.smallrye.mutiny.Uni;
+import io.vertx.mutiny.mysqlclient.MySQLPool;
+
+@Path("/test")
+public class ChangingCredentialsTestResource {
+
+ @Inject
+ MySQLPool client;
+
+ @Inject
+ ChangingCredentialsProvider credentialsProvider;
+
+ @GET
+ @Produces(MediaType.TEXT_PLAIN)
+ public Uni connect() {
+ return client.query("SELECT CURRENT_USER").execute()
+ .map(rowSet -> {
+ assertEquals(1, rowSet.size());
+ return Response.ok(rowSet.iterator().next().getString(0)).build();
+ }).eventually(credentialsProvider::changeProperties);
+ }
+
+}
diff --git a/extensions/reactive-mysql-client/deployment/src/test/resources/application-changing-credentials.properties b/extensions/reactive-mysql-client/deployment/src/test/resources/application-changing-credentials.properties
new file mode 100644
index 0000000000000..173159df97db6
--- /dev/null
+++ b/extensions/reactive-mysql-client/deployment/src/test/resources/application-changing-credentials.properties
@@ -0,0 +1,5 @@
+quarkus.datasource.db-kind=mysql
+quarkus.datasource.credentials-provider=changing
+quarkus.datasource.reactive.url=${reactive-mysql.url}
+quarkus.datasource.reactive.max-size=1
+quarkus.datasource.reactive.idle-timeout=PT1S
\ No newline at end of file
diff --git a/extensions/reactive-mysql-client/deployment/src/test/resources/setup.sql b/extensions/reactive-mysql-client/deployment/src/test/resources/setup.sql
new file mode 100644
index 0000000000000..183344aafbc0c
--- /dev/null
+++ b/extensions/reactive-mysql-client/deployment/src/test/resources/setup.sql
@@ -0,0 +1,3 @@
+CREATE USER 'user2'@'%' IDENTIFIED BY 'user2';
+GRANT ALL PRIVILEGES ON hibernate_orm_test.* TO 'user2'@'%';
+FLUSH PRIVILEGES;
\ No newline at end of file
diff --git a/extensions/reactive-mysql-client/runtime/src/main/java/io/quarkus/reactive/mysql/client/runtime/MySQLPoolRecorder.java b/extensions/reactive-mysql-client/runtime/src/main/java/io/quarkus/reactive/mysql/client/runtime/MySQLPoolRecorder.java
index d47125f2eaf9e..38ca630a719f2 100644
--- a/extensions/reactive-mysql-client/runtime/src/main/java/io/quarkus/reactive/mysql/client/runtime/MySQLPoolRecorder.java
+++ b/extensions/reactive-mysql-client/runtime/src/main/java/io/quarkus/reactive/mysql/client/runtime/MySQLPoolRecorder.java
@@ -9,8 +9,10 @@
import static io.quarkus.vertx.core.runtime.SSLConfigHelper.configurePfxKeyCertOptions;
import static io.quarkus.vertx.core.runtime.SSLConfigHelper.configurePfxTrustOptions;
+import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
import java.util.function.Supplier;
import jakarta.enterprise.inject.Instance;
@@ -26,15 +28,24 @@
import io.quarkus.reactive.datasource.ReactiveDataSource;
import io.quarkus.reactive.datasource.runtime.DataSourceReactiveRuntimeConfig;
import io.quarkus.reactive.datasource.runtime.DataSourcesReactiveRuntimeConfig;
+import io.quarkus.reactive.datasource.runtime.PoolCloseFutureFactory;
+import io.quarkus.reactive.datasource.runtime.ReactiveDatasourceConnectionProvider;
+import io.quarkus.reactive.datasource.runtime.ReactiveDatasourceCredentialsProvider;
import io.quarkus.reactive.mysql.client.MySQLPoolCreator;
import io.quarkus.runtime.RuntimeValue;
import io.quarkus.runtime.ShutdownContext;
import io.quarkus.runtime.annotations.Recorder;
+import io.vertx.core.Context;
+import io.vertx.core.Future;
import io.vertx.core.Vertx;
+import io.vertx.core.impl.CloseFuture;
+import io.vertx.core.impl.VertxInternal;
import io.vertx.mysqlclient.MySQLConnectOptions;
import io.vertx.mysqlclient.MySQLPool;
import io.vertx.mysqlclient.SslMode;
+import io.vertx.mysqlclient.spi.MySQLDriver;
import io.vertx.sqlclient.PoolOptions;
+import io.vertx.sqlclient.SqlConnection;
@Recorder
@SuppressWarnings("deprecation")
@@ -50,7 +61,7 @@ public RuntimeValue configureMySQLPool(RuntimeValue vertx,
DataSourcesReactiveMySQLConfig dataSourcesReactiveMySQLConfig,
ShutdownContext shutdown) {
- MySQLPool mysqlPool = initialize(vertx.getValue(),
+ MySQLPool mysqlPool = initialize((VertxInternal) vertx.getValue(),
eventLoopCount.get(),
dataSourceName,
dataSourcesRuntimeConfig.getDataSourceRuntimeConfig(dataSourceName),
@@ -65,7 +76,7 @@ public RuntimeValue mutinyMySQLPool(Runti
return new RuntimeValue<>(io.vertx.mutiny.mysqlclient.MySQLPool.newInstance(mysqlPool.getValue()));
}
- private MySQLPool initialize(Vertx vertx,
+ private MySQLPool initialize(VertxInternal vertx,
Integer eventLoopCount,
String dataSourceName, DataSourceRuntimeConfig dataSourceRuntimeConfig,
DataSourceReactiveRuntimeConfig dataSourceReactiveRuntimeConfig,
@@ -74,14 +85,47 @@ private MySQLPool initialize(Vertx vertx,
dataSourceReactiveMySQLConfig);
MySQLConnectOptions mysqlConnectOptions = toMySQLConnectOptions(dataSourceRuntimeConfig,
dataSourceReactiveRuntimeConfig, dataSourceReactiveMySQLConfig);
-
- // Use the convention defined by Quarkus Micrometer Vert.x metrics to create metrics prefixed with mysql.
+ CloseFuture poolCloseFuture = PoolCloseFutureFactory.create(vertx);
+ ReactiveDatasourceConnectionProvider connectionProvider = toConnectionProvider(vertx,
+ mysqlConnectOptions, dataSourceRuntimeConfig, poolCloseFuture); // Use the convention defined by Quarkus Micrometer Vert.x metrics to create metrics prefixed with mysql.
// and the client_name as tag.
// See io.quarkus.micrometer.runtime.binder.vertx.VertxMeterBinderAdapter.extractPrefix and
// io.quarkus.micrometer.runtime.binder.vertx.VertxMeterBinderAdapter.extractClientName
mysqlConnectOptions.setMetricsName("mysql|" + dataSourceName);
- return createPool(vertx, poolOptions, mysqlConnectOptions, dataSourceName);
+ return createPool(vertx, poolOptions, mysqlConnectOptions, dataSourceName, connectionProvider, poolCloseFuture);
+ }
+
+ private ReactiveDatasourceConnectionProvider toConnectionProvider(VertxInternal vertx,
+ MySQLConnectOptions mysqlConnectOptions, DataSourceRuntimeConfig dataSourceRuntimeConfig,
+ CloseFuture poolCloseFuture) {
+ ReactiveDatasourceCredentialsProvider reactiveDatasourceCredentialsProvider;
+ if (dataSourceRuntimeConfig.credentialsProvider.isPresent()) {
+ String beanName = dataSourceRuntimeConfig.credentialsProviderName.orElse(null);
+ CredentialsProvider credentialsProvider = CredentialsProviderFinder.find(beanName);
+ String name = dataSourceRuntimeConfig.credentialsProvider.get();
+ Map credentials = credentialsProvider.getCredentials(name);
+ String user = credentials.get(USER_PROPERTY_NAME);
+ String password = credentials.get(PASSWORD_PROPERTY_NAME);
+ if (user != null) {
+ mysqlConnectOptions.setUser(user);
+ }
+ if (password != null) {
+ mysqlConnectOptions.setPassword(password);
+ }
+ reactiveDatasourceCredentialsProvider = new ReactiveDatasourceCredentialsProvider(credentialsProvider, name);
+ } else {
+ reactiveDatasourceCredentialsProvider = new ReactiveDatasourceCredentialsProvider(new CredentialsProvider() {
+ @Override
+ public Map getCredentials(String credentialsProviderName) {
+ return Map.of(USER_PROPERTY_NAME, mysqlConnectOptions.getUser(), PASSWORD_PROPERTY_NAME,
+ mysqlConnectOptions.getPassword());
+ }
+ }, null);
+ }
+
+ return new ReactiveDatasourceConnectionProvider<>(vertx, MySQLDriver.INSTANCE, reactiveDatasourceCredentialsProvider,
+ mysqlConnectOptions, MySQLConnectOptions::new, poolCloseFuture);
}
private PoolOptions toPoolOptions(Integer eventLoopCount,
@@ -142,22 +186,6 @@ private MySQLConnectOptions toMySQLConnectOptions(DataSourceRuntimeConfig dataSo
mysqlConnectOptions.setPassword(dataSourceRuntimeConfig.password.get());
}
- // credentials provider
- if (dataSourceRuntimeConfig.credentialsProvider.isPresent()) {
- String beanName = dataSourceRuntimeConfig.credentialsProviderName.orElse(null);
- CredentialsProvider credentialsProvider = CredentialsProviderFinder.find(beanName);
- String name = dataSourceRuntimeConfig.credentialsProvider.get();
- Map credentials = credentialsProvider.getCredentials(name);
- String user = credentials.get(USER_PROPERTY_NAME);
- String password = credentials.get(PASSWORD_PROPERTY_NAME);
- if (user != null) {
- mysqlConnectOptions.setUser(user);
- }
- if (password != null) {
- mysqlConnectOptions.setPassword(password);
- }
- }
-
mysqlConnectOptions.setCachePreparedStatements(dataSourceReactiveRuntimeConfig.cachePreparedStatements);
if (dataSourceReactiveMySQLConfig.charset.isPresent()) {
@@ -216,7 +244,7 @@ private MySQLConnectOptions toMySQLConnectOptions(DataSourceRuntimeConfig dataSo
}
private MySQLPool createPool(Vertx vertx, PoolOptions poolOptions, MySQLConnectOptions mySQLConnectOptions,
- String dataSourceName) {
+ String dataSourceName, Function> connectionProvider, CloseFuture poolCloseFuture) {
Instance instance;
if (DataSourceUtil.isDefault(dataSourceName)) {
instance = Arc.container().select(MySQLPoolCreator.class);
@@ -228,7 +256,10 @@ private MySQLPool createPool(Vertx vertx, PoolOptions poolOptions, MySQLConnectO
MySQLPoolCreator.Input input = new DefaultInput(vertx, poolOptions, mySQLConnectOptions);
return instance.get().create(input);
}
- return MySQLPool.pool(vertx, mySQLConnectOptions, poolOptions);
+ MySQLPool pool = MySQLDriver.INSTANCE
+ .newPool(vertx, Collections.singletonList(mySQLConnectOptions), poolOptions, poolCloseFuture)
+ .connectionProvider(connectionProvider);
+ return pool;
}
private static class DefaultInput implements MySQLPoolCreator.Input {