Skip to content

Commit

Permalink
Add healthcheck to check the gateway status
Browse files Browse the repository at this point in the history
*Populate the healthcheck-ip in endpoint object
*Ping the remote gateway nodes healthcheck ip

Signed-off-by: Aswin Surayanarayanan <[email protected]>
  • Loading branch information
aswinsuryan committed Nov 6, 2020
1 parent b5234c4 commit 6e612d6
Show file tree
Hide file tree
Showing 8 changed files with 216 additions and 10 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/bronze1man/goStrongswanVici v0.0.0-20190921045355-4c81bd8d0bd5
github.com/coreos/go-iptables v0.4.5
github.com/go-logr/zapr v0.1.1 // indirect
github.com/go-ping/ping v0.0.0-20201022122018-3977ed72668a
github.com/imdario/mergo v0.3.9 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/kelseyhightower/envconfig v1.4.0
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh
github.com/go-openapi/validate v0.18.0/go.mod h1:Uh4HdOzKt19xGIGm1qHf/ofbX1YQ4Y+MYsct2VUrAJ4=
github.com/go-openapi/validate v0.19.2/go.mod h1:1tRCw7m3jtI8eNWEEliiAqUIcBztB2KDnRCRMUi7GTA=
github.com/go-openapi/validate v0.19.5/go.mod h1:8DJv2CVJQ6kGNpFW6eV9N3JviE1C85nY1c2z52x1Gk4=
github.com/go-ping/ping v0.0.0-20201022122018-3977ed72668a h1:O9xspHB2yrvKfMQ1m6OQhqe37i5yvg0dXAYMuAjugmM=
github.com/go-ping/ping v0.0.0-20201022122018-3977ed72668a/go.mod h1:35JbSyV/BYqHwwRA6Zr1uVDm1637YlNOU61wI797NPI=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
Expand Down Expand Up @@ -410,6 +412,7 @@ golang.org/x/net v0.0.0-20191004110552-13f9640d40b9/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20191007182048-72f939374954/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200904194848-62affa334b73/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20201006153459-a7d1128ccaa0 h1:wBouT66WTYFXdxfVdz9sVWARVd/2vfGcmI45D2gj45M=
golang.org/x/net v0.0.0-20201006153459-a7d1128ccaa0/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
Expand Down
11 changes: 9 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"flag"
"fmt"
"github.com/submariner-io/submariner/pkg/cableengine/healthchecker"
"net/http"
"os"
"strings"
Expand Down Expand Up @@ -110,18 +111,24 @@ func main() {
submSpec.CableDriver = strings.ToLower(submSpec.CableDriver)

localEndpoint, err := util.GetLocalEndpoint(submSpec.ClusterID, submSpec.CableDriver, map[string]string{}, submSpec.NatEnabled,
localSubnets, util.GetLocalIP())
localSubnets, util.GetLocalIP(), submSpec.ClusterCidr)

if err != nil {
klog.Fatalf("Error creating local endpoint object from %#v: %v", submSpec, err)
}

latencyMap := new(sync.Map)
cableEngine := cableengine.NewEngine(localCluster, localEndpoint)

cableHealthchecker := healthchecker.NewHealthChecker(localEndpoint.Spec.Hostname,submarinerClient.SubmarinerV1().Gateways(submSpec.Namespace),
latencyMap)

cableHealthchecker.Run(stopCh)

cableEngineSyncer := syncer.NewGatewaySyncer(
cableEngine,
submarinerClient.SubmarinerV1().Gateways(submSpec.Namespace),
VERSION)
VERSION, latencyMap)

cableEngineSyncer.Run(stopCh)

Expand Down
133 changes: 133 additions & 0 deletions pkg/cableengine/healthchecker/healthchecker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package healthchecker

import (
"github.com/go-ping/ping"
v1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1"
v1typed "github.com/submariner-io/submariner/pkg/client/clientset/versioned/typed/submariner.io/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog"
"sync"
"time"
)

var HealthCheckInterval = 15 * time.Second

type HealthChecker struct {
localEndpointName string
client v1typed.GatewayInterface
latencyMap *sync.Map
}

type LatencyInfo struct {
Status v1.ConnectionStatus
Latency *v1.LatencySpec
}

// NewEngine creates a new Engine for the local cluster
func NewHealthChecker(localEndpointName string, client v1typed.GatewayInterface,
latencyMap *sync.Map) *HealthChecker {
return &HealthChecker{
localEndpointName: localEndpointName,
client: client,
latencyMap: latencyMap,
}
}

