Skip to content

Commit

Permalink
Reactive Datasource: support CredentialsProvider changing values
Browse files Browse the repository at this point in the history
Instead of:

- creating pgConnectOptions after calling the CredentialsProvider on startup
- using static pool configuration

We:

- use dynamic pool configuration
- call the Credentials provider every time a new connection must be created
  • Loading branch information
tsegismont committed Jun 12, 2023
1 parent 197c921 commit 3e5afbf
Show file tree
Hide file tree
Showing 31 changed files with 679 additions and 69 deletions.
7 changes: 7 additions & 0 deletions bom/application/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1878,6 +1878,13 @@
<artifactId>quarkus-reactive-datasource-deployment</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-reactive-datasource-deployment</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-reactive-db2-client</artifactId>
Expand Down
16 changes: 16 additions & 0 deletions extensions/reactive-datasource/deployment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@
<artifactId>quarkus-junit5-internal</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.rest-assured</groupId>
<artifactId>rest-assured</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand All @@ -46,6 +51,17 @@
</annotationProcessorPaths>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package io.quarkus.reactive.datasource;

import java.util.HashMap;
import java.util.Map;

import org.jboss.logging.Logger;

import io.quarkus.credentials.CredentialsProvider;

public abstract class ChangingCredentialsProviderBase implements CredentialsProvider {

private static final Logger log = Logger.getLogger(ChangingCredentialsProviderBase.class.getName());

private final String user2;
private final String password2;

private volatile Map<String, String> properties;

protected ChangingCredentialsProviderBase(String user1, String password1, String user2, String password2) {
properties = new HashMap<>();
properties.put(USER_PROPERTY_NAME, user1);
properties.put(PASSWORD_PROPERTY_NAME, password1);
this.user2 = user2;
this.password2 = password2;
}

public void changeProperties() {
properties = new HashMap<>();
properties.put(USER_PROPERTY_NAME, user2);
properties.put(PASSWORD_PROPERTY_NAME, password2);
}

@Override
public Map<String, String> getCredentials(String credentialsProviderName) {
log.info("credentials provider returning " + properties);
return properties;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package io.quarkus.reactive.datasource;

import static io.restassured.RestAssured.given;
import static java.util.concurrent.TimeUnit.SECONDS;

import org.hamcrest.CoreMatchers;
import org.junit.jupiter.api.Test;

public abstract class ChangingCredentialsTestBase {

private String user1;
private String user2;

protected ChangingCredentialsTestBase(String user1, String user2) {
this.user1 = user1;
this.user2 = user2;
}

@Test
public void testConnect() throws Exception {
given()
.when().get("/test")
.then()
.statusCode(200)
.body(CoreMatchers.equalTo(user1));

SECONDS.sleep(2); // sleep longer than pool idle connection timeout

given()
.when().get("/test")
.then()
.statusCode(200)
.body(CoreMatchers.equalTo(user2));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package io.quarkus.reactive.datasource.runtime;

import static io.quarkus.credentials.CredentialsProvider.PASSWORD_PROPERTY_NAME;
import static io.quarkus.credentials.CredentialsProvider.USER_PROPERTY_NAME;

import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntUnaryOperator;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;

import io.quarkus.credentials.CredentialsProvider;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.sqlclient.SqlConnectOptions;

public class ConnectOptionsSupplier<CO extends SqlConnectOptions> implements Supplier<Future<CO>> {

private final Vertx vertx;
private final CredentialsProvider credentialsProvider;
private final String credentialsProviderName;
private final List<CO> connectOptionsList;
private final UnaryOperator<CO> connectOptionsCopy;
private final Handler<Promise<CO>> blockingCodeHandler;

public ConnectOptionsSupplier(Vertx vertx, CredentialsProvider credentialsProvider, String credentialsProviderName,
List<CO> connectOptionsList, UnaryOperator<CO> connectOptionsCopy) {
this.vertx = vertx;
this.credentialsProvider = credentialsProvider;
this.credentialsProviderName = credentialsProviderName;
this.connectOptionsList = connectOptionsList;
this.connectOptionsCopy = connectOptionsCopy;
blockingCodeHandler = new BlockingCodeHandler();
}

@Override
public Future<CO> get() {
return vertx.executeBlocking(blockingCodeHandler, false);
}

private class BlockingCodeHandler implements Handler<Promise<CO>>, IntUnaryOperator {

final AtomicInteger idx = new AtomicInteger();

@Override
public void handle(Promise<CO> promise) {
Map<String, String> credentials = credentialsProvider.getCredentials(credentialsProviderName);
String user = credentials.get(USER_PROPERTY_NAME);
String password = credentials.get(PASSWORD_PROPERTY_NAME);

int nextIdx = idx.getAndUpdate(this);

CO connectOptions = connectOptionsCopy.apply(connectOptionsList.get(nextIdx));
connectOptions.setUser(user).setPassword(password);

promise.complete(connectOptions);
}

@Override
public int applyAsInt(int previousIdx) {
return previousIdx == connectOptionsList.size() - 1 ? 0 : previousIdx + 1;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,20 @@
import io.quarkus.datasource.runtime.DataSourceRuntimeConfig;
import io.quarkus.datasource.runtime.DataSourcesRuntimeConfig;
import io.quarkus.reactive.datasource.ReactiveDataSource;
import io.quarkus.reactive.datasource.runtime.ConnectOptionsSupplier;
import io.quarkus.reactive.datasource.runtime.DataSourceReactiveRuntimeConfig;
import io.quarkus.reactive.datasource.runtime.DataSourcesReactiveRuntimeConfig;
import io.quarkus.reactive.db2.client.DB2PoolCreator;
import io.quarkus.runtime.RuntimeValue;
import io.quarkus.runtime.ShutdownContext;
import io.quarkus.runtime.annotations.Recorder;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.impl.VertxInternal;
import io.vertx.db2client.DB2ConnectOptions;
import io.vertx.db2client.DB2Pool;
import io.vertx.sqlclient.PoolOptions;
import io.vertx.sqlclient.impl.Utils;

@Recorder
public class DB2PoolRecorder {
Expand All @@ -49,7 +53,7 @@ public RuntimeValue<DB2Pool> configureDB2Pool(RuntimeValue<Vertx> vertx,
DataSourcesReactiveDB2Config dataSourcesReactiveDB2Config,
ShutdownContext shutdown) {

DB2Pool db2Pool = initialize(vertx.getValue(),
DB2Pool db2Pool = initialize((VertxInternal) vertx.getValue(),
eventLoopCount.get(),
dataSourceName,
dataSourcesRuntimeConfig.getDataSourceRuntimeConfig(dataSourceName),
Expand All @@ -64,24 +68,34 @@ public RuntimeValue<io.vertx.mutiny.db2client.DB2Pool> mutinyDB2Pool(RuntimeValu
return new RuntimeValue<>(io.vertx.mutiny.db2client.DB2Pool.newInstance(db2Pool.getValue()));
}

private DB2Pool initialize(Vertx vertx,
private DB2Pool initialize(VertxInternal vertx,
Integer eventLoopCount,
String dataSourceName,
DataSourceRuntimeConfig dataSourceRuntimeConfig,
DataSourceReactiveRuntimeConfig dataSourceReactiveRuntimeConfig,
DataSourceReactiveDB2Config dataSourceReactiveDB2Config) {
PoolOptions poolOptions = toPoolOptions(eventLoopCount, dataSourceRuntimeConfig, dataSourceReactiveRuntimeConfig,
dataSourceReactiveDB2Config);
DB2ConnectOptions connectOptions = toConnectOptions(dataSourceRuntimeConfig, dataSourceReactiveRuntimeConfig,
dataSourceReactiveDB2Config);

// Use the convention defined by Quarkus Micrometer Vert.x metrics to create metrics prefixed with db2. and
// the client_name as tag.
// See io.quarkus.micrometer.runtime.binder.vertx.VertxMeterBinderAdapter.extractPrefix and
// io.quarkus.micrometer.runtime.binder.vertx.VertxMeterBinderAdapter.extractClientName
connectOptions.setMetricsName("db2|" + dataSourceName);
DB2ConnectOptions db2ConnectOptions = toConnectOptions(dataSourceName, dataSourceRuntimeConfig,
dataSourceReactiveRuntimeConfig, dataSourceReactiveDB2Config);
Supplier<Future<DB2ConnectOptions>> databasesSupplier = toDatabasesSupplier(vertx, List.of(db2ConnectOptions),
dataSourceRuntimeConfig);
return createPool(vertx, poolOptions, db2ConnectOptions, dataSourceName, databasesSupplier);
}

return createPool(vertx, poolOptions, connectOptions, dataSourceName);
private Supplier<Future<DB2ConnectOptions>> toDatabasesSupplier(Vertx vertx, List<DB2ConnectOptions> db2ConnectOptionsList,
DataSourceRuntimeConfig dataSourceRuntimeConfig) {
Supplier<Future<DB2ConnectOptions>> supplier;
if (dataSourceRuntimeConfig.credentialsProvider.isPresent()) {
String beanName = dataSourceRuntimeConfig.credentialsProviderName.orElse(null);
CredentialsProvider credentialsProvider = CredentialsProviderFinder.find(beanName);
String name = dataSourceRuntimeConfig.credentialsProvider.get();
supplier = new ConnectOptionsSupplier<>(vertx, credentialsProvider, name, db2ConnectOptionsList,
DB2ConnectOptions::new);
} else {
supplier = Utils.roundRobinSupplier(db2ConnectOptionsList);
}
return supplier;
}

private PoolOptions toPoolOptions(Integer eventLoopCount,
Expand Down Expand Up @@ -114,7 +128,7 @@ private PoolOptions toPoolOptions(Integer eventLoopCount,
return poolOptions;
}

private DB2ConnectOptions toConnectOptions(DataSourceRuntimeConfig dataSourceRuntimeConfig,
private DB2ConnectOptions toConnectOptions(String dataSourceName, DataSourceRuntimeConfig dataSourceRuntimeConfig,
DataSourceReactiveRuntimeConfig dataSourceReactiveRuntimeConfig,
DataSourceReactiveDB2Config dataSourceReactiveDB2Config) {
DB2ConnectOptions connectOptions;
Expand Down Expand Up @@ -155,7 +169,7 @@ private DB2ConnectOptions toConnectOptions(DataSourceRuntimeConfig dataSourceRun
connectOptions.setUser(user);
}
if (password != null) {
connectOptions.setPassword(user);
connectOptions.setPassword(password);
}
}

Expand Down Expand Up @@ -184,11 +198,17 @@ private DB2ConnectOptions toConnectOptions(DataSourceRuntimeConfig dataSourceRun

dataSourceReactiveRuntimeConfig.additionalProperties.forEach(connectOptions::addProperty);

// Use the convention defined by Quarkus Micrometer Vert.x metrics to create metrics prefixed with db2.
// and the client_name as tag.
// See io.quarkus.micrometer.runtime.binder.vertx.VertxMeterBinderAdapter.extractPrefix and
// io.quarkus.micrometer.runtime.binder.vertx.VertxMeterBinderAdapter.extractClientName
connectOptions.setMetricsName("db2|" + dataSourceName);

return connectOptions;
}

private DB2Pool createPool(Vertx vertx, PoolOptions poolOptions, DB2ConnectOptions dB2ConnectOptions,
String dataSourceName) {
String dataSourceName, Supplier<Future<DB2ConnectOptions>> databases) {
Instance<DB2PoolCreator> instance;
if (DataSourceUtil.isDefault(dataSourceName)) {
instance = Arc.container().select(DB2PoolCreator.class);
Expand All @@ -200,7 +220,7 @@ private DB2Pool createPool(Vertx vertx, PoolOptions poolOptions, DB2ConnectOptio
DB2PoolCreator.Input input = new DefaultInput(vertx, poolOptions, dB2ConnectOptions);
return instance.get().create(input);
}
return DB2Pool.pool(vertx, dB2ConnectOptions, poolOptions);
return DB2Pool.pool(vertx, databases, poolOptions);
}

private static class DefaultInput implements DB2PoolCreator.Input {
Expand Down
6 changes: 6 additions & 0 deletions extensions/reactive-mssql-client/deployment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-reactive-datasource-deployment</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.quarkus.reactive.mssql.client;

import jakarta.enterprise.context.ApplicationScoped;

import io.quarkus.reactive.datasource.ChangingCredentialsProviderBase;

@ApplicationScoped
public class ChangingCredentialsProvider extends ChangingCredentialsProviderBase {

public ChangingCredentialsProvider() {
super("sa", "A_Str0ng_Required_Password", "user2", "user2_Has_A_Str0ng_Required_Password");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.quarkus.reactive.mssql.client;

import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.reactive.datasource.ChangingCredentialsTestBase;
import io.quarkus.test.QuarkusUnitTest;

public class ChangingCredentialsTest extends ChangingCredentialsTestBase {

@RegisterExtension
static final QuarkusUnitTest config = new QuarkusUnitTest()
.withApplicationRoot((jar) -> jar
.addClass(ChangingCredentialsProvider.class)
.addClass(ChangingCredentialsTestResource.class)
.addAsResource("application-changing-credentials.properties", "application.properties"));

public ChangingCredentialsTest() {
super("dbo", "user2");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package io.quarkus.reactive.mssql.client;

import static org.junit.jupiter.api.Assertions.assertEquals;

import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;

import io.quarkus.runtime.StartupEvent;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.mssqlclient.MSSQLPool;

@Path("/test")
public class ChangingCredentialsTestResource {

@Inject
MSSQLPool client;

@Inject
ChangingCredentialsProvider credentialsProvider;

void addUser(@Observes StartupEvent ignored) {
client.query("CREATE LOGIN user2 WITH PASSWORD = 'user2_Has_A_Str0ng_Required_Password'").executeAndAwait();
client.query("CREATE USER user2 FOR LOGIN user2").executeAndAwait();
}

@GET
@Produces(MediaType.TEXT_PLAIN)
public Uni<Response> connect() {
return client.query("SELECT CURRENT_USER").execute()
.map(rowSet -> {
assertEquals(1, rowSet.size());
return Response.ok(rowSet.iterator().next().getString(0)).build();
}).eventually(credentialsProvider::changeProperties);
}

}
Loading

0 comments on commit 3e5afbf

Please sign in to comment.