Skip to content

Commit

Permalink
[hibernate#1932] Support ReactiveSelectionQuery#getResultCount for HQL
Browse files Browse the repository at this point in the history
  • Loading branch information
DavideD committed Jun 11, 2024
1 parent 46291aa commit 81f983a
Show file tree
Hide file tree
Showing 14 changed files with 203 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,17 @@ interface SelectionQuery<R> extends AbstractQuery {
*/
Uni<R> getSingleResultOrNull();

/**
* Determine the size of the query result list that would be
* returned by calling {@link #getResultList()} with no
* {@linkplain #getFirstResult() offset} or
* {@linkplain #getMaxResults() limit} applied to the query.
*
* @return the size of the list that would be returned
*/
@Incubating
Uni<Long> getResultCount();

/**
* Asynchronously execute this query, returning the query results
* as a {@link List}, via a {@link Uni}. If the query
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ public int getMaxResults() {
return delegate.getMaxResults();
}

@Override
public Uni<Long> getResultCount() {
return uni( delegate::getReactiveResultCount );
}

@Override
public Uni<List<R>> getResultList() {
return uni( delegate::getReactiveResultList );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ public int getMaxResults() {
return delegate.getMaxResults();
}

@Override
public Uni<Long> getResultCount() {
return uni( delegate::getReactiveResultCount );
}

@Override
public Uni<List<R>> getResultList() {
return uni( delegate::getReactiveResultList );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ default CompletionStage<List<R>> getReactiveResultList() {

CompletionStage<R> getReactiveSingleResultOrNull();

CompletionStage<Long> getReactiveResultCount();

CompletionStage<R> reactiveUnique();

CompletionStage<Optional<R>> reactiveUniqueResultOptional();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.hibernate.engine.spi.SharedSessionContractImplementor;
import org.hibernate.query.IllegalQueryOperationException;
import org.hibernate.query.hql.internal.QuerySplitter;
import org.hibernate.query.internal.DelegatingDomainQueryExecutionContext;
import org.hibernate.query.spi.DomainQueryExecutionContext;
import org.hibernate.query.spi.QueryInterpretationCache;
import org.hibernate.query.spi.QueryOptions;
import org.hibernate.query.sqm.internal.DomainParameterXref;
Expand All @@ -33,6 +35,7 @@
import org.hibernate.reactive.query.sqm.internal.AggregatedSelectReactiveQueryPlan;
import org.hibernate.reactive.query.sqm.internal.ConcreteSqmSelectReactiveQueryPlan;
import org.hibernate.reactive.query.sqm.spi.ReactiveSelectQueryPlan;
import org.hibernate.reactive.sql.results.spi.ReactiveSingleResultConsumer;
import org.hibernate.sql.results.internal.TupleMetadata;

import jakarta.persistence.NoResultException;
Expand Down Expand Up @@ -146,6 +149,17 @@ public CompletionStage<R> getReactiveSingleResult() {
.exceptionally( this::convertException );
}

public CompletionStage<Long> getReactiveResultsCount(SqmSelectStatement<?> sqmStatement, DomainQueryExecutionContext domainQueryExecutionContext) {
final DelegatingDomainQueryExecutionContext context = new DelegatingDomainQueryExecutionContext( domainQueryExecutionContext ) {
@Override
public QueryOptions getQueryOptions() {
return QueryOptions.NONE;
}
};
return buildConcreteSelectQueryPlan( sqmStatement.createCountQuery(), Long.class, getQueryOptions() )
.reactiveExecuteQuery( context, new ReactiveSingleResultConsumer<>() );
}

private R reactiveSingleResult(List<R> list) {
if ( list.isEmpty() ) {
throw new NoResultException( String.format( "No result found for query [%s]", getQueryString() ) );
Expand Down Expand Up @@ -269,7 +283,7 @@ private ReactiveSelectQueryPlan<R> buildAggregatedSelectQueryPlan(SqmSelectState
return new AggregatedSelectReactiveQueryPlan<>( aggregatedQueryPlans );
}

private <T> ReactiveSelectQueryPlan<T> buildConcreteSelectQueryPlan(
public <T> ReactiveSelectQueryPlan<T> buildConcreteSelectQueryPlan(
SqmSelectStatement<?> concreteSqmStatement,
Class<T> resultType,
QueryOptions queryOptions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.hibernate.reactive.query.sqm.spi.ReactiveSelectQueryPlan;
import org.hibernate.reactive.sql.exec.internal.StandardReactiveSelectExecutor;
import org.hibernate.reactive.sql.results.spi.ReactiveListResultsConsumer;
import org.hibernate.reactive.sql.results.spi.ReactiveResultsConsumer;
import org.hibernate.sql.ast.SqlAstTranslator;
import org.hibernate.sql.ast.SqlAstTranslatorFactory;
import org.hibernate.sql.ast.spi.FromClauseAccess;
Expand All @@ -59,6 +60,7 @@
public class ConcreteSqmSelectReactiveQueryPlan<R> extends ConcreteSqmSelectQueryPlan<R>
implements ReactiveSelectQueryPlan<R> {

private final SqmInterpreter<Object, ReactiveResultsConsumer<Object, R>> executeQueryInterpreter;
private final SqmInterpreter<List<R>, Void> listInterpreter;
private final RowTransformer<R> rowTransformer;

Expand All @@ -80,6 +82,8 @@ public ConcreteSqmSelectReactiveQueryPlan(
this.rowTransformer = determineRowTransformer( sqm, resultType, tupleMetadata, queryOptions );
this.listInterpreter = (unused, executionContext, sqmInterpretation, jdbcParameterBindings) ->
listInterpreter( hql, domainParameterXref, executionContext, sqmInterpretation, jdbcParameterBindings, rowTransformer );
this.executeQueryInterpreter = (resultsConsumer, executionContext, sqmInterpretation, jdbcParameterBindings) ->
executeQueryInterpreter( hql, domainParameterXref, executionContext, sqmInterpretation, jdbcParameterBindings, rowTransformer, resultsConsumer );
}

private static <R> CompletionStage<List<R>> listInterpreter(
Expand Down Expand Up @@ -110,6 +114,40 @@ private static <R> CompletionStage<List<R>> listInterpreter(
.whenComplete( (rs, t) -> domainParameterXref.clearExpansions() );
}

private static <R> CompletionStage<Object> executeQueryInterpreter(
String hql,
DomainParameterXref domainParameterXref,
DomainQueryExecutionContext executionContext,
CacheableSqmInterpretation sqmInterpretation,
JdbcParameterBindings jdbcParameterBindings,
RowTransformer<R> rowTransformer,
ReactiveResultsConsumer<Object, R> resultsConsumer) {
final ReactiveSharedSessionContractImplementor session = (ReactiveSharedSessionContractImplementor) executionContext.getSession();
final JdbcOperationQuerySelect jdbcSelect = sqmInterpretation.getJdbcSelect();
// I'm using a supplier so that the whenComplete at the end will catch any errors, like a finally-block
Supplier<SubselectFetch.RegistrationHandler> fetchHandlerSupplier = () -> SubselectFetch
.createRegistrationHandler( session.getPersistenceContext().getBatchFetchQueue(), sqmInterpretation.selectStatement, JdbcParametersList.empty(), jdbcParameterBindings );
return completedFuture( fetchHandlerSupplier )
.thenApply( Supplier::get )
.thenCompose( subSelectFetchKeyHandler -> session
.reactiveAutoFlushIfRequired( jdbcSelect.getAffectedTableNames() )
.thenCompose( required -> StandardReactiveSelectExecutor.INSTANCE
.executeQuery( jdbcSelect,
jdbcParameterBindings,
ConcreteSqmSelectQueryPlan.listInterpreterExecutionContext( hql, executionContext, jdbcSelect, subSelectFetchKeyHandler ),
rowTransformer,
null,
sql -> executionContext.getSession()
.getJdbcCoordinator()
.getStatementPreparer()
.prepareQueryStatement( sql, false, null ),
resultsConsumer
)
)
)
.whenComplete( (rs, t) -> domainParameterXref.clearExpansions() );
}

@Override
public ScrollableResultsImplementor<R> performScroll(ScrollMode scrollMode, DomainQueryExecutionContext executionContext) {
throw new UnsupportedOperationException();
Expand All @@ -119,10 +157,21 @@ public ScrollableResultsImplementor<R> performScroll(ScrollMode scrollMode, Doma
public CompletionStage<List<R>> reactivePerformList(DomainQueryExecutionContext executionContext) {
return executionContext.getQueryOptions().getEffectiveLimit().getMaxRowsJpa() == 0
? completedFuture( emptyList() )
: withCacheableSqmInterpretation( executionContext, listInterpreter );
: withCacheableSqmInterpretation( executionContext, null, listInterpreter );
}

@Override
public <T> CompletionStage<T> reactiveExecuteQuery(
DomainQueryExecutionContext executionContext,
ReactiveResultsConsumer<T, R> resultsConsumer) {
return withCacheableSqmInterpretation(
executionContext,
resultsConsumer,
(SqmInterpreter<T, ReactiveResultsConsumer<T, R>>) (SqmInterpreter) executeQueryInterpreter
);
}

private <T, X> CompletionStage<T> withCacheableSqmInterpretation(DomainQueryExecutionContext executionContext, SqmInterpreter<T, X> interpreter) {
private <T, X> CompletionStage<T> withCacheableSqmInterpretation(DomainQueryExecutionContext executionContext, X context, SqmInterpreter<T, X> interpreter) {
// NOTE : VERY IMPORTANT - intentional double-lock checking
// The other option would be to leverage `java.util.concurrent.locks.ReadWriteLock`
// to protect access. However, synchronized is much simpler here. We will verify
Expand Down Expand Up @@ -162,7 +211,7 @@ private <T, X> CompletionStage<T> withCacheableSqmInterpretation(DomainQueryExec
jdbcParameterBindings = createJdbcParameterBindings( localCopy, executionContext );
}

return interpreter.interpret( null, executionContext, localCopy, jdbcParameterBindings );
return interpreter.interpret( context, executionContext, localCopy, jdbcParameterBindings );
}

private JdbcParameterBindings createJdbcParameterBindings(CacheableSqmInterpretation sqmInterpretation, DomainQueryExecutionContext executionContext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,12 @@ public CompletionStage<R> getReactiveSingleResult() {
return selectionQueryDelegate.getReactiveSingleResult();
}

@Override
public CompletionStage<Long> getReactiveResultCount() {
return selectionQueryDelegate
.getReactiveResultsCount( ( (SqmSelectStatement<?>) getSqmStatement() ).createCountQuery(), this );
}

@Override
public CompletionStage<R> getReactiveSingleResultOrNull() {
return selectionQueryDelegate.getReactiveSingleResultOrNull();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,12 @@ public R getSingleResultOrNull() {
return selectionQueryDelegate.getSingleResultOrNull();
}

@Override
public CompletionStage<Long> getReactiveResultCount() {
return selectionQueryDelegate
.getReactiveResultsCount( getSqmStatement().createCountQuery(), this );
}

@Override
public List<R> getResultList() {
return selectionQueryDelegate.getResultList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.hibernate.query.spi.ScrollableResultsImplementor;
import org.hibernate.query.spi.SelectQueryPlan;
import org.hibernate.reactive.logging.impl.Log;
import org.hibernate.reactive.sql.results.spi.ReactiveResultsConsumer;
import org.hibernate.sql.results.spi.ResultsConsumer;

import static org.hibernate.reactive.logging.impl.LoggerFactory.make;
Expand Down Expand Up @@ -44,7 +45,7 @@ default <T> T executeQuery(DomainQueryExecutionContext executionContext, Results
/**
* Execute the query
*/
default <T> CompletionStage<T> reactiveExecuteQuery(DomainQueryExecutionContext executionContext, ResultsConsumer<T, R> resultsConsumer) {
default <T> CompletionStage<T> reactiveExecuteQuery(DomainQueryExecutionContext executionContext, ReactiveResultsConsumer<T, R> resultsConsumer) {
return failedFuture( new UnsupportedOperationException() );
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ public void finishUpRow(final RowProcessingState rowProcessingState) {
}
}

public void startLoading(final RowProcessingState rowProcessingState) {
for ( int i = initializers.length - 1; i >= 0; i-- ) {
initializers[i].startLoading( rowProcessingState );
}
}

public CompletionStage<Void> initializeInstance(final ReactiveRowProcessingState rowProcessingState) {
return loop( initializers, initializer -> {
if ( initializer instanceof ReactiveInitializer ) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/* Hibernate, Relational Persistence for Idiomatic Java
*
* SPDX-License-Identifier: Apache-2.0
* Copyright: Red Hat Inc. and Hibernate Authors
*/
package org.hibernate.reactive.sql.results.spi;

import java.util.concurrent.CompletionStage;

import org.hibernate.Incubating;
import org.hibernate.engine.spi.SharedSessionContractImplementor;
import org.hibernate.reactive.sql.exec.spi.ReactiveRowProcessingState;
import org.hibernate.reactive.sql.exec.spi.ReactiveValuesResultSet;
import org.hibernate.sql.results.jdbc.internal.JdbcValuesSourceProcessingStateStandardImpl;
import org.hibernate.sql.results.jdbc.spi.JdbcValuesSourceProcessingOptions;

@Incubating
public class ReactiveSingleResultConsumer<T> implements ReactiveResultsConsumer<T, T> {

@Override
public CompletionStage<T> consume(
ReactiveValuesResultSet jdbcValues,
SharedSessionContractImplementor session,
JdbcValuesSourceProcessingOptions processingOptions,
JdbcValuesSourceProcessingStateStandardImpl jdbcValuesSourceProcessingState,
ReactiveRowProcessingState rowProcessingState,
ReactiveRowReader<T> rowReader) {
rowReader.getReactiveInitializersList().startLoading( rowProcessingState );
return rowProcessingState.next()
.thenCompose( hasNext -> rowReader
.reactiveReadRow( rowProcessingState, processingOptions )
.thenApply( result -> {
rowProcessingState.finishRowProcessing( true );
rowReader.finishUp( jdbcValuesSourceProcessingState );
jdbcValuesSourceProcessingState.finishUp( false );
return result;
} )
);
}

@Override
public boolean canResultsBeCached() {
return false;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,18 @@
*/
package org.hibernate.reactive.stage;

import jakarta.persistence.CacheRetrieveMode;
import jakarta.persistence.CacheStoreMode;
import jakarta.persistence.EntityGraph;
import jakarta.persistence.FlushModeType;
import jakarta.persistence.LockModeType;
import jakarta.persistence.Parameter;
import jakarta.persistence.criteria.CriteriaBuilder;
import jakarta.persistence.criteria.CriteriaDelete;
import jakarta.persistence.criteria.CriteriaQuery;
import jakarta.persistence.criteria.CriteriaUpdate;
import jakarta.persistence.metamodel.Attribute;
import jakarta.persistence.metamodel.Metamodel;
import java.lang.invoke.MethodHandles;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;
import java.util.function.Function;

import org.hibernate.Cache;
import org.hibernate.*;
import org.hibernate.CacheMode;
import org.hibernate.Filter;
import org.hibernate.FlushMode;
import org.hibernate.Incubating;
import org.hibernate.LockMode;
import org.hibernate.bytecode.enhance.spi.interceptor.EnhancementAsProxyLazinessInterceptor;
import org.hibernate.collection.spi.AbstractPersistentCollection;
import org.hibernate.collection.spi.PersistentCollection;
Expand All @@ -38,16 +36,25 @@
import org.hibernate.reactive.util.impl.CompletionStages;
import org.hibernate.stat.Statistics;

import java.lang.invoke.MethodHandles;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;
import java.util.function.Function;
import jakarta.persistence.CacheRetrieveMode;
import jakarta.persistence.CacheStoreMode;
import jakarta.persistence.EntityGraph;
import jakarta.persistence.FlushModeType;
import jakarta.persistence.LockModeType;
import jakarta.persistence.Parameter;
import jakarta.persistence.criteria.CriteriaBuilder;
import jakarta.persistence.criteria.CriteriaDelete;
import jakarta.persistence.criteria.CriteriaQuery;
import jakarta.persistence.criteria.CriteriaUpdate;
import jakarta.persistence.metamodel.Attribute;
import jakarta.persistence.metamodel.Metamodel;

import static org.hibernate.engine.internal.ManagedTypeHelper.asPersistentAttributeInterceptable;
import static org.hibernate.engine.internal.ManagedTypeHelper.isPersistentAttributeInterceptable;
import static org.hibernate.internal.util.LockModeConverter.convertToLockMode;
import static org.hibernate.jpa.internal.util.CacheModeHelper.*;
import static org.hibernate.jpa.internal.util.CacheModeHelper.interpretCacheMode;
import static org.hibernate.jpa.internal.util.CacheModeHelper.interpretCacheRetrieveMode;
import static org.hibernate.jpa.internal.util.CacheModeHelper.interpretCacheStoreMode;

/**
* An API for Hibernate Reactive where non-blocking operations are
Expand Down Expand Up @@ -188,6 +195,17 @@ interface SelectionQuery<R> extends AbstractQuery {
*/
CompletionStage<R> getSingleResultOrNull();

/**
* Determine the size of the query result list that would be
* returned by calling {@link #getResultList()} with no
* {@linkplain #getFirstResult() offset} or
* {@linkplain #getMaxResults() limit} applied to the query.
*
* @return the size of the list that would be returned
*/
@Incubating
CompletionStage<Long> getResultCount();

/**
* Asynchronously execute this query, returning the query results
* as a {@link List}, via a {@link CompletionStage}. If the query
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ public int getMaxResults() {
return delegate.getMaxResults();
}

@Override
public CompletionStage<Long> getResultCount() {
return delegate.getReactiveResultCount();
}

@Override
public CompletionStage<List<R>> getResultList() {
return delegate.getReactiveResultList();
Expand Down
Loading

0 comments on commit 81f983a

Please sign in to comment.