diff --git a/pkg/yurthub/metrics/metrics.go b/pkg/yurthub/metrics/metrics.go index d098ae3634b..409519cba44 100644 --- a/pkg/yurthub/metrics/metrics.go +++ b/pkg/yurthub/metrics/metrics.go @@ -39,6 +39,7 @@ type HubMetrics struct { inFlightRequestsGauge prometheus.Gauge rejectedRequestsCounter prometheus.Counter closableConnsCollector *prometheus.GaugeVec + proxyTrafficCollector *prometheus.CounterVec } func newHubMetrics() *HubMetrics { @@ -80,17 +81,27 @@ func newHubMetrics() *HubMetrics { Help: "collector of underlay tcp connection from hub agent to remote server", }, []string{"server"}) + proxyTrafficCollector := prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "proxy_traffic_collector", + Help: "collector of proxy response traffic by hub agent(unit: byte)", + }, + []string{"client", "verb", "resource", "subresources"}) prometheus.MustRegister(serversHealthyCollector) prometheus.MustRegister(inFlightRequestsCollector) prometheus.MustRegister(inFlightRequestsGauge) prometheus.MustRegister(rejectedRequestsCounter) prometheus.MustRegister(closableConnsCollector) + prometheus.MustRegister(proxyTrafficCollector) return &HubMetrics{ serversHealthyCollector: serversHealthyCollector, inFlightRequestsCollector: inFlightRequestsCollector, inFlightRequestsGauge: inFlightRequestsGauge, rejectedRequestsCounter: rejectedRequestsCounter, closableConnsCollector: closableConnsCollector, + proxyTrafficCollector: proxyTrafficCollector, } } @@ -99,6 +110,7 @@ func (hm *HubMetrics) Reset() { hm.inFlightRequestsCollector.Reset() hm.inFlightRequestsGauge.Set(float64(0)) hm.closableConnsCollector.Reset() + hm.proxyTrafficCollector.Reset() } func (hm *HubMetrics) ObserveServerHealthy(server string, status int) { @@ -130,3 +142,9 @@ func (hm *HubMetrics) DecClosableConns(server string) { func (hm *HubMetrics) SetClosableConns(server string, cnt int) { hm.closableConnsCollector.WithLabelValues(server).Set(float64(cnt)) } + +func (hm *HubMetrics) AddProxyTrafficCollector(client, verb, resource, subresource string, size int) { + if size > 0 { + hm.proxyTrafficCollector.WithLabelValues(client, verb, resource, subresource).Add(float64(size)) + } +} diff --git a/pkg/yurthub/proxy/local/local.go b/pkg/yurthub/proxy/local/local.go index d0bb295aade..bd67b87881b 100644 --- a/pkg/yurthub/proxy/local/local.go +++ b/pkg/yurthub/proxy/local/local.go @@ -116,7 +116,7 @@ func (lp *LocalProxy) localPost(w http.ResponseWriter, req *http.Request) error ctx = util.WithRespContentType(ctx, reqContentType) req = req.WithContext(ctx) stopCh := make(chan struct{}) - rc, prc := util.NewDualReadCloser(req.Body, false) + rc, prc := util.NewDualReadCloser(req, req.Body, false) go func(req *http.Request, prc io.ReadCloser, stopCh <-chan struct{}) { klog.V(2).Infof("cache events when cluster is unhealthy, %v", lp.cacheMgr.CacheResponse(req, prc, stopCh)) }(req, prc, stopCh) diff --git a/pkg/yurthub/proxy/remote/remote.go b/pkg/yurthub/proxy/remote/remote.go index a9028dab592..9f4b66f0b74 100644 --- a/pkg/yurthub/proxy/remote/remote.go +++ b/pkg/yurthub/proxy/remote/remote.go @@ -132,7 +132,7 @@ func (rp *RemoteProxy) modifyResponse(resp *http.Response) error { } } - rc, prc := util.NewDualReadCloser(resp.Body, true) + rc, prc := util.NewDualReadCloser(req, resp.Body, true) go func(req *http.Request, prc io.ReadCloser, stopCh <-chan struct{}) { err := rp.cacheMgr.CacheResponse(req, prc, stopCh) if err != nil && err != io.EOF && err != context.Canceled { diff --git a/pkg/yurthub/util/util.go b/pkg/yurthub/util/util.go index 1bf752d6ba7..7d7f74f7035 100644 --- a/pkg/yurthub/util/util.go +++ b/pkg/yurthub/util/util.go @@ -27,6 +27,8 @@ import ( "strings" "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer" + "github.com/openyurtio/openyurt/pkg/yurthub/metrics" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apiserver/pkg/endpoints/handlers/negotiation" @@ -188,9 +190,10 @@ func Err(err error, w http.ResponseWriter, req *http.Request) { } // NewDualReadCloser create an dualReadCloser object -func NewDualReadCloser(rc io.ReadCloser, isRespBody bool) (io.ReadCloser, io.ReadCloser) { +func NewDualReadCloser(req *http.Request, rc io.ReadCloser, isRespBody bool) (io.ReadCloser, io.ReadCloser) { pr, pw := io.Pipe() dr := &dualReadCloser{ + req: req, rc: rc, pw: pw, isRespBody: isRespBody, @@ -200,8 +203,9 @@ func NewDualReadCloser(rc io.ReadCloser, isRespBody bool) (io.ReadCloser, io.Rea } type dualReadCloser struct { - rc io.ReadCloser - pw *io.PipeWriter + req *http.Request + rc io.ReadCloser + pw *io.PipeWriter // isRespBody shows rc(is.ReadCloser) is a response.Body // or not(maybe a request.Body). if it is true(it's a response.Body), // we should close the response body in Close func, else not, @@ -211,6 +215,17 @@ type dualReadCloser struct { // Read read data into p and write into pipe func (dr *dualReadCloser) Read(p []byte) (n int, err error) { + defer func() { + if dr.req != nil && dr.isRespBody { + ctx := dr.req.Context() + info, _ := apirequest.RequestInfoFrom(ctx) + if info.IsResourceRequest { + comp, _ := ClientComponentFrom(ctx) + metrics.Metrics.AddProxyTrafficCollector(comp, info.Verb, info.Resource, info.Subresource, n) + } + } + }() + n, err = dr.rc.Read(p) if n > 0 { if n, err := dr.pw.Write(p[:n]); err != nil { diff --git a/pkg/yurthub/util/util_test.go b/pkg/yurthub/util/util_test.go index 0da45e382a9..b9370f86909 100644 --- a/pkg/yurthub/util/util_test.go +++ b/pkg/yurthub/util/util_test.go @@ -27,7 +27,7 @@ func TestDualReader(t *testing.T) { src := []byte("hello, world") rb := bytes.NewBuffer(src) rc := ioutil.NopCloser(rb) - drc, prc := NewDualReadCloser(rc, true) + drc, prc := NewDualReadCloser(nil, rc, true) rc = drc dst1 := make([]byte, len(src)) dst2 := make([]byte, len(src)) @@ -67,7 +67,7 @@ func TestDualReaderByPreClose(t *testing.T) { src := []byte("hello, world") rb := bytes.NewBuffer(src) rc := ioutil.NopCloser(rb) - drc, prc := NewDualReadCloser(rc, true) + drc, prc := NewDualReadCloser(nil, rc, true) rc = drc dst := make([]byte, len(src))