Skip to content

Commit

Permalink
[hibernate#1932] Support ReactiveSelectionQuery#getResultCount for HQL
Browse files Browse the repository at this point in the history
I need some changes in ORM before I can support it for native
queries.
  • Loading branch information
DavideD committed Aug 6, 2024
1 parent f03234e commit 8f05a8c
Show file tree
Hide file tree
Showing 15 changed files with 218 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,17 @@ interface SelectionQuery<R> extends AbstractQuery {
*/
Uni<R> getSingleResultOrNull();

/**
* Determine the size of the query result list that would be
* returned by calling {@link #getResultList()} with no
* {@linkplain #getFirstResult() offset} or
* {@linkplain #getMaxResults() limit} applied to the query.
*
* @return the size of the list that would be returned
*/
@Incubating
Uni<Long> getResultCount();

/**
* Asynchronously execute this query, returning the query results
* as a {@link List}, via a {@link Uni}. If the query
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ public int getMaxResults() {
return delegate.getMaxResults();
}

@Override
public Uni<Long> getResultCount() {
return uni( delegate::getReactiveResultCount );
}

@Override
public Uni<List<R>> getResultList() {
return uni( delegate::getReactiveResultList );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ public int getMaxResults() {
return delegate.getMaxResults();
}

@Override
public Uni<Long> getResultCount() {
return uni( delegate::getReactiveResultCount );
}

@Override
public Uni<List<R>> getResultList() {
return uni( delegate::getReactiveResultList );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ default CompletionStage<List<R>> getReactiveResultList() {

CompletionStage<R> getReactiveSingleResultOrNull();

CompletionStage<Long> getReactiveResultCount();

CompletionStage<R> reactiveUnique();

CompletionStage<Optional<R>> reactiveUniqueResultOptional();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.hibernate.engine.spi.SharedSessionContractImplementor;
import org.hibernate.query.IllegalQueryOperationException;
import org.hibernate.query.hql.internal.QuerySplitter;
import org.hibernate.query.internal.DelegatingDomainQueryExecutionContext;
import org.hibernate.query.spi.DomainQueryExecutionContext;
import org.hibernate.query.spi.QueryInterpretationCache;
import org.hibernate.query.spi.QueryOptions;
import org.hibernate.query.sqm.internal.DomainParameterXref;
Expand All @@ -33,6 +35,7 @@
import org.hibernate.reactive.query.sqm.internal.AggregatedSelectReactiveQueryPlan;
import org.hibernate.reactive.query.sqm.internal.ConcreteSqmSelectReactiveQueryPlan;
import org.hibernate.reactive.query.sqm.spi.ReactiveSelectQueryPlan;
import org.hibernate.reactive.sql.results.spi.ReactiveSingleResultConsumer;
import org.hibernate.sql.results.internal.TupleMetadata;

import jakarta.persistence.NoResultException;
Expand Down Expand Up @@ -146,6 +149,17 @@ public CompletionStage<R> getReactiveSingleResult() {
.exceptionally( this::convertException );
}

public CompletionStage<Long> getReactiveResultsCount(SqmSelectStatement<?> sqmStatement, DomainQueryExecutionContext domainQueryExecutionContext) {
final DelegatingDomainQueryExecutionContext context = new DelegatingDomainQueryExecutionContext( domainQueryExecutionContext ) {
@Override
public QueryOptions getQueryOptions() {
return QueryOptions.NONE;
}
};
return buildConcreteSelectQueryPlan( sqmStatement.createCountQuery(), Long.class, getQueryOptions() )
.reactiveExecuteQuery( context, new ReactiveSingleResultConsumer<>() );
}

private R reactiveSingleResult(List<R> list) {
if ( list.isEmpty() ) {
throw new NoResultException( String.format( "No result found for query [%s]", getQueryString() ) );
Expand Down Expand Up @@ -269,7 +283,7 @@ private ReactiveSelectQueryPlan<R> buildAggregatedSelectQueryPlan(SqmSelectState
return new AggregatedSelectReactiveQueryPlan<>( aggregatedQueryPlans );
}

private <T> ReactiveSelectQueryPlan<T> buildConcreteSelectQueryPlan(
public <T> ReactiveSelectQueryPlan<T> buildConcreteSelectQueryPlan(
SqmSelectStatement<?> concreteSqmStatement,
Class<T> resultType,
QueryOptions queryOptions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,16 @@ private <T> T getNull() {
return null;
}

@Override
public long getResultCount() {
throw LOG.nonReactiveMethodCall( "getReactiveResultCount()" );
}

@Override
public CompletionStage<Long> getReactiveResultCount() {
throw LOG.notYetImplemented();
}

private ReactiveAbstractSelectionQuery<R> createSelectionQueryDelegate(SharedSessionContractImplementor session) {
return new ReactiveAbstractSelectionQuery<>(
this::getQueryOptions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.hibernate.reactive.query.sqm.spi.ReactiveSelectQueryPlan;
import org.hibernate.reactive.sql.exec.internal.StandardReactiveSelectExecutor;
import org.hibernate.reactive.sql.results.spi.ReactiveListResultsConsumer;
import org.hibernate.reactive.sql.results.spi.ReactiveResultsConsumer;
import org.hibernate.sql.ast.SqlAstTranslator;
import org.hibernate.sql.ast.SqlAstTranslatorFactory;
import org.hibernate.sql.ast.spi.FromClauseAccess;
Expand All @@ -59,6 +60,7 @@
public class ConcreteSqmSelectReactiveQueryPlan<R> extends ConcreteSqmSelectQueryPlan<R>
implements ReactiveSelectQueryPlan<R> {

private final SqmInterpreter<Object, ReactiveResultsConsumer<Object, R>> executeQueryInterpreter;
private final SqmInterpreter<List<R>, Void> listInterpreter;
private final RowTransformer<R> rowTransformer;

Expand All @@ -80,6 +82,8 @@ public ConcreteSqmSelectReactiveQueryPlan(
this.rowTransformer = determineRowTransformer( sqm, resultType, tupleMetadata, queryOptions );
this.listInterpreter = (unused, executionContext, sqmInterpretation, jdbcParameterBindings) ->
listInterpreter( hql, domainParameterXref, executionContext, sqmInterpretation, jdbcParameterBindings, rowTransformer );
this.executeQueryInterpreter = (resultsConsumer, executionContext, sqmInterpretation, jdbcParameterBindings) ->
executeQueryInterpreter( hql, domainParameterXref, executionContext, sqmInterpretation, jdbcParameterBindings, rowTransformer, resultsConsumer );
}

private static <R> CompletionStage<List<R>> listInterpreter(
Expand Down Expand Up @@ -110,6 +114,40 @@ private static <R> CompletionStage<List<R>> listInterpreter(
.whenComplete( (rs, t) -> domainParameterXref.clearExpansions() );
}

private static <R> CompletionStage<Object> executeQueryInterpreter(
String hql,
DomainParameterXref domainParameterXref,
DomainQueryExecutionContext executionContext,
CacheableSqmInterpretation sqmInterpretation,
JdbcParameterBindings jdbcParameterBindings,
RowTransformer<R> rowTransformer,
ReactiveResultsConsumer<Object, R> resultsConsumer) {
final ReactiveSharedSessionContractImplementor session = (ReactiveSharedSessionContractImplementor) executionContext.getSession();
final JdbcOperationQuerySelect jdbcSelect = sqmInterpretation.getJdbcSelect();
// I'm using a supplier so that the whenComplete at the end will catch any errors, like a finally-block
Supplier<SubselectFetch.RegistrationHandler> fetchHandlerSupplier = () -> SubselectFetch
.createRegistrationHandler( session.getPersistenceContext().getBatchFetchQueue(), sqmInterpretation.selectStatement, JdbcParametersList.empty(), jdbcParameterBindings );
return completedFuture( fetchHandlerSupplier )
.thenApply( Supplier::get )
.thenCompose( subSelectFetchKeyHandler -> session
.reactiveAutoFlushIfRequired( jdbcSelect.getAffectedTableNames() )
.thenCompose( required -> StandardReactiveSelectExecutor.INSTANCE
.executeQuery( jdbcSelect,
jdbcParameterBindings,
ConcreteSqmSelectQueryPlan.listInterpreterExecutionContext( hql, executionContext, jdbcSelect, subSelectFetchKeyHandler ),
rowTransformer,
null,
sql -> executionContext.getSession()
.getJdbcCoordinator()
.getStatementPreparer()
.prepareQueryStatement( sql, false, null ),
resultsConsumer
)
)
)
.whenComplete( (rs, t) -> domainParameterXref.clearExpansions() );
}

@Override
public ScrollableResultsImplementor<R> performScroll(ScrollMode scrollMode, DomainQueryExecutionContext executionContext) {
throw new UnsupportedOperationException();
Expand All @@ -119,10 +157,21 @@ public ScrollableResultsImplementor<R> performScroll(ScrollMode scrollMode, Doma
public CompletionStage<List<R>> reactivePerformList(DomainQueryExecutionContext executionContext) {
return executionContext.getQueryOptions().getEffectiveLimit().getMaxRowsJpa() == 0
? completedFuture( emptyList() )
: withCacheableSqmInterpretation( executionContext, listInterpreter );
: withCacheableSqmInterpretation( executionContext, null, listInterpreter );
}

@Override
public <T> CompletionStage<T> reactiveExecuteQuery(
DomainQueryExecutionContext executionContext,
ReactiveResultsConsumer<T, R> resultsConsumer) {
return withCacheableSqmInterpretation(
executionContext,
resultsConsumer,
(SqmInterpreter<T, ReactiveResultsConsumer<T, R>>) (SqmInterpreter) executeQueryInterpreter
);
}

private <T, X> CompletionStage<T> withCacheableSqmInterpretation(DomainQueryExecutionContext executionContext, SqmInterpreter<T, X> interpreter) {
private <T, X> CompletionStage<T> withCacheableSqmInterpretation(DomainQueryExecutionContext executionContext, X context, SqmInterpreter<T, X> interpreter) {
// NOTE : VERY IMPORTANT - intentional double-lock checking
// The other option would be to leverage `java.util.concurrent.locks.ReadWriteLock`
// to protect access. However, synchronized is much simpler here. We will verify
Expand Down Expand Up @@ -162,7 +211,7 @@ private <T, X> CompletionStage<T> withCacheableSqmInterpretation(DomainQueryExec
jdbcParameterBindings = createJdbcParameterBindings( localCopy, executionContext );
}

return interpreter.interpret( null, executionContext, localCopy, jdbcParameterBindings );
return interpreter.interpret( context, executionContext, localCopy, jdbcParameterBindings );
}

private JdbcParameterBindings createJdbcParameterBindings(CacheableSqmInterpretation sqmInterpretation, DomainQueryExecutionContext executionContext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,17 @@ public CompletionStage<R> getReactiveSingleResult() {
return selectionQueryDelegate.getReactiveSingleResult();
}

@Override
public long getResultCount() {
throw LOG.nonReactiveMethodCall( "getReactiveResultCount()" );
}

@Override
public CompletionStage<Long> getReactiveResultCount() {
return selectionQueryDelegate
.getReactiveResultsCount( ( (SqmSelectStatement<?>) getSqmStatement() ).createCountQuery(), this );
}

@Override
public CompletionStage<R> getReactiveSingleResultOrNull() {
return selectionQueryDelegate.getReactiveSingleResultOrNull();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,12 @@ public R getSingleResultOrNull() {
return selectionQueryDelegate.getSingleResultOrNull();
}

@Override
public CompletionStage<Long> getReactiveResultCount() {
return selectionQueryDelegate
.getReactiveResultsCount( getSqmStatement().createCountQuery(), this );
}

@Override
public List<R> getResultList() {
return selectionQueryDelegate.getResultList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.hibernate.query.spi.ScrollableResultsImplementor;
import org.hibernate.query.spi.SelectQueryPlan;
import org.hibernate.reactive.logging.impl.Log;
import org.hibernate.reactive.sql.results.spi.ReactiveResultsConsumer;
import org.hibernate.sql.results.spi.ResultsConsumer;

import static org.hibernate.reactive.logging.impl.LoggerFactory.make;
Expand Down Expand Up @@ -44,7 +45,7 @@ default <T> T executeQuery(DomainQueryExecutionContext executionContext, Results
/**
* Execute the query
*/
default <T> CompletionStage<T> reactiveExecuteQuery(DomainQueryExecutionContext executionContext, ResultsConsumer<T, R> resultsConsumer) {
default <T> CompletionStage<T> reactiveExecuteQuery(DomainQueryExecutionContext executionContext, ReactiveResultsConsumer<T, R> resultsConsumer) {
return failedFuture( new UnsupportedOperationException() );
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ public void finishUpRow(final RowProcessingState rowProcessingState) {
}
}

public void startLoading(final RowProcessingState rowProcessingState) {
for ( int i = initializers.length - 1; i >= 0; i-- ) {
initializers[i].startLoading( rowProcessingState );
}
}

public CompletionStage<Void> initializeInstance(final ReactiveRowProcessingState rowProcessingState) {
return loop( initializers, initializer -> {
if ( initializer instanceof ReactiveInitializer ) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/* Hibernate, Relational Persistence for Idiomatic Java
*
* SPDX-License-Identifier: Apache-2.0
* Copyright: Red Hat Inc. and Hibernate Authors
*/
package org.hibernate.reactive.sql.results.spi;

import java.util.concurrent.CompletionStage;

import org.hibernate.Incubating;
import org.hibernate.engine.spi.SharedSessionContractImplementor;
import org.hibernate.reactive.sql.exec.spi.ReactiveRowProcessingState;
import org.hibernate.reactive.sql.exec.spi.ReactiveValuesResultSet;
import org.hibernate.sql.results.jdbc.internal.JdbcValuesSourceProcessingStateStandardImpl;
import org.hibernate.sql.results.jdbc.spi.JdbcValuesSourceProcessingOptions;

@Incubating
public class ReactiveSingleResultConsumer<T> implements ReactiveResultsConsumer<T, T> {

@Override
public CompletionStage<T> consume(
ReactiveValuesResultSet jdbcValues,
SharedSessionContractImplementor session,
JdbcValuesSourceProcessingOptions processingOptions,
JdbcValuesSourceProcessingStateStandardImpl jdbcValuesSourceProcessingState,
ReactiveRowProcessingState rowProcessingState,
ReactiveRowReader<T> rowReader) {
rowReader.getReactiveInitializersList().startLoading( rowProcessingState );
return rowProcessingState.next()
.thenCompose( hasNext -> rowReader
.reactiveReadRow( rowProcessingState, processingOptions )
.thenApply( result -> {
rowProcessingState.finishRowProcessing( true );
rowReader.finishUp( jdbcValuesSourceProcessingState );
jdbcValuesSourceProcessingState.finishUp( false );
return result;
} )
);
}

@Override
public boolean canResultsBeCached() {
return false;
}

}
Loading

0 comments on commit 8f05a8c

Please sign in to comment.