Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restart preempted jobs v2 #113

Merged
merged 14 commits into from
Jul 25, 2019
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import hudson.util.FormValidation;
import hudson.util.HttpResponses;
import hudson.util.ListBoxModel;
import hudson.util.StreamTaskListener;
import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
Expand Down Expand Up @@ -251,7 +250,7 @@ public Collection<PlannedNode> provision(Label label, int excessWorkload) {
// Get next config in round robin fashion
InstanceConfiguration config = configs.get(i % configs.size());

final ComputeEngineInstance node = config.provision(StreamTaskListener.fromStdout());
final ComputeEngineInstance node = config.provision();
Jenkins.get().addNode(node);
result.add(createPlannedNode(config, node));
excessWorkload -= node.getNumExecutors();
Expand Down Expand Up @@ -408,7 +407,7 @@ public HttpResponse doProvision(@QueryParameter String configuration)
throw HttpResponses.error(SC_BAD_REQUEST, "No such Instance Configuration: " + configuration);
}

ComputeEngineInstance node = c.provision(StreamTaskListener.fromStdout());
ComputeEngineInstance node = c.provision();
if (node == null) throw HttpResponses.error(SC_BAD_REQUEST, "Could not provision new node.");
Jenkins.get().addNode(node);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,101 @@
package com.google.jenkins.plugins.computeengine;

import com.google.api.services.compute.model.Instance;
import com.google.api.services.compute.model.Scheduling;
import hudson.model.Executor;
import hudson.model.Result;
import hudson.model.TaskListener;
import hudson.slaves.AbstractCloudComputer;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;
import jenkins.model.CauseOfInterruption;
import lombok.extern.java.Log;
import org.kohsuke.stapler.DataBoundSetter;
import org.kohsuke.stapler.HttpRedirect;
import org.kohsuke.stapler.HttpResponse;

