Skip to content

Commit

Permalink
Merge pull request #44 from tsuru/team-label
Browse files Browse the repository at this point in the history
Add support to set up tsuru team label
  • Loading branch information
wpjunior authored Oct 4, 2024
2 parents 0c1ecab + 78f27e5 commit 8b6d39f
Show file tree
Hide file tree
Showing 8 changed files with 286 additions and 149 deletions.
3 changes: 2 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/tsuru/deploy-agent/pkg/build"
"github.com/tsuru/deploy-agent/pkg/build/buildkit"
"github.com/tsuru/deploy-agent/pkg/build/buildkit/autodiscovery"
buildpb "github.com/tsuru/deploy-agent/pkg/build/grpc_build_v1"
"github.com/tsuru/deploy-agent/pkg/health"
"github.com/tsuru/deploy-agent/pkg/repository"
Expand Down Expand Up @@ -181,7 +182,7 @@ func newBuildKit() (*buildkit.BuildKit, error) {
return nil, err
}

kdopts := buildkit.KubernertesDiscoveryOptions{
kdopts := autodiscovery.KubernertesDiscoveryOptions{
Timeout: cfg.BuildKitAutoDiscoveryTimeout,
PodSelector: cfg.BuildKitAutoDiscoveryKubernetesPodSelector,
Namespace: cfg.BuildKitAutoDiscoveryKubernetesNamespace,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package buildkit
package autodiscovery

import (
"context"
Expand Down Expand Up @@ -32,7 +32,7 @@ import (
)

var (
noopFunc = func() {}
noopCleaner = func() {}

tsuruAppGVR = schema.GroupVersionResource{
Group: "tsuru.io",
Expand All @@ -41,20 +41,32 @@ var (
}
)

type k8sDiscoverer struct {
cs *kubernetes.Clientset
dcs dynamic.Interface
type KubernertesDiscoveryOptions struct {
PodSelector string
Namespace string
LeasePrefix string
Statefulset string
Port int
UseSameNamespaceAsApp bool
SetTsuruAppLabel bool
ScaleGracefulPeriod time.Duration
Timeout time.Duration
}

func (d *k8sDiscoverer) Discover(ctx context.Context, opts KubernertesDiscoveryOptions, req *pb.BuildRequest, w io.Writer) (*client.Client, func(), error) {
type K8sDiscoverer struct {
KubernetesInterface kubernetes.Interface
DynamicInterface dynamic.Interface
}

func (d *K8sDiscoverer) Discover(ctx context.Context, opts KubernertesDiscoveryOptions, req *pb.BuildRequest, w io.Writer) (*client.Client, func(), error) {
if req.App == nil {
return nil, noopFunc, fmt.Errorf("there's only support for discovering BuildKit pods from Tsuru apps")
return nil, noopCleaner, fmt.Errorf("there's only support for discovering BuildKit pods from Tsuru apps")
}

return d.discoverBuildKitClientFromApp(ctx, opts, req.App.Name, w)
return d.discoverBuildKitClientFromApp(ctx, opts, req.App, w)
}

func (d *k8sDiscoverer) discoverBuildKitClientFromApp(ctx context.Context, opts KubernertesDiscoveryOptions, app string, w io.Writer) (*client.Client, func(), error) {
func (d *K8sDiscoverer) discoverBuildKitClientFromApp(ctx context.Context, opts KubernertesDiscoveryOptions, app *pb.TsuruApp, w io.Writer) (*client.Client, func(), error) {
leaderCtx, leaderCancel := context.WithCancel(ctx)
cfns := []func(){
func() {
Expand All @@ -63,22 +75,22 @@ func (d *k8sDiscoverer) discoverBuildKitClientFromApp(ctx context.Context, opts
},
}

pod, err := d.discoverBuildKitPod(leaderCtx, opts, app, w)
pod, err := d.discoverBuildKitPod(leaderCtx, opts, app.Name, w)
if err != nil {
return nil, cleanUps(cfns...), err
}

if opts.SetTsuruAppLabel {
klog.V(4).Infoln("Setting Tsuru app labels in the pod", pod.Name)

err = setTsuruAppLabelOnBuildKitPod(ctx, d.cs, pod.Name, pod.Namespace, app)
err = setTsuruAppLabelOnBuildKitPod(ctx, d.KubernetesInterface, pod.Name, pod.Namespace, app)
if err != nil {
return nil, cleanUps(cfns...), fmt.Errorf("failed to set Tsuru app labels on BuildKit's pod: %w", err)
}

cfns = append(cfns, func() {
klog.V(4).Infoln("Removing Tsuru app labels in the pod", pod.Name)
nerr := unsetTsuruAppLabelOnBuildKitPod(ctx, d.cs, pod.Name, pod.Namespace)
nerr := unsetTsuruAppLabelOnBuildKitPod(ctx, d.KubernetesInterface, pod.Name, pod.Namespace)
if nerr != nil {
klog.Errorf("failed to unset Tsuru app labels: %s", nerr)
}
Expand All @@ -102,7 +114,7 @@ func (d *k8sDiscoverer) discoverBuildKitClientFromApp(ctx context.Context, opts
return c, cleanUps(cfns...), nil
}

func (d *k8sDiscoverer) discoverBuildKitPod(ctx context.Context, opts KubernertesDiscoveryOptions, app string, w io.Writer) (*corev1.Pod, error) {
func (d *K8sDiscoverer) discoverBuildKitPod(ctx context.Context, opts KubernertesDiscoveryOptions, app string, w io.Writer) (*corev1.Pod, error) {
deadlineCtx, deadlineCancel := context.WithCancel(ctx)
defer deadlineCancel()

Expand All @@ -121,7 +133,7 @@ func (d *k8sDiscoverer) discoverBuildKitPod(ctx context.Context, opts Kubernerte
defer watchCancel() // watch cancellation must happen before than closing the pods channel

go func() {
nerr := watchBuildKitPods(watchCtx, d.cs, opts, ns, pods, w)
nerr := watchBuildKitPods(watchCtx, d.KubernetesInterface, opts, ns, pods, w)
if nerr != nil {
errCh <- nerr
}
Expand All @@ -141,7 +153,7 @@ func (d *k8sDiscoverer) discoverBuildKitPod(ctx context.Context, opts Kubernerte
leaseCtx, leaseCancel := context.WithCancel(ctx)
leaseCancelByPod[pod.Name] = leaseCancel

go acquireLeaseForPod(leaseCtx, d.cs, selected, pod, errCh, opts)
go acquireLeaseForPod(leaseCtx, d.KubernetesInterface, selected, pod, errCh, opts)
}
}()

Expand Down Expand Up @@ -175,14 +187,14 @@ func (d *k8sDiscoverer) discoverBuildKitPod(ctx context.Context, opts Kubernerte
return pod, nil
}

func (d *k8sDiscoverer) buildkitPodNamespace(ctx context.Context, opts KubernertesDiscoveryOptions, app string) (string, error) {
func (d *K8sDiscoverer) buildkitPodNamespace(ctx context.Context, opts KubernertesDiscoveryOptions, app string) (string, error) {
if !opts.UseSameNamespaceAsApp {
return opts.Namespace, nil
}

klog.V(4).Infof("Discovering the namespace where app %s is running on...", app)

tsuruApp, err := d.dcs.Resource(tsuruAppGVR).Namespace(metadata.TsuruAppNamespace).Get(ctx, app, metav1.GetOptions{})
tsuruApp, err := d.DynamicInterface.Resource(tsuruAppGVR).Namespace(metadata.TsuruAppNamespace).Get(ctx, app, metav1.GetOptions{})
if err != nil {
return "", err
}
Expand All @@ -202,7 +214,7 @@ func (d *k8sDiscoverer) buildkitPodNamespace(ctx context.Context, opts Kubernert
return ns, nil
}

func watchBuildKitPods(ctx context.Context, cs *kubernetes.Clientset, opts KubernertesDiscoveryOptions, ns string, pods chan<- *corev1.Pod, writer io.Writer) error {
func watchBuildKitPods(ctx context.Context, cs kubernetes.Interface, opts KubernertesDiscoveryOptions, ns string, pods chan<- *corev1.Pod, writer io.Writer) error {
if opts.Statefulset != "" {
scaleErr := scaler.MayUpscale(ctx, cs, ns, opts.Statefulset, writer)
if scaleErr != nil {
Expand Down Expand Up @@ -240,7 +252,7 @@ func watchBuildKitPods(ctx context.Context, cs *kubernetes.Clientset, opts Kuber
}
}

func acquireLeaseForPod(ctx context.Context, cs *kubernetes.Clientset, ch chan<- *corev1.Pod, pod *corev1.Pod, errCh chan<- error, opts KubernertesDiscoveryOptions) {
func acquireLeaseForPod(ctx context.Context, cs kubernetes.Interface, ch chan<- *corev1.Pod, pod *corev1.Pod, errCh chan<- error, opts KubernertesDiscoveryOptions) {
podname := os.Getenv("POD_NAME")
if podname == "" {
hostname, err := os.Hostname()
Expand Down Expand Up @@ -280,19 +292,19 @@ func acquireLeaseForPod(ctx context.Context, cs *kubernetes.Clientset, ch chan<-
klog.V(4).Infof("Received context cancelation: %s/%s", pod.Namespace, pod.Name)
}
},
OnStoppedLeading: noopFunc,
OnStoppedLeading: func() {},
},
})

klog.V(4).Infof("Shutting off the lease for %s/%s pod", pod.Namespace, pod.Name)
}

func setTsuruAppLabelOnBuildKitPod(ctx context.Context, cs *kubernetes.Clientset, pod, ns, app string) error {
patch, err := json.Marshal([]any{
func setTsuruAppLabelOnBuildKitPod(ctx context.Context, cs kubernetes.Interface, pod, ns string, app *pb.TsuruApp) error {
changes := []any{
map[string]any{
"op": "replace",
"path": fmt.Sprintf("/metadata/labels/%s", normalizeAppLabelForJSONPatch(metadata.TsuruAppNameLabelKey)),
"value": app,
"value": app.Name,
},
map[string]any{
"op": "replace",
Expand All @@ -309,7 +321,17 @@ func setTsuruAppLabelOnBuildKitPod(ctx context.Context, cs *kubernetes.Clientset
"path": fmt.Sprintf("/metadata/annotations/%s", normalizeAppLabelForJSONPatch(metadata.DeployAgentLastBuildStartingLabelKey)),
"value": strconv.FormatInt(time.Now().Unix(), 10),
},
})
}

if app.Team != "" {
changes = append(changes, map[string]any{
"op": "replace",
"path": fmt.Sprintf("/metadata/labels/%s", normalizeAppLabelForJSONPatch(metadata.TsuruAppTeamLabelKey)),
"value": app.Team,
})
}

patch, err := json.Marshal(changes)
if err != nil {
return err
}
Expand All @@ -318,12 +340,16 @@ func setTsuruAppLabelOnBuildKitPod(ctx context.Context, cs *kubernetes.Clientset
return err
}

func unsetTsuruAppLabelOnBuildKitPod(ctx context.Context, cs *kubernetes.Clientset, pod, ns string) error {
func unsetTsuruAppLabelOnBuildKitPod(ctx context.Context, cs kubernetes.Interface, pod, ns string) error {
patch, err := json.Marshal([]any{
map[string]any{
"op": "remove",
"path": fmt.Sprintf("/metadata/labels/%s", normalizeAppLabelForJSONPatch(metadata.TsuruAppNameLabelKey)),
},
map[string]any{
"op": "remove",
"path": fmt.Sprintf("/metadata/labels/%s", normalizeAppLabelForJSONPatch(metadata.TsuruAppTeamLabelKey)),
},
map[string]any{
"op": "remove",
"path": fmt.Sprintf("/metadata/labels/%s", normalizeAppLabelForJSONPatch(metadata.TsuruIsBuildLabelKey)),
Expand Down
96 changes: 96 additions & 0 deletions pkg/build/buildkit/autodiscovery/k8s_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Copyright 2024 tsuru authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package autodiscovery

import (
"context"
"os"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/tsuru/deploy-agent/pkg/build/grpc_build_v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
fakeDynamic "k8s.io/client-go/dynamic/fake"
"k8s.io/client-go/kubernetes/fake"
kuberntesTesting "k8s.io/client-go/testing"
)

func TestK8sDiscoverer_Discover(t *testing.T) {
buildKitPod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-app",
Namespace: "tsuru",
Labels: map[string]string{
"app": "test-app",
},
Annotations: map[string]string{
"foo": "bar",
},
},
Status: corev1.PodStatus{
Phase: corev1.PodRunning,
PodIP: "127.0.0.1",
Conditions: []corev1.PodCondition{
{
Type: corev1.PodReady,
Status: corev1.ConditionTrue,
},
},
},
}
fakeClient := fake.NewSimpleClientset(buildKitPod)

fakeClient.PrependWatchReactor("*", func(action kuberntesTesting.Action) (handled bool, ret watch.Interface, err error) {
watcher := watch.NewFake()

go func() {
time.Sleep(time.Millisecond * 100)
watcher.Add(buildKitPod)
}()
return true, watcher, nil
})

fakeDynamicClient := fakeDynamic.NewSimpleDynamicClient(runtime.NewScheme())

discoverer := K8sDiscoverer{
KubernetesInterface: fakeClient,
DynamicInterface: fakeDynamicClient,
}

_, _, err := discoverer.Discover(
context.TODO(),
KubernertesDiscoveryOptions{
PodSelector: "app=test-app",
Namespace: "tsuru",
Timeout: time.Second * 2,
SetTsuruAppLabel: true,
},
&grpc_build_v1.BuildRequest{
App: &grpc_build_v1.TsuruApp{
Name: "test-app",
Team: "test-team",
},
},
os.Stdout,
)
assert.NoError(t, err)

existingPod, err := fakeClient.CoreV1().Pods("tsuru").Get(context.TODO(), "test-app", metav1.GetOptions{})
assert.NoError(t, err)

assert.Equal(t, map[string]string{
"app": "test-app",
"tsuru.io/app-name": "test-app",
"tsuru.io/app-team": "test-team",
"tsuru.io/is-build": "true",
}, existingPod.Labels)

assert.Equal(t, "", existingPod.Annotations["deploy-agent.tsuru.io/last-build-ending-time"])
assert.NotEqual(t, "", existingPod.Annotations["deploy-agent.tsuru.io/last-build-starting-time"])
}
29 changes: 10 additions & 19 deletions pkg/build/buildkit/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"k8s.io/client-go/kubernetes"

"github.com/tsuru/deploy-agent/pkg/build"
"github.com/tsuru/deploy-agent/pkg/build/buildkit/autodiscovery"
"github.com/tsuru/deploy-agent/pkg/build/buildkit/scaler"
pb "github.com/tsuru/deploy-agent/pkg/build/grpc_build_v1"
repo "github.com/tsuru/deploy-agent/pkg/repository"
Expand All @@ -56,7 +57,7 @@ type BuildKit struct {
cli *client.Client
k8s *kubernetes.Clientset
dk8s dynamic.Interface
kdopts *KubernertesDiscoveryOptions
kdopts *autodiscovery.KubernertesDiscoveryOptions
opts BuildKitOptions
m sync.RWMutex
}
Expand All @@ -65,19 +66,7 @@ func NewBuildKit(c *client.Client, opts BuildKitOptions) *BuildKit {
return &BuildKit{cli: c, opts: opts}
}

type KubernertesDiscoveryOptions struct {
PodSelector string
Namespace string
LeasePrefix string
Statefulset string
Port int
UseSameNamespaceAsApp bool
SetTsuruAppLabel bool
ScaleGracefulPeriod time.Duration
Timeout time.Duration
}

func (b *BuildKit) WithKubernetesDiscovery(cs *kubernetes.Clientset, dcs dynamic.Interface, opts KubernertesDiscoveryOptions) *BuildKit {
func (b *BuildKit) WithKubernetesDiscovery(cs *kubernetes.Clientset, dcs dynamic.Interface, opts autodiscovery.KubernertesDiscoveryOptions) *BuildKit {
b.k8s = cs
b.dk8s = dcs
b.kdopts = &opts
Expand Down Expand Up @@ -597,16 +586,18 @@ func callBuildKitToExtractTsuruConfigs(ctx context.Context, c *client.Client, lo
return tc, nil
}

func (b *BuildKit) client(ctx context.Context, req *pb.BuildRequest, w io.Writer) (*client.Client, func(), error) {
type clientCleanUp func()

func (b *BuildKit) client(ctx context.Context, req *pb.BuildRequest, w io.Writer) (*client.Client, clientCleanUp, error) {
isBuildForApp := strings.HasPrefix(pb.BuildKind_name[int32(req.Kind)], "BUILD_KIND_APP_")

if isBuildForApp && b.opts.DiscoverBuildKitClientForApp {
d := &k8sDiscoverer{
cs: b.k8s,
dcs: b.dk8s,
d := &autodiscovery.K8sDiscoverer{
KubernetesInterface: b.k8s,
DynamicInterface: b.dk8s,
}
return d.Discover(ctx, *b.kdopts, req, w)
}

return b.cli, noopFunc, nil
return b.cli, func() {}, nil
}
Loading

0 comments on commit 8b6d39f

Please sign in to comment.