From 989d3f461cf112ad0e1ec66235b6d5f486e7bdb2 Mon Sep 17 00:00:00 2001 From: rambohe Date: Sat, 20 Mar 2021 01:26:19 +0800 Subject: [PATCH] feature: add promtheus metrics for yurthub (#238) --- pkg/yurthub/healthchecker/health_checker.go | 8 ++ pkg/yurthub/metrics/metrics.go | 132 ++++++++++++++++++++ pkg/yurthub/proxy/util/util.go | 14 ++- pkg/yurthub/server/server.go | 7 +- pkg/yurthub/util/connrotation.go | 6 + 5 files changed, 165 insertions(+), 2 deletions(-) create mode 100644 pkg/yurthub/metrics/metrics.go diff --git a/pkg/yurthub/healthchecker/health_checker.go b/pkg/yurthub/healthchecker/health_checker.go index d47c34c976f..ae31bfae76b 100644 --- a/pkg/yurthub/healthchecker/health_checker.go +++ b/pkg/yurthub/healthchecker/health_checker.go @@ -25,6 +25,7 @@ import ( "sync" "time" + "github.com/openyurtio/openyurt/pkg/yurthub/metrics" "github.com/openyurtio/openyurt/pkg/yurthub/transport" "k8s.io/klog" @@ -121,6 +122,11 @@ func newChecker(url *url.URL, tp transport.Interface, failedRetry, healthyThresh klog.Errorf("cluster(%s) init status: unhealthy, %v", c.serverHealthzAddr, err) } c.clusterHealthy = initHealthyStatus + if c.clusterHealthy { + metrics.Metrics.ObserveServerHealthy(c.netAddress, 1) + } else { + metrics.Metrics.ObserveServerHealthy(c.netAddress, 0) + } go c.healthyCheckLoop(stopCh) return c, nil @@ -172,6 +178,7 @@ func (c *checker) healthyCheckLoop(stopCh <-chan struct{}) { klog.Infof("cluster becomes unhealthy from %v, healthy status lasts %v", now, now.Sub(c.lastTime)) c.onFailureFunc(c.netAddress) c.lastTime = now + metrics.Metrics.ObserveServerHealthy(c.netAddress, 0) } } else { // with continuous 2 times cluster healthy, unhealthy will changed to healthy @@ -183,6 +190,7 @@ func (c *checker) healthyCheckLoop(stopCh <-chan struct{}) { now := time.Now() klog.Infof("cluster becomes healthy from %v, unhealthy status lasts %v", now, now.Sub(c.lastTime)) c.lastTime = now + metrics.Metrics.ObserveServerHealthy(c.netAddress, 1) } } } diff --git a/pkg/yurthub/metrics/metrics.go b/pkg/yurthub/metrics/metrics.go new file mode 100644 index 00000000000..d098ae3634b --- /dev/null +++ b/pkg/yurthub/metrics/metrics.go @@ -0,0 +1,132 @@ +/* +Copyright 2021 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +import ( + "strings" + + "github.com/openyurtio/openyurt/pkg/projectinfo" + "github.com/prometheus/client_golang/prometheus" +) + +var ( + namespace = "node" + subsystem = strings.ReplaceAll(projectinfo.GetHubName(), "-", "_") +) + +var ( + // Metrics provides access to all hub agent metrics. + Metrics = newHubMetrics() +) + +type HubMetrics struct { + serversHealthyCollector *prometheus.GaugeVec + inFlightRequestsCollector *prometheus.GaugeVec + inFlightRequestsGauge prometheus.Gauge + rejectedRequestsCounter prometheus.Counter + closableConnsCollector *prometheus.GaugeVec +} + +func newHubMetrics() *HubMetrics { + serversHealthyCollector := prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "server_healthy_status", + Help: "healthy status of remote servers. 1: healthy, 0: unhealthy", + }, + []string{"server"}) + inFlightRequestsCollector := prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "in_flight_requests_collector", + Help: "collector of in flight requests handling by hub agent", + }, + []string{"verb", "resource", "subresources", "client"}) + inFlightRequestsGauge := prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "in_flight_requests_total", + Help: "total of in flight requests handling by hub agent", + }) + rejectedRequestsCounter := prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "rejected_requests_counter", + Help: "counter of rejected requests for exceeding in flight limit in hub agent", + }) + closableConnsCollector := prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "closable_conns_collector", + Help: "collector of underlay tcp connection from hub agent to remote server", + }, + []string{"server"}) + prometheus.MustRegister(serversHealthyCollector) + prometheus.MustRegister(inFlightRequestsCollector) + prometheus.MustRegister(inFlightRequestsGauge) + prometheus.MustRegister(rejectedRequestsCounter) + prometheus.MustRegister(closableConnsCollector) + return &HubMetrics{ + serversHealthyCollector: serversHealthyCollector, + inFlightRequestsCollector: inFlightRequestsCollector, + inFlightRequestsGauge: inFlightRequestsGauge, + rejectedRequestsCounter: rejectedRequestsCounter, + closableConnsCollector: closableConnsCollector, + } +} + +func (hm *HubMetrics) Reset() { + hm.serversHealthyCollector.Reset() + hm.inFlightRequestsCollector.Reset() + hm.inFlightRequestsGauge.Set(float64(0)) + hm.closableConnsCollector.Reset() +} + +func (hm *HubMetrics) ObserveServerHealthy(server string, status int) { + hm.serversHealthyCollector.WithLabelValues(server).Set(float64(status)) +} + +func (hm *HubMetrics) IncInFlightRequests(verb, resource, subresource, client string) { + hm.inFlightRequestsCollector.WithLabelValues(verb, resource, subresource, client).Inc() + hm.inFlightRequestsGauge.Inc() +} + +func (hm *HubMetrics) DecInFlightRequests(verb, resource, subresource, client string) { + hm.inFlightRequestsCollector.WithLabelValues(verb, resource, subresource, client).Dec() + hm.inFlightRequestsGauge.Dec() +} + +func (hm *HubMetrics) IncRejectedRequestCounter() { + hm.rejectedRequestsCounter.Inc() +} + +func (hm *HubMetrics) IncClosableConns(server string) { + hm.closableConnsCollector.WithLabelValues(server).Inc() +} + +func (hm *HubMetrics) DecClosableConns(server string) { + hm.closableConnsCollector.WithLabelValues(server).Dec() +} + +func (hm *HubMetrics) SetClosableConns(server string, cnt int) { + hm.closableConnsCollector.WithLabelValues(server).Set(float64(cnt)) +} diff --git a/pkg/yurthub/proxy/util/util.go b/pkg/yurthub/proxy/util/util.go index 63956e82ed8..b06f21bb7b4 100644 --- a/pkg/yurthub/proxy/util/util.go +++ b/pkg/yurthub/proxy/util/util.go @@ -22,7 +22,9 @@ import ( "strings" "time" + "github.com/openyurtio/openyurt/pkg/yurthub/metrics" "github.com/openyurtio/openyurt/pkg/yurthub/util" + "k8s.io/apimachinery/pkg/api/errors" metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -144,10 +146,18 @@ func (wrw *wrapperResponseWriter) WriteHeader(statusCode int) { // WithRequestTrace used to trace status code and handle time for request. func WithRequestTrace(handler http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + info, ok := apirequest.RequestInfoFrom(req.Context()) + client, _ := util.ClientComponentFrom(req.Context()) + if ok && info.IsResourceRequest { + metrics.Metrics.IncInFlightRequests(info.Verb, info.Resource, info.Subresource, client) + defer metrics.Metrics.DecInFlightRequests(info.Verb, info.Resource, info.Subresource, client) + } wrapperRW := newWrapperResponseWriter(w) start := time.Now() + defer func() { + klog.Infof("%s with status code %d, spent %v", util.ReqString(req), wrapperRW.statusCode, time.Since(start)) + }() handler.ServeHTTP(wrapperRW, req) - klog.Infof("%s with status code %d, spent %v", util.ReqString(req), wrapperRW.statusCode, time.Since(start)) }) } @@ -162,6 +172,7 @@ func WithMaxInFlightLimit(handler http.Handler, limit int) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { select { case reqChan <- true: + klog.V(2).Infof("start proxying: %s %s, in flight requests: %d", strings.ToLower(req.Method), req.URL.String(), len(reqChan)) defer func() { <-reqChan klog.V(5).Infof("%s request completed, left %d requests in flight", util.ReqString(req), len(reqChan)) @@ -170,6 +181,7 @@ func WithMaxInFlightLimit(handler http.Handler, limit int) http.Handler { default: // Return a 429 status indicating "Too Many Requests" klog.Errorf("Too many requests, please try again later, %s", util.ReqString(req)) + metrics.Metrics.IncRejectedRequestCounter() w.Header().Set("Retry-After", "1") util.Err(errors.NewTooManyRequestsError("Too many requests, please try again later."), w, req) } diff --git a/pkg/yurthub/server/server.go b/pkg/yurthub/server/server.go index 2c5e8188403..7c09dcffaf2 100644 --- a/pkg/yurthub/server/server.go +++ b/pkg/yurthub/server/server.go @@ -20,10 +20,12 @@ import ( "fmt" "net/http" - "github.com/gorilla/mux" "github.com/openyurtio/openyurt/cmd/yurthub/app/config" "github.com/openyurtio/openyurt/pkg/yurthub/certificate/interfaces" "github.com/openyurtio/openyurt/pkg/yurthub/profile" + + "github.com/gorilla/mux" + "github.com/prometheus/client_golang/prometheus/promhttp" ) // Server is an interface for providing http service for yurthub @@ -90,6 +92,9 @@ func registerHandlers(c *mux.Router, cfg *config.YurtHubConfiguration, certifica if cfg.EnableProfiling { profile.Install(c) } + + // register handler for metrics + c.Handle("/metrics", promhttp.Handler()) } // healthz returns ok for healthz request diff --git a/pkg/yurthub/util/connrotation.go b/pkg/yurthub/util/connrotation.go index 3dafee6d5c5..1c48ebf5fdd 100644 --- a/pkg/yurthub/util/connrotation.go +++ b/pkg/yurthub/util/connrotation.go @@ -22,6 +22,8 @@ import ( "sync" "time" + "github.com/openyurtio/openyurt/pkg/yurthub/metrics" + "k8s.io/klog" ) @@ -44,6 +46,7 @@ func (c *closableConn) Close() error { } c.dialer.mu.Unlock() klog.Infof("close connection from %s to %s for %s dialer, remain %d connections", c.Conn.LocalAddr().String(), c.addr, c.dialer.name, remain) + metrics.Metrics.SetClosableConns(c.addr, remain) return c.Conn.Close() } @@ -89,6 +92,7 @@ func (d *Dialer) CloseAll() { for conn := range conns { conn.Conn.Close() delete(conns, conn) + metrics.Metrics.DecClosableConns(addr) } delete(addrConns, addr) } @@ -107,6 +111,7 @@ func (d *Dialer) Close(address string) { for conn := range conns { conn.Conn.Close() delete(conns, conn) + metrics.Metrics.DecClosableConns(address) } } @@ -138,5 +143,6 @@ func (d *Dialer) DialContext(ctx context.Context, network, address string) (net. d.mu.Unlock() klog.Infof("create a connection from %s to %s, total %d connections in %s dialer", conn.LocalAddr().String(), address, len(d.addrConns[address]), d.name) + metrics.Metrics.IncClosableConns(address) return closable, nil }