Skip to content

Commit

Permalink
Retry fetching event from remote cache
Browse files Browse the repository at this point in the history
Closes keycloak#28303

Signed-off-by: Pedro Ruivo <[email protected]>
Signed-off-by: Alexander Schwartz <[email protected]>
Co-authored-by: Alexander Schwartz <[email protected]>
  • Loading branch information
pruivo and ahus1 committed May 6, 2024
1 parent 1e47a0b commit 9d69239
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 34 deletions.
6 changes: 3 additions & 3 deletions common/src/main/java/org/keycloak/common/util/Retry.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.keycloak.common.util;

import java.time.Duration;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;

/**
* @author <a href="mailto:[email protected]">Stian Thorgersen</a>
Expand Down Expand Up @@ -125,8 +125,8 @@ public static int executeWithBackoff(AdvancedRunnable runnable, ThrowableCallbac
}
}

private static int computeBackoffInterval(int base, int iteration) {
return new Random().nextInt(computeIterationBase(base, iteration));
public static int computeBackoffInterval(int base, int iteration) {
return ThreadLocalRandom.current().nextInt(computeIterationBase(base, iteration));
}

private static int computeIterationBase(int base, int iteration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import org.infinispan.Cache;
import org.infinispan.client.hotrod.RemoteCache;
Expand All @@ -36,6 +37,7 @@
import org.infinispan.client.hotrod.event.ClientCacheEntryCreatedEvent;
import org.infinispan.client.hotrod.event.ClientCacheEntryModifiedEvent;
import org.infinispan.client.hotrod.event.ClientCacheEntryRemovedEvent;
import org.infinispan.client.hotrod.exceptions.HotRodClientException;
import org.infinispan.context.Flag;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated;
Expand All @@ -54,7 +56,6 @@
import org.keycloak.connections.infinispan.DefaultInfinispanConnectionProviderFactory;
import org.keycloak.executors.ExecutorsProvider;
import org.keycloak.models.KeycloakSession;
import org.infinispan.client.hotrod.exceptions.HotRodClientException;

import static org.keycloak.cluster.infinispan.InfinispanClusterProvider.TASK_KEY_PREFIX;

Expand All @@ -67,22 +68,24 @@ public class InfinispanNotificationsManager {

protected static final Logger logger = Logger.getLogger(InfinispanNotificationsManager.class);

private static final int BACKOFF_BASE_MILLIS = 10;
private static final int MAX_BACKOFF_RETRIES = 10;

private final ConcurrentMultivaluedHashMap<String, ClusterListener> listeners = new ConcurrentMultivaluedHashMap<>();

private final ConcurrentMap<String, TaskCallback> taskCallbacks = new ConcurrentHashMap<>();

private final Cache<String, Serializable> workCache;

private final RemoteCache workRemoteCache;
private final RemoteCache<Object, Serializable> workRemoteCache;

private final String myAddress;

private final String mySite;

private final ExecutorService listenersExecutor;


protected InfinispanNotificationsManager(Cache<String, Serializable> workCache, RemoteCache workRemoteCache, String myAddress, String mySite, ExecutorService listenersExecutor) {
protected InfinispanNotificationsManager(Cache<String, Serializable> workCache, RemoteCache<Object, Serializable> workRemoteCache, String myAddress, String mySite, ExecutorService listenersExecutor) {
this.workCache = workCache;
this.workRemoteCache = workRemoteCache;
this.myAddress = myAddress;
Expand All @@ -93,7 +96,7 @@ protected InfinispanNotificationsManager(Cache<String, Serializable> workCache,

// Create and init manager including all listeners etc
public static InfinispanNotificationsManager create(KeycloakSession session, Cache<String, Serializable> workCache, String myAddress, String mySite, Set<RemoteStore> remoteStores) {
RemoteCache workRemoteCache = null;
RemoteCache<Object, Serializable> workRemoteCache = null;

if (!remoteStores.isEmpty()) {
RemoteStore remoteStore = remoteStores.iterator().next();
Expand Down Expand Up @@ -189,12 +192,12 @@ public void cacheEntryCreated(CacheEntryCreatedEvent<String, Serializable> event

@CacheEntryModified
public void cacheEntryModified(CacheEntryModifiedEvent<String, Serializable> event) {
eventReceived(event.getKey(), event.getValue());
eventReceived(event.getKey(), event.getNewValue());
}

@CacheEntryRemoved
public void cacheEntryRemoved(CacheEntryRemovedEvent<String, Serializable> event) {
taskFinished(event.getKey(), true);
taskFinished(event.getKey());
}

}
Expand All @@ -203,43 +206,51 @@ public void cacheEntryRemoved(CacheEntryRemovedEvent<String, Serializable> event
@ClientListener
public class HotRodListener {

private final RemoteCache<Object, Object> remoteCache;
private final RemoteCache<Object, Serializable> remoteCache;

public HotRodListener(RemoteCache<Object, Object> remoteCache) {
public HotRodListener(RemoteCache<Object, Serializable> remoteCache) {
this.remoteCache = remoteCache;
}


@ClientCacheEntryCreated
public void created(ClientCacheEntryCreatedEvent event) {
String key = event.getKey().toString();
hotrodEventReceived(key);
public void created(ClientCacheEntryCreatedEvent<String> event) {
hotrodEventReceived(event.getKey());
}


@ClientCacheEntryModified
public void updated(ClientCacheEntryModifiedEvent event) {
String key = event.getKey().toString();
hotrodEventReceived(key);
public void updated(ClientCacheEntryModifiedEvent<String> event) {
hotrodEventReceived(event.getKey());
}


@ClientCacheEntryRemoved
public void removed(ClientCacheEntryRemovedEvent event) {
String key = event.getKey().toString();
taskFinished(key, true);
public void removed(ClientCacheEntryRemovedEvent<String> event) {
taskFinished(event.getKey());
}


private void hotrodEventReceived(String key) {
// TODO: Look at CacheEventConverter stuff to possibly include value in the event and avoid additional remoteCache request
try {
listenersExecutor.submit(() -> {
Object value = DefaultInfinispanConnectionProviderFactory.runWithReadLockOnCacheManager(() ->
// We've seen deadlocks in Infinispan 14.x when shutting down Infinispan concurrently, therefore wrapping this
remoteCache.get(key)
);
eventReceived(key, (Serializable) value);
Supplier<Serializable> fetchEvent = () -> remoteCache.get(key);
Serializable event = DefaultInfinispanConnectionProviderFactory.runWithReadLockOnCacheManager(fetchEvent);
int iteration = 0;
// Event might have been generated from a node which is more up-to-date, so the fetch might return null.
// Retry until we find a node that is up-to-date and has the entry.
while (event == null && iteration < MAX_BACKOFF_RETRIES) {
++iteration;
try {
Thread.sleep(Retry.computeBackoffInterval(BACKOFF_BASE_MILLIS, iteration));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
event = DefaultInfinispanConnectionProviderFactory.runWithReadLockOnCacheManager(fetchEvent);
}
eventReceived(key, event);

});
} catch (RejectedExecutionException ree) {
Expand All @@ -254,11 +265,10 @@ private void hotrodEventReceived(String key) {
}
}
}

}

private void eventReceived(String key, Serializable obj) {
if (!(obj instanceof WrapperClusterEvent)) {
if (!(obj instanceof WrapperClusterEvent event)) {
// Items with the TASK_KEY_PREFIX might be gone fast once the locking is complete, therefore, don't log them.
// It is still good to have the warning in case of real events return null because they have been, for example, expired
if (obj == null && !key.startsWith(TASK_KEY_PREFIX)) {
Expand All @@ -267,8 +277,6 @@ private void eventReceived(String key, Serializable obj) {
return;
}

WrapperClusterEvent event = (WrapperClusterEvent) obj;

if (event.isIgnoreSender()) {
if (this.myAddress.equals(event.getSender())) {
return;
Expand Down Expand Up @@ -298,16 +306,15 @@ private void eventReceived(String key, Serializable obj) {
}


void taskFinished(String taskKey, boolean success) {
void taskFinished(String taskKey) {
TaskCallback callback = taskCallbacks.remove(taskKey);

if (callback != null) {
if (logger.isDebugEnabled()) {
logger.debugf("Finished task '%s' with '%b'", taskKey, success);
logger.debugf("Finished task '%s' with '%b'", taskKey, true);
}
callback.setSuccess(success);
callback.setSuccess(true);
callback.getTaskCompletedLatch().countDown();
}

}
}

0 comments on commit 9d69239

Please sign in to comment.