Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adding metadata to taskSpec in PipelineTask #2826

Merged
merged 1 commit into from
Aug 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions docs/pipelineruns.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,28 @@ spec:
In the [`taskSpec` in `pipelineSpec` example](../examples/v1beta1/pipelineruns/pipelinerun-with-pipelinespec-and-taskspec.yaml)
it's `Tasks` all the way down!

You can also specify labels and annotations with `taskSpec` which are propagated to each `taskRun` and then to the
respective pods. These labels can be used to identify and filter pods for further actions (such as collecting pod metrics,
and cleaning up completed pod with certain labels, etc) even being part of one single Pipeline.

```yaml
spec:
pipelineSpec:
tasks:
- name: task1
metadata:
labels:
pipeline-sdk-type: kfp
taskSpec:
...
- name: task2
metadata:
labels:
pipeline-sdk-type: tfx
taskSpec:
...
```

## Specifying `Resources`

A `Pipeline` requires [`PipelineResources`](resources.md) to provide inputs and store outputs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ spec:
tasks:
- name: echo-good-morning
taskSpec:
metadata:
labels:
app: "example"
steps:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated an example to show the usage

- name: echo
image: ubuntu
Expand Down
15 changes: 14 additions & 1 deletion internal/builder/v1beta1/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,20 @@ func PipelineRunResult(name, value string) PipelineRunStatusOp {
// PipelineTaskSpec sets the TaskSpec on a PipelineTask.
func PipelineTaskSpec(spec *v1beta1.TaskSpec) PipelineTaskOp {
return func(pt *v1beta1.PipelineTask) {
pt.TaskSpec = spec
if pt.TaskSpec == nil {
pt.TaskSpec = &v1beta1.EmbeddedTask{}
}
pt.TaskSpec.TaskSpec = spec
}
}

// TaskSpecMetadata sets the Metadata on a TaskSpec within PipelineTask.
func TaskSpecMetadata(metadata v1beta1.PipelineTaskMetadata) PipelineTaskOp {
return func(pt *v1beta1.PipelineTask) {
if pt.TaskSpec == nil {
pt.TaskSpec = &v1beta1.EmbeddedTask{}
}
pt.TaskSpec.Metadata = metadata
}
}

Expand Down
40 changes: 28 additions & 12 deletions internal/builder/v1beta1/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,14 @@ func TestPipeline(t *testing.T) {
tb.RunAfter("foo"),
tb.PipelineTaskTimeout(5*time.Second),
),
tb.PipelineTask("foo", "", tb.PipelineTaskSpec(&v1beta1.TaskSpec{
Steps: []v1beta1.Step{{Container: corev1.Container{
Name: "step",
Image: "myimage",
}}},
})),
tb.PipelineTask("foo", "", tb.PipelineTaskSpec(getTaskSpec())),
tb.PipelineTask("task-with-taskSpec", "",
tb.TaskSpecMetadata(v1beta1.PipelineTaskMetadata{
Labels: map[string]string{"label": "labelvalue"},
Annotations: map[string]string{"annotation": "annotationvalue"}},
),
tb.PipelineTaskSpec(getTaskSpec()),
),
tb.PipelineWorkspaceDeclaration("workspace1"),
),
tb.PipelineCreationTimestamp(creationTime),
Expand Down Expand Up @@ -137,13 +139,19 @@ func TestPipeline(t *testing.T) {
Timeout: &metav1.Duration{Duration: 5 * time.Second},
}, {
Name: "foo",
TaskSpec: &v1beta1.TaskSpec{
Steps: []v1beta1.Step{{Container: corev1.Container{
Name: "step",
Image: "myimage",
}}},
TaskSpec: &v1beta1.EmbeddedTask{
TaskSpec: getTaskSpec()},
}, {
Name: "task-with-taskSpec",
TaskSpec: &v1beta1.EmbeddedTask{
Metadata: v1beta1.PipelineTaskMetadata{
Labels: map[string]string{"label": "labelvalue"},
Annotations: map[string]string{"annotation": "annotationvalue"},
},
TaskSpec: getTaskSpec(),
},
}},
},
},
Workspaces: []v1beta1.PipelineWorkspaceDeclaration{{
Name: "workspace1",
}},
Expand Down Expand Up @@ -426,5 +434,13 @@ func TestPipelineRunWithFinalTask(t *testing.T) {
if diff := cmp.Diff(expectedPipelineRun, pipelineRun); diff != "" {
t.Fatalf("PipelineRun diff -want, +got: %s", diff)
}
}

