Skip to content

Commit

Permalink
Add a metric to track usage of inflight request limit.
Browse files Browse the repository at this point in the history
  • Loading branch information
gmarek committed Jan 26, 2018
1 parent dd272ea commit 000d7ba
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 4 deletions.
15 changes: 15 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/endpoints/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,15 @@ var (
},
[]string{"requestKind"},
)
// Becasue of volatality of the base metric this is pre-aggregated one. Instead of reporing current usage all the time
// it reports maximal usage during the last second.
currentInflightRequests = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "apiserver_current_inflight_requests",
Help: "Maximal mumber of currently used inflight request limit of this apiserver per request kind in last second.",
},
[]string{"requestKind"},
)
kubectlExeRegexp = regexp.MustCompile(`^.*((?i:kubectl\.exe))`)
)

Expand All @@ -104,6 +113,12 @@ func init() {
prometheus.MustRegister(requestLatenciesSummary)
prometheus.MustRegister(responseSizes)
prometheus.MustRegister(DroppedRequests)
prometheus.MustRegister(currentInflightRequests)
}

func UpdateInflightRequestMetrics(nonmutating, mutating int) {
currentInflightRequests.WithLabelValues(ReadOnlyKind).Set(float64(nonmutating))
currentInflightRequests.WithLabelValues(MutatingKind).Set(float64(mutating))
}

// Record records a single request to the standard metrics endpoints. For use by handlers that perform their own
Expand Down
1 change: 1 addition & 0 deletions staging/src/k8s.io/apiserver/pkg/server/filters/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/waitgroup:go_default_library",
"//vendor/k8s.io/apiserver/pkg/authentication/user:go_default_library",
"//vendor/k8s.io/apiserver/pkg/endpoints/metrics:go_default_library",
Expand Down
77 changes: 73 additions & 4 deletions staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,28 @@ package filters
import (
"fmt"
"net/http"
"sync"
"time"

"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/endpoints/metrics"
apirequest "k8s.io/apiserver/pkg/endpoints/request"

"github.com/golang/glog"
)

// Constant for the retry-after interval on rate limiting.
// TODO: maybe make this dynamic? or user-adjustable?
const retryAfter = "1"
const (
// Constant for the retry-after interval on rate limiting.
// TODO: maybe make this dynamic? or user-adjustable?
retryAfter = "1"

// How often inflight usage metric should be updated. Because
// the metrics tracks maximal value over period making this
// longer will increase the metric value.
inflightUsageMetricUpdatePeriod = time.Second
)

var nonMutatingRequestVerbs = sets.NewString("get", "list", "watch")

Expand All @@ -40,6 +50,49 @@ func handleError(w http.ResponseWriter, r *http.Request, err error) {
glog.Errorf(err.Error())
}

// requestWatermark is used to trak maximal usage of inflight requests.
type requestWatermark struct {
lock sync.Mutex
readOnlyWatermark, mutatingWatermark int
}

func (w *requestWatermark) recordMutating(mutatingVal int) {
w.lock.Lock()
defer w.lock.Unlock()

if w.mutatingWatermark < mutatingVal {
w.mutatingWatermark = mutatingVal
}
}

func (w *requestWatermark) recordReadOnly(readOnlyVal int) {
w.lock.Lock()
defer w.lock.Unlock()

if w.readOnlyWatermark < readOnlyVal {
w.readOnlyWatermark = readOnlyVal
}
}

var watermark = &requestWatermark{}

func startRecordingUsage() {
go func() {
wait.Forever(func() {
watermark.lock.Lock()
readOnlyWatermark := watermark.readOnlyWatermark
mutatingWatermark := watermark.mutatingWatermark
watermark.readOnlyWatermark = 0
watermark.mutatingWatermark = 0
watermark.lock.Unlock()

metrics.UpdateInflightRequestMetrics(readOnlyWatermark, mutatingWatermark)
}, inflightUsageMetricUpdatePeriod)
}()
}

var startOnce sync.Once

// WithMaxInFlightLimit limits the number of in-flight requests to buffer size of the passed in channel.
func WithMaxInFlightLimit(
handler http.Handler,
Expand All @@ -48,6 +101,7 @@ func WithMaxInFlightLimit(
requestContextMapper apirequest.RequestContextMapper,
longRunningRequestCheck apirequest.LongRunningRequestCheck,
) http.Handler {
startOnce.Do(startRecordingUsage)
if nonMutatingLimit == 0 && mutatingLimit == 0 {
return handler
}
Expand Down Expand Up @@ -92,7 +146,22 @@ func WithMaxInFlightLimit(

select {
case c <- true:
defer func() { <-c }()
var mutatingLen, readOnlyLen int
if isMutatingRequest {
mutatingLen = len(mutatingChan)
} else {
readOnlyLen = len(nonMutatingChan)
}

defer func() {
<-c
if isMutatingRequest {
watermark.recordMutating(mutatingLen)
} else {
watermark.recordReadOnly(readOnlyLen)
}

}()
handler.ServeHTTP(w, r)

default:
Expand Down

0 comments on commit 000d7ba

Please sign in to comment.