Skip to content

Commit

Permalink
Add health checker implementation (#938)
Browse files Browse the repository at this point in the history
* Add health checker implementation

* Populate the healthchecker IP
* Ping the healthchecker IP to check the remote
gateway status
* Update the gateway status if the ping fails

Fixes: submariner-io/submariner#821

Signed-off-by: Aswin Surayanarayanan <[email protected]>
Co-authored-by: Thomas Pantelis <[email protected]>
  • Loading branch information
aswinsuryan and tpantelis authored Nov 26, 2020
1 parent c99f0e8 commit 4287bb3
Show file tree
Hide file tree
Showing 12 changed files with 467 additions and 25 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ require (
github.com/bronze1man/goStrongswanVici v0.0.0-20190921045355-4c81bd8d0bd5
github.com/coreos/go-iptables v0.4.5
github.com/go-logr/zapr v0.1.1 // indirect
github.com/go-ping/ping v0.0.0-20201022122018-3977ed72668a
github.com/imdario/mergo v0.3.9 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/kelseyhightower/envconfig v1.4.0
github.com/onsi/ginkgo v1.14.2
github.com/onsi/gomega v1.10.3
Expand Down
7 changes: 4 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25Kn
github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/ebay/go-ovn v0.1.0 h1:IxmpGJsp0SrsBrabCUCV1/xbQRNGUR5LeRbgkDcpIAs=
github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
github.com/elazarl/goproxy v0.0.0-20170405201442-c4fc26588b6e/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
github.com/elazarl/goproxy v0.0.0-20200426045556-49ad98f6dac1 h1:TEmChtx8+IeOghiySC8kQIr0JZOdKUmRmmkuRDuYs3E=
Expand Down Expand Up @@ -171,6 +170,8 @@ github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh
github.com/go-openapi/validate v0.18.0/go.mod h1:Uh4HdOzKt19xGIGm1qHf/ofbX1YQ4Y+MYsct2VUrAJ4=
github.com/go-openapi/validate v0.19.2/go.mod h1:1tRCw7m3jtI8eNWEEliiAqUIcBztB2KDnRCRMUi7GTA=
github.com/go-openapi/validate v0.19.5/go.mod h1:8DJv2CVJQ6kGNpFW6eV9N3JviE1C85nY1c2z52x1Gk4=
github.com/go-ping/ping v0.0.0-20201022122018-3977ed72668a h1:O9xspHB2yrvKfMQ1m6OQhqe37i5yvg0dXAYMuAjugmM=
github.com/go-ping/ping v0.0.0-20201022122018-3977ed72668a/go.mod h1:35JbSyV/BYqHwwRA6Zr1uVDm1637YlNOU61wI797NPI=
github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gogo/googleapis v1.1.0/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s=
Expand Down Expand Up @@ -480,8 +481,6 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/submariner-io/admiral v0.7.1-0.20201105164647-156433d6fe3e h1:R26aMi94rSIkSpXBDvKPSfs6bwUbh1tFiw5CkFiB+z0=
github.com/submariner-io/admiral v0.7.1-0.20201105164647-156433d6fe3e/go.mod h1:CB0bBubRoDoYYJTpjAww+VwloKfLRU4U0roevyXkrXk=
github.com/submariner-io/admiral v0.7.1-0.20201113155402-50bbbbc388cf h1:GVvrpEx82lqv/gUV8vr8gVB6LUJKQqWJQ7isa7mz0Ec=
github.com/submariner-io/admiral v0.7.1-0.20201113155402-50bbbbc388cf/go.mod h1:CB0bBubRoDoYYJTpjAww+VwloKfLRU4U0roevyXkrXk=
github.com/submariner-io/shipyard v0.7.2 h1:jlg8AHfBkAqWKJXyby1VEBN1aCipDgwfCvBoWr5Qb6M=
Expand Down Expand Up @@ -578,6 +577,7 @@ golang.org/x/net v0.0.0-20191007182048-72f939374954/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20200904194848-62affa334b73/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20201006153459-a7d1128ccaa0 h1:wBouT66WTYFXdxfVdz9sVWARVd/2vfGcmI45D2gj45M=
golang.org/x/net v0.0.0-20201006153459-a7d1128ccaa0/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
Expand Down Expand Up @@ -664,6 +664,7 @@ golang.org/x/tools v0.0.0-20190920225731-5eefd052ad72/go.mod h1:b+2E5dAYhXwXZwtn
golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5 h1:hKsoRgsbwY1NafxrwTs+k64bikrLBkAgPir1TNCj3Zs=
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200103221440-774c71fcf114 h1:DnSr2mCsxyCE6ZgIkmcWUQY2R5cH/6wL7eIxEmQOMSE=
golang.org/x/tools v0.0.0-20200103221440-774c71fcf114/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
29 changes: 22 additions & 7 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
subv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1"
"github.com/submariner-io/submariner/pkg/cable"
"github.com/submariner-io/submariner/pkg/cableengine"
"github.com/submariner-io/submariner/pkg/cableengine/healthchecker"
"github.com/submariner-io/submariner/pkg/cableengine/syncer"
submarinerClientset "github.com/submariner-io/submariner/pkg/client/clientset/versioned"
"github.com/submariner-io/submariner/pkg/controllers/datastoresyncer"
Expand Down Expand Up @@ -109,26 +110,40 @@ func main() {
submSpec.CableDriver = strings.ToLower(submSpec.CableDriver)

localEndpoint, err := util.GetLocalEndpoint(submSpec.ClusterID, submSpec.CableDriver, nil, submSpec.NatEnabled,
localSubnets, util.GetLocalIP())
localSubnets, util.GetLocalIP(), submSpec.ClusterCidr)

if err != nil {
klog.Fatalf("Error creating local endpoint object from %#v: %v", submSpec, err)
}

cableEngine := cableengine.NewEngine(localCluster, localEndpoint)

err = subv1.AddToScheme(scheme.Scheme)
if err != nil {
klog.Errorf("Error adding submariner types to the scheme: %v", err)
}

var cableHealthchecker healthchecker.Interface
if len(submSpec.GlobalCidr) == 0 {
cableHealthchecker, err = healthchecker.New(&watcher.Config{RestConfig: cfg}, submSpec.Namespace, submSpec.ClusterID)
if err != nil {
klog.Errorf("Error creating healthChecker: %v", err)
}

err = cableHealthchecker.Start(stopCh)

if err != nil {
klog.Errorf("Error starting healthChecker: %v", err)
}
}

cableEngineSyncer := syncer.NewGatewaySyncer(
cableEngine,
submarinerClient.SubmarinerV1().Gateways(submSpec.Namespace),
VERSION)
VERSION, cableHealthchecker)

cableEngineSyncer.Run(stopCh)

err = subv1.AddToScheme(scheme.Scheme)
if err != nil {
fatal(cableEngineSyncer, "Error adding submariner types to the scheme: %v", err)
}

becameLeader := func(context.Context) {
klog.Info("Creating the datastore syncer")

Expand Down
132 changes: 132 additions & 0 deletions pkg/cableengine/healthchecker/healthchecker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package healthchecker

import (
"sync"

"github.com/submariner-io/admiral/pkg/log"
"github.com/submariner-io/admiral/pkg/watcher"
submarinerv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog"
)

// LatencyInfo is the health snapshot returned by GetLatencyInfo for a single
// remote endpoint.
type LatencyInfo struct {
// ConnectionError is copied from the pinger's failure message; it is empty
// when no failure is currently recorded.
ConnectionError string
// Spec carries the round-trip-time figures (last, min, average, max and
// standard deviation) taken from the pinger's statistics.
Spec *submarinerv1.LatencySpec
}

// Interface is the health checker API returned by New.
type Interface interface {
// Start starts the underlying Endpoint watcher; stopCh is passed through to it.
Start(stopCh <-chan struct{}) error

// GetLatencyInfo returns the ping results recorded for the endpoint's
// CableName, or nil when no pinger is registered for that cable.
GetLatencyInfo(endpoint *submarinerv1.EndpointSpec) *LatencyInfo
}

// controller implements Interface by watching Endpoint resources and keeping
// one pinger per remote cable.
type controller struct {
// endpointWatcher delivers Endpoint create/update/delete events to the
// handlers registered in New.
endpointWatcher watcher.Interface
// pingers maps an Endpoint's CableName (string) to its *pingerInfo.
pingers sync.Map
// clusterID is the ID given to New; Endpoints carrying the same ClusterID
// are ignored by the create/update handler.
clusterID string
}

// New builds a health checker that watches Endpoint resources in
// endpointNameSpace. Endpoints whose ClusterID equals clusterID are skipped by
// the create/update handler.
//
// The supplied config is modified: its ResourceConfigs field is overwritten
// with the single Endpoint watch used here. The watcher is only created, not
// started; call Start on the returned Interface to start it.
//
// It returns a nil Interface together with the error when the watcher cannot
// be created.
func New(config *watcher.Config, endpointNameSpace, clusterID string) (Interface, error) {
	c := &controller{clusterID: clusterID}

	// Create and update events share one handler.
	handler := watcher.EventHandlerFuncs{
		OnCreateFunc: c.endpointCreatedorUpdated,
		OnUpdateFunc: c.endpointCreatedorUpdated,
		OnDeleteFunc: c.endpointDeleted,
	}

	config.ResourceConfigs = []watcher.ResourceConfig{{
		Name:            "HealthChecker Endpoint Controller",
		ResourceType:    &submarinerv1.Endpoint{},
		Handler:         handler,
		SourceNamespace: endpointNameSpace,
	}}

	w, err := watcher.New(config)
	if err != nil {
		return nil, err
	}

	c.endpointWatcher = w

	return c, nil
}

// GetLatencyInfo reports the latest ping results for the pinger registered
// under endpoint.CableName. It returns nil when no pinger is registered for
// that cable name.
//
// NOTE(review): the pinger fields are read here without any synchronization
// while the pinger's own goroutine updates them — confirm this is acceptable.
func (h *controller) GetLatencyInfo(endpoint *submarinerv1.EndpointSpec) *LatencyInfo {
	entry, ok := h.pingers.Load(endpoint.CableName)
	if !ok {
		return nil
	}

	p := entry.(*pingerInfo)

	info := &LatencyInfo{ConnectionError: p.failureMsg}
	info.Spec = &submarinerv1.LatencySpec{
		LastRTT:    p.statistics.lastRtt,
		MinRTT:     p.statistics.minRtt,
		AverageRTT: p.statistics.mean,
		MaxRTT:     p.statistics.maxRtt,
		StdDevRTT:  p.statistics.stdDev,
	}

	return info
}

// Start starts the Endpoint watcher, handing it stopCh, and returns whatever
// error the watcher reports (nil on success).
func (h *controller) Start(stopCh <-chan struct{}) error {
	return h.endpointWatcher.Start(stopCh)
}

// endpointCreatedorUpdated handles both create and update events for Endpoint
// resources. It makes sure a pinger is running against the Endpoint's
// HealthCheckIP and is registered under its CableName:
//
//   - Endpoints whose ClusterID matches the controller's clusterID are ignored.
//   - Endpoints with an empty HealthCheckIP or CableName are logged and ignored.
//   - If a pinger already exists for the cable with the same IP, nothing changes.
//   - If the IP differs, the old pinger is stopped and replaced with a new one.
//
// It always returns false (presumably "do not requeue" — confirm against the
// watcher package's handler contract).
func (h *controller) endpointCreatedorUpdated(obj runtime.Object) bool {
	klog.V(log.TRACE).Infof("Endpoint created: %#v", obj)

	endpoint := obj.(*submarinerv1.Endpoint)
	cableName, healthCheckIP := endpoint.Spec.CableName, endpoint.Spec.HealthCheckIP

	switch {
	case endpoint.Spec.ClusterID == h.clusterID:
		return false
	case healthCheckIP == "" || cableName == "":
		klog.Infof("HealthCheckIP (%q) and/or CableName (%q) for Endpoint %q empty - will not monitor endpoint health",
			healthCheckIP, cableName, endpoint.Name)
		return false
	}

	if existing, ok := h.pingers.Load(cableName); ok {
		current := existing.(*pingerInfo)
		if current.healthCheckIP == healthCheckIP {
			// Already pinging the right address.
			return false
		}

		klog.V(log.DEBUG).Infof("HealthChecker is already running for %q - stopping", endpoint.Name)
		current.stop()
		h.pingers.Delete(cableName)
	}

	klog.V(log.TRACE).Infof("Starting Pinger for CableName: %q, with HealthCheckIP: %q",
		cableName, healthCheckIP)

	// Register the pinger before starting it, as the original code does.
	replacement := newPinger(healthCheckIP)
	h.pingers.Store(cableName, replacement)
	replacement.start()

	return false
}

// endpointDeleted handles delete events for Endpoint resources. If a pinger is
// registered under the Endpoint's CableName it is stopped and removed from the
// pingers map. Endpoints with an empty CableName are ignored.
//
// It always returns false.
func (h *controller) endpointDeleted(obj runtime.Object) bool {
	cableName := obj.(*submarinerv1.Endpoint).Spec.CableName
	if cableName != "" {
		if entry, ok := h.pingers.Load(cableName); ok {
			entry.(*pingerInfo).stop()
			h.pingers.Delete(cableName)
		}
	}

	return false
}
13 changes: 13 additions & 0 deletions pkg/cableengine/healthchecker/healthchecker_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package healthchecker

import (
"testing"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)

// TestHealthChecker is the go test entry point for this package's Ginkgo
// specs: it registers Ginkgo's Fail as the Gomega fail handler and then runs
// the suite named "Health Checker Suite".
func TestHealthChecker(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Health Checker Suite")
}
81 changes: 81 additions & 0 deletions pkg/cableengine/healthchecker/pinger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package healthchecker

import (
"fmt"
"time"

"github.com/go-ping/ping"
"k8s.io/klog"
)

// Pinger tuning values. They are package-level variables rather than constants
// (presumably so tests can override them — confirm before converting to const).
var (
	// waitTime is how long the pinger goroutine waits before each ping attempt.
	waitTime = 15 * time.Second

	// timeout is assigned to the Timeout of each ping session.
	timeout = 3 * time.Second

	// size is the capacity of the RTT history used for statistics. RTTs are
	// stored and used to calculate the statistics until size entries have been
	// collected; once that limit is reached the array is reset and the last
	// elements are added back to it for the statistics.
	size uint64 = 1000
)

// pingerInfo holds the state of the periodic pinger for one remote endpoint.
type pingerInfo struct {
// healthCheckIP is the address passed to ping.NewPinger on every attempt.
healthCheckIP string
// statistics is updated with the RTT of each reply received.
statistics statistics
// failureMsg is cleared when a reply is received and set by the ping
// session's OnFinish callback.
failureMsg string
// stopCh is closed by stop to end the goroutine launched by start.
stopCh chan struct{}
}

// newPinger returns a pinger for healthCheckIP with an RTT history of `size`
// entries and a fresh stop channel. The pinger is idle until start is called.
func newPinger(healthCheckIP string) *pingerInfo {
	p := &pingerInfo{
		healthCheckIP: healthCheckIP,
		stopCh:        make(chan struct{}),
	}

	p.statistics = statistics{
		size:         size,
		previousRtts: make([]uint64, size),
	}

	return p
}

// start launches a background goroutine that calls sendPing after every
// waitTime pause, until stopCh is closed. It returns immediately after logging
// that the pinger was started.
func (p *pingerInfo) start() {
	pingLoop := func() {
		for {
			select {
			case <-p.stopCh:
				return
			case <-time.After(waitTime):
				// The next wait only begins once this attempt has returned.
				p.sendPing()
			}
		}
	}

	go pingLoop()

	klog.Infof("CableEngine HealthChecker started pinger for IP %q", p.healthCheckIP)
}

// stop closes stopCh, which makes the goroutine launched by start return the
// next time it reaches its select.
// NOTE(review): closing an already-closed channel panics, so stop must be
// called at most once per pinger.
func (p *pingerInfo) stop() {
close(p.stopCh)
}

// sendPing runs one privileged ping session against healthCheckIP and records
// the outcome on p:
//
//   - every reply clears failureMsg and feeds its RTT (in nanoseconds) into
//     the statistics;
//   - if the session ends without a single reply, failureMsg is set.
//
// Errors creating or running the pinger are logged and leave failureMsg as it
// was.
func (p *pingerInfo) sendPing() {
	pinger, err := ping.NewPinger(p.healthCheckIP)
	if err != nil {
		klog.Errorf("Error creating pinger for IP %q: %v", p.healthCheckIP, err)
		return
	}

	pinger.SetPrivileged(true)
	pinger.RecordRtts = false
	// No Count is set, so the session keeps pinging until Timeout elapses.
	pinger.Timeout = timeout

	pinger.OnRecv = func(packet *ping.Packet) {
		p.failureMsg = ""
		p.statistics.update(uint64(packet.Rtt.Nanoseconds()))
	}

	pinger.OnFinish = func(stats *ping.Statistics) {
		// OnFinish is invoked when the session ends whether or not replies
		// arrived. Only flag a failure when nothing came back; otherwise a
		// healthy connection would be reported as failed after every session.
		if stats.PacketsRecv == 0 {
			p.failureMsg = fmt.Sprintf("Failed to successfully ping the remote endpoint IP %q", p.healthCheckIP)
		}
	}

	if err = pinger.Run(); err != nil {
		klog.Errorf("Error running ping for the remote endpoint IP %q: %v", p.healthCheckIP, err)
	}
}
Loading

0 comments on commit 4287bb3

Please sign in to comment.