From 127f563ca6b4260915d5f1485cc17284a4239d71 Mon Sep 17 00:00:00 2001 From: Andrea Frittoli Date: Thu, 18 Jun 2020 22:29:29 +0100 Subject: [PATCH] Start emitting CloudEvents for TaskRuns Add a new method 'SendCloudEventWithRetries' to the cloud events controller. It that allows emitting cloud events asynchronously (in a go routine), taking benefit of the retries capabilities of the cloudevents go sdk. Rework the fake client to allow for unit testing of 'SendCloudEventWithRetries'. The new implementation is similar to that of the fake recorder from client-go, which allows to unit test k8s events and cloud events in a similar fashion. Add a new config option default-cloud-events-sink in the defaults config map. This options allows setting a default sink for cloud events. If the default sink is setup, cloud events are sent, else they're disabled. Invoke 'SendCloudEventWithRetries' from the taskrun controller in the same places where we emit k8s events, except not for errors. --- config/config-defaults.yaml | 9 +- docs/events.md | 27 ++- docs/install.md | 19 ++ go.sum | 1 + pkg/apis/config/default.go | 11 +- .../cloudevent/cloud_event_controller.go | 32 ++++ .../cloudevent/cloud_event_controller_test.go | 165 ++++++++++++++++++ .../events/cloudevent/cloudevent.go | 3 + .../events/cloudevent/cloudeventclient.go | 8 + .../cloudevent/cloudeventsfakeclient.go | 23 +-- pkg/reconciler/events/cloudevent/interface.go | 4 + pkg/reconciler/taskrun/taskrun.go | 28 ++- 12 files changed, 314 insertions(+), 16 deletions(-) diff --git a/config/config-defaults.yaml b/config/config-defaults.yaml index 1c23c72f20a..052f39c87a9 100644 --- a/config/config-defaults.yaml +++ b/config/config-defaults.yaml @@ -54,4 +54,11 @@ data: # default-pod-template contains the default pod template to use # TaskRun and PipelineRun, if none is specified. If a pod template # is specified, the default pod template is ignored. - # default-pod-template: \ No newline at end of file + # default-pod-template: + + # default-cloud-events-sink contains the default CloudEvents sink to be + # used for TaskRun and PipelineRun, when no sink is specified. + # Note that right now it is still not possible to set a PipelineRun or + # TaskRun specific sink, so the default is the only option available. + # If no sink is specified, no CloudEvent is generated + # default-cloud-events-sink: \ No newline at end of file diff --git a/docs/events.md b/docs/events.md index a0d9302d272..b270ff7479e 100644 --- a/docs/events.md +++ b/docs/events.md @@ -8,8 +8,10 @@ weight: 2 Tekton runtime resources, specifically `TaskRuns` and `PipelineRuns`, emit events when they are executed, so that users can monitor their lifecycle -and react to it. Tekton emits [kubernetes events](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.18/#event-v1-core), that can be retrieve from the resource via -`kubectl describe [resource]`. +and react to it. + +Tekton emits [kubernetes events](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.18/#event-v1-core), that can be retrieve from the resource via `kubectl describe [resource]`. +[Optionally](#CloudEvents) Tekton can emit [CloudEvents](https://github.com/cloudevents/spec) too. No events are emitted for `Conditions` today (https://github.com/tektoncd/pipeline/issues/2461). @@ -39,3 +41,24 @@ No events are emitted for `Conditions` today (https://github.com/tektoncd/pipeli - `Failed`: this is triggered if the `PipelineRun` is completed, but not successfully. Causes of failure may be: one the `Tasks` failed or the `PipelineRun` was cancelled. + +# CloudEvents + +WHen a sink is [configured](../install.md#configuring-cloudevents-notifications), the following events +will be generated by appropriate controller when a lifecycle event happens for `TaskRun` or +`PipelineRun`. + +The complete table of events: + +Reasource |Event |Event Type +:-------------|:-------:|:---------------------------------------------------------- +TaskRun | Started | dev.tekton.event.taskrun.started.v1 +TaskRun | Running | dev.tekton.event.taskrun.runnning.v1 +TaskRun | Condition Change while Running | dev.tekton.event.taskrun.unknown.v1 +TaskRun | Succeed | dev.tekton.event.taskrun.successful.v1 +TaskRun | Failed | dev.tekton.event.taskrun.failed.v1 +PipelineRun | Started | dev.tekton.event.pipelinerun.started.v1 +PipelineRun | Running | dev.tekton.event.pipelinerun.runnning.v1 +PipelineRun | Condition Change while Running | dev.tekton.event.pipelinerun.unknown.v1 +PipelineRun | Succeed | dev.tekton.event.pipelinerun.successful.v1 +PipelineRun | Failed | dev.tekton.event.pipelinerun.failed.v1 \ No newline at end of file diff --git a/docs/install.md b/docs/install.md index 57d10df40fc..eaa36add7cf 100644 --- a/docs/install.md +++ b/docs/install.md @@ -235,6 +235,25 @@ data: bucket.service.account.field.name: GOOGLE_APPLICATION_CREDENTIALS ``` +## Configuring CloudEvents notifications + +When configured so, Tekton can generate `CloudEVents` for `TaskRun` and `PipelineRun` lifecycle +events. The only configuration parameter is the URL of the sink. When not set, no notification is +generared. + +``` +apiVersion: v1 +kind: ConfigMap +metadata: + name: config-defaults + namespace: tekton-pipelines + labels: + app.kubernetes.io/instance: default + app.kubernetes.io/part-of: tekton-pipelines +data: + default-cloud-events-sink: https://my-sink-url +``` + ## Customizing basic execution parameters You can specify your own values that replace the default service account (`ServiceAccount`), timeout (`Timeout`), and Pod template (`PodTemplate`) values used by Tekton Pipelines in `TaskRun` and `PipelineRun` definitions. To do so, modify the ConfigMap `config-defaults` with your desired values. diff --git a/go.sum b/go.sum index 19a2ee3e5bb..2fa19689186 100644 --- a/go.sum +++ b/go.sum @@ -193,6 +193,7 @@ github.com/clarketm/json v1.13.4/go.mod h1:ynr2LRfb0fQU34l07csRNBTcivjySLLiY1YzQ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cloudevents/sdk-go v0.0.0-20190509003705-56931988abe3 h1:DNM19kh6j6qGBx/FI7OmHKBL2vCW1eN28ESYK1+O5DY= github.com/cloudevents/sdk-go v0.0.0-20190509003705-56931988abe3/go.mod h1:j1nZWMLGg3om8SswStBoY6/SHvcLM19MuZqwDtMtmzs= +github.com/cloudevents/sdk-go v1.2.0 h1:2AxI14EJUw1PclJ5gZJtzbxnHIfNMdi76Qq3P3G1BRU= github.com/cloudevents/sdk-go/v2 v2.0.0 h1:AUdGJwaSUnA+VvepKqgjy6XDkPcf0hf/3L7icEs1ibs= github.com/cloudevents/sdk-go/v2 v2.0.0/go.mod h1:3CTrpB4+u7Iaj6fd7E2Xvm5IxMdRoaAhqaRVnOr2rCU= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= diff --git a/pkg/apis/config/default.go b/pkg/apis/config/default.go index 4f89ca4bd45..04b30a10a82 100644 --- a/pkg/apis/config/default.go +++ b/pkg/apis/config/default.go @@ -35,6 +35,8 @@ const ( defaultManagedByLabelValueKey = "default-managed-by-label-value" DefaultManagedByLabelValue = "tekton-pipelines" defaultPodTemplateKey = "default-pod-template" + defaultCloudEventsSinkKey = "default-cloud-events-sink" + DefaultCloudEventSinkValue = "" ) // Defaults holds the default configurations @@ -44,6 +46,7 @@ type Defaults struct { DefaultServiceAccount string DefaultManagedByLabelValue string DefaultPodTemplate *pod.Template + DefaultCloudEventsSink string } // GetBucketConfigName returns the name of the configmap containing all @@ -68,7 +71,8 @@ func (cfg *Defaults) Equals(other *Defaults) bool { return other.DefaultTimeoutMinutes == cfg.DefaultTimeoutMinutes && other.DefaultServiceAccount == cfg.DefaultServiceAccount && other.DefaultManagedByLabelValue == cfg.DefaultManagedByLabelValue && - other.DefaultPodTemplate.Equals(cfg.DefaultPodTemplate) + other.DefaultPodTemplate.Equals(cfg.DefaultPodTemplate) && + other.DefaultCloudEventsSink == cfg.DefaultCloudEventsSink } // NewDefaultsFromMap returns a Config given a map corresponding to a ConfigMap @@ -76,6 +80,7 @@ func NewDefaultsFromMap(cfgMap map[string]string) (*Defaults, error) { tc := Defaults{ DefaultTimeoutMinutes: DefaultTimeoutMinutes, DefaultManagedByLabelValue: DefaultManagedByLabelValue, + DefaultCloudEventsSink: DefaultCloudEventSinkValue, } if defaultTimeoutMin, ok := cfgMap[defaultTimeoutMinutesKey]; ok { @@ -102,6 +107,10 @@ func NewDefaultsFromMap(cfgMap map[string]string) (*Defaults, error) { tc.DefaultPodTemplate = &podTemplate } + if defaultCloudEventsSink, ok := cfgMap[defaultCloudEventsSinkKey]; ok { + tc.DefaultCloudEventsSink = defaultCloudEventsSink + } + return &tc, nil } diff --git a/pkg/reconciler/events/cloudevent/cloud_event_controller.go b/pkg/reconciler/events/cloudevent/cloud_event_controller.go index 523520273b5..7e3db6d796f 100644 --- a/pkg/reconciler/events/cloudevent/cloud_event_controller.go +++ b/pkg/reconciler/events/cloudevent/cloud_event_controller.go @@ -18,6 +18,7 @@ package cloudevent import ( "context" + "errors" "time" cloudevents "github.com/cloudevents/sdk-go/v2" @@ -25,8 +26,11 @@ import ( "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" resource "github.com/tektoncd/pipeline/pkg/apis/resource/v1alpha1" "github.com/tektoncd/pipeline/pkg/apis/resource/v1alpha1/cloudevent" + "github.com/tektoncd/pipeline/pkg/reconciler/events" "go.uber.org/zap" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + controller "knative.dev/pkg/controller" + "knative.dev/pkg/logging" ) // InitializeCloudEvents initializes the CloudEvents part of the @@ -108,3 +112,31 @@ func SendCloudEvents(tr *v1beta1.TaskRun, ceclient CEClient, logger *zap.Sugared } return merr.ErrorOrNil() } + +// SendCloudEventWithRetries sends a cloud event for the specified resource. +// It does not block and it perform retries with backoff using the cloudevents +// sdk-go capabilities. +func SendCloudEventWithRetries(ctx context.Context, object objectWithCondition) error { + logger := logging.FromContext(ctx) + ceClient := Get(ctx) + if ceClient == nil { + return errors.New("No cloud events client found in the context") + } + event, err := EventForObjectWithCondition(object) + if err != nil { + return err + } + + go func() { + if result := ceClient.Send(cloudevents.ContextWithRetriesExponentialBackoff(ctx, 10*time.Millisecond, 10), *event); !cloudevents.IsACK(result) { + logger.Warnf("Failed to send cloudevent: %s", result.Error()) + recorder := controller.GetEventRecorder(ctx) + if recorder == nil { + logger.Warnf("No recorder in context, cannot emit error event") + } + events.EmitError(recorder, result, object) + } + }() + + return nil +} diff --git a/pkg/reconciler/events/cloudevent/cloud_event_controller_test.go b/pkg/reconciler/events/cloudevent/cloud_event_controller_test.go index df462fa8488..9afa07c5917 100644 --- a/pkg/reconciler/events/cloudevent/cloud_event_controller_test.go +++ b/pkg/reconciler/events/cloudevent/cloud_event_controller_test.go @@ -17,7 +17,11 @@ limitations under the License. package cloudevent import ( + "context" + "fmt" + "strings" "testing" + "time" "github.com/google/go-cmp/cmp" tb "github.com/tektoncd/pipeline/internal/builder/v1beta1" @@ -25,8 +29,15 @@ import ( resourcev1alpha1 "github.com/tektoncd/pipeline/pkg/apis/resource/v1alpha1" "github.com/tektoncd/pipeline/pkg/logging" "github.com/tektoncd/pipeline/test/diff" + "go.uber.org/zap" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/record" "knative.dev/pkg/apis" + duckv1beta1 "knative.dev/pkg/apis/duck/v1beta1" + "knative.dev/pkg/controller" + klogging "knative.dev/pkg/logging" + rtesting "knative.dev/pkg/reconciler/testing" ) func TestCloudEventDeliveryFromTargets(t *testing.T) { @@ -283,3 +294,157 @@ func TestInitializeCloudEvents(t *testing.T) { }) } } + +func TestSendCloudEventWithRetries(t *testing.T) { + + objectStatus := duckv1beta1.Status{ + Conditions: []apis.Condition{{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionTrue, + }}, + } + + tests := []struct { + name string + clientBehaviour FakeClientBehaviour + object objectWithCondition + wantCEvent string + wantEvent string + }{{ + name: "test-send-cloud-event-taskrun", + clientBehaviour: FakeClientBehaviour{ + SendSuccessfully: true, + }, + object: &v1beta1.TaskRun{ + ObjectMeta: metav1.ObjectMeta{ + SelfLink: "/taskruns/test1", + }, + Status: v1beta1.TaskRunStatus{Status: objectStatus}, + }, + wantCEvent: "Validation: valid", + wantEvent: "", + }, { + name: "test-send-cloud-event-pipelinerun", + clientBehaviour: FakeClientBehaviour{ + SendSuccessfully: true, + }, + object: &v1beta1.PipelineRun{ + ObjectMeta: metav1.ObjectMeta{ + SelfLink: "/pipelineruns/test1", + }, + Status: v1beta1.PipelineRunStatus{Status: objectStatus}, + }, + wantCEvent: "Validation: valid", + wantEvent: "", + }, { + name: "test-send-cloud-event-failed", + clientBehaviour: FakeClientBehaviour{ + SendSuccessfully: false, + }, + object: &v1beta1.PipelineRun{ + Status: v1beta1.PipelineRunStatus{Status: objectStatus}, + }, + wantCEvent: "", + wantEvent: "Warning Error ", + }} + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctx, _ := setupFakeContext(t, tc.clientBehaviour) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + err := SendCloudEventWithRetries(ctx, tc.object) + if err != nil { + t.Fatalf("Unexpected error sending cloud events: %v", err) + } + ceClient := Get(ctx).(FakeClient) + err = checkCloudEvents(t, &ceClient, tc.name, tc.wantCEvent) + if err != nil { + t.Fatalf(err.Error()) + } + recorder := controller.GetEventRecorder(ctx).(*record.FakeRecorder) + err = checkEvents(t, recorder, tc.name, tc.wantEvent) + if err != nil { + t.Fatalf(err.Error()) + } + }) + } +} + +func TestSendCloudEventWithRetriesInvalid(t *testing.T) { + + tests := []struct { + name string + object objectWithCondition + wantCEvent string + wantEvent string + }{{ + name: "test-send-cloud-event-invalid-taskrun", + object: &v1beta1.TaskRun{ + Status: v1beta1.TaskRunStatus{}, + }, + wantCEvent: "Validation: valid", + wantEvent: "", + }, { + name: "test-send-cloud-event-pipelinerun", + object: &v1beta1.PipelineRun{ + Status: v1beta1.PipelineRunStatus{}, + }, + wantCEvent: "Validation: valid", + wantEvent: "", + }} + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctx, _ := setupFakeContext(t, FakeClientBehaviour{ + SendSuccessfully: true, + }) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + err := SendCloudEventWithRetries(ctx, tc.object) + if err == nil { + t.Fatalf("Expected an error sending cloud events for invalid object, got none") + } + }) + } +} + +func setupFakeContext(t *testing.T, behaviour FakeClientBehaviour) (context.Context, []controller.Informer) { + ctx, informer := rtesting.SetupFakeContext(t) + ctx = WithClient(ctx, &behaviour) + return klogging.WithLogger(ctx, testLogger(t)), informer +} + +func testLogger(t *testing.T) *zap.SugaredLogger { + logger, err := zap.NewDevelopment(zap.AddCaller()) + if err != nil { + t.Fatalf("failed to create logger: %s", err) + } + return logger.Sugar().Named(t.Name()) +} + +func eventFromChannel(c chan string, testName string, wantEvent string) error { + timer := time.NewTimer(1 * time.Second) + select { + case event := <-c: + if wantEvent == "" { + return fmt.Errorf("received event \"%s\" for %s but none expected", event, testName) + } + if !(strings.HasPrefix(event, wantEvent)) { + return fmt.Errorf("expected event \"%s\" but got \"%s\" instead for %s", wantEvent, event, testName) + } + case <-timer.C: + if wantEvent != "" { + return fmt.Errorf("received no events for %s but %s expected", testName, wantEvent) + } + } + return nil +} + +func checkEvents(t *testing.T, fr *record.FakeRecorder, testName string, wantEvent string) error { + t.Helper() + return eventFromChannel(fr.Events, testName, wantEvent) +} + +func checkCloudEvents(t *testing.T, fce *FakeClient, testName string, wantEvent string) error { + t.Helper() + return eventFromChannel(fce.Events, testName, wantEvent) +} diff --git a/pkg/reconciler/events/cloudevent/cloudevent.go b/pkg/reconciler/events/cloudevent/cloudevent.go index 16634741907..c1328540571 100644 --- a/pkg/reconciler/events/cloudevent/cloudevent.go +++ b/pkg/reconciler/events/cloudevent/cloudevent.go @@ -133,6 +133,9 @@ func EventForPipelineRun(pipelineRun *v1beta1.PipelineRun) (*cloudevents.Event, func getEventType(runObject objectWithCondition) (*TektonEventType, error) { c := runObject.GetStatusCondition().GetCondition(apis.ConditionSucceeded) + if c == nil { + return nil, fmt.Errorf("no condition for in %T", runObject) + } var eventType TektonEventType switch { case c.IsUnknown(): diff --git a/pkg/reconciler/events/cloudevent/cloudeventclient.go b/pkg/reconciler/events/cloudevent/cloudeventclient.go index 372f8db4ad5..1fb7dec4f86 100644 --- a/pkg/reconciler/events/cloudevent/cloudeventclient.go +++ b/pkg/reconciler/events/cloudevent/cloudeventclient.go @@ -20,6 +20,7 @@ import ( "context" cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/tektoncd/pipeline/pkg/apis/config" "k8s.io/client-go/rest" "knative.dev/pkg/injection" "knative.dev/pkg/logging" @@ -34,7 +35,9 @@ type CECKey struct{} func withCloudEventClient(ctx context.Context, cfg *rest.Config) context.Context { logger := logging.FromContext(ctx) + p, err := cloudevents.NewHTTP() + if err != nil { logger.Panicf("Error creating the cloudevents http protocol: %s", err) return ctx @@ -57,3 +60,8 @@ func Get(ctx context.Context) CEClient { } return untyped.(CEClient) } + +// ToContext adds the cloud events client to the context +func ToContext(ctx context.Context, cec CEClient) context.Context { + return context.WithValue(ctx, CECKey{}, cec) +} \ No newline at end of file diff --git a/pkg/reconciler/events/cloudevent/cloudeventsfakeclient.go b/pkg/reconciler/events/cloudevent/cloudeventsfakeclient.go index a2665952b3d..c33ed444e6b 100644 --- a/pkg/reconciler/events/cloudevent/cloudeventsfakeclient.go +++ b/pkg/reconciler/events/cloudevent/cloudeventsfakeclient.go @@ -24,44 +24,47 @@ import ( "github.com/cloudevents/sdk-go/v2/protocol" ) +const bufferSize = 100 + // FakeClientBehaviour defines how the client will behave type FakeClientBehaviour struct { SendSuccessfully bool } // FakeClient is a fake CloudEvent client for unit testing -// Holding a pointer to the behaviour allows to change the behaviour of a client +// Holding pointer to the behaviour allows to change the behaviour of a client type FakeClient struct { - behaviour *FakeClientBehaviour - event cloudevents.Event + behaviour *FakeClientBehaviour + // Modelled after k8s.io/client-go fake recorder + Events chan string } // NewFakeClient is a FakeClient factory, it returns a client for the target func NewFakeClient(behaviour *FakeClientBehaviour) cloudevents.Client { - c := FakeClient{ - behaviour: behaviour, + return FakeClient{ + behaviour: behaviour, + Events: make(chan string, bufferSize), } - return c } var _ cloudevents.Client = (*FakeClient)(nil) // Send fakes the Send method from cloudevents.Client func (c FakeClient) Send(ctx context.Context, event cloudevents.Event) protocol.Result { - c.event = event if c.behaviour.SendSuccessfully { + c.Events <- fmt.Sprintf("%s", event.String()) return nil } - return fmt.Errorf("%s had to fail", event.ID()) + return fmt.Errorf("Had to fail. Event ID: %s", event.ID()) } // Request fakes the Request method from cloudevents.Client func (c FakeClient) Request(ctx context.Context, event cloudevents.Event) (*cloudevents.Event, protocol.Result) { - c.event = event if c.behaviour.SendSuccessfully { + c.Events <- fmt.Sprintf("%v", event.String()) return &event, nil } - return nil, fmt.Errorf("%s had to fail", event.ID()) + return nil, fmt.Errorf("Had to fail. Event ID: %s", event.ID()) } // StartReceiver fakes StartReceiver method from cloudevents.Client diff --git a/pkg/reconciler/events/cloudevent/interface.go b/pkg/reconciler/events/cloudevent/interface.go index de168fd9dbb..b58ad479d9b 100644 --- a/pkg/reconciler/events/cloudevent/interface.go +++ b/pkg/reconciler/events/cloudevent/interface.go @@ -18,12 +18,16 @@ package cloudevent import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "knative.dev/pkg/apis" ) // objectWithCondition is implemented by TaskRun and PipelineRun type objectWithCondition interface { + // Object requires GetObjectKind() and DeepCopyObject() + runtime.Object + // ObjectMetaAccessor requires a GetObjectMeta that returns the ObjectMeta metav1.ObjectMetaAccessor diff --git a/pkg/reconciler/taskrun/taskrun.go b/pkg/reconciler/taskrun/taskrun.go index 7e558d05f15..d24fa243a49 100644 --- a/pkg/reconciler/taskrun/taskrun.go +++ b/pkg/reconciler/taskrun/taskrun.go @@ -26,6 +26,7 @@ import ( "time" "github.com/hashicorp/go-multierror" + "github.com/tektoncd/pipeline/pkg/apis/config" "github.com/tektoncd/pipeline/pkg/apis/pipeline" "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" "github.com/tektoncd/pipeline/pkg/apis/resource" @@ -88,6 +89,11 @@ var _ taskrunreconciler.Interface = (*Reconciler)(nil) func (c *Reconciler) ReconcileKind(ctx context.Context, tr *v1beta1.TaskRun) pkgreconciler.Event { logger := logging.FromContext(ctx) recorder := controller.GetEventRecorder(ctx) + sendCloudEvents := (config.DefaultCloudEventsSink != "") + if sendCloudEvents { + ctx = cloudevent.ToContext(ctx, c.cloudEventClient) + ctx = cloudevents.ContextWithTarget(ctx, config.DefaultCloudEventsSink) + } // If the TaskRun is just starting, this will also set the starttime, // from which the timeout will immediately begin counting down. @@ -105,6 +111,12 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, tr *v1beta1.TaskRun) pkg // on the event to perform user facing initialisations, such has reset a CI check status afterCondition := tr.Status.GetCondition(apis.ConditionSucceeded) events.Emit(recorder, nil, afterCondition, tr) + if sendCloudEvents { + err := cloudevent.SendCloudEventWithRetries(ctx, tr) + if err != nil { + logger.Warnf("Failed to emit cloud events %v", err.Error()) + } + } } // If the TaskRun is complete, run some post run fixtures when applicable @@ -197,11 +209,23 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, tr *v1beta1.TaskRun) pkg func (c *Reconciler) finishReconcileUpdateEmitEvents(ctx context.Context, tr *v1beta1.TaskRun, beforeCondition *apis.Condition, previousError error) error { recorder := controller.GetEventRecorder(ctx) + logger := logging.FromContext(ctx) afterCondition := tr.Status.GetCondition(apis.ConditionSucceeded) + + // Send k8s events and cloud events (when configured) events.Emit(recorder, beforeCondition, afterCondition, tr) - _, err := c.updateLabelsAndAnnotations(tr) - events.EmitError(recorder, err, tr) + if sendCloudEvents { + err := cloudevent.SendCloudEventWithRetries(ctx, tr) + if err != nil { + logger.Warnf("Failed to emit cloud events %v", err.Error()) + } + } + + _, err = c.updateLabelsAndAnnotations(tr) + if err != nil { + events.EmitError(recorder, err, tr) + } return multierror.Append(previousError, err).ErrorOrNil() }