Skip to content

Commit

Permalink
Part of #1260: write a manually run concurrency test to tease out pro…
Browse files Browse the repository at this point in the history
…blem with pool retention (#1265)
  • Loading branch information
cowtowncoder authored Apr 18, 2024
1 parent 33a99d3 commit 3010ad2
Showing 1 changed file with 175 additions and 0 deletions.
175 changes: 175 additions & 0 deletions src/test/java/perf/RecyclerPoolTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
package perf;

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.util.BufferRecycler;
import com.fasterxml.jackson.core.util.JsonRecyclerPools;
import com.fasterxml.jackson.core.util.RecyclerPool;

/**
* High-concurrency test that tries to see if unbounded {@link RecyclerPool}
* implementations grow without bounds or not.
*/
public class RecyclerPoolTest
{
final static int THREAD_COUNT = 100;

final static int RUNTIME_SECS = 60;

private final int _threadCount;

RecyclerPoolTest(int threadCount) {
_threadCount = threadCount;
}

public String testPool(JsonFactory jsonF, int runtimeMinutes)
throws InterruptedException
{
RecyclerPool<BufferRecycler> poolImpl = jsonF._getRecyclerPool();

final String poolName = poolImpl.getClass().getSimpleName();
final ExecutorService exec = Executors.newFixedThreadPool(_threadCount);
final AtomicLong calls = new AtomicLong();
final long startTime = System.currentTimeMillis();
final long runtimeMsecs = TimeUnit.SECONDS.toMillis(runtimeMinutes);
final long endtimeMsecs = startTime + runtimeMsecs;
final AtomicInteger threadsRunning = new AtomicInteger();

System.out.printf("Starting test of '%s' with %d threads, for %d seconds.\n",
poolImpl.getClass().getName(),
_threadCount, runtimeMsecs / 1000L);

for (int i = 0; i < _threadCount; ++i) {
final int id = i;
threadsRunning.incrementAndGet();
exec.execute(new Runnable() {
@Override
public void run() {
testUntil(jsonF, endtimeMsecs, id, calls);
threadsRunning.decrementAndGet();
}
});
}

long currentTime;
long nextPrint = 0L;
// Print if exceeds threshold (3 x threadcount), otherwise every 2.5 seconds
final int thresholdToPrint = _threadCount * 3;
int maxPooled = 0;

while ((currentTime = System.currentTimeMillis()) < endtimeMsecs) {
int poolSize;

if ((poolSize = poolImpl.pooledCount()) > thresholdToPrint
|| (currentTime > nextPrint)) {
double secs = (currentTime - startTime) / 1000.0;
System.out.printf(" (%s) %.1fs, %dk calls; %d threads; pool size: %d (max seen: %d)\n",
poolName, secs, calls.get()>>10, threadsRunning.get(), poolSize, maxPooled);
if (poolSize > maxPooled) {
maxPooled = poolSize;
}
Thread.sleep(100L);
nextPrint = currentTime + 2500L;
}
}

String desc = String.format("Completed test of '%s': max size seen = %d",
poolName, maxPooled);
System.out.printf("%s. Wait termination of threads..\n", desc);
if (!exec.awaitTermination(2000, TimeUnit.MILLISECONDS)) {
System.out.printf("WARNING: ExecutorService.awaitTermination() failed: %d threads left; will shut down.\n",
threadsRunning.get());
exec.shutdown();
}
return desc;
}

void testUntil(JsonFactory jsonF,
long endTimeMsecs, int threadId, AtomicLong calls)
{
final Random rnd = new Random(threadId);
final byte[] JSON_INPUT = "\"abc\"".getBytes(StandardCharsets.UTF_8);

while (System.currentTimeMillis() < endTimeMsecs) {
try {
// Randomize call order a bit
switch (rnd.nextInt() & 3) {
case 0:
_testRead(jsonF, JSON_INPUT);
break;
case 1:
_testWrite(jsonF);
break;
case 2:
_testRead(jsonF, JSON_INPUT);
_testWrite(jsonF);
break;
default:
_testWrite(jsonF);
_testRead(jsonF, JSON_INPUT);
break;
}
} catch (Exception e) {
System.err.printf("ERROR: thread %d fail, will exit: (%s) %s\n",
threadId, e.getClass().getName(), e.getMessage());
break;
}
calls.incrementAndGet();
}
}

private void _testRead(JsonFactory jsonF, byte[] input) throws Exception
{
JsonParser p = jsonF.createParser(new ByteArrayInputStream(input));
while (p.nextToken() != null) {
;
}
p.close();
}

private void _testWrite(JsonFactory jsonF) throws Exception
{
StringWriter w = new StringWriter(16);
JsonGenerator g = jsonF.createGenerator(w);
g.writeStartArray();
g.writeString("foobar");
g.writeEndArray();
g.close();
}

public static void main(String[] args) throws Exception
{
RecyclerPoolTest test = new RecyclerPoolTest(THREAD_COUNT);
List<String> results = Arrays.asList(
test.testPool(JsonFactory.builder()
.recyclerPool(JsonRecyclerPools.newConcurrentDequePool())
.build(),
RUNTIME_SECS),
test.testPool(JsonFactory.builder()
.recyclerPool(JsonRecyclerPools.newBoundedPool(THREAD_COUNT - 5))
.build(),
RUNTIME_SECS),
test.testPool(JsonFactory.builder()
.recyclerPool(JsonRecyclerPools.newLockFreePool())
.build(),
RUNTIME_SECS)
);

System.out.println("Tests complete! Results:\n");
for (String result : results) {
System.out.printf(" * %s\n", result);
}
}
}

0 comments on commit 3010ad2

Please sign in to comment.