Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: cron status update refactoring #5790

Merged
merged 4 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions api/cluster/EnvironmentRestHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"net/http"
"strconv"
"strings"
"sync"
"time"

k8s2 "github.com/devtron-labs/common-lib/utils/k8s"
Expand Down Expand Up @@ -513,9 +514,8 @@ func (impl EnvironmentRestHandlerImpl) GetEnvironmentConnection(w http.ResponseW
responseObj.ClusterReachable = false
}
//updating the cluster connection error to db
mapObj := map[int]error{
clusterBean.Id: err,
}
mapObj := &sync.Map{}
mapObj.Store(clusterBean.Id, err)
impl.environmentClusterMappingsService.HandleErrorInClusterConnections([]*request.ClusterBean{clusterBean}, mapObj, true)
common.WriteJsonResp(w, nil, responseObj, http.StatusOK)
}
55 changes: 37 additions & 18 deletions pkg/cluster/ClusterService.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ type ClusterService interface {
FindAllNamespacesByUserIdAndClusterId(userId int32, clusterId int, isActionUserSuperAdmin bool) ([]string, error)
FindAllForClusterByUserId(userId int32, isActionUserSuperAdmin bool) ([]ClusterBean, error)
FetchRolesFromGroup(userId int32) ([]*repository3.RoleModel, error)
HandleErrorInClusterConnections(clusters []*ClusterBean, respMap map[int]error, clusterExistInDb bool)
HandleErrorInClusterConnections(clusters []*ClusterBean, respMap *sync.Map, clusterExistInDb bool)
ConnectClustersInBatch(clusters []*ClusterBean, clusterExistInDb bool)
ConvertClusterBeanToCluster(clusterBean *ClusterBean, userId int32) *repository.Cluster
ConvertClusterBeanObjectToCluster(bean *ClusterBean) *v1alpha1.Cluster
Expand Down Expand Up @@ -259,11 +259,14 @@ func (impl *ClusterServiceImpl) ConvertClusterBeanToCluster(clusterBean *Cluster

// getAndUpdateClusterConnectionStatus is a cron function to update the connection status of all clusters
func (impl *ClusterServiceImpl) getAndUpdateClusterConnectionStatus() {
impl.logger.Debug("starting cluster connection status fetch thread")
defer impl.logger.Debug("stopped cluster connection status fetch thread")
impl.logger.Info("starting cluster connection status fetch thread")
startTime := time.Now()
defer func() {
impl.logger.Debugw("cluster connection status fetch thread completed", "timeTaken", time.Since(startTime))
}()

//getting all clusters
clusters, err := impl.FindAllExceptVirtual()
clusters, err := impl.FindAll()
if err != nil {
impl.logger.Errorw("error in getting all clusters", "err", err)
return
Expand Down Expand Up @@ -845,38 +848,54 @@ func (impl *ClusterServiceImpl) FetchRolesFromGroup(userId int32) ([]*repository
return roles, nil
}

// updateConnectionStatusForVirtualCluster records the connection result for a
// virtual (isolated) cluster without contacting it: a fixed "connection not
// setup" error naming clusterName is stored in respMap under clusterId.
func (impl *ClusterServiceImpl) updateConnectionStatusForVirtualCluster(respMap *sync.Map, clusterId int, clusterName string) {
	respMap.Store(
		clusterId,
		fmt.Errorf("Get virtual cluster '%s' error: connection not setup for isolated clusters", clusterName),
	)
}

// ConnectClustersInBatch checks the connection of every cluster in clusters,
// collects one result per cluster in respMap, and then passes the collected
// results to HandleErrorInClusterConnections.
//
// Virtual clusters (IsVirtualCluster) are not contacted; their result is
// recorded through updateConnectionStatusForVirtualCluster. Every other
// cluster is handled in its own goroutine, and the function waits for all of
// them before handing the results over.
//
// NOTE(review): this span comes from a diff view and appears to contain both
// the removed lines (plain map guarded by a mutex) and the added lines
// (sync.Map) side by side — confirm against the merged file.
func (impl *ClusterServiceImpl) ConnectClustersInBatch(clusters []*ClusterBean, clusterExistInDb bool) {
var wg sync.WaitGroup
respMap := make(map[int]error)
mutex := &sync.Mutex{}

respMap := &sync.Map{}
for idx, cluster := range clusters {
// Virtual clusters are recorded directly, without starting a goroutine.
if cluster.IsVirtualCluster {
impl.updateConnectionStatusForVirtualCluster(respMap, cluster.Id, cluster.ClusterName)
continue
}
wg.Add(1)
go func(idx int, cluster *ClusterBean) {
defer wg.Done()
clusterConfig := cluster.GetClusterConfig()
_, _, k8sClientSet, err := impl.K8sUtil.GetK8sConfigAndClients(clusterConfig)
// Client creation failed: record the error under cluster.Id and stop here.
// NOTE(review): this path keys by cluster.Id even when clusterExistInDb is
// false, whereas the path below switches to idx — confirm that is intended.
if err != nil {
mutex.Lock()
respMap[cluster.Id] = err
mutex.Unlock()
respMap.Store(cluster.Id, err)
return
}

// When the clusters are not in the DB, the slice index is used as the key
// instead of cluster.Id.
id := cluster.Id
if !clusterExistInDb {
id = idx
}
impl.GetAndUpdateConnectionStatusForOneCluster(k8sClientSet, id, respMap, mutex)
impl.GetAndUpdateConnectionStatusForOneCluster(k8sClientSet, id, respMap)
}(idx, cluster)
}

// Wait for every goroutine to finish before the results are consumed.
wg.Wait()
impl.HandleErrorInClusterConnections(clusters, respMap, clusterExistInDb)
}

func (impl *ClusterServiceImpl) HandleErrorInClusterConnections(clusters []*ClusterBean, respMap map[int]error, clusterExistInDb bool) {
for id, err := range respMap {
func (impl *ClusterServiceImpl) HandleErrorInClusterConnections(clusters []*ClusterBean, respMap *sync.Map, clusterExistInDb bool) {
respMap.Range(func(key, value any) bool {
defer func() {
// defer to handle panic on type assertion
if r := recover(); r != nil {
impl.logger.Errorw("error in handling error in cluster connections", "key", key, "value", value, "err", r)
}
}()
id := key.(int)
var err error
if connectionError, ok := value.(error); ok {
err = connectionError
}
errorInConnecting := ""
if err != nil {
errorInConnecting = err.Error()
Expand All @@ -896,7 +915,8 @@ func (impl *ClusterServiceImpl) HandleErrorInClusterConnections(clusters []*Clus
//id is index of the cluster in clusters array
clusters[id].ErrorInConnecting = errorInConnecting
}
}
return true
})
}

func (impl *ClusterServiceImpl) ValidateKubeconfig(kubeConfig string) (map[string]*ValidateClusterBean, error) {
Expand Down Expand Up @@ -1066,7 +1086,7 @@ func (impl *ClusterServiceImpl) ValidateKubeconfig(kubeConfig string) (map[strin

}

func (impl *ClusterServiceImpl) GetAndUpdateConnectionStatusForOneCluster(k8sClientSet *kubernetes.Clientset, clusterId int, respMap map[int]error, mutex *sync.Mutex) {
func (impl *ClusterServiceImpl) GetAndUpdateConnectionStatusForOneCluster(k8sClientSet *kubernetes.Clientset, clusterId int, respMap *sync.Map) {
response, err := impl.K8sUtil.GetLiveZCall(k8s.LiveZ, k8sClientSet)
log.Println("received response for cluster livez status", "response", string(response), "err", err, "clusterId", clusterId)

Expand All @@ -1092,9 +1112,8 @@ func (impl *ClusterServiceImpl) GetAndUpdateConnectionStatusForOneCluster(k8sCli
} else if err == nil && string(response) != "ok" {
err = fmt.Errorf("Validation failed with response : %s", string(response))
}
mutex.Lock()
respMap[clusterId] = err
mutex.Unlock()

respMap.Store(clusterId, err)
}

func (impl *ClusterServiceImpl) ConvertClusterBeanObjectToCluster(bean *ClusterBean) *v1alpha1.Cluster {
Expand Down
5 changes: 3 additions & 2 deletions pkg/cluster/EnvironmentService.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
bean2 "github.com/devtron-labs/devtron/pkg/cluster/repository/bean"
"strconv"
"strings"
"sync"
"time"

util2 "github.com/devtron-labs/common-lib/utils/k8s"
Expand Down Expand Up @@ -63,7 +64,7 @@ type EnvironmentService interface {
GetByClusterId(id int) ([]*bean2.EnvironmentBean, error)
GetCombinedEnvironmentListForDropDown(token string, isActionUserSuperAdmin bool, auth func(email string, object []string) map[string]bool) ([]*bean2.ClusterEnvDto, error)
GetCombinedEnvironmentListForDropDownByClusterIds(token string, clusterIds []int, auth func(token string, object string) bool) ([]*bean2.ClusterEnvDto, error)
HandleErrorInClusterConnections(clusters []*ClusterBean, respMap map[int]error, clusterExistInDb bool)
HandleErrorInClusterConnections(clusters []*ClusterBean, respMap *sync.Map, clusterExistInDb bool)
GetDetailsById(envId int) (*repository.Environment, error)
}

Expand Down Expand Up @@ -734,7 +735,7 @@ func (impl EnvironmentServiceImpl) Delete(deleteReq *bean2.EnvironmentBean, user
return nil
}

// HandleErrorInClusterConnections forwards clusters, respMap and
// clusterExistInDb unchanged to the cluster service's
// HandleErrorInClusterConnections; it adds no logic of its own.
//
// NOTE(review): two signature lines follow because this span comes from a diff
// view (map[int]error before, *sync.Map after) — confirm against the merged file.
func (impl EnvironmentServiceImpl) HandleErrorInClusterConnections(clusters []*ClusterBean, respMap map[int]error, clusterExistInDb bool) {
func (impl EnvironmentServiceImpl) HandleErrorInClusterConnections(clusters []*ClusterBean, respMap *sync.Map, clusterExistInDb bool) {
impl.clusterService.HandleErrorInClusterConnections(clusters, respMap, clusterExistInDb)
}

Expand Down
Loading