Skip to content

Commit

Permalink
Eventing TLS: Add scheme label to metrics (#7581)
Browse files Browse the repository at this point in the history
* Eventing TLS: Add scheme label to metrics

Signed-off-by: sadath-12 <[email protected]>

* setting the scheme

Signed-off-by: sadath-12 <[email protected]>

* fix scheme

Signed-off-by: sadath-12 <[email protected]>

* fix minors

Signed-off-by: sadath-12 <[email protected]>

* vendor undo

Signed-off-by: sadath-12 <[email protected]>

* minorfix

Signed-off-by: sadath-12 <[email protected]>

* fix scheme

Signed-off-by: sadath-12 <[email protected]>

* scheme fix

Signed-off-by: sadath-12 <[email protected]>

* fixscheme

Signed-off-by: sadath-12 <[email protected]>

* scheme fix

Signed-off-by: sadath-12 <[email protected]>

* null pointer fix

Signed-off-by: sadath-12 <[email protected]>

* lintfix

Signed-off-by: sadath-12 <[email protected]>

* replace scheme place

Signed-off-by: sadath-12 <[email protected]>

* minor advances

Signed-off-by: sadath-12 <[email protected]>

* minoradvc

Signed-off-by: sadath-12 <[email protected]>

* minor adv

Signed-off-by: sadath-12 <[email protected]>

* restructure subscription handling

Signed-off-by: sadath-12 <[email protected]>

* lintfix

Signed-off-by: sadath-12 <[email protected]>

* lintfix

Signed-off-by: sadath-12 <[email protected]>

---------

Signed-off-by: sadath-12 <[email protected]>
  • Loading branch information
sadath-12 authored Jan 22, 2024
1 parent 8d6c6e4 commit 44ff98b
Show file tree
Hide file tree
Showing 14 changed files with 153 additions and 46 deletions.
22 changes: 16 additions & 6 deletions pkg/adapter/v2/cloudevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,19 @@ func NewClient(cfg ClientConfig) (Client, error) {
reporter: cfg.Reporter,
crStatusEventClient: cfg.CrStatusEventClient,
oidcTokenProvider: cfg.TokenProvider,
scheme: "http",
}

if cfg.Env != nil {
client.audience = cfg.Env.GetAudience()
client.oidcServiceAccountName = cfg.Env.GetOIDCServiceAccountName()
sinkURI := cfg.Env.GetSink()
if sinkURI != "" {
parsedUrl, err := url.Parse(sinkURI)
if err == nil {
client.scheme = parsedUrl.Scheme
}
}
}

return client, nil
Expand All @@ -234,12 +242,12 @@ func setTimeOut(duration time.Duration) http.Option {
}

type client struct {
ceClient cloudevents.Client
ceOverrides *duckv1.CloudEventOverrides
reporter source.StatsReporter
crStatusEventClient *crstatusevent.CRStatusEventClient
closeIdler closeIdler

ceClient cloudevents.Client
ceOverrides *duckv1.CloudEventOverrides
reporter source.StatsReporter
crStatusEventClient *crstatusevent.CRStatusEventClient
closeIdler closeIdler
scheme string
oidcTokenProvider *auth.OIDCTokenProvider
audience *string
oidcServiceAccountName *types.NamespacedName
Expand Down Expand Up @@ -302,13 +310,15 @@ func (c *client) reportMetrics(ctx context.Context, event cloudevents.Event, res
if c.reporter == nil {
return
}

tags := MetricTagFromContext(ctx)
reportArgs := &source.ReportArgs{
Namespace: tags.Namespace,
EventSource: event.Source(),
EventType: event.Type(),
Name: tags.Name,
ResourceGroup: tags.ResourceGroup,
EventScheme: c.scheme,
}

var rres *http.RetriesResult
Expand Down
19 changes: 18 additions & 1 deletion pkg/broker/filter/filter_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,12 @@ func (h *Handler) handleDispatchToReplyRequest(ctx context.Context, trigger *eve
requestType: "reply_forward",
}

if request.TLS != nil {
reportArgs.requestScheme = "https"
} else {
reportArgs.requestScheme = "http"
}

h.logger.Info("sending to reply", zap.Any("target", target))

// since the broker-filter acts here like a proxy, we don't filter headers
Expand Down Expand Up @@ -277,6 +283,12 @@ func (h *Handler) handleDispatchToDLSRequest(ctx context.Context, trigger *event
requestType: "dls_forward",
}

if request.TLS != nil {
reportArgs.requestScheme = "https"
} else {
reportArgs.requestScheme = "http"
}

h.logger.Info("sending to dls", zap.Any("target", target))

// since the broker-filter acts here like a proxy, we don't filter headers
Expand Down Expand Up @@ -312,6 +324,12 @@ func (h *Handler) handleDispatchToSubscriberRequest(ctx context.Context, trigger
requestType: "filter",
}

if request.TLS != nil {
reportArgs.requestScheme = "https"
} else {
reportArgs.requestScheme = "http"
}

subscriberURI := trigger.Status.SubscriberURI
if subscriberURI == nil {
// Record the event count.
Expand Down Expand Up @@ -342,7 +360,6 @@ func (h *Handler) handleDispatchToSubscriberRequest(ctx context.Context, trigger
}

func (h *Handler) send(ctx context.Context, writer http.ResponseWriter, headers http.Header, target duckv1.Addressable, reportArgs *ReportArgs, event *cloudevents.Event, t *eventingv1.Trigger, ttl int32) {

additionalHeaders := headers.Clone()
additionalHeaders.Set(apis.KnNamespaceHeader, t.GetNamespace())

Expand Down
27 changes: 15 additions & 12 deletions pkg/broker/filter/stats_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,18 +67,20 @@ var (
// go.opencensus.io/tag/validate.go. Currently those restrictions are:
// - length between 1 and 255 inclusive
// - characters are printable US-ASCII
triggerFilterTypeKey = tag.MustNewKey(eventingmetrics.LabelFilterType)
triggerFilterRequestTypeKey = tag.MustNewKey("filter_request_type")
responseCodeKey = tag.MustNewKey(eventingmetrics.LabelResponseCode)
responseCodeClassKey = tag.MustNewKey(eventingmetrics.LabelResponseCodeClass)
triggerFilterTypeKey = tag.MustNewKey(eventingmetrics.LabelFilterType)
triggerFilterRequestTypeKey = tag.MustNewKey("filter_request_type")
triggerFilterRequestSchemeKey = tag.MustNewKey(eventingmetrics.LabelEventScheme)
responseCodeKey = tag.MustNewKey(eventingmetrics.LabelResponseCode)
responseCodeClassKey = tag.MustNewKey(eventingmetrics.LabelResponseCodeClass)
)

type ReportArgs struct {
ns string
trigger string
broker string
filterType string
requestType string
ns string
trigger string
broker string
filterType string
requestType string
requestScheme string
}

func init() {
Expand Down Expand Up @@ -116,19 +118,19 @@ func register() {
Description: eventCountM.Description(),
Measure: eventCountM,
Aggregation: view.Count(),
TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, responseCodeKey, responseCodeClassKey, broker.UniqueTagKey, broker.ContainerTagKey},
TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, triggerFilterRequestSchemeKey, responseCodeKey, responseCodeClassKey, broker.UniqueTagKey, broker.ContainerTagKey},
},
&view.View{
Description: dispatchTimeInMsecM.Description(),
Measure: dispatchTimeInMsecM,
Aggregation: view.Distribution(metrics.Buckets125(1, 10000)...), // 1, 2, 5, 10, 20, 50, 100, 1000, 5000, 10000
TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, responseCodeKey, responseCodeClassKey, broker.UniqueTagKey, broker.ContainerTagKey},
TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, triggerFilterRequestSchemeKey, responseCodeKey, responseCodeClassKey, broker.UniqueTagKey, broker.ContainerTagKey},
},
&view.View{
Description: processingTimeInMsecM.Description(),
Measure: processingTimeInMsecM,
Aggregation: view.Distribution(metrics.Buckets125(1, 10000)...), // 1, 2, 5, 10, 20, 50, 100, 1000, 5000, 10000
TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, broker.UniqueTagKey, broker.ContainerTagKey},
TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, triggerFilterRequestSchemeKey, broker.UniqueTagKey, broker.ContainerTagKey},
},
)
if err != nil {
Expand Down Expand Up @@ -190,6 +192,7 @@ func (r *reporter) generateTag(args *ReportArgs, tags ...tag.Mutator) (context.C
tag.Insert(broker.UniqueTagKey, r.uniqueName),
tag.Insert(triggerFilterTypeKey, valueOrAny(args.filterType)),
tag.Insert(triggerFilterRequestTypeKey, args.requestType),
tag.Insert(triggerFilterRequestSchemeKey, args.requestScheme),
)...)
return ctx, err
}
Expand Down
10 changes: 6 additions & 4 deletions pkg/broker/filter/stats_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,11 @@ func TestReporterEmptySourceAndTypeFilter(t *testing.T) {
setup()

args := &ReportArgs{
ns: "testns",
trigger: "testtrigger",
broker: "testbroker",
filterType: "",
ns: "testns",
trigger: "testtrigger",
broker: "testbroker",
filterType: "",
requestScheme: "http",
}

r := NewStatsReporter("testcontainer", "testpod")
Expand All @@ -110,6 +111,7 @@ func TestReporterEmptySourceAndTypeFilter(t *testing.T) {
metrics.LabelResponseCodeClass: "2xx",
broker.LabelContainerName: "testcontainer",
broker.LabelUniqueName: "testpod",
metrics.LabelEventScheme: "http",
}

resource := resource.Resource{
Expand Down
6 changes: 6 additions & 0 deletions pkg/broker/ingress/ingress_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,12 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
eventType: event.Type(),
}

if request.TLS != nil {
reporterArgs.eventScheme = "https"
} else {
reporterArgs.eventScheme = "http"
}

statusCode, dispatchTime := h.receive(ctx, utils.PassThroughHeaders(request.Header), event, broker)
if dispatchTime > kncloudevents.NoDuration {
_ = h.Reporter.ReportEventDispatchTime(reporterArgs, statusCode, dispatchTime)
Expand Down
19 changes: 13 additions & 6 deletions pkg/broker/ingress/stats_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,16 @@ var (
// - length between 1 and 255 inclusive
// - characters are printable US-ASCII
eventTypeKey = tag.MustNewKey(eventingmetrics.LabelEventType)
eventSchemeKey = tag.MustNewKey(eventingmetrics.LabelEventScheme)
responseCodeKey = tag.MustNewKey(eventingmetrics.LabelResponseCode)
responseCodeClassKey = tag.MustNewKey(eventingmetrics.LabelResponseCodeClass)
)

type ReportArgs struct {
ns string
broker string
eventType string
ns string
broker string
eventType string
eventScheme string
}

func init() {
Expand All @@ -75,8 +77,10 @@ type StatsReporter interface {
ReportEventDispatchTime(args *ReportArgs, responseCode int, d time.Duration) error
}

var _ StatsReporter = (*reporter)(nil)
var emptyContext = context.Background()
var (
_ StatsReporter = (*reporter)(nil)
emptyContext = context.Background()
)

// Reporter holds cached metric objects to report ingress metrics.
type reporter struct {
Expand All @@ -95,10 +99,12 @@ func NewStatsReporter(container, uniqueName string) StatsReporter {
func register() {
tagKeys := []tag.Key{
eventTypeKey,
eventSchemeKey,
responseCodeKey,
responseCodeClassKey,
broker.ContainerTagKey,
broker.UniqueTagKey}
broker.UniqueTagKey,
}

// Create view to see our measurements.
err := metrics.RegisterResourceView(
Expand Down Expand Up @@ -154,6 +160,7 @@ func (r *reporter) generateTag(args *ReportArgs, responseCode int) (context.Cont
tag.Insert(broker.ContainerTagKey, r.container),
tag.Insert(broker.UniqueTagKey, r.uniqueName),
tag.Insert(eventTypeKey, args.eventType),
tag.Insert(eventSchemeKey, args.eventScheme),
tag.Insert(responseCodeKey, strconv.Itoa(responseCode)),
tag.Insert(responseCodeClassKey, metrics.ResponseCodeClass(responseCode)))
}
8 changes: 5 additions & 3 deletions pkg/broker/ingress/stats_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@ func TestStatsReporter(t *testing.T) {
setup()

args := &ReportArgs{
ns: "testns",
broker: "testbroker",
eventType: "testeventtype",
ns: "testns",
broker: "testbroker",
eventType: "testeventtype",
eventScheme: "http",
}

r := NewStatsReporter("testcontainer", "testpod")
Expand All @@ -45,6 +46,7 @@ func TestStatsReporter(t *testing.T) {
metrics.LabelResponseCodeClass: "2xx",
broker.LabelUniqueName: "testpod",
broker.LabelContainerName: "testcontainer",
metrics.LabelEventScheme: "http",
}

resource := resource.Resource{
Expand Down
6 changes: 6 additions & 0 deletions pkg/channel/event_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,12 @@ func (r *EventReceiver) ServeHTTP(response nethttp.ResponseWriter, request *neth

args.Ns = channel.Namespace

if request.TLS != nil {
args.EventScheme = "https"
} else {
args.EventScheme = "http"
}

event, err := http.NewEventFromHTTPRequest(request)
if err != nil {
r.logger.Warn("failed to extract event from request", zap.Error(err))
Expand Down
Loading

0 comments on commit 44ff98b

Please sign in to comment.