Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update PipelineSpec and TaskSpec fields of PipelineRun and TaskRun Status fields #4891

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,8 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun, get
pipelineSpec = resources.ApplyParameters(ctx, pipelineSpec, pr)
pipelineSpec = resources.ApplyContexts(ctx, pipelineSpec, pipelineMeta.Name, pr)
pipelineSpec = resources.ApplyWorkspaces(ctx, pipelineSpec, pr)
// Update pipelinespec of pipelinerun's status field
pr.Status.PipelineSpec = pipelineSpec

// pipelineState holds a list of pipeline tasks after resolving conditions and pipeline resources
// pipelineState also holds a taskRun for each pipeline task after the taskRun is created
Expand Down
49 changes: 28 additions & 21 deletions pkg/reconciler/taskrun/taskrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,10 @@ func (c *Reconciler) reconcile(ctx context.Context, tr *v1beta1.TaskRun, rtr *re
defer c.durationAndCountMetrics(ctx, tr)
logger := logging.FromContext(ctx)
recorder := controller.GetEventRecorder(ctx)

ts := updateTaskSpecParamsContextsResults(tr, rtr)
tr.Status.TaskSpec = ts

// Get the TaskRun's Pod if it should have one. Otherwise, create the Pod.
var pod *corev1.Pod
var err error
Expand Down Expand Up @@ -440,8 +444,7 @@ func (c *Reconciler) reconcile(ctx context.Context, tr *v1beta1.TaskRun, rtr *re
// This is used by createPod below. Changes to the Spec are not updated.
tr.Spec.Workspaces = taskRunWorkspaces
}

pod, err = c.createPod(ctx, tr, rtr)
pod, err = c.createPod(ctx, ts, tr, rtr)
if err != nil {
newErr := c.handlePodCreationError(ctx, tr, err)
logger.Errorf("Failed to create task run pod for taskrun %q: %v", tr.Name, newErr)
Expand Down Expand Up @@ -617,9 +620,8 @@ func (c *Reconciler) failTaskRun(ctx context.Context, tr *v1beta1.TaskRun, reaso

// createPod creates a Pod based on the Task's configuration, with pvcName as a volumeMount
// TODO(dibyom): Refactor resource setup/substitution logic to its own function in the resources package
func (c *Reconciler) createPod(ctx context.Context, tr *v1beta1.TaskRun, rtr *resources.ResolvedTaskResources) (*corev1.Pod, error) {
func (c *Reconciler) createPod(ctx context.Context, ts *v1beta1.TaskSpec, tr *v1beta1.TaskRun, rtr *resources.ResolvedTaskResources) (*corev1.Pod, error) {
logger := logging.FromContext(ctx)
ts := rtr.TaskSpec.DeepCopy()
inputResources, err := resourceImplBinding(rtr.Inputs, c.Images)
if err != nil {
logger.Errorf("Failed to initialize input resources: %v", err)
Expand Down Expand Up @@ -650,16 +652,6 @@ func (c *Reconciler) createPod(ctx context.Context, tr *v1beta1.TaskRun, rtr *re
return nil, err
}

var defaults []v1beta1.ParamSpec
if len(ts.Params) > 0 {
defaults = append(defaults, ts.Params...)
}
// Apply parameter substitution from the taskrun.
ts = resources.ApplyParameters(ts, tr, defaults...)

// Apply context substitution from the taskrun
ts = resources.ApplyContexts(ts, rtr.TaskName, tr)

// Apply bound resource substitution from the taskrun.
ts = resources.ApplyResources(ts, inputResources, "inputs")
ts = resources.ApplyResources(ts, outputResources, "outputs")
Expand All @@ -670,13 +662,7 @@ func (c *Reconciler) createPod(ctx context.Context, tr *v1beta1.TaskRun, rtr *re
// Apply workspace resource substitution
ts = resources.ApplyWorkspaces(ctx, ts, ts.Workspaces, tr.Spec.Workspaces, workspaceVolumes)

// Apply task result substitution
ts = resources.ApplyTaskResults(ts)

// Apply step exitCode path substitution
ts = resources.ApplyStepExitCodePath(ts)

if validateErr := ts.Validate(config.WithinSubstituted(ctx)); validateErr != nil {
if validateErr := ts.Validate(ctx); validateErr != nil {
logger.Errorf("Failed to create a pod for taskrun: %s due to task validation error %v", tr.Name, validateErr)
return nil, validateErr
}
Expand Down Expand Up @@ -712,6 +698,27 @@ func (c *Reconciler) createPod(ctx context.Context, tr *v1beta1.TaskRun, rtr *re
return pod, err
}

func updateTaskSpecParamsContextsResults(tr *v1beta1.TaskRun, rtr *resources.ResolvedTaskResources) *v1beta1.TaskSpec {
ts := rtr.TaskSpec.DeepCopy()
var defaults []v1beta1.ParamSpec
if len(ts.Params) > 0 {
defaults = append(defaults, ts.Params...)
}
// Apply parameter substitution from the taskrun.
ts = resources.ApplyParameters(ts, tr, defaults...)

// Apply context substitution from the taskrun
ts = resources.ApplyContexts(ts, rtr.TaskName, tr)

// Apply task result substitution
ts = resources.ApplyTaskResults(ts)

// Apply step exitCode path substitution
ts = resources.ApplyStepExitCodePath(ts)

return ts
}

func isExceededResourceQuotaError(err error) bool {
return err != nil && k8serrors.IsForbidden(err) && strings.Contains(err.Error(), "exceeded quota")
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/reconciler/taskrun/taskrun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2079,8 +2079,8 @@ spec:
Kind: "Task",
TaskSpec: &v1beta1.TaskSpec{Steps: simpleTask.Spec.Steps, Workspaces: simpleTask.Spec.Workspaces},
}

pod, err := r.createPod(testAssets.Ctx, taskRun, rtr)
taskSpec := updateTaskSpecParamsContextsResults(taskRun, rtr)
pod, err := r.createPod(testAssets.Ctx, taskSpec, taskRun, rtr)

if err != nil {
t.Fatalf("create pod threw error %v", err)
Expand Down Expand Up @@ -2183,7 +2183,8 @@ spec:
TaskSpec: &v1beta1.TaskSpec{Steps: simpleTask.Spec.Steps, Workspaces: simpleTask.Spec.Workspaces},
}

_, err := r.createPod(testAssets.Ctx, taskRun, rtr)
taskSpec := updateTaskSpecParamsContextsResults(taskRun, rtr)
_, err := r.createPod(testAssets.Ctx, taskSpec, taskRun, rtr)

if err == nil || err.Error() != expectedError {
t.Errorf("Expected to fail validation for duplicate Workspace mount paths, error was %v", err)
Expand Down
162 changes: 160 additions & 2 deletions test/pipelinerun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,132 @@ var (
cond1Name = "cond-1"
)

func TestPipelineRunStatusSpec(t *testing.T) {
t.Parallel()
type tests struct {
name string
testSetup func(ctx context.Context, t *testing.T, c *clients, namespace string, index int) (map[string]*v1alpha1.PipelineResource, *v1beta1.Pipeline)
expectedTaskRuns []string
expectedNumberOfEvents int
pipelineRunFunc func(*testing.T, int, string, string, map[string]*v1alpha1.PipelineResource) *v1beta1.PipelineRun
}

tds := []tests{{
name: "pipeline status spec updated",
testSetup: func(ctx context.Context, t *testing.T, c *clients, namespace string, _ int) (map[string]*v1alpha1.PipelineResource, *v1beta1.Pipeline) {
t.Helper()
task := parse.MustParseTask(t, fmt.Sprintf(`
metadata:
name: pipeline-status-spec-updated
namespace: %s
spec:
params:
- name: HELLO
default: "Hi!"
steps:
- image: ubuntu
script: |
#!/usr/bin/env bash
echo "$(params.HELLO)"
`, namespace))
if _, err := c.TaskClient.Create(ctx, task, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed to create Task `%s`: %s", task.Name, err)
}

p := getUpdatedStatusSpecPipeline(t, namespace, task.Name)
if _, err := c.PipelineClient.Create(ctx, p, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed to create Pipeline `%s`: %s", p.Name, err)
}

return nil, p
},
expectedTaskRuns: []string{"task1"},
// 1 from PipelineRun; 0 from taskrun since it should not be executed due to condition failing
expectedNumberOfEvents: 2,
pipelineRunFunc: getUpdatedStatusSpecPipelineRun,
}}

for i, td := range tds {
i := i // capture range variable
td := td // capture range variable
t.Run(td.name, func(t *testing.T) {
t.Parallel()
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
c, namespace := setup(ctx, t)

knativetest.CleanupOnInterrupt(func() { tearDown(ctx, t, c, namespace) }, t.Logf)
defer tearDown(ctx, t, c, namespace)

t.Logf("Setting up test resources for %q test in namespace %s", td.name, namespace)
resources, p := td.testSetup(ctx, t, c, namespace, i)

pipelineRun := td.pipelineRunFunc(t, i, namespace, p.Name, resources)
prName := pipelineRun.Name
_, err := c.PipelineRunClient.Create(ctx, pipelineRun, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Failed to create PipelineRun `%s`: %s", prName, err)
}

t.Logf("Waiting for PipelineRun %s in namespace %s to complete", prName, namespace)
if err := WaitForPipelineRunState(ctx, c, prName, timeout, PipelineRunSucceed(prName), "PipelineRunSuccess"); err != nil {
t.Fatalf("Error waiting for PipelineRun %s to finish: %s", prName, err)
}
t.Logf("Making sure the expected TaskRuns %s were created", td.expectedTaskRuns)
actualTaskrunList, err := c.TaskRunClient.List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("tekton.dev/pipelineRun=%s", prName)})
if err != nil {
t.Fatalf("Error listing TaskRuns for PipelineRun %s: %s", prName, err)
}
expectedTaskRunNames := []string{}
for _, runName := range td.expectedTaskRuns {
taskRunName := strings.Join([]string{prName, runName}, "-")
// check the actual task name starting with prName+runName with a random suffix
for _, actualTaskRunItem := range actualTaskrunList.Items {
if strings.HasPrefix(actualTaskRunItem.Name, taskRunName) {
taskRunName = actualTaskRunItem.Name
}
}
expectedTaskRunNames = append(expectedTaskRunNames, taskRunName)
r, err := c.TaskRunClient.Get(ctx, taskRunName, metav1.GetOptions{})
if err != nil {
t.Fatalf("Couldn't get expected TaskRun %s: %s", taskRunName, err)
}
if !r.Status.GetCondition(apis.ConditionSucceeded).IsTrue() {
t.Fatalf("Expected TaskRun %s to have succeeded but Status is %v", taskRunName, r.Status)
}
}

matchKinds := map[string][]string{"PipelineRun": {prName}, "TaskRun": expectedTaskRunNames}

events, err := collectMatchingEvents(ctx, c.KubeClient, namespace, matchKinds, "Succeeded")
if err != nil {
t.Fatalf("Failed to collect matching events: %q", err)
}
if len(events) != td.expectedNumberOfEvents {
collectedEvents := ""
for i, event := range events {
collectedEvents += fmt.Sprintf("%#v", event)
if i < (len(events) - 1) {
collectedEvents += ", "
}
}
t.Fatalf("Expected %d number of successful events from pipelinerun and taskrun but got %d; list of receieved events : %#v", td.expectedNumberOfEvents, len(events), collectedEvents)
}
t.Log("Checking if parameter replacements have been updated in the spec.")
cl, _ := c.PipelineRunClient.Get(ctx, prName, metav1.GetOptions{})
if cl.Status.PipelineSpec.Tasks[0].Params[0].Value.StringVal != "Hello World!" {
t.Fatalf(`Expected replaced parameter value %s but found %s`, "Hello World!", cl.Status.PipelineSpec.Tasks[0].Params[0].Value.StringVal)
}
tl, _ := c.TaskRunClient.Get(ctx, "pipeline-task-update-task1", metav1.GetOptions{})
if !strings.Contains(tl.Status.TaskSpec.Steps[0].Script, "Hello World!") {
t.Fatalf(`Expected replaced parameter value : %s in Script: %s But not found`, "Hello World!", tl.Status.TaskSpec.Steps[0].Script)
}
t.Logf("Successfully finished test %q", td.name)
})
}
}

func TestPipelineRun(t *testing.T) {
t.Parallel()
type tests struct {
Expand Down Expand Up @@ -122,7 +248,6 @@ spec:
if _, err := c.TaskClient.Create(ctx, task, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed to create Task `%s`: %s", task.Name, err)
}

p := getHelloWorldPipelineWithSingularTask(t, namespace, task.Name)
if _, err := c.PipelineClient.Create(ctx, p, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed to create Pipeline `%s`: %s", p.Name, err)
Expand Down Expand Up @@ -244,7 +369,6 @@ spec:
if err := WaitForPipelineRunState(ctx, c, prName, timeout, PipelineRunSucceed(prName), "PipelineRunSuccess"); err != nil {
t.Fatalf("Error waiting for PipelineRun %s to finish: %s", prName, err)
}

t.Logf("Making sure the expected TaskRuns %s were created", td.expectedTaskRuns)
actualTaskrunList, err := c.TaskRunClient.List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("tekton.dev/pipelineRun=%s", prName)})
if err != nil {
Expand Down Expand Up @@ -314,6 +438,25 @@ spec:
}
}

func getUpdatedStatusSpecPipeline(t *testing.T, namespace string, taskName string) *v1beta1.Pipeline {
return parse.MustParsePipeline(t, fmt.Sprintf(`
metadata:
name: pipeline-status-spec-updated
namespace: %s
spec:
params:
- name: HELLO
type: string
tasks:
- name: %s
params:
- name: HELLO
value: "$(params.HELLO)"
taskRef:
name: %s
`, namespace, task1Name, taskName))
}

func getHelloWorldPipelineWithSingularTask(t *testing.T, namespace string, taskName string) *v1beta1.Pipeline {
return parse.MustParsePipeline(t, fmt.Sprintf(`
metadata:
Expand Down Expand Up @@ -715,6 +858,21 @@ func getPipelineRunSecret(suffix int, namespace string) *corev1.Secret {
}
}

func getUpdatedStatusSpecPipelineRun(t *testing.T, _ int, namespace string, pipelineName string, _ map[string]*v1alpha1.PipelineResource) *v1beta1.PipelineRun {
return parse.MustParsePipelineRun(t, fmt.Sprintf(`
metadata:
name: "pipeline-task-update"
namespace: %s
spec:
params:
- name: HELLO
value: "Hello World!"
pipelineRef:
name: %s
`, namespace, pipelineName))
// `, helpers.ObjectNameForTest(t), namespace, pipelineName))
}

func getHelloWorldPipelineRun(t *testing.T, suffix int, namespace string, pipelineName string, _ map[string]*v1alpha1.PipelineResource) *v1beta1.PipelineRun {
return parse.MustParsePipelineRun(t, fmt.Sprintf(`
metadata:
Expand Down