From 88c6b7279c104b62e7f67b5804c08d6666b810ec Mon Sep 17 00:00:00 2001 From: Noe Luaces Date: Fri, 15 Dec 2023 15:41:15 +0100 Subject: [PATCH] Fix the output in status commands when headless services are exposed in skupper (#1306) * fix stateful sets in service status command * fix querying connections from router, for links or status. It was getting the first pod with the label router, and headless services have statefulsets with routers using the same label * ignore routers from statefulsets in skupper network command --- api/types/client.go | 1 - client/network_status.go | 10 --- client/router_inspect.go | 2 +- cmd/skupper/skupper_kube_network.go | 38 +++++----- cmd/skupper/skupper_kube_service.go | 8 +-- cmd/skupper/skupper_kube_site.go | 7 +- cmd/skupper/skupper_link.go | 2 +- cmd/skupper/skupper_mock_test.go | 4 -- pkg/kube/pods.go | 21 ++++-- pkg/kube/qdr/mgmt.go | 2 +- pkg/network/network.go | 72 ++++++++++++++----- pkg/network/network_test.go | 2 +- .../acceptance/custom/basic/basic.go | 2 +- .../custom/hello_policy/issues_test.go | 2 +- test/integration/performance/common/setup.go | 2 +- 15 files changed, 109 insertions(+), 66 deletions(-) diff --git a/api/types/client.go b/api/types/client.go index 30cace522..6dfea4117 100644 --- a/api/types/client.go +++ b/api/types/client.go @@ -350,6 +350,5 @@ type VanClientInterface interface { GetIngressDefault() string RevokeAccess(ctx context.Context) error NetworkStatus(ctx context.Context) (*network.NetworkStatusInfo, error) - GetRemoteLinks(ctx context.Context, siteConfig *SiteConfig) ([]*network.RemoteLinkInfo, error) GetConsoleUrl(namespace string) (string, error) } diff --git a/client/network_status.go b/client/network_status.go index cea132ea0..b9ea34f08 100644 --- a/client/network_status.go +++ b/client/network_status.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "github.com/skupperproject/skupper/api/types" - "github.com/skupperproject/skupper/pkg/domain/kube" k8s "github.com/skupperproject/skupper/pkg/kube" "github.com/skupperproject/skupper/pkg/network" ) @@ -29,12 +28,3 @@ func (cli *VanClient) NetworkStatus(ctx context.Context) (*network.NetworkStatus return vanInfo, nil } - -func (cli *VanClient) GetRemoteLinks(ctx context.Context, siteConfig *types.SiteConfig) ([]*network.RemoteLinkInfo, error) { - cfg, err := cli.getRouterConfig(ctx, cli.Namespace) - if err != nil { - return nil, err - } - linkHander := kube.NewLinkHandlerKube(cli.Namespace, siteConfig, cfg, cli.KubeClient, cli.RestConfig) - return linkHander.RemoteLinks(ctx) -} diff --git a/client/router_inspect.go b/client/router_inspect.go index 35193d967..a12914071 100644 --- a/client/router_inspect.go +++ b/client/router_inspect.go @@ -162,7 +162,7 @@ func getSelf(sites []data.Site, siteId string) *data.Site { } func (cli *VanClient) exec(command []string, namespace string) (*bytes.Buffer, error) { - pod, err := kube.GetReadyPod(namespace, cli.KubeClient, "service-controller") + pod, err := kube.GetReadyPod(namespace, cli.KubeClient, "service-controller", "") if err != nil { return nil, err } diff --git a/cmd/skupper/skupper_kube_network.go b/cmd/skupper/skupper_kube_network.go index cc048a8a6..cc1a1b063 100644 --- a/cmd/skupper/skupper_kube_network.go +++ b/cmd/skupper/skupper_kube_network.go @@ -73,8 +73,12 @@ func (s *SkupperKubeNetwork) Status(cmd *cobra.Command, args []string) error { if len(siteStatus.RouterStatus) > 0 { - //to get the generic information about the links of a site, we can get the first router, because in case of multiple routers the information will be the same. - mapSiteLink := statusManager.GetSiteLinkMapPerRouter(&siteStatus.RouterStatus[0], &siteStatus.Site) + err, index := statusManager.GetRouterIndex(&siteStatus) + if err != nil { + return err + } + + mapSiteLink := statusManager.GetSiteLinkMapPerRouter(&siteStatus.RouterStatus[index], &siteStatus.Site) if len(mapSiteLink) > 0 { siteLinks := siteLevel.NewChild("Linked sites:") @@ -87,26 +91,28 @@ func (s *SkupperKubeNetwork) Status(cmd *cobra.Command, args []string) error { routers := siteLevel.NewChild("Routers:") for _, routerStatus := range siteStatus.RouterStatus { routerId := strings.Split(routerStatus.Router.Name, "/") - routerItem := fmt.Sprintf("name: %s\n", routerId[1]) - detailsRouter := map[string]string{"image name": routerStatus.Router.ImageName, "image version": routerStatus.Router.ImageVersion} - routerLevel := routers.NewChildWithDetail(routerItem, detailsRouter) + // skip routers that belong to headless services + if len(routerId) > 1 && strings.HasPrefix(routerId[1], siteStatus.Site.Name) { + routerItem := fmt.Sprintf("name: %s\n", routerId[1]) + detailsRouter := map[string]string{"image name": routerStatus.Router.ImageName, "image version": routerStatus.Router.ImageVersion} - printableLinks := statusManager.RemoveLinksFromSameSite(routerStatus, siteStatus.Site) + routerLevel := routers.NewChildWithDetail(routerItem, detailsRouter) - if len(printableLinks) > 0 { - links := routerLevel.NewChild("Links:") - for _, link := range printableLinks { - linkItem := fmt.Sprintf("name: %s\n", link.Name) - detailsLink := map[string]string{"direction": link.Direction} - if link.LinkCost > 0 { - detailsLink["cost"] = strconv.FormatUint(link.LinkCost, 10) - } - links.NewChildWithDetail(linkItem, detailsLink) + printableLinks := statusManager.RemoveLinksFromSameSite(routerStatus, siteStatus.Site) + if len(printableLinks) > 0 { + links := routerLevel.NewChild("Links:") + for _, link := range printableLinks { + linkItem := fmt.Sprintf("name: %s\n", link.Name) + detailsLink := map[string]string{"direction": link.Direction} + if link.LinkCost > 0 { + detailsLink["cost"] = strconv.FormatUint(link.LinkCost, 10) + } + links.NewChildWithDetail(linkItem, detailsLink) + } } } - } } } diff --git a/cmd/skupper/skupper_kube_service.go b/cmd/skupper/skupper_kube_service.go index 33a4b0c85..7e7f892c7 100644 --- a/cmd/skupper/skupper_kube_service.go +++ b/cmd/skupper/skupper_kube_service.go @@ -62,7 +62,7 @@ func (s *SkupperKubeService) Status(cmd *cobra.Command, args []string) error { } mapServiceSites := statusManager.GetServiceSitesMap() - mapSiteTargets := statusManager.GetSiteTargetsMap() + mapSiteTarget := statusManager.GetSiteTargetMap() var mapServiceLabels map[string]map[string]string if err == nil { @@ -74,10 +74,6 @@ func (s *SkupperKubeService) Status(cmd *cobra.Command, args []string) error { } else { l := formatter.NewList() l.Item("Services exposed through Skupper:") - var addresses []string - for _, si := range currentNetworkStatus.Addresses { - addresses = append(addresses, si.Name) - } for _, si := range currentNetworkStatus.Addresses { svc := l.NewChild(fmt.Sprintf("%s (%s)", si.Name, si.Protocol)) @@ -91,7 +87,7 @@ func (s *SkupperKubeService) Status(cmd *cobra.Command, args []string) error { theSite := sites.NewChildWithDetail(item, map[string]string{"policy": site.Site.Policy}) if si.ConnectorCount > 0 { - t := mapSiteTargets[site.Site.Identity][si.Name] + t := mapSiteTarget[site.Site.Identity][si.Name] if len(t.Address) > 0 { targets := theSite.NewChild("Targets:") diff --git a/cmd/skupper/skupper_kube_site.go b/cmd/skupper/skupper_kube_site.go index 79a2e72b8..dc2e68c6f 100644 --- a/cmd/skupper/skupper_kube_site.go +++ b/cmd/skupper/skupper_kube_site.go @@ -268,7 +268,12 @@ func (s *SkupperKubeSite) Status(cmd *cobra.Command, args []string) error { policies: currentSite.Site.Policy, } - mapSiteLink := statusManager.GetSiteLinkMapPerRouter(¤tSite.RouterStatus[0], ¤tSite.Site) + err, index := statusManager.GetRouterIndex(currentSite) + if err != nil { + return err + } + + mapSiteLink := statusManager.GetSiteLinkMapPerRouter(¤tSite.RouterStatus[index], ¤tSite.Site) totalSites := len(currentStatus.SiteStatus) // the current site does not count as a connection diff --git a/cmd/skupper/skupper_link.go b/cmd/skupper/skupper_link.go index 6b0e967a2..b19fa991b 100644 --- a/cmd/skupper/skupper_link.go +++ b/cmd/skupper/skupper_link.go @@ -227,7 +227,7 @@ func NewCmdLinkStatus(skupperClient SkupperLinkClient) *cobra.Command { }, } cmd.Flags().IntVar(&waitFor, "wait", 0, "The number of seconds to wait for links to become connected") - cmd.Flags().BoolVar(&verboseLinkStatus, "verbose", false, "Show detailed information about a link") + cmd.Flags().BoolVarP(&verboseLinkStatus, "verbose", "v", false, "Show detailed information about a link") return cmd diff --git a/cmd/skupper/skupper_mock_test.go b/cmd/skupper/skupper_mock_test.go index fa688b2c6..1f0918f6e 100644 --- a/cmd/skupper/skupper_mock_test.go +++ b/cmd/skupper/skupper_mock_test.go @@ -319,10 +319,6 @@ func (v *vanClientMock) NetworkStatus(ctx context.Context) (*network.NetworkStat return &result, nil } -func (v *vanClientMock) GetRemoteLinks(ctx context.Context, siteConfig *types.SiteConfig) ([]*network.RemoteLinkInfo, error) { - return nil, nil -} - func (v *vanClientMock) GetConsoleUrl(namespace string) (string, error) { return "", nil } diff --git a/pkg/kube/pods.go b/pkg/kube/pods.go index 82f5c11d8..bffc74477 100644 --- a/pkg/kube/pods.go +++ b/pkg/kube/pods.go @@ -20,6 +20,7 @@ import ( "errors" "fmt" "io" + "k8s.io/apimachinery/pkg/labels" "strings" "time" @@ -65,9 +66,17 @@ func FirstReadyPod(list []corev1.Pod) *corev1.Pod { return nil } -func GetReadyPod(namespace string, clientset kubernetes.Interface, component string) (*corev1.Pod, error) { - selector := "skupper.io/component=" + component - pods, err := clientset.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{LabelSelector: selector}) +func GetReadyPod(namespace string, clientset kubernetes.Interface, component string, application string) (*corev1.Pod, error) { + matchLabels := map[string]string{"skupper.io/component": component} + + if application != "" { + matchLabels["application"] = application + } + + labelSelector := metav1.LabelSelector{MatchLabels: matchLabels} + listOptions := metav1.ListOptions{LabelSelector: labels.Set(labelSelector.MatchLabels).String()} + + pods, err := clientset.CoreV1().Pods(namespace).List(context.TODO(), listOptions) if err != nil { return nil, err } else if len(pods.Items) == 0 { @@ -96,7 +105,11 @@ func GetImageVersion(pod *corev1.Pod, container string) string { } func GetComponentVersion(namespace string, clientset kubernetes.Interface, component string, container string) string { - pod, err := GetReadyPod(namespace, clientset, component) + application := "" + if component == "router" { + application = "skupper-router" + } + pod, err := GetReadyPod(namespace, clientset, component, application) if err == nil { return GetImageVersion(pod, container) } else { diff --git a/pkg/kube/qdr/mgmt.go b/pkg/kube/qdr/mgmt.go index 956982696..a49028726 100644 --- a/pkg/kube/qdr/mgmt.go +++ b/pkg/kube/qdr/mgmt.go @@ -290,7 +290,7 @@ func getLocalRouterId(namespace string, clientset kubernetes.Interface, config * } func router_exec(command []string, namespace string, clientset kubernetes.Interface, config *restclient.Config) (*bytes.Buffer, error) { - pod, err := kube.GetReadyPod(namespace, clientset, "router") + pod, err := kube.GetReadyPod(namespace, clientset, "router", "skupper-router") if err != nil { return nil, err } diff --git a/pkg/network/network.go b/pkg/network/network.go index 07a230ae4..0b32a5638 100644 --- a/pkg/network/network.go +++ b/pkg/network/network.go @@ -17,39 +17,49 @@ func (s *SkupperStatus) GetServiceSitesMap() map[string][]SiteStatusInfo { for _, site := range s.NetworkStatus.SiteStatus { if len(site.RouterStatus) > 0 { - for _, listener := range site.RouterStatus[0].Listeners { - if mapServiceSites[listener.Name] != nil { - serviceSites := mapServiceSites[listener.Name] - - serviceSites = append(serviceSites, site) - mapServiceSites[listener.Name] = serviceSites - } else { - mapServiceSites[listener.Name] = []SiteStatusInfo{site} + for _, router := range site.RouterStatus { + for _, listener := range router.Listeners { + if len(mapServiceSites[listener.Address]) == 0 || !sliceContainsSite(mapServiceSites[listener.Address], site) { + mapServiceSites[listener.Address] = append(mapServiceSites[listener.Address], site) + } + } + + /* Checking for headless services: + the service can be available in the same site where the headless service was exposed, but in that site there is no + listeners for the statefulstet. + */ + for _, connector := range router.Connectors { + if len(mapServiceSites[connector.Address]) == 0 || !sliceContainsSite(mapServiceSites[connector.Address], site) { + mapServiceSites[connector.Address] = append(mapServiceSites[connector.Address], site) + } } } + } } return mapServiceSites } -func (s *SkupperStatus) GetSiteTargetsMap() map[string]map[string]ConnectorInfo { +func (s *SkupperStatus) GetSiteTargetMap() map[string]map[string]ConnectorInfo { - mapSiteTargets := make(map[string]map[string]ConnectorInfo) + mapSiteTarget := make(map[string]map[string]ConnectorInfo) for _, site := range s.NetworkStatus.SiteStatus { if len(site.RouterStatus) > 0 { - for _, connector := range site.RouterStatus[0].Connectors { - if mapSiteTargets[site.Site.Identity] == nil { - mapSiteTargets[site.Site.Identity] = make(map[string]ConnectorInfo) + for _, router := range site.RouterStatus { + for _, connector := range router.Connectors { + if mapSiteTarget[site.Site.Identity] == nil { + mapSiteTarget[site.Site.Identity] = make(map[string]ConnectorInfo) + } + mapSiteTarget[site.Site.Identity][connector.Address] = connector } - mapSiteTargets[site.Site.Identity][connector.Address] = connector } } } - return mapSiteTargets + return mapSiteTarget } func (s *SkupperStatus) GetRouterSiteMap() map[string]SiteStatusInfo { @@ -59,7 +69,11 @@ func (s *SkupperStatus) GetRouterSiteMap() map[string]SiteStatusInfo { for _, routerStatus := range siteStatus.RouterStatus { // the name of the router has a "0/" as a prefix that it is needed to remove routerName := strings.Split(routerStatus.Router.Name, "/") - mapRouterSite[routerName[1]] = siteStatus + + // Remove routers that belong to statefulsets for headless services + if len(routerName) > 1 && strings.HasPrefix(routerName[1], siteStatus.Site.Name) { + mapRouterSite[routerName[1]] = siteStatus + } } } } @@ -79,7 +93,6 @@ func (s *SkupperStatus) GetSiteById(siteId string) *SiteStatusInfo { } func (s *SkupperStatus) GetSiteLinkMapPerRouter(router *RouterStatusInfo, site *SiteInfo) map[string]LinkInfo { - routerSiteMap := s.GetRouterSiteMap() siteLinkMap := make(map[string]LinkInfo) if len(router.Links) > 0 { @@ -104,6 +117,21 @@ func (s *SkupperStatus) LinkBelongsToSameSite(linkName string, siteId string, ro } +func (s *SkupperStatus) GetRouterIndex(site *SiteStatusInfo) (error, int) { + + for index, router := range site.RouterStatus { + // Ignore routers that belong to statefulsets for headless services and any other router + routerId := strings.Split(router.Router.Name, "/") + + if len(routerId) > 1 && strings.HasPrefix(routerId[1], site.Site.Name) { + return nil, index + + } + } + + return fmt.Errorf("not valid router found"), -1 +} + func (s *SkupperStatus) RemoveLinksFromSameSite(router RouterStatusInfo, site SiteInfo) []LinkInfo { routerSiteMap := s.GetRouterSiteMap() var filteredLinks []LinkInfo @@ -129,3 +157,13 @@ func UnmarshalSkupperStatus(data map[string]string) (*NetworkStatusInfo, error) return networkStatusInfo, nil } + +func sliceContainsSite(sites []SiteStatusInfo, site SiteStatusInfo) bool { + for _, s := range sites { + if site.Site.Identity == s.Site.Identity { + return true + } + } + + return false +} diff --git a/pkg/network/network_test.go b/pkg/network/network_test.go index 93a443e15..1727f7beb 100644 --- a/pkg/network/network_test.go +++ b/pkg/network/network_test.go @@ -53,7 +53,7 @@ func TestGetSiteTargetsMap(t *testing.T) { expectedSite2 := "429c2780-003d-44cc-9a91-4139885c7d20" expectedService := "backend:8080" - result := skupperStatus.GetSiteTargetsMap() + result := skupperStatus.GetSiteTargetMap() assert.Check(t, result[expectedSite2] == nil) assert.Check(t, result[expectedSite1][expectedService].Address == expectedService) diff --git a/test/integration/acceptance/custom/basic/basic.go b/test/integration/acceptance/custom/basic/basic.go index 1f240e0a8..438243986 100644 --- a/test/integration/acceptance/custom/basic/basic.go +++ b/test/integration/acceptance/custom/basic/basic.go @@ -128,7 +128,7 @@ func (r *BasicTestRunner) Setup(ctx context.Context, createOptsPublic types.Site "StartTimeAfter=", podStartTimeAfter, "Router component restarted - POD status:", string(podStatus)) // Check if the Volume is shared by both containers - podContainers, _ := kube.GetReadyPod(prv1Cluster.Namespace, prv1Cluster.VanClient.KubeClient, "router") + podContainers, _ := kube.GetReadyPod(prv1Cluster.Namespace, prv1Cluster.VanClient.KubeClient, "router", "skupper-router") for _, container := range podContainers.Spec.Containers { foundCertVol := false for _, contVolume := range container.VolumeMounts { diff --git a/test/integration/acceptance/custom/hello_policy/issues_test.go b/test/integration/acceptance/custom/hello_policy/issues_test.go index 1920f5c8a..c65236766 100644 --- a/test/integration/acceptance/custom/hello_policy/issues_test.go +++ b/test/integration/acceptance/custom/hello_policy/issues_test.go @@ -26,7 +26,7 @@ import ( // // The pod name comes from kube.GetReadyPod for service-controller func seekServiceControllerAndDelete(ctx *base.ClusterContext) { - pod, err := kube.GetReadyPod(ctx.Namespace, ctx.VanClient.KubeClient, "service-controller") + pod, err := kube.GetReadyPod(ctx.Namespace, ctx.VanClient.KubeClient, "service-controller", "") if err != nil { log.Printf("Ignoring pod listing error '%v'", err) return diff --git a/test/integration/performance/common/setup.go b/test/integration/performance/common/setup.go index 5d60977e5..678171962 100644 --- a/test/integration/performance/common/setup.go +++ b/test/integration/performance/common/setup.go @@ -404,7 +404,7 @@ func tailRouterLogs(ctx context.Context, saveLogs *bool) *sync.WaitGroup { var err error log.Printf("Waiting for router pod to be running") err = pkgutils.RetryWithContext(ctx, time.Second*5, func() (bool, error) { - pod, err = kube.GetReadyPod(cli.Namespace, cli.KubeClient, "router") + pod, err = kube.GetReadyPod(cli.Namespace, cli.KubeClient, "router", "skupper-router") if pod != nil && err == nil { return true, nil }