Skip to content

Commit

Permalink
Reactive Datasource: support CredentialsProvider changing values
Browse files Browse the repository at this point in the history
Instead of:

- creating pgConnectOptions after calling the CredentialsProvider on startup
- using static pool configuration

We:

- use dynamic pool configuration
- call the Credentials provider every time a new connection must be created

If the credentials have changed, we must close the current connection factory (holds resources, like a NetClient) and create a new one.
  • Loading branch information
tsegismont committed Mar 15, 2023
1 parent 56f2dab commit b12de93
Show file tree
Hide file tree
Showing 8 changed files with 375 additions and 22 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.quarkus.reactive.datasource.runtime;

import io.vertx.core.impl.CloseFuture;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;

public class PoolCloseFutureFactory {

public static CloseFuture create(VertxInternal vertx) {
CloseFuture poolCloseFuture = new CloseFuture();
ContextInternal ctx = vertx.getContext();
if (ctx != null) {
ctx.addCloseHook(poolCloseFuture);
} else {
vertx.addCloseHook(poolCloseFuture);
}
return poolCloseFuture;
}

private PoolCloseFutureFactory() {
// Utility
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package io.quarkus.reactive.datasource.runtime;

import static io.quarkus.credentials.CredentialsProvider.PASSWORD_PROPERTY_NAME;
import static io.quarkus.credentials.CredentialsProvider.USER_PROPERTY_NAME;

import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.UnaryOperator;

import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.impl.CloseFuture;
import io.vertx.sqlclient.SqlConnectOptions;
import io.vertx.sqlclient.SqlConnection;
import io.vertx.sqlclient.spi.ConnectionFactory;
import io.vertx.sqlclient.spi.Driver;

public class ReactiveDatasourceConnectionProvider<CO extends SqlConnectOptions>
implements Function<Context, Future<SqlConnection>> {

private final Vertx vertx;
private final Driver driver;
private final ReactiveDatasourceCredentialsProvider credentialsProvider;
private final CO connectOptions;
private final UnaryOperator<CO> connectOptionsCopy;
private final CloseFuture poolCloseFuture;
private final AtomicReference<ConnectionFactoryPromise> connectionFactoryPromiseRef;

public ReactiveDatasourceConnectionProvider(Vertx vertx, Driver driver,
ReactiveDatasourceCredentialsProvider credentialsProvider, CO connectOptions, UnaryOperator<CO> connectOptionsCopy,
CloseFuture poolCloseFuture) {

this.vertx = vertx;
this.driver = driver;
this.credentialsProvider = credentialsProvider;
this.connectOptions = connectOptions;
this.connectOptionsCopy = connectOptionsCopy;
this.poolCloseFuture = poolCloseFuture;

ConnectionFactory connectionFactory = driver.createConnectionFactory(vertx, connectOptions);
poolCloseFuture.add(connectionFactory);
ConnectionFactoryPromise connectionFactoryPromise = new ConnectionFactoryPromise(connectOptions.getUser(),
connectOptions.getPassword());
connectionFactoryPromise.tryComplete(connectionFactory);

connectionFactoryPromiseRef = new AtomicReference<>(connectionFactoryPromise);
}

@Override
public Future<SqlConnection> apply(Context context) {
return credentialsProvider.getCredentials(context).compose(new Connector(context));
}

private static class ConnectionFactoryPromise implements Promise<ConnectionFactory> {

final String user;
final String password;
final Promise<ConnectionFactory> internal;

ConnectionFactoryPromise(String user, String password) {
this.user = user;
this.password = password;
internal = Promise.promise();
}

@Override
public boolean tryComplete(ConnectionFactory connectionFactory) {
return internal.tryComplete(connectionFactory);
}

@Override
public boolean tryFail(Throwable cause) {
return internal.tryFail(cause);
}

@Override
public Future<ConnectionFactory> future() {
return internal.future();
}
}

private class Connector implements Function<Map<String, String>, Future<SqlConnection>> {

final Context context;

Connector(Context context) {
this.context = context;
}

@Override
public Future<SqlConnection> apply(Map<String, String> credentials) {

String user = credentials.get(USER_PROPERTY_NAME);
String password = credentials.get(PASSWORD_PROPERTY_NAME);

ConnectionFactoryPromise current = connectionFactoryPromiseRef.get();
if (Objects.equals(user, current.user) && Objects.equals(password, current.password)) {
return connect(current, context);
}

ConnectionFactoryPromise candidate = new ConnectionFactoryPromise(user, password);
if (!connectionFactoryPromiseRef.compareAndSet(current, candidate)) {
return connect(connectionFactoryPromiseRef.get(), context);
}

current.future().onSuccess(new Handler<>() {
@Override
public void handle(ConnectionFactory cf) {
poolCloseFuture.remove(cf);
cf.close(Promise.promise());
}
});

SqlConnectOptions options = connectOptionsCopy.apply(connectOptions)
.setUser(user)
.setPassword(password);

ConnectionFactory connectionFactory = driver.createConnectionFactory(vertx, options);
poolCloseFuture.add(connectionFactory);
candidate.complete(connectionFactory);

return connect(candidate, context);
}
}

private static Future<SqlConnection> connect(ConnectionFactoryPromise connectionFactoryPromise, Context context) {
return connectionFactoryPromise.future().compose(new Function<>() {
@Override
public Future<SqlConnection> apply(ConnectionFactory cf) {
return cf.connect(context);
}
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package io.quarkus.reactive.datasource.runtime;

import java.util.Map;

import io.quarkus.credentials.CredentialsProvider;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;

public class ReactiveDatasourceCredentialsProvider {

private final Handler<Promise<Map<String, String>>> handler;

public ReactiveDatasourceCredentialsProvider(CredentialsProvider credentialsProvider, String credentialsProviderName) {
handler = new CredentialsHandler(credentialsProvider, credentialsProviderName);
}

public Future<Map<String, String>> getCredentials(Context context) {
return context.executeBlocking(handler, false);
}

private static class CredentialsHandler implements Handler<Promise<Map<String, String>>> {

final CredentialsProvider credentialsProvider;
final String credentialsProviderName;

public CredentialsHandler(CredentialsProvider credentialsProvider, String credentialsProviderName) {
this.credentialsProvider = credentialsProvider;
this.credentialsProviderName = credentialsProviderName;
}

@Override
public void handle(Promise<Map<String, String>> promise) {
promise.complete(credentialsProvider.getCredentials(credentialsProviderName));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package io.quarkus.reactive.pg.client;

import java.util.HashMap;
import java.util.Map;

import jakarta.enterprise.context.ApplicationScoped;

import org.jboss.logging.Logger;

import io.quarkus.credentials.CredentialsProvider;

@ApplicationScoped
public class ChangingCredentialsProvider implements CredentialsProvider {

private static final Logger log = Logger.getLogger(ChangingCredentialsProvider.class.getName());

private volatile Map<String, String> properties;

public ChangingCredentialsProvider() {
properties = new HashMap<>();
properties.put(USER_PROPERTY_NAME, "hibernate_orm_test");
properties.put(PASSWORD_PROPERTY_NAME, "hibernate_orm_test");
}

public void changeProperties() {
properties = new HashMap<>();
properties.put(USER_PROPERTY_NAME, "user2");
properties.put(PASSWORD_PROPERTY_NAME, "user2");
}

@Override
public Map<String, String> getCredentials(String credentialsProviderName) {
log.info("credentials provider returning " + properties);
return properties;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package io.quarkus.reactive.pg.client;

import static io.restassured.RestAssured.given;
import static java.util.concurrent.TimeUnit.SECONDS;

import org.hamcrest.CoreMatchers;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;

public class ChangingCredentialsTest {

@RegisterExtension
static final QuarkusUnitTest config = new QuarkusUnitTest()
.withApplicationRoot((jar) -> jar
.addClass(ChangingCredentialsProvider.class)
.addClass(ChangingCredentialsTestResource.class)
.addAsResource("application-changing-credentials.properties", "application.properties"));

@Test
public void testConnect() throws Exception {
given()
.when().get("/test")
.then()
.statusCode(200)
.body(CoreMatchers.equalTo("hibernate_orm_test"));

SECONDS.sleep(2); // sleep longer than pool idle connection timeout

given()
.when().get("/test")
.then()
.statusCode(200)
.body(CoreMatchers.equalTo("user2"));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package io.quarkus.reactive.pg.client;

import static org.junit.jupiter.api.Assertions.assertEquals;

import jakarta.annotation.PostConstruct;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;

import io.smallrye.common.annotation.Blocking;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.pgclient.PgPool;

@Path("/test")
public class ChangingCredentialsTestResource {

@Inject
PgPool client;

@Inject
ChangingCredentialsProvider credentialsProvider;

@PostConstruct
@Blocking
void addUser() {
client.query("CREATE USER user2 WITH PASSWORD 'user2' SUPERUSER").executeAndForget();
}

@GET
@Produces(MediaType.TEXT_PLAIN)
public Uni<Response> connect() {
return client.query("SELECT CURRENT_USER").execute()
.map(pgRowSet -> {
assertEquals(1, pgRowSet.size());
return Response.ok(pgRowSet.iterator().next().getString(0)).build();
}).eventually(credentialsProvider::changeProperties);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
quarkus.datasource.db-kind=postgresql
quarkus.datasource.credentials-provider=changing
quarkus.datasource.reactive.url=${reactive-postgres.url}
quarkus.datasource.reactive.max-size=1
quarkus.datasource.reactive.idle-timeout=PT1S
Loading

0 comments on commit b12de93

Please sign in to comment.