Skip to content

Commit

Permalink
Add health checker implementation
Browse files Browse the repository at this point in the history
*Populate the healthchecker Ip
*Ping the healthchecker IP to check the remote
gateway status
*Update the gateway satus if the ping fails

Fixes: submariner-io#821

Signed-off-by: Aswin Surayanarayanan <[email protected]>
  • Loading branch information
aswinsuryan committed Nov 12, 2020
1 parent 2ee5c14 commit 3ae50a6
Show file tree
Hide file tree
Showing 8 changed files with 243 additions and 11 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/bronze1man/goStrongswanVici v0.0.0-20190921045355-4c81bd8d0bd5
github.com/coreos/go-iptables v0.4.5
github.com/go-logr/zapr v0.1.1 // indirect
github.com/go-ping/ping v0.0.0-20201022122018-3977ed72668a
github.com/imdario/mergo v0.3.9 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/kelseyhightower/envconfig v1.4.0
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh
github.com/go-openapi/validate v0.18.0/go.mod h1:Uh4HdOzKt19xGIGm1qHf/ofbX1YQ4Y+MYsct2VUrAJ4=
github.com/go-openapi/validate v0.19.2/go.mod h1:1tRCw7m3jtI8eNWEEliiAqUIcBztB2KDnRCRMUi7GTA=
github.com/go-openapi/validate v0.19.5/go.mod h1:8DJv2CVJQ6kGNpFW6eV9N3JviE1C85nY1c2z52x1Gk4=
github.com/go-ping/ping v0.0.0-20201022122018-3977ed72668a h1:O9xspHB2yrvKfMQ1m6OQhqe37i5yvg0dXAYMuAjugmM=
github.com/go-ping/ping v0.0.0-20201022122018-3977ed72668a/go.mod h1:35JbSyV/BYqHwwRA6Zr1uVDm1637YlNOU61wI797NPI=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
Expand Down Expand Up @@ -410,6 +412,7 @@ golang.org/x/net v0.0.0-20191004110552-13f9640d40b9/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20191007182048-72f939374954/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200904194848-62affa334b73/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20201006153459-a7d1128ccaa0 h1:wBouT66WTYFXdxfVdz9sVWARVd/2vfGcmI45D2gj45M=
golang.org/x/net v0.0.0-20201006153459-a7d1128ccaa0/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
Expand Down
13 changes: 11 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
subv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1"
"github.com/submariner-io/submariner/pkg/cable"
"github.com/submariner-io/submariner/pkg/cableengine"
"github.com/submariner-io/submariner/pkg/cableengine/healthchecker"
"github.com/submariner-io/submariner/pkg/cableengine/syncer"
submarinerClientset "github.com/submariner-io/submariner/pkg/client/clientset/versioned"
"github.com/submariner-io/submariner/pkg/controllers/datastoresyncer"
Expand Down Expand Up @@ -110,18 +111,26 @@ func main() {
submSpec.CableDriver = strings.ToLower(submSpec.CableDriver)

localEndpoint, err := util.GetLocalEndpoint(submSpec.ClusterID, submSpec.CableDriver, map[string]string{}, submSpec.NatEnabled,
localSubnets, util.GetLocalIP())
localSubnets, util.GetLocalIP(), submSpec.ClusterCidr)

if err != nil {
klog.Fatalf("Error creating local endpoint object from %#v: %v", submSpec, err)
}

latencyMap := new(sync.Map)
cableEngine := cableengine.NewEngine(localCluster, localEndpoint)

if len(submSpec.GlobalCidr) == 0 {
cableHealthchecker := healthchecker.NewHealthChecker(localEndpoint.Spec.Hostname,
submarinerClient.SubmarinerV1().Gateways(submSpec.Namespace), latencyMap)

cableHealthchecker.Run(stopCh)
}

cableEngineSyncer := syncer.NewGatewaySyncer(
cableEngine,
submarinerClient.SubmarinerV1().Gateways(submSpec.Namespace),
VERSION)
VERSION, latencyMap)

cableEngineSyncer.Run(stopCh)

Expand Down
155 changes: 155 additions & 0 deletions pkg/cableengine/healthchecker/healthchecker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package healthchecker

import (
"sync"
"time"

"github.com/go-ping/ping"
v1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1"
v1typed "github.com/submariner-io/submariner/pkg/client/clientset/versioned/typed/submariner.io/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog"
)

var HealthCheckInterval = 15 * time.Second

type HealthChecker struct {
localEndpointName string
client v1typed.GatewayInterface
latencyMap *sync.Map
}

type LatencyInfo struct {
Status v1.ConnectionStatus
Latency *v1.LatencySpec
}

// NewEngine creates a new Engine for the local cluster
func NewHealthChecker(localEndpointName string, client v1typed.GatewayInterface,
latencyMap *sync.Map) *HealthChecker {
return &HealthChecker{
localEndpointName: localEndpointName,
client: client,
latencyMap: latencyMap,
}
}

