diff --git a/go.mod b/go.mod index 4095391158..2a95d0b3a9 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/bronze1man/goStrongswanVici v0.0.0-20190921045355-4c81bd8d0bd5 github.com/coreos/go-iptables v0.4.5 github.com/go-logr/zapr v0.1.1 // indirect + github.com/go-ping/ping v0.0.0-20201022122018-3977ed72668a github.com/imdario/mergo v0.3.9 // indirect github.com/jpillora/backoff v1.0.0 // indirect github.com/kelseyhightower/envconfig v1.4.0 diff --git a/go.sum b/go.sum index ed4ef4df93..ae3460cc62 100644 --- a/go.sum +++ b/go.sum @@ -130,6 +130,8 @@ github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh github.com/go-openapi/validate v0.18.0/go.mod h1:Uh4HdOzKt19xGIGm1qHf/ofbX1YQ4Y+MYsct2VUrAJ4= github.com/go-openapi/validate v0.19.2/go.mod h1:1tRCw7m3jtI8eNWEEliiAqUIcBztB2KDnRCRMUi7GTA= github.com/go-openapi/validate v0.19.5/go.mod h1:8DJv2CVJQ6kGNpFW6eV9N3JviE1C85nY1c2z52x1Gk4= +github.com/go-ping/ping v0.0.0-20201022122018-3977ed72668a h1:O9xspHB2yrvKfMQ1m6OQhqe37i5yvg0dXAYMuAjugmM= +github.com/go-ping/ping v0.0.0-20201022122018-3977ed72668a/go.mod h1:35JbSyV/BYqHwwRA6Zr1uVDm1637YlNOU61wI797NPI= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= @@ -410,6 +412,7 @@ golang.org/x/net v0.0.0-20191004110552-13f9640d40b9/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20191007182048-72f939374954/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= +golang.org/x/net v0.0.0-20200904194848-62affa334b73/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201006153459-a7d1128ccaa0 h1:wBouT66WTYFXdxfVdz9sVWARVd/2vfGcmI45D2gj45M= golang.org/x/net v0.0.0-20201006153459-a7d1128ccaa0/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= diff --git a/main.go b/main.go index 7236c00c66..3f29edd961 100644 --- a/main.go +++ b/main.go @@ -17,6 +17,7 @@ import ( subv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1" "github.com/submariner-io/submariner/pkg/cable" "github.com/submariner-io/submariner/pkg/cableengine" + "github.com/submariner-io/submariner/pkg/cableengine/healthchecker" "github.com/submariner-io/submariner/pkg/cableengine/syncer" submarinerClientset "github.com/submariner-io/submariner/pkg/client/clientset/versioned" "github.com/submariner-io/submariner/pkg/controllers/datastoresyncer" @@ -110,18 +111,24 @@ func main() { submSpec.CableDriver = strings.ToLower(submSpec.CableDriver) localEndpoint, err := util.GetLocalEndpoint(submSpec.ClusterID, submSpec.CableDriver, map[string]string{}, submSpec.NatEnabled, - localSubnets, util.GetLocalIP()) + localSubnets, util.GetLocalIP(), submSpec.ClusterCidr) if err != nil { klog.Fatalf("Error creating local endpoint object from %#v: %v", submSpec, err) } + latencyMap := new(sync.Map) cableEngine := cableengine.NewEngine(localCluster, localEndpoint) + cableHealthchecker := healthchecker.NewHealthChecker(localEndpoint.Spec.Hostname, + submarinerClient.SubmarinerV1().Gateways(submSpec.Namespace), latencyMap) + + cableHealthchecker.Run(stopCh) + cableEngineSyncer := syncer.NewGatewaySyncer( cableEngine, submarinerClient.SubmarinerV1().Gateways(submSpec.Namespace), - VERSION) + VERSION, latencyMap) cableEngineSyncer.Run(stopCh) diff --git a/pkg/cableengine/healthchecker/healthchecker.go b/pkg/cableengine/healthchecker/healthchecker.go new file mode 100644 index 0000000000..ccb2a384b4 --- /dev/null +++ b/pkg/cableengine/healthchecker/healthchecker.go @@ -0,0 +1,155 @@ +package healthchecker + +import ( + "sync" + "time" + + "github.com/go-ping/ping" + v1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1" + v1typed "github.com/submariner-io/submariner/pkg/client/clientset/versioned/typed/submariner.io/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/klog" +) + +var HealthCheckInterval = 15 * time.Second + +type HealthChecker struct { + localEndpointName string + client v1typed.GatewayInterface + latencyMap *sync.Map +} + +type LatencyInfo struct { + Status v1.ConnectionStatus + Latency *v1.LatencySpec +} + +// NewEngine creates a new Engine for the local cluster +func NewHealthChecker(localEndpointName string, client v1typed.GatewayInterface, + latencyMap *sync.Map) *HealthChecker { + return &HealthChecker{ + localEndpointName: localEndpointName, + client: client, + latencyMap: latencyMap, + } +} + +func (h *HealthChecker) Run(stopCh <-chan struct{}) { + go func() { + wait.Until(h.checkRemoteGatewayHealth, HealthCheckInterval, stopCh) + }() + + klog.Infof("CableEngine healthchecker started") +} +func (h *HealthChecker) checkRemoteGatewayHealth() { + existingGw, err := h.client.Get(h.localEndpointName, metav1.GetOptions{}) + if errors.IsNotFound(err) { + klog.V(2).Infof("Gateway object not found: %v", err) + return + } else if err != nil { + klog.Errorf("Error retrieving the gateway object: %v", err) + return + } + + if len(existingGw.Status.Connections) == 0 { + klog.V(2).Info("No endpoints present in the gateway") + return + } + + deleteMap := copyLatencyMap(h.latencyMap) + var wg sync.WaitGroup + + for _, endPointInfo := range existingGw.Status.Connections { + wg.Add(1) + + hostName := endPointInfo.Endpoint.Hostname + healthCheckIp := endPointInfo.Endpoint.HealthCheckIP + + // Remove the entry that is found in the gateway connection status + delete(deleteMap, hostName) + + go func() { + latencyInfo := h.sendPing(&wg, hostName, healthCheckIp) + if latencyInfo != nil { + h.latencyMap.Store(hostName, latencyInfo) + } + }() + } + + wg.Wait() + + for k := range deleteMap { + // Remove the entry that are missing in the gateway connection status from the + // latencymap + h.latencyMap.Delete(k) + } +} + +func copyLatencyMap(latencyMap *sync.Map) map[string]*LatencyInfo { + newMap := make(map[string]*LatencyInfo) + + latencyMap.Range(func(key, val interface{}) bool { + if val != nil { + newMap[key.(string)] = val.(*LatencyInfo) + } + return true + }) + + return newMap +} + +func (h *HealthChecker) sendPing(wg *sync.WaitGroup, remoteHostName, healthCheckIp string) *LatencyInfo { + defer wg.Done() + var lastRtt time.Duration + + pinger, err := ping.NewPinger(healthCheckIp) + if err != nil { + klog.Errorf("Creating pinger for hostname %v to the ip %v failed due to %v", remoteHostName, healthCheckIp, err) + return nil + } + + pinger.Count = 3 + pinger.SetPrivileged(true) + pinger.Timeout = 3 * time.Second + + pinger.OnRecv = func(packet *ping.Packet) { + lastRtt = packet.Rtt + } + var latencyinfo *LatencyInfo + + pinger.OnFinish = func(stats *ping.Statistics) { + if stats.PacketLoss == 100 { + latencySpec := &v1.LatencySpec{ + LastRTT: 0, + MinRTT: 0, + AverageRTT: 0, + MaxRTT: 0, + StdDevRTT: 0, + } + latencyinfo = &LatencyInfo{ + Status: v1.ConnectionError, + Latency: latencySpec, + } + } else { + latencySpec := &v1.LatencySpec{ + LastRTT: float64(lastRtt.Microseconds()) / 1000, + MinRTT: float64(stats.MinRtt.Microseconds()) / 1000, + AverageRTT: float64(stats.AvgRtt.Microseconds()) / 1000, + MaxRTT: float64(stats.MaxRtt.Microseconds()) / 1000, + StdDevRTT: float64(stats.StdDevRtt.Microseconds()) / 1000, + } + latencyinfo = &LatencyInfo{ + Status: v1.Connected, + Latency: latencySpec, + } + } + } + err = pinger.Run() + if err != nil { + klog.Errorf("Ping to %q with ip %q failed due to %v", remoteHostName, healthCheckIp, err) + } + + return latencyinfo +} diff --git a/pkg/cableengine/syncer/syncer.go b/pkg/cableengine/syncer/syncer.go index e25df74418..27b995784b 100644 --- a/pkg/cableengine/syncer/syncer.go +++ b/pkg/cableengine/syncer/syncer.go @@ -17,6 +17,7 @@ import ( "github.com/submariner-io/admiral/pkg/log" v1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1" "github.com/submariner-io/submariner/pkg/cableengine" + "github.com/submariner-io/submariner/pkg/cableengine/healthchecker" v1typed "github.com/submariner-io/submariner/pkg/client/clientset/versioned/typed/submariner.io/v1" ) @@ -26,6 +27,7 @@ type GatewaySyncer struct { engine cableengine.Engine version string statusError error + latencyMap *sync.Map } var GatewayUpdateInterval = 5 * time.Second @@ -44,11 +46,12 @@ func init() { // NewEngine creates a new Engine for the local cluster func NewGatewaySyncer(engine cableengine.Engine, client v1typed.GatewayInterface, - version string) *GatewaySyncer { + version string, latencyMap *sync.Map) *GatewaySyncer { return &GatewaySyncer{ - client: client, - engine: engine, - version: version, + client: client, + engine: engine, + version: version, + latencyMap: latencyMap, } } @@ -201,6 +204,19 @@ func (i *GatewaySyncer) generateGatewayObject() *v1.Gateway { } if connections != nil { + for index, connection := range *connections { + latencySpec, ok := i.latencyMap.Load(connection.Endpoint.Hostname) + if ok { + latencyInfo := latencySpec.(*healthchecker.LatencyInfo) + connection.Latency = latencyInfo.Latency + if connection.Status == v1.Connected && latencyInfo.Status != v1.Connected { + connection.Status = latencyInfo.Status + } + + (*connections)[index] = connection + } + } + gateway.Status.Connections = *connections } else { gateway.Status.Connections = []v1.Connection{} diff --git a/pkg/cableengine/syncer/syncer_test.go b/pkg/cableengine/syncer/syncer_test.go index 9ac672fbe9..07d8694d86 100644 --- a/pkg/cableengine/syncer/syncer_test.go +++ b/pkg/cableengine/syncer/syncer_test.go @@ -3,6 +3,7 @@ package syncer_test import ( "fmt" "strconv" + "sync" "testing" "time" @@ -337,7 +338,7 @@ func (t *testDriver) run() { client := fakeClientset.NewSimpleClientset() t.gateways.GatewayInterface = client.SubmarinerV1().Gateways(namespace) - t.syncer = syncer.NewGatewaySyncer(t.engine, t.gateways, t.expectedGateway.Status.Version) + t.syncer = syncer.NewGatewaySyncer(t.engine, t.gateways, t.expectedGateway.Status.Version, new(sync.Map)) informerFactory := submarinerInformers.NewSharedInformerFactory(client, 0) informer := informerFactory.Submariner().V1().Gateways().Informer() diff --git a/pkg/util/util.go b/pkg/util/util.go index 676ebafdf0..dad5f03cd5 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -86,7 +86,7 @@ func FlattenColors(colorCodes []string) string { } func GetLocalEndpoint(clusterID, backend string, backendConfig map[string]string, natEnabled bool, - subnets []string, privateIP string) (types.SubmarinerEndpoint, error) { + subnets []string, privateIP string, clusterCIDR []string) (types.SubmarinerEndpoint, error) { hostname, err := os.Hostname() if err != nil { return types.SubmarinerEndpoint{}, fmt.Errorf("Error getting hostname: %v", err) @@ -114,9 +114,55 @@ func GetLocalEndpoint(clusterID, backend string, backendConfig map[string]string endpoint.Spec.PublicIP = publicIP } + endpoint.Spec.HealthCheckIP, err = getCNIInterfaceIPAddress(clusterCIDR) + + if err != nil { + return types.SubmarinerEndpoint{}, fmt.Errorf("Error getting CNI Interface IP address: %v", err) + } + return endpoint, nil } +func getCNIInterfaceIPAddress(clusterCIDRs []string) (string, error) { + for _, clusterCIDR := range clusterCIDRs { + _, clusterNetwork, err := net.ParseCIDR(clusterCIDR) + if err != nil { + return "", fmt.Errorf("unable to ParseCIDR %q : %v", clusterCIDR, err) + } + + hostInterfaces, err := net.Interfaces() + if err != nil { + return "", fmt.Errorf("net.Interfaces() returned error : %v", err) + } + + for _, iface := range hostInterfaces { + addrs, err := iface.Addrs() + if err != nil { + return "", fmt.Errorf("for interface %q, iface.Addrs returned error: %v", iface.Name, err) + } + + for i := range addrs { + ipAddr, _, err := net.ParseCIDR(addrs[i].String()) + if err != nil { + klog.Errorf("Unable to ParseCIDR : %q", addrs[i].String()) + } else if ipAddr.To4() != nil { + klog.V(level.DEBUG).Infof("Interface %q has %q address", iface.Name, ipAddr) + address := net.ParseIP(ipAddr.String()) + + // Verify that interface has an address from cluster CIDR + if clusterNetwork.Contains(address) { + klog.V(level.DEBUG).Infof("Found CNI Interface %q that has IP %q from ClusterCIDR %q", + iface.Name, ipAddr.String(), clusterCIDR) + return ipAddr.String(), nil + } + } + } + } + } + + return "", fmt.Errorf("unable to find CNI Interface on the host which has IP from %q", clusterCIDRs) +} + func GetClusterIDFromCableName(cableName string) string { // length is 11 // 0 1 2 3 4 5 6 7 8 9 10 diff --git a/pkg/util/util_test.go b/pkg/util/util_test.go index 3b03468ff6..564252aca2 100644 --- a/pkg/util/util_test.go +++ b/pkg/util/util_test.go @@ -70,7 +70,7 @@ func testGetLocalEndpoint() { It("should return a valid SubmarinerEndpoint object", func() { subnets := []string{"1.2.3.4/16"} privateIP := "1.2.3.4" - endpoint, err := util.GetLocalEndpoint("east", "backend", map[string]string{}, false, subnets, privateIP) + endpoint, err := util.GetLocalEndpoint("east", "backend", map[string]string{}, false, subnets, privateIP, subnets) Expect(err).ToNot(HaveOccurred()) Expect(endpoint.Spec.ClusterID).To(Equal("east"))