Skip to content

Commit

Permalink
update DefaultReactiveListeners
Browse files Browse the repository at this point in the history
  • Loading branch information
gavinking committed Apr 9, 2023
1 parent b1f63b6 commit bfe244f
Show file tree
Hide file tree
Showing 17 changed files with 1,182 additions and 938 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,23 @@ public ReactiveCollectionRemoveAction(
this.affectedOwner = session.getPersistenceContextInternal().getLoadedCollectionOwnerOrNull( collection );
}

/**
* Removes a persistent collection for an unloaded proxy.
*
* Use this constructor when the owning entity is has not been loaded.
* @param persister The collection's persister
* @param id The collection key
* @param session The session
*/
public ReactiveCollectionRemoveAction(
final CollectionPersister persister,
final Object id,
final EventSource session) {
super( persister, null, id, session );
emptySnapshot = false;
affectedOwner = null;
}

@Override
public CompletionStage<Void> reactiveExecute() {
final Object key = getKey();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.hibernate.action.internal.EntityDeleteAction;
import org.hibernate.cache.spi.access.EntityDataAccess;
import org.hibernate.engine.spi.EntityEntry;
import org.hibernate.engine.spi.EntityKey;
import org.hibernate.engine.spi.PersistenceContext;
import org.hibernate.engine.spi.SharedSessionContractImplementor;
import org.hibernate.event.spi.EventSource;
Expand Down Expand Up @@ -43,71 +44,119 @@ public ReactiveEntityDeleteAction(
super( id, state, version, instance, persister, isCascadeDeleteEnabled, session );
}

public ReactiveEntityDeleteAction(Object id, EntityPersister persister, EventSource session) {
super( id, persister, session );
}

@Override
public void execute() throws HibernateException {
throw LOG.nonReactiveMethodCall( "reactiveExecute" );
}

private boolean isInstanceLoaded() {
// A null instance signals that we're deleting an unloaded proxy.
return getInstance() != null;
}

@Override
public CompletionStage<Void> reactiveExecute() throws HibernateException {
final Object id = getId();
final Object version = getCurrentVersion();
final EntityPersister persister = getPersister();
final SharedSessionContractImplementor session = getSession();
final Object instance = getInstance();

final boolean veto = preDelete();
final boolean veto = isInstanceLoaded() && preDelete();

Object version = getVersion();
if ( persister.isVersionPropertyGenerated() ) {
// we need to grab the version value from the entity, otherwise
// we have issues with generated-version entities that may have
// multiple actions queued during the same flush
version = persister.getVersion( instance );
}
final Object ck = lockCacheItem();

final Object ck;
if ( persister.canWriteToCache() ) {
final EntityDataAccess cache = persister.getCacheAccessStrategy();
ck = cache.generateCacheKey( id, persister, session.getFactory(), session.getTenantIdentifier() );
setLock( cache.lockItem( session, ck, version ) );
}
else {
ck = null;
}

CompletionStage<Void> deleteStep = !isCascadeDeleteEnabled() && !veto
final CompletionStage<Void> deleteStep = !isCascadeDeleteEnabled() && !veto
? ( (ReactiveEntityPersister) persister ).deleteReactive( id, version, instance, session )
: voidFuture();

return deleteStep.thenAccept( v -> {
//postDelete:
// After actually deleting a row, record the fact that the instance no longer
// exists on the database (needed for identity-column key generation), and
// remove it from the session cache
final PersistenceContext persistenceContext = session.getPersistenceContextInternal();
final EntityEntry entry = persistenceContext.removeEntry( instance );
if ( entry == null ) {
throw new AssertionFailure( "possible non-threadsafe access to session" );
if ( isInstanceLoaded() ) {
postDeleteLoaded( id, persister, session, instance, ck );
}
entry.postDelete();

persistenceContext.removeEntity( entry.getEntityKey() );
persistenceContext.removeProxy( entry.getEntityKey() );

if ( persister.canWriteToCache() ) {
persister.getCacheAccessStrategy().remove( session, ck );
else {
// we're deleting an unloaded proxy
postDeleteUnloaded( id, persister, session, ck );
}

persistenceContext.getNaturalIdResolutions()
.removeSharedResolution( id, getNaturalIdValues(), persister );

postDelete();

final StatisticsImplementor statistics = getSession().getFactory().getStatistics();
if ( statistics.isStatisticsEnabled() && !veto ) {
statistics.deleteEntity( getPersister().getEntityName() );
}
} );
}

//TODO: copy/paste from superclass (make it protected!)
private Object getCurrentVersion() {
return getPersister().isVersionPropertyGenerated()
// skip if we're deleting an unloaded proxy, no need for the version
&& isInstanceLoaded()
// we need to grab the version value from the entity, otherwise
// we have issues with generated-version entities that may have
// multiple actions queued during the same flush
? getPersister().getVersion( getInstance() )
: getVersion();
}

//TODO: copy/paste of postDeleteLoaded() from superclass (make it protected!)
private void postDeleteLoaded(
Object id,
EntityPersister persister,
SharedSessionContractImplementor session,
Object instance,
Object ck) {
// After actually deleting a row, record the fact that the instance no longer
// exists on the database (needed for identity-column key generation), and
// remove it from the session cache
final PersistenceContext persistenceContext = session.getPersistenceContextInternal();
final EntityEntry entry = persistenceContext.removeEntry(instance);
if ( entry == null ) {
throw new AssertionFailure( "possible non-threadsafe access to session" );
}
entry.postDelete();
final EntityKey key = entry.getEntityKey();
persistenceContext.removeEntity( key );
persistenceContext.removeProxy( key );
removeCacheItem( ck );
persistenceContext.getNaturalIdResolutions().removeSharedResolution( id, getNaturalIdValues(), persister );
postDelete();
}

//TODO: copy/paste of postDeleteUnloaded() from superclass (make it protected!)
private void postDeleteUnloaded(Object id, EntityPersister persister, SharedSessionContractImplementor session, Object ck) {
final PersistenceContext persistenceContext = session.getPersistenceContextInternal();
final EntityKey key = session.generateEntityKey( id, persister );
if ( !persistenceContext.containsDeletedUnloadedEntityKey( key ) ) {
throw new AssertionFailure( "deleted proxy should be for an unloaded entity: " + key );
}
persistenceContext.removeProxy( key );
removeCacheItem( ck );
}

//TODO: copy/paste from superclass (make it protected!)
private Object lockCacheItem() {
final EntityPersister persister = getPersister();
if ( persister.canWriteToCache() ) {
final EntityDataAccess cache = persister.getCacheAccessStrategy();
final SharedSessionContractImplementor session = getSession();
final Object ck = cache.generateCacheKey( getId(), persister, session.getFactory(), session.getTenantIdentifier() );
setLock( cache.lockItem( session, ck, getCurrentVersion() ) );
return ck;
}
else {
return null;
}
}

//TODO: copy/paste from superclass (make it protected!)
private void removeCacheItem(Object ck) {
final EntityPersister persister = getPersister();
if ( persister.canWriteToCache() ) {
persister.getCacheAccessStrategy().remove( getSession(), ck);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package org.hibernate.reactive.event.impl;

import static org.hibernate.reactive.util.impl.CompletionStages.loop;
import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture;

import java.lang.invoke.MethodHandles;
import java.util.Map;
Expand All @@ -26,6 +25,7 @@
import org.hibernate.event.spi.FlushEntityEventListener;
import org.hibernate.event.spi.FlushEvent;
import org.hibernate.event.spi.PersistContext;
import org.hibernate.internal.util.EntityPrinter;
import org.hibernate.persister.entity.EntityPersister;
import org.hibernate.reactive.engine.ReactiveActionQueue;
import org.hibernate.reactive.engine.impl.Cascade;
Expand Down Expand Up @@ -54,15 +54,13 @@ protected CompletionStage<Void> performExecutions(EventSource session) {
// during-flush callbacks more leniency in regards to initializing proxies and
// lazy collections during their processing.
// For more information, see HHH-2763
return voidFuture()
.thenCompose( v -> {
session.getJdbcCoordinator().flushBeginning();
session.getPersistenceContext().setFlushing( true );
// we need to lock the collection caches before executing entity inserts/updates in order to
// account for bi-directional associations
actionQueue( session ).prepareActions();
return actionQueue( session ).executeActions();
} )
session.getJdbcCoordinator().flushBeginning();
session.getPersistenceContext().setFlushing( true );
// we need to lock the collection caches before executing entity inserts/updates
// in order to account for bidirectional associations
final ReactiveActionQueue actionQueue = actionQueue(session);
actionQueue.prepareActions();
return actionQueue.executeActions()
.whenComplete( (v, x) -> {
session.getPersistenceContext().setFlushing( false );
session.getJdbcCoordinator().flushEnding();
Expand All @@ -85,12 +83,12 @@ protected CompletionStage<Void> flushEverythingToExecutions(FlushEvent event) th

LOG.trace( "Flushing session" );

EventSource session = event.getSession();
final EventSource session = event.getSession();

final PersistenceContext persistenceContext = session.getPersistenceContextInternal();
session.getInterceptor().preFlush( persistenceContext.managedEntitiesIterator() );

return prepareEntityFlushes(session, persistenceContext)
return prepareEntityFlushes( session, persistenceContext )
.thenAccept( v -> {
// we could move this inside if we wanted to
// tolerate collection initializations during
Expand All @@ -99,21 +97,46 @@ protected CompletionStage<Void> flushEverythingToExecutions(FlushEvent event) th
// now, any collections that are initialized
// inside this block do not get updated - they
// are ignored until the next flush
persistenceContext.setFlushing(true);
persistenceContext.setFlushing( true );
try {
int entityCount = flushEntities(event, persistenceContext);
int collectionCount = flushCollections(session, persistenceContext);
int entityCount = flushEntities( event, persistenceContext );
int collectionCount = flushCollections( session, persistenceContext );

event.setNumberOfEntitiesProcessed(entityCount);
event.setNumberOfCollectionsProcessed(collectionCount);
}
finally {
persistenceContext.setFlushing(false);
persistenceContext.setFlushing( false );
}

//some statistics
logFlushResults( event );
} );
}

//some statistics
// logFlushResults( event );
protected void logFlushResults(FlushEvent event) {
if ( !LOG.isDebugEnabled() ) {
return;
}
final EventSource session = event.getSession();
final PersistenceContext persistenceContext = session.getPersistenceContextInternal();
LOG.debugf(
"Flushed: %s insertions, %s updates, %s deletions to %s objects",
session.getActionQueue().numberOfInsertions(),
session.getActionQueue().numberOfUpdates(),
session.getActionQueue().numberOfDeletions(),
persistenceContext.getNumberOfManagedEntities()
);
LOG.debugf(
"Flushed: %s (re)creations, %s updates, %s removals to %s collections",
session.getActionQueue().numberOfCollectionCreations(),
session.getActionQueue().numberOfCollectionUpdates(),
session.getActionQueue().numberOfCollectionRemovals(),
persistenceContext.getCollectionEntriesSize()
);
new EntityPrinter( session.getFactory() ).toString(
persistenceContext.getEntitiesByKey().entrySet()
);
}

/**
Expand All @@ -125,17 +148,17 @@ private CompletionStage<Void> prepareEntityFlushes(EventSource session, Persiste

LOG.debug( "Processing flush-time cascades" );

PersistContext context = PersistContext.create();
final PersistContext context = PersistContext.create();
//safe from concurrent modification because of how concurrentEntries() is implemented on IdentityMap
Map.Entry<Object, EntityEntry>[] entries = persistenceContext.reentrantSafeEntityEntries();
final Map.Entry<Object, EntityEntry>[] entries = persistenceContext.reentrantSafeEntityEntries();
return loop(
entries,
index -> flushable( entries[index].getValue() ),
index -> cascadeOnFlush( session, entries[index].getValue().getPersister(), entries[index].getKey(), context ) );
}

private static boolean flushable(EntityEntry entry) {
Status status = entry.getStatus();
final Status status = entry.getStatus();
return status == Status.MANAGED
|| status == Status.SAVING
|| status == Status.READ_ONLY;
Expand Down Expand Up @@ -183,8 +206,8 @@ private int flushEntities(final FlushEvent event, final PersistenceContext persi

// Update the status of the object and if necessary, schedule an update

EntityEntry entry = me.getValue();
Status status = entry.getStatus();
final EntityEntry entry = me.getValue();
final Status status = entry.getStatus();

if ( status != Status.LOADING && status != Status.GONE ) {
final FlushEntityEvent entityEvent = new FlushEntityEvent( source, me.getKey(), entry );
Expand Down Expand Up @@ -316,7 +339,7 @@ protected void postFlush(SessionImplementor session) throws HibernateException {
}
else {
//otherwise recreate the mapping between the collection and its key
CollectionKey collectionKey = new CollectionKey(
final CollectionKey collectionKey = new CollectionKey(
collectionEntry.getLoadedPersister(),
collectionEntry.getLoadedKey()
);
Expand Down
Loading

0 comments on commit bfe244f

Please sign in to comment.