Skip to content

Commit

Permalink
Add metrics about watch counts seen by APF
Browse files Browse the repository at this point in the history
  • Loading branch information
MikeSpreitzer committed Nov 1, 2021
1 parent 945f960 commit 154bf6a
Show file tree
Hide file tree
Showing 12 changed files with 76 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func WithPriorityAndFairness(
}

var classification *PriorityAndFairnessClassification
note := func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) {
estimateWork := func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) flowcontrolrequest.WorkEstimate {
classification = &PriorityAndFairnessClassification{
FlowSchemaName: fs.Name,
FlowSchemaUID: fs.UID,
Expand All @@ -111,6 +111,7 @@ func WithPriorityAndFairness(
httplog.AddKeyValue(ctx, "apf_pl", truncateLogField(pl.Name))
httplog.AddKeyValue(ctx, "apf_fs", truncateLogField(fs.Name))
httplog.AddKeyValue(ctx, "apf_fd", truncateLogField(flowDistinguisher))
return workEstimator(r, fs.Name, pl.Name)
}

var served bool
Expand All @@ -137,13 +138,9 @@ func WithPriorityAndFairness(
}
}

// find the estimated amount of work of the request
// TODO: Estimate cost should also take fcIfc.GetWatchCount(requestInfo) as a parameter.
workEstimate := workEstimator.EstimateWork(r)
digest := utilflowcontrol.RequestDigest{
RequestInfo: requestInfo,
User: user,
WorkEstimate: workEstimate,
RequestInfo: requestInfo,
User: user,
}

if isWatchRequest {
Expand Down Expand Up @@ -179,7 +176,7 @@ func WithPriorityAndFairness(
execute := func() {
startedAt := time.Now()
defer func() {
httplog.AddKeyValue(ctx, "apf_init_latency", time.Now().Sub(startedAt))
httplog.AddKeyValue(ctx, "apf_init_latency", time.Since(startedAt))
}()
noteExecutingDelta(1)
defer noteExecutingDelta(-1)
Expand Down Expand Up @@ -238,7 +235,7 @@ func WithPriorityAndFairness(
// Note that Handle will return irrespective of whether the request
// executes or is rejected. In the latter case, the function will return
// without calling the passed `execute` function.
fcIfc.Handle(handleCtx, digest, note, queueNote, execute)
fcIfc.Handle(handleCtx, digest, estimateWork, queueNote, execute)
}()

select {
Expand Down Expand Up @@ -269,7 +266,7 @@ func WithPriorityAndFairness(
handler.ServeHTTP(w, r)
}

fcIfc.Handle(ctx, digest, note, queueNote, execute)
fcIfc.Handle(ctx, digest, estimateWork, queueNote, execute)
}

if !served {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ const (
decisionSkipFilter
)

var defaultRequestWorkEstimator = func(*http.Request) fcrequest.WorkEstimate {
var defaultRequestWorkEstimator = func(req *http.Request, fsName, plName string) fcrequest.WorkEstimate {
return fcrequest.WorkEstimate{InitialSeats: 1}
}

Expand All @@ -87,14 +87,14 @@ func (t fakeApfFilter) MaintainObservations(stopCh <-chan struct{}) {

func (t fakeApfFilter) Handle(ctx context.Context,
requestDigest utilflowcontrol.RequestDigest,
noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string),
workEstimator func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) fcrequest.WorkEstimate,
queueNoteFn fq.QueueNoteFn,
execFn func(),
) {
if t.mockDecision == decisionSkipFilter {
panic("Handle should not be invoked")
}
noteFn(bootstrap.SuggestedFlowSchemaGlobalDefault, bootstrap.SuggestedPriorityLevelConfigurationGlobalDefault, requestDigest.User.GetName())
workEstimator(bootstrap.SuggestedFlowSchemaGlobalDefault, bootstrap.SuggestedPriorityLevelConfigurationGlobalDefault, requestDigest.User.GetName())
switch t.mockDecision {
case decisionNoQueuingExecute:
execFn()
Expand Down Expand Up @@ -390,7 +390,7 @@ func newFakeWatchApfFilter(capacity int) *fakeWatchApfFilter {

func (f *fakeWatchApfFilter) Handle(ctx context.Context,
requestDigest utilflowcontrol.RequestDigest,
_ func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string),
_ func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) fcrequest.WorkEstimate,
_ fq.QueueNoteFn,
execFn func(),
) {
Expand Down Expand Up @@ -635,14 +635,16 @@ func TestContextClosesOnRequestProcessed(t *testing.T) {
// fakeFilterRequestDigest embeds *fakeApfFilter and records what its own
// Handle method receives, so a test can inspect those values afterwards.
type fakeFilterRequestDigest struct {
*fakeApfFilter
// requestDigestGot points at a copy of the RequestDigest passed to Handle.
requestDigestGot *utilflowcontrol.RequestDigest
// workEstimateGot is the WorkEstimate returned by the workEstimator
// callback when Handle invokes it.
workEstimateGot fcrequest.WorkEstimate
}

func (f *fakeFilterRequestDigest) Handle(ctx context.Context,
requestDigest utilflowcontrol.RequestDigest,
_ func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string),
workEstimator func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) fcrequest.WorkEstimate,
_ fq.QueueNoteFn, _ func(),
) {
f.requestDigestGot = &requestDigest
f.workEstimateGot = workEstimator(bootstrap.MandatoryFlowSchemaCatchAll, bootstrap.MandatoryPriorityLevelConfigurationCatchAll, "")
}

func TestApfWithRequestDigest(t *testing.T) {
Expand All @@ -652,17 +654,17 @@ func TestApfWithRequestDigest(t *testing.T) {
reqDigestExpected := &utilflowcontrol.RequestDigest{
RequestInfo: &apirequest.RequestInfo{Verb: "get"},
User: &user.DefaultInfo{Name: "foo"},
WorkEstimate: fcrequest.WorkEstimate{
InitialSeats: 5,
FinalSeats: 7,
AdditionalLatency: 3 * time.Second,
},
}
workExpected := fcrequest.WorkEstimate{
InitialSeats: 5,
FinalSeats: 7,
AdditionalLatency: 3 * time.Second,
}

handler := WithPriorityAndFairness(http.HandlerFunc(func(_ http.ResponseWriter, req *http.Request) {}),
longRunningFunc,
fakeFilter,
func(_ *http.Request) fcrequest.WorkEstimate { return reqDigestExpected.WorkEstimate },
func(_ *http.Request, _, _ string) fcrequest.WorkEstimate { return workExpected },
)

w := httptest.NewRecorder()
Expand All @@ -678,6 +680,9 @@ func TestApfWithRequestDigest(t *testing.T) {
if !reflect.DeepEqual(reqDigestExpected, fakeFilter.requestDigestGot) {
t.Errorf("Expected RequestDigest to match, diff: %s", cmp.Diff(reqDigestExpected, fakeFilter.requestDigestGot))
}
if !reflect.DeepEqual(workExpected, fakeFilter.workEstimateGot) {
t.Errorf("Expected WorkEstimate to match, diff: %s", cmp.Diff(workExpected, fakeFilter.workEstimateGot))
}
}

func TestPriorityAndFairnessWithPanicRecoveryAndTimeoutFilter(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,8 @@ type StartFunction func(ctx context.Context, hashValue uint64) (execute bool, af

// RequestDigest holds necessary info from request for flow-control
type RequestDigest struct {
RequestInfo *request.RequestInfo
User user.Info
WorkEstimate fcrequest.WorkEstimate
RequestInfo *request.RequestInfo
User user.Info
}

// `*configController` maintains eventual consistency with the API
Expand Down Expand Up @@ -800,7 +799,7 @@ func (immediateRequest) Finish(execute func()) bool {
// The returned bool indicates whether the request is exempt from
// limitation. The startWaitingTime is when the request started
// waiting in its queue, or `Time{}` if this did not happen.
func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDigest, queueNoteFn fq.QueueNoteFn) (fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, isExempt bool, req fq.Request, startWaitingTime time.Time, flowDistinguisher string) {
func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDigest, workEstimator func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) fcrequest.WorkEstimate, queueNoteFn fq.QueueNoteFn) (fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, isExempt bool, req fq.Request, startWaitingTime time.Time) {
klog.V(7).Infof("startRequest(%#+v)", rd)
cfgCtlr.lock.RLock()
defer cfgCtlr.lock.RUnlock()
Expand Down Expand Up @@ -832,24 +831,26 @@ func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDig
plState := cfgCtlr.priorityLevelStates[plName]
if plState.pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt {
klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, immediate", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName)
return selectedFlowSchema, plState.pl, true, immediateRequest{}, time.Time{}, ""
return selectedFlowSchema, plState.pl, true, immediateRequest{}, time.Time{}
}
var numQueues int32
if plState.pl.Spec.Limited.LimitResponse.Type == flowcontrol.LimitResponseTypeQueue {
numQueues = plState.pl.Spec.Limited.LimitResponse.Queuing.Queues
}
var flowDistinguisher string
var hashValue uint64
if numQueues > 1 {
flowDistinguisher = computeFlowDistinguisher(rd, selectedFlowSchema.Spec.DistinguisherMethod)
hashValue = hashFlowID(selectedFlowSchema.Name, flowDistinguisher)
}
workEstimate := workEstimator(selectedFlowSchema, plState.pl, flowDistinguisher)
startWaitingTime = time.Now()
klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, numQueues=%d", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName, numQueues)
req, idle := plState.queues.StartRequest(ctx, &rd.WorkEstimate, hashValue, flowDistinguisher, selectedFlowSchema.Name, rd.RequestInfo, rd.User, queueNoteFn)
req, idle := plState.queues.StartRequest(ctx, &workEstimate, hashValue, flowDistinguisher, selectedFlowSchema.Name, rd.RequestInfo, rd.User, queueNoteFn)
if idle {
cfgCtlr.maybeReapReadLocked(plName, plState)
}
return selectedFlowSchema, plState.pl, false, req, startWaitingTime, flowDistinguisher
return selectedFlowSchema, plState.pl, false, req, startWaitingTime
}

// maybeReap will remove the last internal traces of the named
Expand Down
13 changes: 7 additions & 6 deletions staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/eventclock"
fqs "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset"
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
Expand All @@ -41,8 +42,9 @@ const ConfigConsumerAsFieldManager = "api-priority-and-fairness-config-consumer-
// Interface defines how the API Priority and Fairness filter interacts with the underlying system.
type Interface interface {
// Handle takes care of queuing and dispatching a request
// characterized by the given digest. The given `noteFn` will be
// invoked with the results of request classification. If the
// characterized by the given digest. The given `workEstimator` will be
// invoked with the results of request classification and must return the
// work parameters for the request. If the
// request is queued then `queueNoteFn` will be called twice,
// first with `true` and then with `false`; otherwise
// `queueNoteFn` will not be called at all. If Handle decides
Expand All @@ -53,7 +55,7 @@ type Interface interface {
// ctx is cancelled or times out.
Handle(ctx context.Context,
requestDigest RequestDigest,
noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string),
workEstimator func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) fcrequest.WorkEstimate,
queueNoteFn fq.QueueNoteFn,
execFn func(),
)
Expand Down Expand Up @@ -148,12 +150,11 @@ func NewTestable(config TestableConfig) Interface {
}

func (cfgCtlr *configController) Handle(ctx context.Context, requestDigest RequestDigest,
noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string),
workEstimator func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string) fcrequest.WorkEstimate,
queueNoteFn fq.QueueNoteFn,
execFn func()) {
fs, pl, isExempt, req, startWaitingTime, flowDistinguisher := cfgCtlr.startRequest(ctx, requestDigest, queueNoteFn)
fs, pl, isExempt, req, startWaitingTime := cfgCtlr.startRequest(ctx, requestDigest, workEstimator, queueNoteFn)
queued := startWaitingTime != time.Time{}
noteFn(fs, pl, flowDistinguisher)
if req == nil {
if queued {
metrics.ObserveWaitingDuration(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ func checkNewFS(cts *ctlrTestState, rng *rand.Rand, trialName string, ftr *fsTes
startWG.Add(1)
go func(matches, isResource bool, rdu RequestDigest) {
expectedMatch := matches && ftr.wellFormed && (fsPrecedes(fs, catchAlls[isResource]) || fs.Name == catchAlls[isResource].Name)
ctlr.Handle(ctx, rdu, func(matchFS *flowcontrol.FlowSchema, matchPL *flowcontrol.PriorityLevelConfiguration, _ string) {
ctlr.Handle(ctx, rdu, func(matchFS *flowcontrol.FlowSchema, matchPL *flowcontrol.PriorityLevelConfiguration, _ string) fcrequest.WorkEstimate {
matchIsExempt := matchPL.Spec.Type == flowcontrol.PriorityLevelEnablementExempt
if testDebugLogs {
t.Logf("Considering FlowSchema %s, expectedMatch=%v, isResource=%v: Handle(%#+v) => note(fs=%s, pl=%s, isExempt=%v)", fs.Name, expectedMatch, isResource, rdu, matchFS.Name, matchPL.Name, matchIsExempt)
Expand All @@ -475,6 +475,7 @@ func checkNewFS(cts *ctlrTestState, rng *rand.Rand, trialName string, ftr *fsTes
t.Errorf("Fail at %s/%s: expected=%v, actual=%v", trialName, fs.Name, fs.Spec.PriorityLevelConfiguration.Name, matchPL.Name)
}
}
return fcrequest.WorkEstimate{InitialSeats: 1}
}, func(inQueue bool) {
}, func() {
startWG.Done()
Expand Down
10 changes: 3 additions & 7 deletions staging/src/k8s.io/apiserver/pkg/util/flowcontrol/match_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/endpoints/request"
fcfmt "k8s.io/apiserver/pkg/util/flowcontrol/format"
fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
)

func TestMatching(t *testing.T) {
Expand Down Expand Up @@ -102,8 +101,7 @@ func TestLiterals(t *testing.T) {
Name: "eman",
Parts: []string{"goodrscs", "eman"},
},
User: ui,
WorkEstimate: fcrequest.WorkEstimate{InitialSeats: 1},
User: ui,
}
reqRU := RequestDigest{
RequestInfo: &request.RequestInfo{
Expand All @@ -118,17 +116,15 @@ func TestLiterals(t *testing.T) {
Name: "eman",
Parts: []string{"goodrscs", "eman"},
},
User: ui,
WorkEstimate: fcrequest.WorkEstimate{InitialSeats: 1},
User: ui,
}
reqN := RequestDigest{
RequestInfo: &request.RequestInfo{
IsResourceRequest: false,
Path: "/openapi/v2",
Verb: "goodverb",
},
User: ui,
WorkEstimate: fcrequest.WorkEstimate{InitialSeats: 1},
User: ui,
}
checkRules(t, true, reqRN, []flowcontrol.PolicyRulesWithSubjects{{
Subjects: []flowcontrol.Subject{{Kind: flowcontrol.SubjectKindUser,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,17 @@ var (
},
[]string{priorityLevel, flowSchema},
)
watchCountSamples = compbasemetrics.NewHistogramVec(
&compbasemetrics.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "watch_count_samples",
Help: "count of watchers for mutating requests in API Priority and Fairness",
Buckets: []float64{0, 1, 10, 100, 1000, 10000},
StabilityLevel: compbasemetrics.ALPHA,
},
[]string{priorityLevel, flowSchema},
)
apiserverEpochAdvances = compbasemetrics.NewCounterVec(
&compbasemetrics.CounterOpts{
Namespace: namespace,
Expand All @@ -315,6 +326,7 @@ var (
apiserverCurrentExecutingRequests,
apiserverRequestWaitingSeconds,
apiserverRequestExecutionSeconds,
watchCountSamples,
apiserverEpochAdvances,
}.
Append(PriorityLevelExecutionSeatsObserverGenerator.metrics()...).
Expand Down Expand Up @@ -383,6 +395,12 @@ func ObserveExecutionDuration(ctx context.Context, priorityLevel, flowSchema str
apiserverRequestExecutionSeconds.WithContext(ctx).WithLabelValues(priorityLevel, flowSchema).Observe(executionTime.Seconds())
}

// ObserveWatchCount records one sample of a watch count in the
// watchCountSamples histogram, labelled with the given priority level and
// flow schema.
//
// ctx is attached to the metric via WithContext, matching the other
// observers in this file (for example ObserveExecutionDuration and
// AddEpochAdvance); previously the parameter was accepted but ignored.
func ObserveWatchCount(ctx context.Context, priorityLevel, flowSchema string, count int) {
	watchCountSamples.WithContext(ctx).WithLabelValues(priorityLevel, flowSchema).Observe(float64(count))
}

// AddEpochAdvance increments the epoch-advance counter for the given
// priority level. The success flag is rendered as "true"/"false" and used
// as the second label value.
func AddEpochAdvance(ctx context.Context, priorityLevel string, success bool) {
	outcome := strconv.FormatBool(success)
	counter := apiserverEpochAdvances.WithContext(ctx).WithLabelValues(priorityLevel, outcome)
	counter.Inc()
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type listWorkEstimator struct {
countGetterFn objectCountGetterFunc
}

func (e *listWorkEstimator) estimate(r *http.Request) WorkEstimate {
func (e *listWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLevelName string) WorkEstimate {
requestInfo, ok := apirequest.RequestInfoFrom(r.Context())
if !ok {
// no RequestInfo should never happen, but to be on the safe side
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
)

const (
Expand Down Expand Up @@ -49,7 +50,9 @@ type mutatingWorkEstimator struct {
enabled bool
}

func (e *mutatingWorkEstimator) estimate(r *http.Request) WorkEstimate {
func (e *mutatingWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLevelName string) WorkEstimate {
// TODO(wojtekt): Remove once we tune the algorithm to not fail
// scalability tests.
if !e.enabled {
return WorkEstimate{
InitialSeats: 1,
Expand All @@ -67,6 +70,7 @@ func (e *mutatingWorkEstimator) estimate(r *http.Request) WorkEstimate {
}
}
watchCount := e.countFn(requestInfo)
metrics.ObserveWatchCount(r.Context(), priorityLevelName, flowSchemaName, watchCount)

// The cost of the request associated with the watchers of that event
// consists of three parts:
Expand Down
Loading

0 comments on commit 154bf6a

Please sign in to comment.