Skip to content

Commit

Permalink
feature: add promtheus metrics for yurthub (#238)
Browse files Browse the repository at this point in the history
  • Loading branch information
rambohe-ch authored Mar 19, 2021
1 parent a4e3240 commit 989d3f4
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 2 deletions.
8 changes: 8 additions & 0 deletions pkg/yurthub/healthchecker/health_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"sync"
"time"

"github.com/openyurtio/openyurt/pkg/yurthub/metrics"
"github.com/openyurtio/openyurt/pkg/yurthub/transport"

"k8s.io/klog"
Expand Down Expand Up @@ -121,6 +122,11 @@ func newChecker(url *url.URL, tp transport.Interface, failedRetry, healthyThresh
klog.Errorf("cluster(%s) init status: unhealthy, %v", c.serverHealthzAddr, err)
}
c.clusterHealthy = initHealthyStatus
if c.clusterHealthy {
metrics.Metrics.ObserveServerHealthy(c.netAddress, 1)
} else {
metrics.Metrics.ObserveServerHealthy(c.netAddress, 0)
}

go c.healthyCheckLoop(stopCh)
return c, nil
Expand Down Expand Up @@ -172,6 +178,7 @@ func (c *checker) healthyCheckLoop(stopCh <-chan struct{}) {
klog.Infof("cluster becomes unhealthy from %v, healthy status lasts %v", now, now.Sub(c.lastTime))
c.onFailureFunc(c.netAddress)
c.lastTime = now
metrics.Metrics.ObserveServerHealthy(c.netAddress, 0)
}
} else {
// with continuous 2 times cluster healthy, unhealthy will changed to healthy
Expand All @@ -183,6 +190,7 @@ func (c *checker) healthyCheckLoop(stopCh <-chan struct{}) {
now := time.Now()
klog.Infof("cluster becomes healthy from %v, unhealthy status lasts %v", now, now.Sub(c.lastTime))
c.lastTime = now
metrics.Metrics.ObserveServerHealthy(c.netAddress, 1)
}
}
}
Expand Down
132 changes: 132 additions & 0 deletions pkg/yurthub/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
Copyright 2021 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package metrics

import (
"strings"

"github.com/openyurtio/openyurt/pkg/projectinfo"
"github.com/prometheus/client_golang/prometheus"
)

var (
namespace = "node"
subsystem = strings.ReplaceAll(projectinfo.GetHubName(), "-", "_")
)

var (
// Metrics provides access to all hub agent metrics.
Metrics = newHubMetrics()
)

type HubMetrics struct {
serversHealthyCollector *prometheus.GaugeVec
inFlightRequestsCollector *prometheus.GaugeVec
inFlightRequestsGauge prometheus.Gauge
rejectedRequestsCounter prometheus.Counter
closableConnsCollector *prometheus.GaugeVec
}

func newHubMetrics() *HubMetrics {
serversHealthyCollector := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "server_healthy_status",
Help: "healthy status of remote servers. 1: healthy, 0: unhealthy",
},
[]string{"server"})
inFlightRequestsCollector := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "in_flight_requests_collector",
Help: "collector of in flight requests handling by hub agent",
},
[]string{"verb", "resource", "subresources", "client"})
inFlightRequestsGauge := prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "in_flight_requests_total",
Help: "total of in flight requests handling by hub agent",
})
rejectedRequestsCounter := prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "rejected_requests_counter",
Help: "counter of rejected requests for exceeding in flight limit in hub agent",
})
closableConnsCollector := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "closable_conns_collector",
Help: "collector of underlay tcp connection from hub agent to remote server",
},
[]string{"server"})
prometheus.MustRegister(serversHealthyCollector)
prometheus.MustRegister(inFlightRequestsCollector)
prometheus.MustRegister(inFlightRequestsGauge)
prometheus.MustRegister(rejectedRequestsCounter)
prometheus.MustRegister(closableConnsCollector)
return &HubMetrics{
serversHealthyCollector: serversHealthyCollector,
inFlightRequestsCollector: inFlightRequestsCollector,
inFlightRequestsGauge: inFlightRequestsGauge,
rejectedRequestsCounter: rejectedRequestsCounter,
closableConnsCollector: closableConnsCollector,
}
}

func (hm *HubMetrics) Reset() {
hm.serversHealthyCollector.Reset()
hm.inFlightRequestsCollector.Reset()
hm.inFlightRequestsGauge.Set(float64(0))
hm.closableConnsCollector.Reset()
}

func (hm *HubMetrics) ObserveServerHealthy(server string, status int) {
hm.serversHealthyCollector.WithLabelValues(server).Set(float64(status))
}

func (hm *HubMetrics) IncInFlightRequests(verb, resource, subresource, client string) {
hm.inFlightRequestsCollector.WithLabelValues(verb, resource, subresource, client).Inc()
hm.inFlightRequestsGauge.Inc()
}

func (hm *HubMetrics) DecInFlightRequests(verb, resource, subresource, client string) {
hm.inFlightRequestsCollector.WithLabelValues(verb, resource, subresource, client).Dec()
hm.inFlightRequestsGauge.Dec()
}

