diff --git a/src/main/java/org/jenkinsci/plugins/workflow/flow/FlowExecution.java b/src/main/java/org/jenkinsci/plugins/workflow/flow/FlowExecution.java index b1fa3af9..27273754 100644 --- a/src/main/java/org/jenkinsci/plugins/workflow/flow/FlowExecution.java +++ b/src/main/java/org/jenkinsci/plugins/workflow/flow/FlowExecution.java @@ -322,4 +322,12 @@ public Iterable iterateEnclosingBlocks(@NonNull FlowNode node) { protected void notifyShutdown() { // Default is no-op } + + /** + * Called after a restart and any attempts at {@link StepExecution#onResume} have completed. + * This is a signal that it is safe to resume program execution. + * By default, does nothing. + */ + protected void afterStepExecutionsResumed() {} + } diff --git a/src/main/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionList.java b/src/main/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionList.java index b04be346..13d4194b 100644 --- a/src/main/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionList.java +++ b/src/main/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionList.java @@ -10,7 +10,9 @@ import hudson.Extension; import hudson.ExtensionList; import hudson.XmlFile; +import hudson.init.InitMilestone; import hudson.init.Terminator; +import hudson.model.Computer; import hudson.model.listeners.ItemListener; import hudson.remoting.SingleLaneExecutorService; import hudson.util.CopyOnWriteList; @@ -22,15 +24,23 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.CancellationException; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; +import org.jenkinsci.plugins.workflow.graph.FlowNode; +import org.jenkinsci.plugins.workflow.graphanalysis.LinearBlockHoppingScanner; import org.kohsuke.accmod.Restricted; +import org.kohsuke.accmod.restrictions.Beta; import org.kohsuke.accmod.restrictions.DoNotUse; /** @@ -44,6 +54,8 @@ public class FlowExecutionList implements Iterable { private final SingleLaneExecutorService executor = new SingleLaneExecutorService(Timer.get()); private XmlFile configFile; + private transient volatile boolean resumptionComplete; + public FlowExecutionList() { load(); } @@ -160,11 +172,17 @@ public static FlowExecutionList get() { } /** - * @deprecated Only exists for binary compatibility. + * Returns true if all executions that were present in this {@link FlowExecutionList} have been loaded. + * + *

This takes place slightly after {@link InitMilestone#COMPLETED} is reached during Jenkins startup. + * + *

Useful to avoid resuming Pipelines in contexts that may lead to deadlock. + * + *

It is not guaranteed that {@link FlowExecution#afterStepExecutionsResumed} has been called at this point. */ - @Deprecated + @Restricted(Beta.class) public boolean isResumptionComplete() { - return false; + return resumptionComplete; } /** @@ -179,29 +197,8 @@ public void onLoaded() { for (final FlowExecution e : list) { // The call to FlowExecutionOwner.get in the implementation of iterator() is sufficent to load the Pipeline. LOGGER.log(Level.FINE, "Eagerly loaded {0}", e); - Futures.addCallback(e.getCurrentExecutions(false), new FutureCallback>() { - @Override - public void onSuccess(@NonNull List result) { - LOGGER.log(Level.FINE, "Will resume {0}", result); - for (StepExecution se : result) { - try { - se.onResume(); - } catch (Throwable x) { - se.getContext().onFailure(x); - } - } - } - - @Override - public void onFailure(@NonNull Throwable t) { - if (t instanceof CancellationException) { - LOGGER.log(Level.FINE, "Cancelled load of " + e, t); - } else { - LOGGER.log(Level.WARNING, "Failed to load " + e, t); - } - } - }, MoreExecutors.directExecutor()); } + list.resumptionComplete = true; } } @@ -256,4 +253,132 @@ public void onFailure(@NonNull Throwable t) { executor.shutdown(); executor.awaitTermination(1, TimeUnit.MINUTES); } + + /** + * Whenever a Pipeline resumes, resume all incomplete steps in its {@link FlowExecution}. + * + *

Called by {@code WorkflowRun.onLoad}, so guaranteed to run if a Pipeline resumes + * regardless of its presence in {@link FlowExecutionList}. + */ + @Extension + public static class ResumeStepExecutionListener extends FlowExecutionListener { + @Override + public void onResumed(@NonNull FlowExecution e) { + Futures.addCallback(e.getCurrentExecutions(false), new FutureCallback>() { + @Override + public void onSuccess(@NonNull List result) { + if (e.isComplete()) { + // WorkflowRun.onLoad will not fireResumed if the execution was already complete when loaded, + // and CpsFlowExecution should not then complete until afterStepExecutionsResumed, so this is defensive. + return; + } + FlowExecutionList list = FlowExecutionList.get(); + FlowExecutionOwner owner = e.getOwner(); + if (!list.runningTasks.contains(owner)) { + LOGGER.log(Level.WARNING, "Resuming {0}, which is missing from FlowExecutionList ({1}), so registering it now.", new Object[] {owner, list.runningTasks.getView()}); + list.register(owner); + } + LOGGER.log(Level.FINE, "Will resume {0}", result); + new ParallelResumer(result, e::afterStepExecutionsResumed).run(); + } + + @Override + public void onFailure(@NonNull Throwable t) { + if (t instanceof CancellationException) { + LOGGER.log(Level.FINE, "Cancelled load of " + e, t); + } else { + LOGGER.log(Level.WARNING, "Failed to load " + e, t); + } + e.afterStepExecutionsResumed(); + } + + }, MoreExecutors.directExecutor()); + } + } + + /** Calls {@link StepExecution#onResume} for each step in a running build. + * Does so in parallel, but always completing enclosing blocks before the enclosed step. + * A simplified version of https://stackoverflow.com/a/67449067/12916, since this should be a tree not a general DAG. + */ + private static final class ParallelResumer { + + private final Runnable onCompletion; + /** Step nodes mapped to the step execution. Entries removed when they are ready to be resumed. */ + private final Map nodes = new HashMap<>(); + /** Step nodes currently being resumed. Removed after resumption completes. */ + private final Set processing = new HashSet<>(); + /** Step nodes mapped to the nearest enclosing step node (no entry if at root). */ + private final Map enclosing = new HashMap<>(); + + ParallelResumer(Collection executions, Runnable onCompletion) { + this.onCompletion = onCompletion; + // First look up positions in the flow graph, so that we can compute dependencies: + for (StepExecution se : executions) { + try { + FlowNode n = se.getContext().get(FlowNode.class); + if (n != null) { + nodes.put(n, se); + } else { + LOGGER.warning(() -> "Could not find FlowNode for " + se + " so it will not be resumed"); + } + } catch (IOException | InterruptedException x) { + LOGGER.log(Level.WARNING, "Could not look up FlowNode for " + se + " so it will not be resumed", x); + } + } + for (FlowNode n : nodes.keySet()) { + LinearBlockHoppingScanner scanner = new LinearBlockHoppingScanner(); + scanner.setup(n); + for (FlowNode parent : scanner) { + if (parent != n && nodes.containsKey(parent)) { + enclosing.put(n, parent); + break; + } + } + } + } + + synchronized void run() { + LOGGER.fine(() -> "Checking status with nodes=" + nodes + " enclosing=" + enclosing + " processing=" + processing); + if (nodes.isEmpty()) { + if (processing.isEmpty()) { + LOGGER.fine("Done"); + onCompletion.run(); + } + return; + } + Map ready = new HashMap<>(); + for (Map.Entry entry : nodes.entrySet()) { + FlowNode n = entry.getKey(); + FlowNode parent = enclosing.get(n); + if (parent == null || !nodes.containsKey(parent)) { + ready.put(n, entry.getValue()); + } + } + LOGGER.fine(() -> "Ready to resume: " + ready); + nodes.keySet().removeAll(ready.keySet()); + for (Map.Entry entry : ready.entrySet()) { + FlowNode n = entry.getKey(); + StepExecution exec = entry.getValue(); + processing.add(n); + // Strictly speaking threadPoolForRemoting should be used for agent communications. + // In practice the only onResume impl known to block is in ExecutorStepExecution. + // Avoid jenkins.util.Timer since it is capped at 10 threads and should not be used for long tasks. + Computer.threadPoolForRemoting.submit(() -> { + LOGGER.fine(() -> "About to resume " + n + " ~ " + exec); + try { + exec.onResume(); + } catch (Throwable x) { + exec.getContext().onFailure(x); + } + LOGGER.fine(() -> "Finished resuming " + n + " ~ " + exec); + synchronized (ParallelResumer.this) { + processing.remove(n); + run(); + } + }); + } + } + + } + } diff --git a/src/main/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionListener.java b/src/main/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionListener.java index 570cc6f8..bfdc5035 100644 --- a/src/main/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionListener.java +++ b/src/main/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionListener.java @@ -62,6 +62,7 @@ public void onCompleted(@NonNull FlowExecution execution) { * Fires the {@link #onCreated(FlowExecution)} event. */ public static void fireCreated(@NonNull FlowExecution execution) { + // TODO Jenkins 2.325+ use Listeners.notify for (FlowExecutionListener listener : ExtensionList.lookup(FlowExecutionListener.class)) { listener.onCreated(execution); } diff --git a/src/test/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionListTest.java b/src/test/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionListTest.java index 733a3859..8cf59f30 100644 --- a/src/test/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionListTest.java +++ b/src/test/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionListTest.java @@ -24,17 +24,35 @@ package org.jenkinsci.plugins.workflow.flow; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.hasItem; import static org.junit.Assert.assertNotNull; +import hudson.AbortException; import hudson.model.ParametersAction; import hudson.model.ParametersDefinitionProperty; +import hudson.model.Result; import hudson.model.StringParameterDefinition; import hudson.model.StringParameterValue; +import hudson.model.TaskListener; import hudson.model.queue.QueueTaskFuture; +import java.io.Serializable; +import java.time.Duration; +import java.time.Instant; +import java.util.Collections; +import java.util.Set; +import java.util.function.Supplier; import java.util.logging.Level; +import org.hamcrest.Matcher; import org.jenkinsci.plugins.workflow.cps.CpsFlowDefinition; import org.jenkinsci.plugins.workflow.job.WorkflowJob; import org.jenkinsci.plugins.workflow.job.WorkflowRun; +import org.jenkinsci.plugins.workflow.steps.Step; +import org.jenkinsci.plugins.workflow.steps.StepContext; +import org.jenkinsci.plugins.workflow.steps.StepDescriptor; +import org.jenkinsci.plugins.workflow.steps.StepExecution; +import org.jenkinsci.plugins.workflow.test.steps.SemaphoreStep; import org.junit.ClassRule; import org.junit.Test; import org.junit.Rule; @@ -42,6 +60,8 @@ import org.jvnet.hudson.test.Issue; import org.jvnet.hudson.test.LoggerRule; import org.jvnet.hudson.test.JenkinsSessionRule; +import org.jvnet.hudson.test.TestExtension; +import org.kohsuke.stapler.DataBoundConstructor; public class FlowExecutionListTest { @@ -79,4 +99,113 @@ public class FlowExecutionListTest { }); } + @Test public void forceLoadRunningExecutionsAfterRestart() throws Throwable { + logging.capture(50); + sessions.then(r -> { + WorkflowJob p = r.jenkins.createProject(WorkflowJob.class, "p"); + p.setDefinition(new CpsFlowDefinition("semaphore('wait')", true)); + WorkflowRun b = p.scheduleBuild2(0).waitForStart(); + SemaphoreStep.waitForStart("wait/1", b); + }); + sessions.then(r -> { + /* + Make sure that the build gets loaded automatically by FlowExecutionList$ItemListenerImpl before we load it explictly. + Expected call stack for resuming a Pipelines and its steps: + at org.jenkinsci.plugins.workflow.flow.FlowExecutionList$ResumeStepExecutionListener$1.onSuccess(FlowExecutionList.java:250) + at org.jenkinsci.plugins.workflow.flow.FlowExecutionList$ResumeStepExecutionListener$1.onSuccess(FlowExecutionList.java:247) + at com.google.common.util.concurrent.Futures$6.run(Futures.java:975) + at org.jenkinsci.plugins.workflow.flow.DirectExecutor.execute(DirectExecutor.java:33) + ... Guava Futures API internals ... + at com.google.common.util.concurrent.Futures.addCallback(Futures.java:985) + at org.jenkinsci.plugins.workflow.flow.FlowExecutionList$ResumeStepExecutionListener.onResumed(FlowExecutionList.java:247) + at org.jenkinsci.plugins.workflow.flow.FlowExecutionListener.fireResumed(FlowExecutionListener.java:84) + at org.jenkinsci.plugins.workflow.job.WorkflowRun.onLoad(WorkflowRun.java:528) + at hudson.model.RunMap.retrieve(RunMap.java:225) + ... RunMap internals ... + at hudson.model.RunMap.getById(RunMap.java:205) + at org.jenkinsci.plugins.workflow.job.WorkflowRun$Owner.run(WorkflowRun.java:937) + at org.jenkinsci.plugins.workflow.job.WorkflowRun$Owner.get(WorkflowRun.java:948) + at org.jenkinsci.plugins.workflow.flow.FlowExecutionList$1.computeNext(FlowExecutionList.java:65) + at org.jenkinsci.plugins.workflow.flow.FlowExecutionList$1.computeNext(FlowExecutionList.java:57) + at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:143) + at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:138) + at org.jenkinsci.plugins.workflow.flow.FlowExecutionList$ItemListenerImpl.onLoaded(FlowExecutionList.java:175) + at jenkins.model.Jenkins.(Jenkins.java:1019) + */ + waitFor(logging::getMessages, hasItem(containsString("Will resume [org.jenkinsci.plugins.workflow.test.steps.SemaphoreStep"))); + WorkflowJob p = r.jenkins.getItemByFullName("p", WorkflowJob.class); + SemaphoreStep.success("wait/1", null); + WorkflowRun b = p.getBuildByNumber(1); + r.waitForCompletion(b); + r.assertBuildStatus(Result.SUCCESS, b); + }); + } + + @Issue("JENKINS-67164") + @Test public void resumeStepExecutions() throws Throwable { + sessions.then(r -> { + WorkflowJob p = r.jenkins.createProject(WorkflowJob.class, "p"); + p.setDefinition(new CpsFlowDefinition("noResume()", true)); + WorkflowRun b = p.scheduleBuild2(0).waitForStart(); + r.waitForMessage("Starting non-resumable step", b); + // TODO: Unclear how this might happen in practice. + FlowExecutionList.get().unregister(b.asFlowExecutionOwner()); + }); + sessions.then(r -> { + WorkflowJob p = r.jenkins.getItemByFullName("p", WorkflowJob.class); + WorkflowRun b = p.getBuildByNumber(1); + r.waitForCompletion(b); + r.assertBuildStatus(Result.FAILURE, b); + r.assertLogContains("Unable to resume NonResumableStep", b); + }); + } + + public static class NonResumableStep extends Step implements Serializable { + public static final long serialVersionUID = 1L; + @DataBoundConstructor + public NonResumableStep() { } + @Override + public StepExecution start(StepContext sc) { + return new ExecutionImpl(sc); + } + + private static class ExecutionImpl extends StepExecution implements Serializable { + public static final long serialVersionUID = 1L; + private ExecutionImpl(StepContext sc) { + super(sc); + } + @Override + public boolean start() throws Exception { + getContext().get(TaskListener.class).getLogger().println("Starting non-resumable step"); + return false; + } + @Override + public void onResume() { + getContext().onFailure(new AbortException("Unable to resume NonResumableStep")); + } + } + + @TestExtension public static class DescriptorImpl extends StepDescriptor { + @Override + public Set> getRequiredContext() { + return Collections.singleton(TaskListener.class); + } + @Override + public String getFunctionName() { + return "noResume"; + } + } + } + + /** + * Wait up to 5 seconds for the given supplier to return a matching value. + */ + private static void waitFor(Supplier valueSupplier, Matcher matcher) throws InterruptedException { + Instant end = Instant.now().plus(Duration.ofSeconds(5)); + while (!matcher.matches(valueSupplier.get()) && Instant.now().isBefore(end)) { + Thread.sleep(100L); + } + assertThat("Matcher should have matched after 5s", valueSupplier.get(), matcher); + } + }