diff --git a/flytectl/.github/workflows/checks.yml b/flytectl/.github/workflows/checks.yml index d7d36c4b18..2c036ba3a4 100644 --- a/flytectl/.github/workflows/checks.yml +++ b/flytectl/.github/workflows/checks.yml @@ -63,6 +63,7 @@ jobs: name: Test Getting started runs-on: ubuntu-latest steps: + - uses: insightsengineering/disk-space-reclaimer@v1 - name: Checkout uses: actions/checkout@v2 - uses: actions/cache@v2 @@ -78,7 +79,13 @@ jobs: - name: Build Flytectl binary run: make compile - name: Create a sandbox cluster - run: bin/flytectl sandbox start + run: | + bin/flytectl demo start + # Sleep is necessary here since `flyte-proxy` might not be ready + # to serve requests when the above command exits successfully. + # Fixed in: https://github.com/flyteorg/flyte/pull/4348 + # TODO (jeev): Remove this when ^ is released. + sleep 5 - name: Setup flytectl config run: bin/flytectl config init - name: Register cookbook diff --git a/flytectl/cmd/configuration/configuration.go b/flytectl/cmd/configuration/configuration.go index 7f06f92094..010416e36d 100644 --- a/flytectl/cmd/configuration/configuration.go +++ b/flytectl/cmd/configuration/configuration.go @@ -87,7 +87,7 @@ func initFlytectlConfig(reader io.Reader) error { } templateValues := configutil.ConfigTemplateSpec{ - Host: "dns:///localhost:30081", + Host: "dns:///localhost:30080", Insecure: true, } templateStr := configutil.GetTemplate() diff --git a/flytectl/go.mod b/flytectl/go.mod index 0344faeca5..4beaf4d5a6 100644 --- a/flytectl/go.mod +++ b/flytectl/go.mod @@ -3,6 +3,7 @@ module github.com/flyteorg/flytectl go 1.19 require ( + github.com/apoorvam/goterminal v0.0.0-20180523175556-614d345c47e5 github.com/avast/retry-go v3.0.0+incompatible github.com/awalterschulze/gographviz v2.0.3+incompatible github.com/disiqueira/gotree v1.0.0 @@ -22,6 +23,7 @@ require ( github.com/kataras/tablewriter v0.0.0-20180708051242-e063d29b7c23 github.com/landoop/tableprinter v0.0.0-20180806200924-8bd8c2576d27 github.com/mitchellh/mapstructure v1.5.0 + github.com/moby/term v0.0.0-20201216013528-df9cb8a40635 github.com/mouuff/go-rocket-update v1.5.1 github.com/opencontainers/image-spec v1.0.2 github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4 @@ -41,6 +43,7 @@ require ( k8s.io/api v0.24.1 k8s.io/apimachinery v0.24.1 k8s.io/client-go v0.24.1 + k8s.io/kubernetes v1.13.0 sigs.k8s.io/yaml v1.3.0 ) @@ -54,6 +57,7 @@ require ( github.com/Azure/azure-sdk-for-go/sdk/azcore v0.23.1 // indirect github.com/Azure/azure-sdk-for-go/sdk/internal v0.9.2 // indirect github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.4.0 // indirect + github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 // indirect github.com/Azure/go-autorest v14.2.0+incompatible // indirect github.com/Azure/go-autorest/autorest v0.11.27 // indirect github.com/Azure/go-autorest/autorest/adal v0.9.18 // indirect @@ -113,7 +117,6 @@ require ( github.com/mattn/go-isatty v0.0.14 // indirect github.com/mattn/go-runewidth v0.0.13 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect - github.com/moby/term v0.0.0-20201216013528-df9cb8a40635 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/morikuni/aec v1.0.0 // indirect diff --git a/flytectl/go.sum b/flytectl/go.sum index 41bdd143c0..00892a2001 100644 --- a/flytectl/go.sum +++ b/flytectl/go.sum @@ -123,6 +123,8 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= github.com/alexflint/go-filemutex v0.0.0-20171022225611-72bdc8eae2ae/go.mod h1:CgnQgUtFrFz9mxFNtED3jI5tLDjKlOM+oUF/sTk6ps0= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= +github.com/apoorvam/goterminal v0.0.0-20180523175556-614d345c47e5 h1:VYqcjykqpcq262cDxBAkAelSdg6HETkxgwzQRTS40Aw= +github.com/apoorvam/goterminal v0.0.0-20180523175556-614d345c47e5/go.mod h1:E7x8aDc3AQzDKjEoIZCt+XYheHk2OkP+p2UgeNjecH8= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY= @@ -282,6 +284,7 @@ github.com/cpuguy83/go-md2man/v2 v2.0.1 h1:r/myEWzV9lfsM1tFLgDyu0atFtJ1fXn261LKY github.com/cpuguy83/go-md2man/v2 v2.0.1/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/creack/pty v1.1.11 h1:07n33Z8lZxZ2qwegKbObQohDhXDQxiMMz1NOUGYlesw= github.com/creack/pty v1.1.11/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/cyphar/filepath-securejoin v0.2.2/go.mod h1:FpkQEhXnPnOthhzymB7CGsFk2G9VLXONKD9G7QGMM+4= github.com/d2g/dhcp4 v0.0.0-20170904100407-a1d1b6c41b1c/go.mod h1:Ct2BUK8SB0YC1SMSibvLzxjeJLnrYEVLULFNiHY9YfQ= @@ -1426,6 +1429,7 @@ k8s.io/kube-openapi v0.0.0-20201113171705-d219536bb9fd/go.mod h1:WOJ3KddDSol4tAG k8s.io/kube-openapi v0.0.0-20220328201542-3ee0da9b0b42/go.mod h1:Z/45zLw8lUo4wdiUkI+v/ImEGAvu3WatcZl3lPMR4Rk= k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f h1:2kWPakN3i/k81b0gvD5C5FJ2kxm1WrQFanWchyKuqGg= k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f/go.mod h1:byini6yhqGC14c3ebc/QwanvYwhuMWF6yz2F8uwW8eg= +k8s.io/kubernetes v1.13.0 h1:qTfB+u5M92k2fCCCVP2iuhgwwSOv1EkAkvQY1tQODD8= k8s.io/kubernetes v1.13.0/go.mod h1:ocZa8+6APFNC2tX1DZASIbocyYT5jHzqFVsY5aoB7Jk= k8s.io/utils v0.0.0-20201110183641-67b214c5f920/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= k8s.io/utils v0.0.0-20210802155522-efc7438f0176/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= diff --git a/flytectl/pkg/docker/docker_util.go b/flytectl/pkg/docker/docker_util.go index 77475f8667..e9315cab1e 100644 --- a/flytectl/pkg/docker/docker_util.go +++ b/flytectl/pkg/docker/docker_util.go @@ -17,12 +17,14 @@ import ( "github.com/docker/docker/api/types/filters" "github.com/docker/docker/api/types/mount" "github.com/docker/docker/api/types/volume" + "github.com/docker/docker/pkg/jsonmessage" "github.com/docker/docker/pkg/stdcopy" "github.com/docker/go-connections/nat" "github.com/flyteorg/flytectl/clierrors" "github.com/flyteorg/flytectl/cmd/config/subcommand/docker" cmdUtil "github.com/flyteorg/flytectl/pkg/commandutils" f "github.com/flyteorg/flytectl/pkg/filesystemutils" + "github.com/moby/term" ) var ( @@ -152,34 +154,51 @@ func PullDockerImage(ctx context.Context, cli Docker, image string, pullPolicy I PrintPullImage(image, imagePullOptions) return nil } - fmt.Printf("%v pulling docker image for release %s\n", emoji.Whale, image) - if pullPolicy == ImagePullPolicyAlways || pullPolicy == ImagePullPolicyIfNotPresent { - if pullPolicy == ImagePullPolicyIfNotPresent { - imageSummary, err := cli.ImageList(ctx, types.ImageListOptions{}) - if err != nil { - return err - } - for _, img := range imageSummary { - for _, tags := range img.RepoTags { - if image == tags { - return nil - } - } - } - } - r, err := cli.ImagePull(ctx, image, types.ImagePullOptions{ - RegistryAuth: imagePullOptions.RegistryAuth, - Platform: imagePullOptions.Platform, - }) + var needsPull bool + if pullPolicy == ImagePullPolicyAlways { + needsPull = true + } else { + imageSummary, err := cli.ImageList(ctx, types.ImageListOptions{}) if err != nil { return err } + found := false + for _, img := range imageSummary { + for _, tags := range img.RepoTags { + if image == tags { + found = true + break + } + } + if found { + break + } + } + needsPull = !found + } - _, err = io.Copy(os.Stdout, r) + // Image already exists, nothing to do. + if !needsPull { + return nil + } + + // Image needs to be pulled but pull policy prevents it + if pullPolicy == ImagePullPolicyNever { + return fmt.Errorf("Image does not exist, but image pull policy prevents pulling it: %s", image) + } + + fmt.Printf("%v Pulling image %s\n", emoji.Whale, image) + r, err := cli.ImagePull(ctx, image, types.ImagePullOptions{ + RegistryAuth: imagePullOptions.RegistryAuth, + Platform: imagePullOptions.Platform, + }) + if err != nil { return err } - return nil + defer r.Close() + termFd, isTerm := term.GetFdInfo(os.Stderr) + return jsonmessage.DisplayJSONMessagesStream(r, os.Stderr, termFd, isTerm, nil) } // PrintPullImage helper function to print the sandbox pull image command @@ -237,7 +256,7 @@ func StartContainer(ctx context.Context, cli Docker, volumes []mount.Mount, expo PrintCreateContainer(volumes, portBindings, name, image, Environment) return "", nil } - fmt.Printf("%v booting Flyte-sandbox container\n", emoji.FactoryWorker) + fmt.Printf("%v Starting container... %v %v\n", emoji.FactoryWorker, emoji.Hammer, emoji.Wrench) resp, err := cli.ContainerCreate(ctx, &container.Config{ Env: Environment, Image: image, diff --git a/flytectl/pkg/docker/docker_util_test.go b/flytectl/pkg/docker/docker_util_test.go index 32ad1cda12..0de840bb7c 100644 --- a/flytectl/pkg/docker/docker_util_test.go +++ b/flytectl/pkg/docker/docker_util_test.go @@ -5,6 +5,7 @@ import ( "bufio" "context" "fmt" + "io" "os" "path/filepath" "strings" @@ -44,6 +45,10 @@ func setupSandbox() { containers = append(containers, container1) } +func dummyReader() io.ReadCloser { + return io.NopCloser(strings.NewReader("")) +} + func TestGetSandbox(t *testing.T) { setupSandbox() t.Run("Successfully get sandbox container", func(t *testing.T) { @@ -109,44 +114,68 @@ func TestRemoveSandboxWithNoReply(t *testing.T) { } func TestPullDockerImage(t *testing.T) { - t.Run("Successfully pull image Always", func(t *testing.T) { - setupSandbox() + t.Run("Successful pull existing image with ImagePullPolicyAlways", func(t *testing.T) { mockDocker := &mocks.Docker{} ctx := context.Background() // Verify the attributes - mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) + mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(dummyReader(), nil) + mockDocker.OnImageListMatch(ctx, types.ImageListOptions{}).Return([]types.ImageSummary{{RepoTags: []string{"nginx:latest"}}}, nil) + err := PullDockerImage(ctx, mockDocker, "nginx:latest", ImagePullPolicyAlways, ImagePullOptions{}, false) + assert.Nil(t, err) + }) + + t.Run("Successful pull non-existent image with ImagePullPolicyAlways", func(t *testing.T) { + mockDocker := &mocks.Docker{} + ctx := context.Background() + // Verify the attributes + mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(dummyReader(), nil) + mockDocker.OnImageListMatch(ctx, types.ImageListOptions{}).Return([]types.ImageSummary{}, nil) err := PullDockerImage(ctx, mockDocker, "nginx:latest", ImagePullPolicyAlways, ImagePullOptions{}, false) assert.Nil(t, err) }) t.Run("Error in pull image", func(t *testing.T) { - setupSandbox() mockDocker := &mocks.Docker{} ctx := context.Background() // Verify the attributes - mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, fmt.Errorf("error")) + mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(dummyReader(), fmt.Errorf("error")) err := PullDockerImage(ctx, mockDocker, "nginx:latest", ImagePullPolicyAlways, ImagePullOptions{}, false) assert.NotNil(t, err) }) - t.Run("Successfully pull image IfNotPresent", func(t *testing.T) { - setupSandbox() + t.Run("Success pull non-existent image with ImagePullPolicyIfNotPresent", func(t *testing.T) { mockDocker := &mocks.Docker{} ctx := context.Background() // Verify the attributes - mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) + mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(dummyReader(), nil) mockDocker.OnImageListMatch(ctx, types.ImageListOptions{}).Return([]types.ImageSummary{}, nil) err := PullDockerImage(ctx, mockDocker, "nginx:latest", ImagePullPolicyIfNotPresent, ImagePullOptions{}, false) assert.Nil(t, err) }) - t.Run("Successfully pull image Never", func(t *testing.T) { - setupSandbox() + t.Run("Success skip existing image with ImagePullPolicyIfNotPresent", func(t *testing.T) { + mockDocker := &mocks.Docker{} + ctx := context.Background() + mockDocker.OnImageListMatch(ctx, types.ImageListOptions{}).Return([]types.ImageSummary{{RepoTags: []string{"nginx:latest"}}}, nil) + err := PullDockerImage(ctx, mockDocker, "nginx:latest", ImagePullPolicyIfNotPresent, ImagePullOptions{}, false) + assert.Nil(t, err) + }) + + t.Run("Success skip existing image with ImagePullPolicyNever", func(t *testing.T) { mockDocker := &mocks.Docker{} ctx := context.Background() + mockDocker.OnImageListMatch(ctx, types.ImageListOptions{}).Return([]types.ImageSummary{{RepoTags: []string{"nginx:latest"}}}, nil) err := PullDockerImage(ctx, mockDocker, "nginx:latest", ImagePullPolicyNever, ImagePullOptions{}, false) assert.Nil(t, err) }) + + t.Run("Error non-existent image with ImagePullPolicyNever", func(t *testing.T) { + mockDocker := &mocks.Docker{} + ctx := context.Background() + mockDocker.OnImageListMatch(ctx, types.ImageListOptions{}).Return([]types.ImageSummary{}, nil) + err := PullDockerImage(ctx, mockDocker, "nginx:latest", ImagePullPolicyNever, ImagePullOptions{}, false) + assert.ErrorContains(t, err, "Image does not exist, but image pull policy prevents pulling it") + }) } func TestStartContainer(t *testing.T) { diff --git a/flytectl/pkg/k8s/k8s.go b/flytectl/pkg/k8s/k8s.go index 705c6887e6..f185e53d2e 100644 --- a/flytectl/pkg/k8s/k8s.go +++ b/flytectl/pkg/k8s/k8s.go @@ -4,6 +4,7 @@ import ( "fmt" "os" + "github.com/enescakir/emoji" "github.com/pkg/errors" "k8s.io/client-go/kubernetes" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" @@ -18,7 +19,7 @@ type K8s interface { //go:generate mockery -name=ContextOps -case=underscore type ContextOps interface { CheckConfig() error - CopyContext(srcConfigAccess clientcmd.ConfigAccess, srcCtxName, targetCtxName string) error + CopyContext(srcConfigAccess clientcmd.ConfigAccess, srcCtxName, targetCtxName, targetNamespace string) error RemoveContext(ctxName string) error } @@ -64,7 +65,7 @@ func (k *ContextManager) CheckConfig() error { } // CopyContext copies context srcCtxName part of srcConfigAccess to targetCtxName part of targetConfigAccess. -func (k *ContextManager) CopyContext(srcConfigAccess clientcmd.ConfigAccess, srcCtxName, targetCtxName string) error { +func (k *ContextManager) CopyContext(srcConfigAccess clientcmd.ConfigAccess, srcCtxName, targetCtxName, targetNamespace string) error { err := k.CheckConfig() if err != nil { return err @@ -86,7 +87,7 @@ func (k *ContextManager) CopyContext(srcConfigAccess clientcmd.ConfigAccess, src _, exists = toStartingConfig.Contexts[targetCtxName] if exists { - fmt.Printf("context %v already exist. Overwriting it\n", targetCtxName) + fmt.Printf("%v Context %q already exists. Overwriting it!\n", emoji.FactoryWorker, targetCtxName) } else { toStartingConfig.Contexts[targetCtxName] = clientcmdapi.NewContext() } @@ -97,12 +98,13 @@ func (k *ContextManager) CopyContext(srcConfigAccess clientcmd.ConfigAccess, src toStartingConfig.AuthInfos[targetCtxName].LocationOfOrigin = k.configAccess.GetDefaultFilename() toStartingConfig.Contexts[targetCtxName].Cluster = targetCtxName toStartingConfig.Contexts[targetCtxName].AuthInfo = targetCtxName + toStartingConfig.Contexts[targetCtxName].Namespace = targetNamespace toStartingConfig.CurrentContext = targetCtxName if err := clientcmd.ModifyConfig(k.configAccess, *toStartingConfig, true); err != nil { return err } - fmt.Printf("context modified for %q and switched over to it.\n", targetCtxName) + fmt.Printf("%v Activated context %q!\n", emoji.FactoryWorker, targetCtxName) return nil } diff --git a/flytectl/pkg/k8s/mocks/context_ops.go b/flytectl/pkg/k8s/mocks/context_ops.go index 6229f02b45..74bd6c7587 100644 --- a/flytectl/pkg/k8s/mocks/context_ops.go +++ b/flytectl/pkg/k8s/mocks/context_ops.go @@ -53,8 +53,8 @@ func (_m ContextOps_CopyContext) Return(_a0 error) *ContextOps_CopyContext { return &ContextOps_CopyContext{Call: _m.Call.Return(_a0)} } -func (_m *ContextOps) OnCopyContext(srcConfigAccess clientcmd.ConfigAccess, srcCtxName string, targetCtxName string) *ContextOps_CopyContext { - c_call := _m.On("CopyContext", srcConfigAccess, srcCtxName, targetCtxName) +func (_m *ContextOps) OnCopyContext(srcConfigAccess clientcmd.ConfigAccess, srcCtxName string, targetCtxName string, targetNamespace string) *ContextOps_CopyContext { + c_call := _m.On("CopyContext", srcConfigAccess, srcCtxName, targetCtxName, targetNamespace) return &ContextOps_CopyContext{Call: c_call} } @@ -63,13 +63,13 @@ func (_m *ContextOps) OnCopyContextMatch(matchers ...interface{}) *ContextOps_Co return &ContextOps_CopyContext{Call: c_call} } -// CopyContext provides a mock function with given fields: srcConfigAccess, srcCtxName, targetCtxName -func (_m *ContextOps) CopyContext(srcConfigAccess clientcmd.ConfigAccess, srcCtxName string, targetCtxName string) error { - ret := _m.Called(srcConfigAccess, srcCtxName, targetCtxName) +// CopyContext provides a mock function with given fields: srcConfigAccess, srcCtxName, targetCtxName, targetNamespace +func (_m *ContextOps) CopyContext(srcConfigAccess clientcmd.ConfigAccess, srcCtxName string, targetCtxName string, targetNamespace string) error { + ret := _m.Called(srcConfigAccess, srcCtxName, targetCtxName, targetNamespace) var r0 error - if rf, ok := ret.Get(0).(func(clientcmd.ConfigAccess, string, string) error); ok { - r0 = rf(srcConfigAccess, srcCtxName, targetCtxName) + if rf, ok := ret.Get(0).(func(clientcmd.ConfigAccess, string, string, string) error); ok { + r0 = rf(srcConfigAccess, srcCtxName, targetCtxName, targetNamespace) } else { r0 = ret.Error(0) } diff --git a/flytectl/pkg/sandbox/start.go b/flytectl/pkg/sandbox/start.go index c8e6b7a56b..9c9c73fb06 100644 --- a/flytectl/pkg/sandbox/start.go +++ b/flytectl/pkg/sandbox/start.go @@ -9,6 +9,7 @@ import ( "path/filepath" "time" + "github.com/apoorvam/goterminal" "github.com/avast/retry-go" "github.com/docker/docker/api/types/mount" "github.com/docker/go-connections/nat" @@ -27,6 +28,7 @@ import ( v1 "k8s.io/apimachinery/pkg/apis/meta/v1" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/clientcmd" + "k8s.io/kubernetes/pkg/api/v1/pod" ) const ( @@ -62,13 +64,6 @@ func isNodeTainted(ctx context.Context, client corev1.CoreV1Interface) (bool, er return false, nil } -func isPodReady(v corev1api.Pod) bool { - if (v.Status.Phase == corev1api.PodRunning) || (v.Status.Phase == corev1api.PodSucceeded) { - return true - } - return false -} - func getFlyteDeployment(ctx context.Context, client corev1.CoreV1Interface) (*corev1api.PodList, error) { pods, err := client.Pods(flyteNamespace).List(ctx, v1.ListOptions{}) if err != nil { @@ -78,11 +73,14 @@ func getFlyteDeployment(ctx context.Context, client corev1.CoreV1Interface) (*co } func WatchFlyteDeployment(ctx context.Context, appsClient corev1.CoreV1Interface) error { - var data = os.Stdout - table := tablewriter.NewWriter(data) + writer := goterminal.New(os.Stdout) + defer writer.Reset() + + table := tablewriter.NewWriter(writer) table.SetHeader([]string{"Service", "Status", "Namespace"}) table.SetRowLine(true) + done := false for { isTaint, err := isNodeTainted(ctx, appsClient) if err != nil { @@ -100,15 +98,15 @@ func WatchFlyteDeployment(ctx context.Context, appsClient corev1.CoreV1Interface table.SetAutoWrapText(false) table.SetAutoFormatHeaders(true) - // Clear os.Stdout - _, _ = data.WriteString("\x1b[3;J\x1b[H\x1b[2J") - var total, ready int total = len(pods.Items) ready = 0 if total != 0 { for _, v := range pods.Items { - if isPodReady(v) { + // TODO (jeev): We should really be using + // `IsContainersReadyConditionTrue`, but that is not available until + // version v1.22.11. We are on v1.13.0 for some reason. + if pod.IsPodReadyConditionTrue(v.Status) { ready++ } if len(v.Status.Conditions) > 0 { @@ -117,14 +115,21 @@ func WatchFlyteDeployment(ctx context.Context, appsClient corev1.CoreV1Interface } table.Render() if total == ready { - break + done = true } } else { table.Append([]string{"k8s: This might take a little bit", "Bootstrapping", ""}) table.Render() } - time.Sleep(40 * time.Second) + writer.Clear() + writer.Print() + + if done { + break + } + + time.Sleep(5 * time.Second) } return nil @@ -150,11 +155,11 @@ func UpdateLocalKubeContext(k8sCtxMgr k8s.ContextOps, dockerCtx string, contextN GlobalFile: kubeConfigPath, LoadingRules: clientcmd.NewDefaultClientConfigLoadingRules(), } - return k8sCtxMgr.CopyContext(srcConfigAccess, dockerCtx, contextName) + return k8sCtxMgr.CopyContext(srcConfigAccess, dockerCtx, contextName, flyteNamespace) } func startSandbox(ctx context.Context, cli docker.Docker, g github.GHRepoService, reader io.Reader, sandboxConfig *sandboxCmdConfig.Config, defaultImageName string, defaultImagePrefix string, exposedPorts map[nat.Port]struct{}, portBindings map[nat.Port][]nat.PortBinding, consolePort int) (*bufio.Scanner, error) { - fmt.Printf("%v Bootstrapping a brand new flyte cluster... %v %v\n", emoji.FactoryWorker, emoji.Hammer, emoji.Wrench) + fmt.Printf("%v Bootstrapping a brand new Flyte cluster... %v %v\n", emoji.FactoryWorker, emoji.Hammer, emoji.Wrench) if sandboxConfig.DryRun { docker.PrintRemoveContainer(docker.FlyteSandboxClusterName) } else { @@ -291,10 +296,10 @@ func StartCluster(ctx context.Context, args []string, sandboxConfig *sandboxCmdC } // Live-ness check + fmt.Printf("%v Waiting for cluster to come up... %v\n", emoji.HourglassNotDone, emoji.HourglassNotDone) err = retry.Do( func() error { // Have to get a new client every time because you run into x509 errors if not - fmt.Println("Waiting for cluster to come up...") k8sClient, err = k8s.GetK8sClient(docker.Kubeconfig, K8sEndpoint) if err != nil { logger.Debugf(ctx, "Error getting K8s client in liveness check %s", err) diff --git a/flytectl/pkg/sandbox/start_test.go b/flytectl/pkg/sandbox/start_test.go index a99f5b91f5..1bfec25e26 100644 --- a/flytectl/pkg/sandbox/start_test.go +++ b/flytectl/pkg/sandbox/start_test.go @@ -3,6 +3,7 @@ package sandbox import ( "context" "fmt" + "io" "io/ioutil" "os" "strings" @@ -69,8 +70,17 @@ var fakeNode = &corev1.Node{ var fakePod = corev1.Pod{ Status: corev1.PodStatus{ - Phase: corev1.PodRunning, - Conditions: []corev1.PodCondition{}, + Phase: corev1.PodRunning, + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + }, + { + Type: corev1.ContainersReady, + Status: corev1.ConditionTrue, + }, + }, }, } @@ -97,6 +107,10 @@ func sandboxSetup() { mockDocker.OnContainerWaitMatch(ctx, mock.Anything, container.WaitConditionNotRunning).Return(bodyStatus, errCh) } +func dummyReader() io.ReadCloser { + return io.NopCloser(strings.NewReader("")) +} + func TestStartFunc(t *testing.T) { defaultImagePrefix := "dind" exposedPorts, portBindings, _ := docker.GetSandboxPorts() @@ -117,7 +131,7 @@ func TestStartFunc(t *testing.T) { t.Run("Successfully run demo cluster", func(t *testing.T) { sandboxSetup() mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{}, nil) - mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) + mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(dummyReader(), nil) mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(nil) mockDocker.OnContainerLogsMatch(ctx, mock.Anything, types.ContainerLogsOptions{ ShowStderr: true, @@ -127,7 +141,7 @@ func TestStartFunc(t *testing.T) { }).Return(nil, nil) mockDocker.OnVolumeList(ctx, filters.NewArgs(filters.KeyValuePair{Key: mock.Anything, Value: mock.Anything})).Return(volume.VolumeListOKBody{Volumes: []*types.Volume{}}, nil) mockDocker.OnVolumeCreate(ctx, volume.VolumeCreateBody{Name: mock.Anything}).Return(types.Volume{}, nil) - _, err := startSandbox(ctx, mockDocker, githubMock, os.Stdin, config, sandboxImageName, defaultImagePrefix, exposedPorts, portBindings, util.SandBoxConsolePort) + _, err := startSandbox(ctx, mockDocker, githubMock, dummyReader(), config, sandboxImageName, defaultImagePrefix, exposedPorts, portBindings, util.SandBoxConsolePort) assert.Nil(t, err) }) t.Run("Successfully exit when demo cluster exist", func(t *testing.T) { @@ -140,7 +154,7 @@ func TestStartFunc(t *testing.T) { }, }, }, nil) - mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) + mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(dummyReader(), nil) mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(nil) mockDocker.OnContainerLogsMatch(ctx, mock.Anything, types.ContainerLogsOptions{ ShowStderr: true, @@ -158,14 +172,14 @@ func TestStartFunc(t *testing.T) { sandboxSetup() mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{}, nil) mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(nil) - mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) + mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(dummyReader(), nil) mockDocker.OnContainerLogsMatch(ctx, mock.Anything, types.ContainerLogsOptions{ ShowStderr: true, ShowStdout: true, Timestamps: true, Follow: true, }).Return(nil, nil) - _, err := startSandbox(ctx, mockDocker, githubMock, os.Stdin, config, sandboxImageName, defaultImagePrefix, exposedPorts, portBindings, util.SandBoxConsolePort) + _, err := startSandbox(ctx, mockDocker, githubMock, dummyReader(), config, sandboxImageName, defaultImagePrefix, exposedPorts, portBindings, util.SandBoxConsolePort) assert.Nil(t, err) }) t.Run("Successfully run demo cluster with abs path of source code", func(t *testing.T) { @@ -174,21 +188,21 @@ func TestStartFunc(t *testing.T) { sandboxSetup() mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{}, nil) mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(nil) - mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) + mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(dummyReader(), nil) mockDocker.OnContainerLogsMatch(ctx, mock.Anything, types.ContainerLogsOptions{ ShowStderr: true, ShowStdout: true, Timestamps: true, Follow: true, }).Return(nil, nil) - _, err := startSandbox(ctx, mockDocker, githubMock, os.Stdin, config, sandboxImageName, defaultImagePrefix, exposedPorts, portBindings, util.SandBoxConsolePort) + _, err := startSandbox(ctx, mockDocker, githubMock, dummyReader(), config, sandboxImageName, defaultImagePrefix, exposedPorts, portBindings, util.SandBoxConsolePort) assert.Nil(t, err) }) t.Run("Successfully run demo cluster with specific version", func(t *testing.T) { sandboxSetup() mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{}, nil) mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(nil) - mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) + mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(dummyReader(), nil) mockDocker.OnContainerLogsMatch(ctx, mock.Anything, types.ContainerLogsOptions{ ShowStderr: true, ShowStdout: true, @@ -202,7 +216,7 @@ func TestStartFunc(t *testing.T) { }, nil, nil) githubMock.OnGetCommitSHA1Match(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("dummySha", nil, nil) - _, err := startSandbox(ctx, mockDocker, githubMock, os.Stdin, config, sandboxImageName, defaultImagePrefix, exposedPorts, portBindings, util.SandBoxConsolePort) + _, err := startSandbox(ctx, mockDocker, githubMock, dummyReader(), config, sandboxImageName, defaultImagePrefix, exposedPorts, portBindings, util.SandBoxConsolePort) assert.Nil(t, err) }) t.Run("Failed run demo cluster with wrong version", func(t *testing.T) { @@ -210,14 +224,14 @@ func TestStartFunc(t *testing.T) { mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{}, nil) sandboxCmdConfig.DefaultConfig.Image = "" githubMock.OnGetReleaseByTagMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, fmt.Errorf("non-existent-tag")) - _, err := startSandbox(ctx, mockDocker, githubMock, os.Stdin, sandboxCmdConfig.DefaultConfig, sandboxImageName, defaultImagePrefix, exposedPorts, portBindings, util.SandBoxConsolePort) + _, err := startSandbox(ctx, mockDocker, githubMock, dummyReader(), sandboxCmdConfig.DefaultConfig, sandboxImageName, defaultImagePrefix, exposedPorts, portBindings, util.SandBoxConsolePort) assert.NotNil(t, err) assert.Equal(t, "non-existent-tag", err.Error()) }) t.Run("Error in pulling image", func(t *testing.T) { sandboxSetup() mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{}, nil) - mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, fmt.Errorf("failed to pull")) + mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(dummyReader(), fmt.Errorf("failed to pull")) sandboxCmdConfig.DefaultConfig.Image = "" tag := "v0.15.0" githubMock.OnGetReleaseByTagMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&github.RepositoryRelease{ @@ -225,7 +239,7 @@ func TestStartFunc(t *testing.T) { }, nil, nil) githubMock.OnGetCommitSHA1Match(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("dummySha", nil, nil) - _, err := startSandbox(ctx, mockDocker, githubMock, os.Stdin, sandboxCmdConfig.DefaultConfig, sandboxImageName, defaultImagePrefix, exposedPorts, portBindings, util.SandBoxConsolePort) + _, err := startSandbox(ctx, mockDocker, githubMock, dummyReader(), sandboxCmdConfig.DefaultConfig, sandboxImageName, defaultImagePrefix, exposedPorts, portBindings, util.SandBoxConsolePort) assert.NotNil(t, err) assert.Equal(t, "failed to pull", err.Error()) }) @@ -239,7 +253,7 @@ func TestStartFunc(t *testing.T) { }, }, }, nil) - mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) + mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(dummyReader(), nil) mockDocker.OnContainerRemove(ctx, mock.Anything, types.ContainerRemoveOptions{Force: true}).Return(fmt.Errorf("failed to remove container")) _, err := startSandbox(ctx, mockDocker, githubMock, strings.NewReader("y"), sandboxCmdConfig.DefaultConfig, sandboxImageName, defaultImagePrefix, exposedPorts, portBindings, util.SandBoxConsolePort) assert.NotNil(t, err) @@ -248,16 +262,16 @@ func TestStartFunc(t *testing.T) { t.Run("Error in start container", func(t *testing.T) { sandboxSetup() mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{}, nil) - mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) + mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(dummyReader(), nil) mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(fmt.Errorf("failed to run container")) - _, err := startSandbox(ctx, mockDocker, githubMock, os.Stdin, sandboxCmdConfig.DefaultConfig, sandboxImageName, defaultImagePrefix, exposedPorts, portBindings, util.SandBoxConsolePort) + _, err := startSandbox(ctx, mockDocker, githubMock, dummyReader(), sandboxCmdConfig.DefaultConfig, sandboxImageName, defaultImagePrefix, exposedPorts, portBindings, util.SandBoxConsolePort) assert.NotNil(t, err) assert.Equal(t, "failed to run container", err.Error()) }) t.Run("Error in reading logs", func(t *testing.T) { sandboxSetup() mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{}, nil) - mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) + mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(dummyReader(), nil) mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(nil) mockDocker.OnContainerLogsMatch(ctx, mock.Anything, types.ContainerLogsOptions{ ShowStderr: true, @@ -265,14 +279,14 @@ func TestStartFunc(t *testing.T) { Timestamps: true, Follow: true, }).Return(nil, fmt.Errorf("failed to get container logs")) - _, err := startSandbox(ctx, mockDocker, githubMock, os.Stdin, sandboxCmdConfig.DefaultConfig, sandboxImageName, defaultImagePrefix, exposedPorts, portBindings, util.SandBoxConsolePort) + _, err := startSandbox(ctx, mockDocker, githubMock, dummyReader(), sandboxCmdConfig.DefaultConfig, sandboxImageName, defaultImagePrefix, exposedPorts, portBindings, util.SandBoxConsolePort) assert.NotNil(t, err) assert.Equal(t, "failed to get container logs", err.Error()) }) t.Run("Error in list container", func(t *testing.T) { sandboxSetup() mockDocker.OnContainerListMatch(mock.Anything, mock.Anything).Return([]types.Container{}, fmt.Errorf("failed to list containers")) - mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) + mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(dummyReader(), nil) mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(nil) mockDocker.OnContainerLogsMatch(ctx, mock.Anything, types.ContainerLogsOptions{ ShowStderr: true, @@ -280,7 +294,7 @@ func TestStartFunc(t *testing.T) { Timestamps: true, Follow: true, }).Return(nil, nil) - _, err := startSandbox(ctx, mockDocker, githubMock, os.Stdin, config, sandboxImageName, defaultImagePrefix, exposedPorts, portBindings, util.SandBoxConsolePort) + _, err := startSandbox(ctx, mockDocker, githubMock, dummyReader(), config, sandboxImageName, defaultImagePrefix, exposedPorts, portBindings, util.SandBoxConsolePort) assert.NotNil(t, err) assert.Equal(t, "failed to list containers", err.Error()) }) @@ -300,7 +314,7 @@ func TestStartFunc(t *testing.T) { } sandboxSetup() mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{}, nil) - mockDocker.OnImagePullMatch(mock.Anything, mock.Anything, mock.Anything).Return(os.Stdin, nil) + mockDocker.OnImagePullMatch(mock.Anything, mock.Anything, mock.Anything).Return(dummyReader(), nil) mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(nil) stringReader := strings.NewReader(docker.SuccessMessage) reader := ioutil.NopCloser(stringReader) @@ -317,7 +331,7 @@ func TestStartFunc(t *testing.T) { k8s.ContextMgr = mockK8sContextMgr ghutil.Client = githubMock mockK8sContextMgr.OnCheckConfig().Return(nil) - mockK8sContextMgr.OnCopyContextMatch(mock.Anything, mock.Anything, mock.Anything).Return(nil) + mockK8sContextMgr.OnCopyContextMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) err = StartSandboxCluster(context.Background(), []string{}, config) assert.Nil(t, err) }) @@ -327,7 +341,7 @@ func TestStartFunc(t *testing.T) { sandboxSetup() docker.Client = mockDocker mockDocker.OnContainerListMatch(mock.Anything, mock.Anything).Return([]types.Container{}, fmt.Errorf("failed to list containers")) - mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) + mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(dummyReader(), nil) mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(nil) mockDocker.OnContainerLogsMatch(ctx, mock.Anything, types.ContainerLogsOptions{ ShowStderr: true,