Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Address concurrency issues in DequeRecycler close #41695

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
*/
public class ConcurrentDequeRecycler<T> extends DequeRecycler<T> {

// we maintain size separately because concurrent deque implementations typically have linear-time size() impls
// we maintain size separately because concurrent deque implementations typically have linear-time size() impls.
// due to the concurrent nature of this class, size may differ from the actual size of the deque depending on
// other concurrent requests and their current state
final AtomicInteger size;

public ConcurrentDequeRecycler(C<T> c, int maxSize) {
Expand All @@ -39,7 +41,6 @@ public ConcurrentDequeRecycler(C<T> c, int maxSize) {

@Override
public void close() {
assert deque.size() == size.get();
super.close();
size.set(0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@


import java.util.Deque;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* A {@link Recycler} implementation based on a {@link Deque}. This implementation is NOT thread-safe.
*/
public class DequeRecycler<T> extends AbstractRecycler<T> {

final AtomicBoolean closed = new AtomicBoolean(false);
final Deque<T> deque;
final int maxSize;

Expand All @@ -38,12 +40,13 @@ public DequeRecycler(C<T> c, Deque<T> queue, int maxSize) {

@Override
public void close() {
// call destroy() for every cached object
for (T t : deque) {
c.destroy(t);
if (closed.compareAndSet(false, true)) {
T t;
while ((t = deque.pollFirst()) != null) {
c.destroy(t);
}
assert deque.size() == 0;
}
// finally get rid of all references
deque.clear();
}

@Override
Expand Down Expand Up @@ -90,7 +93,7 @@ public void close() {
if (value == null) {
throw new IllegalStateException("recycler entry already released...");
}
final boolean recycle = beforeRelease();
final boolean recycle = beforeRelease() && closed.get() == false;
if (recycle) {
c.recycle(value);
deque.addFirst(value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ public static <T> Recycler.Factory<T> dequeFactory(final Recycler.C<T> c, final
}

/**
* Wrap the provided recycler so that calls to {@link Recycler#obtain()} and {@link Recycler.V#close()} are protected by
* a lock.
* Wrap the provided recycler so that calls to {@link Recycler#obtain()}, {@link Recycler#close()} and {@link Recycler.V#close()}
* are protected by a lock.
*/
public static <T> Recycler<T> locked(final Recycler<T> recycler) {
return new FilterRecycler<T>() {
Expand All @@ -79,6 +79,13 @@ public Recycler.V<T> obtain() {
}
}

@Override
public void close() {
synchronized (lock) {
super.close();
}
}

@Override
protected Recycler.V<T> wrap(final Recycler.V<T> delegate) {
return new Recycler.V<T>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,64 @@

package org.elasticsearch.common.recycler;

import org.elasticsearch.common.lease.Releasables;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class LockedRecyclerTests extends AbstractRecyclerTestCase {

@Override
protected Recycler<byte[]> newRecycler(int limit) {
return Recyclers.locked(Recyclers.deque(RECYCLER_C, limit));
}

public void testConcurrentCloseAndObtain() throws Exception {
final int prepopulatedAmount = 1_000_000;
final Recycler<byte[]> recycler = newRecycler(prepopulatedAmount);
final List<Recycler.V<byte[]>> recyclers = new ArrayList<>();
// prepopulate recycler with 1 million entries to ensure we have entries to iterate over
for (int i = 0; i < prepopulatedAmount; i++) {
recyclers.add(recycler.obtain());
}
Releasables.close(recyclers);
final int numberOfProcessors = Runtime.getRuntime().availableProcessors();
final int numberOfThreads = scaledRandomIntBetween((numberOfProcessors + 1) / 2, numberOfProcessors * 3);
final int numberOfIterations = scaledRandomIntBetween(100_000, 1_000_000);
final CountDownLatch latch = new CountDownLatch(1 + numberOfThreads);
List<Thread> threads = new ArrayList<>(numberOfThreads);
for (int i = 0; i < numberOfThreads - 1; i++) {
threads.add(new Thread(() -> {
latch.countDown();
try {
latch.await();
for (int iter = 0; iter < numberOfIterations; iter++) {
Recycler.V<byte[]> o = recycler.obtain();
final byte[] bytes = o.v();
assertNotNull(bytes);
o.close();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}));
}
threads.add(new Thread(() -> {
latch.countDown();
try {
latch.await();
Thread.sleep(randomLongBetween(1, 200));
recycler.close();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}));

threads.forEach(Thread::start);
latch.countDown();
for (Thread t : threads) {
t.join();
}
}
}