Skip to content

Commit

Permalink
Merge pull request #197 from MUzairS15/uzair/refactor/controllers
Browse files Browse the repository at this point in the history
Refactor controller status logic
  • Loading branch information
Revolyssup authored Oct 4, 2022
2 parents 5aa4cb7 + 7f66845 commit 7142f4d
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 14 deletions.
15 changes: 15 additions & 0 deletions models/controllers/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,18 @@ const (
NotDeployed //The controller is not deployed yet
Undeployed //The controller has been intentionally undeployed. This state is useful to avoid automatic redeployment.
// we don't know since we have not checked yet
Enabled
Running
Connected
Unknown
)

const (
MeshSync = "meshsync"
MesheryBroker = "meshery-broker"
MesheryServer = "meshery-server"
)

func (mcs MesheryControllerStatus) String() string {
switch mcs {
case Deployed:
Expand All @@ -21,6 +30,12 @@ func (mcs MesheryControllerStatus) String() string {
return "Not Deployed"
case Undeployed:
return "Undeployed"
case Enabled:
return "Enabled"
case Running:
return "Running"
case Connected:
return "Connected"
case Unknown:
return "Unknown"
}
Expand Down
43 changes: 43 additions & 0 deletions models/controllers/helpers.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package controllers

import (
"encoding/json"
"fmt"
"io"
"strconv"
"strings"

"net/http"
"net/url"

"github.com/layer5io/meshery-operator/api/v1alpha1"
Expand All @@ -13,6 +16,16 @@ import (
"github.com/spf13/viper"
)

const BrokerPingEndpoint = "8222/connz"

type Connections struct {
Connections []connection `json:"connections"`
}

type connection struct {
Name string `json:"name"`
}

func GetBrokerEndpoint(kclient *mesherykube.Client, broker *v1alpha1.Broker) string {
endpoint := broker.Status.Endpoint.Internal
if len(strings.Split(broker.Status.Endpoint.Internal, ":")) > 1 {
Expand Down Expand Up @@ -78,3 +91,33 @@ func applyOperatorHelmChart(chartRepo string, client mesherykube.Client, meshery
}
return nil
}

func ConnectivityTest(clientName, externalIP string) bool {
endpoint, err := url.Parse("http://" + externalIP + ":" + BrokerPingEndpoint)
if err != nil {
return false
}

resp, err := http.Get(endpoint.String())
if err != nil {
return false
}

body, err := io.ReadAll(resp.Body)
if err != nil {
return false
}

var natsResponse Connections
err = json.Unmarshal(body, &natsResponse)
if err != nil {
return false
}

for _, client := range natsResponse.Connections {
if client.Name == clientName {
return true
}
}
return false
}
42 changes: 34 additions & 8 deletions models/controllers/meshery_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package controllers
import (
"context"
"fmt"
"strings"

opClient "github.com/layer5io/meshery-operator/pkg/client"
mesherykube "github.com/layer5io/meshkit/utils/kubernetes"
v1 "k8s.io/api/core/v1"
kubeerror "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand All @@ -16,13 +18,15 @@ type mesheryBroker struct {
name string
status MesheryControllerStatus
kclient *mesherykube.Client
version string
}

func NewMesheryBrokerHandler(kubernetesClient *mesherykube.Client) IMesheryController {
return &mesheryBroker{
name: "MesheryBroker",
status: Unknown,
kclient: kubernetesClient,
version: "",
}
}

Expand All @@ -38,25 +42,27 @@ func (mb *mesheryBroker) GetStatus() MesheryControllerStatus {
// TODO: Confirm if the presence of operator is needed to use the operator client sdk
broker, err := operatorClient.CoreV1Alpha1().Brokers("meshery").Get(context.TODO(), "meshery-broker", metav1.GetOptions{})
if err == nil {
if broker.Status.Endpoint.External != "" {
mb.status = Deployed
brokerEndpoint := broker.Status.Endpoint.External
hostIP := strings.Split(brokerEndpoint, ":")[0]
if broker.Status.Endpoint.External != "" && ConnectivityTest(MesheryServer, hostIP) {
mb.status = Connected
return mb.status
}
mb.status = NotDeployed
mb.status = Deployed
return mb.status
} else {
if kubeerror.IsNotFound(err) {
if mb.status != Undeployed {
mb.status = NotDeployed
mb.status = Undeployed
}
return mb.status
}
// when operatorClient is not able to get meshesry-broker, we try again with kubernetes client as a fallback
broker, err := mb.kclient.DynamicKubeClient.Resource(schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "statefulsets"}).Namespace("meshery").Get(context.TODO(), "meshery-broker", metav1.GetOptions{})
broker, err := mb.kclient.DynamicKubeClient.Resource(schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "statefulsets"}).Namespace("meshery").Get(context.TODO(), MesheryBroker, metav1.GetOptions{})
if err != nil {
// if the resource is not found, then it is NotDeployed
if kubeerror.IsNotFound(err) {
mb.status = NotDeployed
mb.status = Undeployed
return mb.status
}
return Unknown
Expand Down Expand Up @@ -93,7 +99,7 @@ func (mb *mesheryBroker) GetPublicEndpoint() (string, error) {
if err != nil {
return "", ErrGetControllerPublicEndpoint(err)
}
broker, err := operatorClient.CoreV1Alpha1().Brokers("meshery").Get(context.TODO(), "meshery-broker", metav1.GetOptions{})
broker, err := operatorClient.CoreV1Alpha1().Brokers("meshery").Get(context.TODO(), MesheryBroker, metav1.GetOptions{})
if broker.Status.Endpoint.External == "" {
if err == nil {
err = fmt.Errorf("Could not get the External endpoint for meshery-broker")
Expand All @@ -106,5 +112,25 @@ func (mb *mesheryBroker) GetPublicEndpoint() (string, error) {
}

func (mb *mesheryBroker) GetVersion() (string, error) {
return "", nil
if len(mb.version) == 0 {
statefulSet, err := mb.kclient.KubeClient.AppsV1().StatefulSets("meshery").Get(context.TODO(), MesheryBroker, metav1.GetOptions{})
if kubeerror.IsNotFound(err) {
return "", err
}
return getImageVersionOfContainer(statefulSet.Spec.Template, "nats"), nil
}
return mb.version, nil
}

func getImageVersionOfContainer(container v1.PodTemplateSpec, containerName string) string {
var version string
for _, container := range container.Spec.Containers {
if strings.Compare(container.Name, containerName) == 0 {
versionTag := strings.Split(container.Image, ":")
if len(versionTag) > 1 {
version = versionTag[1]
}
}
}
return version
}
34 changes: 28 additions & 6 deletions models/controllers/meshsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package controllers

import (
"context"
"strings"

// opClient "github.com/layer5io/meshery-operator/pkg/client"
opClient "github.com/layer5io/meshery-operator/pkg/client"
mesherykube "github.com/layer5io/meshkit/utils/kubernetes"
v1 "k8s.io/api/core/v1"
kubeerror "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand Down Expand Up @@ -33,18 +35,38 @@ func (ms *meshsync) GetName() string {
func (ms *meshsync) GetStatus() MesheryControllerStatus {
operatorClient, _ := opClient.New(&ms.kclient.RestConfig)
// TODO: Confirm if the presence of operator is needed to use the operator client sdk
meshSync, err := operatorClient.CoreV1Alpha1().MeshSyncs("meshery").Get(context.TODO(), "meshery-meshsync", metav1.GetOptions{})
_, err := operatorClient.CoreV1Alpha1().MeshSyncs("meshery").Get(context.TODO(), "meshery-meshsync", metav1.GetOptions{})
if err == nil {
if meshSync.Status.PublishingTo != "" {
ms.status = Enabled
meshSyncPod, err := ms.kclient.KubeClient.CoreV1().Pods("meshery").List(context.TODO(), metav1.ListOptions{
LabelSelector: "component=meshsync",
})
if len(meshSyncPod.Items) == 0 || kubeerror.IsNotFound(err) {
return ms.status
}

switch meshSyncPod.Items[0].Status.Phase {
case v1.PodRunning:
ms.status = Running
broker := NewMesheryBrokerHandler(ms.kclient)
brokerEndpoint, err := broker.GetPublicEndpoint()
if err != nil {
return ms.status
}
hostIP := strings.Split(brokerEndpoint, ":")[0]
isConnected := ConnectivityTest(MeshSync, hostIP)
if isConnected {
ms.status = Connected
}
return ms.status
default:
ms.status = Deployed
return ms.status
}
ms.status = NotDeployed
return ms.status
} else {
if kubeerror.IsNotFound(err) {
if ms.status != Undeployed {
ms.status = NotDeployed
ms.status = Undeployed
}
return ms.status
}
Expand All @@ -53,7 +75,7 @@ func (ms *meshsync) GetStatus() MesheryControllerStatus {
if err != nil {
// if the resource is not found, then it is NotDeployed
if kubeerror.IsNotFound(err) {
ms.status = NotDeployed
ms.status = Undeployed
return ms.status
}
return Unknown
Expand Down

0 comments on commit 7142f4d

Please sign in to comment.