@Log
public class ComputeEngineComputer extends AbstractCloudComputer<ComputeEngineInstance> {

private volatile Instance instance;

private static final Logger LOGGER = Logger.getLogger(ComputeEngineCloud.class.getName());
private CompletableFuture<Boolean> preemptedFuture;

public ComputeEngineComputer(ComputeEngineInstance slave) {
super(slave);
}

@Override
public ComputeEngineInstance getNode() {
return (ComputeEngineInstance) super.getNode();
}

public void onConnected() {
void onConnected(TaskListener listener) {
ComputeEngineInstance node = getNode();
if (node != null) {
node.onConnected();
if (getPreemptible()) {
String nodeName = node.getNodeName();
final String msg =
"Instance " + nodeName + " is preemptive, setting up preemption listener";
log.log(Level.INFO, msg);
listener.getLogger().println(msg);
preemptedFuture =
CompletableFuture.supplyAsync(
() -> getPreemptedStatus(listener, nodeName), threadPoolForRemoting);
}
}
}

private Boolean getPreemptedStatus(TaskListener listener, String nodeName) {
try {
boolean value = getChannel().call(new PreemptedCheckCallable(listener));
log.log(Level.FINE, "Got information that node was preempted with value [" + value + "]");
if (value) {
log.log(Level.FINE, "Preempted node was preempted, terminating all executors");
getChannel().close();
getExecutors().forEach(executor -> interruptExecutor(executor, nodeName));
}
return value;
} catch (InterruptedException | IOException e) {
throw new RuntimeException(e);
}
}

private void interruptExecutor(Executor executor, String nodeName) {
log.log(Level.INFO, "Terminating executor " + executor + " node " + nodeName);
executor.interrupt(
Result.FAILURE,
new CauseOfInterruption() {
@Override
public String getShortDescription() {
return "Instance " + nodeName + " was preempted";
}
});
}

/**
* Check if instance is preemptible.
*
* @return true if instance was set as preemptible.
*/
public boolean getPreemptible() {
ingwarsw marked this conversation as resolved.
Show resolved Hide resolved
try {
Scheduling scheduling = getInstance().getScheduling();
return scheduling != null && scheduling.getPreemptible();
} catch (IOException e) {
log.log(Level.WARNING, "Error when getting preemptible status", e);
return false;
ingwarsw marked this conversation as resolved.
Show resolved Hide resolved
}
}

/**
* Check if instance was actually preempted.
*
* @return true if instance was preempted (we can use it to reschedule job in this case).
*/
public boolean getPreempted() {
try {
return preemptedFuture != null && preemptedFuture.isDone() && preemptedFuture.get();
} catch (InterruptedException | ExecutionException e) {
log.log(Level.WARNING, "Error when getting preempted status", e);
return false;
ingwarsw marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down Expand Up @@ -119,12 +187,10 @@ public HttpResponse doDoDelete() throws IOException {
ComputeEngineInstance node = getNode();
if (node != null) {
try {
ComputeEngineCloud cloud = getCloud();

node.terminate();
} catch (InterruptedException ie) {
// Termination Exception
LOGGER.log(Level.WARNING, "Node Termination Error", ie);
log.log(Level.WARNING, "Node Termination Error", ie);
}
}
return new HttpRedirect("..");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ public class ComputeEngineComputerListener extends ComputerListener {
@Override
public void onOnline(Computer c, TaskListener listener) {
if (c instanceof ComputeEngineComputer) {
((ComputeEngineComputer) c).onConnected();
ComputeEngineComputer computer = (ComputeEngineComputer) c;
computer.onConnected(listener);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ protected void _terminate(TaskListener listener) throws IOException, Interrupted

// If the instance is running, attempt to terminate it. This is an asynch call and we
// return immediately, hoping for the best.
cloud.getClient().terminateInstanceWithStatus(cloud.getProjectId(), zone, name, "RUNNING");
cloud.getClient().terminateInstance(cloud.getProjectId(), zone, name);
ingwarsw marked this conversation as resolved.
Show resolved Hide resolved
} catch (CloudNotFoundException cnfe) {
listener.error(cnfe.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Copyright 2019 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.jenkins.plugins.computeengine;

import com.google.common.collect.ImmutableList;
import hudson.model.Action;
import hudson.model.Cause;
import hudson.model.CauseAction;
import hudson.model.Executor;
import hudson.model.ExecutorListener;
import hudson.model.Job;
import hudson.model.Queue;
import hudson.security.ACL;
import hudson.security.ACLContext;
import hudson.slaves.RetentionStrategy;
import java.util.List;
import java.util.logging.Level;
import jenkins.model.Jenkins;
import lombok.extern.java.Log;
import org.jenkinsci.plugins.durabletask.executors.OnceRetentionStrategy;

/**
* A strategy that allows: - setting one shot instances {@link OnceRetentionStrategy} - in case of
* preemption of GCP instance to restart preempted tasks
*/
@Log
public class ComputeEngineRetentionStrategy extends RetentionStrategy<ComputeEngineComputer>
implements ExecutorListener {
private final OnceRetentionStrategy delegate;
private final boolean oneShot;

/**
* Creates the retention strategy.
*
* @param retentionTimeMinutes Number of minutes of idleness after which to kill the slave; serves
* a backup in case the strategy fails to detect the end of a task.
* @param oneShot Create one shot instance strategy.
*/
ComputeEngineRetentionStrategy(int retentionTimeMinutes, boolean oneShot) {
this.oneShot = oneShot;
delegate = new OnceRetentionStrategy(retentionTimeMinutes);
}

@Override
public long check(ComputeEngineComputer c) {
return delegate.check(c);
ingwarsw marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public void start(ComputeEngineComputer c) {
delegate.start(c);
}

@Override
public void taskAccepted(Executor executor, Queue.Task task) {
if (oneShot) {
delegate.taskAccepted(executor, task);
}
}

@Override
public void taskCompleted(Executor executor, Queue.Task task, long durationMS) {
if (wasPreempted(executor)) {
rescheduleTask(task);
}
if (oneShot) {
delegate.taskCompleted(executor, task, durationMS);
}
}

@Override
public void taskCompletedWithProblems(
Executor executor, Queue.Task task, long durationMS, Throwable problems) {
if (wasPreempted(executor)) {
rescheduleTask(task);
}
if (oneShot) {
delegate.taskCompletedWithProblems(executor, task, durationMS, problems);
}
}

private Queue.Task getBaseTask(Queue.Task task) {
Queue.Task parent = task.getOwnerTask();
while (task != parent) {
task = parent;
parent = task.getOwnerTask();
}
return parent;
}

private boolean wasPreempted(Executor executor) {
ComputeEngineComputer computer = (ComputeEngineComputer) executor.getOwner();
final boolean preempted = computer.getPreempted();
return preempted;
}

private void rescheduleTask(Queue.Task task) {
Queue.Task baseTask = getBaseTask(task);
log.log(Level.INFO, baseTask + " was preempted, rescheduling");
List<Action> actions = generateActionsForTask(task);
try (ACLContext notUsed = ACL.as(task.getDefaultAuthentication())) {
Jenkins.get().getQueue().schedule2(baseTask, 0, actions);
}
}

private List<Action> generateActionsForTask(Queue.Task task) {
Queue.Task baseTask = getBaseTask(task);
try {
final Job job = (Job) baseTask;
final List causes = job.getLastBuild().getCauses();
log.log(Level.FINE, "Original causes: " + causes);
} catch (Exception e) {
log.log(Level.WARNING, "Exception for " + baseTask, e);
}
return ImmutableList.of(
new CauseAction(new Cause.UserIdCause()), new CauseAction(new RebuildCause()));
}

public static class RebuildCause extends Cause {
@Override
public String getShortDescription() {
return Messages.RebuildCause_ShortDescription();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.util.Optional;
import java.util.logging.Level;
import java.util.logging.Logger;
import jenkins.model.Jenkins;
import lombok.Getter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,10 @@
import hudson.model.Descriptor;
import hudson.model.Label;
import hudson.model.Node;
import hudson.model.TaskListener;
import hudson.model.labels.LabelAtom;
import hudson.slaves.CloudRetentionStrategy;
import hudson.util.FormValidation;
import hudson.util.ListBoxModel;
import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -64,16 +61,17 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.logging.Level;
import jenkins.model.Jenkins;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.java.Log;
import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.text.RandomStringGenerator;
import org.jenkinsci.plugins.durabletask.executors.OnceRetentionStrategy;
import org.kohsuke.stapler.AncestorInPath;
import org.kohsuke.stapler.DataBoundConstructor;
import org.kohsuke.stapler.DataBoundSetter;
Expand All @@ -85,6 +83,7 @@
* until lombok 1.18.8 is released. */
@Builder(builderClassName = "Builder", buildMethodName = "notbuild")
@AllArgsConstructor
@Log
public class InstanceConfiguration implements Describable<InstanceConfiguration> {
public static final String SSH_METADATA_KEY = "ssh-keys";
public static final Long DEFAULT_BOOT_DISK_SIZE_GB = 10L;
Expand Down Expand Up @@ -276,14 +275,13 @@ public void appendLabel(String key, String value) {
googleLabels.put(key, value);
}

public ComputeEngineInstance provision(TaskListener listener) throws IOException {
PrintStream logger = listener.getLogger();
public ComputeEngineInstance provision() throws IOException {
try {
Instance instance = instance();
// TODO: JENKINS-55285
Operation operation =
cloud.getClient().insertInstance(cloud.getProjectId(), template, instance);
logger.println("Sent insert request for instance configuration [" + description + "]");
log.info("Sent insert request for instance configuration [" + description + "]");
String targetRemoteFs = this.remoteFs;
ComputeEngineComputerLauncher launcher;
if (this.windowsConfiguration != null) {
Expand Down Expand Up @@ -315,16 +313,13 @@ public ComputeEngineInstance provision(TaskListener listener) throws IOException
.mode(mode)
.labelString(labels)
.launcher(launcher)
.retentionStrategy(
oneShot
? new OnceRetentionStrategy(retentionTimeMinutes)
: new CloudRetentionStrategy(retentionTimeMinutes))
.retentionStrategy(new ComputeEngineRetentionStrategy(retentionTimeMinutes, oneShot))
.launchTimeout(getLaunchTimeoutMillis())
.javaExecPath(javaExecPath)
.sshKeyPair(sshKeyPair)
.build();
} catch (Descriptor.FormException fe) {
logger.printf("Error provisioning instance: %s", fe.getMessage());
log.log(Level.WARNING, "Error provisioning instance: " + fe.getMessage(), fe);
return null;
}
}
Expand Down
Loading