diff --git a/models/controllers/controllers.go b/models/controllers/controllers.go index 0834d56c..535517bc 100644 --- a/models/controllers/controllers.go +++ b/models/controllers/controllers.go @@ -8,9 +8,18 @@ const ( NotDeployed //The controller is not deployed yet Undeployed //The controller has been intentionally undeployed. This state is useful to avoid automatic redeployment. // we don't know since we have not checked yet + Enabled + Running + Connected Unknown ) +const ( + MeshSync = "meshsync" + MesheryBroker = "meshery-broker" + MesheryServer = "meshery-server" +) + func (mcs MesheryControllerStatus) String() string { switch mcs { case Deployed: @@ -21,6 +30,12 @@ func (mcs MesheryControllerStatus) String() string { return "Not Deployed" case Undeployed: return "Undeployed" + case Enabled: + return "Enabled" + case Running: + return "Running" + case Connected: + return "Connected" case Unknown: return "Unknown" } diff --git a/models/controllers/helpers.go b/models/controllers/helpers.go index 36f7fdfb..d60e3137 100644 --- a/models/controllers/helpers.go +++ b/models/controllers/helpers.go @@ -1,10 +1,13 @@ package controllers import ( + "encoding/json" "fmt" + "io" "strconv" "strings" + "net/http" "net/url" "github.com/layer5io/meshery-operator/api/v1alpha1" @@ -13,6 +16,16 @@ import ( "github.com/spf13/viper" ) +const BrokerPingEndpoint = "8222/connz" + +type Connections struct { + Connections []connection `json:"connections"` +} + +type connection struct { + Name string `json:"name"` +} + func GetBrokerEndpoint(kclient *mesherykube.Client, broker *v1alpha1.Broker) string { endpoint := broker.Status.Endpoint.Internal if len(strings.Split(broker.Status.Endpoint.Internal, ":")) > 1 { @@ -78,3 +91,33 @@ func applyOperatorHelmChart(chartRepo string, client mesherykube.Client, meshery } return nil } + +func ConnectivityTest(clientName, externalIP string) bool { + endpoint, err := url.Parse("http://" + externalIP + ":" + BrokerPingEndpoint) + if err != nil { + return false + } + + resp, err := http.Get(endpoint.String()) + if err != nil { + return false + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + return false + } + + var natsResponse Connections + err = json.Unmarshal(body, &natsResponse) + if err != nil { + return false + } + + for _, client := range natsResponse.Connections { + if client.Name == clientName { + return true + } + } + return false +} diff --git a/models/controllers/meshery_broker.go b/models/controllers/meshery_broker.go index 30a4f9c4..09ab1215 100644 --- a/models/controllers/meshery_broker.go +++ b/models/controllers/meshery_broker.go @@ -3,9 +3,11 @@ package controllers import ( "context" "fmt" + "strings" opClient "github.com/layer5io/meshery-operator/pkg/client" mesherykube "github.com/layer5io/meshkit/utils/kubernetes" + v1 "k8s.io/api/core/v1" kubeerror "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" @@ -16,6 +18,7 @@ type mesheryBroker struct { name string status MesheryControllerStatus kclient *mesherykube.Client + version string } func NewMesheryBrokerHandler(kubernetesClient *mesherykube.Client) IMesheryController { @@ -23,6 +26,7 @@ func NewMesheryBrokerHandler(kubernetesClient *mesherykube.Client) IMesheryContr name: "MesheryBroker", status: Unknown, kclient: kubernetesClient, + version: "", } } @@ -38,25 +42,27 @@ func (mb *mesheryBroker) GetStatus() MesheryControllerStatus { // TODO: Confirm if the presence of operator is needed to use the operator client sdk broker, err := operatorClient.CoreV1Alpha1().Brokers("meshery").Get(context.TODO(), "meshery-broker", metav1.GetOptions{}) if err == nil { - if broker.Status.Endpoint.External != "" { - mb.status = Deployed + brokerEndpoint := broker.Status.Endpoint.External + hostIP := strings.Split(brokerEndpoint, ":")[0] + if broker.Status.Endpoint.External != "" && ConnectivityTest(MesheryServer, hostIP) { + mb.status = Connected return mb.status } - mb.status = NotDeployed + mb.status = Deployed return mb.status } else { if kubeerror.IsNotFound(err) { if mb.status != Undeployed { - mb.status = NotDeployed + mb.status = Undeployed } return mb.status } // when operatorClient is not able to get meshesry-broker, we try again with kubernetes client as a fallback - broker, err := mb.kclient.DynamicKubeClient.Resource(schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "statefulsets"}).Namespace("meshery").Get(context.TODO(), "meshery-broker", metav1.GetOptions{}) + broker, err := mb.kclient.DynamicKubeClient.Resource(schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "statefulsets"}).Namespace("meshery").Get(context.TODO(), MesheryBroker, metav1.GetOptions{}) if err != nil { // if the resource is not found, then it is NotDeployed if kubeerror.IsNotFound(err) { - mb.status = NotDeployed + mb.status = Undeployed return mb.status } return Unknown @@ -93,7 +99,7 @@ func (mb *mesheryBroker) GetPublicEndpoint() (string, error) { if err != nil { return "", ErrGetControllerPublicEndpoint(err) } - broker, err := operatorClient.CoreV1Alpha1().Brokers("meshery").Get(context.TODO(), "meshery-broker", metav1.GetOptions{}) + broker, err := operatorClient.CoreV1Alpha1().Brokers("meshery").Get(context.TODO(), MesheryBroker, metav1.GetOptions{}) if broker.Status.Endpoint.External == "" { if err == nil { err = fmt.Errorf("Could not get the External endpoint for meshery-broker") @@ -106,5 +112,25 @@ func (mb *mesheryBroker) GetPublicEndpoint() (string, error) { } func (mb *mesheryBroker) GetVersion() (string, error) { - return "", nil + if len(mb.version) == 0 { + statefulSet, err := mb.kclient.KubeClient.AppsV1().StatefulSets("meshery").Get(context.TODO(), MesheryBroker, metav1.GetOptions{}) + if kubeerror.IsNotFound(err) { + return "", err + } + return getImageVersionOfContainer(statefulSet.Spec.Template, "nats"), nil + } + return mb.version, nil +} + +func getImageVersionOfContainer(container v1.PodTemplateSpec, containerName string) string { + var version string + for _, container := range container.Spec.Containers { + if strings.Compare(container.Name, containerName) == 0 { + versionTag := strings.Split(container.Image, ":") + if len(versionTag) > 1 { + version = versionTag[1] + } + } + } + return version } diff --git a/models/controllers/meshsync.go b/models/controllers/meshsync.go index 9d9a25e3..fe0f140b 100644 --- a/models/controllers/meshsync.go +++ b/models/controllers/meshsync.go @@ -2,10 +2,12 @@ package controllers import ( "context" + "strings" // opClient "github.com/layer5io/meshery-operator/pkg/client" opClient "github.com/layer5io/meshery-operator/pkg/client" mesherykube "github.com/layer5io/meshkit/utils/kubernetes" + v1 "k8s.io/api/core/v1" kubeerror "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" @@ -33,18 +35,38 @@ func (ms *meshsync) GetName() string { func (ms *meshsync) GetStatus() MesheryControllerStatus { operatorClient, _ := opClient.New(&ms.kclient.RestConfig) // TODO: Confirm if the presence of operator is needed to use the operator client sdk - meshSync, err := operatorClient.CoreV1Alpha1().MeshSyncs("meshery").Get(context.TODO(), "meshery-meshsync", metav1.GetOptions{}) + _, err := operatorClient.CoreV1Alpha1().MeshSyncs("meshery").Get(context.TODO(), "meshery-meshsync", metav1.GetOptions{}) if err == nil { - if meshSync.Status.PublishingTo != "" { + ms.status = Enabled + meshSyncPod, err := ms.kclient.KubeClient.CoreV1().Pods("meshery").List(context.TODO(), metav1.ListOptions{ + LabelSelector: "component=meshsync", + }) + if len(meshSyncPod.Items) == 0 || kubeerror.IsNotFound(err) { + return ms.status + } + + switch meshSyncPod.Items[0].Status.Phase { + case v1.PodRunning: + ms.status = Running + broker := NewMesheryBrokerHandler(ms.kclient) + brokerEndpoint, err := broker.GetPublicEndpoint() + if err != nil { + return ms.status + } + hostIP := strings.Split(brokerEndpoint, ":")[0] + isConnected := ConnectivityTest(MeshSync, hostIP) + if isConnected { + ms.status = Connected + } + return ms.status + default: ms.status = Deployed return ms.status } - ms.status = NotDeployed - return ms.status } else { if kubeerror.IsNotFound(err) { if ms.status != Undeployed { - ms.status = NotDeployed + ms.status = Undeployed } return ms.status } @@ -53,7 +75,7 @@ func (ms *meshsync) GetStatus() MesheryControllerStatus { if err != nil { // if the resource is not found, then it is NotDeployed if kubeerror.IsNotFound(err) { - ms.status = NotDeployed + ms.status = Undeployed return ms.status } return Unknown