Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: add traffic(from cloud to edge) collector metrics for yurthub #398

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions pkg/yurthub/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type HubMetrics struct {
inFlightRequestsGauge prometheus.Gauge
rejectedRequestsCounter prometheus.Counter
closableConnsCollector *prometheus.GaugeVec
proxyTrafficCollector *prometheus.CounterVec
}

func newHubMetrics() *HubMetrics {
Expand Down Expand Up @@ -80,17 +81,27 @@ func newHubMetrics() *HubMetrics {
Help: "collector of underlay tcp connection from hub agent to remote server",
},
[]string{"server"})
proxyTrafficCollector := prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "proxy_traffic_collector",
Help: "collector of proxy response traffic by hub agent(unit: byte)",
},
[]string{"client", "verb", "resource", "subresources"})
prometheus.MustRegister(serversHealthyCollector)
prometheus.MustRegister(inFlightRequestsCollector)
prometheus.MustRegister(inFlightRequestsGauge)
prometheus.MustRegister(rejectedRequestsCounter)
prometheus.MustRegister(closableConnsCollector)
prometheus.MustRegister(proxyTrafficCollector)
return &HubMetrics{
serversHealthyCollector: serversHealthyCollector,
inFlightRequestsCollector: inFlightRequestsCollector,
inFlightRequestsGauge: inFlightRequestsGauge,
rejectedRequestsCounter: rejectedRequestsCounter,
closableConnsCollector: closableConnsCollector,
proxyTrafficCollector: proxyTrafficCollector,
}
}

Expand All @@ -99,6 +110,7 @@ func (hm *HubMetrics) Reset() {
hm.inFlightRequestsCollector.Reset()
hm.inFlightRequestsGauge.Set(float64(0))
hm.closableConnsCollector.Reset()
hm.proxyTrafficCollector.Reset()
}

func (hm *HubMetrics) ObserveServerHealthy(server string, status int) {
Expand Down Expand Up @@ -130,3 +142,9 @@ func (hm *HubMetrics) DecClosableConns(server string) {
func (hm *HubMetrics) SetClosableConns(server string, cnt int) {
hm.closableConnsCollector.WithLabelValues(server).Set(float64(cnt))
}

func (hm *HubMetrics) AddProxyTrafficCollector(client, verb, resource, subresource string, size int) {
if size > 0 {
hm.proxyTrafficCollector.WithLabelValues(client, verb, resource, subresource).Add(float64(size))
}
}
2 changes: 1 addition & 1 deletion pkg/yurthub/proxy/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (lp *LocalProxy) localPost(w http.ResponseWriter, req *http.Request) error
ctx = util.WithRespContentType(ctx, reqContentType)
req = req.WithContext(ctx)
stopCh := make(chan struct{})
rc, prc := util.NewDualReadCloser(req.Body, false)
rc, prc := util.NewDualReadCloser(req, req.Body, false)
go func(req *http.Request, prc io.ReadCloser, stopCh <-chan struct{}) {
klog.V(2).Infof("cache events when cluster is unhealthy, %v", lp.cacheMgr.CacheResponse(req, prc, stopCh))
}(req, prc, stopCh)
Expand Down
2 changes: 1 addition & 1 deletion pkg/yurthub/proxy/remote/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (rp *RemoteProxy) modifyResponse(resp *http.Response) error {
}
}

rc, prc := util.NewDualReadCloser(resp.Body, true)
rc, prc := util.NewDualReadCloser(req, resp.Body, true)
go func(req *http.Request, prc io.ReadCloser, stopCh <-chan struct{}) {
err := rp.cacheMgr.CacheResponse(req, prc, stopCh)
if err != nil && err != io.EOF && err != context.Canceled {
Expand Down
21 changes: 18 additions & 3 deletions pkg/yurthub/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"strings"

"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer"
"github.com/openyurtio/openyurt/pkg/yurthub/metrics"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
Expand Down Expand Up @@ -188,9 +190,10 @@ func Err(err error, w http.ResponseWriter, req *http.Request) {
}

// NewDualReadCloser create an dualReadCloser object
func NewDualReadCloser(rc io.ReadCloser, isRespBody bool) (io.ReadCloser, io.ReadCloser) {
func NewDualReadCloser(req *http.Request, rc io.ReadCloser, isRespBody bool) (io.ReadCloser, io.ReadCloser) {
pr, pw := io.Pipe()
dr := &dualReadCloser{
req: req,
rc: rc,
pw: pw,
isRespBody: isRespBody,
Expand All @@ -200,8 +203,9 @@ func NewDualReadCloser(rc io.ReadCloser, isRespBody bool) (io.ReadCloser, io.Rea
}

type dualReadCloser struct {
rc io.ReadCloser
pw *io.PipeWriter
req *http.Request
rc io.ReadCloser
pw *io.PipeWriter
// isRespBody shows rc(is.ReadCloser) is a response.Body
// or not(maybe a request.Body). if it is true(it's a response.Body),
// we should close the response body in Close func, else not,
Expand All @@ -211,6 +215,17 @@ type dualReadCloser struct {

// Read read data into p and write into pipe
func (dr *dualReadCloser) Read(p []byte) (n int, err error) {
defer func() {
if dr.req != nil && dr.isRespBody {
ctx := dr.req.Context()
info, _ := apirequest.RequestInfoFrom(ctx)
if info.IsResourceRequest {
comp, _ := ClientComponentFrom(ctx)
metrics.Metrics.AddProxyTrafficCollector(comp, info.Verb, info.Resource, info.Subresource, n)
}
}
}()

n, err = dr.rc.Read(p)
if n > 0 {
if n, err := dr.pw.Write(p[:n]); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/yurthub/util/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestDualReader(t *testing.T) {
src := []byte("hello, world")
rb := bytes.NewBuffer(src)
rc := ioutil.NopCloser(rb)
drc, prc := NewDualReadCloser(rc, true)
drc, prc := NewDualReadCloser(nil, rc, true)
rc = drc
dst1 := make([]byte, len(src))
dst2 := make([]byte, len(src))
Expand Down Expand Up @@ -67,7 +67,7 @@ func TestDualReaderByPreClose(t *testing.T) {
src := []byte("hello, world")
rb := bytes.NewBuffer(src)
rc := ioutil.NopCloser(rb)
drc, prc := NewDualReadCloser(rc, true)
drc, prc := NewDualReadCloser(nil, rc, true)
rc = drc
dst := make([]byte, len(src))

Expand Down