Skip to content

Commit

Permalink
Fix the output in status commands when headless services are exposed …
Browse files Browse the repository at this point in the history
…in skupper (#1306)

* fix stateful sets in service status command

* fix querying connections from router, for links or status. It was getting the first pod with the label router, and headless services have statefulsets with routers using the same label

* ignore routers from statefulsets in skupper network command
  • Loading branch information
nluaces authored Dec 15, 2023
1 parent 8c6376f commit 88c6b72
Show file tree
Hide file tree
Showing 15 changed files with 109 additions and 66 deletions.
1 change: 0 additions & 1 deletion api/types/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,5 @@ type VanClientInterface interface {
GetIngressDefault() string
RevokeAccess(ctx context.Context) error
NetworkStatus(ctx context.Context) (*network.NetworkStatusInfo, error)
GetRemoteLinks(ctx context.Context, siteConfig *SiteConfig) ([]*network.RemoteLinkInfo, error)
GetConsoleUrl(namespace string) (string, error)
}
10 changes: 0 additions & 10 deletions client/network_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"github.com/skupperproject/skupper/api/types"
"github.com/skupperproject/skupper/pkg/domain/kube"
k8s "github.com/skupperproject/skupper/pkg/kube"
"github.com/skupperproject/skupper/pkg/network"
)
Expand All @@ -29,12 +28,3 @@ func (cli *VanClient) NetworkStatus(ctx context.Context) (*network.NetworkStatus

return vanInfo, nil
}

func (cli *VanClient) GetRemoteLinks(ctx context.Context, siteConfig *types.SiteConfig) ([]*network.RemoteLinkInfo, error) {
cfg, err := cli.getRouterConfig(ctx, cli.Namespace)
if err != nil {
return nil, err
}
linkHander := kube.NewLinkHandlerKube(cli.Namespace, siteConfig, cfg, cli.KubeClient, cli.RestConfig)
return linkHander.RemoteLinks(ctx)
}
2 changes: 1 addition & 1 deletion client/router_inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func getSelf(sites []data.Site, siteId string) *data.Site {
}

