From 341eeae6e7a4c9c01c794a2f3d4a2fefc3d477ff Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Wed, 16 Sep 2020 09:13:22 -0400 Subject: [PATCH] [ML] fixes testWatchdog test verifying matcher is interrupted on timeout (#62391) (#62447) Constructing the timout checker FIRST and THEN registering the watcher allows the test to have a race condition. The timeout value could be reached BEFORE the matcher is added. To prevent the matcher never being interrupted, a new timedOut value is added to the watcher thread entry. Then when a new matcher is registered, if the thread was previously timedout, we interrupt the matcher immediately. closes #48861 --- .../filestructurefinder/TimeoutChecker.java | 19 +++++++++++++++---- .../TimeoutCheckerTests.java | 10 ++++------ 2 files changed, 19 insertions(+), 10 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/TimeoutChecker.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/TimeoutChecker.java index a943e7b7a7349..99ce19e3ad110 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/TimeoutChecker.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/TimeoutChecker.java @@ -84,7 +84,6 @@ public synchronized void close() { * @throws ElasticsearchTimeoutException If the operation is found to have taken longer than the permitted time. */ public void check(String where) { - if (timeoutExceeded) { throw new ElasticsearchTimeoutException("Aborting " + operation + " during [" + where + "] as it has taken longer than the timeout of [" + timeout + "]"); @@ -101,7 +100,6 @@ public void check(String where) { * @throws ElasticsearchTimeoutException If the operation is found to have taken longer than the permitted time. */ public Map grokCaptures(Grok grok, String text, String where) { - try { return grok.captures(text); } finally { @@ -137,12 +135,15 @@ void add(Thread thread, TimeValue timeout) { } @Override - public void register(Matcher matcher) { + public synchronized void register(Matcher matcher) { WatchDogEntry value = registry.get(Thread.currentThread()); if (value != null) { boolean wasFalse = value.registered.compareAndSet(false, true); assert wasFalse; value.matchers.add(matcher); + if (value.isTimedOut()) { + matcher.interrupt(); + } } } @@ -167,8 +168,9 @@ void remove(Thread thread) { assert previousValue != null; } - void interruptLongRunningThreadIfRegistered(Thread thread) { + synchronized void interruptLongRunningThreadIfRegistered(Thread thread) { WatchDogEntry value = registry.get(thread); + value.timedOut(); if (value.registered.get()) { for (Matcher matcher : value.matchers) { matcher.interrupt(); @@ -181,12 +183,21 @@ static class WatchDogEntry { final TimeValue timeout; final AtomicBoolean registered; final Collection matchers; + boolean timedOut; WatchDogEntry(TimeValue timeout) { this.timeout = timeout; this.registered = new AtomicBoolean(false); this.matchers = new CopyOnWriteArrayList<>(); } + + private void timedOut() { + timedOut = true; + } + + private boolean isTimedOut() { + return timedOut; + } } } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/filestructurefinder/TimeoutCheckerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/filestructurefinder/TimeoutCheckerTests.java index 34c9a047a9958..2666bcc44f5cc 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/filestructurefinder/TimeoutCheckerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/filestructurefinder/TimeoutCheckerTests.java @@ -59,21 +59,19 @@ public void testCheckTimeoutExceeded() throws Exception { } } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/48861") public void testWatchdog() throws Exception { - TimeValue timeout = TimeValue.timeValueMillis(500); + final TimeValue timeout = TimeValue.timeValueMillis(randomIntBetween(10, 500)); try (TimeoutChecker timeoutChecker = new TimeoutChecker("watchdog test", timeout, scheduler)) { - TimeoutChecker.TimeoutCheckerWatchdog watchdog = (TimeoutChecker.TimeoutCheckerWatchdog) TimeoutChecker.watchdog; - + final TimeoutChecker.TimeoutCheckerWatchdog watchdog = (TimeoutChecker.TimeoutCheckerWatchdog) TimeoutChecker.watchdog; Matcher matcher = mock(Matcher.class); - TimeoutChecker.watchdog.register(matcher); + watchdog.register(matcher); assertThat(watchdog.registry.get(Thread.currentThread()).matchers.size(), equalTo(1)); try { assertBusy(() -> { verify(matcher).interrupt(); }); } finally { - TimeoutChecker.watchdog.unregister(matcher); + watchdog.unregister(matcher); assertThat(watchdog.registry.get(Thread.currentThread()).matchers.size(), equalTo(0)); } }