From 6ff6dc1fb4a344deea3ad8f858a2c051c8880bb8 Mon Sep 17 00:00:00 2001 From: Pat Whelan Date: Tue, 9 Apr 2024 18:16:42 -0400 Subject: [PATCH] [Transform] Only trigger action once per thread (#107232) TransformScheduler can trigger its tasks on multiple threads. TransformTask uses an AtomicReference to manage one trigger event per thread by cycling between "Started" and "Indexing". The Retry Listener now has the same protection. "shouldRunAction" will cycle to false during execution and back to true if the action fails and should be retried. Fix #107215 --- docs/changelog/107232.yaml | 6 ++ .../TransformRetryableStartUpListener.java | 21 +++--- ...ransformRetryableStartUpListenerTests.java | 64 +++++++++++++++++++ 3 files changed, 80 insertions(+), 11 deletions(-) create mode 100644 docs/changelog/107232.yaml diff --git a/docs/changelog/107232.yaml b/docs/changelog/107232.yaml new file mode 100644 index 0000000000000..1422848cb1c91 --- /dev/null +++ b/docs/changelog/107232.yaml @@ -0,0 +1,6 @@ +pr: 107232 +summary: Only trigger action once per thread +area: Transform +type: bug +issues: + - 107215 diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformRetryableStartUpListener.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformRetryableStartUpListener.java index 17548fd8d427f..33b20d5513bc5 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformRetryableStartUpListener.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformRetryableStartUpListener.java @@ -22,7 +22,7 @@ class TransformRetryableStartUpListener implements TransformScheduler. private final Supplier shouldRetry; private final TransformContext context; private final AtomicBoolean isFirstRun; - private final AtomicBoolean isRunning; + private final AtomicBoolean shouldRunAction; /** * @param transformId the transform associated with this listener. All events to this listener must be for the same transformId. @@ -53,30 +53,28 @@ class TransformRetryableStartUpListener implements TransformScheduler. this.shouldRetry = shouldRetry; this.context = context; this.isFirstRun = new AtomicBoolean(true); - this.isRunning = new AtomicBoolean(true); + this.shouldRunAction = new AtomicBoolean(true); } @Override public void triggered(TransformScheduler.Event event) { - if (isRunning.get() && transformId.equals(event.transformId())) { + if (transformId.equals(event.transformId()) && shouldRunAction.compareAndSet(true, false)) { action.accept(ActionListener.wrap(this::actionSucceeded, this::actionFailed)); } } - private void markDone() { - if (isRunning.compareAndSet(true, false)) { - synchronized (context) { - context.resetStartUpFailureCount(); - } - } - } - private void actionSucceeded(Response r) { maybeNotifyRetryListener(false); markDone(); actionListener.onResponse(r); } + private void markDone() { + synchronized (context) { + context.resetStartUpFailureCount(); + } + } + private void maybeNotifyRetryListener(boolean response) { if (isFirstRun.compareAndSet(true, false)) { retryScheduledListener.onResponse(response); @@ -87,6 +85,7 @@ private void actionFailed(Exception e) { if (shouldRetry.get()) { maybeNotifyRetryListener(true); recordError(e); + shouldRunAction.set(true); } else { maybeNotifyRetryListener(false); markDone(); diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformRetryableStartUpListenerTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformRetryableStartUpListenerTests.java index 1a2bbfd434455..77b290e015d9a 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformRetryableStartUpListenerTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformRetryableStartUpListenerTests.java @@ -18,6 +18,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.only; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -236,4 +237,67 @@ public void testCancelRetryImmediately() { assertFalse("Retries should not be scheduled.", retryResult.get()); verify(context, only()).resetStartUpFailureCount(); } + + /** + * Given triggered has been called + * When we call trigger a second time + * And the first call has not finished + * Then we should not take any action + * + * Given the first call has finished + * When we call trigger a third time + * Then we should successfully call the action + */ + public void testRunOneAtATime() { + var retryResult = new AtomicReference(); + var responseResult = new AtomicInteger(0); + var context = mock(TransformContext.class); + + var savedListener = new AtomicReference>(); + Consumer> action = l -> { + if (savedListener.compareAndSet(null, l) == false) { + fail("Action should only be called once."); + } + }; + + var listener = new TransformRetryableStartUpListener<>( + "transformId", + action, + responseListener(responseResult), + retryListener(retryResult), + () -> true, + context + ); + + callThreeTimes("transformId", listener); + + // verify the action has been called + assertNotNull(savedListener.get()); + + // assert the listener has not been called yet + assertEquals("Response Listener should never be called once.", 0, responseResult.get()); + assertNull("Retry Listener should not be called.", retryResult.get()); + verifyNoInteractions(context); + + savedListener.get().onFailure(new IllegalStateException("first call fails")); + + // assert only 1 retry and 0 success + assertEquals("Response Listener should only be called once.", 0, responseResult.get()); + assertNotNull("Retry Listener should be called.", retryResult.get()); + assertTrue("Retries should be scheduled.", retryResult.get()); + verify(context, times(1)).incrementAndGetStartUpFailureCount(any(IllegalStateException.class)); + verify(context, never()).resetStartUpFailureCount(); + + // rerun and succeed + savedListener.set(null); + callThreeTimes("transformId", listener); + savedListener.get().onResponse(null); + + // assert only 1 retry and 1 failure + assertEquals("Response Listener should only be called once.", 1, responseResult.get()); + assertNotNull("Retry Listener should be called.", retryResult.get()); + assertTrue("Retries should be scheduled.", retryResult.get()); + verify(context, times(1)).incrementAndGetStartUpFailureCount(any(IllegalStateException.class)); + verify(context, times(1)).resetStartUpFailureCount(); + } }