func (cli *VanClient) exec(command []string, namespace string) (*bytes.Buffer, error) {
pod, err := kube.GetReadyPod(namespace, cli.KubeClient, "service-controller")
pod, err := kube.GetReadyPod(namespace, cli.KubeClient, "service-controller", "")
if err != nil {
return nil, err
}
Expand Down
38 changes: 22 additions & 16 deletions cmd/skupper/skupper_kube_network.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,12 @@ func (s *SkupperKubeNetwork) Status(cmd *cobra.Command, args []string) error {

if len(siteStatus.RouterStatus) > 0 {

//to get the generic information about the links of a site, we can get the first router, because in case of multiple routers the information will be the same.
mapSiteLink := statusManager.GetSiteLinkMapPerRouter(&siteStatus.RouterStatus[0], &siteStatus.Site)
err, index := statusManager.GetRouterIndex(&siteStatus)
if err != nil {
return err
}

mapSiteLink := statusManager.GetSiteLinkMapPerRouter(&siteStatus.RouterStatus[index], &siteStatus.Site)

if len(mapSiteLink) > 0 {
siteLinks := siteLevel.NewChild("Linked sites:")
Expand All @@ -87,26 +91,28 @@ func (s *SkupperKubeNetwork) Status(cmd *cobra.Command, args []string) error {
routers := siteLevel.NewChild("Routers:")
for _, routerStatus := range siteStatus.RouterStatus {
routerId := strings.Split(routerStatus.Router.Name, "/")
routerItem := fmt.Sprintf("name: %s\n", routerId[1])
detailsRouter := map[string]string{"image name": routerStatus.Router.ImageName, "image version": routerStatus.Router.ImageVersion}

routerLevel := routers.NewChildWithDetail(routerItem, detailsRouter)
// skip routers that belong to headless services
if len(routerId) > 1 && strings.HasPrefix(routerId[1], siteStatus.Site.Name) {
routerItem := fmt.Sprintf("name: %s\n", routerId[1])
detailsRouter := map[string]string{"image name": routerStatus.Router.ImageName, "image version": routerStatus.Router.ImageVersion}

printableLinks := statusManager.RemoveLinksFromSameSite(routerStatus, siteStatus.Site)
routerLevel := routers.NewChildWithDetail(routerItem, detailsRouter)

if len(printableLinks) > 0 {
links := routerLevel.NewChild("Links:")
for _, link := range printableLinks {
linkItem := fmt.Sprintf("name: %s\n", link.Name)
detailsLink := map[string]string{"direction": link.Direction}
if link.LinkCost > 0 {
detailsLink["cost"] = strconv.FormatUint(link.LinkCost, 10)
}
links.NewChildWithDetail(linkItem, detailsLink)
printableLinks := statusManager.RemoveLinksFromSameSite(routerStatus, siteStatus.Site)

if len(printableLinks) > 0 {
links := routerLevel.NewChild("Links:")
for _, link := range printableLinks {
linkItem := fmt.Sprintf("name: %s\n", link.Name)
detailsLink := map[string]string{"direction": link.Direction}
if link.LinkCost > 0 {
detailsLink["cost"] = strconv.FormatUint(link.LinkCost, 10)
}
links.NewChildWithDetail(linkItem, detailsLink)
}
}
}

}
}
}
Expand Down
8 changes: 2 additions & 6 deletions cmd/skupper/skupper_kube_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (s *SkupperKubeService) Status(cmd *cobra.Command, args []string) error {
}

mapServiceSites := statusManager.GetServiceSitesMap()
mapSiteTargets := statusManager.GetSiteTargetsMap()
mapSiteTarget := statusManager.GetSiteTargetMap()

var mapServiceLabels map[string]map[string]string
if err == nil {
Expand All @@ -74,10 +74,6 @@ func (s *SkupperKubeService) Status(cmd *cobra.Command, args []string) error {
} else {
l := formatter.NewList()
l.Item("Services exposed through Skupper:")
var addresses []string
for _, si := range currentNetworkStatus.Addresses {
addresses = append(addresses, si.Name)
}

for _, si := range currentNetworkStatus.Addresses {
svc := l.NewChild(fmt.Sprintf("%s (%s)", si.Name, si.Protocol))
Expand All @@ -91,7 +87,7 @@ func (s *SkupperKubeService) Status(cmd *cobra.Command, args []string) error {
theSite := sites.NewChildWithDetail(item, map[string]string{"policy": site.Site.Policy})

if si.ConnectorCount > 0 {
t := mapSiteTargets[site.Site.Identity][si.Name]
t := mapSiteTarget[site.Site.Identity][si.Name]

if len(t.Address) > 0 {
targets := theSite.NewChild("Targets:")
Expand Down
7 changes: 6 additions & 1 deletion cmd/skupper/skupper_kube_site.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,12 @@ func (s *SkupperKubeSite) Status(cmd *cobra.Command, args []string) error {
policies: currentSite.Site.Policy,
}

mapSiteLink := statusManager.GetSiteLinkMapPerRouter(&currentSite.RouterStatus[0], &currentSite.Site)
err, index := statusManager.GetRouterIndex(currentSite)
if err != nil {
return err
}

mapSiteLink := statusManager.GetSiteLinkMapPerRouter(&currentSite.RouterStatus[index], &currentSite.Site)

totalSites := len(currentStatus.SiteStatus)
// the current site does not count as a connection
Expand Down
2 changes: 1 addition & 1 deletion cmd/skupper/skupper_link.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func NewCmdLinkStatus(skupperClient SkupperLinkClient) *cobra.Command {
},
}
cmd.Flags().IntVar(&waitFor, "wait", 0, "The number of seconds to wait for links to become connected")
cmd.Flags().BoolVar(&verboseLinkStatus, "verbose", false, "Show detailed information about a link")
cmd.Flags().BoolVarP(&verboseLinkStatus, "verbose", "v", false, "Show detailed information about a link")

return cmd

Expand Down
4 changes: 0 additions & 4 deletions cmd/skupper/skupper_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,10 +319,6 @@ func (v *vanClientMock) NetworkStatus(ctx context.Context) (*network.NetworkStat
return &result, nil
}

func (v *vanClientMock) GetRemoteLinks(ctx context.Context, siteConfig *types.SiteConfig) ([]*network.RemoteLinkInfo, error) {
return nil, nil
}

func (v *vanClientMock) GetConsoleUrl(namespace string) (string, error) {
return "", nil
}
Expand Down
21 changes: 17 additions & 4 deletions pkg/kube/pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"io"
"k8s.io/apimachinery/pkg/labels"
"strings"
"time"

Expand Down Expand Up @@ -65,9 +66,17 @@ func FirstReadyPod(list []corev1.Pod) *corev1.Pod {
return nil
}

func GetReadyPod(namespace string, clientset kubernetes.Interface, component string) (*corev1.Pod, error) {
selector := "skupper.io/component=" + component
pods, err := clientset.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{LabelSelector: selector})
func GetReadyPod(namespace string, clientset kubernetes.Interface, component string, application string) (*corev1.Pod, error) {
matchLabels := map[string]string{"skupper.io/component": component}

if application != "" {
matchLabels["application"] = application
}

labelSelector := metav1.LabelSelector{MatchLabels: matchLabels}
listOptions := metav1.ListOptions{LabelSelector: labels.Set(labelSelector.MatchLabels).String()}

pods, err := clientset.CoreV1().Pods(namespace).List(context.TODO(), listOptions)
if err != nil {
return nil, err
} else if len(pods.Items) == 0 {
Expand Down Expand Up @@ -96,7 +105,11 @@ func GetImageVersion(pod *corev1.Pod, container string) string {
}

func GetComponentVersion(namespace string, clientset kubernetes.Interface, component string, container string) string {
pod, err := GetReadyPod(namespace, clientset, component)
application := ""
if component == "router" {
application = "skupper-router"
}
pod, err := GetReadyPod(namespace, clientset, component, application)
if err == nil {
return GetImageVersion(pod, container)
} else {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kube/qdr/mgmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func getLocalRouterId(namespace string, clientset kubernetes.Interface, config *
}

func router_exec(command []string, namespace string, clientset kubernetes.Interface, config *restclient.Config) (*bytes.Buffer, error) {
pod, err := kube.GetReadyPod(namespace, clientset, "router")
pod, err := kube.GetReadyPod(namespace, clientset, "router", "skupper-router")
if err != nil {
return nil, err
}
Expand Down
72 changes: 55 additions & 17 deletions pkg/network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,39 +17,49 @@ func (s *SkupperStatus) GetServiceSitesMap() map[string][]SiteStatusInfo {
for _, site := range s.NetworkStatus.SiteStatus {

if len(site.RouterStatus) > 0 {
for _, listener := range site.RouterStatus[0].Listeners {
if mapServiceSites[listener.Name] != nil {
serviceSites := mapServiceSites[listener.Name]

serviceSites = append(serviceSites, site)
mapServiceSites[listener.Name] = serviceSites
} else {
mapServiceSites[listener.Name] = []SiteStatusInfo{site}
for _, router := range site.RouterStatus {
for _, listener := range router.Listeners {
if len(mapServiceSites[listener.Address]) == 0 || !sliceContainsSite(mapServiceSites[listener.Address], site) {
mapServiceSites[listener.Address] = append(mapServiceSites[listener.Address], site)
}
}

/* Checking for headless services:
the service can be available in the same site where the headless service was exposed, but in that site there is no
listeners for the statefulstet.
*/
for _, connector := range router.Connectors {
if len(mapServiceSites[connector.Address]) == 0 || !sliceContainsSite(mapServiceSites[connector.Address], site) {
mapServiceSites[connector.Address] = append(mapServiceSites[connector.Address], site)
}
}
}

}
}

return mapServiceSites
}

func (s *SkupperStatus) GetSiteTargetsMap() map[string]map[string]ConnectorInfo {
func (s *SkupperStatus) GetSiteTargetMap() map[string]map[string]ConnectorInfo {

mapSiteTargets := make(map[string]map[string]ConnectorInfo)
mapSiteTarget := make(map[string]map[string]ConnectorInfo)

for _, site := range s.NetworkStatus.SiteStatus {

if len(site.RouterStatus) > 0 {
for _, connector := range site.RouterStatus[0].Connectors {
if mapSiteTargets[site.Site.Identity] == nil {
mapSiteTargets[site.Site.Identity] = make(map[string]ConnectorInfo)
for _, router := range site.RouterStatus {
for _, connector := range router.Connectors {
if mapSiteTarget[site.Site.Identity] == nil {
mapSiteTarget[site.Site.Identity] = make(map[string]ConnectorInfo)
}
mapSiteTarget[site.Site.Identity][connector.Address] = connector
}
mapSiteTargets[site.Site.Identity][connector.Address] = connector
}
}
}

return mapSiteTargets
return mapSiteTarget
}

func (s *SkupperStatus) GetRouterSiteMap() map[string]SiteStatusInfo {
Expand All @@ -59,7 +69,11 @@ func (s *SkupperStatus) GetRouterSiteMap() map[string]SiteStatusInfo {
for _, routerStatus := range siteStatus.RouterStatus {
// the name of the router has a "0/" as a prefix that it is needed to remove
routerName := strings.Split(routerStatus.Router.Name, "/")
mapRouterSite[routerName[1]] = siteStatus

// Remove routers that belong to statefulsets for headless services
if len(routerName) > 1 && strings.HasPrefix(routerName[1], siteStatus.Site.Name) {
mapRouterSite[routerName[1]] = siteStatus
}
}
}
}
Expand All @@ -79,7 +93,6 @@ func (s *SkupperStatus) GetSiteById(siteId string) *SiteStatusInfo {
}

func (s *SkupperStatus) GetSiteLinkMapPerRouter(router *RouterStatusInfo, site *SiteInfo) map[string]LinkInfo {

routerSiteMap := s.GetRouterSiteMap()
siteLinkMap := make(map[string]LinkInfo)
if len(router.Links) > 0 {
Expand All @@ -104,6 +117,21 @@ func (s *SkupperStatus) LinkBelongsToSameSite(linkName string, siteId string, ro

}

func (s *SkupperStatus) GetRouterIndex(site *SiteStatusInfo) (error, int) {

for index, router := range site.RouterStatus {
// Ignore routers that belong to statefulsets for headless services and any other router
routerId := strings.Split(router.Router.Name, "/")

if len(routerId) > 1 && strings.HasPrefix(routerId[1], site.Site.Name) {
return nil, index

}
}

return fmt.Errorf("not valid router found"), -1
}

func (s *SkupperStatus) RemoveLinksFromSameSite(router RouterStatusInfo, site SiteInfo) []LinkInfo {
routerSiteMap := s.GetRouterSiteMap()
var filteredLinks []LinkInfo
Expand All @@ -129,3 +157,13 @@ func UnmarshalSkupperStatus(data map[string]string) (*NetworkStatusInfo, error)

return networkStatusInfo, nil
}

func sliceContainsSite(sites []SiteStatusInfo, site SiteStatusInfo) bool {
for _, s := range sites {
if site.Site.Identity == s.Site.Identity {
return true
}
}

return false
}
2 changes: 1 addition & 1 deletion pkg/network/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestGetSiteTargetsMap(t *testing.T) {
expectedSite2 := "429c2780-003d-44cc-9a91-4139885c7d20"
expectedService := "backend:8080"

result := skupperStatus.GetSiteTargetsMap()
result := skupperStatus.GetSiteTargetMap()

assert.Check(t, result[expectedSite2] == nil)
assert.Check(t, result[expectedSite1][expectedService].Address == expectedService)
Expand Down
2 changes: 1 addition & 1 deletion test/integration/acceptance/custom/basic/basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (r *BasicTestRunner) Setup(ctx context.Context, createOptsPublic types.Site
"StartTimeAfter=", podStartTimeAfter, "Router component restarted - POD status:", string(podStatus))

// Check if the Volume is shared by both containers
podContainers, _ := kube.GetReadyPod(prv1Cluster.Namespace, prv1Cluster.VanClient.KubeClient, "router")
podContainers, _ := kube.GetReadyPod(prv1Cluster.Namespace, prv1Cluster.VanClient.KubeClient, "router", "skupper-router")
for _, container := range podContainers.Spec.Containers {
foundCertVol := false
for _, contVolume := range container.VolumeMounts {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
//
// The pod name comes from kube.GetReadyPod for service-controller
func seekServiceControllerAndDelete(ctx *base.ClusterContext) {
pod, err := kube.GetReadyPod(ctx.Namespace, ctx.VanClient.KubeClient, "service-controller")
pod, err := kube.GetReadyPod(ctx.Namespace, ctx.VanClient.KubeClient, "service-controller", "")
if err != nil {
log.Printf("Ignoring pod listing error '%v'", err)
return
Expand Down
2 changes: 1 addition & 1 deletion test/integration/performance/common/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ func tailRouterLogs(ctx context.Context, saveLogs *bool) *sync.WaitGroup {
var err error
log.Printf("Waiting for router pod to be running")
err = pkgutils.RetryWithContext(ctx, time.Second*5, func() (bool, error) {
pod, err = kube.GetReadyPod(cli.Namespace, cli.KubeClient, "router")
pod, err = kube.GetReadyPod(cli.Namespace, cli.KubeClient, "router", "skupper-router")
if pod != nil && err == nil {
return true, nil
}
Expand Down

0 comments on commit 88c6b72

Please sign in to comment.