Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reactive Datasource: support CredentialsProvider changing values #31873

Merged
merged 1 commit into from
Jun 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions bom/application/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1878,6 +1878,13 @@
<artifactId>quarkus-reactive-datasource-deployment</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-reactive-datasource-deployment</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-reactive-db2-client</artifactId>
Expand Down
16 changes: 16 additions & 0 deletions extensions/reactive-datasource/deployment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@
<artifactId>quarkus-junit5-internal</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.rest-assured</groupId>
<artifactId>rest-assured</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand All @@ -46,6 +51,17 @@
</annotationProcessorPaths>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package io.quarkus.reactive.datasource;

import java.util.HashMap;
import java.util.Map;

import org.jboss.logging.Logger;

import io.quarkus.credentials.CredentialsProvider;

public abstract class ChangingCredentialsProviderBase implements CredentialsProvider {

private static final Logger log = Logger.getLogger(ChangingCredentialsProviderBase.class.getName());

private final String user2;
private final String password2;

private volatile Map<String, String> properties;

protected ChangingCredentialsProviderBase(String user1, String password1, String user2, String password2) {
properties = new HashMap<>();
properties.put(USER_PROPERTY_NAME, user1);
properties.put(PASSWORD_PROPERTY_NAME, password1);
this.user2 = user2;
this.password2 = password2;
}

public void changeProperties() {
properties = new HashMap<>();
properties.put(USER_PROPERTY_NAME, user2);
properties.put(PASSWORD_PROPERTY_NAME, password2);
}

@Override
public Map<String, String> getCredentials(String credentialsProviderName) {
log.info("credentials provider returning " + properties);
return properties;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package io.quarkus.reactive.datasource;

import static io.restassured.RestAssured.given;
import static java.util.concurrent.TimeUnit.SECONDS;

import org.hamcrest.CoreMatchers;
import org.junit.jupiter.api.Test;

public abstract class ChangingCredentialsTestBase {

private String user1;
private String user2;

protected ChangingCredentialsTestBase(String user1, String user2) {
this.user1 = user1;
this.user2 = user2;
}

@Test
public void testConnect() throws Exception {
given()
.when().get("/test")
.then()
.statusCode(200)
.body(CoreMatchers.equalTo(user1));

SECONDS.sleep(2); // sleep longer than pool idle connection timeout

given()
.when().get("/test")
.then()
.statusCode(200)
.body(CoreMatchers.equalTo(user2));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package io.quarkus.reactive.datasource.runtime;

import static io.quarkus.credentials.CredentialsProvider.PASSWORD_PROPERTY_NAME;
import static io.quarkus.credentials.CredentialsProvider.USER_PROPERTY_NAME;

import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntUnaryOperator;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;

import io.quarkus.credentials.CredentialsProvider;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.sqlclient.SqlConnectOptions;

public class ConnectOptionsSupplier<CO extends SqlConnectOptions> implements Supplier<Future<CO>> {

private final Vertx vertx;
private final CredentialsProvider credentialsProvider;
private final String credentialsProviderName;
private final List<CO> connectOptionsList;
private final UnaryOperator<CO> connectOptionsCopy;
private final Handler<Promise<CO>> blockingCodeHandler;

public ConnectOptionsSupplier(Vertx vertx, CredentialsProvider credentialsProvider, String credentialsProviderName,
List<CO> connectOptionsList, UnaryOperator<CO> connectOptionsCopy) {
this.vertx = vertx;
this.credentialsProvider = credentialsProvider;
this.credentialsProviderName = credentialsProviderName;
this.connectOptionsList = connectOptionsList;
this.connectOptionsCopy = connectOptionsCopy;
blockingCodeHandler = new BlockingCodeHandler();
}

@Override
public Future<CO> get() {
return vertx.executeBlocking(blockingCodeHandler, false);
}

private class BlockingCodeHandler implements Handler<Promise<CO>>, IntUnaryOperator {

final AtomicInteger idx = new AtomicInteger();

@Override
public void handle(Promise<CO> promise) {
Map<String, String> credentials = credentialsProvider.getCredentials(credentialsProviderName);
String user = credentials.get(USER_PROPERTY_NAME);
String password = credentials.get(PASSWORD_PROPERTY_NAME);

int nextIdx = idx.getAndUpdate(this);

CO connectOptions = connectOptionsCopy.apply(connectOptionsList.get(nextIdx));
connectOptions.setUser(user).setPassword(password);

promise.complete(connectOptions);
}

@Override
public int applyAsInt(int previousIdx) {
return previousIdx == connectOptionsList.size() - 1 ? 0 : previousIdx + 1;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,20 @@
import io.quarkus.datasource.runtime.DataSourceRuntimeConfig;
import io.quarkus.datasource.runtime.DataSourcesRuntimeConfig;
import io.quarkus.reactive.datasource.ReactiveDataSource;
import io.quarkus.reactive.datasource.runtime.ConnectOptionsSupplier;
import io.quarkus.reactive.datasource.runtime.DataSourceReactiveRuntimeConfig;
import io.quarkus.reactive.datasource.runtime.DataSourcesReactiveRuntimeConfig;
import io.quarkus.reactive.db2.client.DB2PoolCreator;
import io.quarkus.runtime.RuntimeValue;
import io.quarkus.runtime.ShutdownContext;
import io.quarkus.runtime.annotations.Recorder;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.impl.VertxInternal;
import io.vertx.db2client.DB2ConnectOptions;
import io.vertx.db2client.DB2Pool;
import io.vertx.sqlclient.PoolOptions;
import io.vertx.sqlclient.impl.Utils;

@Recorder
public class DB2PoolRecorder {
Expand All @@ -49,7 +53,7 @@ public RuntimeValue<DB2Pool> configureDB2Pool(RuntimeValue<Vertx> vertx,
DataSourcesReactiveDB2Config dataSourcesReactiveDB2Config,
ShutdownContext shutdown) {

DB2Pool db2Pool = initialize(vertx.getValue(),
DB2Pool db2Pool = initialize((VertxInternal) vertx.getValue(),
eventLoopCount.get(),
dataSourceName,
dataSourcesRuntimeConfig.getDataSourceRuntimeConfig(dataSourceName),
Expand All @@ -64,24 +68,34 @@ public RuntimeValue<io.vertx.mutiny.db2client.DB2Pool> mutinyDB2Pool(RuntimeValu
return new RuntimeValue<>(io.vertx.mutiny.db2client.DB2Pool.newInstance(db2Pool.getValue()));
}

private DB2Pool initialize(Vertx vertx,
private DB2Pool initialize(VertxInternal vertx,
Integer eventLoopCount,
String dataSourceName,
DataSourceRuntimeConfig dataSourceRuntimeConfig,
DataSourceReactiveRuntimeConfig dataSourceReactiveRuntimeConfig,
DataSourceReactiveDB2Config dataSourceReactiveDB2Config) {
PoolOptions poolOptions = toPoolOptions(eventLoopCount, dataSourceRuntimeConfig, dataSourceReactiveRuntimeConfig,
dataSourceReactiveDB2Config);
DB2ConnectOptions connectOptions = toConnectOptions(dataSourceRuntimeConfig, dataSourceReactiveRuntimeConfig,
dataSourceReactiveDB2Config);

// Use the convention defined by Quarkus Micrometer Vert.x metrics to create metrics prefixed with db2. and
// the client_name as tag.
// See io.quarkus.micrometer.runtime.binder.vertx.VertxMeterBinderAdapter.extractPrefix and
// io.quarkus.micrometer.runtime.binder.vertx.VertxMeterBinderAdapter.extractClientName
connectOptions.setMetricsName("db2|" + dataSourceName);
DB2ConnectOptions db2ConnectOptions = toConnectOptions(dataSourceName, dataSourceRuntimeConfig,
dataSourceReactiveRuntimeConfig, dataSourceReactiveDB2Config);
Supplier<Future<DB2ConnectOptions>> databasesSupplier = toDatabasesSupplier(vertx, List.of(db2ConnectOptions),
dataSourceRuntimeConfig);
return createPool(vertx, poolOptions, db2ConnectOptions, dataSourceName, databasesSupplier);
}

return createPool(vertx, poolOptions, connectOptions, dataSourceName);
private Supplier<Future<DB2ConnectOptions>> toDatabasesSupplier(Vertx vertx, List<DB2ConnectOptions> db2ConnectOptionsList,
DataSourceRuntimeConfig dataSourceRuntimeConfig) {
Supplier<Future<DB2ConnectOptions>> supplier;
if (dataSourceRuntimeConfig.credentialsProvider.isPresent()) {
String beanName = dataSourceRuntimeConfig.credentialsProviderName.orElse(null);
CredentialsProvider credentialsProvider = CredentialsProviderFinder.find(beanName);
String name = dataSourceRuntimeConfig.credentialsProvider.get();
supplier = new ConnectOptionsSupplier<>(vertx, credentialsProvider, name, db2ConnectOptionsList,
DB2ConnectOptions::new);
} else {
supplier = Utils.roundRobinSupplier(db2ConnectOptionsList);
}
return supplier;
}

private PoolOptions toPoolOptions(Integer eventLoopCount,
Expand Down Expand Up @@ -114,7 +128,7 @@ private PoolOptions toPoolOptions(Integer eventLoopCount,
return poolOptions;
}

private DB2ConnectOptions toConnectOptions(DataSourceRuntimeConfig dataSourceRuntimeConfig,
private DB2ConnectOptions toConnectOptions(String dataSourceName, DataSourceRuntimeConfig dataSourceRuntimeConfig,
DataSourceReactiveRuntimeConfig dataSourceReactiveRuntimeConfig,
DataSourceReactiveDB2Config dataSourceReactiveDB2Config) {
DB2ConnectOptions connectOptions;
Expand Down Expand Up @@ -155,7 +169,7 @@ private DB2ConnectOptions toConnectOptions(DataSourceRuntimeConfig dataSourceRun
connectOptions.setUser(user);
}
if (password != null) {
connectOptions.setPassword(user);
connectOptions.setPassword(password);
}
}

Expand Down Expand Up @@ -184,11 +198,17 @@ private DB2ConnectOptions toConnectOptions(DataSourceRuntimeConfig dataSourceRun

dataSourceReactiveRuntimeConfig.additionalProperties.forEach(connectOptions::addProperty);

// Use the convention defined by Quarkus Micrometer Vert.x metrics to create metrics prefixed with db2.
// and the client_name as tag.
// See io.quarkus.micrometer.runtime.binder.vertx.VertxMeterBinderAdapter.extractPrefix and
// io.quarkus.micrometer.runtime.binder.vertx.VertxMeterBinderAdapter.extractClientName
connectOptions.setMetricsName("db2|" + dataSourceName);

return connectOptions;
}

private DB2Pool createPool(Vertx vertx, PoolOptions poolOptions, DB2ConnectOptions dB2ConnectOptions,
String dataSourceName) {
String dataSourceName, Supplier<Future<DB2ConnectOptions>> databases) {
Instance<DB2PoolCreator> instance;
if (DataSourceUtil.isDefault(dataSourceName)) {
instance = Arc.container().select(DB2PoolCreator.class);
Expand All @@ -200,7 +220,7 @@ private DB2Pool createPool(Vertx vertx, PoolOptions poolOptions, DB2ConnectOptio
DB2PoolCreator.Input input = new DefaultInput(vertx, poolOptions, dB2ConnectOptions);
return instance.get().create(input);
}
return DB2Pool.pool(vertx, dB2ConnectOptions, poolOptions);
return DB2Pool.pool(vertx, databases, poolOptions);
}

private static class DefaultInput implements DB2PoolCreator.Input {
Expand Down
6 changes: 6 additions & 0 deletions extensions/reactive-mssql-client/deployment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-reactive-datasource-deployment</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.quarkus.reactive.mssql.client;

import jakarta.enterprise.context.ApplicationScoped;

import io.quarkus.reactive.datasource.ChangingCredentialsProviderBase;

@ApplicationScoped
public class ChangingCredentialsProvider extends ChangingCredentialsProviderBase {

public ChangingCredentialsProvider() {
super("sa", "A_Str0ng_Required_Password", "user2", "user2_Has_A_Str0ng_Required_Password");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.quarkus.reactive.mssql.client;

import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.reactive.datasource.ChangingCredentialsTestBase;
import io.quarkus.test.QuarkusUnitTest;

public class ChangingCredentialsTest extends ChangingCredentialsTestBase {

@RegisterExtension
static final QuarkusUnitTest config = new QuarkusUnitTest()
.withApplicationRoot((jar) -> jar
.addClass(ChangingCredentialsProvider.class)
.addClass(ChangingCredentialsTestResource.class)
.addAsResource("application-changing-credentials.properties", "application.properties"));

public ChangingCredentialsTest() {
super("dbo", "user2");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package io.quarkus.reactive.mssql.client;

import static org.junit.jupiter.api.Assertions.assertEquals;

import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;

import io.quarkus.runtime.StartupEvent;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.mssqlclient.MSSQLPool;

@Path("/test")
public class ChangingCredentialsTestResource {

@Inject
MSSQLPool client;

@Inject
ChangingCredentialsProvider credentialsProvider;

void addUser(@Observes StartupEvent ignored) {
client.query("CREATE LOGIN user2 WITH PASSWORD = 'user2_Has_A_Str0ng_Required_Password'").executeAndAwait();
client.query("CREATE USER user2 FOR LOGIN user2").executeAndAwait();
}

@GET
@Produces(MediaType.TEXT_PLAIN)
public Uni<Response> connect() {
return client.query("SELECT CURRENT_USER").execute()
.map(rowSet -> {
assertEquals(1, rowSet.size());
return Response.ok(rowSet.iterator().next().getString(0)).build();
}).eventually(credentialsProvider::changeProperties);
}

}
Loading