func (h *HealthChecker) Run(stopCh <-chan struct{}) {
go func() {
wait.Until(h.checkRemoteGatewayHealth, HealthCheckInterval, stopCh)
}()

klog.Infof("CableEngine healthchecker started")
}
func (h *HealthChecker) checkRemoteGatewayHealth() {
existingGw, err := h.client.Get(h.localEndpointName, metav1.GetOptions{})
if errors.IsNotFound(err) {
klog.V(2).Infof("Gateway object not found: %v", err)
return
} else if err != nil {
klog.Errorf("Error retrieving the gateway object: %v", err)
return
}

if len(existingGw.Status.Connections) == 0 {
klog.V(2).Info("No endpoints present in the gateway")
return
}

deleteMap := copyLatencyMap(h.latencyMap)
var wg sync.WaitGroup

for _, endPointInfo := range existingGw.Status.Connections {
wg.Add(1)

hostName := endPointInfo.Endpoint.Hostname
healthCheckIp := endPointInfo.Endpoint.HealthCheckIP

// Remove the entry that is found in the gateway connection status
delete(deleteMap, hostName)

go func() {
latencyInfo := h.sendPing(&wg, hostName, healthCheckIp)
if latencyInfo != nil {
h.latencyMap.Store(hostName, latencyInfo)
}
}()
}

wg.Wait()

for k := range deleteMap {
// Remove the entry that are missing in the gateway connection status from the
// latencymap
h.latencyMap.Delete(k)
}
}

func copyLatencyMap(latencyMap *sync.Map) map[string]*LatencyInfo {
newMap := make(map[string]*LatencyInfo)

latencyMap.Range(func(key, val interface{}) bool {
if val != nil {
newMap[key.(string)] = val.(*LatencyInfo)
}
return true
})

return newMap
}

func (h *HealthChecker) sendPing(wg *sync.WaitGroup, remoteHostName, healthCheckIp string) *LatencyInfo {
defer wg.Done()
var lastRtt time.Duration

pinger, err := ping.NewPinger(healthCheckIp)
if err != nil {
klog.Errorf("Creating pinger for hostname %v to the ip %v failed due to %v", remoteHostName, healthCheckIp, err)
return nil
}

pinger.Count = 3
pinger.SetPrivileged(true)
pinger.Timeout = 3 * time.Second

pinger.OnRecv = func(packet *ping.Packet) {
lastRtt = packet.Rtt
}
var latencyinfo *LatencyInfo

pinger.OnFinish = func(stats *ping.Statistics) {
if stats.PacketLoss == 100 {
latencySpec := &v1.LatencySpec{
LastRTT: 0,
MinRTT: 0,
AverageRTT: 0,
MaxRTT: 0,
StdDevRTT: 0,
}
latencyinfo = &LatencyInfo{
Status: v1.ConnectionError,
Latency: latencySpec,
}
} else {
latencySpec := &v1.LatencySpec{
LastRTT: float64(lastRtt.Microseconds()) / 1000,
MinRTT: float64(stats.MinRtt.Microseconds()) / 1000,
AverageRTT: float64(stats.AvgRtt.Microseconds()) / 1000,
MaxRTT: float64(stats.MaxRtt.Microseconds()) / 1000,
StdDevRTT: float64(stats.StdDevRtt.Microseconds()) / 1000,
}
latencyinfo = &LatencyInfo{
Status: v1.Connected,
Latency: latencySpec,
}
}
}
err = pinger.Run()
if err != nil {
klog.Errorf("Ping to %q with ip %q failed due to %v", remoteHostName, healthCheckIp, err)
}

return latencyinfo
}
24 changes: 20 additions & 4 deletions pkg/cableengine/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/submariner-io/admiral/pkg/log"
v1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1"
"github.com/submariner-io/submariner/pkg/cableengine"
"github.com/submariner-io/submariner/pkg/cableengine/healthchecker"
v1typed "github.com/submariner-io/submariner/pkg/client/clientset/versioned/typed/submariner.io/v1"
)

Expand All @@ -26,6 +27,7 @@ type GatewaySyncer struct {
engine cableengine.Engine
version string
statusError error
latencyMap *sync.Map
}

