Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support for OPTIMISTIC + OPTIMISTIC_FORCE_INCREMENT LockModes #329

Merged
merged 3 commits into from
Aug 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/* Hibernate, Relational Persistence for Idiomatic Java
*
* SPDX-License-Identifier: LGPL-2.1-or-later
* Copyright: Red Hat Inc. and Hibernate Authors
*/
package org.hibernate.reactive.engine;

import org.hibernate.reactive.session.ReactiveSession;

import java.util.concurrent.CompletionStage;

/**
* Contract representing some process that needs to occur during after transaction completion.
*
* @author Gavin King
* @author Steve Ebersole
*/
public interface ReactiveAfterTransactionCompletionProcess {
/**
* Perform whatever processing is encapsulated here after completion of the transaction.
*
* @param success Did the transaction complete successfully? True means it did.
* @param session The session on which the transaction is completing.
*/
CompletionStage<Void> doAfterTransactionCompletion(boolean success, ReactiveSession session);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/* Hibernate, Relational Persistence for Idiomatic Java
*
* SPDX-License-Identifier: LGPL-2.1-or-later
* Copyright: Red Hat Inc. and Hibernate Authors
*/
package org.hibernate.reactive.engine;

import org.hibernate.reactive.session.ReactiveSession;

import java.util.concurrent.CompletionStage;

/**
* Contract representing some process that needs to occur during before transaction completion.
*
* @author Gavin King
* @author Steve Ebersole
*/
public interface ReactiveBeforeTransactionCompletionProcess {
/**
* Perform whatever processing is encapsulated here before completion of the transaction.
*
* @param session The session on which the transaction is preparing to complete.
*/
CompletionStage<Void> doBeforeTransactionCompletion(ReactiveSession session);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/* Hibernate, Relational Persistence for Idiomatic Java
*
* SPDX-License-Identifier: LGPL-2.1-or-later
* Copyright: Red Hat Inc. and Hibernate Authors
*/
package org.hibernate.reactive.engine.impl;

import org.hibernate.LockMode;
import org.hibernate.LockOptions;
import org.hibernate.engine.spi.EntityEntry;
import org.hibernate.reactive.engine.ReactiveBeforeTransactionCompletionProcess;
import org.hibernate.reactive.persister.entity.impl.ReactiveEntityPersister;
import org.hibernate.reactive.session.ReactiveSession;
import org.hibernate.reactive.util.impl.CompletionStages;

import java.util.concurrent.CompletionStage;

/**
* A BeforeTransactionCompletionProcess impl to verify and increment an entity version as party
* of before-transaction-completion processing
*
* @author Scott Marlow
* @author Gavin King
*/
public class ReactiveEntityIncrementVersionProcess implements ReactiveBeforeTransactionCompletionProcess {
private final Object object;

/**
* Constructs an EntityIncrementVersionProcess for the given entity.
*
* @param object The entity instance
*/
public ReactiveEntityIncrementVersionProcess(Object object) {
this.object = object;
}

/**
* Perform whatever processing is encapsulated here before completion of the transaction.
*
* @param session The session on which the transaction is preparing to complete.
*/
@Override
public CompletionStage<Void> doBeforeTransactionCompletion(ReactiveSession session) {
final EntityEntry entry = session.getPersistenceContext().getEntry( object );
// Don't increment version for an entity that is not in the PersistenceContext;
if ( entry == null ) {
return CompletionStages.voidFuture();
}

return ( (ReactiveEntityPersister) entry.getPersister() )
.lockReactive(
entry.getId(),
entry.getVersion(),
object,
new LockOptions(LockMode.PESSIMISTIC_FORCE_INCREMENT),
session.getSharedContract()
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/* Hibernate, Relational Persistence for Idiomatic Java
*
* SPDX-License-Identifier: LGPL-2.1-or-later
* Copyright: Red Hat Inc. and Hibernate Authors
*/
package org.hibernate.reactive.engine.impl;

import org.hibernate.dialect.lock.OptimisticEntityLockException;
import org.hibernate.engine.spi.EntityEntry;
import org.hibernate.pretty.MessageHelper;
import org.hibernate.reactive.engine.ReactiveBeforeTransactionCompletionProcess;
import org.hibernate.reactive.persister.entity.impl.ReactiveEntityPersister;
import org.hibernate.reactive.session.ReactiveSession;
import org.hibernate.reactive.util.impl.CompletionStages;

import java.util.concurrent.CompletionStage;

/**
* A BeforeTransactionCompletionProcess impl to verify an entity version as part of
* before-transaction-completion processing
*
* @author Scott Marlow
* @author Gavin King
*/
public class ReactiveEntityVerifyVersionProcess implements ReactiveBeforeTransactionCompletionProcess {
private final Object object;

/**
* Constructs an EntityVerifyVersionProcess
*
* @param object The entity instance
*/
public ReactiveEntityVerifyVersionProcess(Object object) {
this.object = object;
}

@Override
public CompletionStage<Void> doBeforeTransactionCompletion(ReactiveSession session) {
final EntityEntry entry = session.getPersistenceContext().getEntry( object );
// Don't check version for an entity that is not in the PersistenceContext;
if ( entry == null ) {
return CompletionStages.voidFuture();
}

return ( (ReactiveEntityPersister) entry.getPersister() )
.reactiveGetCurrentVersion( entry.getId(), session.getSharedContract() )
.thenAccept( latestVersion -> {
if ( !entry.getVersion().equals( latestVersion ) ) {
throw new OptimisticEntityLockException(
object,
"Newer version [" + latestVersion +
"] of entity [" + MessageHelper.infoString( entry.getEntityName(), entry.getId() ) +
"] found in database"
);
}
} );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,27 +126,44 @@ public CompletionStage<Void> reactiveOnLoad(
final LoadEvent event,
final LoadEventListener.LoadType loadType) throws HibernateException {

final EntityPersister persister = getPersister( event );

final ReactiveEntityPersister persister = (ReactiveEntityPersister) getPersister( event );
if ( persister == null ) {
throw new HibernateException( "Unable to locate persister: " + event.getEntityClassName() );
}

return checkId( event, loadType, persister ).thenCompose(
CompletionStage<Void> result = checkId( event, loadType, persister ).thenCompose(
vd -> doOnLoad( persister, event, loadType )
.thenAccept( event::setResult )
.handle( (v, x) -> {
if ( x instanceof HibernateException ) {
LOG.unableToLoadCommand( (HibernateException) x );
}
CompletionStages.returnNullorRethrow( x );

if ( event.getResult() instanceof CompletionStage ) {
throw new AssertionFailure( "Unexpected CompletionStage" );
throw new AssertionFailure("Unexpected CompletionStage");
}

return v;
} ));
if (x instanceof HibernateException) {
LOG.unableToLoadCommand( (HibernateException) x );
}
return CompletionStages.returnNullorRethrow( x );
} )
);

// if a pessimistic version increment was requested, we need
// to go back to the database immediately and update the row
if ( event.getLockMode() == LockMode.PESSIMISTIC_FORCE_INCREMENT
|| event.getLockMode() == LockMode.FORCE ) {
// TODO: should we call CachedDomainDataAccess.lockItem() ?
return result.thenCompose(
v -> persister.lockReactive(
event.getEntityId(),
persister.getVersion( event.getResult() ),
event.getResult(),
event.getLockOptions(),
event.getSession()
)
);
}
else {
return result;
}
}

private CompletionStage<Void> checkId(LoadEvent event, LoadType loadType, EntityPersister persister) {
Expand Down Expand Up @@ -505,37 +522,38 @@ private CompletionStage<Object> lockAndLoad(
final EntityKey keyToLoad,
final LoadEventListener.LoadType options,
final SessionImplementor source) {
final SoftLock lock;
final Object ck;
final EntityDataAccess cache = persister.getCacheAccessStrategy();

final boolean canWriteToCache = persister.canWriteToCache();
final SoftLock lock;
final Object cacheKey;
if ( canWriteToCache ) {
ck = cache.generateCacheKey(
EntityDataAccess cache = persister.getCacheAccessStrategy();
cacheKey = cache.generateCacheKey(
event.getEntityId(),
persister,
source.getFactory(),
source.getTenantIdentifier()
);
lock = cache.lockItem( source, ck, null );
lock = cache.lockItem( source, cacheKey, null );
}
else {
ck = null;
cacheKey = null;
lock = null;
}

try {
return load( event, persister, keyToLoad, options )
.whenComplete( (v, x) -> {
if ( canWriteToCache ) {
cache.unlockItem( source, ck, lock );
persister.getCacheAccessStrategy().unlockItem( source, cacheKey, lock );
}
} )
.thenApply( entity -> source.getPersistenceContextInternal().proxyFor( persister, keyToLoad, entity ) );
}
catch (HibernateException he) {
//in case load() throws an exception
if ( canWriteToCache ) {
cache.unlockItem( source, ck, lock );
persister.getCacheAccessStrategy().unlockItem( source, cacheKey, lock );
}
throw he;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@
import org.hibernate.reactive.engine.impl.Cascade;
import org.hibernate.reactive.engine.impl.CascadingActions;
import org.hibernate.reactive.engine.impl.ForeignKeys;
import org.hibernate.reactive.engine.impl.ReactiveEntityIncrementVersionProcess;
import org.hibernate.reactive.engine.impl.ReactiveEntityVerifyVersionProcess;
import org.hibernate.reactive.event.ReactiveLockEventListener;
import org.hibernate.reactive.persister.entity.impl.ReactiveEntityPersister;
import org.hibernate.reactive.session.ReactiveSession;
import org.hibernate.reactive.util.impl.CompletionStages;
import org.jboss.logging.Logger;

Expand Down Expand Up @@ -136,52 +139,82 @@ protected CompletionStage<Void> upgradeLock(Object object, EntityEntry entry,
);
}

final EntityPersister persister = entry.getPersister();

if ( log.isTraceEnabled() ) {
log.tracev(
"Locking {0} in mode: {1}",
MessageHelper.infoString( persister, entry.getId(), source.getFactory() ),
MessageHelper.infoString( entry.getPersister(), entry.getId(), source.getFactory() ),
requestedLockMode
);
}

final boolean cachingEnabled = persister.canWriteToCache();
final SoftLock lock;
final Object ck;
if ( cachingEnabled ) {
EntityDataAccess cache = persister.getCacheAccessStrategy();
ck = cache.generateCacheKey(
entry.getId(),
persister,
source.getFactory(),
source.getTenantIdentifier()
);
lock = cache.lockItem( source, ck, entry.getVersion() );
}
else {
lock = null;
ck = null;
switch (requestedLockMode) {
case OPTIMISTIC:
( (ReactiveSession) source ).getReactiveActionQueue()
.registerProcess( new ReactiveEntityVerifyVersionProcess(object) );
entry.setLockMode( requestedLockMode );
return CompletionStages.voidFuture();
case OPTIMISTIC_FORCE_INCREMENT:
( (ReactiveSession) source ).getReactiveActionQueue()
.registerProcess( new ReactiveEntityIncrementVersionProcess(object) );
entry.setLockMode( requestedLockMode );
return CompletionStages.voidFuture();
default:
return doUpgradeLock( object, entry, lockOptions, source );
}
}
else {
return CompletionStages.voidFuture();
}
}

private CompletionStage<Void> doUpgradeLock(Object object, EntityEntry entry,
LockOptions lockOptions,
EventSource source) {

final EntityPersister persister = entry.getPersister();

return ((ReactiveEntityPersister) persister).lockReactive(
final boolean canWriteToCache = persister.canWriteToCache();
final SoftLock lock;
final Object cacheKey;
if ( canWriteToCache ) {
EntityDataAccess cache = persister.getCacheAccessStrategy();
cacheKey = cache.generateCacheKey(
entry.getId(),
entry.getVersion(),
object,
lockOptions,
source
).thenAccept( v -> entry.setLockMode(requestedLockMode) )
persister,
source.getFactory(),
source.getTenantIdentifier()
);
lock = cache.lockItem( source, cacheKey, entry.getVersion() );
}
else {
cacheKey = null;
lock = null;
}

try {
return ((ReactiveEntityPersister) persister)
.lockReactive(
entry.getId(),
entry.getVersion(),
object,
lockOptions,
source
)
.thenAccept( v -> entry.setLockMode( lockOptions.getLockMode() ) )
.whenComplete( (r, e) -> {
// the database now holds a lock + the object is flushed from the cache,
// so release the soft lock
if ( cachingEnabled ) {
persister.getCacheAccessStrategy().unlockItem( source, ck, lock );
if ( canWriteToCache ) {
persister.getCacheAccessStrategy().unlockItem( source, cacheKey, lock );
}
} );

}
else {
return CompletionStages.voidFuture();
catch (HibernateException he) {
//in case lockReactive() throws an exception
if ( canWriteToCache ) {
persister.getCacheAccessStrategy().unlockItem( source, cacheKey, lock );
}
throw he;
}
}

Expand Down
Loading