Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Transform] Only trigger action once per thread #107232

Merged
merged 2 commits into from
Apr 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/107232.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 107232
summary: Only trigger action once per thread
area: Transform
type: bug
issues:
- 107215
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class TransformRetryableStartUpListener<Response> implements TransformScheduler.
private final Supplier<Boolean> shouldRetry;
private final TransformContext context;
private final AtomicBoolean isFirstRun;
private final AtomicBoolean isRunning;
private final AtomicBoolean shouldRunAction;

/**
* @param transformId the transform associated with this listener. All events to this listener must be for the same transformId.
Expand Down Expand Up @@ -53,30 +53,28 @@ class TransformRetryableStartUpListener<Response> implements TransformScheduler.
this.shouldRetry = shouldRetry;
this.context = context;
this.isFirstRun = new AtomicBoolean(true);
this.isRunning = new AtomicBoolean(true);
this.shouldRunAction = new AtomicBoolean(true);
}

@Override
public void triggered(TransformScheduler.Event event) {
if (isRunning.get() && transformId.equals(event.transformId())) {
if (transformId.equals(event.transformId()) && shouldRunAction.compareAndSet(true, false)) {
action.accept(ActionListener.wrap(this::actionSucceeded, this::actionFailed));
}
}

private void markDone() {
if (isRunning.compareAndSet(true, false)) {
synchronized (context) {
context.resetStartUpFailureCount();
}
}
}

private void actionSucceeded(Response r) {
maybeNotifyRetryListener(false);
markDone();
actionListener.onResponse(r);
}

private void markDone() {
synchronized (context) {
context.resetStartUpFailureCount();
}
}

private void maybeNotifyRetryListener(boolean response) {
if (isFirstRun.compareAndSet(true, false)) {
retryScheduledListener.onResponse(response);
Expand All @@ -87,6 +85,7 @@ private void actionFailed(Exception e) {
if (shouldRetry.get()) {
maybeNotifyRetryListener(true);
recordError(e);
shouldRunAction.set(true);
} else {
maybeNotifyRetryListener(false);
markDone();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.only;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -236,4 +237,67 @@ public void testCancelRetryImmediately() {
assertFalse("Retries should not be scheduled.", retryResult.get());
verify(context, only()).resetStartUpFailureCount();
}

/**
* Given triggered has been called
* When we call trigger a second time
* And the first call has not finished
* Then we should not take any action
*
* Given the first call has finished
* When we call trigger a third time
* Then we should successfully call the action
*/
public void testRunOneAtATime() {
var retryResult = new AtomicReference<Boolean>();
var responseResult = new AtomicInteger(0);
var context = mock(TransformContext.class);

var savedListener = new AtomicReference<ActionListener<Void>>();
Consumer<ActionListener<Void>> action = l -> {
if (savedListener.compareAndSet(null, l) == false) {
fail("Action should only be called once.");
}
};

var listener = new TransformRetryableStartUpListener<>(
"transformId",
action,
responseListener(responseResult),
retryListener(retryResult),
() -> true,
context
);

callThreeTimes("transformId", listener);

// verify the action has been called
assertNotNull(savedListener.get());

// assert the listener has not been called yet
assertEquals("Response Listener should never be called once.", 0, responseResult.get());
assertNull("Retry Listener should not be called.", retryResult.get());
verifyNoInteractions(context);

savedListener.get().onFailure(new IllegalStateException("first call fails"));

// assert only 1 retry and 0 success
assertEquals("Response Listener should only be called once.", 0, responseResult.get());
assertNotNull("Retry Listener should be called.", retryResult.get());
assertTrue("Retries should be scheduled.", retryResult.get());
verify(context, times(1)).incrementAndGetStartUpFailureCount(any(IllegalStateException.class));
verify(context, never()).resetStartUpFailureCount();

// rerun and succeed
savedListener.set(null);
callThreeTimes("transformId", listener);
savedListener.get().onResponse(null);

// assert only 1 retry and 1 failure
assertEquals("Response Listener should only be called once.", 1, responseResult.get());
assertNotNull("Retry Listener should be called.", retryResult.get());
assertTrue("Retries should be scheduled.", retryResult.get());
verify(context, times(1)).incrementAndGetStartUpFailureCount(any(IllegalStateException.class));
verify(context, times(1)).resetStartUpFailureCount();
}
}