func (hm *HubMetrics) IncRejectedRequestCounter() {
hm.rejectedRequestsCounter.Inc()
}

func (hm *HubMetrics) IncClosableConns(server string) {
hm.closableConnsCollector.WithLabelValues(server).Inc()
}

func (hm *HubMetrics) DecClosableConns(server string) {
hm.closableConnsCollector.WithLabelValues(server).Dec()
}

func (hm *HubMetrics) SetClosableConns(server string, cnt int) {
hm.closableConnsCollector.WithLabelValues(server).Set(float64(cnt))
}
14 changes: 13 additions & 1 deletion pkg/yurthub/proxy/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ import (
"strings"
"time"

"github.com/openyurtio/openyurt/pkg/yurthub/metrics"
"github.com/openyurtio/openyurt/pkg/yurthub/util"

"k8s.io/apimachinery/pkg/api/errors"
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -144,10 +146,18 @@ func (wrw *wrapperResponseWriter) WriteHeader(statusCode int) {
// WithRequestTrace used to trace status code and handle time for request.
func WithRequestTrace(handler http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
info, ok := apirequest.RequestInfoFrom(req.Context())
client, _ := util.ClientComponentFrom(req.Context())
if ok && info.IsResourceRequest {
metrics.Metrics.IncInFlightRequests(info.Verb, info.Resource, info.Subresource, client)
defer metrics.Metrics.DecInFlightRequests(info.Verb, info.Resource, info.Subresource, client)
}
wrapperRW := newWrapperResponseWriter(w)
start := time.Now()
defer func() {
klog.Infof("%s with status code %d, spent %v", util.ReqString(req), wrapperRW.statusCode, time.Since(start))
}()
handler.ServeHTTP(wrapperRW, req)
klog.Infof("%s with status code %d, spent %v", util.ReqString(req), wrapperRW.statusCode, time.Since(start))
})
}

Expand All @@ -162,6 +172,7 @@ func WithMaxInFlightLimit(handler http.Handler, limit int) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
select {
case reqChan <- true:
klog.V(2).Infof("start proxying: %s %s, in flight requests: %d", strings.ToLower(req.Method), req.URL.String(), len(reqChan))
defer func() {
<-reqChan
klog.V(5).Infof("%s request completed, left %d requests in flight", util.ReqString(req), len(reqChan))
Expand All @@ -170,6 +181,7 @@ func WithMaxInFlightLimit(handler http.Handler, limit int) http.Handler {
default:
// Return a 429 status indicating "Too Many Requests"
klog.Errorf("Too many requests, please try again later, %s", util.ReqString(req))
metrics.Metrics.IncRejectedRequestCounter()
w.Header().Set("Retry-After", "1")
util.Err(errors.NewTooManyRequestsError("Too many requests, please try again later."), w, req)
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/yurthub/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ import (
"fmt"
"net/http"

"github.com/gorilla/mux"
"github.com/openyurtio/openyurt/cmd/yurthub/app/config"
"github.com/openyurtio/openyurt/pkg/yurthub/certificate/interfaces"
"github.com/openyurtio/openyurt/pkg/yurthub/profile"

"github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

// Server is an interface for providing http service for yurthub
Expand Down Expand Up @@ -90,6 +92,9 @@ func registerHandlers(c *mux.Router, cfg *config.YurtHubConfiguration, certifica
if cfg.EnableProfiling {
profile.Install(c)
}

// register handler for metrics
c.Handle("/metrics", promhttp.Handler())
}

// healthz returns ok for healthz request
Expand Down
6 changes: 6 additions & 0 deletions pkg/yurthub/util/connrotation.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"sync"
"time"

"github.com/openyurtio/openyurt/pkg/yurthub/metrics"

"k8s.io/klog"
)

Expand All @@ -44,6 +46,7 @@ func (c *closableConn) Close() error {
}
c.dialer.mu.Unlock()
klog.Infof("close connection from %s to %s for %s dialer, remain %d connections", c.Conn.LocalAddr().String(), c.addr, c.dialer.name, remain)
metrics.Metrics.SetClosableConns(c.addr, remain)
return c.Conn.Close()
}

Expand Down Expand Up @@ -89,6 +92,7 @@ func (d *Dialer) CloseAll() {
for conn := range conns {
conn.Conn.Close()
delete(conns, conn)
metrics.Metrics.DecClosableConns(addr)
}
delete(addrConns, addr)
}
Expand All @@ -107,6 +111,7 @@ func (d *Dialer) Close(address string) {
for conn := range conns {
conn.Conn.Close()
delete(conns, conn)
metrics.Metrics.DecClosableConns(address)
}
}

Expand Down Expand Up @@ -138,5 +143,6 @@ func (d *Dialer) DialContext(ctx context.Context, network, address string) (net.
d.mu.Unlock()

klog.Infof("create a connection from %s to %s, total %d connections in %s dialer", conn.LocalAddr().String(), address, len(d.addrConns[address]), d.name)
metrics.Metrics.IncClosableConns(address)
return closable, nil
}

0 comments on commit 989d3f4

Please sign in to comment.