From e0224b8a350b5bdb2aade822614d226465435ad5 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Sun, 28 Feb 2016 16:56:05 -0800 Subject: [PATCH 1/2] Refactor task runner to include driver starting into restart policy and add recoverable errors --- api/tasks.go | 4 + client/alloc_runner.go | 3 +- client/driver/docker.go | 42 ++++----- client/driver/exec.go | 3 +- client/driver/structs/structs.go | 21 +++++ client/restarts.go | 100 ++++++++++++++++---- client/restarts_test.go | 53 ++++++++--- client/task_runner.go | 151 +++++++++++++++---------------- client/task_runner_test.go | 33 ++++--- command/alloc_status.go | 20 +++- nomad/structs/structs.go | 29 ++++++ 11 files changed, 306 insertions(+), 153 deletions(-) diff --git a/api/tasks.go b/api/tasks.go index cdcf4e5d43b..5450dece120 100644 --- a/api/tasks.go +++ b/api/tasks.go @@ -148,9 +148,12 @@ type TaskState struct { const ( TaskDriverFailure = "Driver Failure" + TaskReceived = "Received" TaskStarted = "Started" TaskTerminated = "Terminated" TaskKilled = "Killed" + TaskRestarting = "Restarting" + TaskNotRestarting = "Restarts Exceeded" ) // TaskEvent is an event that effects the state of a task and contains meta-data @@ -163,4 +166,5 @@ type TaskEvent struct { Signal int Message string KillError string + StartDelay int64 } diff --git a/client/alloc_runner.go b/client/alloc_runner.go index 4eb0d9b87ef..6022885bbb6 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -245,8 +245,7 @@ func (r *AllocRunner) Alloc() *structs.Allocation { case structs.TaskStatePending: pending = true case structs.TaskStateDead: - last := len(state.Events) - 1 - if state.Events[last].Type == structs.TaskDriverFailure { + if state.Failed() { failed = true } else { dead = true diff --git a/client/driver/docker.go b/client/driver/docker.go index 48e65f6fe2a..0f439e40888 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -8,6 +8,7 @@ import ( "os" "os/exec" "path/filepath" + "regexp" "strconv" "strings" "sync" @@ -209,32 +210,9 @@ func (d *DockerDriver) createContainer(ctx *ExecContext, task *structs.Task, hostConfig := &docker.HostConfig{ // Convert MB to bytes. This is an absolute value. - // - // This value represents the total amount of memory a process can use. - // Swap is added to total memory and is managed by the OS, not docker. - // Since this may cause other processes to swap and cause system - // instability, we will simply not use swap. - // - // See: https://www.kernel.org/doc/Documentation/cgroups/memory.txt Memory: int64(task.Resources.MemoryMB) * 1024 * 1024, MemorySwap: -1, // Convert Mhz to shares. This is a relative value. - // - // There are two types of CPU limiters available: Shares and Quotas. A - // Share allows a particular process to have a proportion of CPU time - // relative to other processes; 1024 by default. A CPU Quota is enforced - // over a Period of time and is a HARD limit on the amount of CPU time a - // process can use. Processes with quotas cannot burst, while processes - // with shares can, so we'll use shares. - // - // The simplest scale is 1 share to 1 MHz so 1024 = 1GHz. This means any - // given process will have at least that amount of resources, but likely - // more since it is (probably) rare that the machine will run at 100% - // CPU. This scale will cease to work if a node is overprovisioned. 
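// Illustrative sketch (not part of this patch): the unit conversions the
// HostConfig fields above rely on, with made-up resource numbers.
//
//   memoryBytes := int64(task.Resources.MemoryMB) * 1024 * 1024 // 256 MB  -> 268435456 bytes
//   cpuShares   := int64(task.Resources.CPU)                    // 1024 MHz -> 1024 shares (~1 GHz)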
- // - // See: - // - https://www.kernel.org/doc/Documentation/scheduler/sched-bwc.txt - // - https://www.kernel.org/doc/Documentation/scheduler/sched-design-CFS.txt CPUShares: int64(task.Resources.CPU), // Binds are used to mount a host volume into the container. We mount a @@ -403,6 +381,22 @@ func (d *DockerDriver) createContainer(ctx *ExecContext, task *structs.Task, }, nil } +var ( + // imageNotFoundMatcher is a regex expression that matches the image not + // found error Docker returns. + imageNotFoundMatcher = regexp.MustCompile(`Error: image .+ not found`) +) + +// recoverablePullError wraps the error gotten when trying to pull and image if +// the error is recoverable. +func (d *DockerDriver) recoverablePullError(err error, image string) error { + recoverable := true + if imageNotFoundMatcher.MatchString(err.Error()) { + recoverable = false + } + return cstructs.NewRecoverableError(fmt.Errorf("Failed to pull `%s`: %s", image, err), recoverable) +} + func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) { var driverConfig DockerDriverConfig if err := mapstructure.WeakDecode(task.Config, &driverConfig); err != nil { @@ -482,7 +476,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle err = client.PullImage(pullOptions, authOptions) if err != nil { d.logger.Printf("[ERR] driver.docker: failed pulling container %s:%s: %s", repo, tag, err) - return nil, fmt.Errorf("Failed to pull `%s`: %s", image, err) + return nil, d.recoverablePullError(err, image) } d.logger.Printf("[DEBUG] driver.docker: docker pull %s:%s succeeded", repo, tag) diff --git a/client/driver/exec.go b/client/driver/exec.go index e224c400255..cf8e021cc75 100644 --- a/client/driver/exec.go +++ b/client/driver/exec.go @@ -277,8 +277,7 @@ func (h *execHandle) run() { h.logger.Printf("[ERR] driver.exec: unmounting dev,proc and alloc dirs failed: %v", e) } } - h.waitCh <- &cstructs.WaitResult{ExitCode: ps.ExitCode, Signal: 0, - Err: err} + h.waitCh <- cstructs.NewWaitResult(ps.ExitCode, 0, err) close(h.waitCh) h.pluginClient.Kill() } diff --git a/client/driver/structs/structs.go b/client/driver/structs/structs.go index 91225345762..cafd25c26f6 100644 --- a/client/driver/structs/structs.go +++ b/client/driver/structs/structs.go @@ -2,6 +2,7 @@ package structs import ( "fmt" + cgroupConfig "github.com/opencontainers/runc/libcontainer/configs" ) @@ -34,3 +35,23 @@ func (r *WaitResult) String() string { type IsolationConfig struct { Cgroup *cgroupConfig.Cgroup } + +// RecoverableError wraps an error and marks whether it is recoverable and could +// be retried or it is fatal. +type RecoverableError struct { + Err error + Recoverable bool +} + +// NewRecoverableError is used to wrap an error and mark it as recoverable or +// not. +func NewRecoverableError(e error, recoverable bool) *RecoverableError { + return &RecoverableError{ + Err: e, + Recoverable: recoverable, + } +} + +func (r *RecoverableError) Error() string { + return r.Err.Error() +} diff --git a/client/restarts.go b/client/restarts.go index 8e6af3bf4d0..66d94bf9295 100644 --- a/client/restarts.go +++ b/client/restarts.go @@ -5,6 +5,7 @@ import ( "sync" "time" + cstructs "github.com/hashicorp/nomad/client/driver/structs" "github.com/hashicorp/nomad/nomad/structs" ) @@ -25,6 +26,8 @@ func newRestartTracker(policy *structs.RestartPolicy, jobType string) *RestartTr } type RestartTracker struct { + waitRes *cstructs.WaitResult + startErr error count int // Current number of attempts. 
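// Illustrative sketch (not part of this patch): how the new RecoverableError
// wrapper introduced above is meant to be produced and consumed. The error
// text is made up; the type-assertion pattern mirrors recoverablePullError
// and the restart tracker's handleStartError below.
//
//   var err error = cstructs.NewRecoverableError(fmt.Errorf("registry timed out"), true)
//   if rerr, ok := err.(*cstructs.RecoverableError); ok && rerr.Recoverable {
//       // transient failure: let the restart policy retry the start
//   } else {
//       // fatal failure (e.g. image not found): do not retry
//   }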
onSuccess bool // Whether to restart on successful exit code. startTime time.Time // When the interval began @@ -40,46 +43,107 @@ func (r *RestartTracker) SetPolicy(policy *structs.RestartPolicy) { r.policy = policy } -// NextRestart takes the exit code from the last attempt and returns whether the -// task should be restarted and the duration to wait. -func (r *RestartTracker) NextRestart(exitCode int) (bool, time.Duration) { +// SetStartError is used to mark the most recent start error. If starting was +// successful the error should be nil. +func (r *RestartTracker) SetStartError(err error) *RestartTracker { + r.lock.Lock() + defer r.lock.Unlock() + r.startErr = err + return r +} + +// SetWaitResult is used to mark the most recent wait result. +func (r *RestartTracker) SetWaitResult(res *cstructs.WaitResult) *RestartTracker { + r.lock.Lock() + defer r.lock.Unlock() + r.waitRes = res + return r +} + +// GetState returns the tasks next state given the set exit code and start +// error. One of the following states are returned: +// * TaskRestarting - Task should be restarted +// * TaskNotRestarting - Task should not be restarted and has exceeded its +// restart policy. +// * TaskTerminated - Task has terminated successfully and does not need a +// restart. +// +// If TaskRestarting is returned, the duration is how long to wait until +// starting the task again. +func (r *RestartTracker) GetState() (string, time.Duration) { r.lock.Lock() defer r.lock.Unlock() // Hot path if no attempts are expected if r.policy.Attempts == 0 { - return false, 0 + if r.waitRes != nil && r.waitRes.Successful() { + return structs.TaskTerminated, 0 + } + + return structs.TaskNotRestarting, 0 } + r.count++ + // Check if we have entered a new interval. end := r.startTime.Add(r.policy.Interval) now := time.Now() if now.After(end) { r.count = 0 r.startTime = now - return r.shouldRestart(exitCode), r.jitter() } - r.count++ + if r.startErr != nil { + return r.handleStartError() + } else if r.waitRes != nil { + return r.handleWaitResult() + } else { + return "", 0 + } +} - // If we are under the attempts, restart with delay. - if r.count <= r.policy.Attempts { - return r.shouldRestart(exitCode), r.jitter() +// handleStartError returns the new state and potential wait duration for +// restarting the task after it was not successfully started. On start errors, +// the restart policy is always treated as fail mode to ensure we don't +// infinitely try to start a task. +func (r *RestartTracker) handleStartError() (string, time.Duration) { + // If the error is not recoverable, do not restart. + if rerr, ok := r.startErr.(*cstructs.RecoverableError); !(ok && rerr.Recoverable) { + return structs.TaskNotRestarting, 0 } - // Don't restart since mode is "fail" - if r.policy.Mode == structs.RestartPolicyModeFail { - return false, 0 + if r.count > r.policy.Attempts { + return structs.TaskNotRestarting, 0 } - // Apply an artifical wait to enter the next interval - return r.shouldRestart(exitCode), end.Sub(now) + return structs.TaskRestarting, r.jitter() } -// shouldRestart returns whether a restart should occur based on the exit code -// and job type. -func (r *RestartTracker) shouldRestart(exitCode int) bool { - return exitCode != 0 || r.onSuccess +// handleWaitResult returns the new state and potential wait duration for +// restarting the task after it has exited. 
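// Illustrative sketch (not part of this patch): the calling pattern GetState
// and handleStartError above are designed for after a failed start; it mirrors
// the task runner changes later in this patch.
//
//   state, wait := r.restartTracker.SetStartError(startErr).GetState()
//   switch state {
//   case structs.TaskRestarting:    // sleep for `wait`, then call startTask again
//   case structs.TaskNotRestarting: // non-recoverable error or attempts exhausted; mark the task dead
//   }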
+func (r *RestartTracker) handleWaitResult() (string, time.Duration) { + // If the task started successfully and restart on success isn't specified, + // don't restart but don't mark as failed. + if r.waitRes.Successful() && !r.onSuccess { + return structs.TaskTerminated, 0 + } + + if r.count > r.policy.Attempts { + if r.policy.Mode == structs.RestartPolicyModeFail { + return structs.TaskNotRestarting, 0 + } else { + return structs.TaskRestarting, r.getDelay() + } + } + + return structs.TaskRestarting, r.jitter() +} + +// getDelay returns the delay time to enter the next interval. +func (r *RestartTracker) getDelay() time.Duration { + end := r.startTime.Add(r.policy.Interval) + now := time.Now() + return end.Sub(now) } // jitter returns the delay time plus a jitter. diff --git a/client/restarts_test.go b/client/restarts_test.go index 89dd18c1a36..e46a7333497 100644 --- a/client/restarts_test.go +++ b/client/restarts_test.go @@ -1,9 +1,11 @@ package client import ( + "fmt" "testing" "time" + cstructs "github.com/hashicorp/nomad/client/driver/structs" "github.com/hashicorp/nomad/nomad/structs" ) @@ -23,14 +25,18 @@ func withinJitter(expected, actual time.Duration) bool { expected.Nanoseconds()) <= jitter } +func testWaitResult(exit int) *cstructs.WaitResult { + return cstructs.NewWaitResult(exit, 0, nil) +} + func TestClient_RestartTracker_ModeDelay(t *testing.T) { t.Parallel() p := testPolicy(true, structs.RestartPolicyModeDelay) rt := newRestartTracker(p, structs.JobTypeService) for i := 0; i < p.Attempts; i++ { - actual, when := rt.NextRestart(127) - if !actual { - t.Fatalf("NextRestart() returned %v, want %v", actual, true) + state, when := rt.SetWaitResult(testWaitResult(127)).GetState() + if state != structs.TaskRestarting { + t.Fatalf("NextRestart() returned %v, want %v", state, structs.TaskRestarting) } if !withinJitter(p.Delay, when) { t.Fatalf("NextRestart() returned %v; want %v+jitter", when, p.Delay) @@ -39,8 +45,8 @@ func TestClient_RestartTracker_ModeDelay(t *testing.T) { // Follow up restarts should cause delay. 
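// Illustrative sketch (not part of this patch): the delay-mode arithmetic
// getDelay above implements, with made-up times. Once Attempts is exceeded
// inside an Interval, the wait is whatever is left of that interval:
//
//   startTime = 10:00:00, Interval = 5m, now = 10:03:00
//   getDelay() = startTime.Add(Interval).Sub(now) = 2m
//
// which is why the loop below expects a wait greater than Delay but no longer
// than Interval.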
for i := 0; i < 3; i++ { - actual, when := rt.NextRestart(127) - if !actual { + state, when := rt.SetWaitResult(testWaitResult(127)).GetState() + if state != structs.TaskRestarting { t.Fail() } if !(when > p.Delay && when <= p.Interval) { @@ -54,9 +60,9 @@ func TestClient_RestartTracker_ModeFail(t *testing.T) { p := testPolicy(true, structs.RestartPolicyModeFail) rt := newRestartTracker(p, structs.JobTypeSystem) for i := 0; i < p.Attempts; i++ { - actual, when := rt.NextRestart(127) - if !actual { - t.Fatalf("NextRestart() returned %v, want %v", actual, true) + state, when := rt.SetWaitResult(testWaitResult(127)).GetState() + if state != structs.TaskRestarting { + t.Fatalf("NextRestart() returned %v, want %v", state, structs.TaskRestarting) } if !withinJitter(p.Delay, when) { t.Fatalf("NextRestart() returned %v; want %v+jitter", when, p.Delay) @@ -64,8 +70,8 @@ func TestClient_RestartTracker_ModeFail(t *testing.T) { } // Next restart should cause fail - if actual, _ := rt.NextRestart(127); actual { - t.Fail() + if state, _ := rt.SetWaitResult(testWaitResult(127)).GetState(); state != structs.TaskNotRestarting { + t.Fatalf("NextRestart() returned %v; want %v", state, structs.TaskNotRestarting) } } @@ -73,8 +79,8 @@ func TestClient_RestartTracker_NoRestartOnSuccess(t *testing.T) { t.Parallel() p := testPolicy(false, structs.RestartPolicyModeDelay) rt := newRestartTracker(p, structs.JobTypeBatch) - if shouldRestart, _ := rt.NextRestart(0); shouldRestart { - t.Fatalf("NextRestart() returned %v, expected: %v", shouldRestart, false) + if state, _ := rt.SetWaitResult(testWaitResult(0)).GetState(); state != structs.TaskTerminated { + t.Fatalf("NextRestart() returned %v, expected: %v", state, structs.TaskTerminated) } } @@ -83,7 +89,28 @@ func TestClient_RestartTracker_ZeroAttempts(t *testing.T) { p := testPolicy(true, structs.RestartPolicyModeFail) p.Attempts = 0 rt := newRestartTracker(p, structs.JobTypeService) - if actual, when := rt.NextRestart(1); actual { + if state, when := rt.SetWaitResult(testWaitResult(1)).GetState(); state != structs.TaskNotRestarting { t.Fatalf("expect no restart, got restart/delay: %v", when) } } + +func TestClient_RestartTracker_StartError_Recoverable(t *testing.T) { + t.Parallel() + p := testPolicy(true, structs.RestartPolicyModeDelay) + rt := newRestartTracker(p, structs.JobTypeSystem) + recErr := cstructs.NewRecoverableError(fmt.Errorf("foo"), true) + for i := 0; i < p.Attempts; i++ { + state, when := rt.SetStartError(recErr).GetState() + if state != structs.TaskRestarting { + t.Fatalf("NextRestart() returned %v, want %v", state, structs.TaskRestarting) + } + if !withinJitter(p.Delay, when) { + t.Fatalf("NextRestart() returned %v; want %v+jitter", when, p.Delay) + } + } + + // Next restart should cause fail + if state, _ := rt.SetStartError(recErr).GetState(); state != structs.TaskNotRestarting { + t.Fatalf("NextRestart() returned %v; want %v", state, structs.TaskNotRestarting) + } +} diff --git a/client/task_runner.go b/client/task_runner.go index e368f7aa0d9..a25835e268e 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -203,33 +203,6 @@ func (r *TaskRunner) createDriver() (driver.Driver, error) { return driver, err } -// startTask is used to start the task if there is no handle -func (r *TaskRunner) startTask() error { - // Create a driver - driver, err := r.createDriver() - if err != nil { - e := structs.NewTaskEvent(structs.TaskDriverFailure).SetDriverError(err) - r.setState(structs.TaskStateDead, e) - return err - } - - // Start the job - 
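// Illustrative sketch (not part of this patch): the behavioural change in the
// removed startTask above. Previously a failed driver.Start marked the task
// dead on the spot; after this refactor the error is handed to the restart
// tracker and the restart policy decides what happens next, roughly:
//
//   old: handle, err := driver.Start(...); if err != nil { setState(TaskStateDead, TaskDriverFailure); return }
//   new: startErr := r.startTask(); r.restartTracker.SetStartError(startErr) // GetState() then decides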
handle, err := driver.Start(r.ctx, r.task) - if err != nil { - r.logger.Printf("[ERR] client: failed to start task '%s' for alloc '%s': %v", - r.task.Name, r.alloc.ID, err) - e := structs.NewTaskEvent(structs.TaskDriverFailure). - SetDriverError(fmt.Errorf("failed to start: %v", err)) - r.setState(structs.TaskStateDead, e) - return err - } - r.handleLock.Lock() - r.handle = handle - r.handleLock.Unlock() - r.setState(structs.TaskStateRunning, structs.NewTaskEvent(structs.TaskStarted)) - return nil -} - // Run is a long running routine used to manage the task func (r *TaskRunner) Run() { defer close(r.waitCh) @@ -241,33 +214,48 @@ func (r *TaskRunner) Run() { } func (r *TaskRunner) run() { - var forceStart bool for { - // Start the task if not yet started or it is being forced. + // Start the task if not yet started or it is being forced. This logic + // is necessary because in the case of a restore the handle already + // exists. r.handleLock.Lock() handleEmpty := r.handle == nil r.handleLock.Unlock() - if handleEmpty || forceStart { - forceStart = false - if err := r.startTask(); err != nil { - return + if handleEmpty { + startErr := r.startTask() + r.restartTracker.SetStartError(startErr) + if startErr != nil { + r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskDriverFailure).SetDriverError(startErr)) + goto RESTART } } - // Store the errors that caused use to stop waiting for updates. - var waitRes *cstructs.WaitResult - var destroyErr error - destroyed := false - - // Register the services defined by the task with Consil + // Mark the task as started and register it with Consul. + r.setState(structs.TaskStateRunning, structs.NewTaskEvent(structs.TaskStarted)) r.consulService.Register(r.task, r.alloc) - OUTER: // Wait for updates + WAIT: for { select { - case waitRes = <-r.handle.WaitCh(): - break OUTER + case waitRes := <-r.handle.WaitCh(): + // De-Register the services belonging to the task from consul + r.consulService.Deregister(r.task, r.alloc) + + if waitRes == nil { + panic("nil wait") + } + + // Log whether the task was successful or not. + r.restartTracker.SetWaitResult(waitRes) + r.setState(structs.TaskStateDead, r.waitErrorToEvent(waitRes)) + if !waitRes.Successful() { + r.logger.Printf("[INFO] client: task %q for alloc %q failed: %v", r.task.Name, r.alloc.ID, waitRes) + } else { + r.logger.Printf("[INFO] client: task %q for alloc %q completed successfully", r.task.Name, r.alloc.ID) + } + + break WAIT case update := <-r.updateCh: if err := r.handleUpdate(update); err != nil { r.logger.Printf("[ERR] client: update to task %q failed: %v", r.task.Name, err) @@ -278,50 +266,32 @@ func (r *TaskRunner) run() { if !destroySuccess { // We couldn't successfully destroy the resource created. r.logger.Printf("[ERR] client: failed to kill task %q. Resources may have been leaked: %v", r.task.Name, err) - } else { - // Wait for the task to exit but cap the time to ensure we don't block. - select { - case waitRes = <-r.handle.WaitCh(): - case <-time.After(3 * time.Second): - } } // Store that the task has been destroyed and any associated error. - destroyed = true - destroyErr = err - break OUTER + r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskKilled).SetKillError(err)) + r.consulService.Deregister(r.task, r.alloc) + return } } - // De-Register the services belonging to the task from consul - r.consulService.Deregister(r.task, r.alloc) - - // If the user destroyed the task, we do not attempt to do any restarts. 
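// Illustrative sketch (not part of this patch): the control flow of the
// refactored run() above, with the bookkeeping stripped out.
//
//   for {
//       if handle == nil {
//           startErr := startTask()                 // SetStartError(startErr)
//           if startErr != nil { goto RESTART }     // TaskDriverFailure event
//       }
//       // register with Consul, mark TaskStateRunning
//       select {                                    // WAIT
//       case res := <-handle.WaitCh():              // task exited: SetWaitResult(res), deregister
//       case upd := <-updateCh:                     // in-place update, keep waiting
//       case <-destroyCh:                           // kill handle, deregister, return
//       }
//   RESTART:
//       state, wait := restartTracker.GetState()
//       // TaskRestarting                      -> sleep `wait` (abort if destroyed), clear handle, loop
//       // TaskNotRestarting / TaskTerminated  -> return
//   }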
- if destroyed { - r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskKilled).SetKillError(destroyErr)) - return - } - - // Log whether the task was successful or not. - if !waitRes.Successful() { - r.logger.Printf("[ERR] client: failed to complete task '%s' for alloc '%s': %v", r.task.Name, r.alloc.ID, waitRes) - } else { - r.logger.Printf("[INFO] client: completed task '%s' for alloc '%s'", r.task.Name, r.alloc.ID) - } - - // Check if we should restart. If not mark task as dead and exit. - shouldRestart, when := r.restartTracker.NextRestart(waitRes.ExitCode) - waitEvent := r.waitErrorToEvent(waitRes) - if !shouldRestart { + RESTART: + state, when := r.restartTracker.GetState() + switch state { + case structs.TaskNotRestarting, structs.TaskTerminated: r.logger.Printf("[INFO] client: Not restarting task: %v for alloc: %v ", r.task.Name, r.alloc.ID) - r.setState(structs.TaskStateDead, waitEvent) + if state == structs.TaskNotRestarting { + r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskNotRestarting)) + } + return + case structs.TaskRestarting: + r.logger.Printf("[INFO] client: Restarting task %q for alloc %q in %v", r.task.Name, r.alloc.ID, when) + r.setState(structs.TaskStatePending, structs.NewTaskEvent(structs.TaskRestarting).SetRestartDelay(when)) + default: + r.logger.Printf("[ERR] client: restart tracker returned unknown state: %q", state) return } - r.logger.Printf("[INFO] client: Restarting Task: %v", r.task.Name) - r.logger.Printf("[DEBUG] client: Sleeping for %v before restarting Task %v", when, r.task.Name) - r.setState(structs.TaskStatePending, waitEvent) - // Sleep but watch for destroy events. select { case <-time.After(when): @@ -330,7 +300,7 @@ func (r *TaskRunner) run() { // Destroyed while we were waiting to restart, so abort. r.destroyLock.Lock() - destroyed = r.destroy + destroyed := r.destroy r.destroyLock.Unlock() if destroyed { r.logger.Printf("[DEBUG] client: Not restarting task: %v because it's destroyed by user", r.task.Name) @@ -338,11 +308,34 @@ func (r *TaskRunner) run() { return } - // Set force start because we are restarting the task. - forceStart = true + // Clear the handle so a new driver will be created. + r.handle = nil } } +func (r *TaskRunner) startTask() error { + // Create a driver + driver, err := r.createDriver() + if err != nil { + r.logger.Printf("[ERR] client: failed to create driver of task '%s' for alloc '%s': %v", + r.task.Name, r.alloc.ID, err) + return err + } + + // Start the job + handle, err := driver.Start(r.ctx, r.task) + if err != nil { + r.logger.Printf("[ERR] client: failed to start task '%s' for alloc '%s': %v", + r.task.Name, r.alloc.ID, err) + return err + } + + r.handleLock.Lock() + r.handle = handle + r.handleLock.Unlock() + return nil +} + // handleUpdate takes an updated allocation and updates internal state to // reflect the new config for the task. 
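// Illustrative sketch (not part of this patch): the "sleep but stay killable"
// pattern used above while waiting out a restart delay.
//
//   select {
//   case <-time.After(when):  // delay elapsed; clear the handle and loop to start again
//   case <-r.destroyCh:       // destroyed mid-wait; re-check r.destroy under destroyLock,
//                             // emit TaskKilled, and return without restarting
//   }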
func (r *TaskRunner) handleUpdate(update *structs.Allocation) error { diff --git a/client/task_runner_test.go b/client/task_runner_test.go index f31348560e6..a0870379476 100644 --- a/client/task_runner_test.go +++ b/client/task_runner_test.go @@ -96,14 +96,29 @@ func TestTaskRunner_Destroy(t *testing.T) { // Change command to ensure we run for a bit tr.task.Config["command"] = "/bin/sleep" - tr.task.Config["args"] = []string{"10"} + tr.task.Config["args"] = []string{"1000"} go tr.Run() + testutil.WaitForResult(func() (bool, error) { + if l := len(upd.events); l != 2 { + return false, fmt.Errorf("Expect two events; got %v", l) + } + + if upd.events[0].Type != structs.TaskReceived { + return false, fmt.Errorf("First Event was %v; want %v", upd.events[0].Type, structs.TaskReceived) + } + + if upd.events[1].Type != structs.TaskStarted { + return false, fmt.Errorf("Second Event was %v; want %v", upd.events[1].Type, structs.TaskStarted) + } + + return true, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) + // Begin the tear down - go func() { - time.Sleep(100 * time.Millisecond) - tr.Destroy() - }() + tr.Destroy() select { case <-tr.WaitCh(): @@ -119,14 +134,6 @@ func TestTaskRunner_Destroy(t *testing.T) { t.Fatalf("TaskState %v; want %v", upd.state, structs.TaskStateDead) } - if upd.events[0].Type != structs.TaskReceived { - t.Fatalf("First Event was %v; want %v", upd.events[0].Type, structs.TaskReceived) - } - - if upd.events[1].Type != structs.TaskStarted { - t.Fatalf("Second Event was %v; want %v", upd.events[1].Type, structs.TaskStarted) - } - if upd.events[2].Type != structs.TaskKilled { t.Fatalf("Third Event was %v; want %v", upd.events[2].Type, structs.TaskKilled) } diff --git a/command/alloc_status.go b/command/alloc_status.go index 9d462b29ea1..34a5a2b9694 100644 --- a/command/alloc_status.go +++ b/command/alloc_status.go @@ -190,10 +190,22 @@ func (c *AllocStatusCommand) taskStatus(alloc *api.Allocation) { // Build up the description based on the event type. var desc string switch event.Type { + case api.TaskStarted: + desc = "Task started by client" + case api.TaskReceived: + desc = "Task received by client" case api.TaskDriverFailure: - desc = event.DriverError + if event.DriverError != "" { + desc = event.DriverError + } else { + desc = "Failed to start task" + } case api.TaskKilled: - desc = event.KillError + if event.KillError != "" { + desc = event.KillError + } else { + desc = "Task successfully killed" + } case api.TaskTerminated: var parts []string parts = append(parts, fmt.Sprintf("Exit Code: %d", event.ExitCode)) @@ -206,6 +218,10 @@ func (c *AllocStatusCommand) taskStatus(alloc *api.Allocation) { parts = append(parts, fmt.Sprintf("Exit Message: %q", event.Message)) } desc = strings.Join(parts, ", ") + case api.TaskRestarting: + desc = fmt.Sprintf("Task restarting in %v", time.Duration(event.StartDelay)) + case api.TaskNotRestarting: + desc = "Task exceeded restart policy" } // Reverse order so we are sorted by time diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 1aa47ea3a1f..0bc741de256 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1689,6 +1689,16 @@ func (ts *TaskState) Copy() *TaskState { return copy } +// Failed returns if the task has has failed. 
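// Illustrative sketch (not part of this patch): how the new StartDelay field
// travels from the client to the alloc-status output above. The 15s delay is
// made up.
//
//   e := structs.NewTaskEvent(structs.TaskRestarting).SetRestartDelay(15 * time.Second)
//   // StartDelay is stored as int64 nanoseconds, so the CLI can render it with
//   fmt.Sprintf("Task restarting in %v", time.Duration(e.StartDelay)) // "Task restarting in 15s"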
+func (ts *TaskState) Failed() bool { + l := len(ts.Events) + if ts.State != TaskStateDead || l == 0 { + return false + } + + return ts.Events[l-1].Type == TaskNotRestarting +} + const ( // A Driver failure indicates that the task could not be started due to a // failure in the driver. @@ -1707,6 +1717,13 @@ const ( // Task Killed indicates a user has killed the task. TaskKilled = "Killed" + + // TaskRestarting indicates that task terminated and is being restarted. + TaskRestarting = "Restarting" + + // TaskNotRestarting indicates that the task has failed and is not being + // restarted because it has exceeded its restart policy. + TaskNotRestarting = "Restarts Exceeded" ) // TaskEvent is an event that effects the state of a task and contains meta-data @@ -1725,6 +1742,13 @@ type TaskEvent struct { // Task Killed Fields. KillError string // Error killing the task. + + // TaskRestarting fields. + StartDelay int64 // The sleep period before restarting the task in unix nanoseconds. +} + +func (te *TaskEvent) GoString() string { + return fmt.Sprintf("%v at %v", te.Type, te.Time) } func (te *TaskEvent) Copy() *TaskEvent { @@ -1774,6 +1798,11 @@ func (e *TaskEvent) SetKillError(err error) *TaskEvent { return e } +func (e *TaskEvent) SetRestartDelay(delay time.Duration) *TaskEvent { + e.StartDelay = int64(delay) + return e +} + // Validate is used to sanity check a task group func (t *Task) Validate() error { var mErr multierror.Error From 343fddb4039014819c914d5d72fea9c900d7220c Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Mon, 29 Feb 2016 10:45:08 -0800 Subject: [PATCH 2/2] Acquire lock around handle --- client/task_runner.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/client/task_runner.go b/client/task_runner.go index a25835e268e..4167dfd567f 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -309,7 +309,9 @@ func (r *TaskRunner) run() { } // Clear the handle so a new driver will be created. + r.handleLock.Lock() r.handle = nil + r.handleLock.Unlock() } }
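// Illustrative sketch (not part of either commit): the lock discipline the
// second commit applies to the shared handle, reduced to a self-contained,
// runnable example with stand-in types. Both the handleEmpty check at the top
// of run() and the reset after a restart now happen under handleLock.
package main

import (
	"fmt"
	"sync"
)

type fakeHandle struct{ id string }

type runner struct {
	handleLock sync.Mutex
	handle     *fakeHandle // stands in for driver.DriverHandle
}

// handleEmpty reads the handle under the lock, mirroring the check at the top of run().
func (r *runner) handleEmpty() bool {
	r.handleLock.Lock()
	defer r.handleLock.Unlock()
	return r.handle == nil
}

// clearHandle drops the handle under the lock, mirroring the hunk above, so the
// next loop iteration creates a fresh driver.
func (r *runner) clearHandle() {
	r.handleLock.Lock()
	r.handle = nil
	r.handleLock.Unlock()
}

func main() {
	r := &runner{handle: &fakeHandle{id: "h1"}}
	fmt.Println(r.handleEmpty()) // false
	r.clearHandle()
	fmt.Println(r.handleEmpty()) // true
}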