diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/provider/service/ReactiveImprovedExtractionContextImpl.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/provider/service/ReactiveImprovedExtractionContextImpl.java index a33ff8178..24a5b5d85 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/provider/service/ReactiveImprovedExtractionContextImpl.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/provider/service/ReactiveImprovedExtractionContextImpl.java @@ -36,6 +36,7 @@ import java.util.Properties; import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicInteger; import org.hibernate.boot.model.relational.SqlStringGenerationContext; import org.hibernate.engine.jdbc.env.spi.JdbcEnvironment; @@ -53,6 +54,8 @@ public class ReactiveImprovedExtractionContextImpl extends ImprovedExtractionContextImpl { private final ReactiveConnectionPool service; + private final AtomicInteger level = new AtomicInteger( 0 ); + private CompletionStage connectionStage; public ReactiveImprovedExtractionContextImpl( ServiceRegistry registry, @@ -68,23 +71,35 @@ public ReactiveImprovedExtractionContextImpl( service = registry.getService( ReactiveConnectionPool.class ); } - @Override - public T getQueryResults( - String queryString, - Object[] positionalParameters, - ResultSetProcessor resultSetProcessor) throws SQLException { + public ReactiveImprovedExtractionContextImpl push() { + int currentLevel = level.getAndIncrement(); + if ( currentLevel == 0 ) { + connectionStage = service.getConnection(); + } + return this; + } - final CompletionStage connectionStage = service.getConnection(); + public ReactiveImprovedExtractionContextImpl pop() { + int currentLevel = level.decrementAndGet(); + if ( currentLevel == 0 && connectionStage != null ) { + // This method doesn't return a reactive type, so we start closing the connection and ignore the result + connectionStage + .handle( ReactiveImprovedExtractionContextImpl::ignoreException ) + .thenCompose( ReactiveImprovedExtractionContextImpl::closeConnection ) + .toCompletableFuture() + .join(); + } + return this; + } + @Override + public T getQueryResults( String queryString, Object[] positionalParameters, ResultSetProcessor resultSetProcessor) throws SQLException { + push(); try (final ResultSet resultSet = getQueryResultSet( queryString, positionalParameters, connectionStage )) { return resultSetProcessor.process( resultSet ); } finally { - // This method doesn't return a reactive type, so we start closing the connection and ignore the result - connectionStage - .handle( ReactiveImprovedExtractionContextImpl::ignoreException ) - .thenCompose( ReactiveImprovedExtractionContextImpl::closeConnection ); - + pop(); } } @@ -102,13 +117,12 @@ private ResultSet getQueryResultSet( Object[] positionalParameters, CompletionStage connectionStage) { final Object[] parametersToUse = positionalParameters != null ? positionalParameters : new Object[0]; - final Parameters parametersDialectSpecific = Parameters.instance( - getJdbcEnvironment().getDialect() - ); + final Parameters parametersDialectSpecific = Parameters.instance( getJdbcEnvironment().getDialect() ); final String queryToUse = parametersDialectSpecific.process( queryString, parametersToUse.length ); - return connectionStage.thenCompose( c -> c.selectJdbcOutsideTransaction( queryToUse, parametersToUse ) ) + return connectionStage + .thenCompose( c -> c.selectJdbcOutsideTransaction( queryToUse, parametersToUse ) ) .whenComplete( (resultSet, err) -> logSqlException( err, () -> "could not execute query ", queryToUse ) ) - .thenApply(ResultSetWorkaround::new) + .thenApply( ResultSetWorkaround::new ) .toCompletableFuture() .join(); }