diff --git a/src/main/java/net/logstash/logback/util/ThreadLocalHolder.java b/src/main/java/net/logstash/logback/util/ThreadLocalHolder.java
index e91fc2ff..555e53a2 100644
--- a/src/main/java/net/logstash/logback/util/ThreadLocalHolder.java
+++ b/src/main/java/net/logstash/logback/util/ThreadLocalHolder.java
@@ -17,30 +17,31 @@
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
+import java.util.Map;
import java.util.Objects;
-import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
/**
* Maintains a per-thread value created by the {@link Supplier} given to the constructor.
- *
+ *
*
A thread obtains the value by calling {@link #acquire()} and must release it after
* use by calling {@link #release()}. If the value is not released, subsequent calls to
* {@link #acquire()} will throw an {@link IllegalStateException}.
- *
+ *
*
Instances value may also implement the optional {@link ThreadLocalHolder.Lifecycle}
* interface if they wish to be notified when they are recycled or disposed.
- *
+ *
*
The holder keeps track of each requesting thread and takes care of disposing the
* allocated value when it dies.
- *
+ *
* All allocated values are automatically disposed when {@link ThreadLocalHolder#close()}
* is called.
- *
+ *
*
Note: This class is for internal use only and subject to backward incompatible change
* at any time.
- *
+ *
* @param type of instances returned by this {@link ThreadLocalHolder}.
*
* @author brenuart
@@ -51,92 +52,92 @@ public class ThreadLocalHolder {
* The factory used to create new instances
*/
private final Supplier factory;
-
+
/**
* ThreadLocal holding per-thread instances
*/
private final ThreadLocal> threadLocal = ThreadLocal.withInitial(this::initializeThread);
-
+
/**
* Collection of values assigned to each thread
*/
- protected final CopyOnWriteArrayList threadValues = new CopyOnWriteArrayList<>(); /* visible for testing */
-
+ protected final Map threadValues = new ConcurrentHashMap<>(); /* visible for testing */
+
/**
* Reference to dead threads
*/
private final ReferenceQueue deadThreads = new ReferenceQueue<>();
-
+
/**
* {@code true} when the {@link ThreadLocalHolder} is closed.
* When closed, values released by threads will be immediately disposed and the reference cleared.
*/
private volatile boolean closed = false;
-
-
+
+
/**
* Create a new instance of the pool.
- *
+ *
* @param factory the factory used to create new instances.
*/
public ThreadLocalHolder(Supplier factory) {
this.factory = Objects.requireNonNull(factory);
}
-
-
+
+
/**
* Get the value assigned to the current thread, creating a new one if none is assigned yet or the
* previous has been disposed.
- *
+ *
* The value must be {@link #release()} to ensure proper life cycle before it can be {@link #acquire()}
* again.
- *
+ *
* @return the value assigned to this thread
* @throws IllegalStateException if the value is already in use and {@link #release()} was not yet invoked.
*/
public final T acquire() {
Holder holder = this.threadLocal.get();
-
+
if (holder.leased) {
throw new IllegalStateException("ThreadLocal value is already in use and not yet released.");
}
-
+
if (holder.value == null) {
holder.value = Objects.requireNonNull(createInstance());
}
-
+
holder.leased = true;
return holder.value;
}
-
-
+
+
/**
* Release the value and recycle it if possible.
- *
+ *
* @throws IllegalStateException if the value was not previously {@link #acquire()}.
*/
public final void release() {
Holder holder = this.threadLocal.get();
-
+
if (!holder.leased) {
throw new IllegalStateException("Invalid attempt at releasing a value that was not previously acquired.");
}
holder.leased = false;
-
+
/*
* Dispose value if it cannot be recycled
*/
if (this.closed || !safelyRecycleInstance(holder.value)) {
disposeHolder(holder);
}
-
+
/*
* Dispose values assigned to threads that just died
*/
processDeadThreads();
}
-
-
+
+
/**
* Close the holder and dispose all values.
* Threads are still able to {@link #acquire()} values after the holder is closed, but they will be disposed
@@ -148,39 +149,42 @@ public void close() {
* immediately instead of being recycled.
*/
this.closed = true;
-
+
/*
* Dispose value assigned to running threads.
* "inuse" values will be disposed by the owning thread when it releases it.
*/
- for (HolderRef holderRef: this.threadValues) {
+ for (HolderRef holderRef: this.threadValues.values()) {
Holder holder = holderRef.getHolder();
if (!holder.leased) {
disposeHolder(holder);
}
}
this.threadValues.clear();
-
+
/*
* Dispose values assigned to threads that just died
*/
processDeadThreads();
}
-
-
+
+
/**
* Create a new {@link Holder} and keep track of the asking thread for clearing when the thread
* is gone.
- *
+ *
* @return a {@link Holder} assigned to the current thread.
*/
private Holder initializeThread() {
- Holder holder = new Holder<>();
- threadValues.add(new HolderRef(Thread.currentThread(), holder, deadThreads));
- return holder;
+ final Thread currentThread = Thread.currentThread();
+ final long threadId = currentThread.getId();
+ return threadValues.computeIfAbsent(
+ threadId,
+ ignore -> new HolderRef(currentThread, new Holder<>(), deadThreads))
+ .holder;
}
-
+
/**
* Dispose values of dead threads
*/
@@ -193,34 +197,34 @@ private void processDeadThreads() {
while (ref != null) {
Holder holder = ref.getHolder();
disposeHolder(holder);
- threadValues.remove(ref);
-
+ threadValues.remove(ref.getThreadId());
+
ref = (HolderRef) deadThreads.poll();
}
}
-
-
+
+
private void disposeHolder(Holder holder) {
safelyDisposeInstance(holder.value);
holder.value = null;
}
-
-
+
+
/**
* Create a new object instance (must be non-null).
* Sub-classes may override this method to implement their own custom logic if needed.
- *
+ *
* @return a new object instance
*/
protected T createInstance() {
return this.factory.get();
}
-
-
+
+
/**
* Dispose the object instance by calling its life cycle methods.
* Sub-classes may override this method if they wish to implement their own custom logic.
- *
+ *
* @param instance the instance to dispose
*/
protected void disposeInstance(T instance) {
@@ -228,11 +232,11 @@ protected void disposeInstance(T instance) {
((Lifecycle) instance).dispose();
}
}
-
-
+
+
/**
* Safely dispose the given instance, ignoring any exception that may be thrown.
- *
+ *
* @param instance the instance to dispose
*/
private void safelyDisposeInstance(T instance) {
@@ -242,12 +246,12 @@ private void safelyDisposeInstance(T instance) {
// ignore
}
}
-
-
+
+
/**
* Recycle the instance before returning it to the pool.
* Sub-classes may override this method if they wish to implement their own custom logic.
- *
+ *
* @param instance the instance to recycle
* @return {@code true} if the instance can be recycled and returned to the pool, {@code false} if not.
*/
@@ -258,12 +262,12 @@ protected boolean recycleInstance(T instance) {
return true;
}
}
-
-
+
+
/**
* Safely call {@link ThreadLocalHolder#recycleInstance(Object)}, ignoring exceptions but returning
* {@code false} to prevent reuse if any is thrown.
- *
+ *
* @param instance the instance to recycle
* @return {@code true} if the instance can be recycled, {@code false} otherwise.
*/
@@ -274,8 +278,8 @@ private boolean safelyRecycleInstance(T instance) {
return false;
}
}
-
-
+
+
/**
* Optional interface that pooled instances may implement if they wish to be notified of
* life cycle events.
@@ -284,14 +288,14 @@ public interface Lifecycle {
/**
* Indicate whether the instance can be recycled and returned to the pool and perform
* the necessary recycling tasks.
- *
+ *
* @return {@code true} if the instance can be returned to the pool, {@code false} if
* it must be disposed instead.
*/
default boolean recycle() {
return true;
}
-
+
/**
* Dispose the instance and free allocated resources.
*/
@@ -299,11 +303,11 @@ default void dispose() {
// noop
}
}
-
-
+
+
/**
* Holds the value assigned to a thread together with its "inuse" state.
- *
+ *
* This class is static as to not have a reference to the outer {@link ThreadLocalHolder}
* and prevent it from being garbage collected.
*/
@@ -313,7 +317,7 @@ private static class Holder {
* May be null if none is already assigned or when the previous is disposed.
*/
private T value;
-
+
/**
* Indicate whether the instance is in use (acquired).
* Maintaining this flag helps to avoid recreating a new SoftReference every time
@@ -321,8 +325,8 @@ private static class Holder {
*/
private boolean leased;
}
-
-
+
+
/**
* A {@link WeakReference} to a thread with the {@link Holder} assigned to it.
* Used to detect the death of a thread and dispose the associated value.
@@ -330,14 +334,20 @@ private static class Holder {
/* visible for testing */
protected class HolderRef extends WeakReference {
private final Holder holder;
-
+ private final long threadId;
+
HolderRef(Thread owningThread, Holder holder, ReferenceQueue referenceQueue) {
super(owningThread, referenceQueue);
+ this.threadId = owningThread.getId();
this.holder = holder;
}
-
+
public Holder getHolder() {
return this.holder;
}
+
+ public long getThreadId() {
+ return this.threadId;
+ }
}
}
diff --git a/src/test/java/net/logstash/logback/util/ThreadLocalHolderTest.java b/src/test/java/net/logstash/logback/util/ThreadLocalHolderTest.java
index 9ffc8ca5..ba696ba9 100644
--- a/src/test/java/net/logstash/logback/util/ThreadLocalHolderTest.java
+++ b/src/test/java/net/logstash/logback/util/ThreadLocalHolderTest.java
@@ -33,6 +33,7 @@
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -41,6 +42,7 @@
import net.logstash.logback.util.ThreadLocalHolder.Lifecycle;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
/**
@@ -52,14 +54,14 @@ public class ThreadLocalHolderTest {
private ThreadLocalHolder pool = new ThreadLocalHolder<>(this::createInstance);
private ExecutorService executor = new TestExecutorService();
-
-
+
+
@AfterEach
public void teardown() {
this.executor.shutdown();
}
-
-
+
+
/*
* Assert the same value is returned every time
*/
@@ -67,13 +69,13 @@ public void teardown() {
public void testAcquire_sameValueEveryTime() {
PooledObject obj1 = pool.acquire();
assertThat(obj1).isNotNull();
-
+
// Release and acquire again - must be same value
pool.release();
assertThat(pool.acquire()).isSameAs(obj1);
}
-
-
+
+
/*
* Assert different threads receive different values
*/
@@ -81,27 +83,27 @@ public void testAcquire_sameValueEveryTime() {
public void testAcquire_threadsReceiveDifferentValues() throws Exception {
PooledObject obj1 = executor.submit(() -> acquire()).get();
PooledObject obj2 = executor.submit(() -> acquire()).get();
-
+
assertThat(obj1).isNotSameAs(obj2);
}
-
+
private PooledObject acquire() {
System.out.println("acquire from thread " + Thread.currentThread());
return pool.acquire();
}
-
+
/*
* Assert an exception is thrown when value is not released
*/
public void testNotReleased() {
// acquire the value
assertThat(pool.acquire()).isNotNull();
-
+
// acquire a second time without a release
assertThatThrownBy(() -> pool.acquire()).isInstanceOf(IllegalStateException.class);
}
-
-
+
+
/*
* Assert Lifecyle#recycle() is invoked when value is released
*/
@@ -109,11 +111,11 @@ public void testNotReleased() {
public void testRecycle() {
PooledObject obj1 = pool.acquire();
pool.release();
-
+
verify(obj1, times(1)).recycle();
}
-
-
+
+
/*
* Assert value is disposed when Lifecycle#recycle() returns false
*/
@@ -121,16 +123,16 @@ public void testRecycle() {
public void testNotRecyclable() {
PooledObject obj1 = pool.acquire();
when(obj1.recycle()).thenReturn(false);
-
+
pool.release();
-
+
verify(obj1, times(1)).recycle();
verify(obj1, times(1)).dispose();
-
+
assertThat(pool.acquire()).isNotSameAs(obj1);
}
-
-
+
+
/*
* Assert values owned by dead threads are disposed, even if not yet released
*/
@@ -140,18 +142,18 @@ public void testValueDisposedOnThreadDeath() throws Exception {
// The thread is now dead and its reference should be enqueued
PooledObject unreleasedObj = executor.submit(() -> pool.acquire()).get();
PooledObject releasedObj = executor.submit(() -> acquireAndRelease()).get();
-
+
// Cleanup of dead threads happens when calling "release()".
// Lets acquire and release another value from the main thread to trigger
// the house keeping stuff.
pool.acquire();
pool.release();
-
+
verify(releasedObj, times(1)).dispose();
verify(unreleasedObj, times(1)).dispose();
}
-
-
+
+
/*
* Assert values are disposed when calling #close()
*/
@@ -159,11 +161,11 @@ public void testValueDisposedOnThreadDeath() throws Exception {
public void testClose() {
PooledObject obj1 = pool.acquire();
pool.release();
-
+
pool.close();
verify(obj1, times(1)).dispose();
}
-
+
/*
* Assert that acquire is till possible after the ThreadLocalHolder is closed but values
@@ -172,49 +174,49 @@ public void testClose() {
@Test
public void testClose_subsequentAcquiredValuesDisposed() {
pool.close();
-
+
PooledObject obj1 = pool.acquire();
pool.release();
-
+
verify(obj1, times(1)).dispose();
assertThat(pool.acquire()).isNotSameAs(obj1);
}
-
-
+
+
/*
* Close while a thread is still running and acquired a value
*/
@Test
public void testClose_withRunningThread() {
CyclicBarrier barrier = new CyclicBarrier(2);
-
+
// Acquire a value on separate thread and stay alive until latch is released
AtomicReference objRef = new AtomicReference<>();
executor.submit(() -> {
objRef.set(pool.acquire());
await(barrier);
-
+
await(barrier);
pool.release();
awaitUntilInterrupted(); // stay alive until test case is complete
});
-
+
// Wait until the thread acquired the value
await(barrier);
assertThat(pool.threadValues).hasSize(1);
-
+
// Close -
pool.close();
verify(objRef.get(), never()).dispose();
-
+
// Signal the thread to release its value
await(barrier);
verify(objRef.get(), timeout(100000).times(1)).dispose();
}
-
+
/*
* NullPointer exception thrown if factory returns null
*/
@@ -223,23 +225,23 @@ public void testFactoryReturnsNull() {
pool = new ThreadLocalHolder(() -> null);
assertThatThrownBy(() -> pool.acquire()).isInstanceOf(NullPointerException.class);
}
-
-
+
+
/*
* Exception thrown by the factory is propagated to ObjectPool#acquire()
*/
@Test
public void testFactoryThrowsException() {
RuntimeException e = new RuntimeException();
-
+
pool = new ThreadLocalHolder(() -> {
throw e;
});
assertThatThrownBy(() -> pool.acquire()).isSameAs(e);
}
-
-
+
+
/*
* Exception thrown by Lifecycle#recycle() -> survive and do not recycle
*/
@@ -247,12 +249,12 @@ public void testFactoryThrowsException() {
public void testRecycleThrowsException() {
PooledObject obj1 = spy(pool.acquire());
when(obj1.recycle()).thenThrow(new RuntimeException());
-
+
assertThatCode(() -> pool.release()).doesNotThrowAnyException();
assertThat(pool.acquire()).isNotSameAs(obj1);
}
-
-
+
+
/*
* Exception thrown by Lifecycle#dispose() -> survive and do not recycle
*/
@@ -260,30 +262,63 @@ public void testRecycleThrowsException() {
public void testDisposeThrowsException() {
PooledObject obj1 = spy(pool.acquire());
doThrow(new RuntimeException()).when(obj1).dispose();
-
+
assertThatCode(() -> pool.release()).doesNotThrowAnyException();
assertThat(pool.acquire()).isNotSameAs(obj1);
}
-
-
+
+ /**
+ * Test to verify that memory leak explained in
+ * https://github.com/logfellow/logstash-logback-encoder/issues/722 is not happening again.
+ */
+ @DisplayName("threadValues should not have a bigger size than the number of threads it executes")
+ @Test
+ public void testThreadValues_expectsAtMaximumOneValuePerThreadInThreadValues() {
+ final ForkJoinPool commonPool = ForkJoinPool.commonPool();
+ final int poolSize = ForkJoinPool.getCommonPoolParallelism();
+
+ for (int i = 0; i < 10_000; i++) {
+ commonPool
+ .execute(
+ () -> {
+ final PooledObject acquire = pool.acquire();
+
+ assertThat(acquire.threadId).isEqualTo(Thread.currentThread().getId());
+ pool.release();
+ });
+ }
+ // try to wait for some time so that it executes the commonPool tasks
+ commonPool.awaitQuiescence(5, TimeUnit.SECONDS);
+ // it should have at maximum the number of threads plus 1 which indicates the main thread which ForkJoinPool
+ // might end up using for optimization
+ assertThat(pool.threadValues.size()).isLessThanOrEqualTo(poolSize + 1);
+ }
+
+
// --------------------------------------------------------------------------------------------
-
+
private PooledObject createInstance() {
- return spy(new PooledObject());
+ return spy(new PooledObject(Thread.currentThread().getId()));
}
-
+
public static class PooledObject implements ThreadLocalHolder.Lifecycle {
+ private long threadId;
+
+ public PooledObject(long threadId) {
+ this.threadId = threadId;
+ }
+
@Override
public boolean recycle() {
return Lifecycle.super.recycle();
}
-
+
@Override
public void dispose() {
Lifecycle.super.dispose();
}
}
-
+
/*
* Utility method to acquire and release a value in one go
*/
@@ -292,7 +327,7 @@ private PooledObject acquireAndRelease() {
pool.release();
return obj;
}
-
+
/*
* Await until the current thread is interrupted
*/
@@ -316,17 +351,17 @@ private void await(CyclicBarrier barrier) {
throw new IllegalStateException(e);
}
}
-
+
/**
* An {@link ExecutorService} that executes each submitted task on a new thread and takes care
* of notifying the {@link ThreadLocalHolder} when a thread is about to die.
- *
+ *
* This implementation guarantees that the {@link ThreadLocalHolder} is notified about the death
* of the thread *before* any Future is unblocked.
*/
private class TestExecutorService extends AbstractExecutorService {
private final List runningThreads = new CopyOnWriteArrayList<>();
-
+
@Override
@SuppressWarnings("unchecked")
protected RunnableFuture newTaskFor(Callable callable) {
@@ -344,15 +379,15 @@ public Object call() throws Exception {
}
}
};
-
+
return super.newTaskFor(wrappedCallable);
}
-
+
@Override
protected RunnableFuture newTaskFor(Runnable runnable, T value) {
return newTaskFor(Executors.callable(runnable, value));
}
-
+
@Override
public void execute(Runnable command) {
Thread t = new Thread(command) {
@@ -368,16 +403,16 @@ public void run() {
t.start();
runningThreads.add(t);
}
-
+
@SuppressWarnings("rawtypes")
private void notifyThreadDeath(Thread thread) {
- for (HolderRef ref: pool.threadValues) {
+ for (HolderRef ref: pool.threadValues.values()) {
if (ref.get() == thread) {
ref.enqueue();
}
}
}
-
+
@Override
public void shutdown() {
for (Thread t: runningThreads) {