Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: divide the yurthubServer into hubServer and proxyServert #237

Merged
merged 1 commit into from
Mar 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions cmd/yurthub/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package config

import (
"fmt"
"net"
"net/url"
"strings"

Expand All @@ -15,8 +16,8 @@ import (
type YurtHubConfiguration struct {
LBMode string
RemoteServers []*url.URL
YurtHubHost string
YurtHubPort int
YurtHubServerAddr string
YurtHubProxyServerAddr string
GCFrequency int
CertMgrMode string
NodeName string
Expand All @@ -26,6 +27,7 @@ type YurtHubConfiguration struct {
MaxRequestInFlight int
JoinToken string
RootDir string
EnableProfiling bool
}

// Complete converts *options.YurtHubOptions to *YurtHubConfiguration
Expand All @@ -35,11 +37,13 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
return nil, err
}

hubServerAddr := net.JoinHostPort(options.YurtHubHost, options.YurtHubPort)
proxyServerAddr := net.JoinHostPort(options.YurtHubHost, options.YurtHubProxyPort)
cfg := &YurtHubConfiguration{
LBMode: options.LBMode,
RemoteServers: us,
YurtHubHost: options.YurtHubHost,
YurtHubPort: options.YurtHubPort,
YurtHubServerAddr: hubServerAddr,
YurtHubProxyServerAddr: proxyServerAddr,
GCFrequency: options.GCFrequency,
CertMgrMode: options.CertMgrMode,
NodeName: options.NodeName,
Expand All @@ -49,6 +53,7 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
MaxRequestInFlight: options.MaxRequestInFlight,
JoinToken: options.JoinToken,
RootDir: options.RootDir,
EnableProfiling: options.EnableProfiling,
}

return cfg, nil
Expand Down
12 changes: 9 additions & 3 deletions cmd/yurthub/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import (
type YurtHubOptions struct {
ServerAddr string
YurtHubHost string
YurtHubPort int
YurtHubPort string
YurtHubProxyPort string
GCFrequency int
CertMgrMode string
NodeName string
Expand All @@ -25,13 +26,15 @@ type YurtHubOptions struct {
JoinToken string
RootDir string
Version bool
EnableProfiling bool
}

// NewYurtHubOptions creates a new YurtHubOptions with a default config.
func NewYurtHubOptions() *YurtHubOptions {
o := &YurtHubOptions{
YurtHubHost: "127.0.0.1",
YurtHubPort: 10261,
YurtHubProxyPort: "10261",
YurtHubPort: "10267",
GCFrequency: 120,
CertMgrMode: "hubself",
LBMode: "rr",
Expand All @@ -40,6 +43,7 @@ func NewYurtHubOptions() *YurtHubOptions {
HeartbeatTimeoutSeconds: 2,
MaxRequestInFlight: 250,
RootDir: filepath.Join("/var/lib/", projectinfo.GetHubName()),
EnableProfiling: true,
}

return o
Expand Down Expand Up @@ -69,7 +73,8 @@ func ValidateOptions(options *YurtHubOptions) error {
// AddFlags returns flags for a specific yurthub by section name
func (o *YurtHubOptions) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.YurtHubHost, "bind-address", o.YurtHubHost, "the IP address on which to listen for the --serve-port port.")
fs.IntVar(&o.YurtHubPort, "serve-port", o.YurtHubPort, "the port on which to serve HTTP.")
fs.StringVar(&o.YurtHubPort, "serve-port", o.YurtHubPort, "the port on which to serve HTTP requests(like profiling, metrics) for hub agent.")
fs.StringVar(&o.YurtHubProxyPort, "proxy-port", o.YurtHubProxyPort, "the port on which to proxy HTTP requests to kube-apiserver")
fs.StringVar(&o.ServerAddr, "server-addr", o.ServerAddr, "the address of Kubernetes kube-apiserver,the format is: \"server1,server2,...\"")
fs.StringVar(&o.CertMgrMode, "cert-mgr-mode", o.CertMgrMode, "the cert manager mode, kubelet: use certificates that belongs to kubelet, hubself: auto generate client cert for hub agent.")
fs.IntVar(&o.GCFrequency, "gc-frequency", o.GCFrequency, "the frequency to gc cache in storage(unit: minute).")
Expand All @@ -82,4 +87,5 @@ func (o *YurtHubOptions) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.JoinToken, "join-token", o.JoinToken, "the Join token for bootstrapping hub agent when --cert-mgr-mode=hubself.")
fs.StringVar(&o.RootDir, "root-dir", o.RootDir, "directory path for managing hub agent files(pki, cache etc).")
fs.BoolVar(&o.Version, "version", o.Version, "print the version information.")
fs.BoolVar(&o.EnableProfiling, "profiling", o.EnableProfiling, "Enable profiling via web interface host:port/debug/pprof/")
}
2 changes: 1 addition & 1 deletion cmd/yurthub/app/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func Run(cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) error {
}
trace++

klog.Infof("%d. new %s server and begin to serve", trace, projectinfo.GetHubName())
klog.Infof("%d. new %s server and begin to serve, proxy server: %s, hub server: %s", trace, projectinfo.GetHubName(), cfg.YurtHubProxyServerAddr, cfg.YurtHubServerAddr)
s := server.NewYurtHubServer(cfg, certManager, yurtProxyHandler)
s.Run()
<-stopCh
Expand Down
4 changes: 2 additions & 2 deletions config/yaml-template/yurthub.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ spec:
- name: pem-dir
mountPath: /var/lib/kubelet/pki
command:
- __project_prefix__hub
- __project_prefix__hub
- --v=2
- --server-addr=__server_addr__
- --node-name=$(NODE_NAME)
livenessProbe:
httpGet:
host: 127.0.0.1
path: /v1/healthz
port: 10261
port: 10267
initialDelaySeconds: 300
periodSeconds: 5
failureThreshold: 3
Expand Down
2 changes: 1 addition & 1 deletion pkg/yurtctl/util/edgenode/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ spec:
httpGet:
host: 127.0.0.1
path: /v1/healthz
port: 10261
port: 10267
initialDelaySeconds: 300
periodSeconds: 5
failureThreshold: 3
Expand Down
4 changes: 2 additions & 2 deletions pkg/yurthub/profile/profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ import (

// Install adds the Profiling webservice to the given mux.
func Install(c *mux.Router) {
c.HandleFunc("/debug/pprof", redirectTo("/debug/pprof/"))
c.HandleFunc("/debug/pprof/", http.HandlerFunc(pprof.Index))
c.HandleFunc("/debug/pprof/profile", pprof.Profile)
c.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
c.HandleFunc("/debug/pprof/trace", pprof.Trace)
c.HandleFunc("/debug/pprof", redirectTo("/debug/pprof/"))
c.PathPrefix("/debug/pprof/").HandlerFunc(pprof.Index)
}

// redirectTo redirects request to a certain destination.
Expand Down
59 changes: 30 additions & 29 deletions pkg/yurthub/server/certificate.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,42 +22,43 @@ import (
"net/http"

"github.com/openyurtio/openyurt/cmd/yurthub/app/config"
"github.com/openyurtio/openyurt/pkg/yurthub/certificate/interfaces"
)

const (
tokenKey = "jointoken"
)

// updateToken update bootstrap token in the bootstrap-hub.conf file
// in order to update node certificate when both node certificate and
// old join token expires
func (s *yurtHubServer) updateToken(w http.ResponseWriter, r *http.Request) {
tokens := make(map[string]string)
decoder := json.NewDecoder(r.Body)
err := decoder.Decode(&tokens)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
_, _ = fmt.Fprintf(w, "could not decode tokens, %v", err)
return
}
// updateTokenHandler returns a http handler that update bootstrap token in the bootstrap-hub.conf file
// in order to update node certificate when both node certificate and old join token expires
func updateTokenHandler(certificateMgr interfaces.YurtCertificateManager) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
tokens := make(map[string]string)
decoder := json.NewDecoder(r.Body)
err := decoder.Decode(&tokens)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
_, _ = fmt.Fprintf(w, "could not decode tokens, %v", err)
return
}

joinToken := tokens[tokenKey]
if len(joinToken) == 0 {
w.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(w, "no join token is set")
return
}
joinToken := tokens[tokenKey]
if len(joinToken) == 0 {
w.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(w, "no join token is set")
return
}

err = s.certificateMgr.Update(&config.YurtHubConfiguration{JoinToken: joinToken})
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprintf(w, "could not update bootstrap token, %v", err)
return
}

w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
fmt.Fprintf(w, "update bootstrap token successfully")
return
err = certificateMgr.Update(&config.YurtHubConfiguration{JoinToken: joinToken})
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprintf(w, "could not update bootstrap token, %v", err)
return
}

w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
fmt.Fprintf(w, "update bootstrap token successfully")
return
})
}
64 changes: 39 additions & 25 deletions pkg/yurthub/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,55 +31,69 @@ type Server interface {
Run()
}

