Skip to content

Commit

Permalink
Update telemetry schema with workflows
Browse files Browse the repository at this point in the history
Closes: GH-267
  • Loading branch information
IridiumOxide committed Sep 7, 2022
1 parent ede3271 commit ebcee5e
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 21 deletions.
4 changes: 4 additions & 0 deletions pkg/otelcollector/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ const (
ConcurrencyLimitersLabel = "concurrency_limiters"
// DroppingConcurrencyLimitersLabel describes rate limiters dropping the traffic.
DroppingConcurrencyLimitersLabel = "dropping_concurrency_limiters"
// WorkloadsLabel describes workloads matched to the traffic.
WorkloadsLabel = "workloads"
// DroppingWorkloadsLabel describes workloads dropping the traffic.
DroppingWorkloadsLabel = "dropping_workloads"
// FluxMetersLabel describes flux meters matched to the traffic.
FluxMetersLabel = "flux_meters"
// FlowLabelKeysLabel describes keys of flow labels matched to the traffic.
Expand Down
20 changes: 15 additions & 5 deletions pkg/otelcollector/metricsprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ func (p *metricsProcessor) addCheckResponseBasedLabels(attributes pcommon.Map, c
otelcollector.DroppingRateLimitersLabel: pcommon.NewValueSlice(),
otelcollector.ConcurrencyLimitersLabel: pcommon.NewValueSlice(),
otelcollector.DroppingConcurrencyLimitersLabel: pcommon.NewValueSlice(),
otelcollector.WorkloadsLabel: pcommon.NewValueSlice(),
otelcollector.DroppingWorkloadsLabel: pcommon.NewValueSlice(),
otelcollector.FluxMetersLabel: pcommon.NewValueSlice(),
otelcollector.FlowLabelKeysLabel: pcommon.NewValueSlice(),
otelcollector.DecisionTypeLabel: pcommon.NewValueString(checkResponse.DecisionType.String()),
Expand Down Expand Up @@ -176,21 +178,29 @@ func (p *metricsProcessor) addCheckResponseBasedLabels(attributes pcommon.Map, c
rawValue := []string{
fmt.Sprintf("%s:%v", metrics.PolicyNameLabel, decision.GetPolicyName()),
fmt.Sprintf("%s:%v", metrics.ComponentIndexLabel, decision.GetComponentIndex()),
fmt.Sprintf("%s:%v", metrics.WorkloadIndexLabel, cl.GetWorkloadIndex()),
fmt.Sprintf("%s:%v", metrics.PolicyHashLabel, decision.GetPolicyHash()),
}
value := strings.Join(rawValue, ",")
labels[otelcollector.ConcurrencyLimitersLabel].SliceVal().AppendEmpty().SetStringVal(value)
if decision.Dropped {
labels[otelcollector.DroppingConcurrencyLimitersLabel].SliceVal().AppendEmpty().SetStringVal(value)
}

workloadsRawValue := []string{
fmt.Sprintf("%s:%v", metrics.PolicyNameLabel, decision.GetPolicyName()),
fmt.Sprintf("%s:%v", metrics.ComponentIndexLabel, decision.GetComponentIndex()),
fmt.Sprintf("%s:%v", metrics.WorkloadIndexLabel, cl.GetWorkloadIndex()),
fmt.Sprintf("%s:%v", metrics.PolicyHashLabel, decision.GetPolicyHash()),
}
value = strings.Join(workloadsRawValue, ",")
labels[otelcollector.WorkloadsLabel].SliceVal().AppendEmpty().SetStringVal(value)
if decision.Dropped {
labels[otelcollector.DroppingWorkloadsLabel].SliceVal().AppendEmpty().SetStringVal(value)
}
}
}
for _, fluxMeter := range checkResponse.FluxMeters {
rawValue := []string{
fmt.Sprintf("%s:%v", metrics.FluxMeterNameLabel, fluxMeter.GetFluxMeterName()),
}
value := strings.Join(rawValue, ",")
value := fluxMeter.GetFluxMeterName()
labels[otelcollector.FluxMetersLabel].SliceVal().AppendEmpty().SetStringVal(value)
}

Expand Down
69 changes: 53 additions & 16 deletions pkg/otelcollector/metricsprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ var _ = Describe("Metrics Processor", func() {
FluxMeterName: "bar",
},
},
FlowLabelKeys: []string{
"someLabel",
},
},
&flowcontrolv1.AuthzResponse{
Status: flowcontrolv1.AuthzResponse_STATUS_NO_ERROR,
Expand All @@ -117,11 +120,14 @@ var _ = Describe("Metrics Processor", func() {
otelcollector.DecisionTypeLabel: "DECISION_TYPE_REJECTED",
otelcollector.DecisionErrorReasonLabel: "",
otelcollector.DecisionRejectReasonLabel: "",
otelcollector.FluxMetersLabel: []interface{}{"flux_meter_name:bar"},
otelcollector.FluxMetersLabel: []interface{}{"bar"},
otelcollector.FlowLabelKeysLabel: []interface{}{"someLabel"},
otelcollector.RateLimitersLabel: []interface{}{},
otelcollector.DroppingRateLimitersLabel: []interface{}{},
otelcollector.ConcurrencyLimitersLabel: []interface{}{"policy_name:foo,component_index:1,workload_index:0,policy_hash:foo-hash"},
otelcollector.DroppingConcurrencyLimitersLabel: []interface{}{"policy_name:foo,component_index:1,workload_index:0,policy_hash:foo-hash"},
otelcollector.ConcurrencyLimitersLabel: []interface{}{"policy_name:foo,component_index:1,policy_hash:foo-hash"},
otelcollector.DroppingConcurrencyLimitersLabel: []interface{}{"policy_name:foo,component_index:1,policy_hash:foo-hash"},
otelcollector.WorkloadsLabel: []interface{}{"policy_name:foo,component_index:1,workload_index:0,policy_hash:foo-hash"},
otelcollector.DroppingWorkloadsLabel: []interface{}{"policy_name:foo,component_index:1,workload_index:0,policy_hash:foo-hash"},
},
),

Expand All @@ -145,7 +151,8 @@ var _ = Describe("Metrics Processor", func() {
},
},
},
FluxMeters: []*flowcontrolv1.FluxMeter{},
FluxMeters: []*flowcontrolv1.FluxMeter{},
FlowLabelKeys: []string{},
},
&flowcontrolv1.AuthzResponse{
Status: flowcontrolv1.AuthzResponse_STATUS_NO_ERROR,
Expand All @@ -166,8 +173,10 @@ var _ = Describe("Metrics Processor", func() {
otelcollector.DecisionRejectReasonLabel: "REJECT_REASON_RATE_LIMITED",
otelcollector.RateLimitersLabel: []interface{}{},
otelcollector.DroppingRateLimitersLabel: []interface{}{},
otelcollector.ConcurrencyLimitersLabel: []interface{}{"policy_name:foo,component_index:1,workload_index:0,policy_hash:foo-hash"},
otelcollector.DroppingConcurrencyLimitersLabel: []interface{}{"policy_name:foo,component_index:1,workload_index:0,policy_hash:foo-hash"},
otelcollector.ConcurrencyLimitersLabel: []interface{}{"policy_name:foo,component_index:1,policy_hash:foo-hash"},
otelcollector.DroppingConcurrencyLimitersLabel: []interface{}{"policy_name:foo,component_index:1,policy_hash:foo-hash"},
otelcollector.WorkloadsLabel: []interface{}{"policy_name:foo,component_index:1,workload_index:0,policy_hash:foo-hash"},
otelcollector.DroppingWorkloadsLabel: []interface{}{"policy_name:foo,component_index:1,workload_index:0,policy_hash:foo-hash"},
},
),

Expand Down Expand Up @@ -213,7 +222,8 @@ var _ = Describe("Metrics Processor", func() {
},
},
},
FluxMeters: []*flowcontrolv1.FluxMeter{},
FluxMeters: []*flowcontrolv1.FluxMeter{},
FlowLabelKeys: []string{},
},
&flowcontrolv1.AuthzResponse{
Status: flowcontrolv1.AuthzResponse_STATUS_NO_ERROR,
Expand Down Expand Up @@ -247,11 +257,20 @@ var _ = Describe("Metrics Processor", func() {
otelcollector.RateLimitersLabel: []interface{}{},
otelcollector.DroppingRateLimitersLabel: []interface{}{},
otelcollector.ConcurrencyLimitersLabel: []interface{}{
"policy_name:foo,component_index:1,policy_hash:foo-hash",
"policy_name:fizz,component_index:1,policy_hash:fizz-hash",
"policy_name:fizz,component_index:2,policy_hash:fizz-hash",
},
otelcollector.DroppingConcurrencyLimitersLabel: []interface{}{
"policy_name:foo,component_index:1,policy_hash:foo-hash",
"policy_name:fizz,component_index:1,policy_hash:fizz-hash",
},
otelcollector.WorkloadsLabel: []interface{}{
"policy_name:foo,component_index:1,workload_index:0,policy_hash:foo-hash",
"policy_name:fizz,component_index:1,workload_index:1,policy_hash:fizz-hash",
"policy_name:fizz,component_index:2,workload_index:2,policy_hash:fizz-hash",
},
otelcollector.DroppingConcurrencyLimitersLabel: []interface{}{
otelcollector.DroppingWorkloadsLabel: []interface{}{
"policy_name:foo,component_index:1,workload_index:0,policy_hash:foo-hash",
"policy_name:fizz,component_index:1,workload_index:1,policy_hash:fizz-hash",
},
Expand Down Expand Up @@ -314,6 +333,9 @@ var _ = Describe("Metrics Processor", func() {
FluxMeterName: "bar",
},
},
FlowLabelKeys: []string{
"someLabel",
},
},
nil,
`# HELP workload_latency_ms Latency histogram of workload
Expand All @@ -329,11 +351,14 @@ var _ = Describe("Metrics Processor", func() {
otelcollector.DecisionTypeLabel: "DECISION_TYPE_REJECTED",
otelcollector.DecisionErrorReasonLabel: "",
otelcollector.DecisionRejectReasonLabel: "",
otelcollector.FluxMetersLabel: []interface{}{"flux_meter_name:bar"},
otelcollector.FluxMetersLabel: []interface{}{"bar"},
otelcollector.FlowLabelKeysLabel: []interface{}{"someLabel"},
otelcollector.RateLimitersLabel: []interface{}{},
otelcollector.DroppingRateLimitersLabel: []interface{}{},
otelcollector.ConcurrencyLimitersLabel: []interface{}{"policy_name:foo,component_index:1,workload_index:0,policy_hash:foo-hash"},
otelcollector.DroppingConcurrencyLimitersLabel: []interface{}{"policy_name:foo,component_index:1,workload_index:0,policy_hash:foo-hash"},
otelcollector.ConcurrencyLimitersLabel: []interface{}{"policy_name:foo,component_index:1,policy_hash:foo-hash"},
otelcollector.DroppingConcurrencyLimitersLabel: []interface{}{"policy_name:foo,component_index:1,policy_hash:foo-hash"},
otelcollector.WorkloadsLabel: []interface{}{"policy_name:foo,component_index:1,workload_index:0,policy_hash:foo-hash"},
otelcollector.DroppingWorkloadsLabel: []interface{}{"policy_name:foo,component_index:1,workload_index:0,policy_hash:foo-hash"},
},
),

Expand All @@ -357,7 +382,8 @@ var _ = Describe("Metrics Processor", func() {
},
},
},
FluxMeters: []*flowcontrolv1.FluxMeter{},
FluxMeters: []*flowcontrolv1.FluxMeter{},
FlowLabelKeys: []string{},
},
nil,
`# HELP workload_latency_ms Latency histogram of workload
Expand All @@ -374,8 +400,10 @@ var _ = Describe("Metrics Processor", func() {
otelcollector.DecisionRejectReasonLabel: "REJECT_REASON_RATE_LIMITED",
otelcollector.RateLimitersLabel: []interface{}{},
otelcollector.DroppingRateLimitersLabel: []interface{}{},
otelcollector.ConcurrencyLimitersLabel: []interface{}{"policy_name:foo,component_index:1,workload_index:0,policy_hash:foo-hash"},
otelcollector.DroppingConcurrencyLimitersLabel: []interface{}{"policy_name:foo,component_index:1,workload_index:0,policy_hash:foo-hash"},
otelcollector.ConcurrencyLimitersLabel: []interface{}{"policy_name:foo,component_index:1,policy_hash:foo-hash"},
otelcollector.DroppingConcurrencyLimitersLabel: []interface{}{"policy_name:foo,component_index:1,policy_hash:foo-hash"},
otelcollector.WorkloadsLabel: []interface{}{"policy_name:foo,component_index:1,workload_index:0,policy_hash:foo-hash"},
otelcollector.DroppingWorkloadsLabel: []interface{}{"policy_name:foo,component_index:1,workload_index:0,policy_hash:foo-hash"},
},
),

Expand Down Expand Up @@ -423,7 +451,8 @@ var _ = Describe("Metrics Processor", func() {
},
},
},
FluxMeters: []*flowcontrolv1.FluxMeter{},
FluxMeters: []*flowcontrolv1.FluxMeter{},
FlowLabelKeys: []string{},
},
nil,
`# HELP workload_latency_ms Latency histogram of workload
Expand Down Expand Up @@ -457,10 +486,18 @@ var _ = Describe("Metrics Processor", func() {
"policy_name:foo,component_index:1,policy_hash:foo-hash",
},
otelcollector.ConcurrencyLimitersLabel: []interface{}{
"policy_name:fizz,component_index:1,policy_hash:fizz-hash",
"policy_name:fizz,component_index:2,policy_hash:fizz-hash",
},
otelcollector.DroppingConcurrencyLimitersLabel: []interface{}{
"policy_name:fizz,component_index:1,policy_hash:fizz-hash",
"policy_name:fizz,component_index:2,policy_hash:fizz-hash",
},
otelcollector.WorkloadsLabel: []interface{}{
"policy_name:fizz,component_index:1,workload_index:1,policy_hash:fizz-hash",
"policy_name:fizz,component_index:2,workload_index:2,policy_hash:fizz-hash",
},
otelcollector.DroppingConcurrencyLimitersLabel: []interface{}{
otelcollector.DroppingWorkloadsLabel: []interface{}{
"policy_name:fizz,component_index:1,workload_index:1,policy_hash:fizz-hash",
"policy_name:fizz,component_index:2,workload_index:2,policy_hash:fizz-hash",
},
Expand Down

0 comments on commit ebcee5e

Please sign in to comment.