Fix test concurrent remote connection updates
This test has a race condition. The action listener used to listen for
connections has a guard against being executed twice, yet the listener
can in fact be executed twice. After the listener's success handler is
invoked, the test starts to tear down. At this point, the threads the
test forked terminate and the remote cluster connection is closed.
However, a thread forked to the management thread pool by the remote
cluster connection can still be executing and trying to continue
connecting. That thread is cancelled when the remote cluster connection
is closed, which invokes the action listener a second time. To address
this, we explicitly check whether the failure handler was invoked
because of cancellation and, if so, assert that the listener had already
been invoked. This issue has always been present; a recent change
(#28667) exposed errors that occur on tasks submitted to the thread
pool, which were previously being lost silently.

Relates #28695
jasontedor committed Feb 16, 2018
1 parent 3d2393e commit 10666a4
Showing 1 changed file with 22 additions and 12 deletions.
@@ -557,7 +557,6 @@ public void onNodeDisconnected(DiscoveryNode node) {
         }
     }

-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/28695")
     public void testTriggerUpdatesConcurrently() throws IOException, InterruptedException {
         List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
         try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT);
@@ -591,17 +590,28 @@ public void run() {
                 CountDownLatch latch = new CountDownLatch(numConnectionAttempts);
                 for (int i = 0; i < numConnectionAttempts; i++) {
                     AtomicBoolean executed = new AtomicBoolean(false);
-                    ActionListener<Void> listener = ActionListener.wrap(x -> {
-                        assertTrue(executed.compareAndSet(false, true));
-                        latch.countDown();}, x -> {
-                        assertTrue(executed.compareAndSet(false, true));
-                        latch.countDown();
-                        if (x instanceof RejectedExecutionException) {
-                            // that's fine
-                        } else {
-                            throw new AssertionError(x);
-                        }
-                    });
+                    ActionListener<Void> listener = ActionListener.wrap(
+                            x -> {
+                                assertTrue(executed.compareAndSet(false, true));
+                                latch.countDown();},
+                            x -> {
+                                /*
+                                 * This can occur on a thread submitted to the thread pool while we are closing the
+                                 * remote cluster connection at the end of the test.
+                                 */
+                                if (x instanceof CancellableThreads.ExecutionCancelledException) {
+                                    // we should already be shutting down
+                                    assertTrue(executed.get());
+                                    return;
+                                }
+
+                                assertTrue(executed.compareAndSet(false, true));
+                                latch.countDown();
+
+                                if (!(x instanceof RejectedExecutionException)) {
+                                    throw new AssertionError(x);
+                                }
+                            });
                     connection.updateSeedNodes(seedNodes, listener);
                 }
                 latch.await();
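
The listener logic in this change can be tried in isolation. What follows is a minimal sketch, not the Elasticsearch code: `GuardedListenerSketch`, `Listener`, and `guarded` are hypothetical names, and the JDK's `java.util.concurrent.CancellationException` stands in for `CancellableThreads.ExecutionCancelledException`. It shows a success callback followed by a late cancellation, which the guard tolerates only because the listener has already completed.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

public class GuardedListenerSketch {

    /** Hypothetical two-callback listener; an assumption, not the Elasticsearch ActionListener API. */
    static final class Listener {
        final Runnable onSuccess;
        final Consumer<Exception> onFailure;

        Listener(Runnable onSuccess, Consumer<Exception> onFailure) {
            this.onSuccess = onSuccess;
            this.onFailure = onFailure;
        }
    }

    /** Builds a listener that completes at most once but tolerates a late cancellation. */
    static Listener guarded(AtomicBoolean executed, CountDownLatch latch) {
        return new Listener(
            () -> {
                // First completion wins; a second one is a test failure.
                if (!executed.compareAndSet(false, true)) {
                    throw new AssertionError("listener completed twice");
                }
                latch.countDown();
            },
            e -> {
                // Stand-in for the cancellation seen while the connection is being closed.
                if (e instanceof CancellationException) {
                    // Only acceptable if the listener already completed (we are shutting down).
                    if (!executed.get()) {
                        throw new AssertionError("cancelled before completion");
                    }
                    return;
                }
                if (!executed.compareAndSet(false, true)) {
                    throw new AssertionError("listener completed twice");
                }
                latch.countDown();
                // Rejection by a shut-down executor is fine; anything else is unexpected.
                if (!(e instanceof RejectedExecutionException)) {
                    throw new AssertionError(e);
                }
            });
    }

    public static void main(String[] args) {
        AtomicBoolean executed = new AtomicBoolean(false);
        CountDownLatch latch = new CountDownLatch(1);
        Listener listener = guarded(executed, latch);

        listener.onSuccess.run();                                             // normal completion
        listener.onFailure.accept(new CancellationException("connection closed")); // late cancellation

        System.out.println("latch count: " + latch.getCount()); // prints "latch count: 0"
    }
}
```

With the pre-change ordering (the `compareAndSet` assertion running before any cancellation check), the second call in `main` would fail the assertion, which matches the failure mode the commit message describes.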