Skip to content

Commit

Permalink
support for OPTIMISTIC + OPTIMISTIC_FORCE_INCREMENT LockModes
Browse files Browse the repository at this point in the history
These LockModes force a version check or upgrade right at the
end of the transaction.

This required building infrastructure for reactive before/after
transaction completion events.

Fixes #201
  • Loading branch information
gavinking committed Aug 23, 2020
1 parent d11e49c commit 90e200e
Show file tree
Hide file tree
Showing 12 changed files with 344 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,20 @@ public void registerProcess(BeforeTransactionCompletionProcess process) {
beforeTransactionProcesses.register( process );
}

public void registerProcess(ReactiveAfterTransactionCompletionProcess process) {
if ( afterTransactionProcesses == null ) {
afterTransactionProcesses = new AfterTransactionCompletionProcessQueue( session );
}
afterTransactionProcesses.registerReactive( process );
}

public void registerProcess(ReactiveBeforeTransactionCompletionProcess process) {
if ( beforeTransactionProcesses == null ) {
beforeTransactionProcesses = new BeforeTransactionCompletionProcessQueue( session );
}
beforeTransactionProcesses.registerReactive( process );
}

/**
* Perform all currently queued entity-insertion actions.
*
Expand Down Expand Up @@ -576,25 +590,27 @@ private void prepareActions(ExecutableList<?> queue) throws HibernateException {
*
* @param success Was the transaction successful.
*/
public void afterTransactionCompletion(boolean success) {
public CompletionStage<Void> afterTransactionCompletion(boolean success) {
if ( !isTransactionCoordinatorShared ) {
// Execute completion actions only in transaction owner (aka parent session).
if ( afterTransactionProcesses != null ) {
afterTransactionProcesses.afterTransactionCompletion( success );
return afterTransactionProcesses.afterTransactionCompletion( success );
}
}
return CompletionStages.voidFuture();
}

/**
* Execute any registered {@link org.hibernate.action.spi.BeforeTransactionCompletionProcess}
*/
public void beforeTransactionCompletion() {
public CompletionStage<Void> beforeTransactionCompletion() {
if ( !isTransactionCoordinatorShared ) {
// Execute completion actions only in transaction owner (aka parent session).
if ( beforeTransactionProcesses != null ) {
beforeTransactionProcesses.beforeTransactionCompletion();
return beforeTransactionProcesses.beforeTransactionCompletion();
}
}
return CompletionStages.voidFuture();
}

/**
Expand Down Expand Up @@ -917,11 +933,12 @@ public void serialize(ObjectOutputStream oos) throws IOException {
}
}

private abstract static class AbstractTransactionCompletionProcessQueue<T> {
private abstract static class AbstractTransactionCompletionProcessQueue<T,U> {
protected SessionImplementor session;
// Concurrency handling required when transaction completion process is dynamically registered
// inside event listener (HHH-7478).
protected Queue<T> processes = new ConcurrentLinkedQueue<>();
protected Queue<U> reactiveProcesses = new ConcurrentLinkedQueue<>();

private AbstractTransactionCompletionProcessQueue(SessionImplementor session) {
this.session = session;
Expand All @@ -934,21 +951,29 @@ public void register(T process) {
processes.add( process );
}

public void registerReactive(U process) {
if ( process == null ) {
return;
}
reactiveProcesses.add( process );
}

public boolean hasActions() {
return !processes.isEmpty();
return !processes.isEmpty() && !reactiveProcesses.isEmpty();
}
}

/**
* Encapsulates behavior needed for before transaction processing
*/
private static class BeforeTransactionCompletionProcessQueue
extends AbstractTransactionCompletionProcessQueue<BeforeTransactionCompletionProcess> {
extends AbstractTransactionCompletionProcessQueue<BeforeTransactionCompletionProcess,
ReactiveBeforeTransactionCompletionProcess> {
private BeforeTransactionCompletionProcessQueue(SessionImplementor session) {
super( session );
}

public void beforeTransactionCompletion() {
public CompletionStage<Void> beforeTransactionCompletion() {
while ( !processes.isEmpty() ) {
try {
processes.poll().doBeforeTransactionCompletion( session );
Expand All @@ -960,14 +985,19 @@ public void beforeTransactionCompletion() {
throw new AssertionFailure( "Unable to perform beforeTransactionCompletion callback", e );
}
}
return CompletionStages.loop(
reactiveProcesses,
process -> process.doBeforeTransactionCompletion( session )
);
}
}

/**
* Encapsulates behavior needed for after transaction processing
*/
private static class AfterTransactionCompletionProcessQueue
extends AbstractTransactionCompletionProcessQueue<AfterTransactionCompletionProcess> {
extends AbstractTransactionCompletionProcessQueue<AfterTransactionCompletionProcess,
ReactiveAfterTransactionCompletionProcess> {
private Set<Serializable> querySpacesToInvalidate = new HashSet<>();

private AfterTransactionCompletionProcessQueue(SessionImplementor session) {
Expand All @@ -978,7 +1008,7 @@ public void addSpaceToInvalidate(Serializable space) {
querySpacesToInvalidate.add( space );
}

public void afterTransactionCompletion(boolean success) {
public CompletionStage<Void> afterTransactionCompletion(boolean success) {
while ( !processes.isEmpty() ) {
try {
processes.poll().doAfterTransactionCompletion( success, session );
Expand All @@ -999,6 +1029,12 @@ public void afterTransactionCompletion(boolean success) {
);
}
querySpacesToInvalidate.clear();

return CompletionStages.loop(
reactiveProcesses,
process -> process.doAfterTransactionCompletion( success, session )
);

}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/* Hibernate, Relational Persistence for Idiomatic Java
*
* SPDX-License-Identifier: LGPL-2.1-or-later
* Copyright: Red Hat Inc. and Hibernate Authors
*/
package org.hibernate.reactive.engine;

import org.hibernate.engine.spi.SharedSessionContractImplementor;

import java.util.concurrent.CompletionStage;

/**
* Contract representing some process that needs to occur during after transaction completion.
*
* @author Steve Ebersole
*/
public interface ReactiveAfterTransactionCompletionProcess {
/**
* Perform whatever processing is encapsulated here after completion of the transaction.
*
* @param success Did the transaction complete successfully? True means it did.
* @param session The session on which the transaction is completing.
*/
CompletionStage<Void> doAfterTransactionCompletion(boolean success, SharedSessionContractImplementor session);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/* Hibernate, Relational Persistence for Idiomatic Java
*
* SPDX-License-Identifier: LGPL-2.1-or-later
* Copyright: Red Hat Inc. and Hibernate Authors
*/
package org.hibernate.reactive.engine;

import org.hibernate.engine.spi.SessionImplementor;

import java.util.concurrent.CompletionStage;

/**
* Contract representing some process that needs to occur during before transaction completion.
*
* @author Steve Ebersole
*/
public interface ReactiveBeforeTransactionCompletionProcess {
/**
* Perform whatever processing is encapsulated here before completion of the transaction.
*
* @param session The session on which the transaction is preparing to complete.
*/
CompletionStage<Void> doBeforeTransactionCompletion(SessionImplementor session);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/* Hibernate, Relational Persistence for Idiomatic Java
*
* SPDX-License-Identifier: LGPL-2.1-or-later
* Copyright: Red Hat Inc. and Hibernate Authors
*/
package org.hibernate.reactive.engine.impl;

import org.hibernate.LockMode;
import org.hibernate.LockOptions;
import org.hibernate.engine.spi.EntityEntry;
import org.hibernate.engine.spi.SessionImplementor;
import org.hibernate.reactive.engine.ReactiveBeforeTransactionCompletionProcess;
import org.hibernate.reactive.persister.entity.impl.ReactiveEntityPersister;
import org.hibernate.reactive.util.impl.CompletionStages;

import java.util.concurrent.CompletionStage;

/**
* A BeforeTransactionCompletionProcess impl to verify and increment an entity version as party
* of before-transaction-completion processing
*
* @author Scott Marlow
* @author Gavin King
*/
public class ReactiveEntityIncrementVersionProcess implements ReactiveBeforeTransactionCompletionProcess {
private final Object object;

/**
* Constructs an EntityIncrementVersionProcess for the given entity.
*
* @param object The entity instance
*/
public ReactiveEntityIncrementVersionProcess(Object object) {
this.object = object;
}

/**
* Perform whatever processing is encapsulated here before completion of the transaction.
*
* @param session The session on which the transaction is preparing to complete.
*/
@Override
public CompletionStage<Void> doBeforeTransactionCompletion(SessionImplementor session) {
final EntityEntry entry = session.getPersistenceContext().getEntry( object );
// Don't increment version for an entity that is not in the PersistenceContext;
if ( entry == null ) {
return CompletionStages.voidFuture();
}

return ( (ReactiveEntityPersister) entry.getPersister() )
.lockReactive(
entry.getId(),
entry.getVersion(),
object,
new LockOptions(LockMode.PESSIMISTIC_FORCE_INCREMENT),
session
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/* Hibernate, Relational Persistence for Idiomatic Java
*
* SPDX-License-Identifier: LGPL-2.1-or-later
* Copyright: Red Hat Inc. and Hibernate Authors
*/
package org.hibernate.reactive.engine.impl;

import org.hibernate.dialect.lock.OptimisticEntityLockException;
import org.hibernate.engine.spi.EntityEntry;
import org.hibernate.engine.spi.SessionImplementor;
import org.hibernate.pretty.MessageHelper;
import org.hibernate.reactive.engine.ReactiveBeforeTransactionCompletionProcess;
import org.hibernate.reactive.persister.entity.impl.ReactiveEntityPersister;
import org.hibernate.reactive.util.impl.CompletionStages;

import java.util.concurrent.CompletionStage;

/**
* A BeforeTransactionCompletionProcess impl to verify an entity version as part of
* before-transaction-completion processing
*
* @author Scott Marlow
* @author Gavin King
*/
public class ReactiveEntityVerifyVersionProcess implements ReactiveBeforeTransactionCompletionProcess {
private final Object object;

/**
* Constructs an EntityVerifyVersionProcess
*
* @param object The entity instance
*/
public ReactiveEntityVerifyVersionProcess(Object object) {
this.object = object;
}

@Override
public CompletionStage<Void> doBeforeTransactionCompletion(SessionImplementor session) {
final EntityEntry entry = session.getPersistenceContext().getEntry( object );
// Don't check version for an entity that is not in the PersistenceContext;
if ( entry == null ) {
return CompletionStages.voidFuture();
}

final ReactiveEntityPersister persister = (ReactiveEntityPersister) entry.getPersister();
//TODO: optimize this to only fetch the version number!
return persister.reactiveGetDatabaseSnapshot( entry.getId(), session )
.thenApply( snapshot -> snapshot[ persister.getVersionProperty() ] )
.thenAccept( latestVersion -> {
if ( !entry.getVersion().equals( latestVersion ) ) {
throw new OptimisticEntityLockException(
object,
"Newer version [" + latestVersion +
"] of entity [" + MessageHelper.infoString( entry.getEntityName(), entry.getId() ) +
"] found in database"
);
}
} );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -164,13 +164,15 @@ protected CompletionStage<Void> upgradeLock(Object object, EntityEntry entry,
ck = null;
}

return ((ReactiveEntityPersister) persister).lockReactive(
entry.getId(),
entry.getVersion(),
object,
lockOptions,
source
).thenAccept( v -> entry.setLockMode(requestedLockMode) )
return ((ReactiveEntityPersister) persister)
.lockReactive(
entry.getId(),
entry.getVersion(),
object,
lockOptions,
source
)
.thenAccept( v -> entry.setLockMode(requestedLockMode) )
.whenComplete( (r, e) -> {
// the database now holds a lock + the object is flushed from the cache,
// so release the soft lock
Expand Down
Loading

0 comments on commit 90e200e

Please sign in to comment.