Skip to content

Commit

Permalink
Set pipeline status when all tasks complete
Browse files Browse the repository at this point in the history
We used to set the pipeline status to failed as soon as the first
task in the pipeline failed or was cancelled.

As soon as the first task in the pipeline fails or is cancelled, we
stop scheduling new tasks, as we did before, but we will report
status Unknown until all Tasks are complete.

This allows to:
- the completion time at the same time that the status is set
  and avoid inconsistencies
- wait until all tasks are complete before we cleanup the pipeline
  artifact storage, affinity assistant and record metrics
- report the correct number of failed / cancelled tasks, as there
  may be more than one. Other tasks that were already running
  when the first failure happened may fail too
- prepare the pipeline controller more complex workflows, where
  the controller may continue working scheduling after failures
  • Loading branch information
afrittoli committed Jun 7, 2020
1 parent 194102f commit f4513a1
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 88 deletions.
16 changes: 9 additions & 7 deletions pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,13 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err
}
}

var as artifacts.ArtifactStorageInterface

if as, err = artifacts.InitializeArtifactStorage(c.Images, pr, pipelineSpec, c.KubeClientSet, c.Logger); err != nil {
c.Logger.Infof("PipelineRun failed to initialize artifact storage %s", pr.Name)
return err
}

candidateTasks, err := dag.GetSchedulable(d, pipelineState.SuccessfulPipelineTaskNames()...)
if err != nil {
c.Logger.Errorf("Error getting potential next tasks for valid pipelinerun %s: %v", pr.Name, err)
Expand All @@ -484,13 +491,6 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err
}
resources.ApplyTaskResults(nextRprts, resolvedResultRefs)

var as artifacts.ArtifactStorageInterface

if as, err = artifacts.InitializeArtifactStorage(c.Images, pr, pipelineSpec, c.KubeClientSet, c.Logger); err != nil {
c.Logger.Infof("PipelineRun failed to initialize artifact storage %s", pr.Name)
return err
}

for _, rprt := range nextRprts {
if rprt == nil {
continue
Expand Down Expand Up @@ -521,6 +521,8 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err
case corev1.ConditionUnknown:
pr.Status.MarkRunning(after.Reason, after.Message)
}
// Read the condition the way it was set by the Mark* helpers
after = pr.Status.GetCondition(apis.ConditionSucceeded)
events.Emit(c.Recorder, before, after, pr)

pr.Status.TaskRuns = getTaskRunsStatus(pr, pipelineState)
Expand Down
94 changes: 52 additions & 42 deletions pkg/reconciler/pipelinerun/resources/pipelinerunresolution.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ const (
// ReasonConditionCheckFailed indicates that the reason for the failure status is that the
// condition check associated to the pipeline task evaluated to false
ReasonConditionCheckFailed = "ConditionCheckFailed"

// ReasonStopping indicates that no new Tasks will be scheduled by the controller, and the
// pipeline will stop once all running tasks complete their work
ReasonStopping = "PipelineRunStopping"
)

// TaskNotFoundError indicates that the resolution failed because a referenced Task couldn't be retrieved
Expand Down Expand Up @@ -410,7 +414,7 @@ func GetTaskRunName(taskRunsStatus map[string]*v1beta1.PipelineRunTaskRunStatus,
func GetPipelineConditionStatus(pr *v1beta1.PipelineRun, state PipelineRunState, logger *zap.SugaredLogger, dag *dag.Graph) *apis.Condition {
// We have 4 different states here:
// 1. Timed out -> Failed
// 2. Any one TaskRun has failed - >Failed. This should change with #1020 and #1023
// 2. All tasks are done and at least one has failed or has been cancelled -> Failed
// 3. All tasks are done or are skipped (i.e. condition check failed).-> Success
// 4. A Task or Condition is running right now or there are things left to run -> Running
if pr.IsTimedOut() {
Expand All @@ -422,67 +426,73 @@ func GetPipelineConditionStatus(pr *v1beta1.PipelineRun, state PipelineRunState,
}
}

// A single failed task mean we fail the pipeline
for _, rprt := range state {
if rprt.IsCancelled() {
logger.Infof("TaskRun %s is cancelled, so PipelineRun %s is cancelled", rprt.TaskRunName, pr.Name)
return &apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionFalse,
Reason: ReasonCancelled,
Message: fmt.Sprintf("TaskRun %s has cancelled", rprt.TaskRun.Name),
}
}

if rprt.IsFailure() { //IsDone ensures we have crossed the retry limit
logger.Infof("TaskRun %s has failed, so PipelineRun %s has failed, retries done: %b", rprt.TaskRunName, pr.Name, len(rprt.TaskRun.Status.RetriesStatus))
return &apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionFalse,
Reason: ReasonFailed,
Message: fmt.Sprintf("TaskRun %s has failed", rprt.TaskRun.Name),
}
}
}

allTasks := []string{}
successOrSkipTasks := []string{}
completedTasks := []string{}
skipTasks := int(0)
failedTasks := int(0)
cancelledTasks := int(0)
reason := ReasonSucceeded

// Check to see if all tasks are success or skipped
//
// The completion reason is also calculated here, but it will only be used
// if all tasks are completed.
//
// The pipeline run completion reason is set from the taskrun completion reason
// according to the following logic:
//
// - All successful: ReasonSucceeded
// - Some successful, some skipped: ReasonCompleted
// - Some cancelled, none failed: ReasonCancelled
// - At least one failed: ReasonFailed
for _, rprt := range state {
allTasks = append(allTasks, rprt.PipelineTask.Name)
if rprt.IsSuccessful() {
successOrSkipTasks = append(successOrSkipTasks, rprt.PipelineTask.Name)
}
if isSkipped(rprt, state.ToMap(), dag) {
switch {
case rprt.IsSuccessful():
completedTasks = append(completedTasks, rprt.PipelineTask.Name)
case isSkipped(rprt, state.ToMap(), dag):
skipTasks++
successOrSkipTasks = append(successOrSkipTasks, rprt.PipelineTask.Name)
}
}

if reflect.DeepEqual(allTasks, successOrSkipTasks) {
completedTasks = append(completedTasks, rprt.PipelineTask.Name)
// At least one is skipped and no failure yet, mark as completed
if reason == ReasonSucceeded:
reason = ReasonCompleted
case rprt.IsCancelled():
cancelledTasks++
failedOrCancelledTasks = append(failedOrCancelledTasks, rprt.PipelineTask.Name)
if reason != ReasonFailed:
reason = ReasonCancelled
case rprt.IsFailure():
failedTasks++
failedOrCancelledTasks = append(failedOrCancelledTasks, rprt.PipelineTask.Name)
reason = ReasonFailed
}
}

if reflect.DeepEqual(allTasks, completedTasks) {
logger.Infof("All TaskRuns have finished for PipelineRun %s so it has finished", pr.Name)
reason := ReasonSucceeded
if skipTasks != 0 {
reason = ReasonCompleted
}

return &apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionTrue,
Reason: reason,
Message: fmt.Sprintf("Tasks Completed: %d, Skipped: %d", len(successOrSkipTasks)-skipTasks, skipTasks),
Message: fmt.Sprintf("Tasks Completed: %d, Skipped: %d, Failed: %d, Cancelled %d",
len(successOrSkipTasks)-skipTasks, skipTasks, failedTasks, cancelledTasks),
}
}

