Skip to content

Commit

Permalink
[JENKINS-67164] Call StepExecution.onResume in response to `Workflo…
Browse files Browse the repository at this point in the history
…wRun.onLoad` not `FlowExecutionList.ItemListenerImpl`
  • Loading branch information
jglick committed May 11, 2022
1 parent b912c0e commit 8fba087
Show file tree
Hide file tree
Showing 4 changed files with 204 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -322,4 +322,12 @@ public Iterable<BlockStartNode> iterateEnclosingBlocks(@NonNull FlowNode node) {
protected void notifyShutdown() {
// Default is no-op
}

/**
* Called after a restart and any attempts at {@link StepExecution#onResume} have completed.
* This is a signal that it is safe to resume program execution.
*
* <p>The signal is also delivered when the current step executions could not be obtained
* (load failure or cancellation), so implementations should not assume that every step
* was actually resumed.
*
* <p>By default, does nothing; subclasses may override to react to the signal.
*/
protected void afterStepExecutionsResumed() {}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import hudson.Extension;
import hudson.ExtensionList;
import hudson.XmlFile;
import hudson.init.InitMilestone;
import hudson.init.Terminator;
import hudson.model.listeners.ItemListener;
import hudson.remoting.SingleLaneExecutorService;
Expand All @@ -31,6 +32,7 @@
import java.util.logging.Logger;

import org.kohsuke.accmod.Restricted;
import org.kohsuke.accmod.restrictions.Beta;
import org.kohsuke.accmod.restrictions.DoNotUse;

/**
Expand All @@ -44,6 +46,8 @@ public class FlowExecutionList implements Iterable<FlowExecution> {
private final SingleLaneExecutorService executor = new SingleLaneExecutorService(Timer.get());
private XmlFile configFile;

private transient volatile boolean resumptionComplete;

public FlowExecutionList() {
load();
}
Expand Down Expand Up @@ -160,11 +164,17 @@ public static FlowExecutionList get() {
}

/**
* @deprecated Only exists for binary compatibility.
* Returns true if all executions that were present in this {@link FlowExecutionList} have been loaded.
*
* <p>This takes place slightly after {@link InitMilestone#COMPLETED} is reached during Jenkins startup.
*
* <p>Useful to avoid resuming Pipelines in contexts that may lead to deadlock.
*
* <p>It is <em>not</em> guaranteed that {@link FlowExecution#afterStepExecutionsResumed} has been called at this point.
*/
@Deprecated
@Restricted(Beta.class)
public boolean isResumptionComplete() {
return false;
return resumptionComplete;
}

/**
Expand All @@ -179,29 +189,8 @@ public void onLoaded() {
for (final FlowExecution e : list) {
// The call to FlowExecutionOwner.get in the implementation of iterator() is sufficient to load the Pipeline.
LOGGER.log(Level.FINE, "Eagerly loaded {0}", e);
Futures.addCallback(e.getCurrentExecutions(false), new FutureCallback<List<StepExecution>>() {
@Override
public void onSuccess(@NonNull List<StepExecution> result) {
LOGGER.log(Level.FINE, "Will resume {0}", result);
for (StepExecution se : result) {
try {
se.onResume();
} catch (Throwable x) {
se.getContext().onFailure(x);
}
}
}

@Override
public void onFailure(@NonNull Throwable t) {
if (t instanceof CancellationException) {
LOGGER.log(Level.FINE, "Cancelled load of " + e, t);
} else {
LOGGER.log(Level.WARNING, "Failed to load " + e, t);
}
}
}, MoreExecutors.directExecutor());
}
list.resumptionComplete = true;
}
}

Expand Down Expand Up @@ -256,4 +245,56 @@ public void onFailure(@NonNull Throwable t) {
executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES);
}

/**
 * Listener that resumes every incomplete step of a {@link FlowExecution} each time its Pipeline resumes.
 *
 * <p>Invoked from {@code WorkflowRun.onLoad}, so it runs for any resuming Pipeline
 * whether or not that Pipeline is present in {@link FlowExecutionList}.
 */
@Extension
public static class ResumeStepExecutionListener extends FlowExecutionListener {

    /**
     * Fetches the current step executions of {@code e}, calls {@link StepExecution#onResume} on each,
     * and then signals {@code afterStepExecutionsResumed} on the execution, whether or not the
     * steps could be loaded and resumed.
     */
    @Override
    public void onResumed(@NonNull FlowExecution e) {
        FutureCallback<List<StepExecution>> resumer = new FutureCallback<List<StepExecution>>() {
            @Override
            public void onSuccess(@NonNull List<StepExecution> result) {
                try {
                    // WorkflowRun.onLoad will not fireResumed if the execution was already complete when loaded,
                    // and CpsFlowExecution should not then complete until afterStepExecutionsResumed,
                    // so skipping an already complete execution here is purely defensive.
                    if (!e.isComplete()) {
                        FlowExecutionList executions = FlowExecutionList.get();
                        FlowExecutionOwner executionOwner = e.getOwner();
                        boolean alreadyTracked = executions.runningTasks.contains(executionOwner);
                        if (!alreadyTracked) {
                            LOGGER.log(Level.WARNING, "Resuming {0}, which is missing from FlowExecutionList ({1}), so registering it now.", new Object[] {executionOwner, executions.runningTasks.getView()});
                            executions.register(executionOwner);
                        }
                        LOGGER.log(Level.FINE, "Will resume {0}", result);
                        for (StepExecution step : result) {
                            resumeStep(step);
                        }
                    }
                } finally {
                    // Always signal, even if something above threw.
                    e.afterStepExecutionsResumed();
                }
            }

            @Override
            public void onFailure(@NonNull Throwable t) {
                boolean cancelled = t instanceof CancellationException;
                Level level = cancelled ? Level.FINE : Level.WARNING;
                String prefix = cancelled ? "Cancelled load of " : "Failed to load ";
                LOGGER.log(level, prefix + e, t);
                e.afterStepExecutionsResumed();
            }
        };
        // We always hold RunMap and WorkflowRun locks here, so we resume steps on a different thread
        // to avoid potential deadlocks. See JENKINS-67351.
        Futures.addCallback(e.getCurrentExecutions(false), resumer, Timer.get());
    }

    /** Resumes a single step, routing anything it throws to that step's own context as a failure. */
    private static void resumeStep(StepExecution step) {
        try {
            step.onResume();
        } catch (Throwable problem) {
            step.getContext().onFailure(problem);
        }
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public void onCompleted(@NonNull FlowExecution execution) {
* Fires the {@link #onCreated(FlowExecution)} event.
*/
public static void fireCreated(@NonNull FlowExecution execution) {
// TODO Jenkins 2.325+ use Listeners.notify
for (FlowExecutionListener listener : ExtensionList.lookup(FlowExecutionListener.class)) {
listener.onCreated(execution);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,44 @@

package org.jenkinsci.plugins.workflow.flow;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertNotNull;

import hudson.AbortException;
import hudson.model.ParametersAction;
import hudson.model.ParametersDefinitionProperty;
import hudson.model.Result;
import hudson.model.StringParameterDefinition;
import hudson.model.StringParameterValue;
import hudson.model.TaskListener;
import hudson.model.queue.QueueTaskFuture;
import java.io.Serializable;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.Set;
import java.util.function.Supplier;
import java.util.logging.Level;
import org.hamcrest.Matcher;
import org.jenkinsci.plugins.workflow.cps.CpsFlowDefinition;
import org.jenkinsci.plugins.workflow.job.WorkflowJob;
import org.jenkinsci.plugins.workflow.job.WorkflowRun;
import org.jenkinsci.plugins.workflow.steps.Step;
import org.jenkinsci.plugins.workflow.steps.StepContext;
import org.jenkinsci.plugins.workflow.steps.StepDescriptor;
import org.jenkinsci.plugins.workflow.steps.StepExecution;
import org.jenkinsci.plugins.workflow.test.steps.SemaphoreStep;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.Rule;
import org.jvnet.hudson.test.BuildWatcher;
import org.jvnet.hudson.test.Issue;
import org.jvnet.hudson.test.LoggerRule;
import org.jvnet.hudson.test.JenkinsSessionRule;
import org.jvnet.hudson.test.TestExtension;
import org.kohsuke.stapler.DataBoundConstructor;

public class FlowExecutionListTest {

Expand Down Expand Up @@ -79,4 +99,113 @@ public class FlowExecutionListTest {
});
}

/**
* Verifies that a build left running before a restart has its steps resumed before the test
* touches the job, and that the build can then run to successful completion.
*/
@Test public void forceLoadRunningExecutionsAfterRestart() throws Throwable {
logging.capture(50);
// First session: start a build and leave it blocked on a semaphore step.
sessions.then(r -> {
WorkflowJob p = r.jenkins.createProject(WorkflowJob.class, "p");
p.setDefinition(new CpsFlowDefinition("semaphore('wait')", true));
WorkflowRun b = p.scheduleBuild2(0).waitForStart();
SemaphoreStep.waitForStart("wait/1", b);
});
// Second session (after restart): wait for the resume log message, then release the semaphore.
sessions.then(r -> {
/*
Make sure that the build gets loaded automatically by FlowExecutionList$ItemListenerImpl before we load it explicitly.
NOTE(review): the trace below goes through DirectExecutor, while ResumeStepExecutionListener passes Timer.get()
to Futures.addCallback; the trace may predate that change - confirm and refresh if so.
Expected call stack for resuming a Pipeline and its steps:
at org.jenkinsci.plugins.workflow.flow.FlowExecutionList$ResumeStepExecutionListener$1.onSuccess(FlowExecutionList.java:250)
at org.jenkinsci.plugins.workflow.flow.FlowExecutionList$ResumeStepExecutionListener$1.onSuccess(FlowExecutionList.java:247)
at com.google.common.util.concurrent.Futures$6.run(Futures.java:975)
at org.jenkinsci.plugins.workflow.flow.DirectExecutor.execute(DirectExecutor.java:33)
... Guava Futures API internals ...
at com.google.common.util.concurrent.Futures.addCallback(Futures.java:985)
at org.jenkinsci.plugins.workflow.flow.FlowExecutionList$ResumeStepExecutionListener.onResumed(FlowExecutionList.java:247)
at org.jenkinsci.plugins.workflow.flow.FlowExecutionListener.fireResumed(FlowExecutionListener.java:84)
at org.jenkinsci.plugins.workflow.job.WorkflowRun.onLoad(WorkflowRun.java:528)
at hudson.model.RunMap.retrieve(RunMap.java:225)
... RunMap internals ...
at hudson.model.RunMap.getById(RunMap.java:205)
at org.jenkinsci.plugins.workflow.job.WorkflowRun$Owner.run(WorkflowRun.java:937)
at org.jenkinsci.plugins.workflow.job.WorkflowRun$Owner.get(WorkflowRun.java:948)
at org.jenkinsci.plugins.workflow.flow.FlowExecutionList$1.computeNext(FlowExecutionList.java:65)
at org.jenkinsci.plugins.workflow.flow.FlowExecutionList$1.computeNext(FlowExecutionList.java:57)
at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:143)
at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:138)
at org.jenkinsci.plugins.workflow.flow.FlowExecutionList$ItemListenerImpl.onLoaded(FlowExecutionList.java:175)
at jenkins.model.Jenkins.<init>(Jenkins.java:1019)
*/
// The "Will resume" message is logged by the resume listener just before it calls onResume on each step.
waitFor(logging::getMessages, hasItem(containsString("Will resume [org.jenkinsci.plugins.workflow.test.steps.SemaphoreStep")));
WorkflowJob p = r.jenkins.getItemByFullName("p", WorkflowJob.class);
SemaphoreStep.success("wait/1", null);
WorkflowRun b = p.getBuildByNumber(1);
r.waitForCompletion(b);
r.assertBuildStatus(Result.SUCCESS, b);
});
}

/**
* Verifies that step executions are still resumed after a restart when the running build
* has been removed from {@link FlowExecutionList} beforehand.
*/
@Issue("JENKINS-67164")
@Test public void resumeStepExecutions() throws Throwable {
// First session: start a build using the noResume step, then drop it from the list.
sessions.then(r -> {
WorkflowJob p = r.jenkins.createProject(WorkflowJob.class, "p");
p.setDefinition(new CpsFlowDefinition("noResume()", true));
WorkflowRun b = p.scheduleBuild2(0).waitForStart();
r.waitForMessage("Starting non-resumable step", b);
// TODO: Unclear how this might happen in practice.
FlowExecutionList.get().unregister(b.asFlowExecutionOwner());
});
// Second session (after restart): NonResumableStep fails its context in onResume,
// so the build is expected to finish as FAILURE with the corresponding log message.
sessions.then(r -> {
WorkflowJob p = r.jenkins.getItemByFullName("p", WorkflowJob.class);
WorkflowRun b = p.getBuildByNumber(1);
r.waitForCompletion(b);
r.assertBuildStatus(Result.FAILURE, b);
r.assertLogContains("Unable to resume NonResumableStep", b);
});
}

/**
 * Test-only step, exposed to Pipeline scripts as {@code noResume}, whose execution
 * reports a failure to its context whenever it is resumed.
 */
public static class NonResumableStep extends Step implements Serializable {
    public static final long serialVersionUID = 1L;

    @DataBoundConstructor
    public NonResumableStep() { }

    /** Creates the execution backing this step for the given context. */
    @Override
    public StepExecution start(StepContext sc) {
        return new ExecutionImpl(sc);
    }

    /** Execution that prints a marker line on start and fails its context from {@link #onResume}. */
    private static class ExecutionImpl extends StepExecution implements Serializable {
        public static final long serialVersionUID = 1L;

        private ExecutionImpl(StepContext context) {
            super(context);
        }

        /** Prints the marker line that the test waits for, then returns {@code false}. */
        @Override
        public boolean start() throws Exception {
            TaskListener listener = getContext().get(TaskListener.class);
            listener.getLogger().println("Starting non-resumable step");
            return false;
        }

        /** Refuses to resume by failing the step with an {@link AbortException}. */
        @Override
        public void onResume() {
            StepContext context = getContext();
            context.onFailure(new AbortException("Unable to resume NonResumableStep"));
        }
    }

    /** Descriptor registering the step under the function name {@code noResume}. */
    @TestExtension public static class DescriptorImpl extends StepDescriptor {
        @Override
        public String getFunctionName() {
            return "noResume";
        }

        @Override
        public Set<? extends Class<?>> getRequiredContext() {
            return Collections.singleton(TaskListener.class);
        }
    }
}

/**
 * Polls the supplier every 100 ms until its value satisfies the matcher or about 5 seconds
 * have elapsed, then asserts that a freshly supplied value matches.
 *
 * @param valueSupplier source of the value to check; called once per poll and once more for the final assertion
 * @param matcher condition the supplied value is expected to satisfy
 * @throws InterruptedException if the thread is interrupted while sleeping between polls
 */
private static <T> void waitFor(Supplier<T> valueSupplier, Matcher<T> matcher) throws InterruptedException {
    final Instant deadline = Instant.now().plus(Duration.ofSeconds(5));
    while (true) {
        if (matcher.matches(valueSupplier.get())) {
            break;
        }
        if (!Instant.now().isBefore(deadline)) {
            break;
        }
        Thread.sleep(100L);
    }
    // Re-read the value so that a timeout surfaces as an assertion failure.
    assertThat("Matcher should have matched after 5s", valueSupplier.get(), matcher);
}

}

0 comments on commit 8fba087

Please sign in to comment.