Skip to content

Commit

Permalink
Merge pull request #14810 from stuartwdouglas/flakey-test2
Browse files Browse the repository at this point in the history
Fix flakey test
  • Loading branch information
geoand authored Feb 4, 2021
2 parents aab9911 + 36e994a commit 2fce745
Showing 1 changed file with 21 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package io.quarkus.reactive.datasource.runtime;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.jupiter.api.Test;
import org.wildfly.common.Assert;
Expand Down Expand Up @@ -48,13 +50,14 @@ public void connectionsFromOtherThreadsGetClosed() throws ExecutionException, In
public void connectionsThatAreNotTheLastGetClosedSuccessfully()
throws ExecutionException, InterruptedException {
ExecutorService e = Executors.newFixedThreadPool(3);
Set<Thread> threads = new HashSet<>();
try {
TestableThreadLocalPool globalPool = new TestableThreadLocalPool();
Assert.assertTrue(globalPool.trackedSize() == 0);
final TestPoolInterface p1 = grabPoolFromLongLivingThread(globalPool, e);
final TestPoolInterface p1 = grabPoolFromLongLivingThread(globalPool, e, threads);
Assert.assertFalse(p1.isClosed());
Assert.assertTrue(globalPool.trackedSize() == 1);
final TestPoolInterface p2 = grabPoolFromLongLivingThread(globalPool, e);
final TestPoolInterface p2 = grabPoolFromLongLivingThread(globalPool, e, threads);
Assert.assertFalse(p1.isClosed());
Assert.assertFalse(p2.isClosed());
Assert.assertFalse(p1.isClosed());
Expand All @@ -64,6 +67,9 @@ public void connectionsThatAreNotTheLastGetClosedSuccessfully()
while (!e.isTerminated()) {
Thread.sleep(1);
}
for (Thread thread : threads) {
waitForThreadStop(thread);
}
final TestPoolInterface p3 = grabPoolFromOtherThread(globalPool);
Assert.assertTrue(p1.isClosed());
Assert.assertTrue(p2.isClosed());
Expand Down Expand Up @@ -107,30 +113,35 @@ public void plainClose() {
private TestPoolInterface grabPoolFromOtherThread(TestableThreadLocalPool globalPool)
throws ExecutionException, InterruptedException {
final ExecutorService e = Executors.newSingleThreadExecutor();
AtomicReference<Thread> thread = new AtomicReference<>();
CompletableFuture<Thread> thread = new CompletableFuture<>();
final Future<TestPoolInterface> creation = e.submit(() -> {
thread.set(Thread.currentThread());
thread.complete(Thread.currentThread());
return globalPool.pool();
});
final TestPoolInterface poolInstance = creation.get();
e.shutdown();
while (thread.get().isAlive()) {
waitForThreadStop(thread.get());
return poolInstance;
}

private void waitForThreadStop(Thread thread) throws InterruptedException {
while (thread.isAlive()) {
//even after the pool is shutdown the thread may not actually report itself
//as being dead yet, which can lead to race conditions
//so we wait just to be sure
Thread.sleep(1);
}
return poolInstance;
}

private TestPoolInterface grabPoolFromLongLivingThread(TestableThreadLocalPool globalPool,
ExecutorService e)
ExecutorService e, Set<Thread> allThreads)
throws InterruptedException, ExecutionException {
AtomicReference<Thread> thread = new AtomicReference<>();
CompletableFuture<Thread> thread = new CompletableFuture<>();
final Future<TestPoolInterface> creation = e.submit(() -> {
thread.set(Thread.currentThread());
thread.complete(Thread.currentThread());
return globalPool.pool();
});
allThreads.add(thread.get());
return creation.get();
}

Expand Down

0 comments on commit 2fce745

Please sign in to comment.