diff --git a/server/src/main/java/org/elasticsearch/common/recycler/ConcurrentDequeRecycler.java b/server/src/main/java/org/elasticsearch/common/recycler/ConcurrentDequeRecycler.java index 04103c5e274d9..a200216c99d9a 100644 --- a/server/src/main/java/org/elasticsearch/common/recycler/ConcurrentDequeRecycler.java +++ b/server/src/main/java/org/elasticsearch/common/recycler/ConcurrentDequeRecycler.java @@ -29,7 +29,9 @@ */ public class ConcurrentDequeRecycler extends DequeRecycler { - // we maintain size separately because concurrent deque implementations typically have linear-time size() impls + // we maintain size separately because concurrent deque implementations typically have linear-time size() impls. + // due to the concurrent nature of this class, size may differ from the actual size of the deque depending on + // other concurrent requests and their current state final AtomicInteger size; public ConcurrentDequeRecycler(C c, int maxSize) { @@ -39,7 +41,6 @@ public ConcurrentDequeRecycler(C c, int maxSize) { @Override public void close() { - assert deque.size() == size.get(); super.close(); size.set(0); } diff --git a/server/src/main/java/org/elasticsearch/common/recycler/DequeRecycler.java b/server/src/main/java/org/elasticsearch/common/recycler/DequeRecycler.java index a40befe9d8191..423cffd828d81 100644 --- a/server/src/main/java/org/elasticsearch/common/recycler/DequeRecycler.java +++ b/server/src/main/java/org/elasticsearch/common/recycler/DequeRecycler.java @@ -21,12 +21,14 @@ import java.util.Deque; +import java.util.concurrent.atomic.AtomicBoolean; /** * A {@link Recycler} implementation based on a {@link Deque}. This implementation is NOT thread-safe. */ public class DequeRecycler extends AbstractRecycler { + final AtomicBoolean closed = new AtomicBoolean(false); final Deque deque; final int maxSize; @@ -38,12 +40,13 @@ public DequeRecycler(C c, Deque queue, int maxSize) { @Override public void close() { - // call destroy() for every cached object - for (T t : deque) { - c.destroy(t); + if (closed.compareAndSet(false, true)) { + T t; + while ((t = deque.pollFirst()) != null) { + c.destroy(t); + } + assert deque.size() == 0; } - // finally get rid of all references - deque.clear(); } @Override @@ -90,7 +93,7 @@ public void close() { if (value == null) { throw new IllegalStateException("recycler entry already released..."); } - final boolean recycle = beforeRelease(); + final boolean recycle = beforeRelease() && closed.get() == false; if (recycle) { c.recycle(value); deque.addFirst(value); diff --git a/server/src/main/java/org/elasticsearch/common/recycler/Recyclers.java b/server/src/main/java/org/elasticsearch/common/recycler/Recyclers.java index 3ea9d17c25f19..b24ea3cbb1e4f 100644 --- a/server/src/main/java/org/elasticsearch/common/recycler/Recyclers.java +++ b/server/src/main/java/org/elasticsearch/common/recycler/Recyclers.java @@ -55,8 +55,8 @@ public static Recycler.Factory dequeFactory(final Recycler.C c, final } /** - * Wrap the provided recycler so that calls to {@link Recycler#obtain()} and {@link Recycler.V#close()} are protected by - * a lock. + * Wrap the provided recycler so that calls to {@link Recycler#obtain()}, {@link Recycler#close()} and {@link Recycler.V#close()} + * are protected by a lock. */ public static Recycler locked(final Recycler recycler) { return new FilterRecycler() { @@ -79,6 +79,13 @@ public Recycler.V obtain() { } } + @Override + public void close() { + synchronized (lock) { + super.close(); + } + } + @Override protected Recycler.V wrap(final Recycler.V delegate) { return new Recycler.V() { diff --git a/server/src/test/java/org/elasticsearch/common/recycler/LockedRecyclerTests.java b/server/src/test/java/org/elasticsearch/common/recycler/LockedRecyclerTests.java index ad7b2943afca1..f66f6e698eb3d 100644 --- a/server/src/test/java/org/elasticsearch/common/recycler/LockedRecyclerTests.java +++ b/server/src/test/java/org/elasticsearch/common/recycler/LockedRecyclerTests.java @@ -19,6 +19,12 @@ package org.elasticsearch.common.recycler; +import org.elasticsearch.common.lease.Releasables; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; + public class LockedRecyclerTests extends AbstractRecyclerTestCase { @Override @@ -26,4 +32,51 @@ protected Recycler newRecycler(int limit) { return Recyclers.locked(Recyclers.deque(RECYCLER_C, limit)); } + public void testConcurrentCloseAndObtain() throws Exception { + final int prepopulatedAmount = 1_000_000; + final Recycler recycler = newRecycler(prepopulatedAmount); + final List> recyclers = new ArrayList<>(); + // prepopulate recycler with 1 million entries to ensure we have entries to iterate over + for (int i = 0; i < prepopulatedAmount; i++) { + recyclers.add(recycler.obtain()); + } + Releasables.close(recyclers); + final int numberOfProcessors = Runtime.getRuntime().availableProcessors(); + final int numberOfThreads = scaledRandomIntBetween((numberOfProcessors + 1) / 2, numberOfProcessors * 3); + final int numberOfIterations = scaledRandomIntBetween(100_000, 1_000_000); + final CountDownLatch latch = new CountDownLatch(1 + numberOfThreads); + List threads = new ArrayList<>(numberOfThreads); + for (int i = 0; i < numberOfThreads - 1; i++) { + threads.add(new Thread(() -> { + latch.countDown(); + try { + latch.await(); + for (int iter = 0; iter < numberOfIterations; iter++) { + Recycler.V o = recycler.obtain(); + final byte[] bytes = o.v(); + assertNotNull(bytes); + o.close(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + })); + } + threads.add(new Thread(() -> { + latch.countDown(); + try { + latch.await(); + Thread.sleep(randomLongBetween(1, 200)); + recycler.close(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + })); + + threads.forEach(Thread::start); + latch.countDown(); + for (Thread t : threads) { + t.join(); + } + } }