Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support to get tenant info from multiple cluster #263

Merged
merged 1 commit into from
May 13, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions .github/workflows/push-sidecar-image.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
#
# Copyright 2022 The Notification-Manager Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

name: WorkFlow for Building sidecar image

on:
push:
branches:
- 'master'
paths:
- '.github/workflows/push-sidecar-image.yaml'
- 'sidecar/kubesphere/4.0.0/backend.go'
- 'sidecar/kubesphere/4.0.0/Dockerfile'
- 'sidecar/kubesphere/4.0.0/main.go'
- 'sidecar/kubesphere/4.0.0/Makefile'
- 'sidecar/kubesphere/4.0.0/go.sum'
- 'sidecar/kubesphere/4.0.0/go.mod'

env:
REPO_OP: 'kubesphere'

jobs:
build:
runs-on: ubuntu-latest
timeout-minutes: 30
name: Build Operator Image
steps:
- name: Install Go
uses: actions/setup-go@v2
with:
go-version: 1.20.x

- uses: actions/cache@v2
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}

- name: Checkout code
uses: actions/checkout@v2
with:
fetch-depth: 0

- name: Set up QEMU
id: qemu
uses: docker/setup-qemu-action@v1
with:
image: tonistiigi/binfmt:latest
platforms: all

- name: Login to Docker Hub
uses: docker/login-action@v1
with:
username: ${{ secrets.REGISTRY_USER }}
password: ${{ secrets.REGISTRY_PASSWORD }}

- name: Set up Docker Buildx
id: buildx
uses: docker/setup-buildx-action@v1

- name: Build and Push image
run: |
cd sidecar/kubesphere/4.0.0 && make
2 changes: 1 addition & 1 deletion config/samples/template.yaml
Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@ data:
{{ define "nm.default.message.cn" }}{{ if ne (len .Status) 0 }}[{{ .Status | translate }}] {{ end }}{{ .MessageCN }}{{ end }}

{{ define "nm.default.subject" }}{{ if eq (len .Alerts) 1 }}{{ range .Alerts }}{{ template "nm.default.message" . }}{{ end }}{{ else }}{{ .Alerts | len }} {{ if ne (len .Status) 0 }}{{ .Status }} {{ end }}alerts{{ if gt (len .GroupLabels.SortedPairs) 0 }} for {{ range .GroupLabels.SortedPairs }}{{ .Name | translate }}={{ .Value }} {{ end }}{{ end }}{{ end }}{{ end }}
{{ define "nm.subject" }}{{ .Alerts | len }} {{ if ne (len .Status) 0 }}{{ .Status }} {{ end }}alerts{{ if gt (len .CommonLabels.SortedPairs) 0 }} for {{ range .CommonLabels.SortedPairs }}{{ .Name | translate }}={{ .Value }} {{ end }}{{ end }}{{ end }}
{{ define "nm.subject" }}{{ $numAlerts := len .Alerts }}{{ if eq $numAlerts 0 }}Show nothing{{ else if eq $numAlerts 1 }}{{ range .Alerts }}{{ $alertTitle := "" }}{{ $alertType := .Labels.alerttype }}{{ $alertName := .Labels.alertname }}{{ $cluster := .Labels.cluster }}{{ $node := .Labels.node }}{{ $pod := .Labels.pod }}{{ $namespace := .Labels.namespace }}{{ if eq $alertType "metric" }}{{ $alertTitle = "[Metrics Alert]" }}{{ else if eq $alertType "auditing" }}{{ $alertTitle = "[Audit Alert]" }}{{ else if eq $alertType "event" }}{{ $alertTitle = "[Event Alert]" }}{{ else }}{{ $alertTitle = "[Unknown Alert]" }}{{ end }}{{ $output := printf "%s" $alertTitle }}{{ if $alertName }}{{ $output = printf "%s alertname=%s" $output $alertName }}{{ end }}{{ if $cluster }}{{ $output = printf "%s | cluster=%s" $output $cluster }}{{ end }}{{ if $namespace }}{{ $output = printf "%s | namespace=%s" $output $namespace }}{{ end }}{{ if $pod }}{{ $output = printf "%s | pod=%s" $output $pod }}{{ if $node }}{{ $output = printf "%s/node=%s" $output $node }}{{ end }}{{ else }}{{ if $node }}{{ $output = printf "%s | node=%s" $output $node }}{{ end }}{{ end }}{{ $output }}{{ end }}{{ else }}{{ $alertTitle := "[Aggregation" }}{{ $alertType := index .GroupLabels "alerttype" }}{{ $alertName := index .GroupLabels "alertname" }}{{ $cluster := index .GroupLabels "cluster" }}{{ $namespace := index .GroupLabels "namespace" }}{{ if eq $alertType "metric" }}{{ $alertTitle = printf "%s Metrics Alert]" $alertTitle }}{{ else if eq $alertType "auditing" }}{{ $alertTitle = printf "%s Audit Alert]" $alertTitle }}{{ else if eq $alertType "event" }}{{ $alertTitle = printf "%s Event Alert]" $alertTitle }}{{ else }}{{ $alertTitle = printf "%s Alert]" $alertTitle }}{{ end }}{{ $output := printf "%s" $alertTitle }}{{ if $alertName }}{{ $output = printf "%s alertname=%s" $output $alertName }}{{ end }}{{ if $cluster }}{{ $output = printf "%s | cluster=%s" $output $cluster }}{{ end }}{{ if $namespace }}{{ $output = printf "%s | namespace=%s" $output $namespace }}{{ end }}{{ $output }}{{ end }}{{ end }}

