From 999fe8761205b89eeac39cce7331d99852918534 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Sat, 17 Dec 2022 09:43:07 +1100 Subject: [PATCH] Serialize onCompleteFailure for #9059 (#9062) Serialize onCompleteFailure for #9059 * Fixed case where process() throws an exception. Before, exiting the processing loop would always skip to invoke onCompleteFailure(), causing the callback to not be completed. Now we fall through and possibly invoke onCompleteFailure() if it was not already invoked. * Updated javadocs. * Code cleanups. Co-authored-by: Simone Bordet --- .../eclipse/jetty/util/IteratingCallback.java | 300 ++++++++++-------- 1 file changed, 170 insertions(+), 130 deletions(-) diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java b/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java index 31e5a70b36df..831b2462818a 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java @@ -24,109 +24,118 @@ /** * This specialized callback implements a pattern that allows - * a large job to be broken into smaller tasks using iteration - * rather than recursion. + * a large asynchronous task to be broken into smaller + * asynchronous sub-tasks using iteration rather than recursion. *

* A typical example is the write of a large content to a socket, * divided in chunks. Chunk C1 is written by thread T1, which * also invokes the callback, which writes chunk C2, which invokes * the callback again, which writes chunk C3, and so forth. - *

*

- * The problem with the example is that if the callback thread + * The problem with the example above is that if the callback thread * is the same that performs the I/O operation, then the process * is recursive and may result in a stack overflow. * To avoid the stack overflow, a thread dispatch must be performed, * causing context switching and cache misses, affecting performance. - *

*

- * To avoid this issue, this callback uses an AtomicReference to - * record whether success callback has been called during the processing - * of a sub task, and if so then the processing iterates rather than - * recurring. - *

+ * To avoid this issue, this callback atomically records whether + * the callback for an asynchronous sub-task has been called + * during the processing of the asynchronous sub-task, and if so + * then the processing of the large asynchronous task iterates + * rather than recursing. *

