Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

De-couple FluxMeters from Policy, make them global #241

Merged
merged 5 commits into from
Aug 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@ message PolicyWrapper {
message FluxMeterWrapper {
// Flux Meter
policy.language.v1.FluxMeter flux_meter = 1;
// Name of the Policy.
string policy_name = 2;
// Hash of the entire Policy spec.
string policy_hash = 3;
// Name of fluxmeter metric.
string fluxmeter_name = 4;
}
Expand Down
4 changes: 1 addition & 3 deletions api/aperture/flowcontrol/v1/flowcontrol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,5 @@ message LimiterDecision {

// FluxMeter describes detail for each FluxMeter.
message FluxMeter {
string policy_name = 1;
string policy_hash = 2;
string flux_meter_name = 3;
string flux_meter_name = 1;
}
4 changes: 0 additions & 4 deletions api/gen/openapiv2/aperture.swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -258,10 +258,6 @@ definitions:
properties:
flux_meter_name:
type: string
policy_hash:
type: string
policy_name:
type: string
description: FluxMeter describes detail for each FluxMeter.
googlerpcStatus:
type: object
Expand Down

Large diffs are not rendered by default.

92 changes: 36 additions & 56 deletions api/gen/proto/go/aperture/flowcontrol/v1/flowcontrol.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions libsonnet/Makefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
gen-lib:
@echo Generating library
@rm -rfd 1.0/_gen/*
@rm 1.0/*.libsonnet
@rm -rfd 1.0/_gen/* || true
@rm 1.0/*.libsonnet || true
@python scripts/jsonnet-lib-gen.py --output-dir 1.0/ ../docs/gen/policies/gen.yaml
@tk fmt 1.0/
@git add 1.0/*
Expand Down
5 changes: 1 addition & 4 deletions pkg/otelcollector/metricsprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,6 @@ func (p *metricsProcessor) addCheckResponseBasedLabels(attributes pcommon.Map, c
}
for _, fluxMeter := range checkResponse.FluxMeters {
rawValue := []string{
fmt.Sprintf("%s:%v", metrics.PolicyNameLabel, fluxMeter.GetPolicyName()),
fmt.Sprintf("%s:%v", metrics.FluxMeterNameLabel, fluxMeter.GetFluxMeterName()),
}
value := strings.Join(rawValue, ",")
Expand Down Expand Up @@ -266,14 +265,12 @@ func (p *metricsProcessor) updateMetricsForFluxMeters(
latency float64,
) {
fluxmeterHistogram := p.cfg.engine.GetFluxMeterHist(
fluxMeter.GetPolicyName(),
fluxMeter.GetFluxMeterName(),
statusCode,
decisionType,
)
if fluxmeterHistogram == nil {
log.Debug().Str(metrics.PolicyNameLabel, fluxMeter.GetPolicyName()).
Str(metrics.FluxMeterNameLabel, fluxMeter.GetFluxMeterName()).
log.Debug().Str(metrics.FluxMeterNameLabel, fluxMeter.GetFluxMeterName()).
Str(metrics.DecisionTypeLabel, decisionType.String()).
Str(metrics.StatusCodeLabel, statusCode).
Msg("Fluxmeter not found")
Expand Down
10 changes: 3 additions & 7 deletions pkg/otelcollector/metricsprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,6 @@ var _ = Describe("Metrics Processor", func() {
},
FluxMeters: []*flowcontrolv1.FluxMeter{
{
PolicyName: "foo",
PolicyHash: "foo-hash",
FluxMeterName: "bar",
},
},
Expand All @@ -121,7 +119,7 @@ var _ = Describe("Metrics Processor", func() {
otelcollector.DecisionTypeLabel: "DECISION_TYPE_REJECTED",
otelcollector.DecisionErrorReasonLabel: "",
otelcollector.DecisionRejectReasonLabel: "",
otelcollector.FluxMetersLabel: []interface{}{"policy_name:foo,flux_meter_name:bar"},
otelcollector.FluxMetersLabel: []interface{}{"flux_meter_name:bar"},
otelcollector.RateLimitersLabel: []interface{}{},
otelcollector.DroppingRateLimitersLabel: []interface{}{},
otelcollector.ConcurrencyLimitersLabel: []interface{}{"policy_name:foo,component_index:1,workload_index:0,policy_hash:foo-hash"},
Expand Down Expand Up @@ -317,8 +315,6 @@ var _ = Describe("Metrics Processor", func() {
},
FluxMeters: []*flowcontrolv1.FluxMeter{
{
PolicyName: "foo",
PolicyHash: "foo-hash",
FluxMeterName: "bar",
},
},
Expand All @@ -337,7 +333,7 @@ var _ = Describe("Metrics Processor", func() {
otelcollector.DecisionTypeLabel: "DECISION_TYPE_REJECTED",
otelcollector.DecisionErrorReasonLabel: "",
otelcollector.DecisionRejectReasonLabel: "",
otelcollector.FluxMetersLabel: []interface{}{"policy_name:foo,flux_meter_name:bar"},
otelcollector.FluxMetersLabel: []interface{}{"flux_meter_name:bar"},
otelcollector.RateLimitersLabel: []interface{}{},
otelcollector.DroppingRateLimitersLabel: []interface{}{},
otelcollector.ConcurrencyLimitersLabel: []interface{}{"policy_name:foo,component_index:1,workload_index:0,policy_hash:foo-hash"},
Expand Down Expand Up @@ -587,7 +583,7 @@ func expectEngineCalls(engine *mocks.MockEngine, checkResponse *flowcontrolv1.Ch
expectedCalls := make([]*gomock.Call, len(checkResponse.FluxMeters))
for i, fm := range checkResponse.FluxMeters {
// TODO actually return some Histogram
expectedCalls[i] = engine.EXPECT().GetFluxMeterHist(fm.GetPolicyName(), fm.GetFluxMeterName(), "201", flowcontrolv1.DecisionType_DECISION_TYPE_REJECTED).Return(nil)
expectedCalls[i] = engine.EXPECT().GetFluxMeterHist(fm.GetFluxMeterName(), "201", flowcontrolv1.DecisionType_DECISION_TYPE_REJECTED).Return(nil)
}
gomock.InOrder(expectedCalls...)
}
4 changes: 2 additions & 2 deletions pkg/paths/paths.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ func DataplaneComponentKey(agentGroupName, policyName string, componentIndex int
}

// FluxMeterKey returns the identifier for FluxMeter in etcd.
func FluxMeterKey(agentGroupName, policyName, fluxMeterName string) string {
return PolicyPrefix(agentGroupName, policyName) + "-flux_meter-" + fluxMeterName
func FluxMeterKey(agentGroupName, fluxMeterName string) string {
return AgentGroupPrefix(agentGroupName) + "-flux_meter-" + fluxMeterName
}

// ClassifierKey returns the identifier for a Classifier in etcd.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func NewFluxMeterOptions(
agentGroup := selectorProto.GetAgentGroup()

etcdPath := path.Join(paths.FluxMeterConfigPath,
paths.FluxMeterKey(agentGroup, policyBaseAPI.GetPolicyName(), name))
paths.FluxMeterKey(agentGroup, name))
configSync := &fluxMeterConfigSync{
fluxMeterProto: fluxMeterProto,
policyBaseAPI: policyBaseAPI,
Expand All @@ -61,8 +61,6 @@ func (configSync *fluxMeterConfigSync) doSync(etcdClient *etcdclient.Client, lif
wrapper := &configv1.FluxMeterWrapper{
FluxmeterName: configSync.fluxmeterName,
FluxMeter: configSync.fluxMeterProto,
PolicyName: configSync.policyBaseAPI.GetPolicyName(),
PolicyHash: configSync.policyBaseAPI.GetPolicyHash(),
}
dat, err := proto.Marshal(wrapper)
if err != nil {
Expand Down
15 changes: 4 additions & 11 deletions pkg/policies/dataplane/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

selectorv1 "github.com/fluxninja/aperture/api/gen/proto/go/aperture/common/selector/v1"
flowcontrolv1 "github.com/fluxninja/aperture/api/gen/proto/go/aperture/flowcontrol/v1"
"github.com/fluxninja/aperture/pkg/log"
"github.com/fluxninja/aperture/pkg/multimatcher"
"github.com/fluxninja/aperture/pkg/panichandler"
"github.com/fluxninja/aperture/pkg/policies/dataplane/iface"
Expand Down Expand Up @@ -71,8 +70,6 @@ func (e *Engine) ProcessRequest(controlPoint selectors.ControlPoint, serviceIDs
fluxMeterProtos := make([]*flowcontrolv1.FluxMeter, len(fluxMeters))
for i, fluxMeter := range fluxMeters {
fluxMeterProtos[i] = &flowcontrolv1.FluxMeter{
PolicyName: fluxMeter.GetPolicyName(),
PolicyHash: fluxMeter.GetPolicyHash(),
FluxMeterName: fluxMeter.GetFluxMeterName(),
}
}
Expand Down Expand Up @@ -169,7 +166,7 @@ func (e *Engine) UnregisterConcurrencyLimiter(cl iface.Limiter) error {
return e.unregister("ConcurrencyLimiter:"+cl.GetLimiterID().String(), selectorProto)
}

// RegisterFluxMeter adds fluxmeter to histogram map.
// RegisterFluxMeter adds fluxmeter to histogram map and multimatcher.
func (e *Engine) RegisterFluxMeter(fm iface.FluxMeter) error {
// Save the histogram in fluxMeterHists indexed by metric id
e.fluxMeterMapMutex.Lock()
Expand All @@ -193,7 +190,7 @@ func (e *Engine) RegisterFluxMeter(fm iface.FluxMeter) error {
return e.register("FluxMeter:"+fm.GetFluxMeterID().String(), selectorProto, fluxMeterMatchedCB)
}

// UnregisterFluxMeter removes fluxmeter from histogram map.
// UnregisterFluxMeter removes fluxmeter from histogram map and multimatcher.
func (e *Engine) UnregisterFluxMeter(fm iface.FluxMeter) error {
// Remove the histogram from fluxMeterHists indexed by metric id
e.fluxMeterMapMutex.Lock()
Expand All @@ -206,11 +203,10 @@ func (e *Engine) UnregisterFluxMeter(fm iface.FluxMeter) error {
}

// GetFluxMeterHist Lookup function for getting histogram.
func (e *Engine) GetFluxMeterHist(policyName, fluxMeterName, statusCode string, decisionType flowcontrolv1.DecisionType) prometheus.Observer {
func (e *Engine) GetFluxMeterHist(fluxMeterName, statusCode string, decisionType flowcontrolv1.DecisionType) prometheus.Observer {
e.fluxMeterMapMutex.RLock()
defer e.fluxMeterMapMutex.RUnlock()
fmID := iface.FluxMeterID{
PolicyName: policyName,
FluxMeterName: fluxMeterName,
}
fluxMeter := e.fluxMetersMap[fmID]
Expand Down Expand Up @@ -279,7 +275,6 @@ func (e *Engine) register(key string, selectorProto *selectorv1.Selector, matche

selector, err := selectors.FromProto(selectorProto)
if err != nil {
log.Warn().Err(err).Msg("Failed to parse selector")
return fmt.Errorf("failed to parse selector: %v", err)
}

Expand All @@ -302,15 +297,13 @@ func (e *Engine) unregister(key string, selectorProto *selectorv1.Selector) erro

selector, err := selectors.FromProto(selectorProto)
if err != nil {
log.Warn().Err(err).Msg("Failed to parse selector")
return fmt.Errorf("failed to parse selector: %v", err)
}

// check if multi matcher exists for this control point id
mm, ok := e.multiMatchers[selector.ControlPointID]
if !ok {
log.Warn().Msg("Unable to unregister, multi matcher not found for control point id")
return nil
return fmt.Errorf("unable to unregister, multi matcher not found for control point id: %v", selector.ControlPointID)
}
err = mm.RemoveEntry(key)
if err != nil {
Expand Down
Loading