Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate jobrunner to cloudevent v2 #2867

Merged
merged 7 commits into from
Apr 1, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/adapter/ping/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ import (
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/robfig/cron"
"go.uber.org/zap"
sourcesv1alpha1 "knative.dev/eventing/pkg/apis/sources/v1alpha1"
"knative.dev/pkg/logging"

"knative.dev/eventing/pkg/adapter/v2"
sourcesv1alpha1 "knative.dev/eventing/pkg/apis/sources/v1alpha1"
)

type envConfig struct {
Expand Down
4 changes: 2 additions & 2 deletions pkg/adapter/ping/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestStart_ServeHTTP(t *testing.T) {
}
for n, tc := range testCases {
t.Run(n, func(t *testing.T) {
ce := adaptertest.NewTestClient()
ce := adaptertest.NewTestClient(nil)

a := &pingAdapter{
Schedule: tc.schedule,
Expand Down Expand Up @@ -108,7 +108,7 @@ func TestPostMessage_ServeHTTP(t *testing.T) {
for n, tc := range testCases {
t.Run(n, func(t *testing.T) {

ce := adaptertest.NewTestClient()
ce := adaptertest.NewTestClient(nil)

a := &pingAdapter{
Data: "data",
Expand Down
78 changes: 9 additions & 69 deletions pkg/adapter/v2/cloudevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ package adapter
import (
"context"
"errors"
"fmt"

"knative.dev/eventing/pkg/adapter/v2/metrics"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/event"
Expand Down Expand Up @@ -48,14 +49,14 @@ func NewCloudEventsClient(target string, ceOverrides *duckv1.CloudEventOverrides
return &client{
ceClient: ceClient,
ceOverrides: ceOverrides,
reporter: reporter,
reporter: metrics.NewStatsReporterAdapter(reporter),
}, nil
}

type client struct {
ceClient cloudevents.Client
ceOverrides *duckv1.CloudEventOverrides
reporter source.StatsReporter
reporter metrics.StatsReporterAdapter
}

var _ cloudevents.Client = (*client)(nil)
Expand All @@ -64,14 +65,17 @@ var _ cloudevents.Client = (*client)(nil)
func (c *client) Send(ctx context.Context, out event.Event) protocol.Result {
c.applyOverrides(ctx, &out)
res := c.ceClient.Send(ctx, out)
return c.reportCount(ctx, out, res)
return c.reporter.ReportCount(ctx, out, res)

}

// Request implements client.Request
func (c *client) Request(ctx context.Context, out event.Event) (*event.Event, protocol.Result) {
c.applyOverrides(ctx, &out)

resp, res := c.ceClient.Request(ctx, out)
return resp, c.reportCount(ctx, out, res)
return resp, c.reporter.ReportCount(ctx, out, res)

}

// StartReceiver implements client.StartReceiver
Expand All @@ -86,67 +90,3 @@ func (c *client) applyOverrides(ctx context.Context, event *cloudevents.Event) {
}
}
}

func (c *client) reportCount(ctx context.Context, event cloudevents.Event, result protocol.Result) error {
tags := MetricTagFromContext(ctx)
reportArgs := &source.ReportArgs{
Namespace: tags.Namespace,
EventSource: event.Source(),
EventType: event.Type(),
Name: tags.Name,
ResourceGroup: tags.ResourceGroup,
}

if cloudevents.IsACK(result) {
var res *http.Result
if !cloudevents.ResultAs(result, &res) {
return fmt.Errorf("protocol.Result is not http.Result")
}

_ = c.reporter.ReportEventCount(reportArgs, res.StatusCode)
} else {
var res *http.Result
if !cloudevents.ResultAs(result, &res) {
return result
}

if rErr := c.reporter.ReportEventCount(reportArgs, res.StatusCode); rErr != nil {
// metrics is not important enough to return an error if it is setup wrong.
// So combine reporter error with ce error if not nil.
if result != nil {
result = fmt.Errorf("%w\nmetrics reporter errror: %s", result, rErr)
}
}
}
return result
}

// Metric context

type MetricTag struct {
Name string
Namespace string
ResourceGroup string
}

type metricKey struct{}

// ContextWithMetricTag returns a copy of parent context in which the
// value associated with metric key is the supplied metric tag.
func ContextWithMetricTag(ctx context.Context, metric *MetricTag) context.Context {
return context.WithValue(ctx, metricKey{}, metric)
}

// MetricTagFromContext returns the metric tag stored in context.
// Returns nil if no metric tag is set in context, or if the stored value is
// not of correct type.
func MetricTagFromContext(ctx context.Context) *MetricTag {
if logger, ok := ctx.Value(metricKey{}).(*MetricTag); ok {
return logger
}
return &MetricTag{
Name: "unknown",
Namespace: "unknown",
ResourceGroup: "unknown",
}
}
32 changes: 11 additions & 21 deletions pkg/adapter/v2/cloudevents_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,15 @@ import (
"testing"

cloudevents "github.com/cloudevents/sdk-go/v2"
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/source"

"knative.dev/eventing/pkg/adapter/v2/test"
rectesting "knative.dev/eventing/pkg/reconciler/testing"
duckv1 "knative.dev/pkg/apis/duck/v1"
)

type mockReporter struct {
eventCount int
}

var (
fakeMasterURL = "test-source"
)

func (r *mockReporter) ReportEventCount(args *source.ReportArgs, responseCode int) error {
r.eventCount += 1
return nil
}

func TestNewCloudEventsClient_send(t *testing.T) {
testCases := map[string]struct {
ceOverrides *duckv1.CloudEventOverrides
Expand Down Expand Up @@ -70,7 +60,8 @@ func TestNewCloudEventsClient_send(t *testing.T) {
}
for n, tc := range testCases {
t.Run(n, func(t *testing.T) {
ceClient, err := NewCloudEventsClient(fakeMasterURL, tc.ceOverrides, &mockReporter{})
reporter := &rectesting.MockStatsReporter{}
ceClient, err := NewCloudEventsClient(fakeMasterURL, tc.ceOverrides, reporter)
if err != nil {
t.Fail()
}
Expand All @@ -87,7 +78,7 @@ func TestNewCloudEventsClient_send(t *testing.T) {
t.Fatal(err)
}
validateSent(t, innerClient, tc.event.Type())
validateMetric(t, got.reporter, 1)
validateMetric(t, reporter, 1)
} else {
validateNotSent(t, innerClient)
}
Expand Down Expand Up @@ -125,7 +116,8 @@ func TestNewCloudEventsClient_request(t *testing.T) {
}
for n, tc := range testCases {
t.Run(n, func(t *testing.T) {
ceClient, err := NewCloudEventsClient(fakeMasterURL, tc.ceOverrides, &mockReporter{})
reporter := &rectesting.MockStatsReporter{}
ceClient, err := NewCloudEventsClient(fakeMasterURL, tc.ceOverrides, reporter)
if err != nil {
t.Fail()
}
Expand All @@ -142,7 +134,7 @@ func TestNewCloudEventsClient_request(t *testing.T) {
t.Fatal(err)
}
validateSent(t, innerClient, tc.event.Type())
validateMetric(t, got.reporter, 1)
validateMetric(t, reporter, 1)
} else {
validateNotSent(t, innerClient)
}
Expand All @@ -166,10 +158,8 @@ func validateNotSent(t *testing.T, ce *test.TestCloudEventsClient) {
}
}

func validateMetric(t *testing.T, reporter source.StatsReporter, want int) {
if mockReporter, ok := reporter.(*mockReporter); !ok {
t.Errorf("reporter is not a mockReporter")
} else if mockReporter.eventCount != want {
t.Errorf("Expected %d for metric, got %d", want, mockReporter.eventCount)
func validateMetric(t *testing.T, reporter *rectesting.MockStatsReporter, want int) {
if err := reporter.ValidateEventCount(want); err != nil {
t.Error(err)
}
}
1 change: 1 addition & 0 deletions pkg/adapter/v2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"

"github.com/kelseyhightower/envconfig"
"go.opencensus.io/stats/view"
"go.uber.org/zap"
Expand Down
103 changes: 103 additions & 0 deletions pkg/adapter/v2/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
Copyright 2020 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics

import (
"context"
"fmt"

"github.com/cloudevents/sdk-go/v2/protocol/http"

cloudevents "github.com/cloudevents/sdk-go/v2"
"knative.dev/pkg/source"
)

type StatsReporterAdapter interface {
ReportCount(ctx context.Context, event cloudevents.Event, result cloudevents.Result) error
}

type statsReporterAdapter struct {
source.StatsReporter
}

func NewStatsReporterAdapter(reporter source.StatsReporter) StatsReporterAdapter {
return &statsReporterAdapter{StatsReporter: reporter}
}

func (c *statsReporterAdapter) ReportCount(ctx context.Context, event cloudevents.Event, result cloudevents.Result) error {
tags := MetricTagFromContext(ctx)
reportArgs := &source.ReportArgs{
Namespace: tags.Namespace,
EventSource: event.Source(),
EventType: event.Type(),
Name: tags.Name,
ResourceGroup: tags.ResourceGroup,
}

if cloudevents.IsACK(result) {
var res *http.Result
if !cloudevents.ResultAs(result, &res) {
return fmt.Errorf("protocol.Result is not http.Result")
}

_ = c.ReportEventCount(reportArgs, res.StatusCode)
} else {
var res *http.Result
if !cloudevents.ResultAs(result, &res) {
return result
}

if rErr := c.ReportEventCount(reportArgs, res.StatusCode); rErr != nil {
// metrics is not important enough to return an error if it is setup wrong.
// So combine reporter error with ce error if not nil.
if result != nil {
result = fmt.Errorf("%w\nmetrics reporter errror: %s", result, rErr)
}
}
}

return result
}

// Metric context

type MetricTag struct {
Name string
Namespace string
ResourceGroup string
}

type metricKey struct{}

// ContextWithMetricTag returns a copy of parent context in which the
// value associated with metric key is the supplied metric tag.
func ContextWithMetricTag(ctx context.Context, metric *MetricTag) context.Context {
return context.WithValue(ctx, metricKey{}, metric)
}

// MetricTagFromContext returns the metric tag stored in context.
// Returns nil if no metric tag is set in context, or if the stored value is
// not of correct type.
func MetricTagFromContext(ctx context.Context) *MetricTag {
if logger, ok := ctx.Value(metricKey{}).(*MetricTag); ok {
return logger
}
return &MetricTag{
Name: "unknown",
Namespace: "unknown",
ResourceGroup: "unknown",
}
}
48 changes: 48 additions & 0 deletions pkg/adapter/v2/metrics/metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
Copyright 2020 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics

import (
"context"
"testing"

cloudevents "github.com/cloudevents/sdk-go/v2"
rectesting "knative.dev/eventing/pkg/reconciler/testing"
)

func TestStatsReporterAdapter(t *testing.T) {
stats := &rectesting.MockStatsReporter{}
reporter := NewStatsReporterAdapter(stats)

event := cloudevents.NewEvent()
event.SetID("abc-123")
event.SetSource("unit/test")
event.SetType("unit.type")

ctx := ContextWithMetricTag(context.Background(), &MetricTag{
Name: "test-name",
Namespace: "test-ns",
ResourceGroup: "test-rg",
})

if result := reporter.ReportCount(ctx, event, cloudevents.NewHTTPResult(200, "")); cloudevents.IsNACK(result) {
lionelvillard marked this conversation as resolved.
Show resolved Hide resolved
t.Errorf("unexpected result %v", result)
}

if err := stats.ValidateEventCount(1); err != nil {
t.Errorf("unexpected error %v", err)
}
}
Loading