Skip to content

Commit

Permalink
support for lock() operation
Browse files Browse the repository at this point in the history
supported lock modes:

- PESSIMISTIC_LOCK_MODE_WRITE
- PESSIMISTIC_LOCK_MODE_FORCE_INCREMENT
- PESSIMISTIC_LOCK_MODE_READ

not supported:

- optimistic lock modes

see hibernate#142
  • Loading branch information
gavinking committed May 27, 2020
1 parent 457cf56 commit 146fbf9
Show file tree
Hide file tree
Showing 20 changed files with 573 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ private void attachEventContextManagingListenersIfRequired(SessionFactoryService
eventListenerRegistry.getEventListenerGroup( EventType.MERGE ).appendListener( new DefaultReactiveMergeEventListener() );
eventListenerRegistry.getEventListenerGroup( EventType.DELETE ).appendListener( new DefaultReactiveDeleteEventListener() );
eventListenerRegistry.getEventListenerGroup( EventType.REFRESH ).appendListener( new DefaultReactiveRefreshEventListener() );
eventListenerRegistry.getEventListenerGroup( EventType.LOCK ).appendListener( new DefaultReactiveLockEventListener() );
eventListenerRegistry.getEventListenerGroup( EventType.LOAD ).appendListener( new DefaultReactiveLoadEventListener() );
eventListenerRegistry.getEventListenerGroup( EventType.INIT_COLLECTION ).appendListener( new DefaultReactiveInitializeCollectionEventListener() );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.hibernate.reactive.engine.impl;

import org.hibernate.HibernateException;
import org.hibernate.LockMode;
import org.hibernate.event.internal.MergeContext;
import org.hibernate.event.spi.EventSource;
import org.hibernate.internal.CoreMessageLogger;
Expand Down Expand Up @@ -42,7 +43,7 @@ private CascadingActions() {
public static final CascadingAction<IdentitySet> DELETE =
new BaseCascadingAction<IdentitySet>(org.hibernate.engine.spi.CascadingActions.DELETE) {
@Override
public CompletionStage <?> cascade(
public CompletionStage<?> cascade(
EventSource session,
Object child,
String entityName,
Expand All @@ -61,7 +62,7 @@ public CompletionStage <?> cascade(
public static final CascadingAction<IdentitySet> PERSIST =
new BaseCascadingAction<IdentitySet>(org.hibernate.engine.spi.CascadingActions.PERSIST) {
@Override
public CompletionStage <?> cascade(
public CompletionStage<?> cascade(
EventSource session,
Object child,
String entityName,
Expand All @@ -81,7 +82,7 @@ public CompletionStage <?> cascade(
public static final CascadingAction<IdentitySet> PERSIST_ON_FLUSH =
new BaseCascadingAction<IdentitySet>(org.hibernate.engine.spi.CascadingActions.PERSIST_ON_FLUSH) {
@Override
public CompletionStage <?> cascade(
public CompletionStage<?> cascade(
EventSource session,
Object child,
String entityName,
Expand All @@ -99,7 +100,7 @@ public CompletionStage <?> cascade(
public static final CascadingAction<MergeContext> MERGE =
new BaseCascadingAction<MergeContext>(org.hibernate.engine.spi.CascadingActions.MERGE) {
@Override
public CompletionStage <?> cascade(
public CompletionStage<?> cascade(
EventSource session,
Object child,
String entityName,
Expand All @@ -118,7 +119,7 @@ public CompletionStage <?> cascade(
public static final CascadingAction<IdentitySet> REFRESH =
new BaseCascadingAction<IdentitySet>(org.hibernate.engine.spi.CascadingActions.REFRESH) {
@Override
public CompletionStage <?> cascade(
public CompletionStage<?> cascade(
EventSource session,
Object child,
String entityName,
Expand All @@ -130,6 +131,24 @@ public CompletionStage <?> cascade(
}
};

/**
* @see org.hibernate.Session#lock(Object, org.hibernate.LockMode)
*/
public static final CascadingAction<LockMode> LOCK =
new BaseCascadingAction<LockMode>(org.hibernate.engine.spi.CascadingActions.LOCK) {
@Override
public CompletionStage<?> cascade(
EventSource session,
Object child,
String entityName,
LockMode context,
boolean isCascadeDeleteEnabled)
throws HibernateException {
LOG.tracev("Cascading to lock: {0}", entityName);
return session.unwrap(ReactiveSession.class).reactiveLock(child, context);
}
};

public abstract static class BaseCascadingAction<C> implements CascadingAction<C> {
private final org.hibernate.engine.spi.CascadingAction delegate;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@
* @see DefaultReactivePersistOnFlushEventListener
* @see DefaultReactiveMergeEventListener
*/
abstract class AbstractReactiveSaveEventListener<C>
implements CallbackRegistryConsumer {
abstract class AbstractReactiveSaveEventListener<C> implements CallbackRegistryConsumer {

private static final CoreMessageLogger LOG = CoreLogging.messageLogger( AbstractReactiveSaveEventListener.class );

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@

import org.jboss.logging.Logger;

public class DefaultReactiveAutoFlushEventListener extends AbstractReactiveFlushingEventListener implements ReactiveAutoFlushEventListener, AutoFlushEventListener {
public class DefaultReactiveAutoFlushEventListener extends AbstractReactiveFlushingEventListener
implements ReactiveAutoFlushEventListener, AutoFlushEventListener {

private static final CoreMessageLogger LOG = Logger.getMessageLogger( CoreMessageLogger.class, DefaultReactiveAutoFlushEventListener.class.getName() );

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
*/
public class DefaultReactiveDeleteEventListener
implements DeleteEventListener, ReactiveDeleteEventListener, CallbackRegistryConsumer, JpaBootstrapSensitive {

private static final CoreMessageLogger LOG = CoreLogging.messageLogger( DefaultReactiveDeleteEventListener.class );

private CallbackRegistry callbackRegistry;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
/**
* A reactific {@link org.hibernate.event.internal.DefaultFlushEventListener}.
*/
public class DefaultReactiveFlushEventListener extends AbstractReactiveFlushingEventListener implements ReactiveFlushEventListener, FlushEventListener {
public class DefaultReactiveFlushEventListener extends AbstractReactiveFlushingEventListener
implements ReactiveFlushEventListener, FlushEventListener {

private static final CoreMessageLogger LOG = Logger.getMessageLogger(
CoreMessageLogger.class,
DefaultReactiveFlushEventListener.class.getName()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
/* Hibernate, Relational Persistence for Idiomatic Java
*
* SPDX-License-Identifier: LGPL-2.1-or-later
* Copyright: Red Hat Inc. and Hibernate Authors
*/
package org.hibernate.reactive.event.impl;

import org.hibernate.HibernateException;
import org.hibernate.LockMode;
import org.hibernate.LockOptions;
import org.hibernate.ObjectDeletedException;
import org.hibernate.TransientObjectException;
import org.hibernate.cache.spi.access.EntityDataAccess;
import org.hibernate.cache.spi.access.SoftLock;
import org.hibernate.engine.internal.CascadePoint;
import org.hibernate.engine.spi.EntityEntry;
import org.hibernate.engine.spi.PersistenceContext;
import org.hibernate.engine.spi.SessionImplementor;
import org.hibernate.engine.spi.Status;
import org.hibernate.event.internal.AbstractReassociateEventListener;
import org.hibernate.event.internal.DefaultLockEventListener;
import org.hibernate.event.spi.EventSource;
import org.hibernate.event.spi.LockEvent;
import org.hibernate.event.spi.LockEventListener;
import org.hibernate.internal.CoreMessageLogger;
import org.hibernate.persister.entity.EntityPersister;
import org.hibernate.pretty.MessageHelper;
import org.hibernate.reactive.engine.impl.Cascade;
import org.hibernate.reactive.engine.impl.CascadingActions;
import org.hibernate.reactive.engine.impl.ForeignKeys;
import org.hibernate.reactive.event.spi.ReactiveLockEventListener;
import org.hibernate.reactive.persister.entity.impl.ReactiveEntityPersister;
import org.hibernate.reactive.util.impl.CompletionStages;
import org.jboss.logging.Logger;

import java.io.Serializable;
import java.util.concurrent.CompletionStage;

public class DefaultReactiveLockEventListener extends AbstractReassociateEventListener
implements LockEventListener, ReactiveLockEventListener {

private static final CoreMessageLogger log = Logger.getMessageLogger(
CoreMessageLogger.class,
DefaultLockEventListener.class.getName()
);

@Override
public CompletionStage<Void> reactiveOnLock(LockEvent event) throws HibernateException {
if ( event.getObject() == null ) {
throw new NullPointerException( "attempted to lock null" );
}

if ( event.getLockMode() == LockMode.WRITE ) {
throw new HibernateException( "Invalid lock mode for lock()" );
}

if ( event.getLockMode() == LockMode.UPGRADE_SKIPLOCKED ) {
log.explicitSkipLockedLockCombo();
}

SessionImplementor source = event.getSession();
final PersistenceContext persistenceContext = source.getPersistenceContextInternal();
Object entity = persistenceContext.unproxyAndReassociate( event.getObject() );
//TODO: if object was an uninitialized proxy, this is inefficient,
// resulting in two SQL selects

EntityEntry entry = persistenceContext.getEntry(entity);
CompletionStage<EntityEntry> stage;
if (entry==null) {
final EntityPersister persister = source.getEntityPersister( event.getEntityName(), entity );
final Serializable id = persister.getIdentifier( entity, source );
stage = ForeignKeys.isNotTransient( event.getEntityName(), entity, Boolean.FALSE, source )
.thenApply(
trans -> {
if (!trans) {
throw new TransientObjectException(
"cannot lock an unsaved transient instance: " +
persister.getEntityName()
);
}

EntityEntry e = reassociate(event, entity, id, persister);
cascadeOnLock(event, persister, entity);
return e;
} );

}
else {
stage = CompletionStages.completedFuture(entry);
}

return stage.thenCompose( e -> upgradeLock( entity, e, event.getLockOptions(), event.getSession() ) );
}

private void cascadeOnLock(LockEvent event, EntityPersister persister, Object entity) {
EventSource source = event.getSession();
final PersistenceContext persistenceContext = source.getPersistenceContextInternal();
persistenceContext.incrementCascadeLevel();
try {
new Cascade(
CascadingActions.LOCK,
CascadePoint.AFTER_LOCK,
persister,
entity,
event.getLockOptions(),
source
).cascade();
}
finally {
persistenceContext.decrementCascadeLevel();
}
}

/**
* Performs a pessimistic lock upgrade on a given entity, if needed.
*
* @param object The entity for which to upgrade the lock.
* @param entry The entity's EntityEntry instance.
* @param lockOptions contains the requested lock mode.
* @param source The session which is the source of the event being processed.
*/
protected CompletionStage<Void> upgradeLock(Object object, EntityEntry entry,
LockOptions lockOptions,
EventSource source) {

LockMode requestedLockMode = lockOptions.getLockMode();
if ( requestedLockMode.greaterThan( entry.getLockMode() ) ) {
// The user requested a "greater" (i.e. more restrictive) form of
// pessimistic lock

if ( entry.getStatus() != Status.MANAGED ) {
throw new ObjectDeletedException(
"attempted to lock a deleted instance",
entry.getId(),
entry.getPersister().getEntityName()
);
}

final EntityPersister persister = entry.getPersister();

if ( log.isTraceEnabled() ) {
log.tracev(
"Locking {0} in mode: {1}",
MessageHelper.infoString( persister, entry.getId(), source.getFactory() ),
requestedLockMode
);
}

final boolean cachingEnabled = persister.canWriteToCache();
final SoftLock lock;
final Object ck;
if ( cachingEnabled ) {
EntityDataAccess cache = persister.getCacheAccessStrategy();
ck = cache.generateCacheKey(
entry.getId(),
persister,
source.getFactory(),
source.getTenantIdentifier()
);
lock = cache.lockItem( source, ck, entry.getVersion() );
}
else {
lock = null;
ck = null;
}

return ((ReactiveEntityPersister) persister).lockReactive(
entry.getId(),
entry.getVersion(),
object,
lockOptions,
source
).thenAccept( v -> entry.setLockMode(requestedLockMode) )
.whenComplete( (r, e) -> {
// the database now holds a lock + the object is flushed from the cache,
// so release the soft lock
if ( cachingEnabled ) {
persister.getCacheAccessStrategy().unlockItem( source, ck, lock );
}
} );

}
else {
return CompletionStages.nullFuture();
}
}

@Override
public void onLock(LockEvent event) throws HibernateException {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@
/**
* A reactific {@link org.hibernate.event.internal.DefaultMergeEventListener}.
*/
public class DefaultReactiveMergeEventListener extends AbstractReactiveSaveEventListener<MergeContext> implements ReactiveMergeEventListener, MergeEventListener {
public class DefaultReactiveMergeEventListener extends AbstractReactiveSaveEventListener<MergeContext>
implements ReactiveMergeEventListener, MergeEventListener {

private static final CoreMessageLogger LOG = CoreLogging.messageLogger( DefaultReactiveMergeEventListener.class );

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
public class DefaultReactivePersistEventListener
extends AbstractReactiveSaveEventListener<IdentitySet>
implements PersistEventListener, ReactivePersistEventListener, CallbackRegistryConsumer {

private static final CoreMessageLogger LOG = CoreLogging.messageLogger( DefaultReactivePersistEventListener.class );

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@
/**
* A reactific {@link org.hibernate.event.internal.DefaultRefreshEventListener}.
*/
public class DefaultReactiveRefreshEventListener implements RefreshEventListener, ReactiveRefreshEventListener {
public class DefaultReactiveRefreshEventListener
implements RefreshEventListener, ReactiveRefreshEventListener {

private static final CoreMessageLogger LOG = CoreLogging.messageLogger( DefaultReactiveRefreshEventListener.class );

public CompletionStage<Void> reactiveOnRefresh(RefreshEvent event) throws HibernateException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/* Hibernate, Relational Persistence for Idiomatic Java
*
* SPDX-License-Identifier: LGPL-2.1-or-later
* Copyright: Red Hat Inc. and Hibernate Authors
*/
package org.hibernate.reactive.event.spi;

import org.hibernate.HibernateException;
import org.hibernate.event.spi.LockEvent;

import java.io.Serializable;
import java.util.concurrent.CompletionStage;

/**
* Defines the contract for handling of lock events generated from a session.
*
* @author Steve Ebersole
*/
public interface ReactiveLockEventListener extends Serializable {

/**
* Handle the given lock event.
*
* @param event The lock event to be handled.
*/
CompletionStage<Void> reactiveOnLock(LockEvent event) throws HibernateException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,21 @@ interface Session extends AutoCloseable {
*/
Uni<Session> refresh(Object... entities);

/**
* Obtain the specified lock level upon the given object. For example, this
* may be used to perform a version check with {@link LockMode#READ}, or to
* upgrade to a pessimistic lock with {@link LockMode#PESSIMISTIC_WRITE}.
* This operation cascades to associated instances if the association is
* mapped with {@code cascade="lock"}.
*
* Note that the optimistic lock modes {@link LockMode#OPTIMISTIC} and
* {@link LockMode#OPTIMISTIC_FORCE_INCREMENT} are not currently supported.
*
* @param entity a persistent or transient instance
* @param lockMode the lock level
*/
Uni<Session> lock(Object entity, LockMode lockMode);

/**
* Force this session to flush asynchronously. Must be called at the
* end of a unit of work, before committing the transaction and closing
Expand Down
Loading

0 comments on commit 146fbf9

Please sign in to comment.