Skip to content

Commit

Permalink
Start emitting CloudEvents for TaskRuns
Browse files Browse the repository at this point in the history
Add a new method 'SendCloudEventWithRetries' to the cloud events
controller. It that allows emitting cloud events asynchronously
(in a go routine), taking benefit of the retries capabilities of
the cloudevents go sdk.

Rework the fake client to allow for unit testing of
'SendCloudEventWithRetries'. The new implementation is similar
to that of the fake recorder from client-go, which allows to
unit test k8s events and cloud events in a similar fashion.

Add a new config option default-cloud-events-sink in the
defaults config map. This options allows setting a default sink
for cloud events. If the default sink is setup, cloud events are
sent, else they're disabled.

Invoke 'SendCloudEventWithRetries' from the taskrun controller
in the same places where we emit k8s events, except not for
errors.
  • Loading branch information
afrittoli committed Jun 25, 2020
1 parent 44f22a0 commit 83beab3
Show file tree
Hide file tree
Showing 13 changed files with 442 additions and 28 deletions.
9 changes: 8 additions & 1 deletion config/config-defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,11 @@ data:
# default-pod-template contains the default pod template to use
# TaskRun and PipelineRun, if none is specified. If a pod template
# is specified, the default pod template is ignored.
# default-pod-template:
# default-pod-template:
# default-cloud-events-sink contains the default CloudEvents sink to be
# used for TaskRun and PipelineRun, when no sink is specified.
# Note that right now it is still not possible to set a PipelineRun or
# TaskRun specific sink, so the default is the only option available.
# If no sink is specified, no CloudEvent is generated
# default-cloud-events-sink:
27 changes: 25 additions & 2 deletions docs/events.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ weight: 2

