Skip to content

Commit

Permalink
[hibernate#1909] Re-use existing connection during schema migration
Browse files Browse the repository at this point in the history
  • Loading branch information
DavideD committed May 27, 2024
1 parent e8a37f3 commit 9799398
Showing 1 changed file with 30 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.Properties;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

import org.hibernate.boot.model.relational.SqlStringGenerationContext;
import org.hibernate.engine.jdbc.env.spi.JdbcEnvironment;
Expand All @@ -53,6 +54,8 @@
public class ReactiveImprovedExtractionContextImpl extends ImprovedExtractionContextImpl {

private final ReactiveConnectionPool service;
private final AtomicInteger level = new AtomicInteger( 0 );
private CompletionStage<ReactiveConnection> connectionStage;

public ReactiveImprovedExtractionContextImpl(
ServiceRegistry registry,
Expand All @@ -68,23 +71,35 @@ public ReactiveImprovedExtractionContextImpl(
service = registry.getService( ReactiveConnectionPool.class );
}

@Override
public <T> T getQueryResults(
String queryString,
Object[] positionalParameters,
ResultSetProcessor<T> resultSetProcessor) throws SQLException {
public ReactiveImprovedExtractionContextImpl push() {
int currentLevel = level.getAndIncrement();
if ( currentLevel == 0 ) {
connectionStage = service.getConnection();
}
return this;
}

final CompletionStage<ReactiveConnection> connectionStage = service.getConnection();
public ReactiveImprovedExtractionContextImpl pop() {
int currentLevel = level.decrementAndGet();
if ( currentLevel == 0 && connectionStage != null ) {
// This method doesn't return a reactive type, so we start closing the connection and ignore the result
connectionStage
.handle( ReactiveImprovedExtractionContextImpl::ignoreException )
.thenCompose( ReactiveImprovedExtractionContextImpl::closeConnection )
.toCompletableFuture()
.join();
}
return this;
}

@Override
public <T> T getQueryResults( String queryString, Object[] positionalParameters, ResultSetProcessor<T> resultSetProcessor) throws SQLException {
push();
try (final ResultSet resultSet = getQueryResultSet( queryString, positionalParameters, connectionStage )) {
return resultSetProcessor.process( resultSet );
}
finally {
// This method doesn't return a reactive type, so we start closing the connection and ignore the result
connectionStage
.handle( ReactiveImprovedExtractionContextImpl::ignoreException )
.thenCompose( ReactiveImprovedExtractionContextImpl::closeConnection );

pop();
}
}

Expand All @@ -102,13 +117,12 @@ private ResultSet getQueryResultSet(
Object[] positionalParameters,
CompletionStage<ReactiveConnection> connectionStage) {
final Object[] parametersToUse = positionalParameters != null ? positionalParameters : new Object[0];
final Parameters parametersDialectSpecific = Parameters.instance(
getJdbcEnvironment().getDialect()
);
final Parameters parametersDialectSpecific = Parameters.instance( getJdbcEnvironment().getDialect() );
final String queryToUse = parametersDialectSpecific.process( queryString, parametersToUse.length );
return connectionStage.thenCompose( c -> c.selectJdbcOutsideTransaction( queryToUse, parametersToUse ) )
return connectionStage
.thenCompose( c -> c.selectJdbcOutsideTransaction( queryToUse, parametersToUse ) )
.whenComplete( (resultSet, err) -> logSqlException( err, () -> "could not execute query ", queryToUse ) )
.thenApply(ResultSetWorkaround::new)
.thenApply( ResultSetWorkaround::new )
.toCompletableFuture()
.join();
}
Expand Down

0 comments on commit 9799398

Please sign in to comment.