From 13c730b765dfe1dd08435922197203a8348e092c Mon Sep 17 00:00:00 2001 From: rambohe-ch Date: Tue, 10 Nov 2020 14:18:09 +0800 Subject: [PATCH] feature: support forward cloud requests to edge node's localhost endpoint. fixes #415 --- config/setup/yurt-tunnel-server.yaml | 1 + config/yaml-template/yurt-tunnel-server.yaml | 1 + .../constants/yurt-tunnel-server-tmpl.go | 1 + pkg/yurttunnel/constants/constants.go | 2 + .../handlerwrapper/localhostproxy/handler.go | 272 ++++++++++++++++++ .../localhostproxy/handler_test.go | 76 +++++ .../handlerwrapper/wraphandler/wraphandler.go | 2 + pkg/yurttunnel/util/util.go | 2 + 8 files changed, 357 insertions(+) create mode 100644 pkg/yurttunnel/handlerwrapper/localhostproxy/handler.go create mode 100644 pkg/yurttunnel/handlerwrapper/localhostproxy/handler_test.go diff --git a/config/setup/yurt-tunnel-server.yaml b/config/setup/yurt-tunnel-server.yaml index e50080e9692..0c2f6a99468 100644 --- a/config/setup/yurt-tunnel-server.yaml +++ b/config/setup/yurt-tunnel-server.yaml @@ -129,6 +129,7 @@ metadata: name: yurt-tunnel-server-cfg namespace: kube-system data: + localhost-proxy-ports: "10266, 10267" dnat-ports-pair: "" --- apiVersion: apps/v1 diff --git a/config/yaml-template/yurt-tunnel-server.yaml b/config/yaml-template/yurt-tunnel-server.yaml index 48c93ebd41e..d27c3f0a71b 100644 --- a/config/yaml-template/yurt-tunnel-server.yaml +++ b/config/yaml-template/yurt-tunnel-server.yaml @@ -129,6 +129,7 @@ metadata: name: __project_prefix__-tunnel-server-cfg namespace: kube-system data: + localhost-proxy-ports: "10266, 10267" dnat-ports-pair: "" --- apiVersion: apps/v1 diff --git a/pkg/yurtctl/constants/yurt-tunnel-server-tmpl.go b/pkg/yurtctl/constants/yurt-tunnel-server-tmpl.go index e65d22e66ad..d3e37256883 100644 --- a/pkg/yurtctl/constants/yurt-tunnel-server-tmpl.go +++ b/pkg/yurtctl/constants/yurt-tunnel-server-tmpl.go @@ -155,6 +155,7 @@ metadata: name: yurt-tunnel-server-cfg namespace: kube-system data: + localhost-proxy-ports: "10266, 10267" dnat-ports-pair: "" ` YurttunnelServerDeployment = ` diff --git a/pkg/yurttunnel/constants/constants.go b/pkg/yurttunnel/constants/constants.go index 452de31a1cf..f441e54f5fa 100644 --- a/pkg/yurttunnel/constants/constants.go +++ b/pkg/yurttunnel/constants/constants.go @@ -45,7 +45,9 @@ const ( YurttunnelAgentPodIPEnv = "POD_IP" // name of the environment for selecting backend agent used in yurt-tunnel-server + NodeIPKeyIndex = "status.internalIP" ProxyHostHeaderKey = "X-Tunnel-Proxy-Host" + ProxyDestHeaderKey = "X-Tunnel-Proxy-Dest" // The timeout seconds of reading a complete request from the apiserver YurttunnelANPInterceptorReadTimeoutSec = 10 diff --git a/pkg/yurttunnel/handlerwrapper/localhostproxy/handler.go b/pkg/yurttunnel/handlerwrapper/localhostproxy/handler.go new file mode 100644 index 00000000000..6a2ca109c8b --- /dev/null +++ b/pkg/yurttunnel/handlerwrapper/localhostproxy/handler.go @@ -0,0 +1,272 @@ +/* +Copyright 2021 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package localhostproxy + +import ( + "errors" + "fmt" + "net" + "net/http" + "strings" + "sync" + "time" + + "github.com/openyurtio/openyurt/pkg/projectinfo" + "github.com/openyurtio/openyurt/pkg/yurttunnel/constants" + hw "github.com/openyurtio/openyurt/pkg/yurttunnel/handlerwrapper" + "github.com/openyurtio/openyurt/pkg/yurttunnel/util" + + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/informers" + coreinformer "k8s.io/client-go/informers/core/v1" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" +) + +// localHostProxyMiddleware modify request for requests from cloud can access localhost of edge node. +type localHostProxyMiddleware struct { + sync.RWMutex + getNodesByIP func(nodeIP string) ([]*corev1.Node, error) + localhostPorts map[string]struct{} + nodeInformerSynced cache.InformerSynced + cmInformerSynced cache.InformerSynced +} + +func NewLocalHostProxyMiddleware() hw.Middleware { + return &localHostProxyMiddleware{ + localhostPorts: make(map[string]struct{}), + } +} + +func (plm *localHostProxyMiddleware) Name() string { + return "localHostProxyMiddleware" +} + +// WrapHandler modify request header and URL for underlying anp to proxy request. +func (plm *localHostProxyMiddleware) WrapHandler(handler http.Handler) http.Handler { + // wait for nodes and configmaps have synced + if !cache.WaitForCacheSync(wait.NeverStop, plm.nodeInformerSynced, plm.cmInformerSynced) { + klog.Error("failed to sync node or configmap cache") + return handler + } + + return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + // Check the request port to see if it needs to be forwarded to the node's localhost + proxyDest := req.Header.Get(constants.ProxyDestHeaderKey) + if len(proxyDest) != 0 { + req.Header.Del(constants.ProxyDestHeaderKey) + } else { + proxyDest = req.Host + } + + nodeIP, port, err := net.SplitHostPort(proxyDest) + if err != nil { + klog.Errorf("proxy dest(%s) is invalid %v for request: %s", proxyDest, err, req.URL.String()) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + // port is included in proxy-localhost-ports, so modify request for + // forwarding the request to access node's localhost. + plm.RLock() + _, ok := plm.localhostPorts[port] + plm.RUnlock() + if ok { + // set up X-Tunnel-Proxy-Host header in request for underlying anp to select backend tunnel agent conn. + if len(req.Header.Get(constants.ProxyHostHeaderKey)) == 0 { + nodeName, err := plm.resolveNodeNameByNodeIP(nodeIP) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + req.Header.Set(constants.ProxyHostHeaderKey, nodeName) + } + + proxyDest = fmt.Sprintf("127.0.0.1:%s", port) + oldHost := req.URL.Host + req.Host = proxyDest + req.Header.Set("Host", proxyDest) + req.URL.Host = proxyDest + klog.V(2).Infof("proxy request %s to access localhost, changed from %s to %s(%s)", req.URL.String(), oldHost, proxyDest, req.Header.Get(constants.ProxyHostHeaderKey)) + } + + klog.V(3).Infof("request header in localHostProxyMiddleware: %v with host: %s and urL: %s", req.Header, req.Host, req.URL.String()) + handler.ServeHTTP(w, req) + }) +} + +// SetSharedInformerFactory init GetNodesByIP and configmap event handler for WrapHandler +func (plm *localHostProxyMiddleware) SetSharedInformerFactory(factory informers.SharedInformerFactory) error { + if factory == nil { + return errors.New("shared informer factory should not be nil") + } + + nodeInformer := factory.Core().V1().Nodes() + if err := nodeInformer.Informer().AddIndexers(cache.Indexers{constants.NodeIPKeyIndex: getNodeAddress}); err != nil { + klog.ErrorS(err, "failed to add statusInternalIP indexer") + return err + } + + plm.getNodesByIP = func(nodeIP string) ([]*v1.Node, error) { + objs, err := nodeInformer.Informer().GetIndexer().ByIndex(constants.NodeIPKeyIndex, nodeIP) + if err != nil { + return nil, err + } + + nodes := make([]*v1.Node, 0, len(objs)) + for _, obj := range objs { + if node, ok := obj.(*v1.Node); ok { + nodes = append(nodes, node) + } + } + + return nodes, nil + } + plm.nodeInformerSynced = nodeInformer.Informer().HasSynced + + cmInformer := factory.InformerFor(&corev1.ConfigMap{}, newConfigMapInformer) + cmInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: plm.addConfigMap, + UpdateFunc: plm.updateConfigMap, + }) + plm.cmInformerSynced = cmInformer.HasSynced + + return nil +} + +// newConfigMapInformer creates a shared index informer that returns only interested configmaps +func newConfigMapInformer(cs clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { + selector := fmt.Sprintf("metadata.name=%v", util.YurttunnelServerDnatConfigMapName) + tweakListOptions := func(options *metav1.ListOptions) { + options.FieldSelector = selector + } + return coreinformer.NewFilteredConfigMapInformer(cs, util.YurttunnelServerDnatConfigMapNs, resyncPeriod, nil, tweakListOptions) +} + +// addConfigMap handle configmap add event +func (plm *localHostProxyMiddleware) addConfigMap(obj interface{}) { + cm, ok := obj.(*corev1.ConfigMap) + if !ok { + return + } + if cm.DeletionTimestamp != nil { + return + } + klog.V(2).Infof("handle configmap add event for %v/%v to update localhost ports", cm.Namespace, cm.Name) + plm.replaceLocalHostPorts(cm.Data[util.YurtTunnelLocalHostProxyPorts]) +} + +// updateConfigMap handle configmap update event +func (plm *localHostProxyMiddleware) updateConfigMap(oldObj, newObj interface{}) { + oldConfigMap, ok := oldObj.(*corev1.ConfigMap) + if !ok { + return + } + newConfigMap, ok := newObj.(*corev1.ConfigMap) + if !ok { + return + } + + if oldConfigMap.Data[util.YurtTunnelLocalHostProxyPorts] == newConfigMap.Data[util.YurtTunnelLocalHostProxyPorts] { + return + } + + klog.V(2).Infof("handle configmap update event for %v/%v to update localhost ports", newConfigMap.Namespace, newConfigMap.Name) + plm.replaceLocalHostPorts(newConfigMap.Data[util.YurtTunnelLocalHostProxyPorts]) +} + +// replaceLocalHostPorts replace all localhost ports by new specified ports. +func (plm *localHostProxyMiddleware) replaceLocalHostPorts(portsStr string) { + ports := make([]string, 0) + for _, port := range strings.Split(portsStr, util.PortsSeparator) { + if len(strings.TrimSpace(port)) != 0 { + ports = append(ports, strings.TrimSpace(port)) + } + } + + plm.Lock() + defer plm.Unlock() + for port := range plm.localhostPorts { + delete(plm.localhostPorts, port) + } + + for i := range ports { + plm.localhostPorts[ports[i]] = struct{}{} + } +} + +// resolveProxyHostFromRequest get proxy host info from request +func (plm *localHostProxyMiddleware) resolveNodeNameByNodeIP(nodeIP string) (string, error) { + var nodeName string + + if nodes, err := plm.getNodesByIP(nodeIP); err != nil || len(nodes) == 0 { + klog.Warningf("failed to get node for node ip(%s)", nodeIP) + return "", fmt.Errorf("proxy node ip(%s) is not exist in cluster", nodeIP) + } else if len(nodes) != 1 { + klog.Warningf("more than one node with the same IP(%s), so unable to proxy request", nodeIP) + return "", fmt.Errorf("more than one node with ip(%s) in cluster", nodeIP) + } else { + nodeName = nodes[0].Name + } + + if len(nodeName) == 0 { + klog.Warningf("node name for node ip(%s) is not exist in cluster", nodeIP) + return "", fmt.Errorf("failed to get node name for node ip(%s)", nodeIP) + } + + klog.V(5).Infof("resolved node name(%s) for node ip(%s)", nodeName, nodeIP) + return nodeName, nil +} + +// getNodeAddress return the internal ip address of specified node. +func getNodeAddress(obj interface{}) ([]string, error) { + node, ok := obj.(*corev1.Node) + if !ok || node == nil { + return []string{}, nil + } + + if withoutAgent(node) { + // node has no running tunnel agent, do not go through tunnel server + return []string{}, nil + } + + for _, nodeAddr := range node.Status.Addresses { + if nodeAddr.Type == corev1.NodeInternalIP { + return []string{nodeAddr.Address}, nil + } + } + + return []string{}, nil +} + +// withoutAgent used to determine whether the node is running an tunnel agent +func withoutAgent(node *corev1.Node) bool { + tunnelAgentNode, ok := node.Labels[projectinfo.GetEdgeEnableTunnelLabelKey()] + if ok && tunnelAgentNode == "true" { + return false + } + + edgeNode, ok := node.Labels[projectinfo.GetEdgeWorkerLabelKey()] + if ok && edgeNode == "true" { + return false + } + return true +} diff --git a/pkg/yurttunnel/handlerwrapper/localhostproxy/handler_test.go b/pkg/yurttunnel/handlerwrapper/localhostproxy/handler_test.go new file mode 100644 index 00000000000..c817586665b --- /dev/null +++ b/pkg/yurttunnel/handlerwrapper/localhostproxy/handler_test.go @@ -0,0 +1,76 @@ +/* +Copyright 2021 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package localhostproxy + +import ( + "reflect" + "testing" +) + +func TestReplaceLocalHostPorts(t *testing.T) { + testcases := map[string]struct { + initPorts []string + localhostPorts string + resultPorts map[string]struct{} + }{ + "no init ports for representing configmap is added": { + localhostPorts: "10250, 10255, 10256", + resultPorts: map[string]struct{}{ + "10250": {}, + "10255": {}, + "10256": {}, + }, + }, + "with init ports for representing configmap is updated": { + initPorts: []string{"10250", "10255", "10256"}, + localhostPorts: "10250, 10255, 10256, 10257", + resultPorts: map[string]struct{}{ + "10250": {}, + "10255": {}, + "10256": {}, + "10257": {}, + }, + }, + } + + plh := &localHostProxyMiddleware{ + localhostPorts: make(map[string]struct{}), + } + + for k, tc := range testcases { + t.Run(k, func(t *testing.T) { + // prepare localhost ports + for i := range tc.initPorts { + plh.localhostPorts[tc.initPorts[i]] = struct{}{} + } + + // run replaceLocalHostPorts + plh.replaceLocalHostPorts(tc.localhostPorts) + + // compare replace result + ok := reflect.DeepEqual(plh.localhostPorts, tc.resultPorts) + if !ok { + t.Errorf("expect localhost ports: %v, but got %v", tc.resultPorts, plh.localhostPorts) + } + + // cleanup localhost ports + for port := range plh.localhostPorts { + delete(plh.localhostPorts, port) + } + }) + } +} diff --git a/pkg/yurttunnel/handlerwrapper/wraphandler/wraphandler.go b/pkg/yurttunnel/handlerwrapper/wraphandler/wraphandler.go index d39872987cf..d90ab7ea00e 100644 --- a/pkg/yurttunnel/handlerwrapper/wraphandler/wraphandler.go +++ b/pkg/yurttunnel/handlerwrapper/wraphandler/wraphandler.go @@ -23,6 +23,7 @@ import ( hw "github.com/openyurtio/openyurt/pkg/yurttunnel/handlerwrapper" "github.com/openyurtio/openyurt/pkg/yurttunnel/handlerwrapper/initializer" + "github.com/openyurtio/openyurt/pkg/yurttunnel/handlerwrapper/localhostproxy" "github.com/openyurtio/openyurt/pkg/yurttunnel/handlerwrapper/tracerequest" ) @@ -41,6 +42,7 @@ func InitHandlerWrappers(mi initializer.MiddlewareInitializer) (hw.HandlerWrappe // // then the middleware m2 will be called before the mw1 wrappers = append(wrappers, tracerequest.NewTraceReqMiddleware()) + wrappers = append(wrappers, localhostproxy.NewLocalHostProxyMiddleware()) // init all of wrappers for i := range wrappers { diff --git a/pkg/yurttunnel/util/util.go b/pkg/yurttunnel/util/util.go index 17f38ad1fa9..da07821a51a 100644 --- a/pkg/yurttunnel/util/util.go +++ b/pkg/yurttunnel/util/util.go @@ -21,6 +21,8 @@ const ( // constants related dnat rules configmap YurttunnelServerDnatConfigMapNs = "kube-system" yurttunnelServerDnatDataKey = "dnat-ports-pair" + YurtTunnelLocalHostProxyPorts = "localhost-proxy-ports" + PortsSeparator = "," ) var (