Skip to content

Commit

Permalink
Migrate jobrunner to cloudevent v2 (#2867)
Browse files Browse the repository at this point in the history
* use go client v2

* port jobrunner to cloudevent v2

* format

* leave the cloudevent client under adapter

* rebase

* Use !IsACK

* fix unit test
  • Loading branch information
lionelvillard authored Apr 1, 2020
1 parent 49789a8 commit 6386f24
Show file tree
Hide file tree
Showing 13 changed files with 272 additions and 167 deletions.
2 changes: 1 addition & 1 deletion pkg/adapter/ping/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ import (
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/robfig/cron"
"go.uber.org/zap"
sourcesv1alpha1 "knative.dev/eventing/pkg/apis/sources/v1alpha1"
"knative.dev/pkg/logging"

"knative.dev/eventing/pkg/adapter/v2"
sourcesv1alpha1 "knative.dev/eventing/pkg/apis/sources/v1alpha1"
)

type envConfig struct {
Expand Down
4 changes: 2 additions & 2 deletions pkg/adapter/ping/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestStart_ServeHTTP(t *testing.T) {
}
for n, tc := range testCases {
t.Run(n, func(t *testing.T) {
ce := adaptertest.NewTestClient()
ce := adaptertest.NewTestClient(nil)

a := &pingAdapter{
Schedule: tc.schedule,
Expand Down Expand Up @@ -108,7 +108,7 @@ func TestPostMessage_ServeHTTP(t *testing.T) {
for n, tc := range testCases {
t.Run(n, func(t *testing.T) {

ce := adaptertest.NewTestClient()
ce := adaptertest.NewTestClient(nil)

a := &pingAdapter{
Data: "data",
Expand Down
78 changes: 9 additions & 69 deletions pkg/adapter/v2/cloudevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ package adapter
import (
"context"
"errors"
"fmt"

"knative.dev/eventing/pkg/adapter/v2/metrics"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/event"
Expand Down Expand Up @@ -48,14 +49,14 @@ func NewCloudEventsClient(target string, ceOverrides *duckv1.CloudEventOverrides
return &client{
ceClient: ceClient,
ceOverrides: ceOverrides,
reporter: reporter,
reporter: metrics.NewStatsReporterAdapter(reporter),
}, nil
}

type client struct {
ceClient cloudevents.Client
ceOverrides *duckv1.CloudEventOverrides
reporter source.StatsReporter
reporter metrics.StatsReporterAdapter
}

var _ cloudevents.Client = (*client)(nil)
Expand All @@ -64,14 +65,17 @@ var _ cloudevents.Client = (*client)(nil)
func (c *client) Send(ctx context.Context, out event.Event) protocol.Result {
c.applyOverrides(ctx, &out)
res := c.ceClient.Send(ctx, out)
return c.reportCount(ctx, out, res)
return c.reporter.ReportCount(ctx, out, res)

}

// Request implements client.Request
func (c *client) Request(ctx context.Context, out event.Event) (*event.Event, protocol.Result) {
c.applyOverrides(ctx, &out)

resp, res := c.ceClient.Request(ctx, out)
return resp, c.reportCount(ctx, out, res)
return resp, c.reporter.ReportCount(ctx, out, res)

}

// StartReceiver implements client.StartReceiver
Expand All @@ -86,67 +90,3 @@ func (c *client) applyOverrides(ctx context.Context, event *cloudevents.Event) {
}
}
}

func (c *client) reportCount(ctx context.Context, event cloudevents.Event, result protocol.Result) error {
tags := MetricTagFromContext(ctx)
reportArgs := &source.ReportArgs{
Namespace: tags.Namespace,
EventSource: event.Source(),
EventType: event.Type(),
Name: tags.Name,
ResourceGroup: tags.ResourceGroup,
}

if cloudevents.IsACK(result) {
var res *http.Result
if !cloudevents.ResultAs(result, &res) {
return fmt.Errorf("protocol.Result is not http.Result")
}

_ = c.reporter.ReportEventCount(reportArgs, res.StatusCode)
} else {
var res *http.Result
if !cloudevents.ResultAs(result, &res) {
return result
}

if rErr := c.reporter.ReportEventCount(reportArgs, res.StatusCode); rErr != nil {
// metrics is not important enough to return an error if it is setup wrong.
// So combine reporter error with ce error if not nil.
if result != nil {
result = fmt.Errorf("%w\nmetrics reporter errror: %s", result, rErr)
}
}
}
return result
}

// Metric context

type MetricTag struct {
Name string
Namespace string
ResourceGroup string
}

type metricKey struct{}

// ContextWithMetricTag returns a copy of parent context in which the
// value associated with metric key is the supplied metric tag.
func ContextWithMetricTag(ctx context.Context, metric *MetricTag) context.Context {
return context.WithValue(ctx, metricKey{}, metric)
}

// MetricTagFromContext returns the metric tag stored in context.
// Returns nil if no metric tag is set in context, or if the stored value is
// not of correct type.
func MetricTagFromContext(ctx context.Context) *MetricTag {
if logger, ok := ctx.Value(metricKey{}).(*MetricTag); ok {
return logger
}
return &MetricTag{
Name: "unknown",
Namespace: "unknown",
ResourceGroup: "unknown",
}
}
32 changes: 11 additions & 21 deletions pkg/adapter/v2/cloudevents_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,15 @@ import (
"testing"

cloudevents "github.com/cloudevents/sdk-go/v2"
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/source"

"knative.dev/eventing/pkg/adapter/v2/test"
rectesting "knative.dev/eventing/pkg/reconciler/testing"
duckv1 "knative.dev/pkg/apis/duck/v1"
)

type mockReporter struct {
eventCount int
}

var (
fakeMasterURL = "test-source"
)

func (r *mockReporter) ReportEventCount(args *source.ReportArgs, responseCode int) error {
r.eventCount += 1
return nil
}

func TestNewCloudEventsClient_send(t *testing.T) {
testCases := map[string]struct {
ceOverrides *duckv1.CloudEventOverrides
Expand Down Expand Up @@ -70,7 +60,8 @@ func TestNewCloudEventsClient_send(t *testing.T) {
}
for n, tc := range testCases {
t.Run(n, func(t *testing.T) {
ceClient, err := NewCloudEventsClient(fakeMasterURL, tc.ceOverrides, &mockReporter{})
reporter := &rectesting.MockStatsReporter{}
ceClient, err := NewCloudEventsClient(fakeMasterURL, tc.ceOverrides, reporter)
if err != nil {
t.Fail()
}
Expand All @@ -87,7 +78,7 @@ func TestNewCloudEventsClient_send(t *testing.T) {
t.Fatal(err)
}
validateSent(t, innerClient, tc.event.Type())
validateMetric(t, got.reporter, 1)
validateMetric(t, reporter, 1)
} else {
validateNotSent(t, innerClient)
}
Expand Down Expand Up @@ -125,7 +116,8 @@ func TestNewCloudEventsClient_request(t *testing.T) {
}
for n, tc := range testCases {
t.Run(n, func(t *testing.T) {
ceClient, err := NewCloudEventsClient(fakeMasterURL, tc.ceOverrides, &mockReporter{})
reporter := &rectesting.MockStatsReporter{}
ceClient, err := NewCloudEventsClient(fakeMasterURL, tc.ceOverrides, reporter)
if err != nil {
t.Fail()
}
Expand All @@ -142,7 +134,7 @@ func TestNewCloudEventsClient_request(t *testing.T) {
t.Fatal(err)
}
validateSent(t, innerClient, tc.event.Type())
validateMetric(t, got.reporter, 1)
validateMetric(t, reporter, 1)
} else {
validateNotSent(t, innerClient)
}
Expand All @@ -166,10 +158,8 @@ func validateNotSent(t *testing.T, ce *test.TestCloudEventsClient) {
}
}

func validateMetric(t *testing.T, reporter source.StatsReporter, want int) {
if mockReporter, ok := reporter.(*mockReporter); !ok {
t.Errorf("reporter is not a mockReporter")
} else if mockReporter.eventCount != want {
t.Errorf("Expected %d for metric, got %d", want, mockReporter.eventCount)
func validateMetric(t *testing.T, reporter *rectesting.MockStatsReporter, want int) {
if err := reporter.ValidateEventCount(want); err != nil {
t.Error(err)
}
}
1 change: 1 addition & 0 deletions pkg/adapter/v2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"

"github.com/kelseyhightower/envconfig"
"go.opencensus.io/stats/view"
"go.uber.org/zap"
Expand Down
103 changes: 103 additions & 0 deletions pkg/adapter/v2/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
Copyright 2020 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics

import (
"context"
"fmt"

"github.com/cloudevents/sdk-go/v2/protocol/http"

cloudevents "github.com/cloudevents/sdk-go/v2"
"knative.dev/pkg/source"
)

type StatsReporterAdapter interface {
ReportCount(ctx context.Context, event cloudevents.Event, result cloudevents.Result) error
}

type statsReporterAdapter struct {
source.StatsReporter
}

func NewStatsReporterAdapter(reporter source.StatsReporter) StatsReporterAdapter {
return &statsReporterAdapter{StatsReporter: reporter}
}

func (c *statsReporterAdapter) ReportCount(ctx context.Context, event cloudevents.Event, result cloudevents.Result) error {
tags := MetricTagFromContext(ctx)
reportArgs := &source.ReportArgs{
Namespace: tags.Namespace,
EventSource: event.Source(),
EventType: event.Type(),
Name: tags.Name,
ResourceGroup: tags.ResourceGroup,
}

if cloudevents.IsACK(result) {
var res *http.Result
if !cloudevents.ResultAs(result, &res) {
return fmt.Errorf("protocol.Result is not http.Result")
}

_ = c.ReportEventCount(reportArgs, res.StatusCode)
} else {
var res *http.Result
if !cloudevents.ResultAs(result, &res) {
return result
}

if rErr := c.ReportEventCount(reportArgs, res.StatusCode); rErr != nil {
// metrics is not important enough to return an error if it is setup wrong.
// So combine reporter error with ce error if not nil.
if result != nil {
result = fmt.Errorf("%w\nmetrics reporter errror: %s", result, rErr)
}
}
}

return result
}

// Metric context

type MetricTag struct {
Name string
Namespace string
ResourceGroup string
}

type metricKey struct{}

// ContextWithMetricTag returns a copy of parent context in which the
// value associated with metric key is the supplied metric tag.
func ContextWithMetricTag(ctx context.Context, metric *MetricTag) context.Context {
return context.WithValue(ctx, metricKey{}, metric)
}

// MetricTagFromContext returns the metric tag stored in context.
// Returns nil if no metric tag is set in context, or if the stored value is
// not of correct type.
func MetricTagFromContext(ctx context.Context) *MetricTag {
if logger, ok := ctx.Value(metricKey{}).(*MetricTag); ok {
return logger
}
return &MetricTag{
Name: "unknown",
Namespace: "unknown",
ResourceGroup: "unknown",
}
}
48 changes: 48 additions & 0 deletions pkg/adapter/v2/metrics/metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
Copyright 2020 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics

import (
"context"
"testing"

cloudevents "github.com/cloudevents/sdk-go/v2"
rectesting "knative.dev/eventing/pkg/reconciler/testing"
)

func TestStatsReporterAdapter(t *testing.T) {
stats := &rectesting.MockStatsReporter{}
reporter := NewStatsReporterAdapter(stats)

event := cloudevents.NewEvent()
event.SetID("abc-123")
event.SetSource("unit/test")
event.SetType("unit.type")

ctx := ContextWithMetricTag(context.Background(), &MetricTag{
Name: "test-name",
Namespace: "test-ns",
ResourceGroup: "test-rg",
})

if result := reporter.ReportCount(ctx, event, cloudevents.NewHTTPResult(200, "%w", cloudevents.ResultACK)); !cloudevents.IsACK(result) {
t.Errorf("unexpected result %v", result)
}

if err := stats.ValidateEventCount(1); err != nil {
t.Errorf("unexpected error %v", err)
}
}
Loading

0 comments on commit 6386f24

Please sign in to comment.