Skip to content

Commit

Permalink
feature: add traffic(from cloud to edge) collector metrics for yurthub (
Browse files Browse the repository at this point in the history
  • Loading branch information
rambohe-ch authored Jul 28, 2021
1 parent 0c0fa0c commit 3c3e412
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 7 deletions.
18 changes: 18 additions & 0 deletions pkg/yurthub/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type HubMetrics struct {
inFlightRequestsGauge prometheus.Gauge
rejectedRequestsCounter prometheus.Counter
closableConnsCollector *prometheus.GaugeVec
proxyTrafficCollector *prometheus.CounterVec
}

func newHubMetrics() *HubMetrics {
Expand Down Expand Up @@ -80,17 +81,27 @@ func newHubMetrics() *HubMetrics {
Help: "collector of underlay tcp connection from hub agent to remote server",
},
[]string{"server"})
proxyTrafficCollector := prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "proxy_traffic_collector",
Help: "collector of proxy response traffic by hub agent(unit: byte)",
},
[]string{"client", "verb", "resource", "subresources"})
prometheus.MustRegister(serversHealthyCollector)
prometheus.MustRegister(inFlightRequestsCollector)
prometheus.MustRegister(inFlightRequestsGauge)
prometheus.MustRegister(rejectedRequestsCounter)
prometheus.MustRegister(closableConnsCollector)
prometheus.MustRegister(proxyTrafficCollector)
return &HubMetrics{
serversHealthyCollector: serversHealthyCollector,
inFlightRequestsCollector: inFlightRequestsCollector,
inFlightRequestsGauge: inFlightRequestsGauge,
rejectedRequestsCounter: rejectedRequestsCounter,
closableConnsCollector: closableConnsCollector,
proxyTrafficCollector: proxyTrafficCollector,
}
}

Expand All @@ -99,6 +110,7 @@ func (hm *HubMetrics) Reset() {
hm.inFlightRequestsCollector.Reset()
hm.inFlightRequestsGauge.Set(float64(0))
hm.closableConnsCollector.Reset()
hm.proxyTrafficCollector.Reset()
}

func (hm *HubMetrics) ObserveServerHealthy(server string, status int) {
Expand Down Expand Up @@ -130,3 +142,9 @@ func (hm *HubMetrics) DecClosableConns(server string) {
func (hm *HubMetrics) SetClosableConns(server string, cnt int) {
hm.closableConnsCollector.WithLabelValues(server).Set(float64(cnt))
}

func (hm *HubMetrics) AddProxyTrafficCollector(client, verb, resource, subresource string, size int) {
if size > 0 {
hm.proxyTrafficCollector.WithLabelValues(client, verb, resource, subresource).Add(float64(size))
}
}
2 changes: 1 addition & 1 deletion pkg/yurthub/proxy/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (lp *LocalProxy) localPost(w http.ResponseWriter, req *http.Request) error
ctx = util.WithRespContentType(ctx, reqContentType)
req = req.WithContext(ctx)
stopCh := make(chan struct{})
rc, prc := util.NewDualReadCloser(req.Body, false)
rc, prc := util.NewDualReadCloser(req, req.Body, false)
go func(req *http.Request, prc io.ReadCloser, stopCh <-chan struct{}) {
klog.V(2).Infof("cache events when cluster is unhealthy, %v", lp.cacheMgr.CacheResponse(req, prc, stopCh))
}(req, prc, stopCh)
Expand Down
2 changes: 1 addition & 1 deletion pkg/yurthub/proxy/remote/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (rp *RemoteProxy) modifyResponse(resp *http.Response) error {
}
}

rc, prc := util.NewDualReadCloser(resp.Body, true)
rc, prc := util.NewDualReadCloser(req, resp.Body, true)
go func(req *http.Request, prc io.ReadCloser, stopCh <-chan struct{}) {
err := rp.cacheMgr.CacheResponse(req, prc, stopCh)
if err != nil && err != io.EOF && err != context.Canceled {
Expand Down
21 changes: 18 additions & 3 deletions pkg/yurthub/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"strings"

"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer"
"github.com/openyurtio/openyurt/pkg/yurthub/metrics"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
Expand Down Expand Up @@ -188,9 +190,10 @@ func Err(err error, w http.ResponseWriter, req *http.Request) {
}

// NewDualReadCloser create an dualReadCloser object
func NewDualReadCloser(rc io.ReadCloser, isRespBody bool) (io.ReadCloser, io.ReadCloser) {
func NewDualReadCloser(req *http.Request, rc io.ReadCloser, isRespBody bool) (io.ReadCloser, io.ReadCloser) {
pr, pw := io.Pipe()
dr := &dualReadCloser{
req: req,
rc: rc,
pw: pw,
isRespBody: isRespBody,
Expand All @@ -200,8 +203,9 @@ func NewDualReadCloser(rc io.ReadCloser, isRespBody bool) (io.ReadCloser, io.Rea
}

type dualReadCloser struct {
rc io.ReadCloser
pw *io.PipeWriter
req *http.Request
rc io.ReadCloser
pw *io.PipeWriter
// isRespBody shows rc(is.ReadCloser) is a response.Body
// or not(maybe a request.Body). if it is true(it's a response.Body),
// we should close the response body in Close func, else not,
Expand All @@ -211,6 +215,17 @@ type dualReadCloser struct {

// Read read data into p and write into pipe
func (dr *dualReadCloser) Read(p []byte) (n int, err error) {
defer func() {
if dr.req != nil && dr.isRespBody {
ctx := dr.req.Context()
info, _ := apirequest.RequestInfoFrom(ctx)
if info.IsResourceRequest {
comp, _ := ClientComponentFrom(ctx)
metrics.Metrics.AddProxyTrafficCollector(comp, info.Verb, info.Resource, info.Subresource, n)
}
}
}()

n, err = dr.rc.Read(p)
if n > 0 {
if n, err := dr.pw.Write(p[:n]); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/yurthub/util/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestDualReader(t *testing.T) {
src := []byte("hello, world")
rb := bytes.NewBuffer(src)
rc := ioutil.NopCloser(rb)
drc, prc := NewDualReadCloser(rc, true)
drc, prc := NewDualReadCloser(nil, rc, true)
rc = drc
dst1 := make([]byte, len(src))
dst2 := make([]byte, len(src))
Expand Down Expand Up @@ -67,7 +67,7 @@ func TestDualReaderByPreClose(t *testing.T) {
src := []byte("hello, world")
rb := bytes.NewBuffer(src)
rc := ioutil.NopCloser(rb)
drc, prc := NewDualReadCloser(rc, true)
drc, prc := NewDualReadCloser(nil, rc, true)
rc = drc
dst := make([]byte, len(src))

Expand Down

0 comments on commit 3c3e412

Please sign in to comment.