func (h *HealthChecker) Run(stopCh <-chan struct{}) {
go func() {
wait.Until(h.checkRemoteGatewayHealth, HealthCheckInterval, stopCh)
}()

klog.Infof("CableEngine healthchecker started")
}
func (h *HealthChecker) checkRemoteGatewayHealth() {
existingGw, err := h.client.Get(h.localEndpointName, metav1.GetOptions{})
if errors.IsNotFound(err) {
klog.V(2).Info("Gateway object not found: %v", err)
return;
} else if err != nil {
klog.Errorf("Error retrieving the gateway object: %v", err)
return;
}
if (len(existingGw.Status.Connections) == 0) {
klog.V(2).Info("No endpoints present in the gateway")
return
}
deleteMap := copyLatencyMap(h.latencyMap)
for _, endPointInfo := range existingGw.Status.Connections {
hostName := endPointInfo.Endpoint.Hostname
healthCheckIp := endPointInfo.Endpoint.HealthCheckIP
// Remove the entry that is found in the gateway connection status
deleteMap.Delete(hostName)
go func() {
latencyInfo := h.checkRemoteGatewayHealthSafe(hostName, healthCheckIp)
if (latencyInfo != nil) {
h.latencyMap.Store(hostName, latencyInfo)
}
}()
}

deleteMap.Range(func(k, v interface{}) bool {
// Remove the entry that are missing in the gateway connection status from the
// latencymap
h.latencyMap.Delete(k)
return true
})
}

func copyLatencyMap(latencyMap *sync.Map) sync.Map {
var newMap sync.Map

latencyMap.Range(func(k, v interface{}) bool {
newMap.Store(k, v)
return true
})

return newMap
}

func (h *HealthChecker) checkRemoteGatewayHealthSafe(remoteHostName string, healthCheckIp string) *LatencyInfo {
var lastRtt time.Duration
pinger, err := ping.NewPinger(healthCheckIp)
if err != nil {
return nil;
}

pinger.Count = 3
pinger.SetPrivileged(true)
pinger.OnRecv = func(packet *ping.Packet) {
lastRtt = packet.Rtt
}
var latencyinfo *LatencyInfo
pinger.OnFinish = func(stats *ping.Statistics) {
if (stats.PacketLoss == 100) {
latencySpec := &v1.LatencySpec{
LastRTT: 0,
MinRTT: 0,
AverageRTT: 0,
MaxRTT: 0,
StdDevRTT: 0,
}
latencyinfo = &LatencyInfo{
Status: v1.ConnectionError,
Latency: latencySpec,
}
} else {
latencySpec := &v1.LatencySpec{
LastRTT: float64(lastRtt.Microseconds()) / 1000,
MinRTT: float64(stats.MinRtt.Microseconds()) / 1000,
AverageRTT: float64(stats.AvgRtt.Microseconds()) / 1000,
MaxRTT: float64(stats.MaxRtt.Microseconds()) / 1000,
StdDevRTT: float64(stats.StdDevRtt.Microseconds()) / 1000,
}
latencyinfo = &LatencyInfo{
Status: v1.Connected,
Latency: latencySpec,
}
}
}
err = pinger.Run()
return latencyinfo
}
23 changes: 19 additions & 4 deletions pkg/cableengine/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package syncer