// Hasn't timed out; no taskrun failed yet; and not all tasks have finished....
// Hasn't timed out; not all tasks have finished....
// Must keep running then....
if reason == ReasonCancelled || reason == ReasonFailed {
reason = ReasonStopping
} else {
reason = ReasonRunning
}
return &apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionUnknown,
Reason: ReasonRunning,
Message: fmt.Sprintf("Tasks Completed: %d, Incomplete: %d, Skipped: %d", len(successOrSkipTasks)-skipTasks, len(allTasks)-len(successOrSkipTasks), skipTasks),
Reason: reason,
Message: fmt.Sprintf("Tasks Completed: %d, Incomplete: %d, Skipped: %d, Failed: %d, Cancelled %d",
len(successOrSkipTasks)-skipTasks, len(allTasks)-len(successOrSkipTasks), skipTasks, failedTasks, cancelledTasks),
}
}

Expand Down
78 changes: 39 additions & 39 deletions pkg/reconciler/pipelinerun/resources/pipelinerunresolution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -974,109 +974,103 @@ func TestSuccessfulPipelineTaskNames(t *testing.T) {
}
}

func getExpectedMessage(status corev1.ConditionStatus, completed, incomplete, skipped, failed, cancelled int) string {
if status == corev1.ConditionFalse || status == corev1.ConditionTrue {
return fmt.Sprintf("Tasks Completed: %d, Incomplete: %d, Skipped: %d, Failed: %d, Cancelled %d",
completed, incomplete, skipped, failed, cancelled)
}
return fmt.Sprintf("Tasks Completed: %d, Skipped: %d, Failed: %d, Cancelled %d",
completed, skipped, failed, cancelled)
}

