Skip to content

Commit

Permalink
feature: add meta server for handling prometheus metrics and pprof by…
Browse files Browse the repository at this point in the history
… yurttunnel (#253)
  • Loading branch information
rambohe-ch authored Apr 9, 2021
1 parent 6672287 commit 02aa604
Show file tree
Hide file tree
Showing 14 changed files with 173 additions and 10 deletions.
1 change: 1 addition & 0 deletions cmd/yurt-tunnel-agent/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type Config struct {
TunnelServerAddr string
Client kubernetes.Interface
AgentIdentifiers string
AgentMetaAddr string
}

type completedConfig struct {
Expand Down
12 changes: 11 additions & 1 deletion cmd/yurt-tunnel-agent/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ package options
import (
"errors"
"fmt"
"net"
"os"
"strings"

"github.com/openyurtio/openyurt/cmd/yurt-tunnel-agent/app/config"
"github.com/openyurtio/openyurt/pkg/projectinfo"
"github.com/openyurtio/openyurt/pkg/yurttunnel/constants"
kubeutil "github.com/openyurtio/openyurt/pkg/yurttunnel/kubernetes"

"github.com/spf13/pflag"
Expand All @@ -42,11 +44,16 @@ type AgentOptions struct {
KubeConfig string
Version bool
AgentIdentifiers string
MetaHost string
MetaPort string
}

// NewAgentOptions creates a new AgentOptions with a default config.
func NewAgentOptions() *AgentOptions {
o := &AgentOptions{}
o := &AgentOptions{
MetaHost: "127.0.0.1",
MetaPort: constants.YurttunnelAgentMetaPort,
}

return o
}
Expand Down Expand Up @@ -83,6 +90,8 @@ func (o *AgentOptions) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.ApiserverAddr, "apiserver-addr", o.ApiserverAddr, "A reachable address of the apiserver.")
fs.StringVar(&o.KubeConfig, "kube-config", o.KubeConfig, "Path to the kubeconfig file.")
fs.StringVar(&o.AgentIdentifiers, "agent-identifiers", o.AgentIdentifiers, "The identifiers of the agent, which will be used by the server when choosing agent.")
fs.StringVar(&o.MetaHost, "meta-host", o.MetaHost, "The ip address on which listen for --meta-port port.")
fs.StringVar(&o.MetaPort, "meta-port", o.MetaPort, "The port on which to serve HTTP requests like profling, metrics")
}

// agentIdentifiersIsValid verify agent identifiers are valid or not.
Expand Down Expand Up @@ -118,6 +127,7 @@ func (o *AgentOptions) Config() (*config.Config, error) {
NodeIP: o.NodeIP,
TunnelServerAddr: o.TunnelServerAddr,
AgentIdentifiers: o.AgentIdentifiers,
AgentMetaAddr: net.JoinHostPort(o.MetaHost, o.MetaPort),
}

if len(c.AgentIdentifiers) == 0 {
Expand Down
6 changes: 5 additions & 1 deletion cmd/yurt-tunnel-agent/app/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/openyurtio/openyurt/pkg/yurttunnel/pki"
"github.com/openyurtio/openyurt/pkg/yurttunnel/pki/certmanager"
"github.com/openyurtio/openyurt/pkg/yurttunnel/server/serveraddr"
"github.com/openyurtio/openyurt/pkg/yurttunnel/util"

"github.com/spf13/cobra"
"k8s.io/client-go/util/certificate"
Expand All @@ -44,7 +45,7 @@ func NewYurttunnelAgentCommand(stopCh <-chan struct{}) *cobra.Command {
fmt.Printf("%s: %#v\n", projectinfo.GetAgentName(), projectinfo.Get())
return nil
}
klog.Infof("%s version: %#v\n", projectinfo.GetAgentName(), projectinfo.Get())
klog.Infof("%s version: %#v", projectinfo.GetAgentName(), projectinfo.Get())

if err := agentOptions.Validate(); err != nil {
return err
Expand Down Expand Up @@ -101,6 +102,9 @@ func Run(cfg *config.CompletedConfig, stopCh <-chan struct{}) error {
ta := agent.NewTunnelAgent(tlsCfg, tunnelServerAddr, cfg.NodeName, cfg.AgentIdentifiers)
ta.Run(stopCh)

// 5. start meta server
util.RunMetaServer(cfg.AgentMetaAddr)

<-stopCh
return nil
}
1 change: 1 addition & 0 deletions cmd/yurt-tunnel-server/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type Config struct {
ListenAddrForAgent string
ListenAddrForMaster string
ListenInsecureAddrForMaster string
ListenMetaAddr string
RootCert *x509.CertPool
Client kubernetes.Interface
SharedInformerFactory informers.SharedInformerFactory
Expand Down
7 changes: 5 additions & 2 deletions cmd/yurt-tunnel-server/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type ServerOptions struct {
TunnelAgentConnectPort string
SecurePort string
InsecurePort string
MetaPort string
ServerCount int
ProxyStrategy string
}
Expand All @@ -63,6 +64,7 @@ func NewServerOptions() *ServerOptions {
TunnelAgentConnectPort: constants.YurttunnelServerAgentPort,
SecurePort: constants.YurttunnelServerMasterPort,
InsecurePort: constants.YurttunnelServerMasterInsecurePort,
MetaPort: constants.YurttunnelServerMetaPort,
ProxyStrategy: string(server.ProxyStrategyDestHost),
}
return o
Expand Down Expand Up @@ -92,7 +94,8 @@ func (o *ServerOptions) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.ProxyStrategy, "proxy-strategy", o.ProxyStrategy, "The strategy of proxying requests from tunnel server to agent.")
fs.StringVar(&o.TunnelAgentConnectPort, "tunnel-agent-connect-port", o.TunnelAgentConnectPort, "The port on which to serve tcp packets from tunnel agent")
fs.StringVar(&o.SecurePort, "secure-port", o.SecurePort, "The port on which to serve HTTPS requests from cloud clients like prometheus")
fs.StringVar(&o.InsecurePort, "insecure-port", o.InsecurePort, "The port on which to server HTTP requests from cloud clients like metrics-server")
fs.StringVar(&o.InsecurePort, "insecure-port", o.InsecurePort, "The port on which to serve HTTP requests from cloud clients like metrics-server")
fs.StringVar(&o.MetaPort, "meta-port", o.MetaPort, "The port on which to serve HTTP requests like profling, metrics")
}

func (o *ServerOptions) Config() (*config.Config, error) {
Expand Down Expand Up @@ -125,7 +128,7 @@ func (o *ServerOptions) Config() (*config.Config, error) {
cfg.ListenAddrForAgent = net.JoinHostPort(o.BindAddr, o.TunnelAgentConnectPort)
cfg.ListenAddrForMaster = net.JoinHostPort(o.BindAddr, o.SecurePort)
cfg.ListenInsecureAddrForMaster = net.JoinHostPort(o.InsecureBindAddr, o.InsecurePort)

cfg.ListenMetaAddr = net.JoinHostPort(o.InsecureBindAddr, o.MetaPort)
cfg.RootCert, err = pki.GenRootCertPool(o.KubeConfig, constants.YurttunnelCAFile)
if err != nil {
return nil, fmt.Errorf("fail to generate the rootCertPool: %s", err)
Expand Down
6 changes: 5 additions & 1 deletion cmd/yurt-tunnel-server/app/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/openyurtio/openyurt/pkg/yurttunnel/pki"
"github.com/openyurtio/openyurt/pkg/yurttunnel/pki/certmanager"
"github.com/openyurtio/openyurt/pkg/yurttunnel/server"
"github.com/openyurtio/openyurt/pkg/yurttunnel/util"

"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/util/wait"
Expand All @@ -48,7 +49,7 @@ func NewYurttunnelServerCommand(stopCh <-chan struct{}) *cobra.Command {
fmt.Printf("%s: %#v\n", projectinfo.GetServerName(), projectinfo.Get())
return nil
}
klog.Infof("%s version: %#v\n", projectinfo.GetServerName(), projectinfo.Get())
klog.Infof("%s version: %#v", projectinfo.GetServerName(), projectinfo.Get())

if err := serverOptions.Validate(); err != nil {
return err
Expand Down Expand Up @@ -136,6 +137,9 @@ func Run(cfg *config.CompletedConfig, stopCh <-chan struct{}) error {
return err
}

// 7. start meta server
util.RunMetaServer(cfg.ListenMetaAddr)

<-stopCh
return nil
}
2 changes: 1 addition & 1 deletion pkg/yurthub/profile/profile.go → pkg/profile/profile.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2020 The OpenYurt Authors.
Copyright 2021 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion pkg/yurthub/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (
"net/http"

"github.com/openyurtio/openyurt/cmd/yurthub/app/config"
"github.com/openyurtio/openyurt/pkg/profile"
"github.com/openyurtio/openyurt/pkg/yurthub/certificate/interfaces"
"github.com/openyurtio/openyurt/pkg/yurthub/profile"

"github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus/promhttp"
Expand Down
2 changes: 2 additions & 0 deletions pkg/yurttunnel/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ const (
YurttunnelServerAgentPort = "10262"
YurttunnelServerMasterPort = "10263"
YurttunnelServerMasterInsecurePort = "10264"
YurttunnelServerMetaPort = "10265"
YurttunnelAgentMetaPort = "10266"
YurttunnelServerServiceNs = "kube-system"
YurttunnelServerServiceName = "x-tunnel-server-svc"
YurttunnelServerAgentPortName = "tcp"
Expand Down
6 changes: 6 additions & 0 deletions pkg/yurttunnel/handlerwrapper/tracerequest/tracereq.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"k8s.io/klog/v2"

hw "github.com/openyurtio/openyurt/pkg/yurttunnel/handlerwrapper"
"github.com/openyurtio/openyurt/pkg/yurttunnel/server/metrics"
)

// TraceReqMiddleware prints request information when start/stop
Expand All @@ -47,6 +48,11 @@ func (trm *traceReqMiddleware) WrapHandler(handler http.Handler) http.Handler {

req.URL.Scheme = scheme
req.URL.Host = req.Host

// observe metrics
metrics.Metrics.IncInFlightRequests(req.Method, req.URL.Path)
defer metrics.Metrics.DecInFlightRequests(req.Method, req.URL.Path)

klog.V(2).Infof("start handling request %s %s, from %s to %s",
req.Method, req.URL.String(), req.RemoteAddr, req.Host)
start := time.Now()
Expand Down
2 changes: 2 additions & 0 deletions pkg/yurttunnel/iptables/iptables.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
utilnet "k8s.io/utils/net"

"github.com/openyurtio/openyurt/pkg/projectinfo"
"github.com/openyurtio/openyurt/pkg/yurttunnel/server/metrics"
)

const (
Expand Down Expand Up @@ -258,6 +259,7 @@ func (im *iptablesManager) getIPOfNodesWithoutAgent() []string {
}

klog.V(4).Infof("nodes without %s: %s", projectinfo.GetAgentName(), strings.Join(nodesIP, ","))
metrics.Metrics.ObserveCloudNodes(len(nodesIP))
return nodesIP
}

Expand Down
7 changes: 4 additions & 3 deletions pkg/yurttunnel/server/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ import (
)

var (
supportedHeaders = []string{"X-Tunnel-Proxy-Host", "User-Agent"}
proxyHostHeaderKey = "X-Tunnel-Proxy-Host"
supportedHeaders = []string{proxyHostHeaderKey, "User-Agent"}
HeaderTransferEncoding = "Transfer-Encoding"
HeaderChunked = "chunked"
)
Expand Down Expand Up @@ -275,9 +276,9 @@ func serveRequest(tunnelConn net.Conn, w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
select {
case <-stopCh:
klog.V(2).Infof("chunked request(%s) normally exit", r.URL.String())
klog.V(3).Infof("chunked request(%s) normally exit", r.URL.String())
case <-ctx.Done():
klog.Errorf("chunked request(%s) closed by cloud client, %v", r.URL.String(), ctx.Err())
klog.V(2).Infof("chunked request(%s) to agent(%s) closed by cloud client, %v", r.URL.String(), r.Header.Get(proxyHostHeaderKey), ctx.Err())
// close connection with tunnel
conn.Close()
}
Expand Down
94 changes: 94 additions & 0 deletions pkg/yurttunnel/server/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
Copyright 2020 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package metrics

import (
"strings"

"github.com/openyurtio/openyurt/pkg/projectinfo"
"github.com/prometheus/client_golang/prometheus"
)

var (
namespace = strings.ReplaceAll(projectinfo.GetTunnelName(), "-", "_")
subsystem = "server"
)

var (
// Metrics provides access to all tunnel server metrics.
Metrics = newTunnelServerMetrics()
)

type TunnelServerMetrics struct {
proxyingRequestsCollector *prometheus.GaugeVec
proxyingRequestsGauge prometheus.Gauge
cloudNodeGauge prometheus.Gauge
}

func newTunnelServerMetrics() *TunnelServerMetrics {
proxyingRequestsCollector := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "in_proxy_requests",
Help: "how many http requests are proxying by tunnel server",
},
[]string{"verb", "path"})
proxyingRequestsGauge := prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "total_in_proxy_requests",
Help: "the number of http requests are proxying by tunnel server",
})
cloudNodeGauge := prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "cloud_nodes_counter",
Help: "counter of cloud nodes that do not run tunnel agent",
})

prometheus.MustRegister(proxyingRequestsCollector)
prometheus.MustRegister(proxyingRequestsGauge)
prometheus.MustRegister(cloudNodeGauge)
return &TunnelServerMetrics{
proxyingRequestsCollector: proxyingRequestsCollector,
proxyingRequestsGauge: proxyingRequestsGauge,
cloudNodeGauge: cloudNodeGauge,
}
}

func (tsm *TunnelServerMetrics) Reset() {
tsm.proxyingRequestsCollector.Reset()
tsm.proxyingRequestsGauge.Set(float64(0))
tsm.cloudNodeGauge.Set(float64(0))
}

func (tsm *TunnelServerMetrics) IncInFlightRequests(verb, path string) {
tsm.proxyingRequestsCollector.WithLabelValues(verb, path).Inc()
tsm.proxyingRequestsGauge.Inc()
}

func (tsm *TunnelServerMetrics) DecInFlightRequests(verb, path string) {
tsm.proxyingRequestsCollector.WithLabelValues(verb, path).Dec()
tsm.proxyingRequestsGauge.Dec()
}

func (tsm *TunnelServerMetrics) ObserveCloudNodes(cnt int) {
tsm.cloudNodeGauge.Set(float64(cnt))
}
35 changes: 35 additions & 0 deletions pkg/yurttunnel/util/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package util

import (
"net/http"

"github.com/openyurtio/openyurt/pkg/profile"
"github.com/prometheus/client_golang/prometheus/promhttp"
"k8s.io/klog/v2"

"github.com/gorilla/mux"
)

// RunMetaServer start a http server for serving metrics and pprof requests.
func RunMetaServer(addr string) {
muxHandler := mux.NewRouter()
muxHandler.Handle("/metrics", promhttp.Handler())

// register handler for pprof
profile.Install(muxHandler)

metaServer := &http.Server{
Addr: addr,
Handler: muxHandler,
MaxHeaderBytes: 1 << 20,
}

klog.InfoS("start handling meta requests(metrics/pprof)", "server endpoint", addr)
go func() {
err := metaServer.ListenAndServe()
if err != nil {
klog.ErrorS(err, "meta server could not listen")
}
klog.InfoS("meta server stopped listening", "server endpoint", addr)
}()
}

0 comments on commit 02aa604

Please sign in to comment.