import (
"fmt"
"github.com/submariner-io/submariner/pkg/cableengine/healthchecker"
"reflect"
"strconv"
"sync"
Expand All @@ -26,6 +27,7 @@ type GatewaySyncer struct {
engine cableengine.Engine
version string
statusError error
latencyMap *sync.Map
}

var GatewayUpdateInterval = 5 * time.Second
Expand All @@ -44,11 +46,12 @@ func init() {

// NewEngine creates a new Engine for the local cluster
func NewGatewaySyncer(engine cableengine.Engine, client v1typed.GatewayInterface,
version string) *GatewaySyncer {
version string, latencyMap *sync.Map) *GatewaySyncer {
return &GatewaySyncer{
client: client,
engine: engine,
version: version,
client: client,
engine: engine,
version: version,
latencyMap: latencyMap,
}
}

Expand Down Expand Up @@ -108,6 +111,7 @@ func (i *GatewaySyncer) syncGatewayStatusSafe() {
klog.V(log.TRACE).Info("Gateway already exists but doesn't need updating")
}


if gatewayObj.Status.HAStatus == v1.HAStatusActive {
err := i.cleanupStaleGatewayEntries(gatewayObj.Name)
if err != nil {
Expand Down Expand Up @@ -201,6 +205,17 @@ func (i *GatewaySyncer) generateGatewayObject() *v1.Gateway {
}

if connections != nil {
for index, connection := range *connections {
latencySpec, ok := i.latencyMap.Load(connection.Endpoint.Hostname)
if ok {
latencyInfo := latencySpec.(*healthchecker.LatencyInfo)
connection.Latency = latencyInfo.Latency
if(connection.Status == v1.Connected && latencyInfo.Status != v1.Connected) {
connection.Status = latencyInfo.Status
}
(*connections)[index] = connection
}
}
gateway.Status.Connections = *connections
} else {
gateway.Status.Connections = []v1.Connection{}
Expand Down
3 changes: 2 additions & 1 deletion pkg/cableengine/syncer/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package syncer_test
import (
"fmt"
"strconv"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -337,7 +338,7 @@ func (t *testDriver) run() {

client := fakeClientset.NewSimpleClientset()
t.gateways.GatewayInterface = client.SubmarinerV1().Gateways(namespace)
t.syncer = syncer.NewGatewaySyncer(t.engine, t.gateways, t.expectedGateway.Status.Version)
t.syncer = syncer.NewGatewaySyncer(t.engine, t.gateways, t.expectedGateway.Status.Version, sync.Map{})

informerFactory := submarinerInformers.NewSharedInformerFactory(client, 0)
informer := informerFactory.Submariner().V1().Gateways().Informer()
Expand Down
50 changes: 48 additions & 2 deletions pkg/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package util

import (
"fmt"
"github.com/rdegges/go-ipify"
"log"
"net"
"os"
Expand All @@ -10,7 +11,6 @@ import (
"syscall"

"github.com/coreos/go-iptables/iptables"
"github.com/rdegges/go-ipify"
level "github.com/submariner-io/admiral/pkg/log"
subv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1"
"github.com/submariner-io/submariner/pkg/types"
Expand Down Expand Up @@ -86,7 +86,7 @@ func FlattenColors(colorCodes []string) string {
}

func GetLocalEndpoint(clusterID, backend string, backendConfig map[string]string, natEnabled bool,
subnets []string, privateIP string) (types.SubmarinerEndpoint, error) {
subnets []string, privateIP string, clusterCIDR []string) (types.SubmarinerEndpoint, error) {
hostname, err := os.Hostname()
if err != nil {
return types.SubmarinerEndpoint{}, fmt.Errorf("Error getting hostname: %v", err)
Expand Down Expand Up @@ -114,9 +114,55 @@ func GetLocalEndpoint(clusterID, backend string, backendConfig map[string]string
endpoint.Spec.PublicIP = publicIP
}

endpoint.Spec.HealthCheckIP, err = getCNIInterfaceIPAddress(clusterCIDR)

if err != nil {
return types.SubmarinerEndpoint{}, fmt.Errorf("Error getting CNI Interface IP address: %v", err)
}

return endpoint, nil
}

func getCNIInterfaceIPAddress(clusterCIDRs []string) (string, error) {
for _, clusterCIDR := range clusterCIDRs {
_, clusterNetwork, err := net.ParseCIDR(clusterCIDR)
if err != nil {
return "", fmt.Errorf("unable to ParseCIDR %q : %v", clusterCIDR, err)
}

hostInterfaces, err := net.Interfaces()
if err != nil {
return "", fmt.Errorf("net.Interfaces() returned error : %v", err)
}

for _, iface := range hostInterfaces {
addrs, err := iface.Addrs()
if err != nil {
return "", fmt.Errorf("for interface %q, iface.Addrs returned error: %v", iface.Name, err)
}

for i := range addrs {
ipAddr, _, err := net.ParseCIDR(addrs[i].String())
if err != nil {
klog.Errorf("Unable to ParseCIDR : %q", addrs[i].String())
} else if ipAddr.To4() != nil {
klog.V(level.DEBUG).Infof("Interface %q has %q address", iface.Name, ipAddr)
address := net.ParseIP(ipAddr.String())

// Verify that interface has an address from cluster CIDR
if clusterNetwork.Contains(address) {
klog.V(level.DEBUG).Infof("Found CNI Interface %q that has IP %q from ClusterCIDR %q",
iface.Name, ipAddr.String(), clusterCIDR)
return ipAddr.String(), nil
}
}
}
}
}

return "", fmt.Errorf("unable to find CNI Interface on the host which has IP from %q", clusterCIDRs)
}

func GetClusterIDFromCableName(cableName string) string {
// length is 11
// 0 1 2 3 4 5 6 7 8 9 10
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func testGetLocalEndpoint() {
It("should return a valid SubmarinerEndpoint object", func() {
subnets := []string{"1.2.3.4/16"}
privateIP := "1.2.3.4"
endpoint, err := util.GetLocalEndpoint("east", "backend", map[string]string{}, false, subnets, privateIP)
endpoint, err := util.GetLocalEndpoint("east", "backend", map[string]string{}, false, subnets, privateIP, subnets)

Expect(err).ToNot(HaveOccurred())
Expect(endpoint.Spec.ClusterID).To(Equal("east"))
Expand Down

0 comments on commit 6e612d6

Please sign in to comment.