From cdce97bf54f20fd7a0661c93f1c334f5cd53cdd6 Mon Sep 17 00:00:00 2001 From: Davide D'Alto Date: Mon, 27 May 2024 13:59:30 +0200 Subject: [PATCH] [#1909] Run queries for schema creation using the pool Before we were creating a connection and then ignoring it for each query required to update the schema or collect metatada. Now the method for running queries outside the "current" transaction is in the SqlClientPool. Note that nowdays it might not be necessary to run these queries in a separate transaction, but it simplify the code quite a bit and it's consistent to what we were doing before. --- .../reactive/pool/ReactiveConnection.java | 17 ------ .../reactive/pool/ReactiveConnectionPool.java | 4 ++ .../pool/impl/ExternalSqlClientPool.java | 48 +++++++++++++++ .../reactive/pool/impl/SqlClientPool.java | 61 +++++++++++++++++++ ...ReactiveImprovedExtractionContextImpl.java | 50 ++++++--------- 5 files changed, 131 insertions(+), 49 deletions(-) diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/ReactiveConnection.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/ReactiveConnection.java index 5d48865771..10e304116e 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/ReactiveConnection.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/ReactiveConnection.java @@ -60,23 +60,6 @@ interface Expectation { CompletionStage selectJdbc(String sql, Object[] paramValues); - /** - * This method is intended to be used only for queries returning - * a ResultSet that must be executed outside of any "current" - * transaction (i.e with autocommit=true). - *

- * For example, it would be appropriate to use this method when - * performing queries on information_schema or system tables in - * order to obtain metadata information about catalogs, schemas, - * tables, etc. - * - * @param sql - the query to execute outside of a transaction - * @param paramValues - a non-null array of parameter values - * - * @return the CompletionStage from executing the query. - */ - CompletionStage selectJdbcOutsideTransaction(String sql, Object[] paramValues); - CompletionStage insertAndSelectIdentifier(String sql, Object[] paramValues, Class idClass, String idColumnName); CompletionStage insertAndSelectIdentifierAsResultSet(String sql, Object[] paramValues, Class idClass, String idColumnName); diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/ReactiveConnectionPool.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/ReactiveConnectionPool.java index a4d07968fe..ec1bdbd524 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/ReactiveConnectionPool.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/ReactiveConnectionPool.java @@ -10,8 +10,10 @@ import org.hibernate.reactive.provider.ReactiveServiceRegistryBuilder; import org.hibernate.service.Service; +import java.sql.ResultSet; import java.util.concurrent.CompletionStage; + /** * A Hibernate {@link Service} that provides access to pooled * {@link ReactiveConnection reactive connections}. @@ -63,6 +65,8 @@ public interface ReactiveConnectionPool extends Service { */ CompletionStage getConnection(String tenantId, SqlExceptionHelper sqlExceptionHelper); + CompletionStage selectJdbcOutsideTransaction(String sql, Object[] paramValues); + /** * The shutdown of the pool is actually asynchronous but the * core service registry won't return the {@link CompletionStage}. diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/impl/ExternalSqlClientPool.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/impl/ExternalSqlClientPool.java index e88f9c5a7c..602ecd7b04 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/impl/ExternalSqlClientPool.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/impl/ExternalSqlClientPool.java @@ -5,15 +5,26 @@ */ package org.hibernate.reactive.pool.impl; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Objects; import java.util.concurrent.CompletionStage; +import org.hibernate.engine.jdbc.internal.FormatStyle; import org.hibernate.engine.jdbc.spi.SqlExceptionHelper; import org.hibernate.engine.jdbc.spi.SqlStatementLogger; +import org.hibernate.reactive.adaptor.impl.ResultSetAdaptor; import org.hibernate.reactive.mutiny.Mutiny; import org.hibernate.reactive.stage.Stage; import org.hibernate.reactive.util.impl.CompletionStages; +import io.vertx.sqlclient.DatabaseException; import io.vertx.sqlclient.Pool; +import io.vertx.sqlclient.Row; +import io.vertx.sqlclient.RowSet; +import io.vertx.sqlclient.Tuple; + +import static org.hibernate.reactive.util.impl.CompletionStages.rethrow; /** * A pool of reactive connections backed by a Vert.x {@link Pool}. @@ -82,4 +93,41 @@ public SqlExceptionHelper getSqlExceptionHelper() { public CompletionStage getCloseFuture() { return CompletionStages.voidFuture(); } + + + @Override + public CompletionStage selectJdbcOutsideTransaction(String sql, Object[] paramValues) { + return preparedQueryOutsideTransaction( sql, Tuple.wrap( paramValues ) ) + .thenApply( ResultSetAdaptor::new ); + } + + public CompletionStage> preparedQueryOutsideTransaction(String sql, Tuple parameters) { + feedback( sql ); + return getPool().preparedQuery( sql ).execute( parameters ).toCompletionStage() + .handle( (rows, throwable) -> convertException( rows, sql, throwable ) ); + } + + /** + * Similar to {@link org.hibernate.exception.internal.SQLExceptionTypeDelegate#convert(SQLException, String, String)} + */ + private T convertException(T rows, String sql, Throwable sqlException) { + if ( sqlException == null ) { + return rows; + } + if ( sqlException instanceof DatabaseException ) { + DatabaseException de = (DatabaseException) sqlException; + sqlException = sqlExceptionHelper + .convert( new SQLException( de.getMessage(), de.getSqlState(), de.getErrorCode() ), "error executing SQL statement", sql ); + } + return rethrow( sqlException ); + } + + private void feedback(String sql) { + Objects.requireNonNull( sql, "SQL query cannot be null" ); + // DDL already gets formatted by the client, so don't reformat it + FormatStyle formatStyle = sqlStatementLogger.isFormat() && !sql.contains( System.lineSeparator() ) + ? FormatStyle.BASIC + : FormatStyle.NONE; + sqlStatementLogger.logStatement( sql, formatStyle.getFormatter() ); + } } diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/impl/SqlClientPool.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/impl/SqlClientPool.java index 4ffc8b3d0c..5e2a595172 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/impl/SqlClientPool.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/impl/SqlClientPool.java @@ -5,18 +5,29 @@ */ package org.hibernate.reactive.pool.impl; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.function.Consumer; +import org.hibernate.engine.jdbc.internal.FormatStyle; import org.hibernate.engine.jdbc.spi.SqlExceptionHelper; import org.hibernate.engine.jdbc.spi.SqlStatementLogger; +import org.hibernate.reactive.adaptor.impl.ResultSetAdaptor; import org.hibernate.reactive.pool.ReactiveConnection; import org.hibernate.reactive.pool.ReactiveConnectionPool; import io.vertx.core.Future; +import io.vertx.sqlclient.DatabaseException; import io.vertx.sqlclient.Pool; +import io.vertx.sqlclient.Row; +import io.vertx.sqlclient.RowSet; import io.vertx.sqlclient.SqlConnection; +import io.vertx.sqlclient.Tuple; + +import static org.hibernate.reactive.util.impl.CompletionStages.rethrow; /** * A pool of reactive connections backed by a supplier of @@ -99,6 +110,56 @@ private CompletionStage getConnectionFromPool(Pool pool, Sql ); } + /** + * This method is intended to be used only for queries returning + * a ResultSet that must be executed outside any "current" + * transaction (i.e. with autocommit=true). + *

+ * For example, it would be appropriate to use this method when + * performing queries on information_schema or system tables in + * order to obtain metadata information about catalogs, schemas, + * tables, etc. + * + * @param sql - the query to execute outside a transaction + * @param paramValues - a non-null array of parameter values + * + * @return the CompletionStage from executing the query. + */ + public CompletionStage selectJdbcOutsideTransaction(String sql, Object[] paramValues) { + return preparedQueryOutsideTransaction( sql, Tuple.wrap( paramValues ) ) + .thenApply( ResultSetAdaptor::new ); + } + + private CompletionStage> preparedQueryOutsideTransaction(String sql, Tuple parameters) { + feedback( sql ); + return getPool().preparedQuery( sql ).execute( parameters ).toCompletionStage() + .handle( (rows, throwable) -> convertException( rows, sql, throwable ) ); + } + + /** + * Similar to {@link org.hibernate.exception.internal.SQLExceptionTypeDelegate#convert(SQLException, String, String)} + */ + private T convertException(T rows, String sql, Throwable sqlException) { + if ( sqlException == null ) { + return rows; + } + if ( sqlException instanceof DatabaseException ) { + DatabaseException de = (DatabaseException) sqlException; + sqlException = getSqlExceptionHelper() + .convert( new SQLException( de.getMessage(), de.getSqlState(), de.getErrorCode() ), "error executing SQL statement", sql ); + } + return rethrow( sqlException ); + } + + private void feedback(String sql) { + Objects.requireNonNull( sql, "SQL query cannot be null" ); + // DDL already gets formatted by the client, so don't reformat it + FormatStyle formatStyle = getSqlStatementLogger().isFormat() && !sql.contains( System.lineSeparator() ) + ? FormatStyle.BASIC + : FormatStyle.NONE; + getSqlStatementLogger().logStatement( sql, formatStyle.getFormatter() ); + } + /** * @param onCancellation invoke when converted {@link java.util.concurrent.CompletionStage} cancellation. */ diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/provider/service/ReactiveImprovedExtractionContextImpl.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/provider/service/ReactiveImprovedExtractionContextImpl.java index 7d68ae2f3f..41e37df412 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/provider/service/ReactiveImprovedExtractionContextImpl.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/provider/service/ReactiveImprovedExtractionContextImpl.java @@ -34,12 +34,10 @@ import java.util.Calendar; import java.util.Map; import java.util.Properties; -import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; import org.hibernate.boot.model.relational.SqlStringGenerationContext; import org.hibernate.engine.jdbc.env.spi.JdbcEnvironment; -import org.hibernate.reactive.pool.ReactiveConnection; import org.hibernate.reactive.pool.ReactiveConnectionPool; import org.hibernate.reactive.pool.impl.Parameters; import org.hibernate.resource.transaction.spi.DdlTransactionIsolator; @@ -48,11 +46,10 @@ import org.hibernate.tool.schema.internal.exec.JdbcContext; import static org.hibernate.reactive.util.impl.CompletionStages.logSqlException; -import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture; public class ReactiveImprovedExtractionContextImpl extends ImprovedExtractionContextImpl { - private final ReactiveConnectionPool service; + private final ReactiveConnectionPool connectionPool; public ReactiveImprovedExtractionContextImpl( ServiceRegistry registry, @@ -65,7 +62,7 @@ public ReactiveImprovedExtractionContextImpl( NoopDdlTransactionIsolator.INSTANCE, databaseObjectAccess ); - service = registry.getService( ReactiveConnectionPool.class ); + connectionPool = registry.getService( ReactiveConnectionPool.class ); } @Override @@ -73,46 +70,35 @@ public T getQueryResults( String queryString, Object[] positionalParameters, ResultSetProcessor resultSetProcessor) throws SQLException { - - final CompletionStage connectionStage = service.getConnection(); - - try (final ResultSet resultSet = getQueryResultSet( queryString, positionalParameters, connectionStage )) { + try (final ResultSet resultSet = getQueryResultSet( queryString, positionalParameters )) { return resultSetProcessor.process( resultSet ); } - finally { - // This method doesn't return a reactive type, so we start closing the connection and ignore the result - connectionStage - .handle( ReactiveImprovedExtractionContextImpl::ignoreException ) - .thenCompose( ReactiveImprovedExtractionContextImpl::closeConnection ); - - } - } - - private static ReactiveConnection ignoreException(ReactiveConnection reactiveConnection, Throwable throwable) { - return reactiveConnection; - } - - private static CompletionStage closeConnection(ReactiveConnection connection) { - // Avoid NullPointerException if we couldn't create a connection - return connection != null ? connection.close() : voidFuture(); } private ResultSet getQueryResultSet( String queryString, - Object[] positionalParameters, - CompletionStage connectionStage) { + Object[] positionalParameters) { final Object[] parametersToUse = positionalParameters != null ? positionalParameters : new Object[0]; - final Parameters parametersDialectSpecific = Parameters.instance( - getJdbcEnvironment().getDialect() - ); + final Parameters parametersDialectSpecific = Parameters.instance( getJdbcEnvironment().getDialect() ); final String queryToUse = parametersDialectSpecific.process( queryString, parametersToUse.length ); - return connectionStage.thenCompose( c -> c.selectJdbcOutsideTransaction( queryToUse, parametersToUse ) ) + return connectionPool + // It might not be necessary anymore to run the queries outside the current transaction, + // but we don't have to deal with creating and closing connections. + // Because the schema migration API, we can change this part when we will make the schema migration + // reactive + .selectJdbcOutsideTransaction( queryToUse, parametersToUse ) .whenComplete( (resultSet, err) -> logSqlException( err, () -> "could not execute query ", queryToUse ) ) - .thenApply(ResultSetWorkaround::new) + .thenApply( ResultSetWorkaround::new ) + // During schema migration, errors are ignored + .handle( ReactiveImprovedExtractionContextImpl::ignoreException ) .toCompletableFuture() .join(); } + private static T ignoreException(T result, Throwable throwable) { + return result; + } + private static class NoopDdlTransactionIsolator implements DdlTransactionIsolator { static final NoopDdlTransactionIsolator INSTANCE = new NoopDdlTransactionIsolator();