- * Subclasses must implement method {@link #process()} where the sub - * task is executed and a suitable {@link IteratingCallback.Action} is - * returned to this callback to indicate the overall progress of the job. - * This callback is passed to the asynchronous execution of each sub - * task and a call the {@link #succeeded()} on this callback represents - * the completion of the sub task. - *

+ * Subclasses must implement method {@link #process()} where the + * asynchronous sub-task is initiated and a suitable {@link Action} + * is returned to this callback to indicate the overall progress of + * the large asynchronous task. + * This callback is passed to the asynchronous sub-task, and a call + * to {@link #succeeded()} on this callback represents the successful + * completion of the asynchronous sub-task, while a call to + * {@link #failed(Throwable)} on this callback represents the + * completion with a failure of the large asynchronous task. */ public abstract class IteratingCallback implements Callback { /** - * The internal states of this callback + * The internal states of this callback. */ private enum State { /** - * This callback is IDLE, ready to iterate. + * This callback is idle, ready to iterate. */ IDLE, /** - * This callback is iterating calls to {@link #process()} and is dealing with - * the returns. To get into processing state, it much of held the lock state - * and set iterating to true. + * This callback is just about to call {@link #process()}, + * or within it, or just exited from it, either normally + * or by throwing. */ PROCESSING, /** - * Waiting for a schedule callback + * Method {@link #process()} returned {@link Action#SCHEDULED} + * and this callback is waiting for the asynchronous sub-task + * to complete. */ PENDING, /** - * Called by a schedule callback + * The asynchronous sub-task was completed successfully + * via a call to {@link #succeeded()} while in + * {@link #PROCESSING} state. */ CALLED, /** - * The overall job has succeeded as indicated by a {@link Action#SUCCEEDED} return - * from {@link IteratingCallback#process()} + * The iteration terminated successfully as indicated by + * {@link Action#SUCCEEDED} returned from + * {@link IteratingCallback#process()}. */ SUCCEEDED, /** - * The overall job has failed as indicated by a call to {@link IteratingCallback#failed(Throwable)} + * The iteration terminated with a failure via a call + * to {@link IteratingCallback#failed(Throwable)}. */ FAILED, /** - * This callback has been closed and cannot be reset. + * This callback has been {@link #close() closed} and + * cannot be {@link #reset() reset}. */ CLOSED } /** - * The indication of the overall progress of the overall job that - * implementations of {@link #process()} must return. + * The indication of the overall progress of the iteration + * that implementations of {@link #process()} must return. */ protected enum Action { /** * Indicates that {@link #process()} has no more work to do, - * but the overall job is not completed yet, probably waiting + * but the iteration is not completed yet, probably waiting * for additional events to trigger more work. */ IDLE, /** - * Indicates that {@link #process()} is executing asynchronously - * a sub task, where the execution has started but the callback + * Indicates that {@link #process()} has initiated an asynchronous + * sub-task, where the execution has started but the callback + * that signals the completion of the asynchronous sub-task * may have not yet been invoked. */ SCHEDULED, - /** - * Indicates that {@link #process()} has completed the overall job. + * Indicates that {@link #process()} has completed the whole + * iteration successfully. */ SUCCEEDED } private Locker _locker = new Locker(); private State _state; + private Throwable _failure; private boolean _iterate; protected IteratingCallback() @@ -140,11 +149,10 @@ protected IteratingCallback(boolean needReset) } /** - * Method called by {@link #iterate()} to process the sub task. + * Method called by {@link #iterate()} to process the asynchronous sub-task. *

- * Implementations must start the asynchronous execution of the sub task + * Implementations must initiate the asynchronous execution of the sub-task * (if any) and return an appropriate action: - *

* * * @return the appropriate Action - * @throws Throwable if the sub task processing throws + * @throws Throwable if the sub-task processing throws */ protected abstract Action process() throws Throwable; @@ -179,44 +187,42 @@ protected void onCompleteFailure(Throwable cause) /** * This method must be invoked by applications to start the processing - * of sub tasks. It can be called at any time by any thread, and it's - * contract is that when called, then the {@link #process()} method will - * be called during or soon after, either by the calling thread or by - * another thread. + * of asynchronous sub-tasks. + *

+ * It can be called at any time by any thread, and its contract is that + * when called, then the {@link #process()} method will be called during + * or soon after, either by the calling thread or by another thread, but + * in either case by one thread only. */ public void iterate() { boolean process = false; - loop: - while (true) + try (Locker.Lock lock = _locker.lock()) { - try (Locker.Lock lock = _locker.lock()) + switch (_state) { - switch (_state) - { - case PENDING: - case CALLED: - // process will be called when callback is handled - break loop; + case PENDING: + case CALLED: + // process will be called when callback is handled + break; - case IDLE: - _state = State.PROCESSING; - process = true; - break loop; + case IDLE: + _state = State.PROCESSING; + process = true; + break; - case PROCESSING: - _iterate = true; - break loop; + case PROCESSING: + _iterate = true; + break; - case FAILED: - case SUCCEEDED: - break loop; + case FAILED: + case SUCCEEDED: + break; - case CLOSED: - default: - throw new IllegalStateException(toString()); - } + case CLOSED: + default: + throw new IllegalStateException(toString()); } } if (process) @@ -228,14 +234,15 @@ private void processing() // This should only ever be called when in processing state, however a failed or close call // may happen concurrently, so state is not assumed. - boolean onCompleteSuccess = false; + boolean notifyCompleteSuccess = false; + Throwable notifyCompleteFailure = null; // While we are processing processing: while (true) { // Call process to get the action that we have to take. - Action action; + Action action = null; try { action = process(); @@ -243,7 +250,7 @@ private void processing() catch (Throwable x) { failed(x); - break processing; + // Fall through to possibly invoke onCompleteFailure(). } // acted on the action we have just received @@ -253,65 +260,63 @@ private void processing() { case PROCESSING: { - switch (action) + if (action != null) { - case IDLE: + switch (action) { - // Has iterate been called while we were processing? - if (_iterate) + case IDLE: + { + // Has iterate been called while we were processing? + if (_iterate) + { + // yes, so skip idle and keep processing + _iterate = false; + continue; + } + + // No, so we can go idle + _state = State.IDLE; + break processing; + } + case SCHEDULED: + { + // we won the race against the callback, so the callback has to process and we can break processing + _state = State.PENDING; + break processing; + } + case SUCCEEDED: { - // yes, so skip idle and keep processing + // we lost the race against the callback, _iterate = false; - _state = State.PROCESSING; - continue processing; + _state = State.SUCCEEDED; + notifyCompleteSuccess = true; + break processing; + } + default: + { + break; } - - // No, so we can go idle - _state = State.IDLE; - break processing; - } - - case SCHEDULED: - { - // we won the race against the callback, so the callback has to process and we can break processing - _state = State.PENDING; - break processing; - } - - case SUCCEEDED: - { - // we lost the race against the callback, - _iterate = false; - _state = State.SUCCEEDED; - onCompleteSuccess = true; - break processing; } - - default: - break; } throw new IllegalStateException(String.format("%s[action=%s]", this, action)); } case CALLED: { - switch (action) - { - case SCHEDULED: - { - // we lost the race, so we have to keep processing - _state = State.PROCESSING; - continue processing; - } - - default: - throw new IllegalStateException(String.format("%s[action=%s]", this, action)); - } + if (action != Action.SCHEDULED) + throw new IllegalStateException(String.format("%s[action=%s]", this, action)); + // we lost the race, so we have to keep processing + _state = State.PROCESSING; + continue; } - case SUCCEEDED: case FAILED: case CLOSED: + notifyCompleteFailure = _failure; + _failure = null; + break processing; + + case SUCCEEDED: break processing; case IDLE: @@ -322,14 +327,17 @@ private void processing() } } - if (onCompleteSuccess) + if (notifyCompleteSuccess) onCompleteSuccess(); + else if (notifyCompleteFailure != null) + onCompleteFailure(notifyCompleteFailure); } /** - * Invoked when the sub task succeeds. - * Subclasses that override this method must always remember to call - * {@code super.succeeded()}. + * Method to invoke when the asynchronous sub-task succeeds. + *

+ * Subclasses that override this method must always remember + * to call {@code super.succeeded()}. */ @Override public void succeeded() @@ -367,9 +375,18 @@ public void succeeded() } /** - * Invoked when the sub task fails. - * Subclasses that override this method must always remember to call - * {@code super.failed(Throwable)}. + * Method to invoke when the asynchronous sub-task fails, + * or to fail the overall asynchronous task and therefore + * terminate the iteration. + *

+ * Subclasses that override this method must always remember + * to call {@code super.failed(Throwable)}. + *

+ * Eventually, {@link #onCompleteFailure(Throwable)} is + * called, either by the caller thread or by the processing + * thread. + * + * @see #isFailed() */ @Override public void failed(Throwable x) @@ -386,12 +403,15 @@ public void failed(Throwable x) case CALLED: // too late!. break; - case PENDING: + { + failure = true; + break; + } case PROCESSING: { _state = State.FAILED; - failure = true; + _failure = x; break; } default: @@ -402,6 +422,15 @@ public void failed(Throwable x) onCompleteFailure(x); } + /** + * Method to invoke to forbid further invocations to {@link #iterate()} + * and {@link #reset()}. + *

+ * When this method is invoked during processing, it behaves like invoking + * {@link #failed(Throwable)}. + * + * @see #isClosed() + */ public void close() { String failure = null; @@ -415,12 +444,18 @@ public void close() _state = State.CLOSED; break; + case PROCESSING: + _failure = new IOException(String.format("Close %s in state %s", this, _state)); + _state = State.CLOSED; + break; + case CLOSED: break; default: failure = String.format("Close %s in state %s", this, _state); _state = State.CLOSED; + break; } } @@ -428,9 +463,8 @@ public void close() onCompleteFailure(new IOException(failure)); } - /* - * only for testing - * @return whether this callback is idle and {@link #iterate()} needs to be called + /** + * @return whether this callback is idle, and {@link #iterate()} needs to be called */ boolean isIdle() { @@ -440,6 +474,9 @@ boolean isIdle() } } + /** + * @return whether this callback has been {@link #close() closed} + */ public boolean isClosed() { try (Locker.Lock lock = _locker.lock()) @@ -449,7 +486,7 @@ public boolean isClosed() } /** - * @return whether this callback has failed + * @return whether this callback has been {@link #failed(Throwable) failed} */ public boolean isFailed() { @@ -460,7 +497,9 @@ public boolean isFailed() } /** - * @return whether this callback has succeeded + * @return whether this callback and the overall asynchronous task has been succeeded + * + * @see #onCompleteSuccess() */ public boolean isSucceeded() { @@ -473,9 +512,9 @@ public boolean isSucceeded() /** * Resets this callback. *

- * A callback can only be reset to IDLE from the - * SUCCEEDED or FAILED states or if it is already IDLE. - *

+ * A callback can only be reset to the idle state from the + * {@link #isSucceeded() succeeded} or {@link #isFailed() failed} states + * or if it is already idle. * * @return true if the reset was successful */ @@ -490,8 +529,9 @@ public boolean reset() case SUCCEEDED: case FAILED: - _iterate = false; _state = State.IDLE; + _failure = null; + _iterate = false; return true; default: