diff --git a/cmd/kubenest/node-agent/app/client.go b/cmd/kubenest/node-agent/app/client.go new file mode 100644 index 000000000..601bd885c --- /dev/null +++ b/cmd/kubenest/node-agent/app/client.go @@ -0,0 +1,320 @@ +package app + +import ( + "bufio" + "crypto/tls" + "encoding/base64" + "errors" + "fmt" + "github.com/gorilla/websocket" + "github.com/spf13/cobra" + "net/http" + "net/url" + "os" + "os/signal" + "path/filepath" + "sync" +) + +var ( + ClientCmd = &cobra.Command{ + Use: "client", + Short: "A WebSocket client CLI tool to execute commands and file uploads", + } + shCmd = &cobra.Command{ + Use: "sh [command]", + Short: "Execute a command via WebSocket", + RunE: cmdCmdRun, + Example: `node-agent client sh -u=[user] -p=[pass] -a="127.0.0.1:5678" -o ls -r "-l"`, + } + uploadCmd = &cobra.Command{ + Use: "upload", + Short: "Upload a file via WebSocket", + RunE: cmdUploadRun, + Example: `node-agent upload -u=[user] -p=[pass] -a="127.0.0.1:5678" -f /tmp -n=app.go`, + } + ttyCmd = &cobra.Command{ + Use: "tty", + Short: "Execute a command via WebSocket with TTY", + RunE: cmdTtyRun, + } + wg sync.WaitGroup +) +var uniqueValuesMap = make(map[string]bool) +var dialer = websocket.DefaultDialer + +func init() { + dialer.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} + + ClientCmd.PersistentFlags().StringSliceVarP(&wsAddr, "addr", "a", []string{}, "WebSocket address (e.g., host1:port1,host2:port2)") + ClientCmd.MarkPersistentFlagRequired("addr") + + // PreRunE check param + ClientCmd.PersistentPreRunE = func(cmd *cobra.Command, args []string) error { + for _, value := range wsAddr { + if _, exists := uniqueValuesMap[value]; exists { + return errors.New("duplicate values are not allowed") + } + uniqueValuesMap[value] = true + } + return nil + } + + shCmd.Flags().StringArrayVarP(¶ms, "param", "r", []string{}, "Command parameters") + shCmd.Flags().StringVarP(&operation, "operation", "o", "", "Operation to perform") + shCmd.MarkFlagRequired("addr") + + uploadCmd.Flags().StringVarP(&fileName, "name", "n", "", "Name of the file to upload") + uploadCmd.Flags().StringVarP(&filePath, "path", "f", "", "Path to the file to upload") + uploadCmd.MarkFlagRequired("name") + uploadCmd.MarkFlagRequired("path") + + ttyCmd.Flags().StringVarP(&operation, "operation", "o", "", "Operation to perform") + + ClientCmd.AddCommand(shCmd) + ClientCmd.AddCommand(uploadCmd) + ClientCmd.AddCommand(ttyCmd) +} + +func cmdTtyRun(cmd *cobra.Command, args []string) error { + headers := http.Header{ + "Authorization": {"Basic " + basicAuth(user, password)}, + } + cmdStr := fmt.Sprintf("command=%s", operation) + // execute one every wsAddr + for _, addr := range wsAddr { + wg.Add(1) + go func(addr string) { + defer wg.Done() + wsURL := fmt.Sprintf("wss://%s/tty/?%s", addr, cmdStr) + fmt.Println("Executing tty:", cmdStr, "on", addr) + err := connectTty(wsURL, headers) + if err != nil { + log.Errorf("failed to execute command: %v on %s: %v\n", err, addr, cmdStr) + } + }(addr) + } + wg.Wait() + return nil +} + +func connectTty(wsURL string, headers http.Header) error { + ws, resp, err := dialer.Dial(wsURL, headers) + defer wsRespClose(resp) + if err != nil { + return fmt.Errorf("WebSocket dial error: %v", err) + } + defer ws.Close() + interrupt := make(chan os.Signal, 1) + signal.Notify(interrupt, os.Interrupt) + inputChan := make(chan string) + go func() { + scanner := bufio.NewScanner(os.Stdin) + for scanner.Scan() { + inputChan <- scanner.Text() + } + if err := scanner.Err(); err != nil { + log.Fatalf("Scanner error: %v", err) + } + }() + done := make(chan struct{}) + // Read messages from the WebSocket server + go func() { + defer close(done) + for { + _, message, err := ws.ReadMessage() + if err != nil { + log.Infof("ReadMessage: %v", err) + interrupt <- os.Interrupt + return + } + fmt.Printf("%s", message) + } + }() + // Main event loop + for { + select { + case msg := <-inputChan: + // Send user input to the WebSocket server + if err := ws.WriteMessage(websocket.TextMessage, []byte(fmt.Sprintf("%s\n", msg))); err != nil { + log.Infof("WriteMessage: %v", err) + return err + } + if msg == "exit" { + return nil + } + case <-interrupt: + // Cleanly close the connection on interrupt + log.Infof("Interrupt received, closing connection...") + if err := ws.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")); err != nil { + log.Infof("CloseMessage: %v", err) + return err + } + select { + case <-done: + } + return nil + } + } +} + +func cmdCmdRun(cmd *cobra.Command, args []string) error { + if len(operation) == 0 { + log.Errorf("operation is required") + return fmt.Errorf("operation is required") + } + if len(user) == 0 || len(password) == 0 { + log.Errorf("user and password are required") + return fmt.Errorf("user and password are required") + } + // use set to remove duplicate for wsAddr + return executeWebSocketCommand() +} + +func cmdUploadRun(cmd *cobra.Command, args []string) error { + return uploadFile(filePath, fileName) +} + +func executeWebSocketCommand() error { + headers := http.Header{ + "Authorization": {"Basic " + basicAuth(user, password)}, + } + cmdStr := fmt.Sprintf("command=%s", operation) + // Build params part of the URL + if len(params) > 1 { + paramsStr := "args=" + for _, param := range params { + paramsStr += param + "&&args=" + } + paramsStr = paramsStr[:len(paramsStr)-7] + cmdStr = fmt.Sprintf("command=%s&&%s", operation, paramsStr) + } + + // execute one every wsAddr + for _, addr := range wsAddr { + wg.Add(1) + go func(addr string) { + defer wg.Done() + wsURL := fmt.Sprintf("wss://%s/cmd/?%s", addr, cmdStr) + fmt.Println("Executing command:", cmdStr, "on", addr) + err := connectAndHandleMessages(wsURL, headers) + if err != nil { + log.Errorf("failed to execute command: %v on %s: %v\n", err, addr, cmdStr) + } + }(addr) + } + wg.Wait() + return nil +} + +func uploadFile(filePath, fileName string) error { + headers := http.Header{ + "Authorization": {"Basic " + basicAuth(user, password)}, + } + for _, addr := range wsAddr { + wg.Add(1) + go func(addr string) { + defer wg.Done() + wsURL := fmt.Sprintf("wss://%s/upload/?file_name=%s&file_path=%s", addr, url.QueryEscape(filepath.Base(fileName)), url.QueryEscape(filePath)) + fmt.Println("Uploading file:", fileName, "from", filePath, "to", addr) + err := connectAndSendFile(wsURL, headers, filePath, fileName) + if err != nil { + log.Errorf("failed to upload file: %v on %s: %v\n", err, addr, fileName) + } + }(addr) + } + wg.Wait() + return nil +} + +func wsRespClose(resp *http.Response) { + if resp != nil && resp.Body != nil { + _ = resp.Body.Close() + } +} + +func connectAndHandleMessages(wsURL string, headers http.Header) error { + ws, resp, err := dialer.Dial(wsURL, headers) + defer wsRespClose(resp) + if err != nil { + return fmt.Errorf("WebSocket dial error: %v", err) + } + defer ws.Close() + + handleMessages(ws) + return nil +} + +func connectAndSendFile(wsURL string, headers http.Header, filePath, fileName string) error { + ws, resp, err := dialer.Dial(wsURL, headers) + if err != nil { + return fmt.Errorf("WebSocket dial error: %v", err) + } + defer wsRespClose(resp) + defer ws.Close() + + sendFile(ws, fileName) + + handleMessages(ws) + return nil +} + +func basicAuth(user, password string) string { + auth := user + ":" + password + return base64.StdEncoding.EncodeToString([]byte(auth)) +} + +func handleMessages(ws *websocket.Conn) { + defer ws.Close() + for { + _, message, err := ws.ReadMessage() + if err != nil { + log.Println("Read message error:", err) + return + } + fmt.Printf("Received message: %s\n", message) + } +} + +func sendFile(ws *websocket.Conn, filePath string) { + //if file not exists, close connection + if _, err := os.Stat(filePath); os.IsNotExist(err) { + log.Errorf("File not exists: %v", err) + err := ws.WriteMessage(websocket.BinaryMessage, []byte("EOF")) + if err != nil { + log.Printf("Write message error: %v", err) + } + return + } + + file, err := os.Open(filePath) + if err != nil { + log.Errorf("File open error: %v", err) + } + defer file.Close() + // 指定每次读取的数据块大小 + bufferSize := 1024 // 例如每次读取 1024 字节 + buffer := make([]byte, bufferSize) + + reader := bufio.NewReader(file) + for { + n, err := reader.Read(buffer) + if err != nil { + // check if EOF + if err.Error() == "EOF" { + break + } + log.Errorf("failed to read file %v:", err) + return + } + dataToSend := buffer[:n] + + _ = ws.WriteMessage(websocket.BinaryMessage, dataToSend) + } + + err = ws.WriteMessage(websocket.BinaryMessage, []byte("EOF")) + log.Infof("send EOF ----") + if err != nil { + log.Errorf("Write message error: %v", err) + } +} diff --git a/cmd/kubenest/node-agent/app/root.go b/cmd/kubenest/node-agent/app/root.go new file mode 100644 index 000000000..407d3f7fc --- /dev/null +++ b/cmd/kubenest/node-agent/app/root.go @@ -0,0 +1,52 @@ +package app + +import ( + "github.com/spf13/cobra" + "github.com/spf13/viper" +) + +var ( + user string // username for authentication + password string // password for authentication +) + +var RootCmd = &cobra.Command{ + Use: "node-agent", + Short: "node-agent is a tool for node to start websocket server and client", + Long: `node-agent client for connect server to execute command and upload file to node + node-agent serve for start websocket server to receive message from client and download file from client`, + Run: func(cmd *cobra.Command, args []string) { + _ = cmd.Help() + }, +} + +func initConfig() { + // Tell Viper to automatically look for a .env file + viper.SetConfigFile("agent.env") + + // If a .env file is found, read it in. + if err := viper.ReadInConfig(); err != nil { + } + // set default value from agent.env + if len(user) == 0 { + user = viper.GetString("WEB_USER") + } + if len(password) == 0 { + password = viper.GetString("WEB_PASS") + } +} + +func init() { + cobra.OnInitialize(initConfig) + + RootCmd.PersistentFlags().StringVarP(&user, "user", "u", "", "Username for authentication") + RootCmd.PersistentFlags().StringVarP(&password, "password", "p", "", "Password for authentication") + // bind flags to viper + viper.BindPFlag("WEB_USER", RootCmd.PersistentFlags().Lookup("user")) + viper.BindPFlag("WEB_PASS", RootCmd.PersistentFlags().Lookup("password")) + // bind environment variables + viper.BindEnv("WEB_USER", "WEB_USER") + viper.BindEnv("WEB_PASS", "WEB_PASS") + RootCmd.AddCommand(ClientCmd) + RootCmd.AddCommand(ServeCmd) +} diff --git a/cmd/kubenest/node-agent/app.go b/cmd/kubenest/node-agent/app/serve.go similarity index 65% rename from cmd/kubenest/node-agent/app.go rename to cmd/kubenest/node-agent/app/serve.go index 60a6e88ea..c50db2ef1 100644 --- a/cmd/kubenest/node-agent/app.go +++ b/cmd/kubenest/node-agent/app/serve.go @@ -1,5 +1,4 @@ -// main.go -package main +package app import ( "bufio" @@ -7,67 +6,82 @@ import ( "crypto/tls" "encoding/base64" "errors" - "flag" "fmt" + "github.com/spf13/cobra" + "github.com/spf13/viper" + "golang.org/x/term" + "io" "net/http" "net/url" "os" "os/exec" + "os/signal" "path/filepath" "strings" + "syscall" "time" + "github.com/creack/pty" "github.com/gorilla/websocket" "github.com/sirupsen/logrus" ) var ( - addr = flag.String("addr", ":5678", "websocket service address") - certFile = flag.String("cert", "cert.pem", "SSL certificate file") - keyFile = flag.String("key", "key.pem", "SSL key file") - user = flag.String("user", "", "Username for authentication") - password = flag.String("password", "", "Password for authentication") - log = logrus.New() + ServeCmd = &cobra.Command{ + Use: "serve", + Short: "Start a WebSocket server", + RunE: serveCmdRun, + } + + wsAddr []string // websocket client connect address list + filePath string // the server path to save upload file + fileName string // local file to upload + params []string // New slice to hold multiple command parameters + operation string // operation for client to execute + certFile string // SSL certificate file + keyFile string // SSL key file + addr string // server listen address + log = logrus.New() ) -var upgrader = websocket.Upgrader{} // use default options +var upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + CheckOrigin: func(r *http.Request) bool { + return true + }, +} // use default options func init() { + // setup log log.Out = os.Stdout - - file, err := os.OpenFile("app.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) + logFile, err := os.OpenFile("app.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) if err == nil { - log.Out = file + log.SetOutput(io.MultiWriter(os.Stdout, logFile)) } else { log.Info("Failed to log to file, using default stderr") } log.SetLevel(logrus.InfoLevel) + + // setup flags + ServeCmd.PersistentFlags().StringVarP(&addr, "addr", "a", ":5678", "websocket service address") + ServeCmd.PersistentFlags().StringVarP(&certFile, "cert", "c", "cert.pem", "SSL certificate file") + ServeCmd.PersistentFlags().StringVarP(&keyFile, "key", "k", "key.pem", "SSL key file") + } -func main() { - flag.Parse() - if *user == "" { - _user := os.Getenv("WEB_USER") - if _user != "" { - *user = _user - } - } - if *password == "" { - _password := os.Getenv("WEB_PASS") - if _password != "" { - *password = _password - } +func serveCmdRun(cmd *cobra.Command, args []string) error { + user := viper.GetString("WEB_USER") + password := viper.GetString("WEB_PASS") + if len(user) == 0 || len(password) == 0 { + log.Errorf("-user and -password are required %s %s", user, password) + return errors.New("-user and -password are required") } - if len(*user) == 0 || len(*password) == 0 { - flag.Usage() - log.Errorf("-user and -password are required %s %s", *user, *password) - return - } - start(*addr, *certFile, *keyFile, *user, *password) + return start(addr, certFile, keyFile, user, password) } // start server -func start(addr, certFile, keyFile, user, password string) { +func start(addr, certFile, keyFile, user, password string) error { passwordHash := sha256.Sum256([]byte(password)) http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { @@ -119,6 +133,8 @@ func start(addr, certFile, keyFile, user, password string) { handleScript(conn, queryParams, []string{"python3", "-u"}) case strings.HasPrefix(r.URL.Path, "/sh"): handleScript(conn, queryParams, []string{"sh"}) + case strings.HasPrefix(r.URL.Path, "/tty"): + handleTty(conn, queryParams) default: _ = conn.WriteMessage(websocket.TextMessage, []byte("Invalid path")) } @@ -135,8 +151,87 @@ func start(addr, certFile, keyFile, user, password string) { TLSConfig: tlsConfig, ReadHeaderTimeout: 10 * time.Second, } + err := server.ListenAndServeTLS("", "") + if err != nil { + log.Errorf("failed to start server %v", err) + } + return err +} + +func handleTty(conn *websocket.Conn, queryParams url.Values) { + entrypoint := queryParams.Get("command") + if len(entrypoint) == 0 { + log.Errorf("command is required") + return + } + log.Infof("Executing command %s", entrypoint) + cmd := exec.Command(entrypoint) + ptmx, err := pty.Start(cmd) + if err != nil { + log.Errorf("failed to start command %v", err) + return + } + defer func() { + _ = ptmx.Close() + }() + ch := make(chan os.Signal, 1) + signal.Notify(ch, syscall.SIGWINCH) + go func() { + for range ch { + if err := pty.InheritSize(os.Stdin, ptmx); err != nil { + log.Errorf("error resizing pty: %s", err) + } + } + }() + ch <- syscall.SIGWINCH // Initial resize. + defer func() { signal.Stop(ch); close(ch) }() // Cleanup signals when done. + done := make(chan struct{}) + // Use a goroutine to copy PTY output to WebSocket + go func() { + buf := make([]byte, 1024) + for { + n, err := ptmx.Read(buf) + if err != nil { + log.Errorf("PTY read error: %v", err) + break + } + log.Printf("Received message: %s", buf[:n]) + if err := conn.WriteMessage(websocket.BinaryMessage, buf[:n]); err != nil { + log.Errorf("WebSocket write error: %v", err) + break + } + } + done <- struct{}{} + }() + // echo off + //ptmx.Write([]byte("stty -echo\n")) + // Set stdin in raw mode. + oldState, err := term.MakeRaw(int(ptmx.Fd())) + if err != nil { + panic(err) + } + defer func() { _ = term.Restore(int(ptmx.Fd()), oldState) }() // Best effort. + + // Use a goroutine to copy WebSocket input to PTY + go func() { + for { + _, message, err := conn.ReadMessage() + if err != nil { + log.Printf("read from websocket failed: %v, %s", err, string(message)) + break + } + log.Printf("Received message: %s", message) // Debugging line + if _, err := ptmx.Write(message); err != nil { // Ensure newline character for commands + log.Printf("PTY write error: %v", err) + break + } + } + // Signal the done channel when this goroutine finishes + done <- struct{}{} + }() - log.Errorf("failed to start server %v", server.ListenAndServeTLS("", "")) + // Wait for the done channel to be closed + <-done } func handleUpload(conn *websocket.Conn, params url.Values) { @@ -271,21 +366,22 @@ func handleScript(conn *websocket.Conn, params url.Values, command []string) { func execCmd(conn *websocket.Conn, command string, args []string) { // #nosec G204 cmd := exec.Command(command, args...) + log.Infof("Executing command: %s, %v", command, args) stdout, err := cmd.StdoutPipe() if err != nil { log.Warnf("Error obtaining command output pipe: %v", err) } defer stdout.Close() - if err := cmd.Start(); err != nil { - errStr := strings.ToLower(err.Error()) - log.Warnf("Error starting command: %v, %s", err, errStr) - if strings.Contains(errStr, "no such file") { - exitCode := 127 - _ = conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, fmt.Sprintf("%d", exitCode))) - } + stderr, err := cmd.StderrPipe() + if err != nil { + log.Warnf("Error obtaining command error pipe: %v", err) } + defer stderr.Close() + // Channel for signaling command completion + doneCh := make(chan struct{}) + defer close(doneCh) // processOutput go func() { scanner := bufio.NewScanner(stdout) @@ -294,7 +390,23 @@ func execCmd(conn *websocket.Conn, command string, args []string) { log.Warnf("%s", data) _ = conn.WriteMessage(websocket.TextMessage, data) } + scanner = bufio.NewScanner(stderr) + for scanner.Scan() { + data := scanner.Bytes() + log.Warnf("%s", data) + _ = conn.WriteMessage(websocket.TextMessage, data) + } + doneCh <- struct{}{} }() + if err := cmd.Start(); err != nil { + errStr := strings.ToLower(err.Error()) + log.Warnf("Error starting command: %v, %s", err, errStr) + _ = conn.WriteMessage(websocket.TextMessage, []byte(errStr)) + if strings.Contains(errStr, "no such file") { + exitCode := 127 + _ = conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, fmt.Sprintf("%d", exitCode))) + } + } // Wait for the command to finish if err := cmd.Wait(); err != nil { @@ -303,6 +415,7 @@ func execCmd(conn *websocket.Conn, command string, args []string) { log.Warnf("Command : %s exited with non-zero status: %v", command, exitError) } } + <-doneCh exitCode := cmd.ProcessState.ExitCode() log.Infof("Command : %s finished with exit code %d", command, exitCode) _ = conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, fmt.Sprintf("%d", exitCode))) diff --git a/cmd/kubenest/node-agent/app_test.go b/cmd/kubenest/node-agent/app/serve_test.go similarity index 58% rename from cmd/kubenest/node-agent/app_test.go rename to cmd/kubenest/node-agent/app/serve_test.go index 0c45d5387..10062a6fb 100644 --- a/cmd/kubenest/node-agent/app_test.go +++ b/cmd/kubenest/node-agent/app/serve_test.go @@ -1,9 +1,7 @@ -package main +package app import ( - "bufio" "crypto/tls" - "encoding/base64" "fmt" "net/http" "net/url" @@ -12,13 +10,8 @@ import ( "runtime" "testing" "time" - - "github.com/gorilla/websocket" ) -// create dialer -var dialer = *websocket.DefaultDialer - // test addr user pass var testAddr, username, pass string var headers http.Header @@ -43,11 +36,7 @@ func init() { go start(":5678", "cert.pem", "key.pem", username, pass) time.Sleep(10 * time.Second) } -func wsRespClose(resp *http.Response) { - if resp != nil && resp.Body != nil { - _ = resp.Body.Close() - } -} + func TestCmd(t *testing.T) { fmt.Println("Command test") command := url.QueryEscape("ls -l") @@ -106,63 +95,3 @@ func TestPyScript(t *testing.T) { sendFile(ws, filepath.Join(parentDir, "count.py")) handleMessages(ws) } - -func basicAuth(username, password string) string { - auth := username + ":" + password - return base64.StdEncoding.EncodeToString([]byte(auth)) -} - -func handleMessages(ws *websocket.Conn) { - defer ws.Close() - for { - _, message, err := ws.ReadMessage() - if err != nil { - log.Println("Read message end :", err) - return - } - fmt.Printf("Received message: %s\n", message) - } -} - -func sendFile(ws *websocket.Conn, filePath string) { - //if file not exists, close connection - if _, err := os.Stat(filePath); os.IsNotExist(err) { - log.Printf("File not exists: %v", err) - err := ws.WriteMessage(websocket.BinaryMessage, []byte("EOF")) - if err != nil { - log.Printf("Write message error: %v", err) - } - return - } - - file, err := os.Open(filePath) - if err != nil { - log.Printf("File open error: %v", err) - } - defer file.Close() - // 指定每次读取的数据块大小 - bufferSize := 1024 // 例如每次读取 1024 字节 - buffer := make([]byte, bufferSize) - - reader := bufio.NewReader(file) - for { - n, err := reader.Read(buffer) - if err != nil { - // check if EOF - if err.Error() == "EOF" { - break - } - log.Printf("failed to read file %v:", err) - return - } - dataToSend := buffer[:n] - - _ = ws.WriteMessage(websocket.BinaryMessage, dataToSend) - } - - err = ws.WriteMessage(websocket.BinaryMessage, []byte("EOF")) - log.Printf("send EOF ----") - if err != nil { - log.Printf("Write message error: %v", err) - } -} diff --git a/cmd/kubenest/node-agent/main.go b/cmd/kubenest/node-agent/main.go new file mode 100644 index 000000000..d1aeddd31 --- /dev/null +++ b/cmd/kubenest/node-agent/main.go @@ -0,0 +1,12 @@ +package main + +import ( + "github.com/kosmos.io/kosmos/cmd/kubenest/node-agent/app" + "log" +) + +func main() { + if err := app.RootCmd.Execute(); err != nil { + log.Fatal(err) + } +} diff --git a/deploy/crds/kosmos.io_virtualclusters.yaml b/deploy/crds/kosmos.io_virtualclusters.yaml index fc750ce90..0cc626a97 100644 --- a/deploy/crds/kosmos.io_virtualclusters.yaml +++ b/deploy/crds/kosmos.io_virtualclusters.yaml @@ -202,6 +202,10 @@ spec: updateTime: format: date-time type: string + vipMap: + additionalProperties: + type: string + type: object type: object required: - spec diff --git a/deploy/virtual-cluster-components-manifest-cm.yaml b/deploy/virtual-cluster-components-manifest-cm.yaml index 449040b5a..60f69a752 100644 --- a/deploy/virtual-cluster-components-manifest-cm.yaml +++ b/deploy/virtual-cluster-components-manifest-cm.yaml @@ -4,6 +4,7 @@ data: [ {"name": "kube-proxy", "path": "/kosmos/manifest/kube-proxy/*.yaml"}, {"name": "calico", "path": "/kosmos/manifest/calico/*.yaml"}, + {"name": "keepalived", "path": "/kosmos/manifest/keepalived/*.yaml"}, ] host-core-dns-components: | [ diff --git a/deploy/virtual-cluster-vip-pool-cm.yaml b/deploy/virtual-cluster-vip-pool-cm.yaml new file mode 100644 index 000000000..940c63ab5 --- /dev/null +++ b/deploy/virtual-cluster-vip-pool-cm.yaml @@ -0,0 +1,11 @@ +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: kosmos-vip-pool + namespace: kosmos-system +data: + vip-config.yaml: | + # can be use for vc, the ip formate is 192.168.0.1 and 192.168.0.2-192.168.0.10 + vipPool: + - 192.168.0.1-192.168.0.10 \ No newline at end of file diff --git a/go.mod b/go.mod index 592bf813e..46fe33f77 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/containerd/console v1.0.3 github.com/containerd/containerd v1.6.14 github.com/coreos/go-iptables v0.7.1-0.20231102141700-50d824baaa46 + github.com/creack/pty v1.1.11 github.com/docker/docker v24.0.6+incompatible github.com/evanphx/json-patch v4.12.0+incompatible github.com/go-logr/logr v1.2.3 @@ -24,6 +25,7 @@ require ( github.com/spf13/cast v1.6.0 github.com/spf13/cobra v1.6.0 github.com/spf13/pflag v1.0.5 + github.com/spf13/viper v1.12.0 github.com/vishvananda/netlink v1.2.1-beta.2.0.20220630165224-c591ada0fb2b golang.org/x/sys v0.12.0 golang.org/x/time v0.3.0 @@ -103,6 +105,7 @@ require ( github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7 // indirect github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3 // indirect + github.com/hashicorp/hcl v1.0.0 // indirect github.com/imdario/mergo v0.3.12 // indirect github.com/inconshreveable/mousetrap v1.0.1 // indirect github.com/josharian/intern v1.0.0 // indirect @@ -111,10 +114,12 @@ require ( github.com/klauspost/compress v1.11.13 // indirect github.com/leodido/go-urn v0.0.0-20181204092800-a67a23e1c1af // indirect github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de // indirect + github.com/magiconair/properties v1.8.6 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/mattn/go-runewidth v0.0.7 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/mitchellh/go-wordwrap v1.0.0 // indirect + github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/moby/locker v1.0.1 // indirect github.com/moby/spdystream v0.2.0 // indirect github.com/moby/sys/mountinfo v0.6.2 // indirect @@ -131,6 +136,7 @@ require ( github.com/opencontainers/runtime-spec v1.1.0-rc.1 // indirect github.com/opencontainers/selinux v1.10.1 // indirect github.com/pelletier/go-toml v1.9.5 // indirect + github.com/pelletier/go-toml/v2 v2.0.1 // indirect github.com/peterbourgon/diskv v2.0.1+incompatible // indirect github.com/projectcalico/go-json v0.0.0-20161128004156-6219dc7339ba // indirect github.com/projectcalico/go-yaml-wrapper v0.0.0-20191112210931-090425220c54 // indirect @@ -139,7 +145,10 @@ require ( github.com/prometheus/common v0.37.0 // indirect github.com/prometheus/procfs v0.8.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect + github.com/spf13/afero v1.9.2 // indirect + github.com/spf13/jwalterweatherman v1.1.0 // indirect github.com/stoewer/go-strcase v1.2.0 // indirect + github.com/subosito/gotenv v1.3.0 // indirect github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f // indirect github.com/xlab/treeprint v1.1.0 // indirect go.etcd.io/etcd/api/v3 v3.5.7 // indirect @@ -176,6 +185,7 @@ require ( google.golang.org/protobuf v1.28.2-0.20230118093459-a9481185b34d // indirect gopkg.in/go-playground/validator.v9 v9.27.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect + gopkg.in/ini.v1 v1.66.4 // indirect gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect k8s.io/cloud-provider v0.26.3 // indirect diff --git a/go.sum b/go.sum index 0c25b4561..9d65cf458 100644 --- a/go.sum +++ b/go.sum @@ -993,6 +993,7 @@ github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= +github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64= github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ= @@ -1076,6 +1077,8 @@ github.com/lyft/protoc-gen-star v0.6.0/go.mod h1:TGAoBVkt8w7MPG72TrKIu85MIdXwDuz github.com/lyft/protoc-gen-star v0.6.1/go.mod h1:TGAoBVkt8w7MPG72TrKIu85MIdXwDuzJYeZuUPFPNwA= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= +github.com/magiconair/properties v1.8.6 h1:5ibWZ6iY0NctNGWo87LalDlEZ6R41TqbbDamhfG/Qzo= +github.com/magiconair/properties v1.8.6/go.mod h1:y3VJvCyxH9uVvJTWEGAELF3aiYNyPKd5NZ3oSwXrF60= github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.7.6/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= @@ -1117,6 +1120,8 @@ github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0Qu github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/mapstructure v1.4.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= +github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= +github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/mitchellh/osext v0.0.0-20151018003038-5e2d6d41470f/go.mod h1:OkQIRizQZAeMln+1tSwduZz7+Af5oFlKirV/MSYes2A= github.com/moby/ipvs v1.0.1/go.mod h1:2pngiyseZbIKXNv7hsKj3O9UEz30c53MT9005gt2hxQ= github.com/moby/locker v1.0.1 h1:fOXqR41zeveg4fFODix+1Ch4mj/gT0NE1XJbp/epuBg= @@ -1240,6 +1245,8 @@ github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAv github.com/pelletier/go-toml v1.8.1/go.mod h1:T2/BmBdy8dvIRq1a/8aqjN41wvWlN4lrapLU/GW4pbc= github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3ve8= github.com/pelletier/go-toml v1.9.5/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= +github.com/pelletier/go-toml/v2 v2.0.1 h1:8e3L2cCQzLFi2CR4g7vGFuFxX7Jl1kKX8gW+iV0GUKU= +github.com/pelletier/go-toml/v2 v2.0.1/go.mod h1:r9LEWfGN8R5k0VXJ+0BkIe7MYkRdwZOjgMj2KwnJFUo= github.com/peterbourgon/diskv v2.0.1+incompatible h1:UBdAOUP5p4RWqPBg048CAvpKN+vxiaj6gdUUzhl4XmI= github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -1347,6 +1354,7 @@ github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk= github.com/spf13/afero v1.3.3/go.mod h1:5KUK8ByomD5Ti5Artl0RtHeI5pTF7MIDuXL3yY520V4= github.com/spf13/afero v1.6.0/go.mod h1:Ai8FlHk4v/PARR026UzYexafAt9roJ7LcLMAmO6Z93I= +github.com/spf13/afero v1.9.2 h1:j49Hj62F0n+DaZ1dDCvhABaPNSGNkt32oRFxI33IEMw= github.com/spf13/afero v1.9.2/go.mod h1:iUV7ddyEEZPO5gA3zD4fJt6iStLlL+Lg4m2cihcDf8Y= github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= github.com/spf13/cast v1.6.0 h1:GEiTHELF+vaR5dhz3VqZfFSzZjYbgeKDpBxQVS4GYJ0= @@ -1360,6 +1368,8 @@ github.com/spf13/cobra v1.4.0/go.mod h1:Wo4iy3BUC+X2Fybo0PDqwJIv3dNRiZLHQymsfxlB github.com/spf13/cobra v1.6.0 h1:42a0n6jwCot1pUmomAp4T7DeMD+20LFv4Q54pxLf2LI= github.com/spf13/cobra v1.6.0/go.mod h1:IOw/AERYS7UzyrGinqmz6HLUo219MORXGxhbaJUqzrY= github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo= +github.com/spf13/jwalterweatherman v1.1.0 h1:ue6voC5bR5F8YxI5S67j9i582FU4Qvo2bmqnqMYADFk= +github.com/spf13/jwalterweatherman v1.1.0/go.mod h1:aNWZUN0dPAAO/Ljvb5BEdw96iTZ0EXowPYD95IqWIGo= github.com/spf13/pflag v1.0.1-0.20171106142849-4c012f6dcd95/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= @@ -1367,6 +1377,8 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s= github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/yZzE= github.com/spf13/viper v1.7.0/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg= +github.com/spf13/viper v1.12.0 h1:CZ7eSOd3kZoaYDLbXnmzgQI5RlciuXBMA+18HwHRfZQ= +github.com/spf13/viper v1.12.0/go.mod h1:b6COn30jlNxbm/V2IqWiNWkJ+vZNiMNksliPCiuKtSI= github.com/stefanberger/go-pkcs11uri v0.0.0-20201008174630-78d3cae3a980/go.mod h1:AO3tvPzVZ/ayst6UlUKUv6rcPQInYe3IknH3jYhAKu8= github.com/stoewer/go-strcase v1.2.0 h1:Z2iHWqGXH00XYgqDmNgQbIBxf3wrNq0F3feEy0ainaU= github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8= @@ -1389,6 +1401,8 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= +github.com/subosito/gotenv v1.3.0 h1:mjC+YW8QpAdXibNi+vNWgzmgBH4+5l5dCXv8cNysBLI= +github.com/subosito/gotenv v1.3.0/go.mod h1:YzJjq/33h7nrwdY+iHMhEOEEbW0ovIz0tB6t6PwAXzs= github.com/syndtr/gocapability v0.0.0-20170704070218-db04d3cc01c8/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww= github.com/syndtr/gocapability v0.0.0-20180916011248-d98352740cb2/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww= github.com/syndtr/gocapability v0.0.0-20200815063812-42c35b437635/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww= @@ -2207,6 +2221,8 @@ gopkg.in/go-playground/validator.v9 v9.27.0/go.mod h1:+c9/zcJMFNgbLvly1L1V+PpxWd gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= +gopkg.in/ini.v1 v1.66.4 h1:SsAcf+mM7mRZo2nJNGt8mZCjG8ZRaNGMURJw7BsIST4= +gopkg.in/ini.v1 v1.66.4/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8= gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k= gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= diff --git a/pkg/apis/kosmos/v1alpha1/virtualcluster_types.go b/pkg/apis/kosmos/v1alpha1/virtualcluster_types.go index a91735ecc..bb1e7c8a0 100644 --- a/pkg/apis/kosmos/v1alpha1/virtualcluster_types.go +++ b/pkg/apis/kosmos/v1alpha1/virtualcluster_types.go @@ -137,6 +137,8 @@ type VirtualClusterStatus struct { Port int32 `json:"port,omitempty"` // +optional PortMap map[string]int32 `json:"portMap,omitempty"` + // +optional + VipMap map[string]string `json:"vipMap,omitempty"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/apis/kosmos/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/kosmos/v1alpha1/zz_generated.deepcopy.go index f65fafbf1..f9d33523c 100644 --- a/pkg/apis/kosmos/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/kosmos/v1alpha1/zz_generated.deepcopy.go @@ -1974,6 +1974,13 @@ func (in *VirtualClusterStatus) DeepCopyInto(out *VirtualClusterStatus) { (*out)[key] = val } } + if in.VipMap != nil { + in, out := &in.VipMap, &out.VipMap + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } return } diff --git a/pkg/generated/openapi/zz_generated.openapi.go b/pkg/generated/openapi/zz_generated.openapi.go index 9eab67a6e..e8424dc8a 100644 --- a/pkg/generated/openapi/zz_generated.openapi.go +++ b/pkg/generated/openapi/zz_generated.openapi.go @@ -3462,6 +3462,21 @@ func schema_pkg_apis_kosmos_v1alpha1_VirtualClusterStatus(ref common.ReferenceCa }, }, }, + "vipMap": { + SchemaProps: spec.SchemaProps{ + Type: []string{"object"}, + AdditionalProperties: &spec.SchemaOrBool{ + Allows: true, + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Default: "", + Type: []string{"string"}, + Format: "", + }, + }, + }, + }, + }, }, }, }, diff --git a/pkg/kubenest/constants/constant.go b/pkg/kubenest/constants/constant.go index 5a1182115..78f710919 100644 --- a/pkg/kubenest/constants/constant.go +++ b/pkg/kubenest/constants/constant.go @@ -105,6 +105,17 @@ const ( ApiServerNetworkProxyAdminPortKey = "apiserver-network-proxy-admin-port" VirtualClusterPortNum = 5 + // vip + VipPoolConfigMapName = "kosmos-vip-pool" + VipPoolKey = "vip-config.yaml" + VcVipStatusKey = "vip-key" + VipKeepAlivedNodeLabelKey = "kosmos.io/keepalived-node" + VipKeepAlivedNodeLabelValue = "true" + VipKeepAlivedNodeRoleKey = "kosmos.io/keepalived-role" + VipKeepAlivedNodeRoleMaster = "master" + VipKeepalivedNodeRoleBackup = "backup" + VipKeepAlivedReplicas = 3 + ManifestComponentsConfigMap = "components-manifest-cm" WaitAllPodsRunningTimeoutSeconds = 1800 diff --git a/pkg/kubenest/controller/virtualcluster_init_controller.go b/pkg/kubenest/controller/virtualcluster_init_controller.go index 88ef52e77..ff990d628 100644 --- a/pkg/kubenest/controller/virtualcluster_init_controller.go +++ b/pkg/kubenest/controller/virtualcluster_init_controller.go @@ -16,6 +16,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" + cs "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/record" @@ -34,6 +35,7 @@ import ( "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned" "github.com/kosmos.io/kosmos/pkg/kubenest/constants" + "github.com/kosmos.io/kosmos/pkg/kubenest/tasks" "github.com/kosmos.io/kosmos/pkg/kubenest/util" ) @@ -58,6 +60,10 @@ type HostPortPool struct { PortsPool []int32 `yaml:"portsPool"` } +type VipPool struct { + Vips []string `yaml:"vipPool"` +} + const ( VirtualClusterControllerFinalizer = "kosmos.io/virtualcluster-controller" RequeueTime = 10 * time.Second @@ -137,7 +143,21 @@ func (c *VirtualClusterInitController) Reconcile(ctx context.Context, request re return reconcile.Result{}, errors.Wrapf(err, "Error update virtualcluster %s status", updatedCluster.Name) } case v1alpha1.AllNodeReady: - err := c.ensureAllPodsRunning(updatedCluster, constants.WaitAllPodsRunningTimeoutSeconds*time.Second) + name, namespace := request.Name, request.Namespace + // label node for keepalived + vcClient, err := tasks.GetVcClientset(c.RootClientSet, name, namespace) + if err != nil { + klog.Errorf("Get vc client failed. err: %s", err.Error()) + return reconcile.Result{}, errors.Wrapf(err, "Get vc client failed. err: %s", err.Error()) + } + reps, err := c.labelNode(vcClient) + if err != nil { + klog.Errorf("Label node for keepalived failed. err: %s", err.Error()) + return reconcile.Result{}, errors.Wrapf(err, "Label node for keepalived failed. err: %s", err.Error()) + } + klog.V(2).Infof("Label %d node for keepalived", reps) + + err = c.ensureAllPodsRunning(updatedCluster, constants.WaitAllPodsRunningTimeoutSeconds*time.Second) if err != nil { klog.Errorf("Check all pods running err: %s", err.Error()) updatedCluster.Status.Reason = err.Error() @@ -274,7 +294,11 @@ func (c *VirtualClusterInitController) createVirtualCluster(virtualCluster *v1al if err != nil { return errors.Wrap(err, "Error in assign host port!") } - + //Allocate vip + err = c.AllocateVip(virtualCluster) + if err != nil { + return errors.Wrap(err, "Error in allocate vip!") + } executer, err := NewExecutor(virtualCluster, c.Client, c.Config, kubeNestOptions) if err != nil { return err @@ -631,6 +655,25 @@ func GetHostPortPoolFromConfigMap(client kubernetes.Interface, ns, cmName, dataK return &hostPool, nil } +func GetVipFromConfigMap(client kubernetes.Interface, ns, cmName, key string) (*VipPool, error) { + vipPoolCm, err := client.CoreV1().ConfigMaps(ns).Get(context.TODO(), cmName, metav1.GetOptions{}) + if err != nil { + return nil, err + } + + yamlData, exist := vipPoolCm.Data[key] + if !exist { + return nil, fmt.Errorf("key '%s' not found in vip pool ConfigMap '%s'", key, cmName) + } + + var vipPool VipPool + if err := yaml.Unmarshal([]byte(yamlData), &vipPool); err != nil { + return nil, err + } + + return &vipPool, nil +} + func (c *VirtualClusterInitController) isPortAllocated(port int32) bool { vcList := &v1alpha1.VirtualClusterList{} err := c.List(context.Background(), vcList) @@ -692,3 +735,90 @@ func (c *VirtualClusterInitController) AllocateHostPort(virtualCluster *v1alpha1 return 0, err } + +// AllocateVip allocate vip for virtual cluster +// #nosec G602 +func (c *VirtualClusterInitController) AllocateVip(virtualCluster *v1alpha1.VirtualCluster) error { + c.lock.Lock() + defer c.lock.Unlock() + if len(virtualCluster.Status.VipMap) > 0 { + return nil + } + vipPool, err := GetVipFromConfigMap(c.RootClientSet, constants.KosmosNs, constants.VipPoolConfigMapName, constants.VipPoolKey) + if err != nil { + return err + } + klog.V(4).InfoS("get vip pool", "vipPool", vipPool) + + vcList := &v1alpha1.VirtualClusterList{} + err = c.List(context.Background(), vcList) + if err != nil { + klog.Errorf("list virtual cluster error: %v", err) + return err + } + var allocatedVips []string + for _, vc := range vcList.Items { + for _, val := range vc.Status.VipMap { + allocatedVips = append(allocatedVips, val) + } + } + + vip, err := util.FindAvailableIP(vipPool.Vips, allocatedVips) + if err != nil { + klog.Errorf("find available vip error: %v", err) + return err + } + virtualCluster.Status.VipMap = make(map[string]string) + virtualCluster.Status.VipMap[constants.VcVipStatusKey] = vip + + return err +} + +func (c *VirtualClusterInitController) labelNode(client cs.Interface) (reps int, err error) { + replicas := constants.VipKeepAlivedReplicas + nodes, err := client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{}) + if err != nil { + return 0, fmt.Errorf("failed to list nodes, err: %w", err) + } + if len(nodes.Items) == 0 { + return 0, fmt.Errorf("no nodes found") + } + reps = replicas + // select replicas nodes + if replicas > len(nodes.Items) { + reps = len(nodes.Items) + } + randomIndex, err := util.SecureRandomInt(reps) + if err != nil { + klog.Errorf("failed to get random index for master node, err: %v", err) + return 0, err + } + // sub reps as nodes + subNodes := nodes.Items[:reps] + masterNode := nodes.Items[randomIndex] + + // label node + for _, node := range subNodes { + currentNode := node + labels := currentNode.GetLabels() + if currentNode.Name == masterNode.Name { + // label master + labels[constants.VipKeepAlivedNodeRoleKey] = constants.VipKeepAlivedNodeRoleMaster + } else { + // label backup + labels[constants.VipKeepAlivedNodeRoleKey] = constants.VipKeepalivedNodeRoleBackup + } + labels[constants.VipKeepAlivedNodeLabelKey] = constants.VipKeepAlivedNodeLabelValue + + // update label + currentNode.SetLabels(labels) + _, err := client.CoreV1().Nodes().Update(context.TODO(), ¤tNode, metav1.UpdateOptions{}) + if err != nil { + klog.V(2).Infof("Failed to update labels for node %s: %v", currentNode.Name, err) + return 0, err + } + klog.V(2).Infof("Successfully updated labels for node %s", currentNode.Name) + } + klog.V(2).InfoS("[vip] Successfully label all node") + return reps, nil +} diff --git a/pkg/kubenest/init.go b/pkg/kubenest/init.go index aa9585b3f..6135aac4e 100644 --- a/pkg/kubenest/init.go +++ b/pkg/kubenest/init.go @@ -36,6 +36,7 @@ type initData struct { externalIP string hostPort int32 hostPortMap map[string]int32 + vipMap map[string]string kubeNestOptions *v1alpha1.KubeNestConfiguration virtualCluster *v1alpha1.VirtualCluster ETCDStorageClass string @@ -189,6 +190,7 @@ func newRunData(opt *InitOptions) (*initData, error) { externalIP: opt.virtualCluster.Spec.ExternalIP, hostPort: opt.virtualCluster.Status.Port, hostPortMap: opt.virtualCluster.Status.PortMap, + vipMap: opt.virtualCluster.Status.VipMap, kubeNestOptions: opt.KubeNestOptions, virtualCluster: opt.virtualCluster, ETCDUnitSize: opt.KubeNestOptions.KubeInKubeConfig.ETCDUnitSize, @@ -247,6 +249,9 @@ func (i initData) DataDir() string { func (i initData) VirtualCluster() *v1alpha1.VirtualCluster { return i.virtualCluster } +func (i initData) VirtualClusterVersion() string { + return i.virtualClusterVersion.String() +} func (i initData) ExternalIP() string { return i.externalIP @@ -260,6 +265,10 @@ func (i initData) HostPortMap() map[string]int32 { return i.hostPortMap } +func (i initData) VipMap() map[string]string { + return i.vipMap +} + func (i initData) DynamicClient() *dynamic.DynamicClient { return i.dynamicClient } diff --git a/pkg/kubenest/tasks/anp.go b/pkg/kubenest/tasks/anp.go index 60eee0313..50d4bb73e 100644 --- a/pkg/kubenest/tasks/anp.go +++ b/pkg/kubenest/tasks/anp.go @@ -311,7 +311,7 @@ func getVcDynamicClient(client clientset.Interface, name, namespace string) (dyn } return dynamicClient, nil } -func getVcClientset(client clientset.Interface, name, namespace string) (clientset.Interface, error) { +func GetVcClientset(client clientset.Interface, name, namespace string) (clientset.Interface, error) { secret, err := client.CoreV1().Secrets(namespace).Get(context.TODO(), fmt.Sprintf("%s-%s", name, constants.AdminConfig), metav1.GetOptions{}) if err != nil { @@ -346,7 +346,7 @@ func runUploadProxyAgentCert(r workflow.RunData) error { certsData[c.CertName()] = c.CertData() } } - vcClient, err := getVcClientset(data.RemoteClient(), name, namespace) + vcClient, err := GetVcClientset(data.RemoteClient(), name, namespace) if err != nil { return fmt.Errorf("failed to get virtual cluster client, err: %w", err) } diff --git a/pkg/kubenest/tasks/cert.go b/pkg/kubenest/tasks/cert.go index 266170ab5..832a153fa 100644 --- a/pkg/kubenest/tasks/cert.go +++ b/pkg/kubenest/tasks/cert.go @@ -135,6 +135,7 @@ func mutateCertConfig(data InitData, cc *cert.CertConfig) error { ControlplaneAddr: data.ControlplaneAddress(), ClusterIps: data.ServiceClusterIp(), ExternalIP: data.ExternalIP(), + VipMap: data.VipMap(), }, cc) if err != nil { return err diff --git a/pkg/kubenest/tasks/data.go b/pkg/kubenest/tasks/data.go index 0336773c8..f9f5b08e7 100644 --- a/pkg/kubenest/tasks/data.go +++ b/pkg/kubenest/tasks/data.go @@ -22,6 +22,7 @@ type InitData interface { ExternalIP() string HostPort() int32 HostPortMap() map[string]int32 + VipMap() map[string]string DynamicClient() *dynamic.DynamicClient KubeNestOpt() *v1alpha1.KubeNestConfiguration PluginOptions() map[string]string diff --git a/pkg/kubenest/tasks/manifests_components.go b/pkg/kubenest/tasks/manifests_components.go index a68cf846f..cfee0dd04 100644 --- a/pkg/kubenest/tasks/manifests_components.go +++ b/pkg/kubenest/tasks/manifests_components.go @@ -59,7 +59,7 @@ func applyComponentsManifests(r workflow.RunData) error { if !ok { return errors.New("Virtual cluster manifests-components task invoked with an invalid data struct") } - + keepalivedReplicas := constants.VipKeepAlivedReplicas secret, err := data.RemoteClient().CoreV1().Secrets(data.GetNamespace()).Get(context.TODO(), fmt.Sprintf("%s-%s", data.GetName(), constants.AdminConfig), metav1.GetOptions{}) if err != nil { @@ -83,10 +83,18 @@ func applyComponentsManifests(r workflow.RunData) error { templatedMapping["KUBE_PROXY_KUBECONFIG"] = string(secret.Data[constants.KubeConfig]) imageRepository, _ := util.GetImageMessage() templatedMapping["ImageRepository"] = imageRepository - + if data.VipMap() != nil && data.VipMap()[constants.VcVipStatusKey] != "" { + templatedMapping["Vip"] = data.VipMap()[constants.VcVipStatusKey] + } + // use min replicas + nodeCount := data.VirtualCluster().Spec.PromotePolicies[0].NodeCount + if nodeCount < constants.VipKeepAlivedReplicas { + keepalivedReplicas = int(nodeCount) + } for k, v := range data.PluginOptions() { templatedMapping[k] = v } + templatedMapping["KeepalivedReplicas"] = keepalivedReplicas for _, component := range components { klog.V(2).Infof("Deploy component %s", component.Name) diff --git a/pkg/kubenest/util/cert/certs.go b/pkg/kubenest/util/cert/certs.go index 908d29c4b..087cedae8 100644 --- a/pkg/kubenest/util/cert/certs.go +++ b/pkg/kubenest/util/cert/certs.go @@ -43,6 +43,7 @@ type AltNamesMutatorConfig struct { ControlplaneAddr string ClusterIps []string ExternalIP string + VipMap map[string]string } func (config *CertConfig) defaultPublicKeyAlgorithm() { @@ -273,6 +274,11 @@ func apiServerAltNamesMutator(cfg *AltNamesMutatorConfig) (*certutil.AltNames, e if len(cfg.ExternalIP) > 0 { appendSANsToAltNames(altNames, []string{cfg.ExternalIP}) } + if len(cfg.VipMap) > 0 { + for _, vip := range cfg.VipMap { + appendSANsToAltNames(altNames, []string{vip}) + } + } if len(cfg.ClusterIps) > 0 { for _, clusterIp := range cfg.ClusterIps { appendSANsToAltNames(altNames, []string{clusterIp}) diff --git a/pkg/kubenest/util/util.go b/pkg/kubenest/util/util.go index 49bb084d2..ee0f0535b 100644 --- a/pkg/kubenest/util/util.go +++ b/pkg/kubenest/util/util.go @@ -1,8 +1,10 @@ package util import ( + "crypto/rand" "encoding/base64" "fmt" + "math/big" "net" "strings" @@ -89,6 +91,121 @@ func IPV6First(ipNetStr string) (bool, error) { return utils.IsIPv6(ipNetStrArray[0]), nil } +// parseCIDR returns a channel that generates IP addresses in the CIDR range. +func parseCIDR(cidr string) (chan string, error) { + ip, ipnet, err := net.ParseCIDR(cidr) + if err != nil { + return nil, err + } + ch := make(chan string) + go func() { + for ip := ip.Mask(ipnet.Mask); ipnet.Contains(ip); inc(ip) { + ch <- ip.String() + } + close(ch) + }() + return ch, nil +} + +// inc increments an IP address. +func inc(ip net.IP) { + for j := len(ip) - 1; j >= 0; j-- { + ip[j]++ + if ip[j] > 0 { + break + } + } +} + +// parseRange returns a channel that generates IP addresses in the range. +func parseRange(ipRange string) (chan string, error) { + parts := strings.Split(ipRange, "-") + if len(parts) != 2 { + return nil, fmt.Errorf("invalid IP range format: %s", ipRange) + } + startIP := net.ParseIP(parts[0]) + endIP := net.ParseIP(parts[1]) + if startIP == nil || endIP == nil { + return nil, fmt.Errorf("invalid IP address in range: %s", ipRange) + } + + ch := make(chan string) + go func() { + for ip := startIP; !ip.Equal(endIP); inc(ip) { + ch <- ip.String() + } + ch <- endIP.String() + close(ch) + }() + return ch, nil +} + +// ParseVIPPool returns a channel that generates IP addresses from the vipPool. +func parseVIPPool(vipPool []string) (chan string, error) { + ch := make(chan string) + go func() { + defer close(ch) + for _, entry := range vipPool { + entry = strings.TrimSpace(entry) + var ipCh chan string + var err error + if strings.Contains(entry, "/") { + ipCh, err = parseCIDR(entry) + } else if strings.Contains(entry, "-") { + ipCh, err = parseRange(entry) + } else { + ip := net.ParseIP(entry) + if ip == nil { + err = fmt.Errorf("invalid IP address: %s", entry) + } else { + ipCh = make(chan string, 1) + ipCh <- entry + close(ipCh) + } + } + if err != nil { + fmt.Println("Error:", err) + return + } + for ip := range ipCh { + ch <- ip + } + } + }() + return ch, nil +} + +// FindAvailableIP finds an available IP address from vipPool that is not in allocatedVips. +func FindAvailableIP(vipPool, allocatedVips []string) (string, error) { + allocatedSet := make(map[string]struct{}) + for _, ip := range allocatedVips { + allocatedSet[ip] = struct{}{} + } + + ipCh, err := parseVIPPool(vipPool) + if err != nil { + return "", err + } + + for ip := range ipCh { + if _, allocated := allocatedSet[ip]; !allocated { + return ip, nil + } + } + + return "", fmt.Errorf("no available IP addresses") +} + +// Seed the random number generator using crypto/rand +func SecureRandomInt(n int) (int, error) { + bigN := big.NewInt(int64(n)) + randInt, err := rand.Int(rand.Reader, bigN) + if err != nil { + return 0, err + } + return int(randInt.Int64()), nil +} + func MapContains(big map[string]string, small map[string]string) bool { for k, v := range small { if bigV, ok := big[k]; !ok || bigV != v { diff --git a/pkg/kubenest/util/util_test.go b/pkg/kubenest/util/util_test.go new file mode 100644 index 000000000..403785a25 --- /dev/null +++ b/pkg/kubenest/util/util_test.go @@ -0,0 +1,106 @@ +package util + +import ( + "fmt" + "testing" + + "gopkg.in/yaml.v3" +) + +func TestFindAvailableIP(t *testing.T) { + type args struct { + vipPool []string + allocatedVips []string + } + tests := []struct { + name string + args args + want string + wantErr bool + }{ + { + name: "test1", + args: args{ + vipPool: []string{"192.168.0.1", "192.168.0.2", "192.168.0.3"}, + allocatedVips: []string{"192.168.0.1", "192.168.0.2"}, + }, + want: "192.168.0.3", + wantErr: false, + }, + { + name: "test2", + args: args{ + vipPool: []string{ + "192.168.0.1", + "192.168.0.2-192.168.0.10", + "192.168.1.0/24", + "2001:db8::1", + "2001:db8::1-2001:db8::10", + "2001:db8::/64", + }, + allocatedVips: []string{"192.168.0.1", "192.168.0.2"}, + }, + want: "192.168.0.3", + wantErr: false, + }, + { + name: "test3", + args: args{ + vipPool: []string{ + "192.168.6.110-192.168.6.120", + }, + allocatedVips: []string{}, + }, + want: "192.168.6.110", + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := FindAvailableIP(tt.args.vipPool, tt.args.allocatedVips) + fmt.Printf("got vip : %v", got) + if (err != nil) != tt.wantErr { + t.Errorf("FindAvailableIP() error = %v, wantErr %v", err, tt.wantErr) + return + } + if got != tt.want { + t.Errorf("FindAvailableIP() got = %v, want %v", got, tt.want) + } + }) + } +} + +func TestFindAvailableIP2(t *testing.T) { + type HostPortPool struct { + PortsPool []int32 `yaml:"portsPool"` + } + type VipPool struct { + Vip []string `yaml:"vipPool"` + } + var vipPool VipPool + var hostPortPool HostPortPool + yamlData2 := ` +portsPool: + - 33001 + - 33002 + - 33003 + - 33004 + - 33005 + - 33006 + - 33007 + - 33008 + - 33009 + - 33010 +` + yamlData := ` +vipPool: + - 192.168.6.110-192.168.6.120 +` + if err := yaml.Unmarshal([]byte(yamlData), &vipPool); err != nil { + panic(err) + } + if err := yaml.Unmarshal([]byte(yamlData2), &hostPortPool); err != nil { + panic(err) + } + fmt.Printf("vipPool: %v", vipPool) +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 13f9b7e2b..b0bdf2771 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -155,6 +155,9 @@ github.com/coreos/go-semver/semver ## explicit; go 1.12 github.com/coreos/go-systemd/v22/daemon github.com/coreos/go-systemd/v22/journal +# github.com/creack/pty v1.1.11 +## explicit; go 1.13 +github.com/creack/pty # github.com/davecgh/go-spew v1.1.1 ## explicit github.com/davecgh/go-spew/spew @@ -360,6 +363,18 @@ github.com/grpc-ecosystem/go-grpc-prometheus github.com/grpc-ecosystem/grpc-gateway/v2/internal/httprule github.com/grpc-ecosystem/grpc-gateway/v2/runtime github.com/grpc-ecosystem/grpc-gateway/v2/utilities +# github.com/hashicorp/hcl v1.0.0 +## explicit +github.com/hashicorp/hcl +github.com/hashicorp/hcl/hcl/ast +github.com/hashicorp/hcl/hcl/parser +github.com/hashicorp/hcl/hcl/printer +github.com/hashicorp/hcl/hcl/scanner +github.com/hashicorp/hcl/hcl/strconv +github.com/hashicorp/hcl/hcl/token +github.com/hashicorp/hcl/json/parser +github.com/hashicorp/hcl/json/scanner +github.com/hashicorp/hcl/json/token # github.com/imdario/mergo v0.3.12 ## explicit; go 1.13 github.com/imdario/mergo @@ -388,6 +403,9 @@ github.com/leodido/go-urn # github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de ## explicit github.com/liggitt/tabwriter +# github.com/magiconair/properties v1.8.6 +## explicit; go 1.13 +github.com/magiconair/properties # github.com/mailru/easyjson v0.7.7 ## explicit; go 1.12 github.com/mailru/easyjson/buffer @@ -402,6 +420,9 @@ github.com/matttproud/golang_protobuf_extensions/pbutil # github.com/mitchellh/go-wordwrap v1.0.0 ## explicit github.com/mitchellh/go-wordwrap +# github.com/mitchellh/mapstructure v1.5.0 +## explicit; go 1.14 +github.com/mitchellh/mapstructure # github.com/moby/locker v1.0.1 ## explicit; go 1.13 github.com/moby/locker @@ -494,6 +515,12 @@ github.com/opencontainers/selinux/pkg/pwalkdir # github.com/pelletier/go-toml v1.9.5 ## explicit; go 1.12 github.com/pelletier/go-toml +# github.com/pelletier/go-toml/v2 v2.0.1 +## explicit; go 1.16 +github.com/pelletier/go-toml/v2 +github.com/pelletier/go-toml/v2/internal/ast +github.com/pelletier/go-toml/v2/internal/danger +github.com/pelletier/go-toml/v2/internal/tracker # github.com/peterbourgon/diskv v2.0.1+incompatible ## explicit github.com/peterbourgon/diskv @@ -564,18 +591,40 @@ github.com/russross/blackfriday/v2 # github.com/sirupsen/logrus v1.9.0 ## explicit; go 1.13 github.com/sirupsen/logrus +# github.com/spf13/afero v1.9.2 +## explicit; go 1.16 +github.com/spf13/afero +github.com/spf13/afero/internal/common +github.com/spf13/afero/mem # github.com/spf13/cast v1.6.0 ## explicit; go 1.19 github.com/spf13/cast # github.com/spf13/cobra v1.6.0 ## explicit; go 1.15 github.com/spf13/cobra +# github.com/spf13/jwalterweatherman v1.1.0 +## explicit +github.com/spf13/jwalterweatherman # github.com/spf13/pflag v1.0.5 ## explicit; go 1.12 github.com/spf13/pflag +# github.com/spf13/viper v1.12.0 +## explicit; go 1.17 +github.com/spf13/viper +github.com/spf13/viper/internal/encoding +github.com/spf13/viper/internal/encoding/dotenv +github.com/spf13/viper/internal/encoding/hcl +github.com/spf13/viper/internal/encoding/ini +github.com/spf13/viper/internal/encoding/javaproperties +github.com/spf13/viper/internal/encoding/json +github.com/spf13/viper/internal/encoding/toml +github.com/spf13/viper/internal/encoding/yaml # github.com/stoewer/go-strcase v1.2.0 ## explicit; go 1.11 github.com/stoewer/go-strcase +# github.com/subosito/gotenv v1.3.0 +## explicit; go 1.18 +github.com/subosito/gotenv # github.com/vishvananda/netlink v1.2.1-beta.2.0.20220630165224-c591ada0fb2b ## explicit; go 1.12 github.com/vishvananda/netlink @@ -927,6 +976,9 @@ gopkg.in/go-playground/validator.v9 # gopkg.in/inf.v0 v0.9.1 ## explicit gopkg.in/inf.v0 +# gopkg.in/ini.v1 v1.66.4 +## explicit +gopkg.in/ini.v1 # gopkg.in/natefinch/lumberjack.v2 v2.0.0 ## explicit gopkg.in/natefinch/lumberjack.v2