func TestGetPipelineConditionStatus(t *testing.T) {

var taskRetriedState = PipelineRunState{{
PipelineTask: &pts[3], // 1 retry needed
TaskRunName: "pipelinerun-mytask1",
TaskRun: withCancelled(makeRetried(trs[0])),
ResolvedTaskResources: &resources.ResolvedTaskResources{
TaskSpec: &task.Spec,
},
}}

tcs := []struct {
name string
state []*ResolvedPipelineRunTask
expectedStatus corev1.ConditionStatus
name string
state []*ResolvedPipelineRunTask
expectedStatus corev1.ConditionStatus
expectedReason string
expectedCounters []int
}{{
name: "no-tasks-started",
state: noneStartedState,
expectedStatus: corev1.ConditionUnknown,
expectedReason: ReasonRunning,
}, {
name: "one-task-started",
state: oneStartedState,
expectedStatus: corev1.ConditionUnknown,
expectedReason: ReasonRunning,
}, {
name: "one-task-finished",
state: oneFinishedState,
expectedStatus: corev1.ConditionUnknown,
expectedReason: ReasonRunning,
}, {
name: "one-task-failed",
state: oneFailedState,
expectedStatus: corev1.ConditionFalse,
expectedStatus: corev1.ConditionUnknown,
expectedReason: ReasonRunning,
}, {
name: "all-finished",
state: allFinishedState,
expectedStatus: corev1.ConditionTrue,
expectedReason: ReasonSucceeded,
}, {
name: "one-retry-needed",
state: taskRetriedState,
expectedStatus: corev1.ConditionUnknown,
expectedReason: ReasonRunning,
}, {
name: "condition-success-no-task started",
state: conditionCheckSuccessNoTaskStartedState,
expectedStatus: corev1.ConditionUnknown,
expectedReason: ReasonRunning,
}, {
name: "condition-check-in-progress",
state: conditionCheckStartedState,
expectedStatus: corev1.ConditionUnknown,
expectedReason: ReasonRunning,
}, {
name: "condition-failed-no-other-tasks", // 1 task pipeline with a condition that fails
state: conditionCheckFailedWithNoOtherTasksState,
expectedStatus: corev1.ConditionTrue,
expectedReason: ReasonCompleted,
}, {
name: "condition-failed-another-task-succeeded", // 1 task skipped due to condition, but others pass
state: conditionCheckFailedWithOthersPassedState,
expectedStatus: corev1.ConditionTrue,
expectedReason: ReasonCompleted,
}, {
name: "condition-failed-another-task-failed", // 1 task skipped due to condition, but others failed
state: conditionCheckFailedWithOthersFailedState,
expectedStatus: corev1.ConditionFalse,
}, {
name: "no-tasks-started",
state: noneStartedState,
expectedStatus: corev1.ConditionUnknown,
}, {
name: "one-task-started",
state: oneStartedState,
expectedStatus: corev1.ConditionUnknown,
}, {
name: "one-task-finished",
state: oneFinishedState,
expectedStatus: corev1.ConditionUnknown,
}, {
name: "one-task-failed",
state: oneFailedState,
expectedStatus: corev1.ConditionFalse,
}, {
name: "all-finished",
state: allFinishedState,
expectedStatus: corev1.ConditionTrue,
}, {
name: "one-retry-needed",
state: taskRetriedState,
expectedStatus: corev1.ConditionUnknown,
expectedReason: ReasonFailed,
}, {
name: "task skipped due to condition failure in parent",
state: taskWithParentSkippedState,
expectedStatus: corev1.ConditionTrue,
expectedReason: ReasonCompleted,
}, {
name: "task with multiple parent tasks -> one of which is skipped",
state: taskWithMultipleParentsSkippedState,
expectedStatus: corev1.ConditionTrue,
expectedReason: ReasonCompleted,
}, {
name: "task with grand parent task skipped",
state: taskWithGrandParentSkippedState,
expectedStatus: corev1.ConditionTrue,
expectedReason: ReasonCompleted,
}, {
name: "task with grand parents; one parent failed",
state: taskWithGrandParentsOneFailedState,
expectedStatus: corev1.ConditionFalse,
expectedReason: ReasonFailed,
}, {
name: "task with grand parents; one not run yet",
state: taskWithGrandParentsOneNotRunState,
expectedStatus: corev1.ConditionUnknown,
expectedReason: ReasonRunning,
}}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
Expand All @@ -1086,8 +1080,14 @@ func TestGetPipelineConditionStatus(t *testing.T) {
t.Fatalf("Unexpected error while buildig DAG for state %v: %v", tc.state, err)
}
c := GetPipelineConditionStatus(pr, tc.state, zap.NewNop().Sugar(), dag)
if c.Status != tc.expectedStatus {
t.Fatalf("Expected to get status %s but got %s for state %v", tc.expectedStatus, c.Status, tc.state)
wantCondition = &apis.Condition{
Type: apis.ConditionSucceeded,
Status: tc.expectedStatus,
Reason: tc.expectedReason,
Message: getExpectedMessage(tc.expectedStatus, tc.ExpectedCounters...),
}
if got, want := c, wantCondition; got != want {
t.Fatalf("Expected to get condition %s but got %s for state %v", want, got, tc.state)
}
})
}
Expand Down

0 comments on commit f4513a1

Please sign in to comment.