From f070c46bfd0ef17642856b6610323c1ebbac8a1e Mon Sep 17 00:00:00 2001 From: wanjunlei Date: Mon, 13 May 2024 09:58:02 +0800 Subject: [PATCH] support to get tenant info from multiple cluster Signed-off-by: wanjunlei --- .github/workflows/push-sidecar-image.yaml | 75 ++++++++ config/samples/template.yaml | 2 +- pkg/constants/constants.go | 1 + pkg/controller/controller.go | 11 +- pkg/route/router.go | 17 +- sidecar/kubesphere/4.0.0/Makefile | 2 +- sidecar/kubesphere/4.0.0/backend.go | 169 ++++++++++--------- sidecar/kubesphere/4.0.0/main.go | 14 +- sidecar/kubesphere/4.0.0/test/get-tenants.sh | 2 +- 9 files changed, 196 insertions(+), 97 deletions(-) create mode 100644 .github/workflows/push-sidecar-image.yaml diff --git a/.github/workflows/push-sidecar-image.yaml b/.github/workflows/push-sidecar-image.yaml new file mode 100644 index 00000000..f6df8a25 --- /dev/null +++ b/.github/workflows/push-sidecar-image.yaml @@ -0,0 +1,75 @@ +# +# Copyright 2022 The Notification-Manager Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +name: WorkFlow for Building sidecar image + +on: + push: + branches: + - 'master' + paths: + - '.github/workflows/push-sidecar-image.yaml' + - 'sidecar/kubesphere/4.0.0/backend.go' + - 'sidecar/kubesphere/4.0.0/Dockerfile' + - 'sidecar/kubesphere/4.0.0/main.go' + - 'sidecar/kubesphere/4.0.0/Makefile' + - 'sidecar/kubesphere/4.0.0/go.sum' + - 'sidecar/kubesphere/4.0.0/go.mod' + +env: + REPO_OP: 'kubesphere' + +jobs: + build: + runs-on: ubuntu-latest + timeout-minutes: 30 + name: Build Operator Image + steps: + - name: Install Go + uses: actions/setup-go@v2 + with: + go-version: 1.20.x + + - uses: actions/cache@v2 + with: + path: ~/go/pkg/mod + key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }} + + - name: Checkout code + uses: actions/checkout@v2 + with: + fetch-depth: 0 + + - name: Set up QEMU + id: qemu + uses: docker/setup-qemu-action@v1 + with: + image: tonistiigi/binfmt:latest + platforms: all + + - name: Login to Docker Hub + uses: docker/login-action@v1 + with: + username: ${{ secrets.REGISTRY_USER }} + password: ${{ secrets.REGISTRY_PASSWORD }} + + - name: Set up Docker Buildx + id: buildx + uses: docker/setup-buildx-action@v1 + + - name: Build and Push image + run: | + cd sidecar/kubesphere/4.0.0 && make \ No newline at end of file diff --git a/config/samples/template.yaml b/config/samples/template.yaml index 4f543bbc..a9dc189f 100644 --- a/config/samples/template.yaml +++ b/config/samples/template.yaml @@ -5,7 +5,7 @@ data: {{ define "nm.default.message.cn" }}{{ if ne (len .Status) 0 }}[{{ .Status | translate }}] {{ end }}{{ .MessageCN }}{{ end }} {{ define "nm.default.subject" }}{{ if eq (len .Alerts) 1 }}{{ range .Alerts }}{{ template "nm.default.message" . }}{{ end }}{{ else }}{{ .Alerts | len }} {{ if ne (len .Status) 0 }}{{ .Status }} {{ end }}alerts{{ if gt (len .GroupLabels.SortedPairs) 0 }} for {{ range .GroupLabels.SortedPairs }}{{ .Name | translate }}={{ .Value }} {{ end }}{{ end }}{{ end }}{{ end }} - {{ define "nm.subject" }}{{ .Alerts | len }} {{ if ne (len .Status) 0 }}{{ .Status }} {{ end }}alerts{{ if gt (len .CommonLabels.SortedPairs) 0 }} for {{ range .CommonLabels.SortedPairs }}{{ .Name | translate }}={{ .Value }} {{ end }}{{ end }}{{ end }} + {{ define "nm.subject" }}{{ $numAlerts := len .Alerts }}{{ if eq $numAlerts 0 }}Show nothing{{ else if eq $numAlerts 1 }}{{ range .Alerts }}{{ $alertTitle := "" }}{{ $alertType := .Labels.alerttype }}{{ $alertName := .Labels.alertname }}{{ $cluster := .Labels.cluster }}{{ $node := .Labels.node }}{{ $pod := .Labels.pod }}{{ $namespace := .Labels.namespace }}{{ if eq $alertType "metric" }}{{ $alertTitle = "[Metrics Alert]" }}{{ else if eq $alertType "auditing" }}{{ $alertTitle = "[Audit Alert]" }}{{ else if eq $alertType "event" }}{{ $alertTitle = "[Event Alert]" }}{{ else }}{{ $alertTitle = "[Unknown Alert]" }}{{ end }}{{ $output := printf "%s" $alertTitle }}{{ if $alertName }}{{ $output = printf "%s alertname=%s" $output $alertName }}{{ end }}{{ if $cluster }}{{ $output = printf "%s | cluster=%s" $output $cluster }}{{ end }}{{ if $namespace }}{{ $output = printf "%s | namespace=%s" $output $namespace }}{{ end }}{{ if $pod }}{{ $output = printf "%s | pod=%s" $output $pod }}{{ if $node }}{{ $output = printf "%s/node=%s" $output $node }}{{ end }}{{ else }}{{ if $node }}{{ $output = printf "%s | node=%s" $output $node }}{{ end }}{{ end }}{{ $output }}{{ end }}{{ else }}{{ $alertTitle := "[Aggregation" }}{{ $alertType := index .GroupLabels "alerttype" }}{{ $alertName := index .GroupLabels "alertname" }}{{ $cluster := index .GroupLabels "cluster" }}{{ $namespace := index .GroupLabels "namespace" }}{{ if eq $alertType "metric" }}{{ $alertTitle = printf "%s Metrics Alert]" $alertTitle }}{{ else if eq $alertType "auditing" }}{{ $alertTitle = printf "%s Audit Alert]" $alertTitle }}{{ else if eq $alertType "event" }}{{ $alertTitle = printf "%s Event Alert]" $alertTitle }}{{ else }}{{ $alertTitle = printf "%s Alert]" $alertTitle }}{{ end }}{{ $output := printf "%s" $alertTitle }}{{ if $alertName }}{{ $output = printf "%s alertname=%s" $output $alertName }}{{ end }}{{ if $cluster }}{{ $output = printf "%s | cluster=%s" $output $cluster }}{{ end }}{{ if $namespace }}{{ $output = printf "%s | namespace=%s" $output $namespace }}{{ end }}{{ $output }}{{ end }}{{ end }} {{ define "nm.default.text" }}{{ range .Alerts }}{{ template "nm.default.message" . }} {{ range .Labels.SortedPairs }} {{ .Name | translate }}: {{ .Value }} diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go index a30be1d9..4380a913 100644 --- a/pkg/constants/constants.go +++ b/pkg/constants/constants.go @@ -26,6 +26,7 @@ const ( DiscordContent = "content" DiscordEmbed = "embed" + Cluster = "cluster" Namespace = "namespace" AlertFiring = "firing" diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index d0f77a7c..64eda491 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -505,7 +505,7 @@ func (c *Controller) receiverChanged(t *task) { _ = level.Info(c.logger).Log("msg", "Receiver changed", "op", t.op, "name", receiver.Name) } -func (c *Controller) tenantIDFromNs(namespace string) ([]string, error) { +func (c *Controller) tenantIDFromNs(cluster, namespace string) ([]string, error) { tenantIDs := make([]string, 0) // Use namespace as TenantID directly if tenantSidecar not provided. if !c.tenantSidecar { @@ -514,7 +514,8 @@ func (c *Controller) tenantIDFromNs(namespace string) ([]string, error) { } p := make(map[string]string) - p["namespace"] = namespace + p[constants.Cluster] = cluster + p[constants.Namespace] = namespace u, err := utils.UrlWithParameters(tenantSidecarURL, p) if err != nil { return nil, err @@ -536,7 +537,7 @@ func (c *Controller) tenantIDFromNs(namespace string) ([]string, error) { return nil, err } - _ = level.Debug(c.logger).Log("msg", "get tenants from namespace", "namespace", namespace, "tenant", utils.ArrayToString(res, ",")) + _ = level.Debug(c.logger).Log("msg", "get tenants from namespace", "cluster", cluster, "namespace", namespace, "tenant", utils.ArrayToString(res, ",")) return res, nil } @@ -580,13 +581,13 @@ func getMatchedConfig(r internal.Receiver, configs map[string]map[string]interna } } -func (c *Controller) RcvsFromNs(namespace *string) []internal.Receiver { +func (c *Controller) RcvsFromNs(cluster string, namespace *string) []internal.Receiver { // Global receiver should receive all notifications. tenants := []string{globalTenantID} if namespace != nil && len(*namespace) > 0 { // Get all tenants which need to receive the notifications in this namespace. - tenantIDs, err := c.tenantIDFromNs(*namespace) + tenantIDs, err := c.tenantIDFromNs(cluster, *namespace) if err != nil { _ = level.Error(c.logger).Log("msg", "get tenantID error", "err", err, "namespace", *namespace) } else { diff --git a/pkg/route/router.go b/pkg/route/router.go index 1ad0d47b..c9f8767a 100644 --- a/pkg/route/router.go +++ b/pkg/route/router.go @@ -2,6 +2,8 @@ package route import ( "context" + "fmt" + "strings" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" @@ -51,25 +53,30 @@ func (s *routeStage) Exec(ctx context.Context, l log.Logger, data interface{}) ( return ctx, nil, err } - // Grouping alerts by namespace + // Grouping alerts by cluster and namespace alertMap := make(map[string][]*template.Alert) for _, alert := range input { ns := alert.Labels[constants.Namespace] - as := alertMap[ns] + cluster := alert.Labels[constants.Cluster] + key := fmt.Sprintf("%s|%s", cluster, ns) + as := alertMap[key] as = append(as, alert) - alertMap[ns] = as + alertMap[key] = as } m := make(map[string]*packet) routePolicy := s.notifierCtl.GetRoutePolicy() - for ns, alerts := range alertMap { + for key, alerts := range alertMap { flag := false + pair := strings.Split(key, "|") + cluster := pair[0] + ns := pair[1] var tenantRcvs []internal.Receiver for _, alert := range alerts { rcvs := s.rcvsFromRouter(alert, routers) if routePolicy == RouterPolicyAll || (routePolicy == RouterFirst && len(rcvs) == 0) { if len(tenantRcvs) == 0 && !flag { - tenantRcvs = s.notifierCtl.RcvsFromNs(&ns) + tenantRcvs = s.notifierCtl.RcvsFromNs(cluster, &ns) flag = true } rcvs = append(rcvs, tenantRcvs...) diff --git a/sidecar/kubesphere/4.0.0/Makefile b/sidecar/kubesphere/4.0.0/Makefile index cd3fdbe0..8c5964fe 100644 --- a/sidecar/kubesphere/4.0.0/Makefile +++ b/sidecar/kubesphere/4.0.0/Makefile @@ -2,7 +2,7 @@ # Use of this source code is governed by a Apache license # that can be found in the LICENSE file. -IMG ?= kubesphere/notification-tenant-sidecar:v4.0.0 +IMG ?= kubesphere/notification-tenant-sidecar:v4.0.1 AMD64 ?= -amd64 all: docker-build diff --git a/sidecar/kubesphere/4.0.0/backend.go b/sidecar/kubesphere/4.0.0/backend.go index be3dda0e..7a7a12d2 100644 --- a/sidecar/kubesphere/4.0.0/backend.go +++ b/sidecar/kubesphere/4.0.0/backend.go @@ -2,9 +2,9 @@ package main import ( "context" + "fmt" "time" - "k8s.io/api/authorization/v1beta1" v1 "k8s.io/api/core/v1" "k8s.io/klog" iamv1beta1 "kubesphere.io/api/iam/v1beta1" @@ -13,12 +13,14 @@ import ( type Backend struct { ksClient *rest.RESTClient - tenants map[string]map[string]string + // map[cluster]map[namespace][]{users} + tenants map[string]map[string][]string - interval time.Duration + interval time.Duration + batchSize int } -func NewBackend(host, username, password string, interval time.Duration) (*Backend, error) { +func NewBackend(host, username, password string, interval time.Duration, batchSize int) (*Backend, error) { var config *rest.Config if username != "" && password != "" { config = &rest.Config{ @@ -39,24 +41,20 @@ func NewBackend(host, username, password string, interval time.Duration) (*Backe } return &Backend{ - ksClient: c, - tenants: make(map[string]map[string]string), - interval: interval, + ksClient: c, + tenants: make(map[string]map[string][]string), + interval: interval, + batchSize: batchSize, }, err } -func (b *Backend) FromNamespace(ns string) []string { - - m, ok := b.tenants[ns] +func (b *Backend) FromNamespace(cluster, ns string) []string { + cm, ok := b.tenants[cluster] if !ok { return nil } - array := make([]string, 0) - for k := range m { - array = append(array, k) - } - return array + return cm[ns] } func (b *Backend) Run() { @@ -79,22 +77,42 @@ func (b *Backend) reload() { klog.Info("end reload tenant") }() + clusters, err := b.listClusters() + if err != nil { + klog.Errorf("list clusters error, %s", err.Error()) + } + users, err := b.listUsers() if err != nil { klog.Errorf("list users error, %s", err.Error()) + return } - namespaces, err := b.listNamespaces() + tenants := make(map[string]map[string][]string) + for _, cluster := range clusters { + m, err := getTenantInfoFromCluster(cluster, users) + if err != nil { + return + } + + tenants[cluster] = m + } + + b.tenants = tenants +} + +func getTenantInfoFromCluster(cluster string, users []string) (map[string][]string, error) { + namespaces, err := b.listNamespaces(cluster) if err != nil { klog.Errorf("list namespaces error, %s", err.Error()) + return nil, err } var items []iamv1beta1.SubjectAccessReview - - m := make(map[string]map[string]string) + m := make(map[string][]string) for _, namespace := range namespaces { for _, user := range users { - sar := iamv1beta1.SubjectAccessReview{ + items = append(items, iamv1beta1.SubjectAccessReview{ Spec: iamv1beta1.SubjectAccessReviewSpec{ ResourceAttributes: &iamv1beta1.ResourceAttributes{ Namespace: namespace, @@ -107,12 +125,42 @@ func (b *Backend) reload() { User: user, // "X-Remote-User" request header Groups: []string{}, // "X-Remote-Group" request header }, + }) + + if len(items) >= batchSize { + if err := b.batchRequest(cluster, items, m); err != nil { + return nil, err + } + items = items[:0] + continue } - items = append(items, sar) } } - b.batchRequest(items, m) - b.tenants = m + + if err := b.batchRequest(cluster, items, m); err != nil { + return nil, err + } + + return m, err +} + +func (b *Backend) listClusters() ([]string, error) { + res := b.ksClient.Get().AbsPath("/kapis/cluster.kubesphere.io/v1alpha1/clusters").Do(context.Background()) + if err := res.Error(); err != nil { + return nil, err + } + clusterList := &iamv1beta1.UserList{} + err := res.Into(clusterList) + if err != nil { + return nil, err + } + + var clusters []string + for _, cluster := range clusterList.Items { + clusters = append(clusters, cluster.Name) + } + + return clusters, nil } func (b *Backend) listUsers() ([]string, error) { @@ -134,8 +182,8 @@ func (b *Backend) listUsers() ([]string, error) { return users, nil } -func (b *Backend) listNamespaces() ([]string, error) { - res := b.ksClient.Get().AbsPath("/api/v1/namespaces").Do(context.Background()) +func (b *Backend) listNamespaces(cluster string) ([]string, error) { + res := b.ksClient.Get().AbsPath(fmt.Sprintf("/clusters/%s/api/v1/namespaces", cluster)).Do(context.Background()) if err := res.Error(); err != nil { return nil, err } @@ -153,65 +201,24 @@ func (b *Backend) listNamespaces() ([]string, error) { return namespaces, nil } -func (b *Backend) canAccess(user, namespace string) (bool, error) { - subjectAccessReview := &v1beta1.SubjectAccessReview{ - Spec: v1beta1.SubjectAccessReviewSpec{ - ResourceAttributes: &v1beta1.ResourceAttributes{ - Namespace: namespace, - Verb: "get", - Group: "notification.kubesphere.io", - Version: "v2beta2", - Resource: "receivenotification", - }, - NonResourceAttributes: nil, - User: user, // "X-Remote-User" request header - Groups: []string{}, // "X-Remote-Group" request header - }, - } - - if err := b.ksClient.Post().AbsPath("/kapis/iam.kubesphere.io/v1beta1/subjectaccessreviews"). - Body(subjectAccessReview). - Do(context.Background()). - Into(subjectAccessReview); err != nil { - return false, err +func (b *Backend) batchRequest(cluster string, items []iamv1beta1.SubjectAccessReview, m map[string][]string) error { + list := &iamv1beta1.SubjectAccessReviewList{ + Items: items, } - - return subjectAccessReview.Status.Allowed, nil -} - -func (b *Backend) batchRequest(subjectAccessReviews []iamv1beta1.SubjectAccessReview, m map[string]map[string]string) { - - batchSize := 500 - for i := 0; i < len(subjectAccessReviews); i += batchSize { - items := subjectAccessReviews[i:minimum(i+batchSize, len(subjectAccessReviews))] - list := &iamv1beta1.SubjectAccessReviewList{ - Items: items, - } - if err := b.ksClient.Post().AbsPath("/kapis/iam.kubesphere.io/v1beta1/subjectaccessreviews"). - Body(list). - Do(context.Background()). - Into(list); err != nil { - klog.Errorf("get access view error: %s", err.Error()) - return - } - for _, item := range items { - if item.Status.Allowed { - ns := item.Spec.ResourceAttributes.Namespace - user := item.Spec.User - array, ok := m[ns] - if !ok { - array = make(map[string]string) - } - array[user] = "" - m[ns] = array - } + if err := b.ksClient.Post().AbsPath(fmt.Sprintf("/clusters/%s/kapis/iam.kubesphere.io/v1beta1/subjectaccessreviews", cluster)). + Body(list). + Do(context.Background()). + Into(list); err != nil { + klog.Errorf("get access view error: %s", err.Error()) + return err + } + for _, item := range items { + if item.Status.Allowed { + ns := item.Spec.ResourceAttributes.Namespace + user := item.Spec.User + m[ns] = append(m[ns], user) } } -} -func minimum(a, b int) int { - if a < b { - return a - } - return b + return nil } diff --git a/sidecar/kubesphere/4.0.0/main.go b/sidecar/kubesphere/4.0.0/main.go index cf3add2b..1defce53 100644 --- a/sidecar/kubesphere/4.0.0/main.go +++ b/sidecar/kubesphere/4.0.0/main.go @@ -29,12 +29,18 @@ import ( "k8s.io/klog" ) +const ( + defaultInterval = time.Minute * 5 + defaultBatchSize = 500 +) + var ( waitHandlerGroup sync.WaitGroup host string username string password string interval time.Duration + batchSize int b *Backend ) @@ -50,7 +56,8 @@ func AddFlags(fs *pflag.FlagSet) { fs.StringVar(&host, "host", "ks-apiserver.kubesphere-system", "the host of kubesphere apiserver") fs.StringVar(&username, "username", "", "the username of kubesphere") fs.StringVar(&password, "password", "", "the password of kubesphere") - fs.DurationVar(&interval, "interval", time.Minute*5, "interval to reload") + fs.DurationVar(&interval, "interval", defaultInterval, "interval to reload") + fs.IntVar(&batchSize, "batchSize", defaultBatchSize, "interval to reload") } func NewServerCommand() *cobra.Command { @@ -73,7 +80,7 @@ func Run() error { }) var err error - b, err = NewBackend(host, username, password, interval) + b, err = NewBackend(host, username, password, interval, batchSize) if err != nil { return err } @@ -113,8 +120,9 @@ func handler(req *restful.Request, resp *restful.Response) { waitHandlerGroup.Add(1) defer waitHandlerGroup.Done() + cluster := req.QueryParameter("cluster") ns := req.QueryParameter("namespace") - tenants := b.FromNamespace(ns) + tenants := b.FromNamespace(cluster, ns) if tenants == nil { responseWithHeaderAndEntity(resp, http.StatusNotFound, "") return diff --git a/sidecar/kubesphere/4.0.0/test/get-tenants.sh b/sidecar/kubesphere/4.0.0/test/get-tenants.sh index d060b29a..a90fcf8a 100755 --- a/sidecar/kubesphere/4.0.0/test/get-tenants.sh +++ b/sidecar/kubesphere/4.0.0/test/get-tenants.sh @@ -1,2 +1,2 @@ #!/bin/bash -curl -XGET http://127.0.0.1:19094/api/v2/tenant?namespace=kubesphere-monitoring-system +curl -XGET http://127.0.0.1:19094/api/v2/tenant?cluster=host&namespace=kubesphere-monitoring-system