From e49eb2aa28c0b18b7c22485f5c6e9639bf970a85 Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Tue, 16 Feb 2016 17:20:46 +0100 Subject: [PATCH] Use RefCounters to track open resources #196 The DefaultEventLoopProvider tracks now resources using a ref-counter mechanism. Resources in use are no longer closed by the DefaultEventLoopProvider when calling release(...). Only the last call to release(...) causes the resource to be shut down. --- .../DefaultEventLoopGroupProvider.java | 51 +++++++- .../resource/EventLoopGroupProvider.java | 5 +- .../lambdaworks/redis/RedisClientTest.java | 111 ++++++++++++++++++ 3 files changed, 160 insertions(+), 7 deletions(-) create mode 100644 src/test/java/com/lambdaworks/redis/RedisClientTest.java diff --git a/src/main/java/com/lambdaworks/redis/resource/DefaultEventLoopGroupProvider.java b/src/main/java/com/lambdaworks/redis/resource/DefaultEventLoopGroupProvider.java index cbfb813979..e3fd3e733e 100644 --- a/src/main/java/com/lambdaworks/redis/resource/DefaultEventLoopGroupProvider.java +++ b/src/main/java/com/lambdaworks/redis/resource/DefaultEventLoopGroupProvider.java @@ -4,12 +4,12 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import com.google.common.collect.Maps; import com.lambdaworks.redis.EpollProvider; -import com.lambdaworks.redis.output.BooleanListOutput; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.util.concurrent.*; @@ -27,6 +27,8 @@ public class DefaultEventLoopGroupProvider implements EventLoopGroupProvider { protected static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultEventLoopGroupProvider.class); private final Map, EventExecutorGroup> eventLoopGroups = new ConcurrentHashMap, EventExecutorGroup>(); + private final Map refCounter = new ConcurrentHashMap<>(); + private final int numberOfThreads; private volatile boolean shutdownCalled = false; @@ -43,8 +45,47 @@ public DefaultEventLoopGroupProvider(int numberOfThreads) { @Override public T allocate(Class type) { synchronized (this) { - return getOrCreate(type); + return addReference(getOrCreate(type)); + } + } + + private T addReference(T reference) { + + synchronized (refCounter){ + long counter = 0; + if(refCounter.containsKey(reference)){ + counter = refCounter.get(reference); + } + + logger.debug("Adding reference to {}, existing ref count {}", reference, counter); + counter++; + refCounter.put(reference, counter); + } + + return reference; + } + + private T release(T reference) { + + synchronized (refCounter) { + long counter = 0; + if (refCounter.containsKey(reference)) { + counter = refCounter.get(reference); + } + + if (counter < 1) { + logger.debug("Attempting to release {} but ref count is {}", reference, counter); + } + + counter--; + if (counter == 0) { + refCounter.remove(reference); + } else { + refCounter.put(reference, counter); + } } + + return reference; } @SuppressWarnings("unchecked") @@ -93,9 +134,9 @@ public static EventExecutorGroup createEventLoopG @Override public Promise release(EventExecutorGroup eventLoopGroup, long quietPeriod, long timeout, TimeUnit unit) { - Class key = getKey(eventLoopGroup); + Class key = getKey(release(eventLoopGroup)); - if (key == null && eventLoopGroup.isShuttingDown()) { + if ((key == null && eventLoopGroup.isShuttingDown()) || refCounter.containsKey(eventLoopGroup)) { DefaultPromise promise = new DefaultPromise(GlobalEventExecutor.INSTANCE); promise.setSuccess(true); return promise; @@ -144,7 +185,7 @@ public Future shutdown(long quietPeriod, long timeout, TimeUnit timeUni aggregator.arm(); for (EventExecutorGroup executorGroup : copy.values()) { - Promise shutdown = toBooleanPromise(executorGroup.shutdownGracefully(quietPeriod, timeout, timeUnit)); + Promise shutdown = toBooleanPromise(release(executorGroup, quietPeriod, timeout, timeUnit)); aggregator.add(shutdown); } diff --git a/src/main/java/com/lambdaworks/redis/resource/EventLoopGroupProvider.java b/src/main/java/com/lambdaworks/redis/resource/EventLoopGroupProvider.java index 5a3a40bca9..747b122ae6 100644 --- a/src/main/java/com/lambdaworks/redis/resource/EventLoopGroupProvider.java +++ b/src/main/java/com/lambdaworks/redis/resource/EventLoopGroupProvider.java @@ -27,7 +27,8 @@ public interface EventLoopGroupProvider { /** * Retrieve a {@link EventLoopGroup} for the type {@code type}. Do not terminate or shutdown the instance. Call the - * {@link #shutdown(long, long, TimeUnit)} method to free the resources. + * {@link #release(EventExecutorGroup, long, long, TimeUnit)} to release an individual instance or + * {@link #shutdown(long, long, TimeUnit)} method to free the all resources. * * @param type type of the event loop group, must not be {@literal null} * @param type parameter @@ -44,7 +45,7 @@ public interface EventLoopGroupProvider { int threadPoolSize(); /** - * Release the {@code eventLoopGroup} instance. The method will shutdown/terminate the event loop group if it is no longer + * Release a {@code eventLoopGroup} instance. The method will shutdown/terminate the event loop group if it is no longer * needed. * * @param eventLoopGroup the eventLoopGroup instance, must not be {@literal null} diff --git a/src/test/java/com/lambdaworks/redis/RedisClientTest.java b/src/test/java/com/lambdaworks/redis/RedisClientTest.java new file mode 100644 index 0000000000..0c8a4794f6 --- /dev/null +++ b/src/test/java/com/lambdaworks/redis/RedisClientTest.java @@ -0,0 +1,111 @@ +package com.lambdaworks.redis; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.lang.reflect.Field; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +import com.lambdaworks.redis.resource.ClientResources; +import com.lambdaworks.redis.resource.DefaultClientResources; +import com.lambdaworks.redis.resource.DefaultEventLoopGroupProvider; +import io.netty.util.concurrent.EventExecutorGroup; + +/** + * @author Mark Paluch + */ +public class RedisClientTest { + + @Test + public void reuseClientConnections() throws Exception { + + // given + DefaultClientResources clientResources = DefaultClientResources.create(); + Map, EventExecutorGroup> eventLoopGroups = getExecutors(clientResources); + + RedisClient redisClient1 = newClient(clientResources); + RedisClient redisClient2 = newClient(clientResources); + connectAndClose(redisClient1); + connectAndClose(redisClient2); + + // when + EventExecutorGroup executor = eventLoopGroups.values().iterator().next(); + redisClient1.shutdown(0, 0, TimeUnit.MILLISECONDS); + + // then + connectAndClose(redisClient2); + + clientResources.shutdown(0, 0, TimeUnit.MILLISECONDS).get(); + + assertThat(eventLoopGroups).isEmpty(); + assertThat(executor.isShuttingDown()).isTrue(); + assertThat(clientResources.eventExecutorGroup().isShuttingDown()).isTrue(); + } + + @Test + public void reuseClientConnectionsShutdownTwoClients() throws Exception { + + // given + DefaultClientResources clientResources = DefaultClientResources.create(); + Map, EventExecutorGroup> eventLoopGroups = getExecutors(clientResources); + + RedisClient redisClient1 = newClient(clientResources); + RedisClient redisClient2 = newClient(clientResources); + connectAndClose(redisClient1); + connectAndClose(redisClient2); + + // when + EventExecutorGroup executor = eventLoopGroups.values().iterator().next(); + + redisClient1.shutdown(0, 0, TimeUnit.MILLISECONDS); + assertThat(executor.isShutdown()).isFalse(); + connectAndClose(redisClient2); + redisClient1.shutdown(0, 0, TimeUnit.MILLISECONDS); + + // then + assertThat(eventLoopGroups).isEmpty(); + assertThat(executor.isShutdown()).isTrue(); + assertThat(clientResources.eventExecutorGroup().isShuttingDown()).isFalse(); + + // cleanup + clientResources.shutdown(0, 0, TimeUnit.MILLISECONDS).get(); + assertThat(clientResources.eventExecutorGroup().isShuttingDown()).isTrue(); + } + + @Test + public void managedClientResources() throws Exception { + + // given + RedisClient redisClient1 = RedisClient.create(RedisURI.create(TestSettings.host(), TestSettings.port())); + ClientResources clientResources = redisClient1.getResources(); + Map, EventExecutorGroup> eventLoopGroups = getExecutors(clientResources); + connectAndClose(redisClient1); + + // when + EventExecutorGroup executor = eventLoopGroups.values().iterator().next(); + + redisClient1.shutdown(0, 0, TimeUnit.MILLISECONDS); + + // then + assertThat(eventLoopGroups).isEmpty(); + assertThat(executor.isShuttingDown()).isTrue(); + assertThat(clientResources.eventExecutorGroup().isShuttingDown()).isTrue(); + } + + private void connectAndClose(RedisClient client) { + client.connect().close(); + } + + private RedisClient newClient(DefaultClientResources clientResources) { + return RedisClient.create(clientResources, RedisURI.create(TestSettings.host(), TestSettings.port())); + } + + private Map, EventExecutorGroup> getExecutors(ClientResources clientResources) + throws Exception { + Field eventLoopGroupsField = DefaultEventLoopGroupProvider.class.getDeclaredField("eventLoopGroups"); + eventLoopGroupsField.setAccessible(true); + return (Map) eventLoopGroupsField.get(clientResources.eventLoopGroupProvider()); + } +} \ No newline at end of file