From ebcee5eb6697faa1185adb0a6eea233125d09a81 Mon Sep 17 00:00:00 2001 From: Filip Chmielewski Date: Wed, 7 Sep 2022 03:17:35 +0200 Subject: [PATCH] Update telemetry schema with workflows Closes: GH-267 --- pkg/otelcollector/consts.go | 4 ++ .../metricsprocessor/processor.go | 20 ++++-- .../metricsprocessor/processor_test.go | 69 ++++++++++++++----- 3 files changed, 72 insertions(+), 21 deletions(-) diff --git a/pkg/otelcollector/consts.go b/pkg/otelcollector/consts.go index 68c104c17e..b0e54b316f 100644 --- a/pkg/otelcollector/consts.go +++ b/pkg/otelcollector/consts.go @@ -63,6 +63,10 @@ const ( ConcurrencyLimitersLabel = "concurrency_limiters" // DroppingConcurrencyLimitersLabel describes rate limiters dropping the traffic. DroppingConcurrencyLimitersLabel = "dropping_concurrency_limiters" + // WorkloadsLabel describes workloads matched to the traffic. + WorkloadsLabel = "workloads" + // DroppingWorkloadsLabel describes workloads dropping the traffic. + DroppingWorkloadsLabel = "dropping_workloads" // FluxMetersLabel describes flux meters matched to the traffic. FluxMetersLabel = "flux_meters" // FlowLabelKeysLabel describes keys of flow labels matched to the traffic. diff --git a/pkg/otelcollector/metricsprocessor/processor.go b/pkg/otelcollector/metricsprocessor/processor.go index 36b9760c71..2a352b5e6e 100644 --- a/pkg/otelcollector/metricsprocessor/processor.go +++ b/pkg/otelcollector/metricsprocessor/processor.go @@ -149,6 +149,8 @@ func (p *metricsProcessor) addCheckResponseBasedLabels(attributes pcommon.Map, c otelcollector.DroppingRateLimitersLabel: pcommon.NewValueSlice(), otelcollector.ConcurrencyLimitersLabel: pcommon.NewValueSlice(), otelcollector.DroppingConcurrencyLimitersLabel: pcommon.NewValueSlice(), + otelcollector.WorkloadsLabel: pcommon.NewValueSlice(), + otelcollector.DroppingWorkloadsLabel: pcommon.NewValueSlice(), otelcollector.FluxMetersLabel: pcommon.NewValueSlice(), otelcollector.FlowLabelKeysLabel: pcommon.NewValueSlice(), otelcollector.DecisionTypeLabel: pcommon.NewValueString(checkResponse.DecisionType.String()), @@ -176,7 +178,6 @@ func (p *metricsProcessor) addCheckResponseBasedLabels(attributes pcommon.Map, c rawValue := []string{ fmt.Sprintf("%s:%v", metrics.PolicyNameLabel, decision.GetPolicyName()), fmt.Sprintf("%s:%v", metrics.ComponentIndexLabel, decision.GetComponentIndex()), - fmt.Sprintf("%s:%v", metrics.WorkloadIndexLabel, cl.GetWorkloadIndex()), fmt.Sprintf("%s:%v", metrics.PolicyHashLabel, decision.GetPolicyHash()), } value := strings.Join(rawValue, ",") @@ -184,13 +185,22 @@ func (p *metricsProcessor) addCheckResponseBasedLabels(attributes pcommon.Map, c if decision.Dropped { labels[otelcollector.DroppingConcurrencyLimitersLabel].SliceVal().AppendEmpty().SetStringVal(value) } + + workloadsRawValue := []string{ + fmt.Sprintf("%s:%v", metrics.PolicyNameLabel, decision.GetPolicyName()), + fmt.Sprintf("%s:%v", metrics.ComponentIndexLabel, decision.GetComponentIndex()), + fmt.Sprintf("%s:%v", metrics.WorkloadIndexLabel, cl.GetWorkloadIndex()), + fmt.Sprintf("%s:%v", metrics.PolicyHashLabel, decision.GetPolicyHash()), + } + value = strings.Join(workloadsRawValue, ",") + labels[otelcollector.WorkloadsLabel].SliceVal().AppendEmpty().SetStringVal(value) + if decision.Dropped { + labels[otelcollector.DroppingWorkloadsLabel].SliceVal().AppendEmpty().SetStringVal(value) + } } } for _, fluxMeter := range checkResponse.FluxMeters { - rawValue := []string{ - fmt.Sprintf("%s:%v", metrics.FluxMeterNameLabel, fluxMeter.GetFluxMeterName()), - } - value := strings.Join(rawValue, ",") + value := fluxMeter.GetFluxMeterName() labels[otelcollector.FluxMetersLabel].SliceVal().AppendEmpty().SetStringVal(value) } diff --git a/pkg/otelcollector/metricsprocessor/processor_test.go b/pkg/otelcollector/metricsprocessor/processor_test.go index 433444de52..423dc2fcf9 100644 --- a/pkg/otelcollector/metricsprocessor/processor_test.go +++ b/pkg/otelcollector/metricsprocessor/processor_test.go @@ -98,6 +98,9 @@ var _ = Describe("Metrics Processor", func() { FluxMeterName: "bar", }, }, + FlowLabelKeys: []string{ + "someLabel", + }, }, &flowcontrolv1.AuthzResponse{ Status: flowcontrolv1.AuthzResponse_STATUS_NO_ERROR, @@ -117,11 +120,14 @@ var _ = Describe("Metrics Processor", func() { otelcollector.DecisionTypeLabel: "DECISION_TYPE_REJECTED", otelcollector.DecisionErrorReasonLabel: "", otelcollector.DecisionRejectReasonLabel: "", - otelcollector.FluxMetersLabel: []interface{}{"flux_meter_name:bar"}, + otelcollector.FluxMetersLabel: []interface{}{"bar"}, + otelcollector.FlowLabelKeysLabel: []interface{}{"someLabel"}, otelcollector.RateLimitersLabel: []interface{}{}, otelcollector.DroppingRateLimitersLabel: []interface{}{}, - otelcollector.ConcurrencyLimitersLabel: []interface{}{"policy_name:foo,component_index:1,workload_index:0,policy_hash:foo-hash"}, - otelcollector.DroppingConcurrencyLimitersLabel: []interface{}{"policy_name:foo,component_index:1,workload_index:0,policy_hash:foo-hash"}, + otelcollector.ConcurrencyLimitersLabel: []interface{}{"policy_name:foo,component_index:1,policy_hash:foo-hash"}, + otelcollector.DroppingConcurrencyLimitersLabel: []interface{}{"policy_name:foo,component_index:1,policy_hash:foo-hash"}, + otelcollector.WorkloadsLabel: []interface{}{"policy_name:foo,component_index:1,workload_index:0,policy_hash:foo-hash"}, + otelcollector.DroppingWorkloadsLabel: []interface{}{"policy_name:foo,component_index:1,workload_index:0,policy_hash:foo-hash"}, }, ), @@ -145,7 +151,8 @@ var _ = Describe("Metrics Processor", func() { }, }, }, - FluxMeters: []*flowcontrolv1.FluxMeter{}, + FluxMeters: []*flowcontrolv1.FluxMeter{}, + FlowLabelKeys: []string{}, }, &flowcontrolv1.AuthzResponse{ Status: flowcontrolv1.AuthzResponse_STATUS_NO_ERROR, @@ -166,8 +173,10 @@ var _ = Describe("Metrics Processor", func() { otelcollector.DecisionRejectReasonLabel: "REJECT_REASON_RATE_LIMITED", otelcollector.RateLimitersLabel: []interface{}{}, otelcollector.DroppingRateLimitersLabel: []interface{}{}, - otelcollector.ConcurrencyLimitersLabel: []interface{}{"policy_name:foo,component_index:1,workload_index:0,policy_hash:foo-hash"}, - otelcollector.DroppingConcurrencyLimitersLabel: []interface{}{"policy_name:foo,component_index:1,workload_index:0,policy_hash:foo-hash"}, + otelcollector.ConcurrencyLimitersLabel: []interface{}{"policy_name:foo,component_index:1,policy_hash:foo-hash"}, + otelcollector.DroppingConcurrencyLimitersLabel: []interface{}{"policy_name:foo,component_index:1,policy_hash:foo-hash"}, + otelcollector.WorkloadsLabel: []interface{}{"policy_name:foo,component_index:1,workload_index:0,policy_hash:foo-hash"}, + otelcollector.DroppingWorkloadsLabel: []interface{}{"policy_name:foo,component_index:1,workload_index:0,policy_hash:foo-hash"}, }, ), @@ -213,7 +222,8 @@ var _ = Describe("Metrics Processor", func() { }, }, }, - FluxMeters: []*flowcontrolv1.FluxMeter{}, + FluxMeters: []*flowcontrolv1.FluxMeter{}, + FlowLabelKeys: []string{}, }, &flowcontrolv1.AuthzResponse{ Status: flowcontrolv1.AuthzResponse_STATUS_NO_ERROR, @@ -247,11 +257,20 @@ var _ = Describe("Metrics Processor", func() { otelcollector.RateLimitersLabel: []interface{}{}, otelcollector.DroppingRateLimitersLabel: []interface{}{}, otelcollector.ConcurrencyLimitersLabel: []interface{}{ + "policy_name:foo,component_index:1,policy_hash:foo-hash", + "policy_name:fizz,component_index:1,policy_hash:fizz-hash", + "policy_name:fizz,component_index:2,policy_hash:fizz-hash", + }, + otelcollector.DroppingConcurrencyLimitersLabel: []interface{}{ + "policy_name:foo,component_index:1,policy_hash:foo-hash", + "policy_name:fizz,component_index:1,policy_hash:fizz-hash", + }, + otelcollector.WorkloadsLabel: []interface{}{ "policy_name:foo,component_index:1,workload_index:0,policy_hash:foo-hash", "policy_name:fizz,component_index:1,workload_index:1,policy_hash:fizz-hash", "policy_name:fizz,component_index:2,workload_index:2,policy_hash:fizz-hash", }, - otelcollector.DroppingConcurrencyLimitersLabel: []interface{}{ + otelcollector.DroppingWorkloadsLabel: []interface{}{ "policy_name:foo,component_index:1,workload_index:0,policy_hash:foo-hash", "policy_name:fizz,component_index:1,workload_index:1,policy_hash:fizz-hash", }, @@ -314,6 +333,9 @@ var _ = Describe("Metrics Processor", func() { FluxMeterName: "bar", }, }, + FlowLabelKeys: []string{ + "someLabel", + }, }, nil, `# HELP workload_latency_ms Latency histogram of workload @@ -329,11 +351,14 @@ var _ = Describe("Metrics Processor", func() { otelcollector.DecisionTypeLabel: "DECISION_TYPE_REJECTED", otelcollector.DecisionErrorReasonLabel: "", otelcollector.DecisionRejectReasonLabel: "", - otelcollector.FluxMetersLabel: []interface{}{"flux_meter_name:bar"}, + otelcollector.FluxMetersLabel: []interface{}{"bar"}, + otelcollector.FlowLabelKeysLabel: []interface{}{"someLabel"}, otelcollector.RateLimitersLabel: []interface{}{}, otelcollector.DroppingRateLimitersLabel: []interface{}{}, - otelcollector.ConcurrencyLimitersLabel: []interface{}{"policy_name:foo,component_index:1,workload_index:0,policy_hash:foo-hash"}, - otelcollector.DroppingConcurrencyLimitersLabel: []interface{}{"policy_name:foo,component_index:1,workload_index:0,policy_hash:foo-hash"}, + otelcollector.ConcurrencyLimitersLabel: []interface{}{"policy_name:foo,component_index:1,policy_hash:foo-hash"}, + otelcollector.DroppingConcurrencyLimitersLabel: []interface{}{"policy_name:foo,component_index:1,policy_hash:foo-hash"}, + otelcollector.WorkloadsLabel: []interface{}{"policy_name:foo,component_index:1,workload_index:0,policy_hash:foo-hash"}, + otelcollector.DroppingWorkloadsLabel: []interface{}{"policy_name:foo,component_index:1,workload_index:0,policy_hash:foo-hash"}, }, ), @@ -357,7 +382,8 @@ var _ = Describe("Metrics Processor", func() { }, }, }, - FluxMeters: []*flowcontrolv1.FluxMeter{}, + FluxMeters: []*flowcontrolv1.FluxMeter{}, + FlowLabelKeys: []string{}, }, nil, `# HELP workload_latency_ms Latency histogram of workload @@ -374,8 +400,10 @@ var _ = Describe("Metrics Processor", func() { otelcollector.DecisionRejectReasonLabel: "REJECT_REASON_RATE_LIMITED", otelcollector.RateLimitersLabel: []interface{}{}, otelcollector.DroppingRateLimitersLabel: []interface{}{}, - otelcollector.ConcurrencyLimitersLabel: []interface{}{"policy_name:foo,component_index:1,workload_index:0,policy_hash:foo-hash"}, - otelcollector.DroppingConcurrencyLimitersLabel: []interface{}{"policy_name:foo,component_index:1,workload_index:0,policy_hash:foo-hash"}, + otelcollector.ConcurrencyLimitersLabel: []interface{}{"policy_name:foo,component_index:1,policy_hash:foo-hash"}, + otelcollector.DroppingConcurrencyLimitersLabel: []interface{}{"policy_name:foo,component_index:1,policy_hash:foo-hash"}, + otelcollector.WorkloadsLabel: []interface{}{"policy_name:foo,component_index:1,workload_index:0,policy_hash:foo-hash"}, + otelcollector.DroppingWorkloadsLabel: []interface{}{"policy_name:foo,component_index:1,workload_index:0,policy_hash:foo-hash"}, }, ), @@ -423,7 +451,8 @@ var _ = Describe("Metrics Processor", func() { }, }, }, - FluxMeters: []*flowcontrolv1.FluxMeter{}, + FluxMeters: []*flowcontrolv1.FluxMeter{}, + FlowLabelKeys: []string{}, }, nil, `# HELP workload_latency_ms Latency histogram of workload @@ -457,10 +486,18 @@ var _ = Describe("Metrics Processor", func() { "policy_name:foo,component_index:1,policy_hash:foo-hash", }, otelcollector.ConcurrencyLimitersLabel: []interface{}{ + "policy_name:fizz,component_index:1,policy_hash:fizz-hash", + "policy_name:fizz,component_index:2,policy_hash:fizz-hash", + }, + otelcollector.DroppingConcurrencyLimitersLabel: []interface{}{ + "policy_name:fizz,component_index:1,policy_hash:fizz-hash", + "policy_name:fizz,component_index:2,policy_hash:fizz-hash", + }, + otelcollector.WorkloadsLabel: []interface{}{ "policy_name:fizz,component_index:1,workload_index:1,policy_hash:fizz-hash", "policy_name:fizz,component_index:2,workload_index:2,policy_hash:fizz-hash", }, - otelcollector.DroppingConcurrencyLimitersLabel: []interface{}{ + otelcollector.DroppingWorkloadsLabel: []interface{}{ "policy_name:fizz,component_index:1,workload_index:1,policy_hash:fizz-hash", "policy_name:fizz,component_index:2,workload_index:2,policy_hash:fizz-hash", },