diff --git a/extensions/redis-cache/runtime/src/main/java/io/quarkus/cache/redis/runtime/RedisCacheImpl.java b/extensions/redis-cache/runtime/src/main/java/io/quarkus/cache/redis/runtime/RedisCacheImpl.java index e7e39822c351f..f2a414678229c 100644 --- a/extensions/redis-cache/runtime/src/main/java/io/quarkus/cache/redis/runtime/RedisCacheImpl.java +++ b/extensions/redis-cache/runtime/src/main/java/io/quarkus/cache/redis/runtime/RedisCacheImpl.java @@ -25,6 +25,7 @@ import io.smallrye.mutiny.unchecked.Unchecked; import io.smallrye.mutiny.unchecked.UncheckedFunction; import io.smallrye.mutiny.vertx.MutinyHelper; +import io.vertx.core.http.ConnectionPoolTooBusyException; import io.vertx.mutiny.core.Vertx; import io.vertx.mutiny.redis.client.Command; import io.vertx.mutiny.redis.client.Redis; @@ -102,6 +103,11 @@ public RedisCacheImpl(RedisCacheInfo cacheInfo, Vertx vertx, Redis redis, Suppli this.redis = redis; } + private static boolean isRecomputableError(Throwable error) { + return error instanceof ConnectException + || error instanceof ConnectionPoolTooBusyException; + } + private Class loadClass(String type) throws ClassNotFoundException { if (PRIMITIVE_TO_CLASS_MAPPING.containsKey(type)) { return PRIMITIVE_TO_CLASS_MAPPING.get(type); @@ -211,7 +217,7 @@ public Uni apply(V value) { } }) - .onFailure(ConnectException.class).recoverWithUni(new Function>() { + .onFailure(RedisCacheImpl::isRecomputableError).recoverWithUni(new Function>() { @Override public Uni apply(Throwable e) { log.warn("Unable to connect to Redis, recomputing cached value", e); @@ -260,7 +266,7 @@ public Uni apply(RedisConnection connection) { }); } }) - .onFailure(ConnectException.class).recoverWithUni(e -> { + .onFailure(RedisCacheImpl::isRecomputableError).recoverWithUni(e -> { log.warn("Unable to connect to Redis, recomputing cached value", e); return valueLoader.apply(key); }); diff --git a/extensions/redis-cache/runtime/src/test/java/io/quarkus/cache/redis/runtime/RedisCacheImplTest.java b/extensions/redis-cache/runtime/src/test/java/io/quarkus/cache/redis/runtime/RedisCacheImplTest.java index c1e7466a4ffbe..adf1bf53a1d0b 100644 --- a/extensions/redis-cache/runtime/src/test/java/io/quarkus/cache/redis/runtime/RedisCacheImplTest.java +++ b/extensions/redis-cache/runtime/src/test/java/io/quarkus/cache/redis/runtime/RedisCacheImplTest.java @@ -6,7 +6,9 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import java.time.Duration; +import java.util.ArrayList; import java.util.HashSet; +import java.util.List; import java.util.Objects; import java.util.Optional; import java.util.Set; @@ -20,8 +22,10 @@ import io.smallrye.mutiny.infrastructure.Infrastructure; import io.vertx.core.json.Json; import io.vertx.mutiny.redis.client.Command; +import io.vertx.mutiny.redis.client.Redis; import io.vertx.mutiny.redis.client.Request; import io.vertx.mutiny.redis.client.Response; +import io.vertx.redis.client.RedisOptions; class RedisCacheImplTest extends RedisCacheTestBase { @@ -50,6 +54,33 @@ public void testPutInTheCache() { assertThat(r).isNotNull(); } + @Test + public void testExhaustConnectionPool() { + String k = UUID.randomUUID().toString(); + RedisCacheInfo info = new RedisCacheInfo(); + info.name = "foo"; + info.valueType = String.class.getName(); + info.expireAfterWrite = Optional.of(Duration.ofSeconds(2)); + + Redis redis = Redis.createClient(vertx, new RedisOptions() + .setMaxPoolSize(1) + .setMaxPoolWaiting(0) + .setConnectionString("redis://" + server.getHost() + ":" + server.getFirstMappedPort())); + + RedisCacheImpl cache = new RedisCacheImpl(info, vertx, redis, BLOCKING_ALLOWED); + + List> responses = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + responses.add(cache.get(k, s -> "hello")); + } + + final var values = Uni.combine().all().unis(responses).with(list -> list).await().indefinitely(); + assertThat(values).isNotEmpty().allMatch(value -> value.equals("hello")); + + var r = redis.send(Request.cmd(Command.GET).arg("cache:foo:" + k)).await().indefinitely(); + assertThat(r).isNotNull(); + } + @Test public void testPutInTheCacheWithoutRedis() { String k = UUID.randomUUID().toString();