Skip to content

Commit

Permalink
Fixe ratelimiter decision sync (#349)
Browse files Browse the repository at this point in the history
Co-authored-by: Harjot Gill <[email protected]>
  • Loading branch information
hasit and harjotgill committed Sep 9, 2022
1 parent 7b7c185 commit eee6f64
Show file tree
Hide file tree
Showing 14 changed files with 101 additions and 93 deletions.
4 changes: 2 additions & 2 deletions api/aperture/policy/language/v1/policy.proto
Original file line number Diff line number Diff line change
Expand Up @@ -514,10 +514,10 @@ message RateLimiter {
extensions: {
key: "x-go-default"
value: {
bool_value: true
bool_value: false
}
}
}]; // @gotags: default:"true"
}]; // @gotags: default:"false"

// Number of times to lazy sync within the _limit\_reset\_interval_.
uint32 num_sync = 2 [(grpc.gateway.protoc_gen_openapiv2.options.openapiv2_field) = {
Expand Down
2 changes: 1 addition & 1 deletion api/gen/openapiv2/aperture.swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ definitions:
type: boolean
description: TODO document what happens when lazy sync is disabled
title: Enables lazy sync
x-go-default: true
x-go-default: false
num_sync:
type: integer
format: int64
Expand Down
4 changes: 2 additions & 2 deletions api/gen/proto/go/aperture/policy/language/v1/policy.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion docs/gen/policies/gen.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ definitions:
description: TODO document what happens when lazy sync is disabled
type: boolean
title: Enables lazy sync
x-go-default: true
x-go-default: false
x-order: 0
num_sync:
description: Number of times to lazy sync within the _limit\_reset\_interval_.
Expand Down
2 changes: 1 addition & 1 deletion docs/gen/policies/policy.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ eg. {any: {of: [expr1, expr2]}}.
<dt>enabled</dt>
<dd>

(bool, default: `true`) Enables lazy sync
(bool) Enables lazy sync

TODO document what happens when lazy sync is disabled

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/Henry-Sarabia/sliceconv v1.0.2
github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1
github.com/buger/jsonparser v1.1.1
github.com/buraksezer/olric v0.0.0
github.com/buraksezer/olric v0.4.5
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/cenkalti/backoff/v4 v4.1.3
github.com/clarketm/json v1.17.1
Expand Down
43 changes: 19 additions & 24 deletions pkg/config/koanf-unmarshaller.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,51 +411,45 @@ func jsonOverrideHookFunc(replaceSlice bool) mapstructure.DecodeHookFunc {
}

func merge(a, b map[string]interface{}, replaceSlice bool) {
log.Trace().Bool("ReplaceSlice", replaceSlice).Interface("a", a).Interface("b", b).Msg("MERGE")
for key, val := range a {
// Does the key exist in the target map?
// If no, add it and move on.
bVal, ok := b[key]
if !ok {
log.Trace().Str("key", key).Msg("override")
log.Trace().Str("key", key).Interface("Value", val).Msg("override")
b[key] = val
continue
}

if !replaceSlice {
// merge slice B into slice A values
if sliceA, ok := val.([]interface{}); ok {
if sliceB, ok := val.([]interface{}); ok {
sliceMapA := []map[string]interface{}{}
sliceMapB := []map[string]interface{}{}
for _, v := range sliceA {
if sliceB, ok := bVal.([]interface{}); ok {
// iterate slices and merge their map[string]interface{}
for i, v := range sliceA {
if m, ok := v.(map[string]interface{}); ok {
sliceMapA = append(sliceMapA, m)
}
}
for _, v := range sliceB {
if m, ok := v.(map[string]interface{}); ok {
sliceMapB = append(sliceMapB, m)
}
}
for i, sliceValA := range sliceMapA {
if i < len(sliceB) {
log.Trace().Str("key", key).Int("index", i).Msg("merging")
// merge
merge(sliceValA, sliceMapB[i], replaceSlice)
} else {
log.Trace().Str("key", key).Int("index", i).Msg("overriding")
// append
sliceMapB = append(sliceMapB, sliceValA)
if len(sliceB) > i {
if m2, ok := sliceB[i].(map[string]interface{}); ok {
merge(m, m2, replaceSlice)
} else {
log.Warn().Str("key", key).Interface("Value", val).Msg("unable to merge slice")
}
} else {
sliceB = append(sliceB, m)
}
}
}
b[key] = sliceB
continue
}
}
}

// If the incoming val is not a map, do a direct merge.
if _, ok := val.(map[string]interface{}); !ok {
log.Trace().Str("key", key).Msg("override")
b[key] = val
log.Trace().Str("key", key).Interface("Value", val).Interface("b[key]", b[key]).Msg("override")
continue
}

Expand All @@ -465,10 +459,11 @@ func merge(a, b map[string]interface{}, replaceSlice bool) {
log.Trace().Str("key", key).Msg("merge")
merge(val.(map[string]interface{}), v, replaceSlice)
default:
log.Trace().Str("key", key).Msg("override")
log.Trace().Str("key", key).Interface("Value", val).Msg("override")
b[key] = val
}
}
log.Trace().Bool("ReplaceSlice", replaceSlice).Interface("a", a).Interface("b", b).Msg("MERGE DONE")
}

var json = jsoniter.Config{
Expand Down
2 changes: 1 addition & 1 deletion pkg/flowcontrol/common/flowcontrol.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (h *Handler) CheckWithValues(
serviceIDs []services.ServiceID,
labels selectors.Labels,
) *flowcontrolv1.CheckResponse {
log.Trace().Msg("FlowControl.CheckWithValues()")
log.Trace().Interface("labels", labels.ToPlainMap()).Interface("serviceIDs", serviceIDs).Str("controlPoint", controlPoint.String()).Msg("FlowControl.CheckWithValues()")

checkResponse := h.engine.ProcessRequest(controlPoint, serviceIDs, labels)
h.metrics.CheckResponse(checkResponse.DecisionType, checkResponse.GetDecisionReason())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ import (
"errors"
"path"

clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/fx"
"go.uber.org/multierr"
"google.golang.org/protobuf/proto"

configv1 "github.com/fluxninja/aperture/api/gen/proto/go/aperture/common/config/v1"
policydecisionsv1 "github.com/fluxninja/aperture/api/gen/proto/go/aperture/policy/decisions/v1"
policylangv1 "github.com/fluxninja/aperture/api/gen/proto/go/aperture/policy/language/v1"
Expand All @@ -14,10 +19,6 @@ import (
"github.com/fluxninja/aperture/pkg/paths"
"github.com/fluxninja/aperture/pkg/policies/controlplane/iface"
"github.com/fluxninja/aperture/pkg/policies/controlplane/runtime"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/fx"
"go.uber.org/multierr"
"google.golang.org/protobuf/proto"
)

type rateLimiterSync struct {
Expand Down Expand Up @@ -133,11 +134,11 @@ func (limiterSync *rateLimiterSync) publishLimit(limitValue float64) error {
limiterSync.decision.Limit = limitValue
// Publish decision
log.Debug().Float64("limit", limitValue).Msg("publishing rate limiter decision")
wrapper := &configv1.RateLimiterWrapper{
RateLimiter: limiterSync.rateLimiterProto,
ComponentIndex: int64(limiterSync.componentIndex),
PolicyName: limiterSync.policyReadAPI.GetPolicyName(),
PolicyHash: limiterSync.policyReadAPI.GetPolicyHash(),
wrapper := &configv1.RateLimiterDecisionWrapper{
RateLimiterDecision: limiterSync.decision,
ComponentIndex: int64(limiterSync.componentIndex),
PolicyName: limiterSync.policyReadAPI.GetPolicyName(),
PolicyHash: limiterSync.policyReadAPI.GetPolicyHash(),
}
dat, err := proto.Marshal(wrapper)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ func (conLimiter *concurrencyLimiter) GetSelector() *selectorv1.Selector {
}

// RunLimiter .
func (conLimiter *concurrencyLimiter) RunLimiter(labels selectors.Labels) *flowcontrolv1.LimiterDecision {
func (conLimiter *concurrencyLimiter) RunLimiter(labels selectors.Labels, decision *flowcontrolv1.LimiterDecision) {
var matchedWorkloadProto *policylangv1.Scheduler_Workload
var matchedWorkloadIndex string
// match labels against conLimiter.workloadMultiMatcher
Expand Down Expand Up @@ -489,16 +489,13 @@ func (conLimiter *concurrencyLimiter) RunLimiter(labels selectors.Labels) *flowc
if accepted {
conLimiter.acceptedConcurrencyCounter.Add(float64(reqContext.Tokens))
}

return &flowcontrolv1.LimiterDecision{
PolicyName: conLimiter.GetPolicyName(),
PolicyHash: conLimiter.GetPolicyHash(),
ComponentIndex: conLimiter.GetComponentIndex(),
Dropped: !accepted,
Details: &flowcontrolv1.LimiterDecision_ConcurrencyLimiter_{
ConcurrencyLimiter: &flowcontrolv1.LimiterDecision_ConcurrencyLimiter{
WorkloadIndex: matchedWorkloadIndex,
},
decision.PolicyName = conLimiter.GetPolicyName()
decision.PolicyHash = conLimiter.GetPolicyHash()
decision.ComponentIndex = conLimiter.GetComponentIndex()
decision.Dropped = !accepted
decision.Details = &flowcontrolv1.LimiterDecision_ConcurrencyLimiter_{
ConcurrencyLimiter: &flowcontrolv1.LimiterDecision_ConcurrencyLimiter{
WorkloadIndex: matchedWorkloadIndex,
},
}
}
Expand Down
45 changes: 22 additions & 23 deletions pkg/policies/dataplane/actuators/rate/rate-limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ func setupRateLimiterFactory(
ai *agentinfo.AgentInfo,
) error {
agentGroupName := ai.GetAgentGroup()
rateLimitDecisionsWatcher, err := etcdwatcher.NewWatcher(etcdClient,
path.Join(paths.RateLimiterDecisionsPath, paths.AgentGroupPrefix(agentGroupName)))
etcdPath := path.Join(paths.RateLimiterDecisionsPath)
rateLimitDecisionsWatcher, err := etcdwatcher.NewWatcher(etcdClient, etcdPath)
if err != nil {
return err
}
Expand All @@ -109,7 +109,9 @@ func setupRateLimiterFactory(
}

fxDriver := &notifiers.FxDriver{
FxOptionsFuncs: []notifiers.FxOptionsFunc{rateLimiterFactory.newRateLimiterOptions},
FxOptionsFuncs: []notifiers.FxOptionsFunc{
rateLimiterFactory.newRateLimiterOptions,
},
UnmarshalPrefixNotifier: notifiers.UnmarshalPrefixNotifier{
GetUnmarshallerFunc: config.NewProtobufUnmarshaller,
},
Expand Down Expand Up @@ -162,11 +164,11 @@ func (rateLimiterFactory *rateLimiterFactory) newRateLimiterOptions(
return fx.Options(), err
}

rateLimiterMessage := wrapperMessage.RateLimiter
rateLimiterProto := wrapperMessage.RateLimiter

rateLimiter := &rateLimiter{
Component: wrapperMessage,
rateLimiterProto: rateLimiterMessage,
rateLimiterProto: rateLimiterProto,
rateLimiterFactory: rateLimiterFactory,
statusRegistry: reg,
}
Expand Down Expand Up @@ -199,10 +201,9 @@ func (rateLimiter *rateLimiter) setup(lifecycle fx.Lifecycle) error {
if err != nil {
return err
}
decisionKey := paths.DataplaneComponentKey(rateLimiter.rateLimiterFactory.agentGroupName, rateLimiter.GetPolicyName(), rateLimiter.GetComponentIndex())
decisionNotifier := notifiers.NewUnmarshalKeyNotifier(
notifiers.Key(paths.DataplaneComponentKey(rateLimiter.rateLimiterFactory.agentGroupName,
rateLimiter.GetPolicyName(),
rateLimiter.GetComponentIndex())),
notifiers.Key(decisionKey),
unmarshaller,
rateLimiter.decisionUpdateCallback,
)
Expand All @@ -216,7 +217,8 @@ func (rateLimiter *rateLimiter) setup(lifecycle fx.Lifecycle) error {
label := rateLimiter.rateLimiterProto.GetLabelKey() + ":" + override.GetLabelValue()
rateLimiter.rateLimitChecker.AddOverride(label, override.GetLimitScaleFactor())
}
rateLimiter.rateTracker, err = ratetracker.NewDistCacheRateTracker(rateLimiter.rateLimitChecker,
rateLimiter.rateTracker, err = ratetracker.NewDistCacheRateTracker(
rateLimiter.rateLimitChecker,
rateLimiter.rateLimiterFactory.distCache,
rateLimiter.name,
rateLimiter.rateLimiterProto.GetLimitResetInterval().AsDuration())
Expand Down Expand Up @@ -282,7 +284,7 @@ func (rateLimiter *rateLimiter) GetSelector() *selectorv1.Selector {
}

// RunLimiter runs the limiter.
func (rateLimiter *rateLimiter) RunLimiter(labels selectors.Labels) *flowcontrolv1.LimiterDecision {
func (rateLimiter *rateLimiter) RunLimiter(labels selectors.Labels, decision *flowcontrolv1.LimiterDecision) {
reason := flowcontrolv1.LimiterDecision_LIMITER_REASON_UNSPECIFIED

label, ok, remaining, current := rateLimiter.TakeN(labels, 1)
Expand All @@ -291,18 +293,16 @@ func (rateLimiter *rateLimiter) RunLimiter(labels selectors.Labels) *flowcontrol
reason = flowcontrolv1.LimiterDecision_LIMITER_REASON_KEY_NOT_FOUND
}

return &flowcontrolv1.LimiterDecision{
PolicyName: rateLimiter.GetPolicyName(),
PolicyHash: rateLimiter.GetPolicyHash(),
ComponentIndex: rateLimiter.GetComponentIndex(),
Dropped: !ok,
Reason: reason,
Details: &flowcontrolv1.LimiterDecision_RateLimiter_{
RateLimiter: &flowcontrolv1.LimiterDecision_RateLimiter{
Label: label,
Remaining: int64(remaining),
Current: int64(current),
},
decision.PolicyName = rateLimiter.GetPolicyName()
decision.PolicyHash = rateLimiter.GetPolicyHash()
decision.ComponentIndex = rateLimiter.GetComponentIndex()
decision.Dropped = !ok
decision.Reason = reason
decision.Details = &flowcontrolv1.LimiterDecision_RateLimiter_{
RateLimiter: &flowcontrolv1.LimiterDecision_RateLimiter{
Label: label,
Remaining: int64(remaining),
Current: int64(current),
},
}
}
Expand All @@ -320,7 +320,6 @@ func (rateLimiter *rateLimiter) TakeN(labels selectors.Labels, n int) (label str
label = labelKey + ":" + labelValue

ok, remaining, current = rateLimiter.rateTracker.TakeN(label, n)

return
}

Expand Down
38 changes: 28 additions & 10 deletions pkg/policies/dataplane/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (e *Engine) ProcessRequest(controlPoint selectors.ControlPoint, serviceIDs
return
}

// execute rate limiters first
// execute concurrency limiters
concurrencyLimiters := make([]iface.Limiter, len(mmr.concurrencyLimiters))
copy(concurrencyLimiters, mmr.concurrencyLimiters)

Expand All @@ -118,21 +118,39 @@ func (e *Engine) ProcessRequest(controlPoint selectors.ControlPoint, serviceIDs
}

func runLimiters(limiters []iface.Limiter, labels selectors.Labels) ([]*flowcontrolv1.LimiterDecision, flowcontrolv1.DecisionType) {
decisionType := flowcontrolv1.DecisionType_DECISION_TYPE_ACCEPTED
var wg sync.WaitGroup
limiterDecisions := make([]*flowcontrolv1.LimiterDecision, len(limiters))
for i, limiter := range limiters {
wg.Add(1)
panichandler.Go(func() {
var once sync.Once

decisionType := flowcontrolv1.DecisionType_DECISION_TYPE_ACCEPTED

setDecisionRejected := func() {
decisionType = flowcontrolv1.DecisionType_DECISION_TYPE_REJECTED
}

execLimiter := func(limiter iface.Limiter, decision *flowcontrolv1.LimiterDecision) func() {
return func() {
defer wg.Done()
decision := limiter.RunLimiter(labels)
limiter.RunLimiter(labels, decision)
if decision.Dropped {
decisionType = flowcontrolv1.DecisionType_DECISION_TYPE_REJECTED
once.Do(setDecisionRejected)
}
limiterDecisions[i] = decision
})
}
}

limiterDecisions := make([]*flowcontrolv1.LimiterDecision, len(limiters))
// execute limiters
for i, limiter := range limiters {
wg.Add(1)
decision := &flowcontrolv1.LimiterDecision{}
limiterDecisions[i] = decision
if i == len(limiters)-1 {
execLimiter(limiter, decision)()
} else {
panichandler.Go(execLimiter(limiter, decision))
}
}
wg.Wait()

return limiterDecisions, decisionType
}

Expand Down
Loading

0 comments on commit eee6f64

Please sign in to comment.