Skip to content

Commit

Permalink
Use RefCounters to track open resources #196
Browse files Browse the repository at this point in the history
The DefaultEventLoopProvider tracks now resources using a ref-counter mechanism. Resources in use are no longer closed by the DefaultEventLoopProvider when calling release(...). Only the last call to release(...) causes the resource to be shut down.
  • Loading branch information
mp911de committed Feb 16, 2016
1 parent 8aeb185 commit e49eb2a
Show file tree
Hide file tree
Showing 3 changed files with 160 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import com.google.common.collect.Maps;
import com.lambdaworks.redis.EpollProvider;

import com.lambdaworks.redis.output.BooleanListOutput;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.*;
Expand All @@ -27,6 +27,8 @@ public class DefaultEventLoopGroupProvider implements EventLoopGroupProvider {
protected static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultEventLoopGroupProvider.class);

private final Map<Class<? extends EventExecutorGroup>, EventExecutorGroup> eventLoopGroups = new ConcurrentHashMap<Class<? extends EventExecutorGroup>, EventExecutorGroup>();
private final Map<ExecutorService, Long> refCounter = new ConcurrentHashMap<>();

private final int numberOfThreads;

private volatile boolean shutdownCalled = false;
Expand All @@ -43,8 +45,47 @@ public DefaultEventLoopGroupProvider(int numberOfThreads) {
@Override
public <T extends EventLoopGroup> T allocate(Class<T> type) {
synchronized (this) {
return getOrCreate(type);
return addReference(getOrCreate(type));
}
}

private <T extends ExecutorService> T addReference(T reference) {

synchronized (refCounter){
long counter = 0;
if(refCounter.containsKey(reference)){
counter = refCounter.get(reference);
}

logger.debug("Adding reference to {}, existing ref count {}", reference, counter);
counter++;
refCounter.put(reference, counter);
}

return reference;
}

private <T extends ExecutorService> T release(T reference) {

synchronized (refCounter) {
long counter = 0;
if (refCounter.containsKey(reference)) {
counter = refCounter.get(reference);
}

if (counter < 1) {
logger.debug("Attempting to release {} but ref count is {}", reference, counter);
}

counter--;
if (counter == 0) {
refCounter.remove(reference);
} else {
refCounter.put(reference, counter);
}
}

return reference;
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -93,9 +134,9 @@ public static <T extends EventExecutorGroup> EventExecutorGroup createEventLoopG
@Override
public Promise<Boolean> release(EventExecutorGroup eventLoopGroup, long quietPeriod, long timeout, TimeUnit unit) {

Class<?> key = getKey(eventLoopGroup);
Class<?> key = getKey(release(eventLoopGroup));

if (key == null && eventLoopGroup.isShuttingDown()) {
if ((key == null && eventLoopGroup.isShuttingDown()) || refCounter.containsKey(eventLoopGroup)) {
DefaultPromise<Boolean> promise = new DefaultPromise<Boolean>(GlobalEventExecutor.INSTANCE);
promise.setSuccess(true);
return promise;
Expand Down Expand Up @@ -144,7 +185,7 @@ public Future<Boolean> shutdown(long quietPeriod, long timeout, TimeUnit timeUni
aggregator.arm();

for (EventExecutorGroup executorGroup : copy.values()) {
Promise<Boolean> shutdown = toBooleanPromise(executorGroup.shutdownGracefully(quietPeriod, timeout, timeUnit));
Promise<Boolean> shutdown = toBooleanPromise(release(executorGroup, quietPeriod, timeout, timeUnit));
aggregator.add(shutdown);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ public interface EventLoopGroupProvider {

/**
* Retrieve a {@link EventLoopGroup} for the type {@code type}. Do not terminate or shutdown the instance. Call the
* {@link #shutdown(long, long, TimeUnit)} method to free the resources.
* {@link #release(EventExecutorGroup, long, long, TimeUnit)} to release an individual instance or
* {@link #shutdown(long, long, TimeUnit)} method to free the all resources.
*
* @param type type of the event loop group, must not be {@literal null}
* @param <T> type parameter
Expand All @@ -44,7 +45,7 @@ public interface EventLoopGroupProvider {
int threadPoolSize();

/**
* Release the {@code eventLoopGroup} instance. The method will shutdown/terminate the event loop group if it is no longer
* Release a {@code eventLoopGroup} instance. The method will shutdown/terminate the event loop group if it is no longer
* needed.
*
* @param eventLoopGroup the eventLoopGroup instance, must not be {@literal null}
Expand Down
111 changes: 111 additions & 0 deletions src/test/java/com/lambdaworks/redis/RedisClientTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package com.lambdaworks.redis;

import static org.assertj.core.api.Assertions.assertThat;

import java.lang.reflect.Field;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.junit.Test;

import com.lambdaworks.redis.resource.ClientResources;
import com.lambdaworks.redis.resource.DefaultClientResources;
import com.lambdaworks.redis.resource.DefaultEventLoopGroupProvider;
import io.netty.util.concurrent.EventExecutorGroup;

/**
* @author <a href="mailto:[email protected]">Mark Paluch</a>
*/
public class RedisClientTest {

@Test
public void reuseClientConnections() throws Exception {

// given
DefaultClientResources clientResources = DefaultClientResources.create();
Map<Class<? extends EventExecutorGroup>, EventExecutorGroup> eventLoopGroups = getExecutors(clientResources);

RedisClient redisClient1 = newClient(clientResources);
RedisClient redisClient2 = newClient(clientResources);
connectAndClose(redisClient1);
connectAndClose(redisClient2);

// when
EventExecutorGroup executor = eventLoopGroups.values().iterator().next();
redisClient1.shutdown(0, 0, TimeUnit.MILLISECONDS);

// then
connectAndClose(redisClient2);

clientResources.shutdown(0, 0, TimeUnit.MILLISECONDS).get();

assertThat(eventLoopGroups).isEmpty();
assertThat(executor.isShuttingDown()).isTrue();
assertThat(clientResources.eventExecutorGroup().isShuttingDown()).isTrue();
}

@Test
public void reuseClientConnectionsShutdownTwoClients() throws Exception {

// given
DefaultClientResources clientResources = DefaultClientResources.create();
Map<Class<? extends EventExecutorGroup>, EventExecutorGroup> eventLoopGroups = getExecutors(clientResources);

RedisClient redisClient1 = newClient(clientResources);
RedisClient redisClient2 = newClient(clientResources);
connectAndClose(redisClient1);
connectAndClose(redisClient2);

// when
EventExecutorGroup executor = eventLoopGroups.values().iterator().next();

redisClient1.shutdown(0, 0, TimeUnit.MILLISECONDS);
assertThat(executor.isShutdown()).isFalse();
connectAndClose(redisClient2);
redisClient1.shutdown(0, 0, TimeUnit.MILLISECONDS);

// then
assertThat(eventLoopGroups).isEmpty();
assertThat(executor.isShutdown()).isTrue();
assertThat(clientResources.eventExecutorGroup().isShuttingDown()).isFalse();

// cleanup
clientResources.shutdown(0, 0, TimeUnit.MILLISECONDS).get();
assertThat(clientResources.eventExecutorGroup().isShuttingDown()).isTrue();
}

@Test
public void managedClientResources() throws Exception {

// given
RedisClient redisClient1 = RedisClient.create(RedisURI.create(TestSettings.host(), TestSettings.port()));
ClientResources clientResources = redisClient1.getResources();
Map<Class<? extends EventExecutorGroup>, EventExecutorGroup> eventLoopGroups = getExecutors(clientResources);
connectAndClose(redisClient1);

// when
EventExecutorGroup executor = eventLoopGroups.values().iterator().next();

redisClient1.shutdown(0, 0, TimeUnit.MILLISECONDS);

// then
assertThat(eventLoopGroups).isEmpty();
assertThat(executor.isShuttingDown()).isTrue();
assertThat(clientResources.eventExecutorGroup().isShuttingDown()).isTrue();
}

private void connectAndClose(RedisClient client) {
client.connect().close();
}

private RedisClient newClient(DefaultClientResources clientResources) {
return RedisClient.create(clientResources, RedisURI.create(TestSettings.host(), TestSettings.port()));
}

private Map<Class<? extends EventExecutorGroup>, EventExecutorGroup> getExecutors(ClientResources clientResources)
throws Exception {
Field eventLoopGroupsField = DefaultEventLoopGroupProvider.class.getDeclaredField("eventLoopGroups");
eventLoopGroupsField.setAccessible(true);
return (Map) eventLoopGroupsField.get(clientResources.eventLoopGroupProvider());
}
}

0 comments on commit e49eb2a

Please sign in to comment.