From 34cdd61983f56268e3799c467b58264e1172a10f Mon Sep 17 00:00:00 2001 From: Tejal Desai Date: Thu, 27 Jun 2019 14:10:46 -0700 Subject: [PATCH] Move WaitForDeploymentsToStabilize to webhook. --- cmd/skaffold/app/cmd/flags.go | 14 +--- docs/content/en/docs/references/cli/_index.md | 21 ----- integration/debug_test.go | 2 +- integration/helm_test.go | 2 - integration/run_test.go | 1 - integration/util.go | 51 ------------ pkg/skaffold/kubernetes/wait.go | 39 ---------- pkg/skaffold/runner/deploy_test.go | 78 +------------------ pkg/skaffold/runner/runner.go | 49 ------------ pkg/webhook/kubernetes/deployment.go | 48 +++++++++++- webhook/webhook.go | 3 +- 11 files changed, 51 insertions(+), 257 deletions(-) diff --git a/cmd/skaffold/app/cmd/flags.go b/cmd/skaffold/app/cmd/flags.go index a0114504329..a113b3a95f6 100644 --- a/cmd/skaffold/app/cmd/flags.go +++ b/cmd/skaffold/app/cmd/flags.go @@ -222,23 +222,11 @@ var FlagRegistry = []Flag{ }, { Name: "status-check", -<<<<<<< HEAD -<<<<<<< HEAD -<<<<<<< HEAD Usage: "Wait for deployed resources to stabilize", -======= - Usage: "", ->>>>>>> wip -======= - Usage: "Wait for deployed resources to stabalize", ->>>>>>> fix linter -======= - Usage: "Wait for deployed resources to stabilize", ->>>>>>> address @balintp's comment Value: &opts.StatusCheck, DefValue: true, FlagAddMethod: "BoolVar", - DefinedOn: []string{"dev", "debug", "deploy"}, + DefinedOn: []string{"dev", "debug", "deploy", "run"}, }, } diff --git a/docs/content/en/docs/references/cli/_index.md b/docs/content/en/docs/references/cli/_index.md index 41f1f79dcf6..66e57a763f7 100644 --- a/docs/content/en/docs/references/cli/_index.md +++ b/docs/content/en/docs/references/cli/_index.md @@ -277,13 +277,6 @@ Flags: --rpc-http-port int tcp port to expose event REST API over HTTP (default 50052) --rpc-port int tcp port to expose event API (default 50051) --skip-tests Whether to skip the tests after building -<<<<<<< HEAD -<<<<<<< HEAD -======= - --status-check Wait for deployed resources to stabalize (default true) ->>>>>>> fix linter -======= ->>>>>>> address @balintp's comment --tail Stream logs from deployed objects (default true) --toot Emit a terminal beep after the deploy is complete @@ -363,13 +356,6 @@ Flags: -p, --profile strings Activate profiles by name --rpc-http-port int tcp port to expose event REST API over HTTP (default 50052) --rpc-port int tcp port to expose event API (default 50051) -<<<<<<< HEAD -<<<<<<< HEAD -======= - --status-check Wait for deployed resources to stabalize (default true) ->>>>>>> fix linter -======= ->>>>>>> address @balintp's comment --tail Stream logs from deployed objects (default false) --toot Emit a terminal beep after the deploy is complete @@ -421,13 +407,6 @@ Flags: --rpc-http-port int tcp port to expose event REST API over HTTP (default 50052) --rpc-port int tcp port to expose event API (default 50051) --skip-tests Whether to skip the tests after building -<<<<<<< HEAD -<<<<<<< HEAD -======= - --status-check Wait for deployed resources to stabalize (default true) ->>>>>>> fix linter -======= ->>>>>>> address @balintp's comment --tail Stream logs from deployed objects (default true) --toot Emit a terminal beep after the deploy is complete --trigger string How are changes detected? (polling, manual or notify) (default "polling") diff --git a/integration/debug_test.go b/integration/debug_test.go index 833e6eba637..fd0741b4393 100644 --- a/integration/debug_test.go +++ b/integration/debug_test.go @@ -62,7 +62,7 @@ func TestDebug(t *testing.T) { defer stop() client.WaitForPodsReady(test.pods...) - client.WaitForDeploymentsToStabilize(test.deployments...) + for _, depName := range test.deployments { deploy := client.GetDeployment(depName) annotations := deploy.Spec.Template.GetAnnotations() diff --git a/integration/helm_test.go b/integration/helm_test.go index 7e41882b2af..37e15187990 100644 --- a/integration/helm_test.go +++ b/integration/helm_test.go @@ -52,8 +52,6 @@ func TestHelmDeploy(t *testing.T) { skaffold.Deploy(runArgs...).InDir(helmDir).InNs(ns.Name).WithEnv(env).RunOrFailOutput(t) - client.WaitForDeploymentsToStabilize(depName) - expectedLabels := map[string]string{ "app.kubernetes.io/managed-by": TestVersion, "release": depName, diff --git a/integration/run_test.go b/integration/run_test.go index 5e08eae0766..dbe9a194e78 100644 --- a/integration/run_test.go +++ b/integration/run_test.go @@ -124,7 +124,6 @@ func TestRun(t *testing.T) { skaffold.Run(test.args...).WithConfig(test.filename).InDir(test.dir).InNs(ns.Name).WithEnv(test.env).RunOrFailOutput(t) client.WaitForPodsReady(test.pods...) - client.WaitForDeploymentsToStabilize(test.deployments...) skaffold.Delete().WithConfig(test.filename).InDir(test.dir).InNs(ns.Name).WithEnv(test.env).RunOrFail(t) }) diff --git a/integration/util.go b/integration/util.go index 51f3a40da4a..b55a857c7ad 100644 --- a/integration/util.go +++ b/integration/util.go @@ -126,8 +126,6 @@ func (k *NSKubernetesClient) WaitForPodsReady(podNames ...string) { // GetDeployment gets a deployment by name. func (k *NSKubernetesClient) GetDeployment(depName string) *appsv1.Deployment { - k.WaitForDeploymentsToStabilize(depName) - dep, err := k.client.AppsV1().Deployments(k.ns).Get(depName, meta_v1.GetOptions{}) if err != nil { k.t.Fatalf("Could not find deployment: %s in namespace %s", depName, k.ns) @@ -135,51 +133,6 @@ func (k *NSKubernetesClient) GetDeployment(depName string) *appsv1.Deployment { return dep } -// WaitForDeploymentsToStabilize waits for a list of deployments to become stable. -func (k *NSKubernetesClient) WaitForDeploymentsToStabilize(depNames ...string) { - if len(depNames) == 0 { - return - } - - logrus.Infoln("Waiting for deployments", depNames, "to stabilize") - - ctx, cancelTimeout := context.WithTimeout(context.Background(), 5*time.Minute) - defer cancelTimeout() - - w, err := k.client.AppsV1().Deployments(k.ns).Watch(meta_v1.ListOptions{}) - if err != nil { - k.t.Fatalf("Unable to watch deployments: %v", err) - } - defer w.Stop() - - deployments := map[string]*appsv1.Deployment{} - - for { - waitLoop: - select { - case <-ctx.Done(): - k.debug("deployments.apps") - k.debug("pods") - k.t.Fatalf("Timed out waiting for deployments %v to stabilize in namespace %s", depNames, k.ns) - - case event := <-w.ResultChan(): - dp := event.Object.(*appsv1.Deployment) - logrus.Infof("Deployment %s: Generation %d/%d, Replicas %d/%d", dp.Name, dp.Status.ObservedGeneration, dp.Generation, dp.Status.Replicas, *(dp.Spec.Replicas)) - - deployments[dp.Name] = dp - - for _, depName := range depNames { - if d, present := deployments[depName]; !present || !isStable(d) { - break waitLoop - } - } - - logrus.Infoln("Deployments", depNames, "are stable") - return - } - } -} - // debug is used to print all the details about pods or deployments func (k *NSKubernetesClient) debug(entities string) { cmd := exec.Command("kubectl", "-n", k.ns, "get", entities, "-oyaml") @@ -189,7 +142,3 @@ func (k *NSKubernetesClient) debug(entities string) { // Use fmt.Println, not logrus, for prettier output fmt.Println(string(out)) } - -func isStable(dp *appsv1.Deployment) bool { - return dp.Generation <= dp.Status.ObservedGeneration && *(dp.Spec.Replicas) == dp.Status.Replicas -} diff --git a/pkg/skaffold/kubernetes/wait.go b/pkg/skaffold/kubernetes/wait.go index 0d18fad624e..63bd787c9b4 100644 --- a/pkg/skaffold/kubernetes/wait.go +++ b/pkg/skaffold/kubernetes/wait.go @@ -22,16 +22,10 @@ import ( "fmt" "time" - "github.com/golang/glog" "github.com/sirupsen/logrus" - appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" - apierrs "k8s.io/apimachinery/pkg/api/errors" meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" - "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/kubernetes" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" ) @@ -121,36 +115,3 @@ func WaitForPodInitialized(ctx context.Context, pods corev1.PodInterface, podNam return false, nil }) } - -// WaitForDeploymentToStabilize waits until the Deployment has a matching generation/replica count between spec and status. -func WaitForDeploymentToStabilize(ctx context.Context, c kubernetes.Interface, ns, name string, timeout time.Duration) error { - logrus.Infof("Waiting for %s to stabilize", name) - - fields := fields.Set{ - "metadata.name": name, - "metadata.namespace": ns, - } - w, err := c.AppsV1().Deployments(ns).Watch(meta_v1.ListOptions{ - FieldSelector: fields.AsSelector().String(), - }) - if err != nil { - return fmt.Errorf("initializing deployment watcher: %s", err) - } - - return watchUntilTimeout(ctx, timeout, w, func(event *watch.Event) (bool, error) { - if event.Type == watch.Deleted { - return false, apierrs.NewNotFound(schema.GroupResource{Resource: "deployments"}, "") - } - - if dp, ok := event.Object.(*appsv1.Deployment); ok { - if dp.Name == name && dp.Namespace == ns && - dp.Generation <= dp.Status.ObservedGeneration && - *(dp.Spec.Replicas) == dp.Status.Replicas { - return true, nil - } - glog.Infof("Waiting for deployment %s to stabilize, generation %v observed generation %v spec.replicas %d status.replicas %d", - name, dp.Generation, dp.Status.ObservedGeneration, *(dp.Spec.Replicas), dp.Status.Replicas) - } - return false, nil - }) -} diff --git a/pkg/skaffold/runner/deploy_test.go b/pkg/skaffold/runner/deploy_test.go index 846430afe89..4fddd218f46 100644 --- a/pkg/skaffold/runner/deploy_test.go +++ b/pkg/skaffold/runner/deploy_test.go @@ -17,44 +17,16 @@ limitations under the License. package runner import ( -<<<<<<< HEAD -<<<<<<< HEAD "bytes" "context" "errors" "strings" -======= - "context" - "io/ioutil" ->>>>>>> wip -======= - "bytes" - "context" - "errors" - "strings" ->>>>>>> add status check flag which does nothing for now and test "testing" "github.com/GoogleContainerTools/skaffold/pkg/skaffold/build" "github.com/GoogleContainerTools/skaffold/pkg/skaffold/deploy" runcontext "github.com/GoogleContainerTools/skaffold/pkg/skaffold/runner/context" "github.com/GoogleContainerTools/skaffold/testutil" -<<<<<<< HEAD -<<<<<<< HEAD -) - -func TestDeploy(t *testing.T) { - expectedOutput := "Waiting for deployments to stabilize" - var tests = []struct { - description string - testBench *TestBench - statusCheck bool - shouldErr bool - shouldWait bool -======= - "github.com/pkg/errors" -======= ->>>>>>> add status check flag which does nothing for now and test ) func TestDeploy(t *testing.T) { @@ -63,25 +35,14 @@ func TestDeploy(t *testing.T) { description string testBench *TestBench statusCheck bool -<<<<<<< HEAD ->>>>>>> wip -======= shouldErr bool shouldWait bool ->>>>>>> add status check flag which does nothing for now and test }{ { description: "deploy shd perform status check", testBench: &TestBench{}, statusCheck: true, -<<<<<<< HEAD -<<<<<<< HEAD - shouldWait: true, -======= ->>>>>>> wip -======= shouldWait: true, ->>>>>>> add status check flag which does nothing for now and test }, { description: "deploy shd not perform status check", @@ -89,21 +50,9 @@ func TestDeploy(t *testing.T) { }, { description: "deploy shd not perform status check when deployer is in error", -<<<<<<< HEAD -<<<<<<< HEAD shouldErr: true, statusCheck: true, testBench: &TestBench{deployErrors: []error{errors.New("deploy error")}}, -======= - testBench: &TestBench{deployErrors: []error{errors.New("deploy error")}}, - shouldError: true, - statusCheck: true, ->>>>>>> wip -======= - shouldErr: true, - statusCheck: true, - testBench: &TestBench{deployErrors: []error{errors.New("deploy error")}}, ->>>>>>> add status check flag which does nothing for now and test }, } @@ -112,27 +61,7 @@ func TestDeploy(t *testing.T) { } originalStatusCheck := deploy.StatusCheck for _, test := range tests { -<<<<<<< HEAD -<<<<<<< HEAD - testutil.Run(t, test.description, func(t *testutil.T) { - - runner := createRunner(t, test.testBench) - runner.runCtx.Opts.StatusCheck = test.statusCheck - out := new(bytes.Buffer) - - err := runner.Deploy(context.Background(), out, []build.Artifact{ - {ImageName: "img1", Tag: "img1:tag1"}, - {ImageName: "img2", Tag: "img2:tag2"}, - }) - t.CheckError(test.shouldErr, err) - if strings.Contains(out.String(), expectedOutput) != test.shouldWait { - t.Errorf("expected %s to contain %s %t. But found %t", out.String(), expectedOutput, test.shouldWait, !test.shouldWait) - } -======= - t.Run(test.description, func(t *testing.T) { -======= testutil.Run(t, test.description, func(t *testutil.T) { ->>>>>>> add status check flag which does nothing for now and test runner := createRunner(t, test.testBench) runner.runCtx.Opts.StatusCheck = test.statusCheck @@ -147,15 +76,10 @@ func TestDeploy(t *testing.T) { {ImageName: "img1", Tag: "img1:tag1"}, {ImageName: "img2", Tag: "img2:tag2"}, }) -<<<<<<< HEAD - testutil.CheckError(t, test.shouldError, err) ->>>>>>> wip -======= t.CheckError(test.shouldErr, err) if strings.Contains(out.String(), expectedOutput) != test.shouldWait { t.Errorf("expected %s to contain %s %t. But found %t", out.String(), expectedOutput, test.shouldWait, !test.shouldWait) } ->>>>>>> add status check flag which does nothing for now and test }) } -} \ No newline at end of file +} diff --git a/pkg/skaffold/runner/runner.go b/pkg/skaffold/runner/runner.go index d55e51f5bf9..86a68f6a57a 100644 --- a/pkg/skaffold/runner/runner.go +++ b/pkg/skaffold/runner/runner.go @@ -207,10 +207,6 @@ func getTagger(t latest.TagPolicy, customTag string) (tag.Tagger, error) { } } -<<<<<<< HEAD -<<<<<<< HEAD -======= ->>>>>>> add status check flag which does nothing for now and test func (r *SkaffoldRunner) Deploy(ctx context.Context, out io.Writer, artifacts []build.Artifact) error { if cfg.IsKindCluster(r.runCtx.KubeContext) { // With `kind`, docker images have to be loaded with the `kind` CLI. @@ -224,67 +220,22 @@ func (r *SkaffoldRunner) Deploy(ctx context.Context, out io.Writer, artifacts [] if err != nil { return err } -<<<<<<< HEAD -<<<<<<< HEAD -<<<<<<< HEAD - return r.performStatusCheck(out) -======= return r.performStatusCheck(ctx, out) ->>>>>>> fix gofmt and integration tests } func (r *SkaffoldRunner) performStatusCheck(ctx context.Context, out io.Writer) error { // Check if we need to perform deploy status if r.runCtx.Opts.StatusCheck { -<<<<<<< HEAD -<<<<<<< HEAD fmt.Fprintln(out, "Waiting for deployments to stabilize") -======= - return r.performStatusCheck(ctx, out) -======= - return r.performStatusCheck(out) ->>>>>>> fix linter -} - -func (r *SkaffoldRunner) performStatusCheck(out io.Writer) error { - // Check if we need to perform deploy status - if r.runCtx.Opts.StatusCheck { -<<<<<<< HEAD - fmt.Fprintln(out, "Performing status check") ->>>>>>> add status check flag which does nothing for now and test -======= - fmt.Fprintln(out, "Waiting for deployments to stabalize") ->>>>>>> fix linter - // TODO : Actually perform status check -======= - fmt.Fprintln(out, "Performing status check") - err:= deploy.StatusCheck(ctx, r.runCtx) -======= - fmt.Fprintln(out, "Waiting for deployments to stabilize") -<<<<<<< HEAD - err:= statusCheck(ctx, r.runCtx) ->>>>>>> fix test -======= err := statusCheck(ctx, r.defaultLabeller, r.runCtx) ->>>>>>> fix gofmt and integration tests if err != nil { fmt.Fprintln(out, err.Error()) } return err ->>>>>>> more testing } return nil } -<<<<<<< HEAD -======= ->>>>>>> wip -======= - -<<<<<<< HEAD ->>>>>>> add status check flag which does nothing for now and test -======= ->>>>>>> fix linter // HasDeployed returns true if this runner has deployed something. func (r *SkaffoldRunner) HasDeployed() bool { return r.hasDeployed diff --git a/pkg/webhook/kubernetes/deployment.go b/pkg/webhook/kubernetes/deployment.go index e3c1318d50c..5bf435b82c0 100644 --- a/pkg/webhook/kubernetes/deployment.go +++ b/pkg/webhook/kubernetes/deployment.go @@ -28,12 +28,19 @@ import ( pkgkubernetes "github.com/GoogleContainerTools/skaffold/pkg/skaffold/kubernetes" "github.com/GoogleContainerTools/skaffold/pkg/webhook/constants" "github.com/GoogleContainerTools/skaffold/pkg/webhook/labels" + "github.com/golang/glog" "github.com/google/go-github/github" "github.com/pkg/errors" + "github.com/sirupsen/logrus" appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" + apierrs "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes" ) const ( @@ -128,7 +135,7 @@ func WaitForDeploymentToStabilize(d *appsv1.Deployment, ip string) error { if err != nil { return errors.Wrap(err, "getting clientset") } - if err := pkgkubernetes.WaitForDeploymentToStabilize(context.Background(), client, d.Namespace, d.Name, 5*time.Minute); err != nil { + if err := waitForDeploymentToStabilize(context.Background(), client, d.Namespace, d.Name, 5*time.Minute); err != nil { return errors.Wrap(err, "waiting for deployment to stabilize") } // wait up to five minutes for the URL to return a valid endpoint @@ -166,3 +173,42 @@ func Logs(d *appsv1.Deployment) string { } return fmt.Sprintf("Init container logs: \n %s \nContainer Logs: \n %s", initLogs, logs) } + +// waitForDeploymentToStabilize waits until the Deployment has a matching generation/replica count between spec and status. +func waitForDeploymentToStabilize(ctx context.Context, c kubernetes.Interface, ns, name string, timeout time.Duration) error { + logrus.Infof("Waiting for %s to stabilize", name) + + fields := fields.Set{ + "metadata.name": name, + "metadata.namespace": ns, + } + w, err := c.AppsV1().Deployments(ns).Watch(metav1.ListOptions{ + FieldSelector: fields.AsSelector().String(), + }) + if err != nil { + return fmt.Errorf("initializing deployment watcher: %s", err) + } + + ctx, cancelTimeout := context.WithTimeout(ctx, timeout) + defer cancelTimeout() + + for { + select { + case <-ctx.Done(): + return errors.New("context closed while waiting for condition") + case event := <-w.ResultChan(): + if event.Type == watch.Deleted { + return apierrs.NewNotFound(schema.GroupResource{Resource: "deployments"}, "") + } + if dp, ok := event.Object.(*appsv1.Deployment); ok { + if dp.Name == name && dp.Namespace == ns && + dp.Generation <= dp.Status.ObservedGeneration && + *(dp.Spec.Replicas) == dp.Status.Replicas { + return nil + } + glog.Infof("Waiting for deployment %s to stabilize, generation %v observed generation %v spec.replicas %d status.replicas %d", + name, dp.Generation, dp.Status.ObservedGeneration, *(dp.Spec.Replicas), dp.Status.Replicas) + } + } + } +} diff --git a/webhook/webhook.go b/webhook/webhook.go index 7a8944537ba..fef6c480838 100644 --- a/webhook/webhook.go +++ b/webhook/webhook.go @@ -23,9 +23,8 @@ import ( "log" "net/http" - "github.com/GoogleContainerTools/skaffold/pkg/webhook/gcs" - "github.com/GoogleContainerTools/skaffold/pkg/webhook/constants" + "github.com/GoogleContainerTools/skaffold/pkg/webhook/gcs" pkggithub "github.com/GoogleContainerTools/skaffold/pkg/webhook/github" "github.com/GoogleContainerTools/skaffold/pkg/webhook/kubernetes" "github.com/GoogleContainerTools/skaffold/pkg/webhook/labels"