var GatewayUpdateInterval = 5 * time.Second
Expand All @@ -44,11 +46,12 @@ func init() {

// NewEngine creates a new Engine for the local cluster
func NewGatewaySyncer(engine cableengine.Engine, client v1typed.GatewayInterface,
version string) *GatewaySyncer {
version string, latencyMap *sync.Map) *GatewaySyncer {
return &GatewaySyncer{
client: client,
engine: engine,
version: version,
client: client,
engine: engine,
version: version,
latencyMap: latencyMap,
}
}

Expand Down Expand Up @@ -201,6 +204,19 @@ func (i *GatewaySyncer) generateGatewayObject() *v1.Gateway {
}

if connections != nil {
for index, connection := range *connections {
latencySpec, ok := i.latencyMap.Load(connection.Endpoint.Hostname)
if ok {
latencyInfo := latencySpec.(*healthchecker.LatencyInfo)
connection.Latency = latencyInfo.Latency
if connection.Status == v1.Connected && latencyInfo.Status != v1.Connected {
connection.Status = latencyInfo.Status
}

(*connections)[index] = connection
}
}

gateway.Status.Connections = *connections
} else {
gateway.Status.Connections = []v1.Connection{}
Expand Down
3 changes: 2 additions & 1 deletion pkg/cableengine/syncer/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package syncer_test
import (
"fmt"
"strconv"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -337,7 +338,7 @@ func (t *testDriver) run() {

client := fakeClientset.NewSimpleClientset()
t.gateways.GatewayInterface = client.SubmarinerV1().Gateways(namespace)
t.syncer = syncer.NewGatewaySyncer(t.engine, t.gateways, t.expectedGateway.Status.Version)
t.syncer = syncer.NewGatewaySyncer(t.engine, t.gateways, t.expectedGateway.Status.Version, new(sync.Map))

informerFactory := submarinerInformers.NewSharedInformerFactory(client, 0)
informer := informerFactory.Submariner().V1().Gateways().Informer()
Expand Down
48 changes: 47 additions & 1 deletion pkg/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func FlattenColors(colorCodes []string) string {
}

func GetLocalEndpoint(clusterID, backend string, backendConfig map[string]string, natEnabled bool,
subnets []string, privateIP string) (types.SubmarinerEndpoint, error) {
subnets []string, privateIP string, clusterCIDR []string) (types.SubmarinerEndpoint, error) {
hostname, err := os.Hostname()
if err != nil {
return types.SubmarinerEndpoint{}, fmt.Errorf("Error getting hostname: %v", err)
Expand Down Expand Up @@ -114,9 +114,55 @@ func GetLocalEndpoint(clusterID, backend string, backendConfig map[string]string
endpoint.Spec.PublicIP = publicIP
}

endpoint.Spec.HealthCheckIP, err = getCNIInterfaceIPAddress(clusterCIDR)

if err != nil {
return types.SubmarinerEndpoint{}, fmt.Errorf("Error getting CNI Interface IP address: %v", err)
}

return endpoint, nil
}

func getCNIInterfaceIPAddress(clusterCIDRs []string) (string, error) {
for _, clusterCIDR := range clusterCIDRs {
_, clusterNetwork, err := net.ParseCIDR(clusterCIDR)
if err != nil {
return "", fmt.Errorf("unable to ParseCIDR %q : %v", clusterCIDR, err)
}

hostInterfaces, err := net.Interfaces()
if err != nil {
return "", fmt.Errorf("net.Interfaces() returned error : %v", err)
}

for _, iface := range hostInterfaces {
addrs, err := iface.Addrs()
if err != nil {
return "", fmt.Errorf("for interface %q, iface.Addrs returned error: %v", iface.Name, err)
}

for i := range addrs {
ipAddr, _, err := net.ParseCIDR(addrs[i].String())
if err != nil {
klog.Errorf("Unable to ParseCIDR : %q", addrs[i].String())
} else if ipAddr.To4() != nil {
klog.V(level.DEBUG).Infof("Interface %q has %q address", iface.Name, ipAddr)
address := net.ParseIP(ipAddr.String())

// Verify that interface has an address from cluster CIDR
if clusterNetwork.Contains(address) {
klog.V(level.DEBUG).Infof("Found CNI Interface %q that has IP %q from ClusterCIDR %q",
iface.Name, ipAddr.String(), clusterCIDR)
return ipAddr.String(), nil
}
}
}
}
}

return "", fmt.Errorf("unable to find CNI Interface on the host which has IP from %q", clusterCIDRs)
}

func GetClusterIDFromCableName(cableName string) string {
// length is 11
// 0 1 2 3 4 5 6 7 8 9 10
Expand Down
7 changes: 4 additions & 3 deletions pkg/util/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ func testFlattenColors() {

func testGetLocalEndpoint() {
It("should return a valid SubmarinerEndpoint object", func() {
subnets := []string{"1.2.3.4/16"}
privateIP := "1.2.3.4"
endpoint, err := util.GetLocalEndpoint("east", "backend", map[string]string{}, false, subnets, privateIP)
subnets := []string{"127.0.0.1/16"}
privateIP := "127.0.0.1"
endpoint, err := util.GetLocalEndpoint("east", "backend", map[string]string{}, false, subnets, privateIP, subnets)

Expect(err).ToNot(HaveOccurred())
Expect(endpoint.Spec.ClusterID).To(Equal("east"))
Expand All @@ -80,6 +80,7 @@ func testGetLocalEndpoint() {
Expect(endpoint.Spec.Backend).To(Equal("backend"))
Expect(endpoint.Spec.Subnets).To(Equal(subnets))
Expect(endpoint.Spec.NATEnabled).To(Equal(false))
Expect(endpoint.Spec.HealthCheckIP).To(Equal(privateIP))
})
}

Expand Down

0 comments on commit 3ae50a6

Please sign in to comment.