From 4287bb352cd8543678f233ddd03975ac98dcab1c Mon Sep 17 00:00:00 2001 From: Aswin Suryanarayanan Date: Thu, 26 Nov 2020 10:53:31 +0530 Subject: [PATCH] Add health checker implementation (#938) * Add health checker implementation *Populate the healthchecker Ip *Ping the healthchecker IP to check the remote gateway status *Update the gateway satus if the ping fails Fixes: submariner-io/submariner#821 Signed-off-by: Aswin Surayanarayanan Co-authored-by: Thomas Pantelis --- go.mod | 2 +- go.sum | 7 +- main.go | 29 +++- .../healthchecker/healthchecker.go | 132 ++++++++++++++++++ .../healthchecker/healthchecker_suite_test.go | 13 ++ pkg/cableengine/healthchecker/pinger.go | 81 +++++++++++ pkg/cableengine/healthchecker/statistics.go | 63 +++++++++ .../healthchecker/statistics_test.go | 54 +++++++ pkg/cableengine/syncer/syncer.go | 34 +++-- pkg/cableengine/syncer/syncer_test.go | 22 ++- pkg/util/util.go | 49 ++++++- pkg/util/util_test.go | 6 +- 12 files changed, 467 insertions(+), 25 deletions(-) create mode 100644 pkg/cableengine/healthchecker/healthchecker.go create mode 100644 pkg/cableengine/healthchecker/healthchecker_suite_test.go create mode 100644 pkg/cableengine/healthchecker/pinger.go create mode 100644 pkg/cableengine/healthchecker/statistics.go create mode 100644 pkg/cableengine/healthchecker/statistics_test.go diff --git a/go.mod b/go.mod index 3fdb2d97..11bac824 100644 --- a/go.mod +++ b/go.mod @@ -6,8 +6,8 @@ require ( github.com/bronze1man/goStrongswanVici v0.0.0-20190921045355-4c81bd8d0bd5 github.com/coreos/go-iptables v0.4.5 github.com/go-logr/zapr v0.1.1 // indirect + github.com/go-ping/ping v0.0.0-20201022122018-3977ed72668a github.com/imdario/mergo v0.3.9 // indirect - github.com/jpillora/backoff v1.0.0 // indirect github.com/kelseyhightower/envconfig v1.4.0 github.com/onsi/ginkgo v1.14.2 github.com/onsi/gomega v1.10.3 diff --git a/go.sum b/go.sum index 599851ad..f1642c34 100644 --- a/go.sum +++ b/go.sum @@ -92,7 +92,6 @@ github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25Kn github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= -github.com/ebay/go-ovn v0.1.0 h1:IxmpGJsp0SrsBrabCUCV1/xbQRNGUR5LeRbgkDcpIAs= github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M= github.com/elazarl/goproxy v0.0.0-20170405201442-c4fc26588b6e/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc= github.com/elazarl/goproxy v0.0.0-20200426045556-49ad98f6dac1 h1:TEmChtx8+IeOghiySC8kQIr0JZOdKUmRmmkuRDuYs3E= @@ -171,6 +170,8 @@ github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh github.com/go-openapi/validate v0.18.0/go.mod h1:Uh4HdOzKt19xGIGm1qHf/ofbX1YQ4Y+MYsct2VUrAJ4= github.com/go-openapi/validate v0.19.2/go.mod h1:1tRCw7m3jtI8eNWEEliiAqUIcBztB2KDnRCRMUi7GTA= github.com/go-openapi/validate v0.19.5/go.mod h1:8DJv2CVJQ6kGNpFW6eV9N3JviE1C85nY1c2z52x1Gk4= +github.com/go-ping/ping v0.0.0-20201022122018-3977ed72668a h1:O9xspHB2yrvKfMQ1m6OQhqe37i5yvg0dXAYMuAjugmM= +github.com/go-ping/ping v0.0.0-20201022122018-3977ed72668a/go.mod h1:35JbSyV/BYqHwwRA6Zr1uVDm1637YlNOU61wI797NPI= github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/gogo/googleapis v1.1.0/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s= @@ -480,8 +481,6 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/submariner-io/admiral v0.7.1-0.20201105164647-156433d6fe3e h1:R26aMi94rSIkSpXBDvKPSfs6bwUbh1tFiw5CkFiB+z0= -github.com/submariner-io/admiral v0.7.1-0.20201105164647-156433d6fe3e/go.mod h1:CB0bBubRoDoYYJTpjAww+VwloKfLRU4U0roevyXkrXk= github.com/submariner-io/admiral v0.7.1-0.20201113155402-50bbbbc388cf h1:GVvrpEx82lqv/gUV8vr8gVB6LUJKQqWJQ7isa7mz0Ec= github.com/submariner-io/admiral v0.7.1-0.20201113155402-50bbbbc388cf/go.mod h1:CB0bBubRoDoYYJTpjAww+VwloKfLRU4U0roevyXkrXk= github.com/submariner-io/shipyard v0.7.2 h1:jlg8AHfBkAqWKJXyby1VEBN1aCipDgwfCvBoWr5Qb6M= @@ -578,6 +577,7 @@ golang.org/x/net v0.0.0-20191007182048-72f939374954/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= +golang.org/x/net v0.0.0-20200904194848-62affa334b73/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201006153459-a7d1128ccaa0 h1:wBouT66WTYFXdxfVdz9sVWARVd/2vfGcmI45D2gj45M= golang.org/x/net v0.0.0-20201006153459-a7d1128ccaa0/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -664,6 +664,7 @@ golang.org/x/tools v0.0.0-20190920225731-5eefd052ad72/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5 h1:hKsoRgsbwY1NafxrwTs+k64bikrLBkAgPir1TNCj3Zs= golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200103221440-774c71fcf114 h1:DnSr2mCsxyCE6ZgIkmcWUQY2R5cH/6wL7eIxEmQOMSE= golang.org/x/tools v0.0.0-20200103221440-774c71fcf114/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/main.go b/main.go index f25662f4..ac33b94c 100644 --- a/main.go +++ b/main.go @@ -17,6 +17,7 @@ import ( subv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1" "github.com/submariner-io/submariner/pkg/cable" "github.com/submariner-io/submariner/pkg/cableengine" + "github.com/submariner-io/submariner/pkg/cableengine/healthchecker" "github.com/submariner-io/submariner/pkg/cableengine/syncer" submarinerClientset "github.com/submariner-io/submariner/pkg/client/clientset/versioned" "github.com/submariner-io/submariner/pkg/controllers/datastoresyncer" @@ -109,7 +110,7 @@ func main() { submSpec.CableDriver = strings.ToLower(submSpec.CableDriver) localEndpoint, err := util.GetLocalEndpoint(submSpec.ClusterID, submSpec.CableDriver, nil, submSpec.NatEnabled, - localSubnets, util.GetLocalIP()) + localSubnets, util.GetLocalIP(), submSpec.ClusterCidr) if err != nil { klog.Fatalf("Error creating local endpoint object from %#v: %v", submSpec, err) @@ -117,18 +118,32 @@ func main() { cableEngine := cableengine.NewEngine(localCluster, localEndpoint) + err = subv1.AddToScheme(scheme.Scheme) + if err != nil { + klog.Errorf("Error adding submariner types to the scheme: %v", err) + } + + var cableHealthchecker healthchecker.Interface + if len(submSpec.GlobalCidr) == 0 { + cableHealthchecker, err = healthchecker.New(&watcher.Config{RestConfig: cfg}, submSpec.Namespace, submSpec.ClusterID) + if err != nil { + klog.Errorf("Error creating healthChecker: %v", err) + } + + err = cableHealthchecker.Start(stopCh) + + if err != nil { + klog.Errorf("Error starting healthChecker: %v", err) + } + } + cableEngineSyncer := syncer.NewGatewaySyncer( cableEngine, submarinerClient.SubmarinerV1().Gateways(submSpec.Namespace), - VERSION) + VERSION, cableHealthchecker) cableEngineSyncer.Run(stopCh) - err = subv1.AddToScheme(scheme.Scheme) - if err != nil { - fatal(cableEngineSyncer, "Error adding submariner types to the scheme: %v", err) - } - becameLeader := func(context.Context) { klog.Info("Creating the datastore syncer") diff --git a/pkg/cableengine/healthchecker/healthchecker.go b/pkg/cableengine/healthchecker/healthchecker.go new file mode 100644 index 00000000..f1063bf5 --- /dev/null +++ b/pkg/cableengine/healthchecker/healthchecker.go @@ -0,0 +1,132 @@ +package healthchecker + +import ( + "sync" + + "github.com/submariner-io/admiral/pkg/log" + "github.com/submariner-io/admiral/pkg/watcher" + submarinerv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/klog" +) + +type LatencyInfo struct { + ConnectionError string + Spec *submarinerv1.LatencySpec +} + +type Interface interface { + Start(stopCh <-chan struct{}) error + + GetLatencyInfo(endpoint *submarinerv1.EndpointSpec) *LatencyInfo +} + +type controller struct { + endpointWatcher watcher.Interface + pingers sync.Map + clusterID string +} + +func New(config *watcher.Config, endpointNameSpace, clusterID string) (Interface, error) { + controller := &controller{ + clusterID: clusterID, + } + config.ResourceConfigs = []watcher.ResourceConfig{ + { + Name: "HealthChecker Endpoint Controller", + ResourceType: &submarinerv1.Endpoint{}, + Handler: watcher.EventHandlerFuncs{ + OnCreateFunc: controller.endpointCreatedorUpdated, + OnUpdateFunc: controller.endpointCreatedorUpdated, + OnDeleteFunc: controller.endpointDeleted, + }, + SourceNamespace: endpointNameSpace, + }, + } + + endpointWatcher, err := watcher.New(config) + + if err != nil { + return nil, err + } + + controller.endpointWatcher = endpointWatcher + + return controller, nil +} + +func (h *controller) GetLatencyInfo(endpoint *submarinerv1.EndpointSpec) *LatencyInfo { + if obj, found := h.pingers.Load(endpoint.CableName); found { + pinger := obj.(*pingerInfo) + + return &LatencyInfo{ + ConnectionError: pinger.failureMsg, + Spec: &submarinerv1.LatencySpec{ + LastRTT: pinger.statistics.lastRtt, + MinRTT: pinger.statistics.minRtt, + AverageRTT: pinger.statistics.mean, + MaxRTT: pinger.statistics.maxRtt, + StdDevRTT: pinger.statistics.stdDev, + }, + } + } + + return nil +} + +func (h *controller) Start(stopCh <-chan struct{}) error { + if err := h.endpointWatcher.Start(stopCh); err != nil { + return err + } + + return nil +} + +func (h *controller) endpointCreatedorUpdated(obj runtime.Object) bool { + klog.V(log.TRACE).Infof("Endpoint created: %#v", obj) + endpointCreated := obj.(*submarinerv1.Endpoint) + if endpointCreated.Spec.ClusterID == h.clusterID { + return false + } + + if endpointCreated.Spec.HealthCheckIP == "" || endpointCreated.Spec.CableName == "" { + klog.Infof("HealthCheckIP (%q) and/or CableName (%q) for Endpoint %q empty - will not monitor endpoint health", + endpointCreated.Spec.HealthCheckIP, endpointCreated.Spec.CableName, endpointCreated.Name) + return false + } + + if obj, found := h.pingers.Load(endpointCreated.Spec.CableName); found { + pinger := obj.(*pingerInfo) + if pinger.healthCheckIP == endpointCreated.Spec.HealthCheckIP { + return false + } + + klog.V(log.DEBUG).Infof("HealthChecker is already running for %q - stopping", endpointCreated.Name) + pinger.stop() + h.pingers.Delete(endpointCreated.Spec.CableName) + } + + klog.V(log.TRACE).Infof("Starting Pinger for CableName: %q, with HealthCheckIP: %q", + endpointCreated.Spec.CableName, endpointCreated.Spec.HealthCheckIP) + + pinger := newPinger(endpointCreated.Spec.HealthCheckIP) + h.pingers.Store(endpointCreated.Spec.CableName, pinger) + pinger.start() + + return false +} + +func (h *controller) endpointDeleted(obj runtime.Object) bool { + endpointDeleted := obj.(*submarinerv1.Endpoint) + if endpointDeleted.Spec.CableName == "" { + return false + } + + if obj, found := h.pingers.Load(endpointDeleted.Spec.CableName); found { + pinger := obj.(*pingerInfo) + pinger.stop() + h.pingers.Delete(endpointDeleted.Spec.CableName) + } + + return false +} diff --git a/pkg/cableengine/healthchecker/healthchecker_suite_test.go b/pkg/cableengine/healthchecker/healthchecker_suite_test.go new file mode 100644 index 00000000..2fae9fc8 --- /dev/null +++ b/pkg/cableengine/healthchecker/healthchecker_suite_test.go @@ -0,0 +1,13 @@ +package healthchecker + +import ( + "testing" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +func TestHealthChecker(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Health Checker Suite") +} diff --git a/pkg/cableengine/healthchecker/pinger.go b/pkg/cableengine/healthchecker/pinger.go new file mode 100644 index 00000000..4c3a93e6 --- /dev/null +++ b/pkg/cableengine/healthchecker/pinger.go @@ -0,0 +1,81 @@ +package healthchecker + +import ( + "fmt" + "time" + + "github.com/go-ping/ping" + "k8s.io/klog" +) + +var waitTime time.Duration = 15 * time.Second +var timeout = 3 * time.Second + +// The RTT will be stored and will be used to calculate the statistics until +// the size is reached. Once the size is reached the array will be reset and +// the last elements will be added to the array for statistics. +var size uint64 = 1000 + +type pingerInfo struct { + healthCheckIP string + statistics statistics + failureMsg string + stopCh chan struct{} +} + +func newPinger(healthCheckIP string) *pingerInfo { + return &pingerInfo{ + healthCheckIP: healthCheckIP, + statistics: statistics{ + size: size, + previousRtts: make([]uint64, size), + }, + stopCh: make(chan struct{}), + } +} + +func (p *pingerInfo) start() { + go func() { + for { + select { + case <-p.stopCh: + return + case <-time.After(waitTime): + p.sendPing() + } + } + }() + klog.Infof("CableEngine HealthChecker started pinger for IP %q", p.healthCheckIP) +} + +func (p *pingerInfo) stop() { + close(p.stopCh) +} + +func (p *pingerInfo) sendPing() { + pinger, err := ping.NewPinger(p.healthCheckIP) + if err != nil { + klog.Errorf("Error creating pinger for IP %q: %v", p.healthCheckIP, err) + return + } + + pinger.SetPrivileged(true) + pinger.RecordRtts = false + // After 3 seconds stop waiting. + pinger.Timeout = timeout + + pinger.OnRecv = func(packet *ping.Packet) { + p.failureMsg = "" + p.statistics.update(uint64(packet.Rtt.Nanoseconds())) + } + + pinger.OnFinish = func(stats *ping.Statistics) { + // Since we are setting a timeout and not a count, it will be an endless ping. + // If the timeout is reached with no successful packets, onFinish will be called and it is a failed ping. + p.failureMsg = fmt.Sprintf("Failed to successfully ping the remote endpoint IP %q", p.healthCheckIP) + } + err = pinger.Run() + if err != nil { + klog.Errorf("Error running ping for the remote endpoint IP %q: %v", p.healthCheckIP, err) + } +} diff --git a/pkg/cableengine/healthchecker/statistics.go b/pkg/cableengine/healthchecker/statistics.go new file mode 100644 index 00000000..007fed2d --- /dev/null +++ b/pkg/cableengine/healthchecker/statistics.go @@ -0,0 +1,63 @@ +package healthchecker + +import ( + "math" +) + +type statistics struct { + previousRtts []uint64 + sum uint64 + mean uint64 + stdDev uint64 + lastRtt uint64 + minRtt uint64 + maxRtt uint64 + sqrDiff uint64 + index uint64 + size uint64 +} + +func (s *statistics) update(rtt uint64) { + s.lastRtt = rtt + + // TODO Take more samples while resetting, for example samples in last 2 hours + if s.index == s.size { + // Resetting since the incremental SD calculated have an error factor due to truncation which + // could be significant as count increases. + s.index = 2 + s.previousRtts[0] = s.previousRtts[s.size-2] + s.previousRtts[1] = s.previousRtts[s.size-1] + s.sum = s.previousRtts[0] + s.previousRtts[1] + s.mean = s.sum / 2 + s.sqrDiff = uint64((int64(s.previousRtts[0]-s.mean))*(int64(s.previousRtts[0]-s.mean)) + + (int64(s.previousRtts[1]-s.mean))*(int64(s.previousRtts[1]-s.mean))) + } + + if (s.index + 1) > 1 { + s.previousRtts[s.index] = rtt + if s.minRtt == 0 || s.minRtt > rtt { + s.minRtt = rtt + } + + if s.maxRtt < rtt { + s.maxRtt = rtt + } + + s.sum += rtt + oldMean := s.mean + s.mean = s.sum / (s.index + 1) + s.sqrDiff += uint64(((int64(rtt - oldMean)) * int64((rtt - s.mean)))) + s.stdDev = uint64(math.Sqrt(float64(s.sqrDiff / (s.index + 1)))) + } else { + s.sum = rtt + s.sqrDiff = 0 + s.minRtt = rtt + s.maxRtt = rtt + s.mean = rtt + s.sum = rtt + s.previousRtts[s.index] = rtt + s.stdDev = 0 + } + + s.index++ +} diff --git a/pkg/cableengine/healthchecker/statistics_test.go b/pkg/cableengine/healthchecker/statistics_test.go new file mode 100644 index 00000000..ca90869e --- /dev/null +++ b/pkg/cableengine/healthchecker/statistics_test.go @@ -0,0 +1,54 @@ +package healthchecker + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var _ = Describe("Test Statistics", func() { + const ( + testMinRTT = 404351 + testMaxRTT = 1048263 + testLastRTT = 1044609 + testNewMinRTT = 404300 + testNewMaxRTT = 1048264 + testNewLastRTT = 609555 + ) + + When("update is called with a sample space", func() { + It("should correctly compute the statistics", func() { + size := 10 + statistics := &statistics{ + size: uint64(size), + previousRtts: make([]uint64, size), + } + + sampleSpace := [10]uint64{testMinRTT, 490406, 530333, 609556, 609650, 685106, 726265, 785707, testMaxRTT, testLastRTT} + expectedMean := 693424 + expectedSD := 205994 + + for _, v := range sampleSpace { + statistics.update(v) + } + + Expect(statistics.maxRtt).To(Equal(uint64(testMaxRTT))) + Expect(statistics.minRtt).To(Equal(uint64(testMinRTT))) + Expect(statistics.lastRtt).To(Equal(uint64(testLastRTT))) + Expect(statistics.mean).To(Equal(uint64(expectedMean))) + Expect(statistics.stdDev).To(Equal(uint64(expectedSD))) + + statistics.update(testNewMinRTT) + statistics.update(testNewMaxRTT) + statistics.update(testNewLastRTT) + + newExpectedMean := 830998 + newExpectedSD := 272450 + + Expect(statistics.maxRtt).To(Equal(uint64(testNewMaxRTT))) + Expect(statistics.minRtt).To(Equal(uint64(testNewMinRTT))) + Expect(statistics.lastRtt).To(Equal(uint64(testNewLastRTT))) + Expect(statistics.mean).To(Equal(uint64(newExpectedMean))) + Expect(statistics.stdDev).To(Equal(uint64(newExpectedSD))) + }) + }) +}) diff --git a/pkg/cableengine/syncer/syncer.go b/pkg/cableengine/syncer/syncer.go index e25df744..80ba04a2 100644 --- a/pkg/cableengine/syncer/syncer.go +++ b/pkg/cableengine/syncer/syncer.go @@ -8,16 +8,16 @@ import ( "time" "github.com/prometheus/client_golang/prometheus" + "github.com/submariner-io/admiral/pkg/log" + v1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1" + "github.com/submariner-io/submariner/pkg/cableengine" + "github.com/submariner-io/submariner/pkg/cableengine/healthchecker" + v1typed "github.com/submariner-io/submariner/pkg/client/clientset/versioned/typed/submariner.io/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog" - - "github.com/submariner-io/admiral/pkg/log" - v1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1" - "github.com/submariner-io/submariner/pkg/cableengine" - v1typed "github.com/submariner-io/submariner/pkg/client/clientset/versioned/typed/submariner.io/v1" ) type GatewaySyncer struct { @@ -26,6 +26,7 @@ type GatewaySyncer struct { engine cableengine.Engine version string statusError error + healthCheck healthchecker.Interface } var GatewayUpdateInterval = 5 * time.Second @@ -44,11 +45,12 @@ func init() { // NewEngine creates a new Engine for the local cluster func NewGatewaySyncer(engine cableengine.Engine, client v1typed.GatewayInterface, - version string) *GatewaySyncer { + version string, healthCheck healthchecker.Interface) *GatewaySyncer { return &GatewaySyncer{ - client: client, - engine: engine, - version: version, + client: client, + engine: engine, + version: version, + healthCheck: healthCheck, } } @@ -201,6 +203,20 @@ func (i *GatewaySyncer) generateGatewayObject() *v1.Gateway { } if connections != nil { + if i.healthCheck != nil { + for index := range *connections { + connection := &(*connections)[index] + latencyInfo := i.healthCheck.GetLatencyInfo(&connection.Endpoint) + if latencyInfo != nil { + connection.Latency = latencyInfo.Spec + if connection.Status == v1.Connected && latencyInfo.ConnectionError != "" { + connection.Status = v1.ConnectionError + connection.StatusMessage = latencyInfo.ConnectionError + } + } + } + } + gateway.Status.Connections = *connections } else { gateway.Status.Connections = []v1.Connection{} diff --git a/pkg/cableengine/syncer/syncer_test.go b/pkg/cableengine/syncer/syncer_test.go index 9ac672fb..78c6fab9 100644 --- a/pkg/cableengine/syncer/syncer_test.go +++ b/pkg/cableengine/syncer/syncer_test.go @@ -10,15 +10,21 @@ import ( . "github.com/onsi/gomega" "github.com/pkg/errors" . "github.com/submariner-io/admiral/pkg/gomega" + "github.com/submariner-io/admiral/pkg/syncer/test" + "github.com/submariner-io/admiral/pkg/watcher" submarinerv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1" fakeEngine "github.com/submariner-io/submariner/pkg/cableengine/fake" + "github.com/submariner-io/submariner/pkg/cableengine/healthchecker" "github.com/submariner-io/submariner/pkg/cableengine/syncer" fakeClientset "github.com/submariner-io/submariner/pkg/client/clientset/versioned/fake" fakeClientsetv1 "github.com/submariner-io/submariner/pkg/client/clientset/versioned/typed/submariner.io/v1/fake" submarinerInformers "github.com/submariner-io/submariner/pkg/client/informers/externalversions" "github.com/submariner-io/submariner/pkg/types" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" + fakeClient "k8s.io/client-go/dynamic/fake" + kubeScheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/cache" "k8s.io/klog" ) @@ -283,6 +289,7 @@ type testDriver struct { engine *fakeEngine.Engine gateways *fakeClientsetv1.FailingGateways syncer *syncer.GatewaySyncer + healthChecker healthchecker.Interface expectedGateway *submarinerv1.Gateway expectedDeletedAfter *submarinerv1.Gateway gatewayUpdated chan *submarinerv1.Gateway @@ -335,9 +342,22 @@ func (t *testDriver) run() { t.handledError <- err }) + Expect(submarinerv1.AddToScheme(kubeScheme.Scheme)).To(Succeed()) + + scheme := runtime.NewScheme() + Expect(submarinerv1.AddToScheme(scheme)).To(Succeed()) + + dynamicClient := fakeClient.NewSimpleDynamicClient(scheme) + restMapper := test.GetRESTMapperFor(&submarinerv1.Endpoint{}) + client := fakeClientset.NewSimpleClientset() t.gateways.GatewayInterface = client.SubmarinerV1().Gateways(namespace) - t.syncer = syncer.NewGatewaySyncer(t.engine, t.gateways, t.expectedGateway.Status.Version) + t.healthChecker, _ = healthchecker.New(&watcher.Config{ + RestMapper: restMapper, + Client: dynamicClient, + Scheme: scheme, + }, namespace, "west") + t.syncer = syncer.NewGatewaySyncer(t.engine, t.gateways, t.expectedGateway.Status.Version, t.healthChecker) informerFactory := submarinerInformers.NewSharedInformerFactory(client, 0) informer := informerFactory.Submariner().V1().Gateways().Informer() diff --git a/pkg/util/util.go b/pkg/util/util.go index ce6c4b0d..4a278c7a 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -86,7 +86,7 @@ func FlattenColors(colorCodes []string) string { } func GetLocalEndpoint(clusterID, backend string, backendConfig map[string]string, natEnabled bool, - subnets []string, privateIP string) (types.SubmarinerEndpoint, error) { + subnets []string, privateIP string, clusterCIDR []string) (types.SubmarinerEndpoint, error) { hostname, err := os.Hostname() if err != nil { return types.SubmarinerEndpoint{}, fmt.Errorf("Error getting hostname: %v", err) @@ -118,9 +118,56 @@ func GetLocalEndpoint(clusterID, backend string, backendConfig map[string]string endpoint.Spec.PublicIP = publicIP } + endpoint.Spec.HealthCheckIP, err = getCNIInterfaceIPAddress(clusterCIDR) + + if err != nil { + return types.SubmarinerEndpoint{}, fmt.Errorf("Error getting CNI Interface IP address: %v", err) + } + return endpoint, nil } +//TODO: to handle de-duplication of code/finding common parts with the route agent +func getCNIInterfaceIPAddress(clusterCIDRs []string) (string, error) { + for _, clusterCIDR := range clusterCIDRs { + _, clusterNetwork, err := net.ParseCIDR(clusterCIDR) + if err != nil { + return "", fmt.Errorf("unable to ParseCIDR %q : %v", clusterCIDR, err) + } + + hostInterfaces, err := net.Interfaces() + if err != nil { + return "", fmt.Errorf("net.Interfaces() returned error : %v", err) + } + + for _, iface := range hostInterfaces { + addrs, err := iface.Addrs() + if err != nil { + return "", fmt.Errorf("for interface %q, iface.Addrs returned error: %v", iface.Name, err) + } + + for i := range addrs { + ipAddr, _, err := net.ParseCIDR(addrs[i].String()) + if err != nil { + klog.Errorf("Unable to ParseCIDR : %q", addrs[i].String()) + } else if ipAddr.To4() != nil { + klog.V(level.DEBUG).Infof("Interface %q has %q address", iface.Name, ipAddr) + address := net.ParseIP(ipAddr.String()) + + // Verify that interface has an address from cluster CIDR + if clusterNetwork.Contains(address) { + klog.V(level.DEBUG).Infof("Found CNI Interface %q that has IP %q from ClusterCIDR %q", + iface.Name, ipAddr.String(), clusterCIDR) + return ipAddr.String(), nil + } + } + } + } + } + + return "", fmt.Errorf("unable to find CNI Interface on the host which has IP from %q", clusterCIDRs) +} + func GetClusterIDFromCableName(cableName string) string { // length is 11 // 0 1 2 3 4 5 6 7 8 9 10 diff --git a/pkg/util/util_test.go b/pkg/util/util_test.go index 669efe19..78877169 100644 --- a/pkg/util/util_test.go +++ b/pkg/util/util_test.go @@ -68,9 +68,9 @@ func testFlattenColors() { func testGetLocalEndpoint() { It("should return a valid SubmarinerEndpoint object", func() { - subnets := []string{"1.2.3.4/16"} - privateIP := "1.2.3.4" - endpoint, err := util.GetLocalEndpoint("east", "backend", map[string]string{}, false, subnets, privateIP) + subnets := []string{"127.0.0.1/16"} + privateIP := "127.0.0.1" + endpoint, err := util.GetLocalEndpoint("east", "backend", map[string]string{}, false, subnets, privateIP, subnets) Expect(err).ToNot(HaveOccurred()) Expect(endpoint.Spec.ClusterID).To(Equal("east"))