Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: Driver starting is included in restart policy. #859

Merged
merged 2 commits into from
Feb 29, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions api/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,12 @@ type TaskState struct {

const (
TaskDriverFailure = "Driver Failure"
TaskReceived = "Received"
TaskStarted = "Started"
TaskTerminated = "Terminated"
TaskKilled = "Killed"
TaskRestarting = "Restarting"
TaskNotRestarting = "Restarts Exceeded"
)

// TaskEvent is an event that effects the state of a task and contains meta-data
Expand All @@ -163,4 +166,5 @@ type TaskEvent struct {
Signal int
Message string
KillError string
StartDelay int64
}
3 changes: 1 addition & 2 deletions client/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,7 @@ func (r *AllocRunner) Alloc() *structs.Allocation {
case structs.TaskStatePending:
pending = true
case structs.TaskStateDead:
last := len(state.Events) - 1
if state.Events[last].Type == structs.TaskDriverFailure {
if state.Failed() {
failed = true
} else {
dead = true
Expand Down
42 changes: 18 additions & 24 deletions client/driver/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"os/exec"
"path/filepath"
"regexp"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -209,32 +210,9 @@ func (d *DockerDriver) createContainer(ctx *ExecContext, task *structs.Task,

hostConfig := &docker.HostConfig{
// Convert MB to bytes. This is an absolute value.
//
// This value represents the total amount of memory a process can use.
// Swap is added to total memory and is managed by the OS, not docker.
// Since this may cause other processes to swap and cause system
// instability, we will simply not use swap.
//
// See: https://www.kernel.org/doc/Documentation/cgroups/memory.txt
Memory: int64(task.Resources.MemoryMB) * 1024 * 1024,
MemorySwap: -1,
// Convert Mhz to shares. This is a relative value.
//
// There are two types of CPU limiters available: Shares and Quotas. A
// Share allows a particular process to have a proportion of CPU time
// relative to other processes; 1024 by default. A CPU Quota is enforced
// over a Period of time and is a HARD limit on the amount of CPU time a
// process can use. Processes with quotas cannot burst, while processes
// with shares can, so we'll use shares.
//
// The simplest scale is 1 share to 1 MHz so 1024 = 1GHz. This means any
// given process will have at least that amount of resources, but likely
// more since it is (probably) rare that the machine will run at 100%
// CPU. This scale will cease to work if a node is overprovisioned.
//
// See:
// - https://www.kernel.org/doc/Documentation/scheduler/sched-bwc.txt
// - https://www.kernel.org/doc/Documentation/scheduler/sched-design-CFS.txt
CPUShares: int64(task.Resources.CPU),

// Binds are used to mount a host volume into the container. We mount a
Expand Down Expand Up @@ -403,6 +381,22 @@ func (d *DockerDriver) createContainer(ctx *ExecContext, task *structs.Task,
}, nil
}

var (
// imageNotFoundMatcher is a regex expression that matches the image not
// found error Docker returns.
imageNotFoundMatcher = regexp.MustCompile(`Error: image .+ not found`)
)

// recoverablePullError wraps the error gotten when trying to pull and image if
// the error is recoverable.
func (d *DockerDriver) recoverablePullError(err error, image string) error {
recoverable := true
if imageNotFoundMatcher.MatchString(err.Error()) {
recoverable = false
}
return cstructs.NewRecoverableError(fmt.Errorf("Failed to pull `%s`: %s", image, err), recoverable)
}

func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) {
var driverConfig DockerDriverConfig
if err := mapstructure.WeakDecode(task.Config, &driverConfig); err != nil {
Expand Down Expand Up @@ -482,7 +476,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
err = client.PullImage(pullOptions, authOptions)
if err != nil {
d.logger.Printf("[ERR] driver.docker: failed pulling container %s:%s: %s", repo, tag, err)
return nil, fmt.Errorf("Failed to pull `%s`: %s", image, err)
return nil, d.recoverablePullError(err, image)
}
d.logger.Printf("[DEBUG] driver.docker: docker pull %s:%s succeeded", repo, tag)

Expand Down
3 changes: 1 addition & 2 deletions client/driver/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,8 +277,7 @@ func (h *execHandle) run() {
h.logger.Printf("[ERR] driver.exec: unmounting dev,proc and alloc dirs failed: %v", e)
}
}
h.waitCh <- &cstructs.WaitResult{ExitCode: ps.ExitCode, Signal: 0,
Err: err}
h.waitCh <- cstructs.NewWaitResult(ps.ExitCode, 0, err)
close(h.waitCh)
h.pluginClient.Kill()
}
21 changes: 21 additions & 0 deletions client/driver/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package structs

import (
"fmt"

cgroupConfig "github.com/opencontainers/runc/libcontainer/configs"
)

Expand Down Expand Up @@ -34,3 +35,23 @@ func (r *WaitResult) String() string {
type IsolationConfig struct {
Cgroup *cgroupConfig.Cgroup
}

// RecoverableError wraps an error and marks whether it is recoverable and could
// be retried or it is fatal.
type RecoverableError struct {
Err error
Recoverable bool
}

// NewRecoverableError is used to wrap an error and mark it as recoverable or
// not.
func NewRecoverableError(e error, recoverable bool) *RecoverableError {
return &RecoverableError{
Err: e,
Recoverable: recoverable,
}
}

func (r *RecoverableError) Error() string {
return r.Err.Error()
}
100 changes: 82 additions & 18 deletions client/restarts.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sync"
"time"

cstructs "github.com/hashicorp/nomad/client/driver/structs"
"github.com/hashicorp/nomad/nomad/structs"
)

Expand All @@ -25,6 +26,8 @@ func newRestartTracker(policy *structs.RestartPolicy, jobType string) *RestartTr
}

type RestartTracker struct {
waitRes *cstructs.WaitResult
startErr error
count int // Current number of attempts.
onSuccess bool // Whether to restart on successful exit code.
startTime time.Time // When the interval began
Expand All @@ -40,46 +43,107 @@ func (r *RestartTracker) SetPolicy(policy *structs.RestartPolicy) {
r.policy = policy
}

// NextRestart takes the exit code from the last attempt and returns whether the
// task should be restarted and the duration to wait.
func (r *RestartTracker) NextRestart(exitCode int) (bool, time.Duration) {
// SetStartError is used to mark the most recent start error. If starting was
// successful the error should be nil.
func (r *RestartTracker) SetStartError(err error) *RestartTracker {
r.lock.Lock()
defer r.lock.Unlock()
r.startErr = err
return r
}

// SetWaitResult is used to mark the most recent wait result.
func (r *RestartTracker) SetWaitResult(res *cstructs.WaitResult) *RestartTracker {
r.lock.Lock()
defer r.lock.Unlock()
r.waitRes = res
return r
}

// GetState returns the tasks next state given the set exit code and start
// error. One of the following states are returned:
// * TaskRestarting - Task should be restarted
// * TaskNotRestarting - Task should not be restarted and has exceeded its
// restart policy.
// * TaskTerminated - Task has terminated successfully and does not need a
// restart.
//
// If TaskRestarting is returned, the duration is how long to wait until
// starting the task again.
func (r *RestartTracker) GetState() (string, time.Duration) {
r.lock.Lock()
defer r.lock.Unlock()

// Hot path if no attempts are expected
if r.policy.Attempts == 0 {
return false, 0
if r.waitRes != nil && r.waitRes.Successful() {
return structs.TaskTerminated, 0
}

return structs.TaskNotRestarting, 0
}

r.count++

// Check if we have entered a new interval.
end := r.startTime.Add(r.policy.Interval)
now := time.Now()
if now.After(end) {
r.count = 0
r.startTime = now
return r.shouldRestart(exitCode), r.jitter()
}

r.count++
if r.startErr != nil {
return r.handleStartError()
} else if r.waitRes != nil {
return r.handleWaitResult()
} else {
return "", 0
}
}

// If we are under the attempts, restart with delay.
if r.count <= r.policy.Attempts {
return r.shouldRestart(exitCode), r.jitter()
// handleStartError returns the new state and potential wait duration for
// restarting the task after it was not successfully started. On start errors,
// the restart policy is always treated as fail mode to ensure we don't
// infinitely try to start a task.
func (r *RestartTracker) handleStartError() (string, time.Duration) {
// If the error is not recoverable, do not restart.
if rerr, ok := r.startErr.(*cstructs.RecoverableError); !(ok && rerr.Recoverable) {
return structs.TaskNotRestarting, 0
}

// Don't restart since mode is "fail"
if r.policy.Mode == structs.RestartPolicyModeFail {
return false, 0
if r.count > r.policy.Attempts {
return structs.TaskNotRestarting, 0
}

// Apply an artifical wait to enter the next interval
return r.shouldRestart(exitCode), end.Sub(now)
return structs.TaskRestarting, r.jitter()
}

// shouldRestart returns whether a restart should occur based on the exit code
// and job type.
func (r *RestartTracker) shouldRestart(exitCode int) bool {
return exitCode != 0 || r.onSuccess
// handleWaitResult returns the new state and potential wait duration for
// restarting the task after it has exited.
func (r *RestartTracker) handleWaitResult() (string, time.Duration) {
// If the task started successfully and restart on success isn't specified,
// don't restart but don't mark as failed.
if r.waitRes.Successful() && !r.onSuccess {
return structs.TaskTerminated, 0
}

if r.count > r.policy.Attempts {
if r.policy.Mode == structs.RestartPolicyModeFail {
return structs.TaskNotRestarting, 0
} else {
return structs.TaskRestarting, r.getDelay()
}
}

return structs.TaskRestarting, r.jitter()
}

// getDelay returns the delay time to enter the next interval.
func (r *RestartTracker) getDelay() time.Duration {
end := r.startTime.Add(r.policy.Interval)
now := time.Now()
return end.Sub(now)
}

// jitter returns the delay time plus a jitter.
Expand Down
53 changes: 40 additions & 13 deletions client/restarts_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package client

import (
"fmt"
"testing"
"time"

cstructs "github.com/hashicorp/nomad/client/driver/structs"
"github.com/hashicorp/nomad/nomad/structs"
)

Expand All @@ -23,14 +25,18 @@ func withinJitter(expected, actual time.Duration) bool {
expected.Nanoseconds()) <= jitter
}

func testWaitResult(exit int) *cstructs.WaitResult {
return cstructs.NewWaitResult(exit, 0, nil)
}

func TestClient_RestartTracker_ModeDelay(t *testing.T) {
t.Parallel()
p := testPolicy(true, structs.RestartPolicyModeDelay)
rt := newRestartTracker(p, structs.JobTypeService)
for i := 0; i < p.Attempts; i++ {
actual, when := rt.NextRestart(127)
if !actual {
t.Fatalf("NextRestart() returned %v, want %v", actual, true)
state, when := rt.SetWaitResult(testWaitResult(127)).GetState()
if state != structs.TaskRestarting {
t.Fatalf("NextRestart() returned %v, want %v", state, structs.TaskRestarting)
}
if !withinJitter(p.Delay, when) {
t.Fatalf("NextRestart() returned %v; want %v+jitter", when, p.Delay)
Expand All @@ -39,8 +45,8 @@ func TestClient_RestartTracker_ModeDelay(t *testing.T) {

// Follow up restarts should cause delay.
for i := 0; i < 3; i++ {
actual, when := rt.NextRestart(127)
if !actual {
state, when := rt.SetWaitResult(testWaitResult(127)).GetState()
if state != structs.TaskRestarting {
t.Fail()
}
if !(when > p.Delay && when <= p.Interval) {
Expand All @@ -54,27 +60,27 @@ func TestClient_RestartTracker_ModeFail(t *testing.T) {
p := testPolicy(true, structs.RestartPolicyModeFail)
rt := newRestartTracker(p, structs.JobTypeSystem)
for i := 0; i < p.Attempts; i++ {
actual, when := rt.NextRestart(127)
if !actual {
t.Fatalf("NextRestart() returned %v, want %v", actual, true)
state, when := rt.SetWaitResult(testWaitResult(127)).GetState()
if state != structs.TaskRestarting {
t.Fatalf("NextRestart() returned %v, want %v", state, structs.TaskRestarting)
}
if !withinJitter(p.Delay, when) {
t.Fatalf("NextRestart() returned %v; want %v+jitter", when, p.Delay)
}
}

// Next restart should cause fail
if actual, _ := rt.NextRestart(127); actual {
t.Fail()
if state, _ := rt.SetWaitResult(testWaitResult(127)).GetState(); state != structs.TaskNotRestarting {
t.Fatalf("NextRestart() returned %v; want %v", state, structs.TaskNotRestarting)
}
}

func TestClient_RestartTracker_NoRestartOnSuccess(t *testing.T) {
t.Parallel()
p := testPolicy(false, structs.RestartPolicyModeDelay)
rt := newRestartTracker(p, structs.JobTypeBatch)
if shouldRestart, _ := rt.NextRestart(0); shouldRestart {
t.Fatalf("NextRestart() returned %v, expected: %v", shouldRestart, false)
if state, _ := rt.SetWaitResult(testWaitResult(0)).GetState(); state != structs.TaskTerminated {
t.Fatalf("NextRestart() returned %v, expected: %v", state, structs.TaskTerminated)
}
}

Expand All @@ -83,7 +89,28 @@ func TestClient_RestartTracker_ZeroAttempts(t *testing.T) {
p := testPolicy(true, structs.RestartPolicyModeFail)
p.Attempts = 0
rt := newRestartTracker(p, structs.JobTypeService)
if actual, when := rt.NextRestart(1); actual {
if state, when := rt.SetWaitResult(testWaitResult(1)).GetState(); state != structs.TaskNotRestarting {
t.Fatalf("expect no restart, got restart/delay: %v", when)
}
}

func TestClient_RestartTracker_StartError_Recoverable(t *testing.T) {
t.Parallel()
p := testPolicy(true, structs.RestartPolicyModeDelay)
rt := newRestartTracker(p, structs.JobTypeSystem)
recErr := cstructs.NewRecoverableError(fmt.Errorf("foo"), true)
for i := 0; i < p.Attempts; i++ {
state, when := rt.SetStartError(recErr).GetState()
if state != structs.TaskRestarting {
t.Fatalf("NextRestart() returned %v, want %v", state, structs.TaskRestarting)
}
if !withinJitter(p.Delay, when) {
t.Fatalf("NextRestart() returned %v; want %v+jitter", when, p.Delay)
}
}

// Next restart should cause fail
if state, _ := rt.SetStartError(recErr).GetState(); state != structs.TaskNotRestarting {
t.Fatalf("NextRestart() returned %v; want %v", state, structs.TaskNotRestarting)
}
}
Loading