Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
DavideD committed Mar 2, 2023
1 parent c37da1e commit a549064
Show file tree
Hide file tree
Showing 11 changed files with 309 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.hibernate.jdbc.Expectation;
import org.hibernate.persister.entity.mutation.EntityMutationTarget;
import org.hibernate.reactive.engine.jdbc.env.internal.ReactiveMutationExecutor;
import org.hibernate.reactive.id.insert.ReactiveInsertGeneratedIdentifierDelegate;
import org.hibernate.sql.exec.spi.JdbcParameterBinder;
import org.hibernate.sql.model.MutationOperationGroup;
import org.hibernate.sql.model.MutationTarget;
Expand All @@ -25,7 +26,6 @@
import org.hibernate.sql.model.jdbc.JdbcInsertMutation;

import static org.hibernate.engine.jdbc.mutation.internal.ModelMutationHelper.identityPreparation;
import static org.hibernate.reactive.util.impl.CompletionStages.completedFuture;
import static org.hibernate.sql.model.ModelMutationLogging.MODEL_MUTATION_LOGGER;
import static org.hibernate.sql.model.ModelMutationLogging.MODEL_MUTATION_LOGGER_TRACE_ENABLED;

Expand Down Expand Up @@ -63,15 +63,14 @@ public CompletionStage<Object> executeReactive(
TableInclusionChecker inclusionChecker,
OperationResultChecker resultChecker,
SharedSessionContractImplementor session) {
final Object id = mutationTarget.getIdentityInsertDelegate()
.performInsert( identityInsertStatementDetails, getJdbcValueBindings(), modelReference, session );

// We should change the signature of performInsert and always return a CompletionStage.
CompletionStage<Object> idStage = id instanceof CompletionStage
? (CompletionStage<Object>) id
: completedFuture( id );

return idStage.thenApply( this::logId );
return ( (ReactiveInsertGeneratedIdentifierDelegate) mutationTarget.getIdentityInsertDelegate() )
.reactivePerformInsert(
identityInsertStatementDetails,
getJdbcValueBindings(),
modelReference,
session
)
.thenApply( this::logId );
}

private Object logId(Object identifier) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,20 @@
*/
package org.hibernate.reactive.engine.jdbc.mutation.internal;

import java.util.concurrent.CompletionStage;

import org.hibernate.engine.jdbc.batch.spi.BatchKey;
import org.hibernate.engine.jdbc.mutation.MutationExecutor;
import org.hibernate.engine.jdbc.mutation.TableInclusionChecker;
import org.hibernate.engine.jdbc.mutation.internal.MutationExecutorSingleBatched;
import org.hibernate.engine.spi.SharedSessionContractImplementor;
import org.hibernate.reactive.engine.jdbc.env.internal.ReactiveMutationExecutor;
import org.hibernate.sql.model.PreparableMutationOperation;
import org.hibernate.sql.model.ValuesAnalysis;

import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture;