Tekton runtime resources, specifically `TaskRuns` and `PipelineRuns`,
emit events when they are executed, so that users can monitor their lifecycle
and react to it. Tekton emits [kubernetes events](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.18/#event-v1-core), that can be retrieve from the resource via
`kubectl describe [resource]`.
and react to it.

Tekton emits [kubernetes events](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.18/#event-v1-core), that can be retrieve from the resource via `kubectl describe [resource]`.
[Optionally](#CloudEvents) Tekton can emit [CloudEvents](https://github.com/cloudevents/spec) too.

No events are emitted for `Conditions` today (https://github.com/tektoncd/pipeline/issues/2461).

Expand Down Expand Up @@ -39,3 +41,24 @@ No events are emitted for `Conditions` today (https://github.com/tektoncd/pipeli
- `Failed`: this is triggered if the `PipelineRun` is completed, but not
successfully. Causes of failure may be: one the `Tasks` failed or the
`PipelineRun` was cancelled.

# CloudEvents

WHen a sink is [configured](../install.md#configuring-cloudevents-notifications), the following events
will be generated by appropriate controller when a lifecycle event happens for `TaskRun` or
`PipelineRun`.

The complete table of events:

Reasource |Event |Event Type
:-------------|:-------:|:----------------------------------------------------------
TaskRun | Started | dev.tekton.event.taskrun.started.v1
TaskRun | Running | dev.tekton.event.taskrun.runnning.v1
TaskRun | Condition Change while Running | dev.tekton.event.taskrun.unknown.v1
TaskRun | Succeed | dev.tekton.event.taskrun.successful.v1
TaskRun | Failed | dev.tekton.event.taskrun.failed.v1
PipelineRun | Started | dev.tekton.event.pipelinerun.started.v1
PipelineRun | Running | dev.tekton.event.pipelinerun.runnning.v1
PipelineRun | Condition Change while Running | dev.tekton.event.pipelinerun.unknown.v1
PipelineRun | Succeed | dev.tekton.event.pipelinerun.successful.v1
PipelineRun | Failed | dev.tekton.event.pipelinerun.failed.v1
19 changes: 19 additions & 0 deletions docs/install.md
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,25 @@ data:
bucket.service.account.field.name: GOOGLE_APPLICATION_CREDENTIALS
```
## Configuring CloudEvents notifications
When configured so, Tekton can generate `CloudEVents` for `TaskRun` and `PipelineRun` lifecycle
events. The only configuration parameter is the URL of the sink. When not set, no notification is
generared.

```
apiVersion: v1
kind: ConfigMap
metadata:
name: config-defaults
namespace: tekton-pipelines
labels:
app.kubernetes.io/instance: default
app.kubernetes.io/part-of: tekton-pipelines
data:
default-cloud-events-sink: https://my-sink-url
```
## Customizing basic execution parameters
You can specify your own values that replace the default service account (`ServiceAccount`), timeout (`Timeout`), and Pod template (`PodTemplate`) values used by Tekton Pipelines in `TaskRun` and `PipelineRun` definitions. To do so, modify the ConfigMap `config-defaults` with your desired values.
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ github.com/clarketm/json v1.13.4/go.mod h1:ynr2LRfb0fQU34l07csRNBTcivjySLLiY1YzQ
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cloudevents/sdk-go v0.0.0-20190509003705-56931988abe3 h1:DNM19kh6j6qGBx/FI7OmHKBL2vCW1eN28ESYK1+O5DY=
github.com/cloudevents/sdk-go v0.0.0-20190509003705-56931988abe3/go.mod h1:j1nZWMLGg3om8SswStBoY6/SHvcLM19MuZqwDtMtmzs=
github.com/cloudevents/sdk-go v1.2.0 h1:2AxI14EJUw1PclJ5gZJtzbxnHIfNMdi76Qq3P3G1BRU=
github.com/cloudevents/sdk-go/v2 v2.0.0 h1:AUdGJwaSUnA+VvepKqgjy6XDkPcf0hf/3L7icEs1ibs=
github.com/cloudevents/sdk-go/v2 v2.0.0/go.mod h1:3CTrpB4+u7Iaj6fd7E2Xvm5IxMdRoaAhqaRVnOr2rCU=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
Expand Down
11 changes: 10 additions & 1 deletion pkg/apis/config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ const (
defaultManagedByLabelValueKey = "default-managed-by-label-value"
DefaultManagedByLabelValue = "tekton-pipelines"
defaultPodTemplateKey = "default-pod-template"
defaultCloudEventsSinkKey = "default-cloud-events-sink"
DefaultCloudEventSinkValue = ""
)

// Defaults holds the default configurations
Expand All @@ -44,6 +46,7 @@ type Defaults struct {
DefaultServiceAccount string
DefaultManagedByLabelValue string
DefaultPodTemplate *pod.Template
DefaultCloudEventsSink string
}

// GetBucketConfigName returns the name of the configmap containing all
Expand All @@ -68,14 +71,16 @@ func (cfg *Defaults) Equals(other *Defaults) bool {
return other.DefaultTimeoutMinutes == cfg.DefaultTimeoutMinutes &&
other.DefaultServiceAccount == cfg.DefaultServiceAccount &&
other.DefaultManagedByLabelValue == cfg.DefaultManagedByLabelValue &&
other.DefaultPodTemplate.Equals(cfg.DefaultPodTemplate)
other.DefaultPodTemplate.Equals(cfg.DefaultPodTemplate) &&
other.DefaultCloudEventsSink == cfg.DefaultCloudEventsSink
}

// NewDefaultsFromMap returns a Config given a map corresponding to a ConfigMap
func NewDefaultsFromMap(cfgMap map[string]string) (*Defaults, error) {
tc := Defaults{
DefaultTimeoutMinutes: DefaultTimeoutMinutes,
DefaultManagedByLabelValue: DefaultManagedByLabelValue,
DefaultCloudEventsSink: DefaultCloudEventSinkValue,
}

if defaultTimeoutMin, ok := cfgMap[defaultTimeoutMinutesKey]; ok {
Expand All @@ -102,6 +107,10 @@ func NewDefaultsFromMap(cfgMap map[string]string) (*Defaults, error) {
tc.DefaultPodTemplate = &podTemplate
}

if defaultCloudEventsSink, ok := cfgMap[defaultCloudEventsSinkKey]; ok {
tc.DefaultCloudEventsSink = defaultCloudEventsSink
}

return &tc, nil
}

Expand Down
32 changes: 32 additions & 0 deletions pkg/reconciler/events/cloudevent/cloud_event_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@ package cloudevent

import (
"context"
"errors"
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/hashicorp/go-multierror"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
resource "github.com/tektoncd/pipeline/pkg/apis/resource/v1alpha1"
"github.com/tektoncd/pipeline/pkg/apis/resource/v1alpha1/cloudevent"
"github.com/tektoncd/pipeline/pkg/reconciler/events"
"go.uber.org/zap"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
controller "knative.dev/pkg/controller"
"knative.dev/pkg/logging"
)

// InitializeCloudEvents initializes the CloudEvents part of the
Expand Down Expand Up @@ -108,3 +112,31 @@ func SendCloudEvents(tr *v1beta1.TaskRun, ceclient CEClient, logger *zap.Sugared
}
return merr.ErrorOrNil()
}

// SendCloudEventWithRetries sends a cloud event for the specified resource.
// It does not block and it perform retries with backoff using the cloudevents
// sdk-go capabilities.
func SendCloudEventWithRetries(ctx context.Context, object objectWithCondition) error {
logger := logging.FromContext(ctx)
ceClient := Get(ctx)
if ceClient == nil {
return errors.New("No cloud events client found in the context")
}
event, err := EventForObjectWithCondition(object)
if err != nil {
return err
}

go func() {
if result := ceClient.Send(cloudevents.ContextWithRetriesExponentialBackoff(ctx, 10*time.Millisecond, 10), *event); !cloudevents.IsACK(result) {
logger.Warnf("Failed to send cloudevent: %s", result.Error())
recorder := controller.GetEventRecorder(ctx)
if recorder == nil {
logger.Warnf("No recorder in context, cannot emit error event")
}
events.EmitError(recorder, result, object)
}
}()

return nil
}
179 changes: 179 additions & 0 deletions pkg/reconciler/events/cloudevent/cloud_event_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,26 @@ limitations under the License.
package cloudevent

import (
"context"
"fmt"
"strings"
"testing"
"time"

"github.com/google/go-cmp/cmp"
tb "github.com/tektoncd/pipeline/internal/builder/v1beta1"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
resourcev1alpha1 "github.com/tektoncd/pipeline/pkg/apis/resource/v1alpha1"
"github.com/tektoncd/pipeline/pkg/logging"
"github.com/tektoncd/pipeline/test/diff"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/record"
"knative.dev/pkg/apis"
duckv1beta1 "knative.dev/pkg/apis/duck/v1beta1"
"knative.dev/pkg/controller"
rtesting "knative.dev/pkg/reconciler/testing"
)

func TestCloudEventDeliveryFromTargets(t *testing.T) {
Expand Down Expand Up @@ -283,3 +293,172 @@ func TestInitializeCloudEvents(t *testing.T) {
})
}
}

func TestSendCloudEventWithRetries(t *testing.T) {

objectStatus := duckv1beta1.Status{
Conditions: []apis.Condition{{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionTrue,
}},
}

tests := []struct {
name string
clientBehaviour FakeClientBehaviour
object objectWithCondition
wantCEvent string
wantEvent string
}{{
name: "test-send-cloud-event-taskrun",
clientBehaviour: FakeClientBehaviour{
SendSuccessfully: true,
},
object: &v1beta1.TaskRun{
ObjectMeta: metav1.ObjectMeta{
SelfLink: "/taskruns/test1",
},
Status: v1beta1.TaskRunStatus{Status: objectStatus},
},
wantCEvent: "Validation: valid",
wantEvent: "",
}, {
name: "test-send-cloud-event-pipelinerun",
clientBehaviour: FakeClientBehaviour{
SendSuccessfully: true,
},
object: &v1beta1.PipelineRun{
ObjectMeta: metav1.ObjectMeta{
SelfLink: "/pipelineruns/test1",
},
Status: v1beta1.PipelineRunStatus{Status: objectStatus},
},
wantCEvent: "Validation: valid",
wantEvent: "",
}, {
name: "test-send-cloud-event-failed",
clientBehaviour: FakeClientBehaviour{
SendSuccessfully: false,
},
object: &v1beta1.PipelineRun{
Status: v1beta1.PipelineRunStatus{Status: objectStatus},
},
wantCEvent: "",
wantEvent: "Warning Error ",
}}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ctx := setupFakeContext(t, tc.clientBehaviour, true)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
err := SendCloudEventWithRetries(ctx, tc.object)
if err != nil {
t.Fatalf("Unexpected error sending cloud events: %v", err)
}
ceClient := Get(ctx).(FakeClient)
err = checkCloudEvents(t, &ceClient, tc.name, tc.wantCEvent)
if err != nil {
t.Fatalf(err.Error())
}
recorder := controller.GetEventRecorder(ctx).(*record.FakeRecorder)
err = checkEvents(t, recorder, tc.name, tc.wantEvent)
if err != nil {
t.Fatalf(err.Error())
}
})
}
}

func TestSendCloudEventWithRetriesInvalid(t *testing.T) {

tests := []struct {
name string
object objectWithCondition
wantCEvent string
wantEvent string
}{{
name: "test-send-cloud-event-invalid-taskrun",
object: &v1beta1.TaskRun{
Status: v1beta1.TaskRunStatus{},
},
wantCEvent: "Validation: valid",
wantEvent: "",
}, {
name: "test-send-cloud-event-pipelinerun",
object: &v1beta1.PipelineRun{
Status: v1beta1.PipelineRunStatus{},
},
wantCEvent: "Validation: valid",
wantEvent: "",
}}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ctx := setupFakeContext(t, FakeClientBehaviour{
SendSuccessfully: true,
}, true)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
err := SendCloudEventWithRetries(ctx, tc.object)
if err == nil {
t.Fatalf("Expected an error sending cloud events for invalid object, got none")
}
})
}
}

func TestSendCloudEventWithRetriesNoClient(t *testing.T) {

ctx := setupFakeContext(t, FakeClientBehaviour{}, false)
err := SendCloudEventWithRetries(ctx, &v1beta1.TaskRun{Status: v1beta1.TaskRunStatus{}})
if err == nil {
t.Fatalf("Expected an error sending cloud events with no client in the context, got none")
}
if d := cmp.Diff("No cloud events client found in the context", err.Error()); d != "" {
t.Fatalf("Unexpected error message %s", diff.PrintWantGot(d))
}
}

func setupFakeContext(t *testing.T, behaviour FakeClientBehaviour, withClient bool) context.Context {
var ctx context.Context
ctx, _ = rtesting.SetupFakeContext(t)
if withClient {
ctx = WithClient(ctx, &behaviour)
}
return ctx
}

func testLogger(t *testing.T) *zap.SugaredLogger {
logger, err := zap.NewDevelopment(zap.AddCaller())
if err != nil {
t.Fatalf("failed to create logger: %s", err)
}
return logger.Sugar().Named(t.Name())
}

func eventFromChannel(c chan string, testName string, wantEvent string) error {
timer := time.NewTimer(1 * time.Second)
select {
case event := <-c:
if wantEvent == "" {
return fmt.Errorf("received event \"%s\" for %s but none expected", event, testName)
}
if !(strings.HasPrefix(event, wantEvent)) {
return fmt.Errorf("expected event \"%s\" but got \"%s\" instead for %s", wantEvent, event, testName)
}
case <-timer.C:
if wantEvent != "" {
return fmt.Errorf("received no events for %s but %s expected", testName, wantEvent)
}
}
return nil
}

func checkEvents(t *testing.T, fr *record.FakeRecorder, testName string, wantEvent string) error {
t.Helper()
return eventFromChannel(fr.Events, testName, wantEvent)
}

func checkCloudEvents(t *testing.T, fce *FakeClient, testName string, wantEvent string) error {
t.Helper()
return eventFromChannel(fce.Events, testName, wantEvent)
}
3 changes: 3 additions & 0 deletions pkg/reconciler/events/cloudevent/cloudevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ func EventForPipelineRun(pipelineRun *v1beta1.PipelineRun) (*cloudevents.Event,

func getEventType(runObject objectWithCondition) (*TektonEventType, error) {
c := runObject.GetStatusCondition().GetCondition(apis.ConditionSucceeded)
if c == nil {
return nil, fmt.Errorf("no condition for in %T", runObject)
}
var eventType TektonEventType
switch {
case c.IsUnknown():
Expand Down
Loading

0 comments on commit 83beab3

Please sign in to comment.