Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[JENKINS-67164] Call StepExecution.onResume in response to WorkflowRun.onLoad not FlowExecutionList.ItemListenerImpl #221

Merged
merged 5 commits into from
Jun 3, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -322,4 +322,12 @@ public Iterable<BlockStartNode> iterateEnclosingBlocks(@NonNull FlowNode node) {
protected void notifyShutdown() {
// Default is no-op
}

/**
* Called after a restart and any attempts at {@link StepExecution#onResume} have completed.
* This is a signal that it is safe to resume program execution.
* By default, does nothing.
*/
protected void afterStepExecutionsResumed() {}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
import hudson.Extension;
import hudson.ExtensionList;
import hudson.XmlFile;
import hudson.init.InitMilestone;
import hudson.init.Terminator;
import hudson.model.Computer;
import hudson.model.listeners.ItemListener;
import hudson.remoting.SingleLaneExecutorService;
import hudson.util.CopyOnWriteList;
Expand All @@ -22,15 +24,23 @@
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.jenkinsci.plugins.workflow.graph.FlowNode;
import org.jenkinsci.plugins.workflow.graphanalysis.LinearBlockHoppingScanner;

import org.kohsuke.accmod.Restricted;
import org.kohsuke.accmod.restrictions.Beta;
import org.kohsuke.accmod.restrictions.DoNotUse;

/**
Expand All @@ -44,6 +54,8 @@ public class FlowExecutionList implements Iterable<FlowExecution> {
private final SingleLaneExecutorService executor = new SingleLaneExecutorService(Timer.get());
private XmlFile configFile;

private transient volatile boolean resumptionComplete;

public FlowExecutionList() {
load();
}
Expand Down Expand Up @@ -160,11 +172,17 @@ public static FlowExecutionList get() {
}

/**
* @deprecated Only exists for binary compatibility.
* Returns true if all executions that were present in this {@link FlowExecutionList} have been loaded.
*
* <p>This takes place slightly after {@link InitMilestone#COMPLETED} is reached during Jenkins startup.
*
* <p>Useful to avoid resuming Pipelines in contexts that may lead to deadlock.
*
* <p>It is <em>not</em> guaranteed that {@link FlowExecution#afterStepExecutionsResumed} has been called at this point.
*/
@Deprecated
@Restricted(Beta.class)
public boolean isResumptionComplete() {
return false;
return resumptionComplete;
}

/**
Expand All @@ -179,29 +197,8 @@ public void onLoaded() {
for (final FlowExecution e : list) {
// The call to FlowExecutionOwner.get in the implementation of iterator() is sufficent to load the Pipeline.
LOGGER.log(Level.FINE, "Eagerly loaded {0}", e);
Futures.addCallback(e.getCurrentExecutions(false), new FutureCallback<List<StepExecution>>() {
@Override
public void onSuccess(@NonNull List<StepExecution> result) {
LOGGER.log(Level.FINE, "Will resume {0}", result);
for (StepExecution se : result) {
try {
se.onResume();
} catch (Throwable x) {
se.getContext().onFailure(x);
}
}
}

@Override
public void onFailure(@NonNull Throwable t) {
if (t instanceof CancellationException) {
LOGGER.log(Level.FINE, "Cancelled load of " + e, t);
} else {
LOGGER.log(Level.WARNING, "Failed to load " + e, t);
}
}
}, MoreExecutors.directExecutor());
}
list.resumptionComplete = true;
}
}

Expand Down Expand Up @@ -256,4 +253,132 @@ public void onFailure(@NonNull Throwable t) {
executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES);
}

/**
* Whenever a Pipeline resumes, resume all incomplete steps in its {@link FlowExecution}.
*
* <p>Called by {@code WorkflowRun.onLoad}, so guaranteed to run if a Pipeline resumes
* regardless of its presence in {@link FlowExecutionList}.
*/
@Extension
public static class ResumeStepExecutionListener extends FlowExecutionListener {
@Override
public void onResumed(@NonNull FlowExecution e) {
Futures.addCallback(e.getCurrentExecutions(false), new FutureCallback<List<StepExecution>>() {
@Override
public void onSuccess(@NonNull List<StepExecution> result) {
if (e.isComplete()) {
// WorkflowRun.onLoad will not fireResumed if the execution was already complete when loaded,
// and CpsFlowExecution should not then complete until afterStepExecutionsResumed, so this is defensive.
return;
}
FlowExecutionList list = FlowExecutionList.get();
FlowExecutionOwner owner = e.getOwner();
if (!list.runningTasks.contains(owner)) {
LOGGER.log(Level.WARNING, "Resuming {0}, which is missing from FlowExecutionList ({1}), so registering it now.", new Object[] {owner, list.runningTasks.getView()});
list.register(owner);
}
LOGGER.log(Level.FINE, "Will resume {0}", result);
new ParallelResumer(result, e::afterStepExecutionsResumed).run();
}

@Override
public void onFailure(@NonNull Throwable t) {
if (t instanceof CancellationException) {
LOGGER.log(Level.FINE, "Cancelled load of " + e, t);
} else {
LOGGER.log(Level.WARNING, "Failed to load " + e, t);
}
e.afterStepExecutionsResumed();
}

}, MoreExecutors.directExecutor());
}
}

/** Calls {@link StepExecution#onResume} for each step in a running build.
* Does so in parallel, but always completing enclosing blocks before the enclosed step.
* A simplified version of https://stackoverflow.com/a/67449067/12916, since this should be a tree not a general DAG.
*/
private static final class ParallelResumer {
Copy link
Member

@dwnusbaum dwnusbaum May 19, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this required for correctness or is it an optimization?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is an optimization. Previously all steps were resumed serially in a topological order. (Or I hope getCurrentExecutions enforced a topological order; the sorting by CpsThread is a bit opaque to me.) The problem in jenkinsci/workflow-durable-task-step-plugin#180 arises when you have a bunch of parallel branches all running node and all of them are going to fail due to missing agents. Previously, Jenkins would wait 5m for the first agent, report that it was gone, then wait 5m for the next agent, etc. Now all the node blocks are resumed in parallel, followed by the sh steps inside them, etc. Have a test demonstrating this in workflow-durable-task-step but only now got an incremental build from jenkinsci/workflow-cps-plugin#534 that I needed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I guess it's kind of unusual for ExecutorStepExecution.onResume to block if no other steps do so, but looking through jenkinsci/workflow-durable-task-step-plugin#180 it seems like you already tried various alternatives, so this seems fine.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Basically this is the replacement for the TryRepeatedly pickle. I played with a lot of alternatives indeed. I am not claiming this approach is ideal but it seems to be straightforward enough and do the job.

Earlier, before hitting on the idea of having onResume block, I had hoped to arrange it so that the program would resume right away, and then a sh step running on the dead agent would eventually figure out there was no hope and abort. I ran into problems in functional tests, though, things like

node('agent') {
  input 'Proceed?' // restart here
  sh 'true'
}

The program would run past input to the sh step, which would then either fail with a MissingContextVariableException (confusing) or block the CPS VM thread inside DSL (which will throw an error if it blocks >5s). Admittedly this is an artificial case (you should not hold an executor like that) but a lot of tests were failing that I had to work around in artificial ways and it seemed uncomfortable. It would have been possible to fix all these cases but only in pretty intrusive ways: by defining a new StepContext object in lieu of FilePath (perhaps DynamicFilePathContext.Representation) which could be deserialized without pickles, but every step currently expecting FilePath would need to be adjusted to expect this instead, and then the step would need to have complicated logic to wait for the new contextual object to be ready (translatable to a live FilePath) before doing anything useful.

The compromise I settled on is closer to the original pickle behavior in that CPS code and steps do not run until an attempt is made to restore the original context: until all open node blocks have gotten their agent to reconnect, or timed out trying. The difference is only that in case this fails (with a timeout), the error is thrown out of StepExecution.onResume and thus StepContext.onFailure so it becomes a properly modeled Throwable with a CPS VM stack trace that we can catch and handle—unlike the original situation where the entire build would be effectively hard-killed with no possible cleanup.


private final Runnable onCompletion;
/** Step nodes mapped to the step execution. Entries removed when they are ready to be resumed. */
private final Map<FlowNode, StepExecution> nodes = new HashMap<>();
/** Step nodes currently being resumed. Removed after resumption completes. */
private final Set<FlowNode> processing = new HashSet<>();
/** Step nodes mapped to the nearest enclosing step node (no entry if at root). */
private final Map<FlowNode, FlowNode> enclosing = new HashMap<>();

ParallelResumer(Collection<StepExecution> executions, Runnable onCompletion) {
this.onCompletion = onCompletion;
// First look up positions in the flow graph, so that we can compute dependencies:
for (StepExecution se : executions) {
try {
FlowNode n = se.getContext().get(FlowNode.class);
if (n != null) {
nodes.put(n, se);
} else {
LOGGER.warning(() -> "Could not find FlowNode for " + se + " so it will not be resumed");
}
} catch (IOException | InterruptedException x) {
LOGGER.log(Level.WARNING, "Could not look up FlowNode for " + se + " so it will not be resumed", x);
}
}
for (FlowNode n : nodes.keySet()) {
LinearBlockHoppingScanner scanner = new LinearBlockHoppingScanner();
scanner.setup(n);
for (FlowNode parent : scanner) {
if (parent != n && nodes.containsKey(parent)) {
enclosing.put(n, parent);
break;
}
}
}
}

synchronized void run() {
LOGGER.fine(() -> "Checking status with nodes=" + nodes + " enclosing=" + enclosing + " processing=" + processing);
if (nodes.isEmpty()) {
if (processing.isEmpty()) {
LOGGER.fine("Done");
onCompletion.run();
}
return;
}
Map<FlowNode, StepExecution> ready = new HashMap<>();
for (Map.Entry<FlowNode, StepExecution> entry : nodes.entrySet()) {
FlowNode n = entry.getKey();
FlowNode parent = enclosing.get(n);
if (parent == null || !nodes.containsKey(parent)) {
ready.put(n, entry.getValue());
}
}
LOGGER.fine(() -> "Ready to resume: " + ready);
nodes.keySet().removeAll(ready.keySet());
for (Map.Entry<FlowNode, StepExecution> entry : ready.entrySet()) {
FlowNode n = entry.getKey();
StepExecution exec = entry.getValue();
processing.add(n);
// Strictly speaking threadPoolForRemoting should be used for agent communications.
// In practice the onResume impl known to block is in ExecutorStepExecution.
jglick marked this conversation as resolved.
Show resolved Hide resolved
// Avoid jenkins.util.Timer since it is capped at 10 threads and should not be used for long tasks.
Computer.threadPoolForRemoting.submit(() -> {
LOGGER.fine(() -> "About to resume " + n + " ~ " + exec);
try {
exec.onResume();
} catch (Throwable x) {
exec.getContext().onFailure(x);
}
LOGGER.fine(() -> "Finished resuming " + n + " ~ " + exec);
synchronized (ParallelResumer.this) {
processing.remove(n);
run();
}
});
}
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public void onCompleted(@NonNull FlowExecution execution) {
* Fires the {@link #onCreated(FlowExecution)} event.
*/
public static void fireCreated(@NonNull FlowExecution execution) {
// TODO Jenkins 2.325+ use Listeners.notify
for (FlowExecutionListener listener : ExtensionList.lookup(FlowExecutionListener.class)) {
listener.onCreated(execution);
}
Expand Down
Loading