public class ReactiveMutationExecutorSingleBatched extends MutationExecutorSingleBatched implements MutationExecutor {
public class ReactiveMutationExecutorSingleBatched extends MutationExecutorSingleBatched implements
ReactiveMutationExecutor {

public ReactiveMutationExecutorSingleBatched(
PreparableMutationOperation mutationOperation,
Expand All @@ -20,4 +27,12 @@ public ReactiveMutationExecutorSingleBatched(
SharedSessionContractImplementor session) {
super( mutationOperation, batchKey, batchSize, session );
}

@Override
public CompletionStage<Void> performReactiveBatchedOperations(
ValuesAnalysis valuesAnalysis,
TableInclusionChecker inclusionChecker) {
super.performBatchedOperations( valuesAnalysis, inclusionChecker );
return voidFuture();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/* Hibernate, Relational Persistence for Idiomatic Java
*
* SPDX-License-Identifier: Apache-2.0
* Copyright: Red Hat Inc. and Hibernate Authors
*/
package org.hibernate.reactive.id.insert;

import java.lang.invoke.MethodHandles;
import java.util.concurrent.CompletionStage;

import org.hibernate.engine.jdbc.mutation.JdbcValueBindings;
import org.hibernate.engine.jdbc.mutation.group.PreparedStatementDetails;
import org.hibernate.engine.jdbc.spi.JdbcServices;
import org.hibernate.engine.spi.SharedSessionContractImplementor;
import org.hibernate.id.PostInsertIdentityPersister;
import org.hibernate.id.insert.Binder;
import org.hibernate.reactive.adaptor.impl.PrepareStatementDetailsAdaptor;
import org.hibernate.reactive.adaptor.impl.PreparedStatementAdaptor;
import org.hibernate.reactive.logging.impl.Log;
import org.hibernate.reactive.logging.impl.LoggerFactory;
import org.hibernate.reactive.pool.ReactiveConnection;
import org.hibernate.reactive.session.ReactiveConnectionSupplier;

import static java.util.function.Function.identity;

public interface ReactiveAbstractReturningDelegate extends ReactiveInsertGeneratedIdentifierDelegate {

Log LOG = LoggerFactory.make( Log.class, MethodHandles.lookup() );

PostInsertIdentityPersister getPersister();

@Override
default CompletionStage<Object> reactivePerformInsert(PreparedStatementDetails insertStatementDetails, JdbcValueBindings jdbcValueBindings, Object entity, SharedSessionContractImplementor session) {
// FIXME: I should be able to generate the sql string beforehand
final Class<?> idType = getPersister().getIdentifierType().getReturnedClass();
final String identifierColumnName = getPersister().getIdentifierColumnNames()[0];
final String insertSql = insertStatementDetails.getSqlString() + " returning " + identifierColumnName;

final JdbcServices jdbcServices = session.getJdbcServices();
jdbcServices.getSqlStatementLogger().logStatement( insertSql );

Object[] params = PreparedStatementAdaptor.bind( statement -> {
PreparedStatementDetails details = new PrepareStatementDetailsAdaptor( insertStatementDetails, statement, session.getJdbcServices() );
jdbcValueBindings.beforeStatement( details, session );
} );

ReactiveConnection reactiveConnection = ( (ReactiveConnectionSupplier) session ).getReactiveConnection();
return reactiveConnection
.insertAndSelectIdentifier( insertSql, params, idType, identifierColumnName )
.thenApply( identity() );
}

@Override
default CompletionStage<Object> reactivePerformInsert(String insertSQL, SharedSessionContractImplementor session, Binder binder) {
throw LOG.notYetImplemented();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/* Hibernate, Relational Persistence for Idiomatic Java
*
* SPDX-License-Identifier: Apache-2.0
* Copyright: Red Hat Inc. and Hibernate Authors
*/
package org.hibernate.reactive.id.insert;

import java.lang.invoke.MethodHandles;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.concurrent.CompletionStage;

import org.hibernate.engine.jdbc.mutation.JdbcValueBindings;
import org.hibernate.engine.jdbc.mutation.group.PreparedStatementDetails;
import org.hibernate.engine.jdbc.spi.JdbcCoordinator;
import org.hibernate.engine.jdbc.spi.JdbcServices;
import org.hibernate.engine.spi.SharedSessionContractImplementor;
import org.hibernate.reactive.adaptor.impl.PrepareStatementDetailsAdaptor;
import org.hibernate.reactive.logging.impl.Log;
import org.hibernate.reactive.logging.impl.LoggerFactory;
import org.hibernate.reactive.pool.ReactiveConnection;
import org.hibernate.reactive.session.ReactiveConnectionSupplier;

import static org.hibernate.reactive.adaptor.impl.PreparedStatementAdaptor.bind;
import static org.hibernate.reactive.util.impl.CompletionStages.completedFuture;
import static org.hibernate.reactive.util.impl.CompletionStages.failedFuture;

/**
* @see org.hibernate.id.insert.AbstractSelectingDelegate
*/
public interface ReactiveAbstractSelectingDelegate extends ReactiveInsertGeneratedIdentifierDelegate {
Log LOG = LoggerFactory.make( Log.class, MethodHandles.lookup() );

String getSelectSQL();

void bindParameters(Object entity, PreparedStatement ps, SharedSessionContractImplementor session);

Object extractGeneratedValue(ResultSet resultSet, SharedSessionContractImplementor session);

@Override
default CompletionStage<Object> reactivePerformInsert(
PreparedStatementDetails insertStatementDetails,
JdbcValueBindings jdbcValueBindings,
Object entity,
SharedSessionContractImplementor session) {
final JdbcCoordinator jdbcCoordinator = session.getJdbcCoordinator();
final JdbcServices jdbcServices = session.getJdbcServices();

jdbcServices.getSqlStatementLogger().logStatement( insertStatementDetails.getSqlString() );

Object[] updateParams = bind( statement -> {
PreparedStatementDetails details = new PrepareStatementDetailsAdaptor( insertStatementDetails, statement, session.getJdbcServices() );
jdbcValueBindings.beforeStatement( details, session );
} );

final String selectSQL = getSelectSQL();
ReactiveConnection reactiveConnection = ( (ReactiveConnectionSupplier) session ).getReactiveConnection();
return reactiveConnection
.update( insertStatementDetails.getSqlString(), updateParams )
.thenCompose( updated -> {
Object[] selectParams = bind( statement -> bindParameters( entity, statement, session ) );
return reactiveConnection
.selectJdbc( selectSQL, selectParams )
.handle( (resultSet, e) -> {
if ( e != null ) {
throw LOG.unableToExecutePostInsertIdSelectionQuery( selectSQL, e );
}
return resultSet;
} );
} )
.thenCompose( resultSet -> {
try {
return completedFuture( extractGeneratedValue( resultSet, session ) );
}
catch (Throwable e) {
return failedFuture( LOG.bindParametersForPostInsertIdSelectQueryError( selectSQL, e ) );
}
} );
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/* Hibernate, Relational Persistence for Idiomatic Java
*
* SPDX-License-Identifier: Apache-2.0
* Copyright: Red Hat Inc. and Hibernate Authors
*/
package org.hibernate.reactive.id.insert;

import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.concurrent.CompletionStage;

import org.hibernate.dialect.Dialect;
import org.hibernate.engine.spi.SharedSessionContractImplementor;
import org.hibernate.id.PostInsertIdentityPersister;
import org.hibernate.id.insert.BasicSelectingDelegate;
import org.hibernate.id.insert.Binder;

public class ReactiveBasicSelectingDelegate extends BasicSelectingDelegate implements ReactiveAbstractSelectingDelegate {

public ReactiveBasicSelectingDelegate(PostInsertIdentityPersister persister, Dialect dialect) {
super( persister, dialect );
}

@Override
public CompletionStage<Object> reactivePerformInsert(
String insertSQL,
SharedSessionContractImplementor session,
Binder binder) {
throw LOG.notYetImplemented();
}

@Override
public String getSelectSQL() {
return super.getSelectSQL();
}

@Override
public void bindParameters(Object entity, PreparedStatement ps, SharedSessionContractImplementor session) {
try {
super.bindParameters( entity, ps, session );
}
catch (SQLException e) {
throw new RuntimeException( e );
}
}

@Override
public Object extractGeneratedValue(ResultSet resultSet, SharedSessionContractImplementor session) {
try {
return super.extractGeneratedValue( resultSet, session );
}
catch (SQLException e) {
throw new RuntimeException( e );
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,46 +6,31 @@
package org.hibernate.reactive.id.insert;


import java.util.concurrent.CompletionStage;

import org.hibernate.dialect.Dialect;
import org.hibernate.engine.jdbc.mutation.JdbcValueBindings;
import org.hibernate.engine.jdbc.mutation.group.PreparedStatementDetails;
import org.hibernate.engine.jdbc.spi.JdbcServices;
import org.hibernate.engine.spi.SharedSessionContractImplementor;
import org.hibernate.id.PostInsertIdentityPersister;
import org.hibernate.id.insert.Binder;
import org.hibernate.id.insert.GetGeneratedKeysDelegate;
import org.hibernate.reactive.adaptor.impl.PrepareStatementDetailsAdaptor;
import org.hibernate.reactive.adaptor.impl.PreparedStatementAdaptor;
import org.hibernate.reactive.pool.ReactiveConnection;
import org.hibernate.reactive.session.ReactiveConnectionSupplier;


public class ReactiveGetGeneratedKeysDelegate extends GetGeneratedKeysDelegate {
public class ReactiveGetGeneratedKeysDelegate extends GetGeneratedKeysDelegate implements ReactiveAbstractReturningDelegate {

public ReactiveGetGeneratedKeysDelegate(PostInsertIdentityPersister persister, Dialect dialect, boolean inferredKeys) {
super( persister, dialect, inferredKeys );
}

@Override
public Object performInsert(PreparedStatementDetails insertStatementDetails, JdbcValueBindings jdbcValueBindings, Object entity, SharedSessionContractImplementor session) {
// FIXME: I should be able to generate the sql string beforehand
final Class<?> idType = getPersister().getIdentifierType().getReturnedClass();
final String identifierColumnName = getPersister().getIdentifierColumnNames()[0];
final String insertSql = adaptQuery( insertStatementDetails, identifierColumnName );

final JdbcServices jdbcServices = session.getJdbcServices();
jdbcServices.getSqlStatementLogger().logStatement( insertSql );

Object[] params = PreparedStatementAdaptor.bind( statement -> {
PreparedStatementDetails details = new PrepareStatementDetailsAdaptor( insertStatementDetails, statement, session.getJdbcServices() );
jdbcValueBindings.beforeStatement( details, session );
} );

ReactiveConnection reactiveConnection = ( (ReactiveConnectionSupplier) session ).getReactiveConnection();
return reactiveConnection.insertAndSelectIdentifier( insertSql, params, idType, identifierColumnName );
public PostInsertIdentityPersister getPersister() {
return super.getPersister();
}

private static String adaptQuery(PreparedStatementDetails insertStatementDetails, String identityColumnName) {
final String insertSql = insertStatementDetails.getSqlString();
return insertSql + " returning " + identityColumnName ;
@Override
public CompletionStage<Object> reactivePerformInsert(
String insertSQL,
SharedSessionContractImplementor session,
Binder binder) {
throw LOG.notYetImplemented();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/* Hibernate, Relational Persistence for Idiomatic Java
*
* SPDX-License-Identifier: Apache-2.0
* Copyright: Red Hat Inc. and Hibernate Authors
*/
package org.hibernate.reactive.id.insert;

import java.util.concurrent.CompletionStage;

import org.hibernate.engine.jdbc.mutation.JdbcValueBindings;
import org.hibernate.engine.jdbc.mutation.group.PreparedStatementDetails;
import org.hibernate.engine.spi.SharedSessionContractImplementor;
import org.hibernate.id.insert.Binder;

/**
* @see org.hibernate.id.insert.InsertGeneratedIdentifierDelegate
*/
public interface ReactiveInsertGeneratedIdentifierDelegate {

CompletionStage<Object> reactivePerformInsert(
PreparedStatementDetails insertStatementDetails,
JdbcValueBindings valueBindings,
Object entity,
SharedSessionContractImplementor session);

CompletionStage<Object> reactivePerformInsert(String insertSQL, SharedSessionContractImplementor session, Binder binder);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/* Hibernate, Relational Persistence for Idiomatic Java
*
* SPDX-License-Identifier: Apache-2.0
* Copyright: Red Hat Inc. and Hibernate Authors
*/
package org.hibernate.reactive.id.insert;

import java.util.concurrent.CompletionStage;

import org.hibernate.dialect.Dialect;
import org.hibernate.engine.spi.SharedSessionContractImplementor;
import org.hibernate.id.PostInsertIdentityPersister;
import org.hibernate.id.insert.Binder;
import org.hibernate.id.insert.InsertReturningDelegate;

public class ReactiveInsertReturningDelegate extends InsertReturningDelegate implements ReactiveAbstractReturningDelegate {

public ReactiveInsertReturningDelegate(PostInsertIdentityPersister persister, Dialect dialect) {
super( persister, dialect );
}

@Override
public PostInsertIdentityPersister getPersister() {
return super.getPersister();
}

@Override
public CompletionStage<Object> reactivePerformInsert(
String insertSQL,
SharedSessionContractImplementor session,
Binder binder) {
throw LOG.notYetImplemented();
}
}
Loading

0 comments on commit a549064

Please sign in to comment.