diff --git a/common/src/main/java/org/keycloak/common/util/Retry.java b/common/src/main/java/org/keycloak/common/util/Retry.java index 05894afb358b..d3225b9b9c6e 100644 --- a/common/src/main/java/org/keycloak/common/util/Retry.java +++ b/common/src/main/java/org/keycloak/common/util/Retry.java @@ -18,7 +18,7 @@ package org.keycloak.common.util; import java.time.Duration; -import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; /** * @author Stian Thorgersen @@ -125,8 +125,8 @@ public static int executeWithBackoff(AdvancedRunnable runnable, ThrowableCallbac } } - private static int computeBackoffInterval(int base, int iteration) { - return new Random().nextInt(computeIterationBase(base, iteration)); + public static int computeBackoffInterval(int base, int iteration) { + return ThreadLocalRandom.current().nextInt(computeIterationBase(base, iteration)); } private static int computeIterationBase(int base, int iteration) { diff --git a/model/infinispan/src/main/java/org/keycloak/cluster/infinispan/InfinispanNotificationsManager.java b/model/infinispan/src/main/java/org/keycloak/cluster/infinispan/InfinispanNotificationsManager.java index 7f9bc7bd1e84..d0907221d925 100644 --- a/model/infinispan/src/main/java/org/keycloak/cluster/infinispan/InfinispanNotificationsManager.java +++ b/model/infinispan/src/main/java/org/keycloak/cluster/infinispan/InfinispanNotificationsManager.java @@ -26,6 +26,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import org.infinispan.Cache; import org.infinispan.client.hotrod.RemoteCache; @@ -36,6 +37,7 @@ import org.infinispan.client.hotrod.event.ClientCacheEntryCreatedEvent; import org.infinispan.client.hotrod.event.ClientCacheEntryModifiedEvent; import org.infinispan.client.hotrod.event.ClientCacheEntryRemovedEvent; +import org.infinispan.client.hotrod.exceptions.HotRodClientException; import org.infinispan.context.Flag; import org.infinispan.notifications.Listener; import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated; @@ -54,7 +56,6 @@ import org.keycloak.connections.infinispan.DefaultInfinispanConnectionProviderFactory; import org.keycloak.executors.ExecutorsProvider; import org.keycloak.models.KeycloakSession; -import org.infinispan.client.hotrod.exceptions.HotRodClientException; import static org.keycloak.cluster.infinispan.InfinispanClusterProvider.TASK_KEY_PREFIX; @@ -67,13 +68,16 @@ public class InfinispanNotificationsManager { protected static final Logger logger = Logger.getLogger(InfinispanNotificationsManager.class); + private static final int BACKOFF_BASE_MILLIS = 10; + private static final int MAX_BACKOFF_RETRIES = 10; + private final ConcurrentMultivaluedHashMap listeners = new ConcurrentMultivaluedHashMap<>(); private final ConcurrentMap taskCallbacks = new ConcurrentHashMap<>(); private final Cache workCache; - private final RemoteCache workRemoteCache; + private final RemoteCache workRemoteCache; private final String myAddress; @@ -81,8 +85,7 @@ public class InfinispanNotificationsManager { private final ExecutorService listenersExecutor; - - protected InfinispanNotificationsManager(Cache workCache, RemoteCache workRemoteCache, String myAddress, String mySite, ExecutorService listenersExecutor) { + protected InfinispanNotificationsManager(Cache workCache, RemoteCache workRemoteCache, String myAddress, String mySite, ExecutorService listenersExecutor) { this.workCache = workCache; this.workRemoteCache = workRemoteCache; this.myAddress = myAddress; @@ -93,7 +96,7 @@ protected InfinispanNotificationsManager(Cache workCache, // Create and init manager including all listeners etc public static InfinispanNotificationsManager create(KeycloakSession session, Cache workCache, String myAddress, String mySite, Set remoteStores) { - RemoteCache workRemoteCache = null; + RemoteCache workRemoteCache = null; if (!remoteStores.isEmpty()) { RemoteStore remoteStore = remoteStores.iterator().next(); @@ -189,12 +192,12 @@ public void cacheEntryCreated(CacheEntryCreatedEvent event @CacheEntryModified public void cacheEntryModified(CacheEntryModifiedEvent event) { - eventReceived(event.getKey(), event.getValue()); + eventReceived(event.getKey(), event.getNewValue()); } @CacheEntryRemoved public void cacheEntryRemoved(CacheEntryRemovedEvent event) { - taskFinished(event.getKey(), true); + taskFinished(event.getKey()); } } @@ -203,31 +206,28 @@ public void cacheEntryRemoved(CacheEntryRemovedEvent event @ClientListener public class HotRodListener { - private final RemoteCache remoteCache; + private final RemoteCache remoteCache; - public HotRodListener(RemoteCache remoteCache) { + public HotRodListener(RemoteCache remoteCache) { this.remoteCache = remoteCache; } @ClientCacheEntryCreated - public void created(ClientCacheEntryCreatedEvent event) { - String key = event.getKey().toString(); - hotrodEventReceived(key); + public void created(ClientCacheEntryCreatedEvent event) { + hotrodEventReceived(event.getKey()); } @ClientCacheEntryModified - public void updated(ClientCacheEntryModifiedEvent event) { - String key = event.getKey().toString(); - hotrodEventReceived(key); + public void updated(ClientCacheEntryModifiedEvent event) { + hotrodEventReceived(event.getKey()); } @ClientCacheEntryRemoved - public void removed(ClientCacheEntryRemovedEvent event) { - String key = event.getKey().toString(); - taskFinished(key, true); + public void removed(ClientCacheEntryRemovedEvent event) { + taskFinished(event.getKey()); } @@ -235,11 +235,22 @@ private void hotrodEventReceived(String key) { // TODO: Look at CacheEventConverter stuff to possibly include value in the event and avoid additional remoteCache request try { listenersExecutor.submit(() -> { - Object value = DefaultInfinispanConnectionProviderFactory.runWithReadLockOnCacheManager(() -> - // We've seen deadlocks in Infinispan 14.x when shutting down Infinispan concurrently, therefore wrapping this - remoteCache.get(key) - ); - eventReceived(key, (Serializable) value); + Supplier fetchEvent = () -> remoteCache.get(key); + Serializable event = DefaultInfinispanConnectionProviderFactory.runWithReadLockOnCacheManager(fetchEvent); + int iteration = 0; + // Event might have been generated from a node which is more up-to-date, so the fetch might return null. + // Retry until we find a node that is up-to-date and has the entry. + while (event == null && iteration < MAX_BACKOFF_RETRIES) { + ++iteration; + try { + Thread.sleep(Retry.computeBackoffInterval(BACKOFF_BASE_MILLIS, iteration)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + event = DefaultInfinispanConnectionProviderFactory.runWithReadLockOnCacheManager(fetchEvent); + } + eventReceived(key, event); }); } catch (RejectedExecutionException ree) { @@ -254,11 +265,10 @@ private void hotrodEventReceived(String key) { } } } - } private void eventReceived(String key, Serializable obj) { - if (!(obj instanceof WrapperClusterEvent)) { + if (!(obj instanceof WrapperClusterEvent event)) { // Items with the TASK_KEY_PREFIX might be gone fast once the locking is complete, therefore, don't log them. // It is still good to have the warning in case of real events return null because they have been, for example, expired if (obj == null && !key.startsWith(TASK_KEY_PREFIX)) { @@ -267,8 +277,6 @@ private void eventReceived(String key, Serializable obj) { return; } - WrapperClusterEvent event = (WrapperClusterEvent) obj; - if (event.isIgnoreSender()) { if (this.myAddress.equals(event.getSender())) { return; @@ -298,16 +306,15 @@ private void eventReceived(String key, Serializable obj) { } - void taskFinished(String taskKey, boolean success) { + void taskFinished(String taskKey) { TaskCallback callback = taskCallbacks.remove(taskKey); if (callback != null) { if (logger.isDebugEnabled()) { - logger.debugf("Finished task '%s' with '%b'", taskKey, success); + logger.debugf("Finished task '%s' with '%b'", taskKey, true); } - callback.setSuccess(success); + callback.setSuccess(true); callback.getTaskCompletedLatch().countDown(); } - } }