From 4b4ce5e7499341a391e87c9bb30a9e6a27975278 Mon Sep 17 00:00:00 2001 From: Gavin King Date: Mon, 24 Aug 2020 00:16:39 +0200 Subject: [PATCH 1/3] support for OPTIMISTIC + OPTIMISTIC_FORCE_INCREMENT LockModes These LockModes force a version check or upgrade right at the end of the transaction. This required building infrastructure for reactive before/after transaction completion events. Fixes #201 --- .../reactive/engine/ReactiveActionQueue.java | 57 ++++++++++++--- ...tiveAfterTransactionCompletionProcess.java | 25 +++++++ ...iveBeforeTransactionCompletionProcess.java | 24 +++++++ ...ReactiveEntityIncrementVersionProcess.java | 59 ++++++++++++++++ .../ReactiveEntityVerifyVersionProcess.java | 58 +++++++++++++++ .../DefaultReactiveLockEventListener.java | 16 +++-- .../DefaultReactivePostLoadEventListener.java | 70 +++++++++++++++++++ .../impl/ReactiveAbstractEntityPersister.java | 36 +++++++++- .../entity/impl/ReactiveEntityPersister.java | 5 +- .../provider/impl/ReactiveIntegrator.java | 1 + .../reactive/stage/impl/StageSessionImpl.java | 9 ++- .../reactive/ReactiveSessionTest.java | 42 +++++++++++ 12 files changed, 379 insertions(+), 23 deletions(-) create mode 100644 hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/ReactiveAfterTransactionCompletionProcess.java create mode 100644 hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/ReactiveBeforeTransactionCompletionProcess.java create mode 100644 hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/impl/ReactiveEntityIncrementVersionProcess.java create mode 100644 hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/impl/ReactiveEntityVerifyVersionProcess.java create mode 100644 hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactivePostLoadEventListener.java diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/ReactiveActionQueue.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/ReactiveActionQueue.java index 7ddca1efb..df2b72e35 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/ReactiveActionQueue.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/ReactiveActionQueue.java @@ -517,6 +517,20 @@ public void registerProcess(BeforeTransactionCompletionProcess process) { beforeTransactionProcesses.register( process ); } + public void registerProcess(ReactiveAfterTransactionCompletionProcess process) { + if ( afterTransactionProcesses == null ) { + afterTransactionProcesses = new AfterTransactionCompletionProcessQueue( session ); + } + afterTransactionProcesses.registerReactive( process ); + } + + public void registerProcess(ReactiveBeforeTransactionCompletionProcess process) { + if ( beforeTransactionProcesses == null ) { + beforeTransactionProcesses = new BeforeTransactionCompletionProcessQueue( session ); + } + beforeTransactionProcesses.registerReactive( process ); + } + /** * Perform all currently queued entity-insertion actions. * @@ -576,25 +590,27 @@ private void prepareActions(ExecutableList queue) throws HibernateException { * * @param success Was the transaction successful. */ - public void afterTransactionCompletion(boolean success) { + public CompletionStage afterTransactionCompletion(boolean success) { if ( !isTransactionCoordinatorShared ) { // Execute completion actions only in transaction owner (aka parent session). if ( afterTransactionProcesses != null ) { - afterTransactionProcesses.afterTransactionCompletion( success ); + return afterTransactionProcesses.afterTransactionCompletion( success ); } } + return CompletionStages.voidFuture(); } /** * Execute any registered {@link org.hibernate.action.spi.BeforeTransactionCompletionProcess} */ - public void beforeTransactionCompletion() { + public CompletionStage beforeTransactionCompletion() { if ( !isTransactionCoordinatorShared ) { // Execute completion actions only in transaction owner (aka parent session). if ( beforeTransactionProcesses != null ) { - beforeTransactionProcesses.beforeTransactionCompletion(); + return beforeTransactionProcesses.beforeTransactionCompletion(); } } + return CompletionStages.voidFuture(); } /** @@ -917,11 +933,12 @@ public void serialize(ObjectOutputStream oos) throws IOException { } } - private abstract static class AbstractTransactionCompletionProcessQueue { + private abstract static class AbstractTransactionCompletionProcessQueue { protected SessionImplementor session; // Concurrency handling required when transaction completion process is dynamically registered // inside event listener (HHH-7478). protected Queue processes = new ConcurrentLinkedQueue<>(); + protected Queue reactiveProcesses = new ConcurrentLinkedQueue<>(); private AbstractTransactionCompletionProcessQueue(SessionImplementor session) { this.session = session; @@ -934,8 +951,15 @@ public void register(T process) { processes.add( process ); } + public void registerReactive(U process) { + if ( process == null ) { + return; + } + reactiveProcesses.add( process ); + } + public boolean hasActions() { - return !processes.isEmpty(); + return !processes.isEmpty() && !reactiveProcesses.isEmpty(); } } @@ -943,12 +967,13 @@ public boolean hasActions() { * Encapsulates behavior needed for before transaction processing */ private static class BeforeTransactionCompletionProcessQueue - extends AbstractTransactionCompletionProcessQueue { + extends AbstractTransactionCompletionProcessQueue { private BeforeTransactionCompletionProcessQueue(SessionImplementor session) { super( session ); } - public void beforeTransactionCompletion() { + public CompletionStage beforeTransactionCompletion() { while ( !processes.isEmpty() ) { try { processes.poll().doBeforeTransactionCompletion( session ); @@ -960,6 +985,10 @@ public void beforeTransactionCompletion() { throw new AssertionFailure( "Unable to perform beforeTransactionCompletion callback", e ); } } + return CompletionStages.loop( + reactiveProcesses, + process -> process.doBeforeTransactionCompletion( session ) + ).whenComplete( (v, e) -> reactiveProcesses.clear() ); } } @@ -967,8 +996,9 @@ public void beforeTransactionCompletion() { * Encapsulates behavior needed for after transaction processing */ private static class AfterTransactionCompletionProcessQueue - extends AbstractTransactionCompletionProcessQueue { - private Set querySpacesToInvalidate = new HashSet<>(); + extends AbstractTransactionCompletionProcessQueue { + private final Set querySpacesToInvalidate = new HashSet<>(); private AfterTransactionCompletionProcessQueue(SessionImplementor session) { super( session ); @@ -978,7 +1008,7 @@ public void addSpaceToInvalidate(Serializable space) { querySpacesToInvalidate.add( space ); } - public void afterTransactionCompletion(boolean success) { + public CompletionStage afterTransactionCompletion(boolean success) { while ( !processes.isEmpty() ) { try { processes.poll().doAfterTransactionCompletion( success, session ); @@ -999,6 +1029,11 @@ public void afterTransactionCompletion(boolean success) { ); } querySpacesToInvalidate.clear(); + + return CompletionStages.loop( + reactiveProcesses, + process -> process.doAfterTransactionCompletion( success, session ) + ).whenComplete( (v, e) -> reactiveProcesses.clear() ); } } diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/ReactiveAfterTransactionCompletionProcess.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/ReactiveAfterTransactionCompletionProcess.java new file mode 100644 index 000000000..c0f26c622 --- /dev/null +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/ReactiveAfterTransactionCompletionProcess.java @@ -0,0 +1,25 @@ +/* Hibernate, Relational Persistence for Idiomatic Java + * + * SPDX-License-Identifier: LGPL-2.1-or-later + * Copyright: Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.reactive.engine; + +import org.hibernate.engine.spi.SharedSessionContractImplementor; + +import java.util.concurrent.CompletionStage; + +/** + * Contract representing some process that needs to occur during after transaction completion. + * + * @author Steve Ebersole + */ +public interface ReactiveAfterTransactionCompletionProcess { + /** + * Perform whatever processing is encapsulated here after completion of the transaction. + * + * @param success Did the transaction complete successfully? True means it did. + * @param session The session on which the transaction is completing. + */ + CompletionStage doAfterTransactionCompletion(boolean success, SharedSessionContractImplementor session); +} diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/ReactiveBeforeTransactionCompletionProcess.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/ReactiveBeforeTransactionCompletionProcess.java new file mode 100644 index 000000000..6989bbd0e --- /dev/null +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/ReactiveBeforeTransactionCompletionProcess.java @@ -0,0 +1,24 @@ +/* Hibernate, Relational Persistence for Idiomatic Java + * + * SPDX-License-Identifier: LGPL-2.1-or-later + * Copyright: Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.reactive.engine; + +import org.hibernate.engine.spi.SessionImplementor; + +import java.util.concurrent.CompletionStage; + +/** + * Contract representing some process that needs to occur during before transaction completion. + * + * @author Steve Ebersole + */ +public interface ReactiveBeforeTransactionCompletionProcess { + /** + * Perform whatever processing is encapsulated here before completion of the transaction. + * + * @param session The session on which the transaction is preparing to complete. + */ + CompletionStage doBeforeTransactionCompletion(SessionImplementor session); +} diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/impl/ReactiveEntityIncrementVersionProcess.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/impl/ReactiveEntityIncrementVersionProcess.java new file mode 100644 index 000000000..6bcb94401 --- /dev/null +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/impl/ReactiveEntityIncrementVersionProcess.java @@ -0,0 +1,59 @@ +/* Hibernate, Relational Persistence for Idiomatic Java + * + * SPDX-License-Identifier: LGPL-2.1-or-later + * Copyright: Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.reactive.engine.impl; + +import org.hibernate.LockMode; +import org.hibernate.LockOptions; +import org.hibernate.engine.spi.EntityEntry; +import org.hibernate.engine.spi.SessionImplementor; +import org.hibernate.reactive.engine.ReactiveBeforeTransactionCompletionProcess; +import org.hibernate.reactive.persister.entity.impl.ReactiveEntityPersister; +import org.hibernate.reactive.util.impl.CompletionStages; + +import java.util.concurrent.CompletionStage; + +/** + * A BeforeTransactionCompletionProcess impl to verify and increment an entity version as party + * of before-transaction-completion processing + * + * @author Scott Marlow + * @author Gavin King + */ +public class ReactiveEntityIncrementVersionProcess implements ReactiveBeforeTransactionCompletionProcess { + private final Object object; + + /** + * Constructs an EntityIncrementVersionProcess for the given entity. + * + * @param object The entity instance + */ + public ReactiveEntityIncrementVersionProcess(Object object) { + this.object = object; + } + + /** + * Perform whatever processing is encapsulated here before completion of the transaction. + * + * @param session The session on which the transaction is preparing to complete. + */ + @Override + public CompletionStage doBeforeTransactionCompletion(SessionImplementor session) { + final EntityEntry entry = session.getPersistenceContext().getEntry( object ); + // Don't increment version for an entity that is not in the PersistenceContext; + if ( entry == null ) { + return CompletionStages.voidFuture(); + } + + return ( (ReactiveEntityPersister) entry.getPersister() ) + .lockReactive( + entry.getId(), + entry.getVersion(), + object, + new LockOptions(LockMode.PESSIMISTIC_FORCE_INCREMENT), + session + ); + } +} diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/impl/ReactiveEntityVerifyVersionProcess.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/impl/ReactiveEntityVerifyVersionProcess.java new file mode 100644 index 000000000..6ffe568ec --- /dev/null +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/impl/ReactiveEntityVerifyVersionProcess.java @@ -0,0 +1,58 @@ +/* Hibernate, Relational Persistence for Idiomatic Java + * + * SPDX-License-Identifier: LGPL-2.1-or-later + * Copyright: Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.reactive.engine.impl; + +import org.hibernate.dialect.lock.OptimisticEntityLockException; +import org.hibernate.engine.spi.EntityEntry; +import org.hibernate.engine.spi.SessionImplementor; +import org.hibernate.pretty.MessageHelper; +import org.hibernate.reactive.engine.ReactiveBeforeTransactionCompletionProcess; +import org.hibernate.reactive.persister.entity.impl.ReactiveEntityPersister; +import org.hibernate.reactive.util.impl.CompletionStages; + +import java.util.concurrent.CompletionStage; + +/** + * A BeforeTransactionCompletionProcess impl to verify an entity version as part of + * before-transaction-completion processing + * + * @author Scott Marlow + * @author Gavin King + */ +public class ReactiveEntityVerifyVersionProcess implements ReactiveBeforeTransactionCompletionProcess { + private final Object object; + + /** + * Constructs an EntityVerifyVersionProcess + * + * @param object The entity instance + */ + public ReactiveEntityVerifyVersionProcess(Object object) { + this.object = object; + } + + @Override + public CompletionStage doBeforeTransactionCompletion(SessionImplementor session) { + final EntityEntry entry = session.getPersistenceContext().getEntry( object ); + // Don't check version for an entity that is not in the PersistenceContext; + if ( entry == null ) { + return CompletionStages.voidFuture(); + } + + return ( (ReactiveEntityPersister) entry.getPersister() ) + .reactiveGetCurrentVersion( entry.getId(), session ) + .thenAccept( latestVersion -> { + if ( !entry.getVersion().equals( latestVersion ) ) { + throw new OptimisticEntityLockException( + object, + "Newer version [" + latestVersion + + "] of entity [" + MessageHelper.infoString( entry.getEntityName(), entry.getId() ) + + "] found in database" + ); + } + } ); + } +} diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactiveLockEventListener.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactiveLockEventListener.java index a9ac7797e..659d91968 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactiveLockEventListener.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactiveLockEventListener.java @@ -164,13 +164,15 @@ protected CompletionStage upgradeLock(Object object, EntityEntry entry, ck = null; } - return ((ReactiveEntityPersister) persister).lockReactive( - entry.getId(), - entry.getVersion(), - object, - lockOptions, - source - ).thenAccept( v -> entry.setLockMode(requestedLockMode) ) + return ((ReactiveEntityPersister) persister) + .lockReactive( + entry.getId(), + entry.getVersion(), + object, + lockOptions, + source + ) + .thenAccept( v -> entry.setLockMode(requestedLockMode) ) .whenComplete( (r, e) -> { // the database now holds a lock + the object is flushed from the cache, // so release the soft lock diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactivePostLoadEventListener.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactivePostLoadEventListener.java new file mode 100644 index 000000000..a191bf1d8 --- /dev/null +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactivePostLoadEventListener.java @@ -0,0 +1,70 @@ +/* Hibernate, Relational Persistence for Idiomatic Java + * + * SPDX-License-Identifier: LGPL-2.1-or-later + * Copyright: Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.reactive.event.impl; + +import org.hibernate.AssertionFailure; +import org.hibernate.LockMode; +import org.hibernate.classic.Lifecycle; +import org.hibernate.engine.spi.EntityEntry; +import org.hibernate.engine.spi.SessionImplementor; +import org.hibernate.event.spi.PostLoadEvent; +import org.hibernate.event.spi.PostLoadEventListener; +import org.hibernate.jpa.event.spi.CallbackRegistry; +import org.hibernate.jpa.event.spi.CallbackRegistryConsumer; +import org.hibernate.persister.entity.EntityPersister; +import org.hibernate.reactive.engine.impl.ReactiveEntityIncrementVersionProcess; +import org.hibernate.reactive.engine.impl.ReactiveEntityVerifyVersionProcess; +import org.hibernate.reactive.session.ReactiveSession; + +/** + * We do 2 things here:
    + *
  • Call {@link Lifecycle} interface if necessary
  • + *
  • Perform needed {@link EntityEntry#getLockMode()} related processing
  • + *
+ * + * @author Gavin King + * @author Steve Ebersole + */ +public class DefaultReactivePostLoadEventListener implements PostLoadEventListener, CallbackRegistryConsumer { + private CallbackRegistry callbackRegistry; + + @Override + public void injectCallbackRegistry(CallbackRegistry callbackRegistry) { + this.callbackRegistry = callbackRegistry; + } + + @Override + public void onPostLoad(PostLoadEvent event) { + final Object entity = event.getEntity(); + + callbackRegistry.postLoad( entity ); + + final SessionImplementor session = event.getSession(); + final EntityEntry entry = session.getPersistenceContextInternal().getEntry( entity ); + if ( entry == null ) { + throw new AssertionFailure( "possible non-threadsafe access to the session" ); + } + + final LockMode lockMode = entry.getLockMode(); + if ( LockMode.PESSIMISTIC_FORCE_INCREMENT.equals( lockMode ) ) { + final EntityPersister persister = entry.getPersister(); + final Object nextVersion = persister.forceVersionIncrement( + entry.getId(), + entry.getVersion(), + session + ); + entry.forceLocked( entity, nextVersion ); + } + else if ( LockMode.OPTIMISTIC_FORCE_INCREMENT.equals( lockMode ) ) { + ((ReactiveSession) session).getReactiveActionQueue() + .registerProcess( new ReactiveEntityIncrementVersionProcess( entity ) ); + } + else if ( LockMode.OPTIMISTIC.equals( lockMode ) ) { + ((ReactiveSession) session).getReactiveActionQueue() + .registerProcess( new ReactiveEntityVerifyVersionProcess( entity ) ); + } + } +} diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/persister/entity/impl/ReactiveAbstractEntityPersister.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/persister/entity/impl/ReactiveAbstractEntityPersister.java index 813319db4..331a172b4 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/persister/entity/impl/ReactiveAbstractEntityPersister.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/persister/entity/impl/ReactiveAbstractEntityPersister.java @@ -696,7 +696,7 @@ default String generateUpdateLockString(LockOptions lockOptions) { } @Override - default CompletionStage lockReactive( + default CompletionStage lockReactive( Serializable id, Object version, Object object, @@ -834,6 +834,7 @@ default CompletionStage> reactiveMultiLoad(Serializable[] ids, Sess // .thenApply( resultSet -> !resultSet.hasNext() ); // } + @Override default CompletionStage reactiveGetDatabaseSnapshot(Serializable id, SharedSessionContractImplementor session) { if ( log.isTraceEnabled() ) { @@ -852,6 +853,39 @@ default CompletionStage reactiveGetDatabaseSnapshot(Serializable id, .thenApply( (resultSet) -> processSnapshot(session, resultSet) ); } + @Override + default CompletionStage reactiveGetCurrentVersion(Serializable id, + SharedSessionContractImplementor session) { + if ( log.isTraceEnabled() ) { + log.tracev( + "Getting version: {0}", + infoString( this, id, getFactory() ) + ); + } + + Object[] params = PreparedStatementAdaptor.bind( + statement -> getIdentifierType().nullSafeSet( statement, id, 1, session ) + ); + + return getReactiveConnection( session ) + .selectJdbc( delegate().getVersionSelectString(), params ) + .thenApply( (resultSet) -> { + try { + if ( !resultSet.next() ) { + return null; + } + if ( !isVersioned() ) { + return this; + } + return getVersionType().nullSafeGet( resultSet, getVersionColumnName(), session, null ); + } + catch (SQLException sqle) { + //can never happen + throw new JDBCException("error reading version", sqle); + } + } ); + } + //would be nice of we could just reuse this code from AbstractEntityPersister default Object[] processSnapshot(SharedSessionContractImplementor session, ResultSet resultSet) { try { diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/persister/entity/impl/ReactiveEntityPersister.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/persister/entity/impl/ReactiveEntityPersister.java index d07b6e814..cc8bfa0d7 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/persister/entity/impl/ReactiveEntityPersister.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/persister/entity/impl/ReactiveEntityPersister.java @@ -78,7 +78,7 @@ CompletionStage updateReactive( /** * Obtain a pessimistic lock without blocking */ - CompletionStage lockReactive( + CompletionStage lockReactive( Serializable id, Object version, Object object, @@ -105,6 +105,9 @@ CompletionStage reactiveLoad(Serializable id, ReactiveUniqueEntityLoader getAppropriateLoader(LockOptions lockOptions, SharedSessionContractImplementor session); + CompletionStage reactiveGetCurrentVersion(Serializable id, + SharedSessionContractImplementor session); + /** * Get the current database state of the object, in a "hydrated" form, without * resolving identifiers diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/provider/impl/ReactiveIntegrator.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/provider/impl/ReactiveIntegrator.java index e96058d1e..bda4f1ff6 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/provider/impl/ReactiveIntegrator.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/provider/impl/ReactiveIntegrator.java @@ -56,6 +56,7 @@ private void attachEventContextManagingListenersIfRequired(SessionFactoryService eventListenerRegistry.getEventListenerGroup( EventType.LOCK ).appendListener( new DefaultReactiveLockEventListener() ); eventListenerRegistry.getEventListenerGroup( EventType.LOAD ).appendListener( new DefaultReactiveLoadEventListener() ); eventListenerRegistry.getEventListenerGroup( EventType.INIT_COLLECTION ).appendListener( new DefaultReactiveInitializeCollectionEventListener() ); + eventListenerRegistry.getEventListenerGroup( EventType.POST_LOAD ).appendListener( new DefaultReactivePostLoadEventListener() ); } } diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/impl/StageSessionImpl.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/impl/StageSessionImpl.java index 617c65b08..b93702370 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/impl/StageSessionImpl.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/impl/StageSessionImpl.java @@ -13,6 +13,7 @@ import org.hibernate.graph.GraphSemantic; import org.hibernate.graph.spi.RootGraphImplementor; import org.hibernate.reactive.common.ResultSetMapping; +import org.hibernate.reactive.engine.ReactiveActionQueue; import org.hibernate.reactive.session.Criteria; import org.hibernate.reactive.session.ReactiveSession; import org.hibernate.reactive.stage.Stage; @@ -402,9 +403,11 @@ CompletionStage begin() { } CompletionStage end() { - return rollback - ? delegate.getReactiveConnection().rollbackTransaction() - : delegate.getReactiveConnection().commitTransaction(); + ReactiveActionQueue actionQueue = delegate.getReactiveActionQueue(); + return actionQueue.beforeTransactionCompletion() + .thenApply( v -> delegate.getReactiveConnection() ) + .thenCompose( c -> rollback ? c.rollbackTransaction() : c.commitTransaction() ) + .thenCompose( v -> actionQueue.afterTransactionCompletion( !rollback ) ); } R processError(R result, Throwable e) { diff --git a/hibernate-reactive-core/src/test/java/org/hibernate/reactive/ReactiveSessionTest.java b/hibernate-reactive-core/src/test/java/org/hibernate/reactive/ReactiveSessionTest.java index 741d3a654..1b41b8ee8 100644 --- a/hibernate-reactive-core/src/test/java/org/hibernate/reactive/ReactiveSessionTest.java +++ b/hibernate-reactive-core/src/test/java/org/hibernate/reactive/ReactiveSessionTest.java @@ -226,6 +226,48 @@ public void reactiveFindThenForceLock(TestContext context) { ); } + @Test + public void reactiveFindWithOptimisticIncrementLock(TestContext context) { + final GuineaPig expectedPig = new GuineaPig( 5, "Aloi" ); + test( + context, + populateDB() + .thenCompose( v -> getSessionFactory().withTransaction( + (session, transaction) -> session.find( GuineaPig.class, expectedPig.getId(), LockMode.OPTIMISTIC_FORCE_INCREMENT ) + .thenAccept( actualPig -> { + assertThatPigsAreEqual( context, expectedPig, actualPig ); + context.assertEquals( session.getLockMode( actualPig ), LockMode.OPTIMISTIC_FORCE_INCREMENT ); + context.assertEquals( 0, actualPig.version ); + } ) + ) + ) + .thenCompose( v -> openSession() ) + .thenCompose( session -> session.find( GuineaPig.class, expectedPig.getId() ) ) + .thenAccept( actualPig -> context.assertEquals( 1, actualPig.version ) ) + ); + } + + @Test + public void reactiveFindWithOptimisticVerifyLock(TestContext context) { + final GuineaPig expectedPig = new GuineaPig( 5, "Aloi" ); + test( + context, + populateDB() + .thenCompose( v -> getSessionFactory().withTransaction( + (session, transaction) -> session.find( GuineaPig.class, expectedPig.getId(), LockMode.OPTIMISTIC ) + .thenAccept( actualPig -> { + assertThatPigsAreEqual( context, expectedPig, actualPig ); + context.assertEquals( session.getLockMode( actualPig ), LockMode.OPTIMISTIC ); + context.assertEquals( 0, actualPig.version ); + } ) + ) + ) + .thenCompose( v -> openSession() ) + .thenCompose( session -> session.find( GuineaPig.class, expectedPig.getId() ) ) + .thenAccept( actualPig -> context.assertEquals( 0, actualPig.version ) ) + ); + } + @Test public void reactiveQueryWithLock(TestContext context) { final GuineaPig expectedPig = new GuineaPig( 5, "Aloi" ); From dbdfafe09e062f5cf079ef553e8a1c66e0d18b98 Mon Sep 17 00:00:00 2001 From: Gavin King Date: Mon, 24 Aug 2020 01:33:16 +0200 Subject: [PATCH 2/3] support for PESSIMISTIC_FORCE_INCREMENT LockMode in find() --- .../DefaultReactiveLoadEventListener.java | 36 +++++++++++++------ .../DefaultReactivePostLoadEventListener.java | 14 ++------ .../impl/ReactiveAbstractEntityPersister.java | 10 ++++-- .../reactive/ReactiveSessionTest.java | 21 +++++++++++ 4 files changed, 55 insertions(+), 26 deletions(-) diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactiveLoadEventListener.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactiveLoadEventListener.java index 2ea51b3a8..f62e0278c 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactiveLoadEventListener.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactiveLoadEventListener.java @@ -126,27 +126,41 @@ public CompletionStage reactiveOnLoad( final LoadEvent event, final LoadEventListener.LoadType loadType) throws HibernateException { - final EntityPersister persister = getPersister( event ); - + final ReactiveEntityPersister persister = (ReactiveEntityPersister) getPersister( event ); if ( persister == null ) { throw new HibernateException( "Unable to locate persister: " + event.getEntityClassName() ); } - return checkId( event, loadType, persister ).thenCompose( + CompletionStage result = checkId( event, loadType, persister ).thenCompose( vd -> doOnLoad( persister, event, loadType ) .thenAccept( event::setResult ) .handle( (v, x) -> { - if ( x instanceof HibernateException ) { - LOG.unableToLoadCommand( (HibernateException) x ); - } - CompletionStages.returnNullorRethrow( x ); - if ( event.getResult() instanceof CompletionStage ) { - throw new AssertionFailure( "Unexpected CompletionStage" ); + throw new AssertionFailure("Unexpected CompletionStage"); } - return v; - } )); + if (x instanceof HibernateException) { + LOG.unableToLoadCommand( (HibernateException) x ); + } + return CompletionStages.returnNullorRethrow( x ); + } ) + ); + + if ( event.getLockMode() == LockMode.PESSIMISTIC_FORCE_INCREMENT + || event.getLockMode() == LockMode.FORCE ) { + return result.thenCompose( + v -> persister.lockReactive( + event.getEntityId(), + persister.getVersion( event.getResult() ), + event.getResult(), + event.getLockOptions(), + event.getSession() + ) + ); + } + else { + return result; + } } private CompletionStage checkId(LoadEvent event, LoadType loadType, EntityPersister persister) { diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactivePostLoadEventListener.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactivePostLoadEventListener.java index a191bf1d8..aa120cc0e 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactivePostLoadEventListener.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactivePostLoadEventListener.java @@ -14,7 +14,6 @@ import org.hibernate.event.spi.PostLoadEventListener; import org.hibernate.jpa.event.spi.CallbackRegistry; import org.hibernate.jpa.event.spi.CallbackRegistryConsumer; -import org.hibernate.persister.entity.EntityPersister; import org.hibernate.reactive.engine.impl.ReactiveEntityIncrementVersionProcess; import org.hibernate.reactive.engine.impl.ReactiveEntityVerifyVersionProcess; import org.hibernate.reactive.session.ReactiveSession; @@ -48,17 +47,8 @@ public void onPostLoad(PostLoadEvent event) { throw new AssertionFailure( "possible non-threadsafe access to the session" ); } - final LockMode lockMode = entry.getLockMode(); - if ( LockMode.PESSIMISTIC_FORCE_INCREMENT.equals( lockMode ) ) { - final EntityPersister persister = entry.getPersister(); - final Object nextVersion = persister.forceVersionIncrement( - entry.getId(), - entry.getVersion(), - session - ); - entry.forceLocked( entity, nextVersion ); - } - else if ( LockMode.OPTIMISTIC_FORCE_INCREMENT.equals( lockMode ) ) { + LockMode lockMode = entry.getLockMode(); + if ( LockMode.OPTIMISTIC_FORCE_INCREMENT.equals( lockMode ) ) { ((ReactiveSession) session).getReactiveActionQueue() .registerProcess( new ReactiveEntityIncrementVersionProcess( entity ) ); } diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/persister/entity/impl/ReactiveAbstractEntityPersister.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/persister/entity/impl/ReactiveAbstractEntityPersister.java index 331a172b4..e0d0e5aef 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/persister/entity/impl/ReactiveAbstractEntityPersister.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/persister/entity/impl/ReactiveAbstractEntityPersister.java @@ -802,15 +802,19 @@ default Object nextVersionForLock(LockMode lockMode, Serializable id, Object ver } } - default CompletionStage reactiveLoad(Serializable id, Object optionalObject, LockOptions lockOptions, SharedSessionContractImplementor session) { + default CompletionStage reactiveLoad(Serializable id, Object optionalObject, LockOptions lockOptions, + SharedSessionContractImplementor session) { return reactiveLoad( id, optionalObject, lockOptions, session, null ); } - default CompletionStage reactiveLoad(Serializable id, Object optionalObject, LockOptions lockOptions, SharedSessionContractImplementor session, Boolean readOnly) { + default CompletionStage reactiveLoad(Serializable id, Object optionalObject, LockOptions lockOptions, + SharedSessionContractImplementor session, Boolean readOnly) { if ( log.isTraceEnabled() ) { log.tracev( "Fetching entity: {0}", infoString( this, id, getFactory() ) ); } - return getAppropriateLoader( lockOptions, session ).load( id, optionalObject, session, lockOptions, readOnly ); + + return getAppropriateLoader( lockOptions, session ) + .load( id, optionalObject, session, lockOptions, readOnly ); } @Override diff --git a/hibernate-reactive-core/src/test/java/org/hibernate/reactive/ReactiveSessionTest.java b/hibernate-reactive-core/src/test/java/org/hibernate/reactive/ReactiveSessionTest.java index 1b41b8ee8..d4cd42542 100644 --- a/hibernate-reactive-core/src/test/java/org/hibernate/reactive/ReactiveSessionTest.java +++ b/hibernate-reactive-core/src/test/java/org/hibernate/reactive/ReactiveSessionTest.java @@ -226,6 +226,27 @@ public void reactiveFindThenForceLock(TestContext context) { ); } + @Test + public void reactiveFindWithPessimisticIncrementLock(TestContext context) { + final GuineaPig expectedPig = new GuineaPig( 5, "Aloi" ); + test( + context, + populateDB() + .thenCompose( v -> getSessionFactory().withTransaction( + (session, transaction) -> session.find( GuineaPig.class, expectedPig.getId(), LockMode.PESSIMISTIC_FORCE_INCREMENT ) + .thenAccept( actualPig -> { + assertThatPigsAreEqual( context, expectedPig, actualPig ); + context.assertEquals( session.getLockMode( actualPig ), LockMode.FORCE ); //grrr, lame + context.assertEquals( 1, actualPig.version ); + } ) + ) + ) + .thenCompose( v -> openSession() ) + .thenCompose( session -> session.find( GuineaPig.class, expectedPig.getId() ) ) + .thenAccept( actualPig -> context.assertEquals( 1, actualPig.version ) ) + ); + } + @Test public void reactiveFindWithOptimisticIncrementLock(TestContext context) { final GuineaPig expectedPig = new GuineaPig( 5, "Aloi" ); From 1d93ccbd222c33f3bef77c669e7274ba80e73afe Mon Sep 17 00:00:00 2001 From: Gavin King Date: Mon, 24 Aug 2020 12:26:52 +0200 Subject: [PATCH 3/3] support for OPTIMISTIC + OPTIMISTIC_FORCE_INCREMENT LockMode in lock() Also big cleanup to related code, and a couple of minor bugfixes to implementation of lock() operation. Also tests. For #329 --- .../reactive/engine/ReactiveActionQueue.java | 302 ++++++++---------- ...tiveAfterTransactionCompletionProcess.java | 5 +- ...iveBeforeTransactionCompletionProcess.java | 5 +- ...ReactiveEntityIncrementVersionProcess.java | 6 +- .../ReactiveEntityVerifyVersionProcess.java | 6 +- .../DefaultReactiveLoadEventListener.java | 20 +- .../DefaultReactiveLockEventListener.java | 81 +++-- .../impl/ReactiveAbstractEntityPersister.java | 129 +++++--- .../reactive/session/ReactiveSession.java | 7 + .../session/impl/ReactiveSessionImpl.java | 4 +- .../reactive/ReactiveSessionTest.java | 161 ++++++++++ 11 files changed, 478 insertions(+), 248 deletions(-) diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/ReactiveActionQueue.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/ReactiveActionQueue.java index df2b72e35..a414a8f1e 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/ReactiveActionQueue.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/ReactiveActionQueue.java @@ -21,13 +21,10 @@ import org.hibernate.proxy.HibernateProxy; import org.hibernate.proxy.LazyInitializer; import org.hibernate.reactive.engine.impl.*; -import org.hibernate.reactive.session.ReactiveConnectionSupplier; +import org.hibernate.reactive.session.ReactiveSession; import org.hibernate.reactive.util.impl.CompletionStages; import org.hibernate.type.*; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; import java.io.Serializable; import java.util.*; import java.util.concurrent.CompletionStage; @@ -162,7 +159,7 @@ ExecutableList init(ReactiveActionQueue instance) { // NOTE: ExecutableList fields must be instantiated via ListProvider#init or #getOrInit // to ensure that they are instantiated consistently. - private SessionImplementor session; + private final ReactiveSession session; private UnresolvedEntityInsertActions unresolvedInsertions; // Object insertions, updates, and deletions have list semantics because // they must happen in the right order so as to respect referential @@ -190,7 +187,7 @@ ExecutableList init(ReactiveActionQueue instance) { * * @param session The session "owning" this queue. */ - public ReactiveActionQueue(SessionImplementor session) { + public ReactiveActionQueue(ReactiveSession session) { this.session = session; isTransactionCoordinatorShared = false; } @@ -238,45 +235,46 @@ private static String toString(ExecutableList q) { return q == null ? "ExecutableList{size=0}" : q.toString(); } - /** - * Used by the owning session to explicitly control deserialization of the action queue. - * - * @param ois The stream from which to read the action queue - * @param session The session to which the action queue belongs - * - * @return The deserialized action queue - * - * @throws IOException indicates a problem reading from the stream - * @throws ClassNotFoundException Generally means we were unable to locate user classes. - */ - public static ReactiveActionQueue deserialize(ObjectInputStream ois, SessionImplementor session) - throws IOException, ClassNotFoundException { - final boolean traceEnabled = LOG.isTraceEnabled(); - if ( traceEnabled ) { - LOG.trace( "Deserializing action-queue" ); - } - ReactiveActionQueue rtn = new ReactiveActionQueue( session ); - - rtn.unresolvedInsertions = UnresolvedEntityInsertActions.deserialize( ois, session ); - - for ( ListProvider provider : EXECUTABLE_LISTS_MAP.values() ) { - ExecutableList l = provider.get( rtn ); - boolean notNull = ois.readBoolean(); - if ( notNull ) { - if ( l == null ) { - l = provider.init( rtn ); - } - l.readExternal( ois ); - - if ( traceEnabled ) { - LOG.tracev( "Deserialized [{0}] entries", l.size() ); - } - l.afterDeserialize( session ); - } - } - - return rtn; - } +// /** +// * Used by the owning session to explicitly control deserialization of the action queue. +// * +// * @param ois The stream from which to read the action queue +// * @param session The session to which the action queue belongs +// * +// * @return The deserialized action queue +// * +// * @throws IOException indicates a problem reading from the stream +// * @throws ClassNotFoundException Generally means we were unable to locate user classes. +// */ +// public static ReactiveActionQueue deserialize(ObjectInputStream ois, +// SessionImplementor session, ReactiveSession session) +// throws IOException, ClassNotFoundException { +// final boolean traceEnabled = LOG.isTraceEnabled(); +// if ( traceEnabled ) { +// LOG.trace( "Deserializing action-queue" ); +// } +// ReactiveActionQueue rtn = new ReactiveActionQueue( session, session ); +// +// rtn.unresolvedInsertions = UnresolvedEntityInsertActions.deserialize( ois, session ); +// +// for ( ListProvider provider : EXECUTABLE_LISTS_MAP.values() ) { +// ExecutableList l = provider.get( rtn ); +// boolean notNull = ois.readBoolean(); +// if ( notNull ) { +// if ( l == null ) { +// l = provider.init( rtn ); +// } +// l.readExternal( ois ); +// +// if ( traceEnabled ) { +// LOG.tracev( "Deserialized [{0}] entries", l.size() ); +// } +// l.afterDeserialize( session ); +// } +// } +// +// return rtn; +// } public void clear() { for ( ListProvider listProvider : EXECUTABLE_LISTS_MAP.values() ) { @@ -344,15 +342,15 @@ private CompletionStage addResolvedEntityInsertAction(ReactiveEntityInsert return ret.thenCompose( v -> { if ( !insert.isVeto() ) { CompletionStage comp = insert.reactiveMakeEntityManaged(); - if ( unresolvedInsertions != null ) { - for ( AbstractEntityInsertAction resolvedAction : unresolvedInsertions.resolveDependentActions( - insert.getInstance(), - session - ) ) { - comp = comp.thenCompose( v2 -> addResolvedEntityInsertAction( (ReactiveEntityRegularInsertAction) resolvedAction ) ); - } + if ( unresolvedInsertions == null ) { + return comp; + } + else { + return comp.thenCompose( vv -> CompletionStages.loop( + unresolvedInsertions.resolveDependentActions( insert.getInstance(), session.getSharedContract() ), + resolvedAction -> addResolvedEntityInsertAction( (ReactiveEntityRegularInsertAction) resolvedAction ) + ) ); } - return comp; } else { throw new EntityActionVetoException( @@ -461,18 +459,14 @@ public void addAction(BulkOperationCleanupAction action) { private void registerCleanupActions(Executable executable) { if ( executable.getBeforeTransactionCompletionProcess() != null ) { - if ( beforeTransactionProcesses == null ) { - beforeTransactionProcesses = new BeforeTransactionCompletionProcessQueue( session ); - } + beforeTransactionProcesses(); beforeTransactionProcesses.register( executable.getBeforeTransactionCompletionProcess() ); } if ( session.getFactory().getSessionFactoryOptions().isQueryCacheEnabled() ) { invalidateSpaces( convertTimestampSpaces( executable.getPropertySpaces() ) ); } if ( executable.getAfterTransactionCompletionProcess() != null ) { - if ( afterTransactionProcesses == null ) { - afterTransactionProcesses = new AfterTransactionCompletionProcessQueue( session ); - } + afterTransactionProcesses(); afterTransactionProcesses.register( executable.getAfterTransactionCompletionProcess() ); } } @@ -504,31 +498,33 @@ public void checkNoUnresolvedActionsAfterOperation() throws PropertyValueExcepti } public void registerProcess(AfterTransactionCompletionProcess process) { - if ( afterTransactionProcesses == null ) { - afterTransactionProcesses = new AfterTransactionCompletionProcessQueue( session ); - } - afterTransactionProcesses.register( process ); + afterTransactionProcesses().register( process ); } public void registerProcess(BeforeTransactionCompletionProcess process) { - if ( beforeTransactionProcesses == null ) { - beforeTransactionProcesses = new BeforeTransactionCompletionProcessQueue( session ); - } - beforeTransactionProcesses.register( process ); + beforeTransactionProcesses().register( process ); } public void registerProcess(ReactiveAfterTransactionCompletionProcess process) { - if ( afterTransactionProcesses == null ) { - afterTransactionProcesses = new AfterTransactionCompletionProcessQueue( session ); - } - afterTransactionProcesses.registerReactive( process ); + afterTransactionProcesses().registerReactive( process ); } public void registerProcess(ReactiveBeforeTransactionCompletionProcess process) { - if ( beforeTransactionProcesses == null ) { + beforeTransactionProcesses().registerReactive( process ); + } + + private BeforeTransactionCompletionProcessQueue beforeTransactionProcesses() { + if (beforeTransactionProcesses == null) { beforeTransactionProcesses = new BeforeTransactionCompletionProcessQueue( session ); } - beforeTransactionProcesses.registerReactive( process ); + return beforeTransactionProcesses; + } + + private AfterTransactionCompletionProcessQueue afterTransactionProcesses() { + if (afterTransactionProcesses == null) { + afterTransactionProcesses = new AfterTransactionCompletionProcessQueue( session ); + } + return afterTransactionProcesses; } /** @@ -662,16 +658,10 @@ private CompletionStage executeActions( for ( E e : list ) { ret = ret.thenCompose( v -> e.reactiveExecute().whenComplete( (v2, x) -> { if ( e.getBeforeTransactionCompletionProcess() != null ) { - if ( beforeTransactionProcesses == null ) { - beforeTransactionProcesses = new BeforeTransactionCompletionProcessQueue( session ); - } - beforeTransactionProcesses.register( e.getBeforeTransactionCompletionProcess() ); + beforeTransactionProcesses().register( e.getBeforeTransactionCompletionProcess() ); } if ( e.getAfterTransactionCompletionProcess() != null ) { - if ( afterTransactionProcesses == null ) { - afterTransactionProcesses = new AfterTransactionCompletionProcessQueue( session ); - } - afterTransactionProcesses.register( e.getAfterTransactionCompletionProcess() ); + afterTransactionProcesses().register( e.getAfterTransactionCompletionProcess() ); } } ) ); } @@ -686,7 +676,7 @@ private CompletionStage executeActions( } ).thenRun( () -> { list.clear(); // session.getJdbcCoordinator().executeBatch(); - } ).thenCompose( v -> ( (ReactiveConnectionSupplier) session ).getReactiveConnection().executeBatch() ); + } ).thenCompose( v -> session.getReactiveConnection().executeBatch() ); } /** @@ -705,13 +695,10 @@ public CompletionStage execute(E executable private void invalidateSpaces(Serializable[] spaces) { if ( spaces != null && spaces.length > 0 ) { for ( Serializable s : spaces ) { - if ( afterTransactionProcesses == null ) { - afterTransactionProcesses = new AfterTransactionCompletionProcessQueue( session ); - } - afterTransactionProcesses.addSpaceToInvalidate( s ); + afterTransactionProcesses().addSpaceToInvalidate( s ); } // Performance win: If we are processing an ExecutableList, this will only be called once - session.getFactory().getCache().getTimestampsCache().preInvalidate( spaces, session ); + session.getFactory().getCache().getTimestampsCache().preInvalidate( spaces, session.getSharedContract() ); } } @@ -775,31 +762,25 @@ public int numberOfInsertions() { return insertions.size(); } - public TransactionCompletionProcesses getTransactionCompletionProcesses() { - if ( beforeTransactionProcesses == null ) { - beforeTransactionProcesses = new BeforeTransactionCompletionProcessQueue( session ); - } - if ( afterTransactionProcesses == null ) { - afterTransactionProcesses = new AfterTransactionCompletionProcessQueue( session ); - } - return new TransactionCompletionProcesses( beforeTransactionProcesses, afterTransactionProcesses ); - } - - /** - * Bind transaction completion processes to make them shared between primary and secondary session. - * Transaction completion processes are always executed by transaction owner (primary session), - * but can be registered using secondary session too. - * - * @param processes Transaction completion processes. - * @param isTransactionCoordinatorShared Flag indicating shared transaction context. - */ - public void setTransactionCompletionProcesses( - TransactionCompletionProcesses processes, - boolean isTransactionCoordinatorShared) { - this.isTransactionCoordinatorShared = isTransactionCoordinatorShared; - this.beforeTransactionProcesses = processes.beforeTransactionCompletionProcesses; - this.afterTransactionProcesses = processes.afterTransactionCompletionProcesses; - } +// public TransactionCompletionProcesses getTransactionCompletionProcesses() { +// return new TransactionCompletionProcesses( beforeTransactionProcesses(), afterTransactionProcesses() ); +// } +// +// /** +// * Bind transaction completion processes to make them shared between primary and secondary session. +// * Transaction completion processes are always executed by transaction owner (primary session), +// * but can be registered using secondary session too. +// * +// * @param processes Transaction completion processes. +// * @param isTransactionCoordinatorShared Flag indicating shared transaction context. +// */ +// public void setTransactionCompletionProcesses( +// TransactionCompletionProcesses processes, +// boolean isTransactionCoordinatorShared) { +// this.isTransactionCoordinatorShared = isTransactionCoordinatorShared; +// this.beforeTransactionProcesses = processes.beforeTransactionCompletionProcesses; +// this.afterTransactionProcesses = processes.afterTransactionCompletionProcesses; +// } public void sortCollectionActions() { if ( isOrderUpdatesEnabled() ) { @@ -883,7 +864,7 @@ public void unScheduleDeletion(EntityEntry entry, Object rescuedEntity) { if ( rescuedEntity instanceof HibernateProxy ) { LazyInitializer initializer = ( (HibernateProxy) rescuedEntity ).getHibernateLazyInitializer(); if ( !initializer.isUninitialized() ) { - rescuedEntity = initializer.getImplementation( session ); + rescuedEntity = initializer.getImplementation( session.getSharedContract() ); } } if ( deletions != null ) { @@ -907,40 +888,41 @@ public void unScheduleDeletion(EntityEntry entry, Object rescuedEntity) { throw new AssertionFailure( "Unable to perform un-delete for instance " + entry.getEntityName() ); } - /** - * Used by the owning session to explicitly control serialization of the action queue - * - * @param oos The stream to which the action queue should get written - * - * @throws IOException Indicates an error writing to the stream - */ - public void serialize(ObjectOutputStream oos) throws IOException { - LOG.trace( "Serializing action-queue" ); - if ( unresolvedInsertions == null ) { - unresolvedInsertions = new UnresolvedEntityInsertActions(); - } - unresolvedInsertions.serialize( oos ); - - for ( ListProvider p : EXECUTABLE_LISTS_MAP.values() ) { - ExecutableList l = p.get( this ); - if ( l == null ) { - oos.writeBoolean( false ); - } - else { - oos.writeBoolean( true ); - l.writeExternal( oos ); - } - } - } +// /** +// * Used by the owning session to explicitly control serialization of the action queue +// * +// * @param oos The stream to which the action queue should get written +// * +// * @throws IOException Indicates an error writing to the stream +// */ +// public void serialize(ObjectOutputStream oos) throws IOException { +// LOG.trace( "Serializing action-queue" ); +// if ( unresolvedInsertions == null ) { +// unresolvedInsertions = new UnresolvedEntityInsertActions(); +// } +// unresolvedInsertions.serialize( oos ); +// +// for ( ListProvider p : EXECUTABLE_LISTS_MAP.values() ) { +// ExecutableList l = p.get( this ); +// if ( l == null ) { +// oos.writeBoolean( false ); +// } +// else { +// oos.writeBoolean( true ); +// l.writeExternal( oos ); +// } +// } +// } private abstract static class AbstractTransactionCompletionProcessQueue { - protected SessionImplementor session; + final ReactiveSession session; + // Concurrency handling required when transaction completion process is dynamically registered // inside event listener (HHH-7478). protected Queue processes = new ConcurrentLinkedQueue<>(); protected Queue reactiveProcesses = new ConcurrentLinkedQueue<>(); - private AbstractTransactionCompletionProcessQueue(SessionImplementor session) { + private AbstractTransactionCompletionProcessQueue(ReactiveSession session) { this.session = session; } @@ -959,7 +941,7 @@ public void registerReactive(U process) { } public boolean hasActions() { - return !processes.isEmpty() && !reactiveProcesses.isEmpty(); + return !processes.isEmpty() || !reactiveProcesses.isEmpty(); } } @@ -969,14 +951,14 @@ public boolean hasActions() { private static class BeforeTransactionCompletionProcessQueue extends AbstractTransactionCompletionProcessQueue { - private BeforeTransactionCompletionProcessQueue(SessionImplementor session) { + private BeforeTransactionCompletionProcessQueue(ReactiveSession session) { super( session ); } public CompletionStage beforeTransactionCompletion() { while ( !processes.isEmpty() ) { try { - processes.poll().doBeforeTransactionCompletion( session ); + processes.poll().doBeforeTransactionCompletion( session.getSharedContract() ); } catch (HibernateException he) { throw he; @@ -1000,7 +982,7 @@ private static class AfterTransactionCompletionProcessQueue ReactiveAfterTransactionCompletionProcess> { private final Set querySpacesToInvalidate = new HashSet<>(); - private AfterTransactionCompletionProcessQueue(SessionImplementor session) { + private AfterTransactionCompletionProcessQueue(ReactiveSession session) { super( session ); } @@ -1011,7 +993,7 @@ public void addSpaceToInvalidate(Serializable space) { public CompletionStage afterTransactionCompletion(boolean success) { while ( !processes.isEmpty() ) { try { - processes.poll().doAfterTransactionCompletion( success, session ); + processes.poll().doAfterTransactionCompletion( success, session.getSharedContract() ); } catch (CacheException ce) { LOG.unableToReleaseCacheLock( ce ); @@ -1025,7 +1007,7 @@ public CompletionStage afterTransactionCompletion(boolean success) { if ( session.getFactory().getSessionFactoryOptions().isQueryCacheEnabled() ) { session.getFactory().getCache().getTimestampsCache().invalidate( querySpacesToInvalidate.toArray(new Serializable[0]), - session + session.getSharedContract() ); } querySpacesToInvalidate.clear(); @@ -1037,20 +1019,20 @@ public CompletionStage afterTransactionCompletion(boolean success) { } } - /** - * Wrapper class allowing to bind the same transaction completion process queues in different sessions. - */ - public static class TransactionCompletionProcesses { - private final BeforeTransactionCompletionProcessQueue beforeTransactionCompletionProcesses; - private final AfterTransactionCompletionProcessQueue afterTransactionCompletionProcesses; - - private TransactionCompletionProcesses( - BeforeTransactionCompletionProcessQueue beforeTransactionCompletionProcessQueue, - AfterTransactionCompletionProcessQueue afterTransactionCompletionProcessQueue) { - this.beforeTransactionCompletionProcesses = beforeTransactionCompletionProcessQueue; - this.afterTransactionCompletionProcesses = afterTransactionCompletionProcessQueue; - } - } +// /** +// * Wrapper class allowing to bind the same transaction completion process queues in different sessions. +// */ +// public static class TransactionCompletionProcesses { +// private final BeforeTransactionCompletionProcessQueue beforeTransactionCompletionProcesses; +// private final AfterTransactionCompletionProcessQueue afterTransactionCompletionProcesses; +// +// private TransactionCompletionProcesses( +// BeforeTransactionCompletionProcessQueue beforeTransactionCompletionProcessQueue, +// AfterTransactionCompletionProcessQueue afterTransactionCompletionProcessQueue) { +// this.beforeTransactionCompletionProcesses = beforeTransactionCompletionProcessQueue; +// this.afterTransactionCompletionProcesses = afterTransactionCompletionProcessQueue; +// } +// } /** * Order the {@link #insertions} queue such that we group inserts against the same entity together (without diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/ReactiveAfterTransactionCompletionProcess.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/ReactiveAfterTransactionCompletionProcess.java index c0f26c622..95666aa39 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/ReactiveAfterTransactionCompletionProcess.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/ReactiveAfterTransactionCompletionProcess.java @@ -5,13 +5,14 @@ */ package org.hibernate.reactive.engine; -import org.hibernate.engine.spi.SharedSessionContractImplementor; +import org.hibernate.reactive.session.ReactiveSession; import java.util.concurrent.CompletionStage; /** * Contract representing some process that needs to occur during after transaction completion. * + * @author Gavin King * @author Steve Ebersole */ public interface ReactiveAfterTransactionCompletionProcess { @@ -21,5 +22,5 @@ public interface ReactiveAfterTransactionCompletionProcess { * @param success Did the transaction complete successfully? True means it did. * @param session The session on which the transaction is completing. */ - CompletionStage doAfterTransactionCompletion(boolean success, SharedSessionContractImplementor session); + CompletionStage doAfterTransactionCompletion(boolean success, ReactiveSession session); } diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/ReactiveBeforeTransactionCompletionProcess.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/ReactiveBeforeTransactionCompletionProcess.java index 6989bbd0e..99061a837 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/ReactiveBeforeTransactionCompletionProcess.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/ReactiveBeforeTransactionCompletionProcess.java @@ -5,13 +5,14 @@ */ package org.hibernate.reactive.engine; -import org.hibernate.engine.spi.SessionImplementor; +import org.hibernate.reactive.session.ReactiveSession; import java.util.concurrent.CompletionStage; /** * Contract representing some process that needs to occur during before transaction completion. * + * @author Gavin King * @author Steve Ebersole */ public interface ReactiveBeforeTransactionCompletionProcess { @@ -20,5 +21,5 @@ public interface ReactiveBeforeTransactionCompletionProcess { * * @param session The session on which the transaction is preparing to complete. */ - CompletionStage doBeforeTransactionCompletion(SessionImplementor session); + CompletionStage doBeforeTransactionCompletion(ReactiveSession session); } diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/impl/ReactiveEntityIncrementVersionProcess.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/impl/ReactiveEntityIncrementVersionProcess.java index 6bcb94401..e2abe0c41 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/impl/ReactiveEntityIncrementVersionProcess.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/impl/ReactiveEntityIncrementVersionProcess.java @@ -8,9 +8,9 @@ import org.hibernate.LockMode; import org.hibernate.LockOptions; import org.hibernate.engine.spi.EntityEntry; -import org.hibernate.engine.spi.SessionImplementor; import org.hibernate.reactive.engine.ReactiveBeforeTransactionCompletionProcess; import org.hibernate.reactive.persister.entity.impl.ReactiveEntityPersister; +import org.hibernate.reactive.session.ReactiveSession; import org.hibernate.reactive.util.impl.CompletionStages; import java.util.concurrent.CompletionStage; @@ -40,7 +40,7 @@ public ReactiveEntityIncrementVersionProcess(Object object) { * @param session The session on which the transaction is preparing to complete. */ @Override - public CompletionStage doBeforeTransactionCompletion(SessionImplementor session) { + public CompletionStage doBeforeTransactionCompletion(ReactiveSession session) { final EntityEntry entry = session.getPersistenceContext().getEntry( object ); // Don't increment version for an entity that is not in the PersistenceContext; if ( entry == null ) { @@ -53,7 +53,7 @@ public CompletionStage doBeforeTransactionCompletion(SessionImplementor se entry.getVersion(), object, new LockOptions(LockMode.PESSIMISTIC_FORCE_INCREMENT), - session + session.getSharedContract() ); } } diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/impl/ReactiveEntityVerifyVersionProcess.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/impl/ReactiveEntityVerifyVersionProcess.java index 6ffe568ec..4d63aa1a1 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/impl/ReactiveEntityVerifyVersionProcess.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/impl/ReactiveEntityVerifyVersionProcess.java @@ -7,10 +7,10 @@ import org.hibernate.dialect.lock.OptimisticEntityLockException; import org.hibernate.engine.spi.EntityEntry; -import org.hibernate.engine.spi.SessionImplementor; import org.hibernate.pretty.MessageHelper; import org.hibernate.reactive.engine.ReactiveBeforeTransactionCompletionProcess; import org.hibernate.reactive.persister.entity.impl.ReactiveEntityPersister; +import org.hibernate.reactive.session.ReactiveSession; import org.hibernate.reactive.util.impl.CompletionStages; import java.util.concurrent.CompletionStage; @@ -35,7 +35,7 @@ public ReactiveEntityVerifyVersionProcess(Object object) { } @Override - public CompletionStage doBeforeTransactionCompletion(SessionImplementor session) { + public CompletionStage doBeforeTransactionCompletion(ReactiveSession session) { final EntityEntry entry = session.getPersistenceContext().getEntry( object ); // Don't check version for an entity that is not in the PersistenceContext; if ( entry == null ) { @@ -43,7 +43,7 @@ public CompletionStage doBeforeTransactionCompletion(SessionImplementor se } return ( (ReactiveEntityPersister) entry.getPersister() ) - .reactiveGetCurrentVersion( entry.getId(), session ) + .reactiveGetCurrentVersion( entry.getId(), session.getSharedContract() ) .thenAccept( latestVersion -> { if ( !entry.getVersion().equals( latestVersion ) ) { throw new OptimisticEntityLockException( diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactiveLoadEventListener.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactiveLoadEventListener.java index f62e0278c..38eaab649 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactiveLoadEventListener.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactiveLoadEventListener.java @@ -146,8 +146,11 @@ public CompletionStage reactiveOnLoad( } ) ); + // if a pessimistic version increment was requested, we need + // to go back to the database immediately and update the row if ( event.getLockMode() == LockMode.PESSIMISTIC_FORCE_INCREMENT || event.getLockMode() == LockMode.FORCE ) { + // TODO: should we call CachedDomainDataAccess.lockItem() ? return result.thenCompose( v -> persister.lockReactive( event.getEntityId(), @@ -519,21 +522,22 @@ private CompletionStage lockAndLoad( final EntityKey keyToLoad, final LoadEventListener.LoadType options, final SessionImplementor source) { - final SoftLock lock; - final Object ck; - final EntityDataAccess cache = persister.getCacheAccessStrategy(); + final boolean canWriteToCache = persister.canWriteToCache(); + final SoftLock lock; + final Object cacheKey; if ( canWriteToCache ) { - ck = cache.generateCacheKey( + EntityDataAccess cache = persister.getCacheAccessStrategy(); + cacheKey = cache.generateCacheKey( event.getEntityId(), persister, source.getFactory(), source.getTenantIdentifier() ); - lock = cache.lockItem( source, ck, null ); + lock = cache.lockItem( source, cacheKey, null ); } else { - ck = null; + cacheKey = null; lock = null; } @@ -541,7 +545,7 @@ private CompletionStage lockAndLoad( return load( event, persister, keyToLoad, options ) .whenComplete( (v, x) -> { if ( canWriteToCache ) { - cache.unlockItem( source, ck, lock ); + persister.getCacheAccessStrategy().unlockItem( source, cacheKey, lock ); } } ) .thenApply( entity -> source.getPersistenceContextInternal().proxyFor( persister, keyToLoad, entity ) ); @@ -549,7 +553,7 @@ private CompletionStage lockAndLoad( catch (HibernateException he) { //in case load() throws an exception if ( canWriteToCache ) { - cache.unlockItem( source, ck, lock ); + persister.getCacheAccessStrategy().unlockItem( source, cacheKey, lock ); } throw he; } diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactiveLockEventListener.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactiveLockEventListener.java index 659d91968..e89c0bc5b 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactiveLockEventListener.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactiveLockEventListener.java @@ -28,8 +28,11 @@ import org.hibernate.reactive.engine.impl.Cascade; import org.hibernate.reactive.engine.impl.CascadingActions; import org.hibernate.reactive.engine.impl.ForeignKeys; +import org.hibernate.reactive.engine.impl.ReactiveEntityIncrementVersionProcess; +import org.hibernate.reactive.engine.impl.ReactiveEntityVerifyVersionProcess; import org.hibernate.reactive.event.ReactiveLockEventListener; import org.hibernate.reactive.persister.entity.impl.ReactiveEntityPersister; +import org.hibernate.reactive.session.ReactiveSession; import org.hibernate.reactive.util.impl.CompletionStages; import org.jboss.logging.Logger; @@ -136,34 +139,59 @@ protected CompletionStage upgradeLock(Object object, EntityEntry entry, ); } - final EntityPersister persister = entry.getPersister(); - if ( log.isTraceEnabled() ) { log.tracev( "Locking {0} in mode: {1}", - MessageHelper.infoString( persister, entry.getId(), source.getFactory() ), + MessageHelper.infoString( entry.getPersister(), entry.getId(), source.getFactory() ), requestedLockMode ); } - final boolean cachingEnabled = persister.canWriteToCache(); - final SoftLock lock; - final Object ck; - if ( cachingEnabled ) { - EntityDataAccess cache = persister.getCacheAccessStrategy(); - ck = cache.generateCacheKey( - entry.getId(), - persister, - source.getFactory(), - source.getTenantIdentifier() - ); - lock = cache.lockItem( source, ck, entry.getVersion() ); - } - else { - lock = null; - ck = null; + switch (requestedLockMode) { + case OPTIMISTIC: + ( (ReactiveSession) source ).getReactiveActionQueue() + .registerProcess( new ReactiveEntityVerifyVersionProcess(object) ); + entry.setLockMode( requestedLockMode ); + return CompletionStages.voidFuture(); + case OPTIMISTIC_FORCE_INCREMENT: + ( (ReactiveSession) source ).getReactiveActionQueue() + .registerProcess( new ReactiveEntityIncrementVersionProcess(object) ); + entry.setLockMode( requestedLockMode ); + return CompletionStages.voidFuture(); + default: + return doUpgradeLock( object, entry, lockOptions, source ); } + } + else { + return CompletionStages.voidFuture(); + } + } + + private CompletionStage doUpgradeLock(Object object, EntityEntry entry, + LockOptions lockOptions, + EventSource source) { + + final EntityPersister persister = entry.getPersister(); + final boolean canWriteToCache = persister.canWriteToCache(); + final SoftLock lock; + final Object cacheKey; + if ( canWriteToCache ) { + EntityDataAccess cache = persister.getCacheAccessStrategy(); + cacheKey = cache.generateCacheKey( + entry.getId(), + persister, + source.getFactory(), + source.getTenantIdentifier() + ); + lock = cache.lockItem( source, cacheKey, entry.getVersion() ); + } + else { + cacheKey = null; + lock = null; + } + + try { return ((ReactiveEntityPersister) persister) .lockReactive( entry.getId(), @@ -172,18 +200,21 @@ protected CompletionStage upgradeLock(Object object, EntityEntry entry, lockOptions, source ) - .thenAccept( v -> entry.setLockMode(requestedLockMode) ) + .thenAccept( v -> entry.setLockMode( lockOptions.getLockMode() ) ) .whenComplete( (r, e) -> { // the database now holds a lock + the object is flushed from the cache, // so release the soft lock - if ( cachingEnabled ) { - persister.getCacheAccessStrategy().unlockItem( source, ck, lock ); + if ( canWriteToCache ) { + persister.getCacheAccessStrategy().unlockItem( source, cacheKey, lock ); } } ); - } - else { - return CompletionStages.voidFuture(); + catch (HibernateException he) { + //in case lockReactive() throws an exception + if ( canWriteToCache ) { + persister.getCacheAccessStrategy().unlockItem( source, cacheKey, lock ); + } + throw he; } } diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/persister/entity/impl/ReactiveAbstractEntityPersister.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/persister/entity/impl/ReactiveAbstractEntityPersister.java index e0d0e5aef..602a9cf55 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/persister/entity/impl/ReactiveAbstractEntityPersister.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/persister/entity/impl/ReactiveAbstractEntityPersister.java @@ -640,12 +640,26 @@ default CompletionStage updateOrInsertReactive( if ( delegate().isNullableTable( j ) && delegate().isAllNull( oldFields, j ) && oldFields != null ) { // don't bother trying to update, we know there is no row there yet if ( !delegate().isAllNull( fields, j ) ) { - return insertReactive( id, fields, delegate().getPropertyInsertability(), j, delegate().getSQLInsertStrings()[j], session ); + return insertReactive( + id, + fields, + delegate().getPropertyInsertability(), + j, + delegate().getSQLInsertStrings()[j], + session + ); } } else if ( delegate().isNullableTable( j ) && delegate().isAllNull( fields, j ) ) { // All fields are null, we can just delete the row - return deleteReactive( id, oldVersion, j, delegate().getSQLDeleteStrings()[j], session, null ); + return deleteReactive( + id, + oldVersion, + j, + delegate().getSQLDeleteStrings()[j], + session, + null + ); } else { return updateReactive( id, fields, oldFields, rowId, includeProperty, j, oldVersion, sql, session ) @@ -653,7 +667,14 @@ else if ( delegate().isNullableTable( j ) && delegate().isAllNull( fields, j ) ) if ( !updated && !delegate().isAllNull( fields, j ) ) { // Nothing has been updated because the row isn't in the db // Run an insert instead - return insertReactive( id, fields, delegate().getPropertyInsertability(), j, delegate().getSQLInsertStrings()[j], session ); + return insertReactive( + id, + fields, + delegate().getPropertyInsertability(), + j, + delegate().getSQLInsertStrings()[j], + session + ); } return null; } ); @@ -701,7 +722,8 @@ default CompletionStage lockReactive( Object version, Object object, LockOptions lockOptions, - SharedSessionContractImplementor session) throws HibernateException { + SharedSessionContractImplementor session) + throws HibernateException { LockMode lockMode = lockOptions.getLockMode(); @@ -710,29 +732,47 @@ default CompletionStage lockReactive( String sql; boolean writeLock; switch (lockMode) { - case READ: + // 0) noop + case NONE: + return CompletionStages.voidFuture(); + // 1) select ... for share case PESSIMISTIC_READ: + // 2) select ... for update case PESSIMISTIC_WRITE: + case UPGRADE: + // 3) select ... for nowait case UPGRADE_NOWAIT: + // 4) select ... for update skip locked case UPGRADE_SKIPLOCKED: - case UPGRADE: + // TODO: introduce separate support for PESSIMISTIC_READ + // the current implementation puts the version number in + // the where clause and the id in the select list, whereas + // it would be better to actually select and check the + // version number (same problem in hibernate-core) sql = generateSelectLockString( lockOptions ); writeLock = false; break; + // 5) update ... set version case PESSIMISTIC_FORCE_INCREMENT: case FORCE: - case WRITE: sql = generateUpdateLockString( lockOptions ); writeLock = true; break; - case NONE: - return CompletionStages.voidFuture(); + // 6) OPTIMISTIC locks are converted to pessimistic + // locks obtained in the before completion phase + case OPTIMISTIC: + case OPTIMISTIC_FORCE_INCREMENT: + throw new AssertionFailure("optimistic lock mode is not supported here"); + // 7) READ and WRITE are obtained implicitly by + // other operations + case READ: + case WRITE: + throw new AssertionFailure("implicit lock mode is not supported here"); default: - throw new IllegalArgumentException("lock mode not supported"); + throw new AssertionFailure("illegal lock mode"); } - PreparedStatementAdaptor statement = new PreparedStatementAdaptor(); - try { + Object[] arguments = PreparedStatementAdaptor.bind( statement -> { int offset = 1; if ( writeLock ) { getVersionType().nullSafeSet( statement, nextVersion, offset, session ); @@ -743,23 +783,15 @@ default CompletionStage lockReactive( if ( isVersioned() ) { getVersionType().nullSafeSet( statement, version, offset, session ); } - } - catch ( SQLException e) { - throw new HibernateException( e ); - } - Object[] parameters = statement.getParametersAsArray(); + } ); ReactiveConnection connection = getReactiveConnection( session ); - CompletionStage lock; - if (writeLock) { - lock = connection.update(sql, parameters).thenApply(affected -> affected > 0); - } - else { - lock = connection.select(sql, parameters).thenApply(Iterator::hasNext); - } + CompletionStage lock = writeLock + ? connection.update( sql, arguments ).thenApply( affected -> affected > 0 ) + : connection.select( sql, arguments ).thenApply( Iterator::hasNext ); - return lock.thenAccept( found -> { - if (!found) { + return lock.thenAccept( rowExisted -> { + if ( !rowExisted ) { throw new StaleObjectStateException( getEntityName(), id ); } } ).handle( (r ,e) -> { @@ -818,7 +850,9 @@ default CompletionStage reactiveLoad(Serializable id, Object optionalObj } @Override - default CompletionStage> reactiveMultiLoad(Serializable[] ids, SessionImplementor session, MultiLoadOptions loadOptions) { + default CompletionStage> reactiveMultiLoad(Serializable[] ids, + SessionImplementor session, + MultiLoadOptions loadOptions) { return ReactiveDynamicBatchingEntityLoaderBuilder.INSTANCE.multiLoad(this, ids, session, loadOptions); } @@ -953,22 +987,27 @@ default CompletionStage reactiveInitializeLazyPropertiesFromDatastore( throw new AssertionFailure( "no lazy properties" ); } - final PersistentAttributeInterceptor interceptor = ( (PersistentAttributeInterceptable) entity ).$$_hibernate_getInterceptor(); - assert interceptor != null : "Expecting bytecode interceptor to be non-null"; + final PersistentAttributeInterceptor interceptor = + ( (PersistentAttributeInterceptable) entity ).$$_hibernate_getInterceptor(); + if ( interceptor == null ) { + throw new AssertionFailure( "Expecting bytecode interceptor to be non-null" ); + } log.tracef( "Initializing lazy properties from datastore (triggered for `%s`)", fieldName ); - final String fetchGroup = getEntityMetamodel().getBytecodeEnhancementMetadata() - .getLazyAttributesMetadata() - .getFetchGroupName( fieldName ); - final List fetchGroupAttributeDescriptors = getEntityMetamodel().getBytecodeEnhancementMetadata() - .getLazyAttributesMetadata() - .getFetchGroupAttributeDescriptors( fetchGroup ); + String fetchGroup = + getEntityMetamodel().getBytecodeEnhancementMetadata() + .getLazyAttributesMetadata() + .getFetchGroupName( fieldName ); + List fetchGroupAttributeDescriptors = + getEntityMetamodel().getBytecodeEnhancementMetadata() + .getLazyAttributesMetadata() + .getFetchGroupAttributeDescriptors( fetchGroup ); @SuppressWarnings("deprecation") - final Set initializedLazyAttributeNames = interceptor.getInitializedLazyAttributeNames(); + Set initializedLazyAttributeNames = interceptor.getInitializedLazyAttributeNames(); - Object[] params = PreparedStatementAdaptor.bind( + Object[] arguments = PreparedStatementAdaptor.bind( statement -> getIdentifierType().nullSafeSet( statement, id, 1, session ) ); @@ -989,7 +1028,7 @@ default CompletionStage reactiveInitializeLazyPropertiesFromDatastore( } return getReactiveConnection( session ) - .selectJdbc( lazySelect, params ) + .selectJdbc( lazySelect, arguments ) .thenApply( resultSet -> { try { resultSet.next(); @@ -1009,19 +1048,23 @@ default CompletionStage reactiveInitializeLazyPropertiesFromDatastore( } ); } - default Object initLazyProperty(String fieldName, Object entity, SharedSessionContractImplementor session, EntityEntry entry, PersistentAttributeInterceptor interceptor, List fetchGroupAttributeDescriptors, Set initializedLazyAttributeNames, ResultSet rs) { + default Object initLazyProperty(String fieldName, Object entity, + SharedSessionContractImplementor session, + EntityEntry entry, + PersistentAttributeInterceptor interceptor, + List fetchGroupAttributeDescriptors, + Set initializedLazyAttributeNames, + ResultSet resultSet) { for ( LazyAttributeDescriptor fetchGroupAttributeDescriptor: fetchGroupAttributeDescriptors ) { - final boolean previousInitialized = - initializedLazyAttributeNames.contains( fetchGroupAttributeDescriptor.getName() ); - if ( previousInitialized ) { + if ( initializedLazyAttributeNames.contains( fetchGroupAttributeDescriptor.getName() ) ) { continue; } final Object selectedValue; try { selectedValue = fetchGroupAttributeDescriptor.getType().nullSafeGet( - rs, + resultSet, getLazyPropertyColumnAliases()[ fetchGroupAttributeDescriptor.getLazyIndex() ], session, entity diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/ReactiveSession.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/ReactiveSession.java index 07f3079e5..38a452678 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/ReactiveSession.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/ReactiveSession.java @@ -11,6 +11,8 @@ import org.hibernate.Incubating; import org.hibernate.LockMode; import org.hibernate.UnknownProfileException; +import org.hibernate.engine.spi.PersistenceContext; +import org.hibernate.engine.spi.SessionImplementor; import org.hibernate.event.internal.MergeContext; import org.hibernate.internal.util.collections.IdentitySet; import org.hibernate.reactive.engine.ReactiveActionQueue; @@ -37,6 +39,11 @@ public interface ReactiveSession extends ReactiveQueryExecutor { ReactiveActionQueue getReactiveActionQueue(); + PersistenceContext getPersistenceContext(); + + @Override + SessionImplementor getSharedContract(); + CompletionStage reactiveFetch(T association, boolean unproxy); CompletionStage reactiveFetch(E entity, Attribute field); diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/impl/ReactiveSessionImpl.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/impl/ReactiveSessionImpl.java index ceb0355d7..37bb8e6fc 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/impl/ReactiveSessionImpl.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/impl/ReactiveSessionImpl.java @@ -27,7 +27,7 @@ import org.hibernate.engine.spi.NamedSQLQueryDefinition; import org.hibernate.engine.spi.QueryParameters; import org.hibernate.engine.spi.SessionFactoryImplementor; -import org.hibernate.engine.spi.SharedSessionContractImplementor; +import org.hibernate.engine.spi.SessionImplementor; import org.hibernate.event.internal.MergeContext; import org.hibernate.event.service.spi.EventListenerRegistry; import org.hibernate.event.spi.AutoFlushEvent; @@ -126,7 +126,7 @@ public ReactiveSessionImpl(SessionFactoryImpl delegate, SessionCreationOptions o } @Override - public SharedSessionContractImplementor getSharedContract() { + public SessionImplementor getSharedContract() { return this; } diff --git a/hibernate-reactive-core/src/test/java/org/hibernate/reactive/ReactiveSessionTest.java b/hibernate-reactive-core/src/test/java/org/hibernate/reactive/ReactiveSessionTest.java index d4cd42542..7e90fffd2 100644 --- a/hibernate-reactive-core/src/test/java/org/hibernate/reactive/ReactiveSessionTest.java +++ b/hibernate-reactive-core/src/test/java/org/hibernate/reactive/ReactiveSessionTest.java @@ -268,6 +268,52 @@ public void reactiveFindWithOptimisticIncrementLock(TestContext context) { ); } + @Test + public void reactiveLockWithOptimisticIncrement(TestContext context) { + final GuineaPig expectedPig = new GuineaPig( 5, "Aloi" ); + test( + context, + populateDB() + .thenCompose( v -> getSessionFactory().withTransaction( + (session, transaction) -> session.find( GuineaPig.class, expectedPig.getId() ) + .thenCompose( actualPig -> session.lock( actualPig, LockMode.OPTIMISTIC_FORCE_INCREMENT ) + .thenAccept( vv -> { + assertThatPigsAreEqual( context, expectedPig, actualPig ); + context.assertEquals( session.getLockMode( actualPig ), LockMode.OPTIMISTIC_FORCE_INCREMENT ); + context.assertEquals( 0, actualPig.version ); + } ) + ) + ) + ) + .thenCompose( v -> openSession() ) + .thenCompose( session -> session.find( GuineaPig.class, expectedPig.getId() ) ) + .thenAccept( actualPig -> context.assertEquals( 1, actualPig.version ) ) + ); + } + + @Test + public void reactiveLockWithIncrement(TestContext context) { + final GuineaPig expectedPig = new GuineaPig( 5, "Aloi" ); + test( + context, + populateDB() + .thenCompose( v -> getSessionFactory().withTransaction( + (session, transaction) -> session.find( GuineaPig.class, expectedPig.getId() ) + .thenCompose( actualPig -> session.lock( actualPig, LockMode.PESSIMISTIC_FORCE_INCREMENT ) + .thenAccept( vv -> { + assertThatPigsAreEqual( context, expectedPig, actualPig ); + context.assertEquals( session.getLockMode( actualPig ), LockMode.PESSIMISTIC_FORCE_INCREMENT ); + context.assertEquals( 1, actualPig.version ); + } ) + ) + ) + ) + .thenCompose( v -> openSession() ) + .thenCompose( session -> session.find( GuineaPig.class, expectedPig.getId() ) ) + .thenAccept( actualPig -> context.assertEquals( 1, actualPig.version ) ) + ); + } + @Test public void reactiveFindWithOptimisticVerifyLock(TestContext context) { final GuineaPig expectedPig = new GuineaPig( 5, "Aloi" ); @@ -289,6 +335,121 @@ public void reactiveFindWithOptimisticVerifyLock(TestContext context) { ); } + @Test + public void reactiveLockWithOptimisticVerify(TestContext context) { + final GuineaPig expectedPig = new GuineaPig( 5, "Aloi" ); + test( + context, + populateDB() + .thenCompose( v -> getSessionFactory().withTransaction( + (session, transaction) -> session.find( GuineaPig.class, expectedPig.getId() ) + .thenCompose( actualPig -> session.lock( actualPig, LockMode.OPTIMISTIC ) + .thenAccept( vv -> { + assertThatPigsAreEqual( context, expectedPig, actualPig ); + context.assertEquals( session.getLockMode( actualPig ), LockMode.OPTIMISTIC ); + context.assertEquals( 0, actualPig.version ); + } ) + ) + ) + ) + .thenCompose( v -> openSession() ) + .thenCompose( session -> session.find( GuineaPig.class, expectedPig.getId() ) ) + .thenAccept( actualPig -> context.assertEquals( 0, actualPig.version ) ) + ); + } + + @Test + public void reactiveFindWithPessimisticRead(TestContext context) { + final GuineaPig expectedPig = new GuineaPig( 5, "Aloi" ); + test( + context, + populateDB() + .thenCompose( v -> getSessionFactory().withTransaction( + // does a select ... for share + (session, transaction) -> session.find( GuineaPig.class, expectedPig.getId(), LockMode.PESSIMISTIC_READ ) + .thenAccept( actualPig -> { + assertThatPigsAreEqual( context, expectedPig, actualPig ); + context.assertEquals( session.getLockMode( actualPig ), LockMode.PESSIMISTIC_READ ); + context.assertEquals( 0, actualPig.version ); + } ) + ) + ) + .thenCompose( v -> openSession() ) + .thenCompose( session -> session.find( GuineaPig.class, expectedPig.getId() ) ) + .thenAccept( actualPig -> context.assertEquals( 0, actualPig.version ) ) + ); + } + + @Test + public void reactiveLockWithPessimisticRead(TestContext context) { + final GuineaPig expectedPig = new GuineaPig( 5, "Aloi" ); + test( + context, + populateDB() + .thenCompose( v -> getSessionFactory().withTransaction( + // does a select ... for share + (session, transaction) -> session.find( GuineaPig.class, expectedPig.getId() ) + .thenCompose( actualPig -> session.lock( actualPig, LockMode.PESSIMISTIC_READ ) + .thenAccept( vv -> { + assertThatPigsAreEqual( context, expectedPig, actualPig ); + context.assertEquals( session.getLockMode( actualPig ), LockMode.PESSIMISTIC_READ ); + context.assertEquals( 0, actualPig.version ); + } ) + ) + ) + ) + .thenCompose( v -> openSession() ) + .thenCompose( session -> session.find( GuineaPig.class, expectedPig.getId() ) ) + .thenAccept( actualPig -> context.assertEquals( 0, actualPig.version ) ) + ); + } + + @Test + public void reactiveFindWithPessimisticWrite(TestContext context) { + final GuineaPig expectedPig = new GuineaPig( 5, "Aloi" ); + test( + context, + populateDB() + .thenCompose( v -> getSessionFactory().withTransaction( + // does a select ... for update + (session, transaction) -> session.find( GuineaPig.class, expectedPig.getId(), LockMode.PESSIMISTIC_WRITE ) + .thenAccept( actualPig -> { + assertThatPigsAreEqual( context, expectedPig, actualPig ); + context.assertEquals( session.getLockMode( actualPig ), LockMode.PESSIMISTIC_WRITE ); + context.assertEquals( 0, actualPig.version ); + } ) + ) + ) + .thenCompose( v -> openSession() ) + .thenCompose( session -> session.find( GuineaPig.class, expectedPig.getId() ) ) + .thenAccept( actualPig -> context.assertEquals( 0, actualPig.version ) ) + ); + } + + @Test + public void reactiveLockWithPessimisticWrite(TestContext context) { + final GuineaPig expectedPig = new GuineaPig( 5, "Aloi" ); + test( + context, + populateDB() + .thenCompose( v -> getSessionFactory().withTransaction( + // does a select ... for update + (session, transaction) -> session.find( GuineaPig.class, expectedPig.getId() ) + .thenCompose( actualPig -> session.lock( actualPig, LockMode.PESSIMISTIC_WRITE ) + .thenAccept( vv -> { + assertThatPigsAreEqual( context, expectedPig, actualPig ); + context.assertEquals( session.getLockMode( actualPig ), LockMode.PESSIMISTIC_WRITE ); + context.assertEquals( 0, actualPig.version ); + } ) + ) + ) + ) + .thenCompose( v -> openSession() ) + .thenCompose( session -> session.find( GuineaPig.class, expectedPig.getId() ) ) + .thenAccept( actualPig -> context.assertEquals( 0, actualPig.version ) ) + ); + } + @Test public void reactiveQueryWithLock(TestContext context) { final GuineaPig expectedPig = new GuineaPig( 5, "Aloi" );