From 90e200e333adfb22de6c243ae7e74a2ed0741ecd Mon Sep 17 00:00:00 2001 From: Gavin King Date: Mon, 24 Aug 2020 00:16:39 +0200 Subject: [PATCH] support for OPTIMISTIC + OPTIMISTIC_FORCE_INCREMENT LockModes These LockModes force a version check or upgrade right at the end of the transaction. This required building infrastructure for reactive before/after transaction completion events. Fixes #201 --- .../reactive/engine/ReactiveActionQueue.java | 56 ++++++++++++--- ...tiveAfterTransactionCompletionProcess.java | 25 +++++++ ...iveBeforeTransactionCompletionProcess.java | 24 +++++++ ...ReactiveEntityIncrementVersionProcess.java | 59 ++++++++++++++++ .../ReactiveEntityVerifyVersionProcess.java | 60 ++++++++++++++++ .../DefaultReactiveLockEventListener.java | 16 +++-- .../DefaultReactivePostLoadEventListener.java | 70 +++++++++++++++++++ .../impl/ReactiveAbstractEntityPersister.java | 2 +- .../entity/impl/ReactiveEntityPersister.java | 2 +- .../provider/impl/ReactiveIntegrator.java | 1 + .../reactive/stage/impl/StageSessionImpl.java | 9 ++- .../reactive/ReactiveSessionTest.java | 42 +++++++++++ 12 files changed, 344 insertions(+), 22 deletions(-) create mode 100644 hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/ReactiveAfterTransactionCompletionProcess.java create mode 100644 hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/ReactiveBeforeTransactionCompletionProcess.java create mode 100644 hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/impl/ReactiveEntityIncrementVersionProcess.java create mode 100644 hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/impl/ReactiveEntityVerifyVersionProcess.java create mode 100644 hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactivePostLoadEventListener.java diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/ReactiveActionQueue.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/ReactiveActionQueue.java index 7ddca1efb7..deac2c8a53 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/ReactiveActionQueue.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/ReactiveActionQueue.java @@ -517,6 +517,20 @@ public void registerProcess(BeforeTransactionCompletionProcess process) { beforeTransactionProcesses.register( process ); } + public void registerProcess(ReactiveAfterTransactionCompletionProcess process) { + if ( afterTransactionProcesses == null ) { + afterTransactionProcesses = new AfterTransactionCompletionProcessQueue( session ); + } + afterTransactionProcesses.registerReactive( process ); + } + + public void registerProcess(ReactiveBeforeTransactionCompletionProcess process) { + if ( beforeTransactionProcesses == null ) { + beforeTransactionProcesses = new BeforeTransactionCompletionProcessQueue( session ); + } + beforeTransactionProcesses.registerReactive( process ); + } + /** * Perform all currently queued entity-insertion actions. * @@ -576,25 +590,27 @@ private void prepareActions(ExecutableList queue) throws HibernateException { * * @param success Was the transaction successful. */ - public void afterTransactionCompletion(boolean success) { + public CompletionStage afterTransactionCompletion(boolean success) { if ( !isTransactionCoordinatorShared ) { // Execute completion actions only in transaction owner (aka parent session). if ( afterTransactionProcesses != null ) { - afterTransactionProcesses.afterTransactionCompletion( success ); + return afterTransactionProcesses.afterTransactionCompletion( success ); } } + return CompletionStages.voidFuture(); } /** * Execute any registered {@link org.hibernate.action.spi.BeforeTransactionCompletionProcess} */ - public void beforeTransactionCompletion() { + public CompletionStage beforeTransactionCompletion() { if ( !isTransactionCoordinatorShared ) { // Execute completion actions only in transaction owner (aka parent session). if ( beforeTransactionProcesses != null ) { - beforeTransactionProcesses.beforeTransactionCompletion(); + return beforeTransactionProcesses.beforeTransactionCompletion(); } } + return CompletionStages.voidFuture(); } /** @@ -917,11 +933,12 @@ public void serialize(ObjectOutputStream oos) throws IOException { } } - private abstract static class AbstractTransactionCompletionProcessQueue { + private abstract static class AbstractTransactionCompletionProcessQueue { protected SessionImplementor session; // Concurrency handling required when transaction completion process is dynamically registered // inside event listener (HHH-7478). protected Queue processes = new ConcurrentLinkedQueue<>(); + protected Queue reactiveProcesses = new ConcurrentLinkedQueue<>(); private AbstractTransactionCompletionProcessQueue(SessionImplementor session) { this.session = session; @@ -934,8 +951,15 @@ public void register(T process) { processes.add( process ); } + public void registerReactive(U process) { + if ( process == null ) { + return; + } + reactiveProcesses.add( process ); + } + public boolean hasActions() { - return !processes.isEmpty(); + return !processes.isEmpty() && !reactiveProcesses.isEmpty(); } } @@ -943,12 +967,13 @@ public boolean hasActions() { * Encapsulates behavior needed for before transaction processing */ private static class BeforeTransactionCompletionProcessQueue - extends AbstractTransactionCompletionProcessQueue { + extends AbstractTransactionCompletionProcessQueue { private BeforeTransactionCompletionProcessQueue(SessionImplementor session) { super( session ); } - public void beforeTransactionCompletion() { + public CompletionStage beforeTransactionCompletion() { while ( !processes.isEmpty() ) { try { processes.poll().doBeforeTransactionCompletion( session ); @@ -960,6 +985,10 @@ public void beforeTransactionCompletion() { throw new AssertionFailure( "Unable to perform beforeTransactionCompletion callback", e ); } } + return CompletionStages.loop( + reactiveProcesses, + process -> process.doBeforeTransactionCompletion( session ) + ); } } @@ -967,7 +996,8 @@ public void beforeTransactionCompletion() { * Encapsulates behavior needed for after transaction processing */ private static class AfterTransactionCompletionProcessQueue - extends AbstractTransactionCompletionProcessQueue { + extends AbstractTransactionCompletionProcessQueue { private Set querySpacesToInvalidate = new HashSet<>(); private AfterTransactionCompletionProcessQueue(SessionImplementor session) { @@ -978,7 +1008,7 @@ public void addSpaceToInvalidate(Serializable space) { querySpacesToInvalidate.add( space ); } - public void afterTransactionCompletion(boolean success) { + public CompletionStage afterTransactionCompletion(boolean success) { while ( !processes.isEmpty() ) { try { processes.poll().doAfterTransactionCompletion( success, session ); @@ -999,6 +1029,12 @@ public void afterTransactionCompletion(boolean success) { ); } querySpacesToInvalidate.clear(); + + return CompletionStages.loop( + reactiveProcesses, + process -> process.doAfterTransactionCompletion( success, session ) + ); + } } diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/ReactiveAfterTransactionCompletionProcess.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/ReactiveAfterTransactionCompletionProcess.java new file mode 100644 index 0000000000..c0f26c622b --- /dev/null +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/ReactiveAfterTransactionCompletionProcess.java @@ -0,0 +1,25 @@ +/* Hibernate, Relational Persistence for Idiomatic Java + * + * SPDX-License-Identifier: LGPL-2.1-or-later + * Copyright: Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.reactive.engine; + +import org.hibernate.engine.spi.SharedSessionContractImplementor; + +import java.util.concurrent.CompletionStage; + +/** + * Contract representing some process that needs to occur during after transaction completion. + * + * @author Steve Ebersole + */ +public interface ReactiveAfterTransactionCompletionProcess { + /** + * Perform whatever processing is encapsulated here after completion of the transaction. + * + * @param success Did the transaction complete successfully? True means it did. + * @param session The session on which the transaction is completing. + */ + CompletionStage doAfterTransactionCompletion(boolean success, SharedSessionContractImplementor session); +} diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/ReactiveBeforeTransactionCompletionProcess.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/ReactiveBeforeTransactionCompletionProcess.java new file mode 100644 index 0000000000..6989bbd0e6 --- /dev/null +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/ReactiveBeforeTransactionCompletionProcess.java @@ -0,0 +1,24 @@ +/* Hibernate, Relational Persistence for Idiomatic Java + * + * SPDX-License-Identifier: LGPL-2.1-or-later + * Copyright: Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.reactive.engine; + +import org.hibernate.engine.spi.SessionImplementor; + +import java.util.concurrent.CompletionStage; + +/** + * Contract representing some process that needs to occur during before transaction completion. + * + * @author Steve Ebersole + */ +public interface ReactiveBeforeTransactionCompletionProcess { + /** + * Perform whatever processing is encapsulated here before completion of the transaction. + * + * @param session The session on which the transaction is preparing to complete. + */ + CompletionStage doBeforeTransactionCompletion(SessionImplementor session); +} diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/impl/ReactiveEntityIncrementVersionProcess.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/impl/ReactiveEntityIncrementVersionProcess.java new file mode 100644 index 0000000000..6bcb944016 --- /dev/null +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/impl/ReactiveEntityIncrementVersionProcess.java @@ -0,0 +1,59 @@ +/* Hibernate, Relational Persistence for Idiomatic Java + * + * SPDX-License-Identifier: LGPL-2.1-or-later + * Copyright: Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.reactive.engine.impl; + +import org.hibernate.LockMode; +import org.hibernate.LockOptions; +import org.hibernate.engine.spi.EntityEntry; +import org.hibernate.engine.spi.SessionImplementor; +import org.hibernate.reactive.engine.ReactiveBeforeTransactionCompletionProcess; +import org.hibernate.reactive.persister.entity.impl.ReactiveEntityPersister; +import org.hibernate.reactive.util.impl.CompletionStages; + +import java.util.concurrent.CompletionStage; + +/** + * A BeforeTransactionCompletionProcess impl to verify and increment an entity version as party + * of before-transaction-completion processing + * + * @author Scott Marlow + * @author Gavin King + */ +public class ReactiveEntityIncrementVersionProcess implements ReactiveBeforeTransactionCompletionProcess { + private final Object object; + + /** + * Constructs an EntityIncrementVersionProcess for the given entity. + * + * @param object The entity instance + */ + public ReactiveEntityIncrementVersionProcess(Object object) { + this.object = object; + } + + /** + * Perform whatever processing is encapsulated here before completion of the transaction. + * + * @param session The session on which the transaction is preparing to complete. + */ + @Override + public CompletionStage doBeforeTransactionCompletion(SessionImplementor session) { + final EntityEntry entry = session.getPersistenceContext().getEntry( object ); + // Don't increment version for an entity that is not in the PersistenceContext; + if ( entry == null ) { + return CompletionStages.voidFuture(); + } + + return ( (ReactiveEntityPersister) entry.getPersister() ) + .lockReactive( + entry.getId(), + entry.getVersion(), + object, + new LockOptions(LockMode.PESSIMISTIC_FORCE_INCREMENT), + session + ); + } +} diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/impl/ReactiveEntityVerifyVersionProcess.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/impl/ReactiveEntityVerifyVersionProcess.java new file mode 100644 index 0000000000..43875f20a5 --- /dev/null +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/impl/ReactiveEntityVerifyVersionProcess.java @@ -0,0 +1,60 @@ +/* Hibernate, Relational Persistence for Idiomatic Java + * + * SPDX-License-Identifier: LGPL-2.1-or-later + * Copyright: Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.reactive.engine.impl; + +import org.hibernate.dialect.lock.OptimisticEntityLockException; +import org.hibernate.engine.spi.EntityEntry; +import org.hibernate.engine.spi.SessionImplementor; +import org.hibernate.pretty.MessageHelper; +import org.hibernate.reactive.engine.ReactiveBeforeTransactionCompletionProcess; +import org.hibernate.reactive.persister.entity.impl.ReactiveEntityPersister; +import org.hibernate.reactive.util.impl.CompletionStages; + +import java.util.concurrent.CompletionStage; + +/** + * A BeforeTransactionCompletionProcess impl to verify an entity version as part of + * before-transaction-completion processing + * + * @author Scott Marlow + * @author Gavin King + */ +public class ReactiveEntityVerifyVersionProcess implements ReactiveBeforeTransactionCompletionProcess { + private final Object object; + + /** + * Constructs an EntityVerifyVersionProcess + * + * @param object The entity instance + */ + public ReactiveEntityVerifyVersionProcess(Object object) { + this.object = object; + } + + @Override + public CompletionStage doBeforeTransactionCompletion(SessionImplementor session) { + final EntityEntry entry = session.getPersistenceContext().getEntry( object ); + // Don't check version for an entity that is not in the PersistenceContext; + if ( entry == null ) { + return CompletionStages.voidFuture(); + } + + final ReactiveEntityPersister persister = (ReactiveEntityPersister) entry.getPersister(); + //TODO: optimize this to only fetch the version number! + return persister.reactiveGetDatabaseSnapshot( entry.getId(), session ) + .thenApply( snapshot -> snapshot[ persister.getVersionProperty() ] ) + .thenAccept( latestVersion -> { + if ( !entry.getVersion().equals( latestVersion ) ) { + throw new OptimisticEntityLockException( + object, + "Newer version [" + latestVersion + + "] of entity [" + MessageHelper.infoString( entry.getEntityName(), entry.getId() ) + + "] found in database" + ); + } + } ); + } +} diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactiveLockEventListener.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactiveLockEventListener.java index a9ac7797e7..659d91968b 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactiveLockEventListener.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactiveLockEventListener.java @@ -164,13 +164,15 @@ protected CompletionStage upgradeLock(Object object, EntityEntry entry, ck = null; } - return ((ReactiveEntityPersister) persister).lockReactive( - entry.getId(), - entry.getVersion(), - object, - lockOptions, - source - ).thenAccept( v -> entry.setLockMode(requestedLockMode) ) + return ((ReactiveEntityPersister) persister) + .lockReactive( + entry.getId(), + entry.getVersion(), + object, + lockOptions, + source + ) + .thenAccept( v -> entry.setLockMode(requestedLockMode) ) .whenComplete( (r, e) -> { // the database now holds a lock + the object is flushed from the cache, // so release the soft lock diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactivePostLoadEventListener.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactivePostLoadEventListener.java new file mode 100644 index 0000000000..a191bf1d89 --- /dev/null +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/event/impl/DefaultReactivePostLoadEventListener.java @@ -0,0 +1,70 @@ +/* Hibernate, Relational Persistence for Idiomatic Java + * + * SPDX-License-Identifier: LGPL-2.1-or-later + * Copyright: Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.reactive.event.impl; + +import org.hibernate.AssertionFailure; +import org.hibernate.LockMode; +import org.hibernate.classic.Lifecycle; +import org.hibernate.engine.spi.EntityEntry; +import org.hibernate.engine.spi.SessionImplementor; +import org.hibernate.event.spi.PostLoadEvent; +import org.hibernate.event.spi.PostLoadEventListener; +import org.hibernate.jpa.event.spi.CallbackRegistry; +import org.hibernate.jpa.event.spi.CallbackRegistryConsumer; +import org.hibernate.persister.entity.EntityPersister; +import org.hibernate.reactive.engine.impl.ReactiveEntityIncrementVersionProcess; +import org.hibernate.reactive.engine.impl.ReactiveEntityVerifyVersionProcess; +import org.hibernate.reactive.session.ReactiveSession; + +/** + * We do 2 things here:
    + *
  • Call {@link Lifecycle} interface if necessary
  • + *
  • Perform needed {@link EntityEntry#getLockMode()} related processing
  • + *
+ * + * @author Gavin King + * @author Steve Ebersole + */ +public class DefaultReactivePostLoadEventListener implements PostLoadEventListener, CallbackRegistryConsumer { + private CallbackRegistry callbackRegistry; + + @Override + public void injectCallbackRegistry(CallbackRegistry callbackRegistry) { + this.callbackRegistry = callbackRegistry; + } + + @Override + public void onPostLoad(PostLoadEvent event) { + final Object entity = event.getEntity(); + + callbackRegistry.postLoad( entity ); + + final SessionImplementor session = event.getSession(); + final EntityEntry entry = session.getPersistenceContextInternal().getEntry( entity ); + if ( entry == null ) { + throw new AssertionFailure( "possible non-threadsafe access to the session" ); + } + + final LockMode lockMode = entry.getLockMode(); + if ( LockMode.PESSIMISTIC_FORCE_INCREMENT.equals( lockMode ) ) { + final EntityPersister persister = entry.getPersister(); + final Object nextVersion = persister.forceVersionIncrement( + entry.getId(), + entry.getVersion(), + session + ); + entry.forceLocked( entity, nextVersion ); + } + else if ( LockMode.OPTIMISTIC_FORCE_INCREMENT.equals( lockMode ) ) { + ((ReactiveSession) session).getReactiveActionQueue() + .registerProcess( new ReactiveEntityIncrementVersionProcess( entity ) ); + } + else if ( LockMode.OPTIMISTIC.equals( lockMode ) ) { + ((ReactiveSession) session).getReactiveActionQueue() + .registerProcess( new ReactiveEntityVerifyVersionProcess( entity ) ); + } + } +} diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/persister/entity/impl/ReactiveAbstractEntityPersister.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/persister/entity/impl/ReactiveAbstractEntityPersister.java index 813319db42..9fc120341f 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/persister/entity/impl/ReactiveAbstractEntityPersister.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/persister/entity/impl/ReactiveAbstractEntityPersister.java @@ -696,7 +696,7 @@ default String generateUpdateLockString(LockOptions lockOptions) { } @Override - default CompletionStage lockReactive( + default CompletionStage lockReactive( Serializable id, Object version, Object object, diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/persister/entity/impl/ReactiveEntityPersister.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/persister/entity/impl/ReactiveEntityPersister.java index d07b6e8140..513ff515e5 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/persister/entity/impl/ReactiveEntityPersister.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/persister/entity/impl/ReactiveEntityPersister.java @@ -78,7 +78,7 @@ CompletionStage updateReactive( /** * Obtain a pessimistic lock without blocking */ - CompletionStage lockReactive( + CompletionStage lockReactive( Serializable id, Object version, Object object, diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/provider/impl/ReactiveIntegrator.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/provider/impl/ReactiveIntegrator.java index e96058d1e8..bda4f1ff66 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/provider/impl/ReactiveIntegrator.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/provider/impl/ReactiveIntegrator.java @@ -56,6 +56,7 @@ private void attachEventContextManagingListenersIfRequired(SessionFactoryService eventListenerRegistry.getEventListenerGroup( EventType.LOCK ).appendListener( new DefaultReactiveLockEventListener() ); eventListenerRegistry.getEventListenerGroup( EventType.LOAD ).appendListener( new DefaultReactiveLoadEventListener() ); eventListenerRegistry.getEventListenerGroup( EventType.INIT_COLLECTION ).appendListener( new DefaultReactiveInitializeCollectionEventListener() ); + eventListenerRegistry.getEventListenerGroup( EventType.POST_LOAD ).appendListener( new DefaultReactivePostLoadEventListener() ); } } diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/impl/StageSessionImpl.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/impl/StageSessionImpl.java index 617c65b084..b937023702 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/impl/StageSessionImpl.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/impl/StageSessionImpl.java @@ -13,6 +13,7 @@ import org.hibernate.graph.GraphSemantic; import org.hibernate.graph.spi.RootGraphImplementor; import org.hibernate.reactive.common.ResultSetMapping; +import org.hibernate.reactive.engine.ReactiveActionQueue; import org.hibernate.reactive.session.Criteria; import org.hibernate.reactive.session.ReactiveSession; import org.hibernate.reactive.stage.Stage; @@ -402,9 +403,11 @@ CompletionStage begin() { } CompletionStage end() { - return rollback - ? delegate.getReactiveConnection().rollbackTransaction() - : delegate.getReactiveConnection().commitTransaction(); + ReactiveActionQueue actionQueue = delegate.getReactiveActionQueue(); + return actionQueue.beforeTransactionCompletion() + .thenApply( v -> delegate.getReactiveConnection() ) + .thenCompose( c -> rollback ? c.rollbackTransaction() : c.commitTransaction() ) + .thenCompose( v -> actionQueue.afterTransactionCompletion( !rollback ) ); } R processError(R result, Throwable e) { diff --git a/hibernate-reactive-core/src/test/java/org/hibernate/reactive/ReactiveSessionTest.java b/hibernate-reactive-core/src/test/java/org/hibernate/reactive/ReactiveSessionTest.java index 741d3a6540..1b41b8ee8b 100644 --- a/hibernate-reactive-core/src/test/java/org/hibernate/reactive/ReactiveSessionTest.java +++ b/hibernate-reactive-core/src/test/java/org/hibernate/reactive/ReactiveSessionTest.java @@ -226,6 +226,48 @@ public void reactiveFindThenForceLock(TestContext context) { ); } + @Test + public void reactiveFindWithOptimisticIncrementLock(TestContext context) { + final GuineaPig expectedPig = new GuineaPig( 5, "Aloi" ); + test( + context, + populateDB() + .thenCompose( v -> getSessionFactory().withTransaction( + (session, transaction) -> session.find( GuineaPig.class, expectedPig.getId(), LockMode.OPTIMISTIC_FORCE_INCREMENT ) + .thenAccept( actualPig -> { + assertThatPigsAreEqual( context, expectedPig, actualPig ); + context.assertEquals( session.getLockMode( actualPig ), LockMode.OPTIMISTIC_FORCE_INCREMENT ); + context.assertEquals( 0, actualPig.version ); + } ) + ) + ) + .thenCompose( v -> openSession() ) + .thenCompose( session -> session.find( GuineaPig.class, expectedPig.getId() ) ) + .thenAccept( actualPig -> context.assertEquals( 1, actualPig.version ) ) + ); + } + + @Test + public void reactiveFindWithOptimisticVerifyLock(TestContext context) { + final GuineaPig expectedPig = new GuineaPig( 5, "Aloi" ); + test( + context, + populateDB() + .thenCompose( v -> getSessionFactory().withTransaction( + (session, transaction) -> session.find( GuineaPig.class, expectedPig.getId(), LockMode.OPTIMISTIC ) + .thenAccept( actualPig -> { + assertThatPigsAreEqual( context, expectedPig, actualPig ); + context.assertEquals( session.getLockMode( actualPig ), LockMode.OPTIMISTIC ); + context.assertEquals( 0, actualPig.version ); + } ) + ) + ) + .thenCompose( v -> openSession() ) + .thenCompose( session -> session.find( GuineaPig.class, expectedPig.getId() ) ) + .thenAccept( actualPig -> context.assertEquals( 0, actualPig.version ) ) + ); + } + @Test public void reactiveQueryWithLock(TestContext context) { final GuineaPig expectedPig = new GuineaPig( 5, "Aloi" );