Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Eventing TLS: Add scheme label to metrics #7581

Merged
merged 19 commits into from
Jan 22, 2024
22 changes: 16 additions & 6 deletions pkg/adapter/v2/cloudevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,19 @@ func NewClient(cfg ClientConfig) (Client, error) {
reporter: cfg.Reporter,
crStatusEventClient: cfg.CrStatusEventClient,
oidcTokenProvider: cfg.TokenProvider,
scheme: "http",
}

if cfg.Env != nil {
client.audience = cfg.Env.GetAudience()
client.oidcServiceAccountName = cfg.Env.GetOIDCServiceAccountName()
sinkURI := cfg.Env.GetSink()
if sinkURI != "" {
parsedUrl, err := url.Parse(sinkURI)
if err == nil {
client.scheme = parsedUrl.Scheme
}
}
}

return client, nil
Expand All @@ -234,12 +242,12 @@ func setTimeOut(duration time.Duration) http.Option {
}

type client struct {
ceClient cloudevents.Client
ceOverrides *duckv1.CloudEventOverrides
reporter source.StatsReporter
crStatusEventClient *crstatusevent.CRStatusEventClient
closeIdler closeIdler

ceClient cloudevents.Client
ceOverrides *duckv1.CloudEventOverrides
reporter source.StatsReporter
crStatusEventClient *crstatusevent.CRStatusEventClient
closeIdler closeIdler
scheme string
oidcTokenProvider *auth.OIDCTokenProvider
audience *string
oidcServiceAccountName *types.NamespacedName
Expand Down Expand Up @@ -302,13 +310,15 @@ func (c *client) reportMetrics(ctx context.Context, event cloudevents.Event, res
if c.reporter == nil {
return
}

tags := MetricTagFromContext(ctx)
reportArgs := &source.ReportArgs{
Namespace: tags.Namespace,
EventSource: event.Source(),
EventType: event.Type(),
Name: tags.Name,
ResourceGroup: tags.ResourceGroup,
EventScheme: c.scheme,
}

var rres *http.RetriesResult
Expand Down
19 changes: 18 additions & 1 deletion pkg/broker/filter/filter_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,12 @@ func (h *Handler) handleDispatchToReplyRequest(ctx context.Context, trigger *eve
requestType: "reply_forward",
}

if request.TLS != nil {
reportArgs.requestScheme = "https"
} else {
reportArgs.requestScheme = "http"
}

h.logger.Info("sending to reply", zap.Any("target", target))

// since the broker-filter acts here like a proxy, we don't filter headers
Expand Down Expand Up @@ -277,6 +283,12 @@ func (h *Handler) handleDispatchToDLSRequest(ctx context.Context, trigger *event
requestType: "dls_forward",
}

if request.TLS != nil {
reportArgs.requestScheme = "https"
} else {
reportArgs.requestScheme = "http"
}

h.logger.Info("sending to dls", zap.Any("target", target))

// since the broker-filter acts here like a proxy, we don't filter headers
Expand Down Expand Up @@ -312,6 +324,12 @@ func (h *Handler) handleDispatchToSubscriberRequest(ctx context.Context, trigger
requestType: "filter",
}

if request.TLS != nil {
reportArgs.requestScheme = "https"
} else {
reportArgs.requestScheme = "http"
}

subscriberURI := trigger.Status.SubscriberURI
if subscriberURI == nil {
// Record the event count.
Expand Down Expand Up @@ -342,7 +360,6 @@ func (h *Handler) handleDispatchToSubscriberRequest(ctx context.Context, trigger
}

func (h *Handler) send(ctx context.Context, writer http.ResponseWriter, headers http.Header, target duckv1.Addressable, reportArgs *ReportArgs, event *cloudevents.Event, t *eventingv1.Trigger, ttl int32) {

additionalHeaders := headers.Clone()
additionalHeaders.Set(apis.KnNamespaceHeader, t.GetNamespace())

Expand Down
27 changes: 15 additions & 12 deletions pkg/broker/filter/stats_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,18 +67,20 @@ var (
// go.opencensus.io/tag/validate.go. Currently those restrictions are:
// - length between 1 and 255 inclusive
// - characters are printable US-ASCII
triggerFilterTypeKey = tag.MustNewKey(eventingmetrics.LabelFilterType)
triggerFilterRequestTypeKey = tag.MustNewKey("filter_request_type")
responseCodeKey = tag.MustNewKey(eventingmetrics.LabelResponseCode)
responseCodeClassKey = tag.MustNewKey(eventingmetrics.LabelResponseCodeClass)
triggerFilterTypeKey = tag.MustNewKey(eventingmetrics.LabelFilterType)
triggerFilterRequestTypeKey = tag.MustNewKey("filter_request_type")
triggerFilterRequestSchemeKey = tag.MustNewKey(eventingmetrics.LabelEventScheme)
responseCodeKey = tag.MustNewKey(eventingmetrics.LabelResponseCode)
responseCodeClassKey = tag.MustNewKey(eventingmetrics.LabelResponseCodeClass)
)

type ReportArgs struct {
ns string
trigger string
broker string
filterType string
requestType string
ns string
trigger string
broker string
filterType string
requestType string
requestScheme string
}

func init() {
Expand Down Expand Up @@ -116,19 +118,19 @@ func register() {
Description: eventCountM.Description(),
Measure: eventCountM,
Aggregation: view.Count(),
TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, responseCodeKey, responseCodeClassKey, broker.UniqueTagKey, broker.ContainerTagKey},
TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, triggerFilterRequestSchemeKey, responseCodeKey, responseCodeClassKey, broker.UniqueTagKey, broker.ContainerTagKey},
},
&view.View{
Description: dispatchTimeInMsecM.Description(),
Measure: dispatchTimeInMsecM,
Aggregation: view.Distribution(metrics.Buckets125(1, 10000)...), // 1, 2, 5, 10, 20, 50, 100, 1000, 5000, 10000
TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, responseCodeKey, responseCodeClassKey, broker.UniqueTagKey, broker.ContainerTagKey},
TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, triggerFilterRequestSchemeKey, responseCodeKey, responseCodeClassKey, broker.UniqueTagKey, broker.ContainerTagKey},
},
&view.View{
Description: processingTimeInMsecM.Description(),
Measure: processingTimeInMsecM,
Aggregation: view.Distribution(metrics.Buckets125(1, 10000)...), // 1, 2, 5, 10, 20, 50, 100, 1000, 5000, 10000
TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, broker.UniqueTagKey, broker.ContainerTagKey},
TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, triggerFilterRequestSchemeKey, broker.UniqueTagKey, broker.ContainerTagKey},
},
)
if err != nil {
Expand Down Expand Up @@ -190,6 +192,7 @@ func (r *reporter) generateTag(args *ReportArgs, tags ...tag.Mutator) (context.C
tag.Insert(broker.UniqueTagKey, r.uniqueName),
tag.Insert(triggerFilterTypeKey, valueOrAny(args.filterType)),
tag.Insert(triggerFilterRequestTypeKey, args.requestType),
tag.Insert(triggerFilterRequestSchemeKey, args.requestScheme),
)...)
return ctx, err
}
Expand Down
10 changes: 6 additions & 4 deletions pkg/broker/filter/stats_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,11 @@ func TestReporterEmptySourceAndTypeFilter(t *testing.T) {
setup()

args := &ReportArgs{
ns: "testns",
trigger: "testtrigger",
broker: "testbroker",
filterType: "",
ns: "testns",
trigger: "testtrigger",
broker: "testbroker",
filterType: "",
requestScheme: "http",
}

r := NewStatsReporter("testcontainer", "testpod")
Expand All @@ -110,6 +111,7 @@ func TestReporterEmptySourceAndTypeFilter(t *testing.T) {
metrics.LabelResponseCodeClass: "2xx",
broker.LabelContainerName: "testcontainer",
broker.LabelUniqueName: "testpod",
metrics.LabelEventScheme: "http",
}

resource := resource.Resource{
Expand Down
12 changes: 9 additions & 3 deletions pkg/broker/ingress/ingress_handler.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/*

Check failure on line 1 in pkg/broker/ingress/ingress_handler.go

View workflow job for this annotation

GitHub Actions / style / Golang / Auto-format and Check

Please run goimports. diff --git a/pkg/broker/ingress/ingress_handler.go b/pkg/broker/ingress/ingress_handler.go index 9984713..577c308 100644 --- a/pkg/broker/ingress/ingress_handler.go +++ b/pkg/broker/ingress/ingress_handler.go @@ -257,16 +257,16 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) { } reporterArgs := &ReportArgs{ - ns: brokerNamespace, - broker: brokerName, - eventType: event.Type(), + ns: brokerNamespace, + broker: brokerName, + eventType: event.Type(), } - if request.TLS != nil { - reporterArgs.eventScheme = "https" - } else { - reporterArgs.eventScheme = "http" - } + if request.TLS != nil { + reporterArgs.eventScheme = "https" + } else { + reporterArgs.eventScheme = "http" + } statusCode, dispatchTime := h.receive(ctx, utils.PassThroughHeaders(request.Header), event, broker) if dispatchTime > kncloudevents.NoDuration {
Copyright 2019 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -257,11 +257,17 @@
}

reporterArgs := &ReportArgs{
ns: brokerNamespace,
broker: brokerName,
eventType: event.Type(),
ns: brokerNamespace,
broker: brokerName,
eventType: event.Type(),
}

if request.TLS != nil {
reporterArgs.eventScheme = "https"
} else {
reporterArgs.eventScheme = "http"
}

statusCode, dispatchTime := h.receive(ctx, utils.PassThroughHeaders(request.Header), event, broker)
if dispatchTime > kncloudevents.NoDuration {
_ = h.Reporter.ReportEventDispatchTime(reporterArgs, statusCode, dispatchTime)
Expand Down
19 changes: 13 additions & 6 deletions pkg/broker/ingress/stats_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,16 @@ var (
// - length between 1 and 255 inclusive
// - characters are printable US-ASCII
eventTypeKey = tag.MustNewKey(eventingmetrics.LabelEventType)
eventSchemeKey = tag.MustNewKey(eventingmetrics.LabelEventScheme)
responseCodeKey = tag.MustNewKey(eventingmetrics.LabelResponseCode)
responseCodeClassKey = tag.MustNewKey(eventingmetrics.LabelResponseCodeClass)
)

type ReportArgs struct {
ns string
broker string
eventType string
ns string
broker string
eventType string
eventScheme string
}

func init() {
Expand All @@ -75,8 +77,10 @@ type StatsReporter interface {
ReportEventDispatchTime(args *ReportArgs, responseCode int, d time.Duration) error
}

var _ StatsReporter = (*reporter)(nil)
var emptyContext = context.Background()
var (
_ StatsReporter = (*reporter)(nil)
emptyContext = context.Background()
)

// Reporter holds cached metric objects to report ingress metrics.
type reporter struct {
Expand All @@ -95,10 +99,12 @@ func NewStatsReporter(container, uniqueName string) StatsReporter {
func register() {
tagKeys := []tag.Key{
eventTypeKey,
eventSchemeKey,
responseCodeKey,
responseCodeClassKey,
broker.ContainerTagKey,
broker.UniqueTagKey}
broker.UniqueTagKey,
}

// Create view to see our measurements.
err := metrics.RegisterResourceView(
Expand Down Expand Up @@ -154,6 +160,7 @@ func (r *reporter) generateTag(args *ReportArgs, responseCode int) (context.Cont
tag.Insert(broker.ContainerTagKey, r.container),
tag.Insert(broker.UniqueTagKey, r.uniqueName),
tag.Insert(eventTypeKey, args.eventType),
tag.Insert(eventSchemeKey, args.eventScheme),
tag.Insert(responseCodeKey, strconv.Itoa(responseCode)),
tag.Insert(responseCodeClassKey, metrics.ResponseCodeClass(responseCode)))
}
8 changes: 5 additions & 3 deletions pkg/broker/ingress/stats_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@ func TestStatsReporter(t *testing.T) {
setup()

args := &ReportArgs{
ns: "testns",
broker: "testbroker",
eventType: "testeventtype",
ns: "testns",
broker: "testbroker",
eventType: "testeventtype",
eventScheme: "http",
}

r := NewStatsReporter("testcontainer", "testpod")
Expand All @@ -45,6 +46,7 @@ func TestStatsReporter(t *testing.T) {
metrics.LabelResponseCodeClass: "2xx",
broker.LabelUniqueName: "testpod",
broker.LabelContainerName: "testcontainer",
metrics.LabelEventScheme: "http",
}

resource := resource.Resource{
Expand Down
6 changes: 6 additions & 0 deletions pkg/channel/event_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,12 @@ func (r *EventReceiver) ServeHTTP(response nethttp.ResponseWriter, request *neth

args.Ns = channel.Namespace

if request.TLS != nil {
args.EventScheme = "https"
} else {
args.EventScheme = "http"
}

event, err := http.NewEventFromHTTPRequest(request)
if err != nil {
r.logger.Warn("failed to extract event from request", zap.Error(err))
Expand Down
40 changes: 38 additions & 2 deletions pkg/channel/fanout/fanout_event_handler.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/*

Check failure on line 1 in pkg/channel/fanout/fanout_event_handler.go

View workflow job for this annotation

GitHub Actions / style / Golang / Auto-format and Check

Please run goimports. diff --git a/pkg/channel/fanout/fanout_event_handler.go b/pkg/channel/fanout/fanout_event_handler.go index 613e779..cfab386 100644 --- a/pkg/channel/fanout/fanout_event_handler.go +++ b/pkg/channel/fanout/fanout_event_handler.go @@ -267,7 +267,6 @@ func createEventReceiverFunction(f *FanoutEventHandler) func(context.Context, ch reportArgs.EventType = event.Type() reportArgs.Ns = ref.Namespace - if f.hasHttpSubs { reportArgs.EventScheme = "https" } else {
Copyright 2020 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -93,6 +93,8 @@
eventTypeHandler *eventtype.EventTypeAutoHandler
channelRef *duckv1.KReference
channelUID *types.UID
hasHttpSubs bool
hasHttpsSubs bool
}

// NewFanoutEventHandler creates a new fanout.EventHandler.
Expand All @@ -105,7 +107,6 @@
channelUID *types.UID,
eventDispatcher *kncloudevents.Dispatcher,
receiverOpts ...channel.EventReceiverOptions,

) (*FanoutEventHandler, error) {
handler := &FanoutEventHandler{
logger: logger,
Expand All @@ -119,6 +120,15 @@
}
handler.subscriptions = make([]Subscription, len(config.Subscriptions))
copy(handler.subscriptions, config.Subscriptions)

for _, sub := range handler.subscriptions {
if sub.Subscriber.URL != nil && sub.Subscriber.URL.Scheme == "https" {
handler.hasHttpsSubs = true
} else {
handler.hasHttpSubs = true
}
}
sadath-12 marked this conversation as resolved.
Show resolved Hide resolved

// The receiver function needs to point back at the handler itself, so set it up after
// initialization.
receiver, err := channel.NewEventReceiver(createEventReceiverFunction(handler), logger, reporter, receiverOpts...)
Expand Down Expand Up @@ -220,13 +230,24 @@
reportArgs.EventType = evnt.Type()
reportArgs.Ns = ref.Namespace

if f.hasHttpsSubs {
reportArgs.EventScheme = "https"
} else {
reportArgs.EventScheme = "http"
}
go func(e event.Event, h nethttp.Header, s *trace.Span, r *channel.StatsReporter, args *channel.ReportArgs) {
// Run async dispatch with background context.
ctx = trace.NewContext(context.Background(), s)
h.Set(apis.KnNamespaceHeader, ref.Namespace)
// Any returned error is already logged in f.dispatch().
dispatchResultForFanout := f.dispatch(ctx, subs, e, h)
_ = ParseDispatchResultAndReportMetrics(dispatchResultForFanout, *r, *args)
// If there are both http and https subscribers, we need to report the metrics for both of the type
// In this case we report http metrics because above we checked first for https and reported it so the left over metric to report is for http
if f.hasHttpSubs && f.hasHttpsSubs {
reportArgs.EventScheme = "http"
_ = ParseDispatchResultAndReportMetrics(dispatchResultForFanout, *r, *args)
}
}(evnt, additionalHeaders, parentSpan, &f.reporter, &reportArgs)
return nil
}
Expand All @@ -245,9 +266,24 @@
reportArgs := channel.ReportArgs{}
reportArgs.EventType = event.Type()
reportArgs.Ns = ref.Namespace


if f.hasHttpSubs {
reportArgs.EventScheme = "https"
} else {
reportArgs.EventScheme = "http"
}

additionalHeaders.Set(apis.KnNamespaceHeader, ref.Namespace)
dispatchResultForFanout := f.dispatch(ctx, subs, event, additionalHeaders)
return ParseDispatchResultAndReportMetrics(dispatchResultForFanout, f.reporter, reportArgs)
err := ParseDispatchResultAndReportMetrics(dispatchResultForFanout, f.reporter, reportArgs)
// If there are both http and https subscribers, we need to report the metrics for both of the type
// In this case we report http metrics because above we checked first for https and reported it so the left over metric to report is for http
if f.hasHttpSubs && f.hasHttpsSubs {
reportArgs.EventScheme = "http"
err = ParseDispatchResultAndReportMetrics(dispatchResultForFanout, f.reporter, reportArgs)
}
return err
}
}

Expand Down
Loading
Loading