// yutHubServer includes hubServer and proxyServer,
// and hubServer handles requests by hub agent itself, like profiling, metrics, healthz
// and proxyServer does not handle requests locally and proxy requests to kube-apiserver
type yurtHubServer struct {
mux *mux.Router
certificateMgr interfaces.YurtCertificateManager
proxyHandler http.Handler
cfg *config.YurtHubConfiguration
hubServer *http.Server
proxyServer *http.Server
}

// NewYurtHubServer creates a Server object
func NewYurtHubServer(cfg *config.YurtHubConfiguration,
certificateMgr interfaces.YurtCertificateManager,
proxyHandler http.Handler) Server {
return &yurtHubServer{
mux: mux.NewRouter(),
certificateMgr: certificateMgr,
proxyHandler: proxyHandler,
cfg: cfg,
hubMux := mux.NewRouter()
registerHandlers(hubMux, cfg, certificateMgr)
hubServer := &http.Server{
Addr: cfg.YurtHubServerAddr,
Handler: hubMux,
MaxHeaderBytes: 1 << 20,
}
}

func (s *yurtHubServer) Run() {
s.registerHandler()
proxyServer := &http.Server{
Addr: cfg.YurtHubProxyServerAddr,
Handler: proxyHandler,
MaxHeaderBytes: 1 << 20,
}

server := &http.Server{
Addr: fmt.Sprintf("%s:%d", s.cfg.YurtHubHost, s.cfg.YurtHubPort),
Handler: s.mux,
return &yurtHubServer{
hubServer: hubServer,
proxyServer: proxyServer,
}
}

err := server.ListenAndServe()
// Run will start hub server and proxy server
func (s *yurtHubServer) Run() {
go func() {
err := s.hubServer.ListenAndServe()
if err != nil {
panic(err)
}
}()

err := s.proxyServer.ListenAndServe()
if err != nil {
panic(err)
}
}

func (s *yurtHubServer) registerHandler() {
// registerHandler registers handlers for yurtHubServer, and yurtHubServer can handle requests like profiling, healthz, update token.
func registerHandlers(c *mux.Router, cfg *config.YurtHubConfiguration, certificateMgr interfaces.YurtCertificateManager) {
// register handlers for update join token
s.mux.HandleFunc("/v1/token", s.updateToken).Methods("POST", "PUT")
c.Handle("/v1/token", updateTokenHandler(certificateMgr)).Methods("POST", "PUT")

// register handler for health check
s.mux.HandleFunc("/v1/healthz", s.healthz).Methods("GET")
c.HandleFunc("/v1/healthz", healthz).Methods("GET")

// register handler for profile
profile.Install(s.mux)

// attention: "/" route must be put at the end of registerHandler
// register handlers for proxy to kube-apiserver
s.mux.PathPrefix("/").Handler(s.proxyHandler)
if cfg.EnableProfiling {
profile.Install(c)
}
}

func (s *yurtHubServer) healthz(w http.ResponseWriter, r *http.Request) {
// healthz returns ok for healthz request
func healthz(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, "OK")
}