Skip to content

Commit

Permalink
[hibernate#1909] Run queries for schema creation using the pool
Browse files Browse the repository at this point in the history
Before we were creating a connection and then ignoring it for each
query required to update the schema or collect metatada.

Now the method for running queries outside the "current" transaction
is in the SqlClientPool.

Note that nowdays it might not be necessary to run these queries
in a separate transaction, but it simplify the code quite a bit
and it's consistent to what we were doing before.
  • Loading branch information
DavideD committed May 29, 2024
1 parent a03614b commit cdce97b
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,23 +60,6 @@ interface Expectation {

CompletionStage<ResultSet> selectJdbc(String sql, Object[] paramValues);

/**
* This method is intended to be used only for queries returning
* a ResultSet that must be executed outside of any "current"
* transaction (i.e with autocommit=true).
* <p/>
* For example, it would be appropriate to use this method when
* performing queries on information_schema or system tables in
* order to obtain metadata information about catalogs, schemas,
* tables, etc.
*
* @param sql - the query to execute outside of a transaction
* @param paramValues - a non-null array of parameter values
*
* @return the CompletionStage<ResultSet> from executing the query.
*/
CompletionStage<ResultSet> selectJdbcOutsideTransaction(String sql, Object[] paramValues);

<T> CompletionStage<T> insertAndSelectIdentifier(String sql, Object[] paramValues, Class<T> idClass, String idColumnName);
CompletionStage<ResultSet> insertAndSelectIdentifierAsResultSet(String sql, Object[] paramValues, Class<?> idClass, String idColumnName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
import org.hibernate.reactive.provider.ReactiveServiceRegistryBuilder;
import org.hibernate.service.Service;

import java.sql.ResultSet;
import java.util.concurrent.CompletionStage;


/**
* A Hibernate {@link Service} that provides access to pooled
* {@link ReactiveConnection reactive connections}.
Expand Down Expand Up @@ -63,6 +65,8 @@ public interface ReactiveConnectionPool extends Service {
*/
CompletionStage<ReactiveConnection> getConnection(String tenantId, SqlExceptionHelper sqlExceptionHelper);

CompletionStage<ResultSet> selectJdbcOutsideTransaction(String sql, Object[] paramValues);

/**
* The shutdown of the pool is actually asynchronous but the
* core service registry won't return the {@link CompletionStage}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,26 @@
*/
package org.hibernate.reactive.pool.impl;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Objects;
import java.util.concurrent.CompletionStage;

import org.hibernate.engine.jdbc.internal.FormatStyle;
import org.hibernate.engine.jdbc.spi.SqlExceptionHelper;
import org.hibernate.engine.jdbc.spi.SqlStatementLogger;
import org.hibernate.reactive.adaptor.impl.ResultSetAdaptor;
import org.hibernate.reactive.mutiny.Mutiny;
import org.hibernate.reactive.stage.Stage;
import org.hibernate.reactive.util.impl.CompletionStages;

import io.vertx.sqlclient.DatabaseException;
import io.vertx.sqlclient.Pool;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowSet;
import io.vertx.sqlclient.Tuple;

import static org.hibernate.reactive.util.impl.CompletionStages.rethrow;

/**
* A pool of reactive connections backed by a Vert.x {@link Pool}.
Expand Down Expand Up @@ -82,4 +93,41 @@ public SqlExceptionHelper getSqlExceptionHelper() {
public CompletionStage<Void> getCloseFuture() {
return CompletionStages.voidFuture();
}


@Override
public CompletionStage<ResultSet> selectJdbcOutsideTransaction(String sql, Object[] paramValues) {
return preparedQueryOutsideTransaction( sql, Tuple.wrap( paramValues ) )
.thenApply( ResultSetAdaptor::new );
}

public CompletionStage<RowSet<Row>> preparedQueryOutsideTransaction(String sql, Tuple parameters) {
feedback( sql );
return getPool().preparedQuery( sql ).execute( parameters ).toCompletionStage()
.handle( (rows, throwable) -> convertException( rows, sql, throwable ) );
}

/**
* Similar to {@link org.hibernate.exception.internal.SQLExceptionTypeDelegate#convert(SQLException, String, String)}
*/
private <T> T convertException(T rows, String sql, Throwable sqlException) {
if ( sqlException == null ) {
return rows;
}
if ( sqlException instanceof DatabaseException ) {
DatabaseException de = (DatabaseException) sqlException;
sqlException = sqlExceptionHelper
.convert( new SQLException( de.getMessage(), de.getSqlState(), de.getErrorCode() ), "error executing SQL statement", sql );
}
return rethrow( sqlException );
}

private void feedback(String sql) {
Objects.requireNonNull( sql, "SQL query cannot be null" );
// DDL already gets formatted by the client, so don't reformat it
FormatStyle formatStyle = sqlStatementLogger.isFormat() && !sql.contains( System.lineSeparator() )
? FormatStyle.BASIC
: FormatStyle.NONE;
sqlStatementLogger.logStatement( sql, formatStyle.getFormatter() );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,29 @@
*/
package org.hibernate.reactive.pool.impl;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;

import org.hibernate.engine.jdbc.internal.FormatStyle;
import org.hibernate.engine.jdbc.spi.SqlExceptionHelper;
import org.hibernate.engine.jdbc.spi.SqlStatementLogger;
import org.hibernate.reactive.adaptor.impl.ResultSetAdaptor;
import org.hibernate.reactive.pool.ReactiveConnection;
import org.hibernate.reactive.pool.ReactiveConnectionPool;

import io.vertx.core.Future;
import io.vertx.sqlclient.DatabaseException;
import io.vertx.sqlclient.Pool;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowSet;
import io.vertx.sqlclient.SqlConnection;
import io.vertx.sqlclient.Tuple;

import static org.hibernate.reactive.util.impl.CompletionStages.rethrow;

/**
* A pool of reactive connections backed by a supplier of
Expand Down Expand Up @@ -99,6 +110,56 @@ private CompletionStage<ReactiveConnection> getConnectionFromPool(Pool pool, Sql
);
}

/**
* This method is intended to be used only for queries returning
* a ResultSet that must be executed outside any "current"
* transaction (i.e. with autocommit=true).
* <p/>
* For example, it would be appropriate to use this method when
* performing queries on information_schema or system tables in
* order to obtain metadata information about catalogs, schemas,
* tables, etc.
*
* @param sql - the query to execute outside a transaction
* @param paramValues - a non-null array of parameter values
*
* @return the CompletionStage<ResultSet> from executing the query.
*/
public CompletionStage<ResultSet> selectJdbcOutsideTransaction(String sql, Object[] paramValues) {
return preparedQueryOutsideTransaction( sql, Tuple.wrap( paramValues ) )
.thenApply( ResultSetAdaptor::new );
}

private CompletionStage<RowSet<Row>> preparedQueryOutsideTransaction(String sql, Tuple parameters) {
feedback( sql );
return getPool().preparedQuery( sql ).execute( parameters ).toCompletionStage()
.handle( (rows, throwable) -> convertException( rows, sql, throwable ) );
}

/**
* Similar to {@link org.hibernate.exception.internal.SQLExceptionTypeDelegate#convert(SQLException, String, String)}
*/
private <T> T convertException(T rows, String sql, Throwable sqlException) {
if ( sqlException == null ) {
return rows;
}
if ( sqlException instanceof DatabaseException ) {
DatabaseException de = (DatabaseException) sqlException;
sqlException = getSqlExceptionHelper()
.convert( new SQLException( de.getMessage(), de.getSqlState(), de.getErrorCode() ), "error executing SQL statement", sql );
}
return rethrow( sqlException );
}

private void feedback(String sql) {
Objects.requireNonNull( sql, "SQL query cannot be null" );
// DDL already gets formatted by the client, so don't reformat it
FormatStyle formatStyle = getSqlStatementLogger().isFormat() && !sql.contains( System.lineSeparator() )
? FormatStyle.BASIC
: FormatStyle.NONE;
getSqlStatementLogger().logStatement( sql, formatStyle.getFormatter() );
}

/**
* @param onCancellation invoke when converted {@link java.util.concurrent.CompletionStage} cancellation.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,10 @@
import java.util.Calendar;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;

import org.hibernate.boot.model.relational.SqlStringGenerationContext;
import org.hibernate.engine.jdbc.env.spi.JdbcEnvironment;
import org.hibernate.reactive.pool.ReactiveConnection;
import org.hibernate.reactive.pool.ReactiveConnectionPool;
import org.hibernate.reactive.pool.impl.Parameters;
import org.hibernate.resource.transaction.spi.DdlTransactionIsolator;
Expand All @@ -48,11 +46,10 @@
import org.hibernate.tool.schema.internal.exec.JdbcContext;

import static org.hibernate.reactive.util.impl.CompletionStages.logSqlException;
import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture;

public class ReactiveImprovedExtractionContextImpl extends ImprovedExtractionContextImpl {

private final ReactiveConnectionPool service;
private final ReactiveConnectionPool connectionPool;

public ReactiveImprovedExtractionContextImpl(
ServiceRegistry registry,
Expand All @@ -65,54 +62,43 @@ public ReactiveImprovedExtractionContextImpl(
NoopDdlTransactionIsolator.INSTANCE,
databaseObjectAccess
);
service = registry.getService( ReactiveConnectionPool.class );
connectionPool = registry.getService( ReactiveConnectionPool.class );
}

@Override
public <T> T getQueryResults(
String queryString,
Object[] positionalParameters,
ResultSetProcessor<T> resultSetProcessor) throws SQLException {

final CompletionStage<ReactiveConnection> connectionStage = service.getConnection();

try (final ResultSet resultSet = getQueryResultSet( queryString, positionalParameters, connectionStage )) {
try (final ResultSet resultSet = getQueryResultSet( queryString, positionalParameters )) {
return resultSetProcessor.process( resultSet );
}
finally {
// This method doesn't return a reactive type, so we start closing the connection and ignore the result
connectionStage
.handle( ReactiveImprovedExtractionContextImpl::ignoreException )
.thenCompose( ReactiveImprovedExtractionContextImpl::closeConnection );

}
}

private static ReactiveConnection ignoreException(ReactiveConnection reactiveConnection, Throwable throwable) {
return reactiveConnection;
}

private static CompletionStage<Void> closeConnection(ReactiveConnection connection) {
// Avoid NullPointerException if we couldn't create a connection
return connection != null ? connection.close() : voidFuture();
}

private ResultSet getQueryResultSet(
String queryString,
Object[] positionalParameters,
CompletionStage<ReactiveConnection> connectionStage) {
Object[] positionalParameters) {
final Object[] parametersToUse = positionalParameters != null ? positionalParameters : new Object[0];
final Parameters parametersDialectSpecific = Parameters.instance(
getJdbcEnvironment().getDialect()
);
final Parameters parametersDialectSpecific = Parameters.instance( getJdbcEnvironment().getDialect() );
final String queryToUse = parametersDialectSpecific.process( queryString, parametersToUse.length );
return connectionStage.thenCompose( c -> c.selectJdbcOutsideTransaction( queryToUse, parametersToUse ) )
return connectionPool
// It might not be necessary anymore to run the queries outside the current transaction,
// but we don't have to deal with creating and closing connections.
// Because the schema migration API, we can change this part when we will make the schema migration
// reactive
.selectJdbcOutsideTransaction( queryToUse, parametersToUse )
.whenComplete( (resultSet, err) -> logSqlException( err, () -> "could not execute query ", queryToUse ) )
.thenApply(ResultSetWorkaround::new)
.thenApply( ResultSetWorkaround::new )
// During schema migration, errors are ignored
.handle( ReactiveImprovedExtractionContextImpl::ignoreException )
.toCompletableFuture()
.join();
}

private static <T> T ignoreException(T result, Throwable throwable) {
return result;
}

private static class NoopDdlTransactionIsolator implements DdlTransactionIsolator {
static final NoopDdlTransactionIsolator INSTANCE = new NoopDdlTransactionIsolator();

Expand Down

0 comments on commit cdce97b

Please sign in to comment.