func getTaskSpec() *v1beta1.TaskSpec {
return &v1beta1.TaskSpec{
Steps: []v1beta1.Step{{Container: corev1.Container{
Name: "step",
Image: "myimage",
}}},
}
}
6 changes: 3 additions & 3 deletions pkg/apis/pipeline/v1alpha1/pipeline_conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ func (source *PipelineTask) ConvertTo(ctx context.Context, sink *v1beta1.Pipelin
sink.Name = source.Name
sink.TaskRef = source.TaskRef
if source.TaskSpec != nil {
sink.TaskSpec = &v1beta1.TaskSpec{}
if err := source.TaskSpec.ConvertTo(ctx, sink.TaskSpec); err != nil {
sink.TaskSpec = &v1beta1.EmbeddedTask{TaskSpec: &v1beta1.TaskSpec{}}
if err := source.TaskSpec.ConvertTo(ctx, sink.TaskSpec.TaskSpec); err != nil {
return err
}
}
Expand Down Expand Up @@ -112,7 +112,7 @@ func (sink *PipelineTask) ConvertFrom(ctx context.Context, source v1beta1.Pipeli
sink.TaskRef = source.TaskRef
if source.TaskSpec != nil {
sink.TaskSpec = &TaskSpec{}
if err := sink.TaskSpec.ConvertFrom(ctx, source.TaskSpec); err != nil {
if err := sink.TaskSpec.ConvertFrom(ctx, source.TaskSpec.TaskSpec); err != nil {
return err
}
}
Expand Down
44 changes: 26 additions & 18 deletions pkg/apis/pipeline/v1beta1/pipeline_defaults_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,41 +85,49 @@ func TestPipelineSpec_SetDefaults(t *testing.T) {
desc: "pipeline task with taskSpec - default param type must be " + string(v1beta1.ParamTypeString),
ps: &v1beta1.PipelineSpec{
Tasks: []v1beta1.PipelineTask{{
Name: "foo", TaskSpec: &v1beta1.TaskSpec{
Params: []v1beta1.ParamSpec{{
Name: "string-param",
}},
Name: "foo", TaskSpec: &v1beta1.EmbeddedTask{
TaskSpec: &v1beta1.TaskSpec{
Params: []v1beta1.ParamSpec{{
Name: "string-param",
}},
},
},
}},
},
want: &v1beta1.PipelineSpec{
Tasks: []v1beta1.PipelineTask{{
Name: "foo", TaskSpec: &v1beta1.TaskSpec{
Params: []v1beta1.ParamSpec{{
Name: "string-param",
Type: v1beta1.ParamTypeString,
}},
Name: "foo", TaskSpec: &v1beta1.EmbeddedTask{
TaskSpec: &v1beta1.TaskSpec{
Params: []v1beta1.ParamSpec{{
Name: "string-param",
Type: v1beta1.ParamTypeString,
}},
},
},
}},
},
}, {
desc: "final pipeline task with taskSpec - default param type must be " + string(v1beta1.ParamTypeString),
ps: &v1beta1.PipelineSpec{
Finally: []v1beta1.PipelineTask{{
Name: "foo", TaskSpec: &v1beta1.TaskSpec{
Params: []v1beta1.ParamSpec{{
Name: "string-param",
}},
Name: "foo", TaskSpec: &v1beta1.EmbeddedTask{
TaskSpec: &v1beta1.TaskSpec{
Params: []v1beta1.ParamSpec{{
Name: "string-param",
}},
},
},
}},
},
want: &v1beta1.PipelineSpec{
Finally: []v1beta1.PipelineTask{{
Name: "foo", TaskSpec: &v1beta1.TaskSpec{
Params: []v1beta1.ParamSpec{{
Name: "string-param",
Type: v1beta1.ParamTypeString,
}},
Name: "foo", TaskSpec: &v1beta1.EmbeddedTask{
TaskSpec: &v1beta1.TaskSpec{
Params: []v1beta1.ParamSpec{{
Name: "string-param",
Type: v1beta1.ParamTypeString,
}},
},
},
}},
},
Expand Down
23 changes: 22 additions & 1 deletion pkg/apis/pipeline/v1beta1/pipeline_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,23 @@ type PipelineResult struct {
Value string `json:"value"`
}

type PipelineTaskMetadata struct {
// +optional
Labels map[string]string `json:"labels,omitempty"`

// +optional
Annotations map[string]string `json:"annotations,omitempty"`
}
pritidesai marked this conversation as resolved.
Show resolved Hide resolved

type EmbeddedTask struct {
// +optional
Metadata PipelineTaskMetadata `json:"metadata,omitempty"`

// TaskSpec is a specification of a task
// +optional
*TaskSpec `json:",inline,omitempty"`
}

// PipelineTask defines a task in a Pipeline, passing inputs from both
// Params and from the output of previous tasks.
type PipelineTask struct {
Expand All @@ -104,7 +121,7 @@ type PipelineTask struct {

// TaskSpec is a specification of a task
// +optional
TaskSpec *TaskSpec `json:"taskSpec,omitempty"`
TaskSpec *EmbeddedTask `json:"taskSpec,inline,omitempty"`
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Introducing one more change which might break CLI (see issue #2748)


// Conditions is a list of conditions that need to be true for the task to run
// +optional
Expand Down Expand Up @@ -139,6 +156,10 @@ type PipelineTask struct {
Timeout *metav1.Duration `json:"timeout,omitempty"`
}

func (pt *PipelineTask) TaskSpecMetadata() PipelineTaskMetadata {
return pt.TaskSpec.Metadata
}

func (pt PipelineTask) HashKey() string {
return pt.Name
}
Expand Down
60 changes: 24 additions & 36 deletions pkg/apis/pipeline/v1beta1/pipeline_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,13 +152,9 @@ func TestPipelineSpec_Validate_Failure(t *testing.T) {
Name: "valid-pipeline-task",
TaskRef: &TaskRef{Name: "foo-task"},
}, {
Name: "invalid-pipeline-task",
TaskRef: &TaskRef{Name: "foo-task"},
TaskSpec: &TaskSpec{
Steps: []Step{{
Container: corev1.Container{Name: "foo", Image: "bar"},
}},
},
Name: "invalid-pipeline-task",
TaskRef: &TaskRef{Name: "foo-task"},
TaskSpec: &EmbeddedTask{TaskSpec: getTaskSpec()},
}},
},
}, {
Expand Down Expand Up @@ -250,12 +246,8 @@ func TestValidatePipelineTasks_Success(t *testing.T) {
}, {
name: "pipeline task with valid taskspec",
tasks: []PipelineTask{{
Name: "foo",
TaskSpec: &TaskSpec{
Steps: []Step{{
Container: corev1.Container{Name: "foo", Image: "bar"},
}},
},
Name: "foo",
TaskSpec: &EmbeddedTask{TaskSpec: getTaskSpec()},
}},
}}
for _, tt := range tests {
Expand All @@ -280,19 +272,15 @@ func TestValidatePipelineTasks_Failure(t *testing.T) {
}, {
name: "pipeline task with both taskref and taskspec",
tasks: []PipelineTask{{
Name: "foo",
TaskRef: &TaskRef{Name: "foo-task"},
TaskSpec: &TaskSpec{
Steps: []Step{{
Container: corev1.Container{Name: "foo", Image: "bar"},
}},
},
Name: "foo",
TaskRef: &TaskRef{Name: "foo-task"},
TaskSpec: &EmbeddedTask{TaskSpec: getTaskSpec()},
}},
}, {
name: "pipeline task with invalid taskspec",
tasks: []PipelineTask{{
Name: "foo",
TaskSpec: &TaskSpec{},
TaskSpec: &EmbeddedTask{TaskSpec: &TaskSpec{}},
}},
}, {
name: "pipeline tasks invalid (duplicate tasks)",
Expand Down Expand Up @@ -660,14 +648,14 @@ func TestValidateGraph_Failure(t *testing.T) {
func TestValidateParamResults_Success(t *testing.T) {
desc := "valid pipeline task referencing task result along with parameter variable"
tasks := []PipelineTask{{
TaskSpec: &TaskSpec{
TaskSpec: &EmbeddedTask{TaskSpec: &TaskSpec{
Results: []TaskResult{{
Name: "output",
}},
Steps: []Step{{
Container: corev1.Container{Name: "foo", Image: "bar"},
}},
},
}},
Name: "a-task",
}, {
Name: "foo",
Expand Down Expand Up @@ -1041,12 +1029,8 @@ func TestValidatePipelineWithFinalTasks_Success(t *testing.T) {
Name: "final-task-1",
TaskRef: &TaskRef{Name: "final-task"},
}, {
Name: "final-task-2",
TaskSpec: &TaskSpec{
Steps: []Step{{
Container: corev1.Container{Name: "foo", Image: "bar"},
}},
},
Name: "final-task-2",
TaskSpec: &EmbeddedTask{TaskSpec: getTaskSpec()},
}},
},
},
Expand Down Expand Up @@ -1200,13 +1184,9 @@ func TestValidatePipelineWithFinalTasks_Failure(t *testing.T) {
TaskRef: &TaskRef{Name: "non-final-task"},
}},
Finally: []PipelineTask{{
Name: "final-task",
TaskRef: &TaskRef{Name: "non-final-task"},
TaskSpec: &TaskSpec{
Steps: []Step{{
Container: corev1.Container{Name: "foo", Image: "bar"},
}},
},
Name: "final-task",
TaskRef: &TaskRef{Name: "non-final-task"},
TaskSpec: &EmbeddedTask{TaskSpec: getTaskSpec()},
}},
},
},
Expand Down Expand Up @@ -1491,3 +1471,11 @@ func TestContextInvalid(t *testing.T) {
})
}
}

func getTaskSpec() *TaskSpec {
return &TaskSpec{
Steps: []Step{{
Container: corev1.Container{Name: "foo", Image: "bar"},
}},
}
}
Loading