{{ define "nm.default.text" }}{{ range .Alerts }}{{ template "nm.default.message" . }}
{{ range .Labels.SortedPairs }} {{ .Name | translate }}: {{ .Value }}
1 change: 1 addition & 0 deletions pkg/constants/constants.go
Original file line number Diff line number Diff line change
@@ -26,6 +26,7 @@ const (
DiscordContent = "content"
DiscordEmbed = "embed"

Cluster = "cluster"
Namespace = "namespace"

AlertFiring = "firing"
11 changes: 6 additions & 5 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
@@ -505,7 +505,7 @@ func (c *Controller) receiverChanged(t *task) {
_ = level.Info(c.logger).Log("msg", "Receiver changed", "op", t.op, "name", receiver.Name)
}

func (c *Controller) tenantIDFromNs(namespace string) ([]string, error) {
func (c *Controller) tenantIDFromNs(cluster, namespace string) ([]string, error) {
tenantIDs := make([]string, 0)
// Use namespace as TenantID directly if tenantSidecar not provided.
if !c.tenantSidecar {
@@ -514,7 +514,8 @@ func (c *Controller) tenantIDFromNs(namespace string) ([]string, error) {
}

p := make(map[string]string)
p["namespace"] = namespace
p[constants.Cluster] = cluster
p[constants.Namespace] = namespace
u, err := utils.UrlWithParameters(tenantSidecarURL, p)
if err != nil {
return nil, err
@@ -536,7 +537,7 @@ func (c *Controller) tenantIDFromNs(namespace string) ([]string, error) {
return nil, err
}

_ = level.Debug(c.logger).Log("msg", "get tenants from namespace", "namespace", namespace, "tenant", utils.ArrayToString(res, ","))
_ = level.Debug(c.logger).Log("msg", "get tenants from namespace", "cluster", cluster, "namespace", namespace, "tenant", utils.ArrayToString(res, ","))

return res, nil
}
@@ -580,13 +581,13 @@ func getMatchedConfig(r internal.Receiver, configs map[string]map[string]interna
}
}

func (c *Controller) RcvsFromNs(namespace *string) []internal.Receiver {
func (c *Controller) RcvsFromNs(cluster string, namespace *string) []internal.Receiver {

// Global receiver should receive all notifications.
tenants := []string{globalTenantID}
if namespace != nil && len(*namespace) > 0 {
// Get all tenants which need to receive the notifications in this namespace.
tenantIDs, err := c.tenantIDFromNs(*namespace)
tenantIDs, err := c.tenantIDFromNs(cluster, *namespace)
if err != nil {
_ = level.Error(c.logger).Log("msg", "get tenantID error", "err", err, "namespace", *namespace)
} else {
17 changes: 12 additions & 5 deletions pkg/route/router.go
Original file line number Diff line number Diff line change
@@ -2,6 +2,8 @@ package route

import (
"context"
"fmt"
"strings"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
@@ -51,25 +53,30 @@ func (s *routeStage) Exec(ctx context.Context, l log.Logger, data interface{}) (
return ctx, nil, err
}

// Grouping alerts by namespace
// Grouping alerts by cluster and namespace
alertMap := make(map[string][]*template.Alert)
for _, alert := range input {
ns := alert.Labels[constants.Namespace]
as := alertMap[ns]
cluster := alert.Labels[constants.Cluster]
key := fmt.Sprintf("%s|%s", cluster, ns)
as := alertMap[key]
as = append(as, alert)
alertMap[ns] = as
alertMap[key] = as
}

m := make(map[string]*packet)
routePolicy := s.notifierCtl.GetRoutePolicy()
for ns, alerts := range alertMap {
for key, alerts := range alertMap {
flag := false
pair := strings.Split(key, "|")
cluster := pair[0]
ns := pair[1]
var tenantRcvs []internal.Receiver
for _, alert := range alerts {
rcvs := s.rcvsFromRouter(alert, routers)
if routePolicy == RouterPolicyAll || (routePolicy == RouterFirst && len(rcvs) == 0) {
if len(tenantRcvs) == 0 && !flag {
tenantRcvs = s.notifierCtl.RcvsFromNs(&ns)
tenantRcvs = s.notifierCtl.RcvsFromNs(cluster, &ns)
flag = true
}
rcvs = append(rcvs, tenantRcvs...)
2 changes: 1 addition & 1 deletion sidecar/kubesphere/4.0.0/Makefile
Original file line number Diff line number Diff line change
@@ -2,7 +2,7 @@
# Use of this source code is governed by a Apache license
# that can be found in the LICENSE file.

IMG ?= kubesphere/notification-tenant-sidecar:v4.0.0
IMG ?= kubesphere/notification-tenant-sidecar:v4.0.1
AMD64 ?= -amd64

all: docker-build
169 changes: 88 additions & 81 deletions sidecar/kubesphere/4.0.0/backend.go
Original file line number Diff line number Diff line change
@@ -2,9 +2,9 @@ package main

import (
"context"
"fmt"
"time"

"k8s.io/api/authorization/v1beta1"
v1 "k8s.io/api/core/v1"
"k8s.io/klog"
iamv1beta1 "kubesphere.io/api/iam/v1beta1"
@@ -13,12 +13,14 @@ import (

type Backend struct {
ksClient *rest.RESTClient
tenants map[string]map[string]string
// map[cluster]map[namespace][]{users}
tenants map[string]map[string][]string

interval time.Duration
interval time.Duration
batchSize int
}

func NewBackend(host, username, password string, interval time.Duration) (*Backend, error) {
func NewBackend(host, username, password string, interval time.Duration, batchSize int) (*Backend, error) {
var config *rest.Config
if username != "" && password != "" {
config = &rest.Config{
@@ -39,24 +41,20 @@ func NewBackend(host, username, password string, interval time.Duration) (*Backe
}

return &Backend{
ksClient: c,
tenants: make(map[string]map[string]string),
interval: interval,
ksClient: c,
tenants: make(map[string]map[string][]string),
interval: interval,
batchSize: batchSize,
}, err
}

func (b *Backend) FromNamespace(ns string) []string {

m, ok := b.tenants[ns]
func (b *Backend) FromNamespace(cluster, ns string) []string {
cm, ok := b.tenants[cluster]
if !ok {
return nil
}

array := make([]string, 0)
for k := range m {
array = append(array, k)
}
return array
return cm[ns]
}

func (b *Backend) Run() {
@@ -79,22 +77,42 @@ func (b *Backend) reload() {
klog.Info("end reload tenant")
}()

clusters, err := b.listClusters()
if err != nil {
klog.Errorf("list clusters error, %s", err.Error())
}

users, err := b.listUsers()
if err != nil {
klog.Errorf("list users error, %s", err.Error())
return
}

namespaces, err := b.listNamespaces()
tenants := make(map[string]map[string][]string)
for _, cluster := range clusters {
m, err := getTenantInfoFromCluster(cluster, users)
if err != nil {
return
}

tenants[cluster] = m
}

b.tenants = tenants
}

func getTenantInfoFromCluster(cluster string, users []string) (map[string][]string, error) {
namespaces, err := b.listNamespaces(cluster)
if err != nil {
klog.Errorf("list namespaces error, %s", err.Error())
return nil, err
}

var items []iamv1beta1.SubjectAccessReview

m := make(map[string]map[string]string)
m := make(map[string][]string)
for _, namespace := range namespaces {
for _, user := range users {
sar := iamv1beta1.SubjectAccessReview{
items = append(items, iamv1beta1.SubjectAccessReview{
Spec: iamv1beta1.SubjectAccessReviewSpec{
ResourceAttributes: &iamv1beta1.ResourceAttributes{
Namespace: namespace,
@@ -107,12 +125,42 @@ func (b *Backend) reload() {
User: user, // "X-Remote-User" request header
Groups: []string{}, // "X-Remote-Group" request header
},
})

if len(items) >= batchSize {
if err := b.batchRequest(cluster, items, m); err != nil {
return nil, err
}
items = items[:0]
continue
}
items = append(items, sar)
}
}
b.batchRequest(items, m)
b.tenants = m

if err := b.batchRequest(cluster, items, m); err != nil {
return nil, err
}

return m, err
}

func (b *Backend) listClusters() ([]string, error) {
res := b.ksClient.Get().AbsPath("/kapis/cluster.kubesphere.io/v1alpha1/clusters").Do(context.Background())
if err := res.Error(); err != nil {
return nil, err
}
clusterList := &iamv1beta1.UserList{}
err := res.Into(clusterList)
if err != nil {
return nil, err
}

var clusters []string
for _, cluster := range clusterList.Items {
clusters = append(clusters, cluster.Name)
}

return clusters, nil
}

func (b *Backend) listUsers() ([]string, error) {
@@ -134,8 +182,8 @@ func (b *Backend) listUsers() ([]string, error) {
return users, nil
}

func (b *Backend) listNamespaces() ([]string, error) {
res := b.ksClient.Get().AbsPath("/api/v1/namespaces").Do(context.Background())
func (b *Backend) listNamespaces(cluster string) ([]string, error) {
res := b.ksClient.Get().AbsPath(fmt.Sprintf("/clusters/%s/api/v1/namespaces", cluster)).Do(context.Background())
if err := res.Error(); err != nil {
return nil, err
}
@@ -153,65 +201,24 @@ func (b *Backend) listNamespaces() ([]string, error) {
return namespaces, nil
}

func (b *Backend) canAccess(user, namespace string) (bool, error) {
subjectAccessReview := &v1beta1.SubjectAccessReview{
Spec: v1beta1.SubjectAccessReviewSpec{
ResourceAttributes: &v1beta1.ResourceAttributes{
Namespace: namespace,
Verb: "get",
Group: "notification.kubesphere.io",
Version: "v2beta2",
Resource: "receivenotification",
},
NonResourceAttributes: nil,
User: user, // "X-Remote-User" request header
Groups: []string{}, // "X-Remote-Group" request header
},
}

if err := b.ksClient.Post().AbsPath("/kapis/iam.kubesphere.io/v1beta1/subjectaccessreviews").
Body(subjectAccessReview).
Do(context.Background()).
Into(subjectAccessReview); err != nil {
return false, err
func (b *Backend) batchRequest(cluster string, items []iamv1beta1.SubjectAccessReview, m map[string][]string) error {
list := &iamv1beta1.SubjectAccessReviewList{
Items: items,
}

return subjectAccessReview.Status.Allowed, nil
}

func (b *Backend) batchRequest(subjectAccessReviews []iamv1beta1.SubjectAccessReview, m map[string]map[string]string) {

batchSize := 500
for i := 0; i < len(subjectAccessReviews); i += batchSize {
items := subjectAccessReviews[i:minimum(i+batchSize, len(subjectAccessReviews))]
list := &iamv1beta1.SubjectAccessReviewList{
Items: items,
}
if err := b.ksClient.Post().AbsPath("/kapis/iam.kubesphere.io/v1beta1/subjectaccessreviews").
Body(list).
Do(context.Background()).
Into(list); err != nil {
klog.Errorf("get access view error: %s", err.Error())
return
}
for _, item := range items {
if item.Status.Allowed {
ns := item.Spec.ResourceAttributes.Namespace
user := item.Spec.User
array, ok := m[ns]
if !ok {
array = make(map[string]string)
}
array[user] = ""
m[ns] = array
}
if err := b.ksClient.Post().AbsPath(fmt.Sprintf("/clusters/%s/kapis/iam.kubesphere.io/v1beta1/subjectaccessreviews", cluster)).
Body(list).
Do(context.Background()).
Into(list); err != nil {
klog.Errorf("get access view error: %s", err.Error())
return err
}
for _, item := range items {
if item.Status.Allowed {
ns := item.Spec.ResourceAttributes.Namespace
user := item.Spec.User
m[ns] = append(m[ns], user)
}
}
}

func minimum(a, b int) int {
if a < b {
return a
}
return b
return nil
}
14 changes: 11 additions & 3 deletions sidecar/kubesphere/4.0.0/main.go
Original file line number Diff line number Diff line change
@@ -29,12 +29,18 @@ import (
"k8s.io/klog"
)

const (
defaultInterval = time.Minute * 5
defaultBatchSize = 500
)

var (
waitHandlerGroup sync.WaitGroup
host string
username string
password string
interval time.Duration
batchSize int

b *Backend
)
@@ -50,7 +56,8 @@ func AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&host, "host", "ks-apiserver.kubesphere-system", "the host of kubesphere apiserver")
fs.StringVar(&username, "username", "", "the username of kubesphere")
fs.StringVar(&password, "password", "", "the password of kubesphere")
fs.DurationVar(&interval, "interval", time.Minute*5, "interval to reload")
fs.DurationVar(&interval, "interval", defaultInterval, "interval to reload")
fs.IntVar(&batchSize, "batchSize", defaultBatchSize, "interval to reload")
}

func NewServerCommand() *cobra.Command {
@@ -73,7 +80,7 @@ func Run() error {
})

var err error
b, err = NewBackend(host, username, password, interval)
b, err = NewBackend(host, username, password, interval, batchSize)
if err != nil {
return err
}
@@ -113,8 +120,9 @@ func handler(req *restful.Request, resp *restful.Response) {
waitHandlerGroup.Add(1)
defer waitHandlerGroup.Done()

cluster := req.QueryParameter("cluster")
ns := req.QueryParameter("namespace")
tenants := b.FromNamespace(ns)
tenants := b.FromNamespace(cluster, ns)
if tenants == nil {
responseWithHeaderAndEntity(resp, http.StatusNotFound, "")
return
2 changes: 1 addition & 1 deletion sidecar/kubesphere/4.0.0/test/get-tenants.sh
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
#!/bin/bash
curl -XGET http://127.0.0.1:19094/api/v2/tenant?namespace=kubesphere-monitoring-system
curl -XGET http://127.0.0.1:19094/api/v2/tenant?cluster=host&namespace=kubesphere-monitoring-system