From 28c59bea593da3fb80f6783d0c5a7bf0cdb19618 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 12 Apr 2022 06:06:58 +0000 Subject: [PATCH] Launch Flyte cluster in the single binary mode (#306) * Using sandbox-lite Signed-off-by: Kevin Su * Add demo command Signed-off-by: Kevin Su * wip Signed-off-by: Kevin Su * wip Signed-off-by: Kevin Su --- flytectl/cmd/demo/demo.go | 64 +++ flytectl/cmd/demo/demo_test.go | 39 ++ flytectl/cmd/demo/exec.go | 47 ++ flytectl/cmd/demo/exec_test.go | 76 +++ flytectl/cmd/demo/start.go | 306 ++++++++++++ flytectl/cmd/demo/start_test.go | 731 +++++++++++++++++++++++++++++ flytectl/cmd/demo/status.go | 42 ++ flytectl/cmd/demo/status_test.go | 39 ++ flytectl/cmd/demo/teardown.go | 62 +++ flytectl/cmd/demo/teardown_test.go | 64 +++ flytectl/cmd/root.go | 2 + flytectl/cmd/sandbox/start.go | 4 +- flytectl/pkg/docker/docker_util.go | 8 +- flytectl/pkg/util/util.go | 10 +- flytectl/pkg/util/util_test.go | 2 +- 15 files changed, 1485 insertions(+), 11 deletions(-) create mode 100644 flytectl/cmd/demo/demo.go create mode 100644 flytectl/cmd/demo/demo_test.go create mode 100644 flytectl/cmd/demo/exec.go create mode 100644 flytectl/cmd/demo/exec_test.go create mode 100644 flytectl/cmd/demo/start.go create mode 100644 flytectl/cmd/demo/start_test.go create mode 100644 flytectl/cmd/demo/status.go create mode 100644 flytectl/cmd/demo/status_test.go create mode 100644 flytectl/cmd/demo/teardown.go create mode 100644 flytectl/cmd/demo/teardown_test.go diff --git a/flytectl/cmd/demo/demo.go b/flytectl/cmd/demo/demo.go new file mode 100644 index 0000000000..83cb8afe11 --- /dev/null +++ b/flytectl/cmd/demo/demo.go @@ -0,0 +1,64 @@ +package demo + +import ( + sandboxConfig "github.com/flyteorg/flytectl/cmd/config/subcommand/sandbox" + cmdcore "github.com/flyteorg/flytectl/cmd/core" + "github.com/spf13/cobra" +) + +// Long descriptions are whitespace sensitive when generating docs using sphinx. +const ( + demoShort = `Helps with demo interactions like start, teardown, status, and exec.` + demoLong = ` +Flyte Demo is a fully standalone minimal environment for running Flyte. +It provides a simplified way of running Flyte demo as a single Docker container locally. + +To create a demo cluster, run: +:: + + flytectl demo start + +To remove a demo cluster, run: +:: + + flytectl demo teardown + +To check the status of the demo container, run: +:: + + flytectl demo status + +To execute commands inside the demo container, use exec: +:: + + flytectl demo exec -- pwd +` +) + +// CreateDemoCommand will return demo command +func CreateDemoCommand() *cobra.Command { + demo := &cobra.Command{ + Use: "demo", + Short: demoShort, + Long: demoLong, + } + + demoResourcesFuncs := map[string]cmdcore.CommandEntry{ + "start": {CmdFunc: startDemoCluster, Aliases: []string{}, ProjectDomainNotRequired: true, + Short: startShort, + Long: startLong, PFlagProvider: sandboxConfig.DefaultConfig}, + "teardown": {CmdFunc: teardownDemoCluster, Aliases: []string{}, ProjectDomainNotRequired: true, + Short: teardownShort, + Long: teardownLong}, + "status": {CmdFunc: demoClusterStatus, Aliases: []string{}, ProjectDomainNotRequired: true, + Short: statusShort, + Long: statusLong}, + "exec": {CmdFunc: demoClusterExec, Aliases: []string{}, ProjectDomainNotRequired: true, + Short: execShort, + Long: execLong}, + } + + cmdcore.AddCommands(demo, demoResourcesFuncs) + + return demo +} diff --git a/flytectl/cmd/demo/demo_test.go b/flytectl/cmd/demo/demo_test.go new file mode 100644 index 0000000000..0ce332867a --- /dev/null +++ b/flytectl/cmd/demo/demo_test.go @@ -0,0 +1,39 @@ +package demo + +import ( + "fmt" + "sort" + "testing" + + "gotest.tools/assert" +) + +func TestCreateDemoCommand(t *testing.T) { + demoCommand := CreateDemoCommand() + assert.Equal(t, demoCommand.Use, "demo") + assert.Equal(t, demoCommand.Short, "Helps with demo interactions like start, teardown, status, and exec.") + fmt.Println(demoCommand.Commands()) + assert.Equal(t, len(demoCommand.Commands()), 4) + cmdNouns := demoCommand.Commands() + // Sort by Use value. + sort.Slice(cmdNouns, func(i, j int) bool { + return cmdNouns[i].Use < cmdNouns[j].Use + }) + + assert.Equal(t, cmdNouns[0].Use, "exec") + assert.Equal(t, cmdNouns[0].Short, execShort) + assert.Equal(t, cmdNouns[0].Long, execLong) + + assert.Equal(t, cmdNouns[1].Use, "start") + assert.Equal(t, cmdNouns[1].Short, startShort) + assert.Equal(t, cmdNouns[1].Long, startLong) + + assert.Equal(t, cmdNouns[2].Use, "status") + assert.Equal(t, cmdNouns[2].Short, statusShort) + assert.Equal(t, cmdNouns[2].Long, statusLong) + + assert.Equal(t, cmdNouns[3].Use, "teardown") + assert.Equal(t, cmdNouns[3].Short, teardownShort) + assert.Equal(t, cmdNouns[3].Long, teardownLong) + +} diff --git a/flytectl/cmd/demo/exec.go b/flytectl/cmd/demo/exec.go new file mode 100644 index 0000000000..b0d9510c72 --- /dev/null +++ b/flytectl/cmd/demo/exec.go @@ -0,0 +1,47 @@ +package demo + +import ( + "context" + "fmt" + + cmdCore "github.com/flyteorg/flytectl/cmd/core" + "github.com/flyteorg/flytectl/pkg/docker" +) + +const ( + execShort = "Executes non-interactive command inside the demo container" + execLong = ` +Run non-interactive commands inside the demo container and immediately return the output. +By default, "flytectl exec" is present in the /root directory inside the demo container. + +:: + + flytectl demo exec -- ls -al + +Usage` +) + +func demoClusterExec(ctx context.Context, args []string, cmdCtx cmdCore.CommandContext) error { + cli, err := docker.GetDockerClient() + if err != nil { + return err + } + if len(args) > 0 { + return execute(ctx, cli, args) + } + return fmt.Errorf("missing argument. Please check usage examples by running flytectl demo exec --help") +} + +func execute(ctx context.Context, cli docker.Docker, args []string) error { + c := docker.GetSandbox(ctx, cli) + if c != nil { + exec, err := docker.ExecCommend(ctx, cli, c.ID, args) + if err != nil { + return err + } + if err := docker.InspectExecResp(ctx, cli, exec.ID); err != nil { + return err + } + } + return nil +} diff --git a/flytectl/cmd/demo/exec_test.go b/flytectl/cmd/demo/exec_test.go new file mode 100644 index 0000000000..0375be916c --- /dev/null +++ b/flytectl/cmd/demo/exec_test.go @@ -0,0 +1,76 @@ +package demo + +import ( + "bufio" + "context" + "fmt" + "io" + "strings" + "testing" + + "github.com/flyteorg/flytectl/cmd/testutils" + + admin2 "github.com/flyteorg/flyteidl/clients/go/admin" + + cmdCore "github.com/flyteorg/flytectl/cmd/core" + "github.com/stretchr/testify/assert" + + "github.com/docker/docker/api/types" + "github.com/flyteorg/flytectl/pkg/docker" + "github.com/flyteorg/flytectl/pkg/docker/mocks" + "github.com/stretchr/testify/mock" +) + +func TestDemoClusterExec(t *testing.T) { + mockDocker := &mocks.Docker{} + mockOutStream := new(io.Writer) + ctx := context.Background() + mockClient := admin2.InitializeMockClientset() + cmdCtx := cmdCore.NewCommandContext(mockClient, *mockOutStream) + reader := bufio.NewReader(strings.NewReader("test")) + + mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{ + { + ID: docker.FlyteSandboxClusterName, + Names: []string{ + docker.FlyteSandboxClusterName, + }, + }, + }, nil) + docker.ExecConfig.Cmd = []string{"ls -al"} + mockDocker.OnContainerExecCreateMatch(ctx, mock.Anything, docker.ExecConfig).Return(types.IDResponse{}, nil) + mockDocker.OnContainerExecInspectMatch(ctx, mock.Anything).Return(types.ContainerExecInspect{}, nil) + mockDocker.OnContainerExecAttachMatch(ctx, mock.Anything, types.ExecStartCheck{}).Return(types.HijackedResponse{ + Reader: reader, + }, fmt.Errorf("Test")) + docker.Client = mockDocker + err := demoClusterExec(ctx, []string{"ls -al"}, cmdCtx) + + assert.NotNil(t, err) +} + +func TestSandboxClusterExecWithoutCmd(t *testing.T) { + mockDocker := &mocks.Docker{} + reader := bufio.NewReader(strings.NewReader("test")) + s := testutils.Setup() + ctx := s.Ctx + + mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{ + { + ID: docker.FlyteSandboxClusterName, + Names: []string{ + docker.FlyteSandboxClusterName, + }, + }, + }, nil) + docker.ExecConfig.Cmd = []string{} + mockDocker.OnContainerExecCreateMatch(ctx, mock.Anything, docker.ExecConfig).Return(types.IDResponse{}, nil) + mockDocker.OnContainerExecInspectMatch(ctx, mock.Anything).Return(types.ContainerExecInspect{}, nil) + mockDocker.OnContainerExecAttachMatch(ctx, mock.Anything, types.ExecStartCheck{}).Return(types.HijackedResponse{ + Reader: reader, + }, fmt.Errorf("Test")) + docker.Client = mockDocker + err := demoClusterExec(ctx, []string{}, s.CmdCtx) + + assert.NotNil(t, err) +} diff --git a/flytectl/cmd/demo/start.go b/flytectl/cmd/demo/start.go new file mode 100644 index 0000000000..459b5568f4 --- /dev/null +++ b/flytectl/cmd/demo/start.go @@ -0,0 +1,306 @@ +package demo + +import ( + "bufio" + "context" + "fmt" + "io" + "os" + "path/filepath" + "time" + + "github.com/flyteorg/flytectl/clierrors" + "github.com/flyteorg/flytectl/pkg/githubutil" + + "github.com/avast/retry-go" + "github.com/olekukonko/tablewriter" + corev1api "k8s.io/api/core/v1" + corev1 "k8s.io/client-go/kubernetes/typed/core/v1" + + "github.com/docker/docker/api/types/mount" + "github.com/flyteorg/flytectl/pkg/configutil" + "github.com/flyteorg/flytectl/pkg/k8s" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/enescakir/emoji" + sandboxConfig "github.com/flyteorg/flytectl/cmd/config/subcommand/sandbox" + cmdCore "github.com/flyteorg/flytectl/cmd/core" + "github.com/flyteorg/flytectl/pkg/docker" + "github.com/flyteorg/flytectl/pkg/util" + "k8s.io/client-go/tools/clientcmd" +) + +const ( + startShort = "Starts the Flyte demo cluster." + startLong = ` +Flyte demo is a fully standalone minimal environment for running Flyte. +It provides a simplified way of running Flyte demo as a single Docker container locally. + +Starts the demo cluster without any source code: +:: + + flytectl demo start + +Mounts your source code repository inside the demo cluster: +:: + + flytectl demo start --source=$HOME/flyteorg/flytesnacks + +Specify a Flyte demo compliant image with the registry. This is useful in case you want to use an image from your registry. +:: + + flytectl demo start --image docker.io/my-override:latest + +Note: If image flag is passed then Flytectl will ignore version and pre flags. + +Specify a Flyte demo image pull policy. Possible pull policy values are Always, IfNotPresent, or Never: +:: + + flytectl demo start --image docker.io/my-override:latest --imagePullPolicy Always + +Start demo cluster passing environment variables. This can be used to pass docker specific env variables or flyte specific env variables. +eg : for passing timeout value in secs for the demo container use the following. +:: + + flytectl demo start --env FLYTE_TIMEOUT=700 + + +The DURATION can be a positive integer or a floating-point number, followed by an optional unit suffix:: +s - seconds (default) +m - minutes +h - hours +d - days +When no unit is used, it defaults to seconds. If the duration is set to zero, the associated timeout is disabled. + + +eg : for passing multiple environment variables +:: + + flytectl demo start --env USER=foo --env PASSWORD=bar + + +Usage +` + k8sEndpoint = "https://127.0.0.1:30086" + flyteNamespace = "flyte" + diskPressureTaint = "node.kubernetes.io/disk-pressure" + taintEffect = "NoSchedule" + demoContextName = "flyte-sandbox" + demoDockerContext = "default" + demoImageName = "cr.flyte.org/flyteorg/flyte-sandbox-lite" +) + +type ExecResult struct { + StdOut string + StdErr string + ExitCode int +} + +func startDemoCluster(ctx context.Context, args []string, cmdCtx cmdCore.CommandContext) error { + cli, err := docker.GetDockerClient() + if err != nil { + return err + } + + reader, err := startDemo(ctx, cli, os.Stdin) + if err != nil { + return err + } + if reader != nil { + docker.WaitForSandbox(reader, docker.SuccessMessage) + } + + if reader != nil { + var k8sClient k8s.K8s + err = retry.Do( + func() error { + k8sClient, err = k8s.GetK8sClient(docker.Kubeconfig, k8sEndpoint) + return err + }, + retry.Attempts(10), + ) + if err != nil { + return err + } + if err = updateLocalKubeContext(); err != nil { + return err + } + + if err := watchFlyteDeployment(ctx, k8sClient.CoreV1()); err != nil { + return err + } + util.PrintSandboxMessage(util.DemoConsolePort) + } + return nil +} + +func updateLocalKubeContext() error { + srcConfigAccess := &clientcmd.PathOptions{ + GlobalFile: docker.Kubeconfig, + LoadingRules: clientcmd.NewDefaultClientConfigLoadingRules(), + } + k8sCtxMgr := k8s.NewK8sContextManager() + return k8sCtxMgr.CopyContext(srcConfigAccess, demoDockerContext, demoContextName) +} + +func startDemo(ctx context.Context, cli docker.Docker, reader io.Reader) (*bufio.Scanner, error) { + fmt.Printf("%v Bootstrapping a brand new flyte cluster... %v %v\n", emoji.FactoryWorker, emoji.Hammer, emoji.Wrench) + + if err := docker.RemoveSandbox(ctx, cli, reader); err != nil { + if err.Error() != clierrors.ErrSandboxExists { + return nil, err + } + fmt.Printf("Existing details of your demo cluster") + util.PrintSandboxMessage(util.DemoConsolePort) + return nil, nil + } + + if err := util.SetupFlyteDir(); err != nil { + return nil, err + } + + templateValues := configutil.ConfigTemplateSpec{ + Host: "localhost:30081", + Insecure: true, + } + if err := configutil.SetupConfig(configutil.FlytectlConfig, configutil.GetSandboxTemplate(), templateValues); err != nil { + return nil, err + } + + volumes := docker.Volumes + sandboxDefaultConfig := sandboxConfig.DefaultConfig + if vol, err := mountVolume(sandboxDefaultConfig.Source, docker.Source); err != nil { + return nil, err + } else if vol != nil { + volumes = append(volumes, *vol) + } + demoImage := sandboxConfig.DefaultConfig.Image + if len(demoImage) == 0 { + image, version, err := githubutil.GetFullyQualifiedImageName(sandboxConfig.DefaultConfig.Version, demoImageName, sandboxConfig.DefaultConfig.Prerelease) + if err != nil { + return nil, err + } + demoImage = image + fmt.Printf("%v Running Flyte %s release\n", emoji.Whale, version) + } + fmt.Printf("%v pulling docker image for release %s\n", emoji.Whale, demoImage) + if err := docker.PullDockerImage(ctx, cli, demoImage, sandboxConfig.DefaultConfig.ImagePullPolicy, sandboxConfig.DefaultConfig.ImagePullOptions); err != nil { + return nil, err + } + + fmt.Printf("%v booting flyte-demo container\n", emoji.FactoryWorker) + exposedPorts, portBindings, _ := docker.GetSandboxPorts() + ID, err := docker.StartContainer(ctx, cli, volumes, exposedPorts, portBindings, docker.FlyteSandboxClusterName, + demoImage, sandboxDefaultConfig.Env) + + if err != nil { + fmt.Printf("%v Something went wrong: Failed to start demo container %v, Please check your docker client and try again. \n", emoji.GrimacingFace, emoji.Whale) + return nil, err + } + + logReader, err := docker.ReadLogs(ctx, cli, ID) + if err != nil { + return nil, err + } + + return logReader, nil +} + +func mountVolume(file, destination string) (*mount.Mount, error) { + if len(file) > 0 { + source, err := filepath.Abs(file) + if err != nil { + return nil, err + } + return &mount.Mount{ + Type: mount.TypeBind, + Source: source, + Target: destination, + }, nil + } + return nil, nil +} + +func watchFlyteDeployment(ctx context.Context, appsClient corev1.CoreV1Interface) error { + var data = os.Stdout + table := tablewriter.NewWriter(data) + table.SetHeader([]string{"Service", "Status", "Namespace"}) + table.SetRowLine(true) + + for { + isTaint, err := isNodeTainted(ctx, appsClient) + if err != nil { + return err + } + if isTaint { + return fmt.Errorf("docker sandbox doesn't have sufficient memory available. Please run docker system prune -a --volumes") + } + + pods, err := getFlyteDeployment(ctx, appsClient) + if err != nil { + return err + } + table.ClearRows() + table.SetAutoWrapText(false) + table.SetAutoFormatHeaders(true) + + // Clear os.Stdout + _, _ = data.WriteString("\x1b[3;J\x1b[H\x1b[2J") + + var total, ready int + total = len(pods.Items) + ready = 0 + if total != 0 { + for _, v := range pods.Items { + if isPodReady(v) { + ready++ + } + if len(v.Status.Conditions) > 0 { + table.Append([]string{v.GetName(), string(v.Status.Phase), v.GetNamespace()}) + } + } + table.Render() + if total == ready { + break + } + } + + time.Sleep(40 * time.Second) + } + + return nil +} + +func isPodReady(v corev1api.Pod) bool { + if (v.Status.Phase == corev1api.PodRunning) || (v.Status.Phase == corev1api.PodSucceeded) { + return true + } + return false +} + +func getFlyteDeployment(ctx context.Context, client corev1.CoreV1Interface) (*corev1api.PodList, error) { + pods, err := client.Pods(flyteNamespace).List(ctx, v1.ListOptions{}) + if err != nil { + return nil, err + } + return pods, nil +} + +func isNodeTainted(ctx context.Context, client corev1.CoreV1Interface) (bool, error) { + nodes, err := client.Nodes().List(ctx, v1.ListOptions{}) + if err != nil { + return false, err + } + match := 0 + for _, node := range nodes.Items { + for _, c := range node.Spec.Taints { + if c.Key == diskPressureTaint && c.Effect == taintEffect { + match++ + } + } + } + if match > 0 { + return true, nil + } + return false, nil +} diff --git a/flytectl/cmd/demo/start_test.go b/flytectl/cmd/demo/start_test.go new file mode 100644 index 0000000000..50fe22d78d --- /dev/null +++ b/flytectl/cmd/demo/start_test.go @@ -0,0 +1,731 @@ +package demo + +import ( + "context" + "fmt" + "io" + "io/ioutil" + "os" + "path/filepath" + "strings" + "testing" + + "github.com/flyteorg/flyteidl/clients/go/admin" + + "github.com/flyteorg/flytectl/pkg/githubutil" + + "github.com/flyteorg/flytectl/pkg/k8s" + + "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/container" + "github.com/docker/docker/api/types/mount" + sandboxConfig "github.com/flyteorg/flytectl/cmd/config/subcommand/sandbox" + cmdCore "github.com/flyteorg/flytectl/cmd/core" + "github.com/flyteorg/flytectl/pkg/docker" + "github.com/flyteorg/flytectl/pkg/docker/mocks" + f "github.com/flyteorg/flytectl/pkg/filesystemutils" + k8sMocks "github.com/flyteorg/flytectl/pkg/k8s/mocks" + "github.com/flyteorg/flytectl/pkg/util" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + testclient "k8s.io/client-go/kubernetes/fake" +) + +var content = ` +apiVersion: v1 +clusters: +- cluster: + server: https://localhost:8080 + extensions: + - name: client.authentication.k8s.io/exec + extension: + audience: foo + other: bar + name: default +contexts: +- context: + cluster: default + user: default + namespace: bar + name: default +current-context: default +kind: Config +users: +- name: default + user: + exec: + apiVersion: client.authentication.k8s.io/v1alpha1 + args: + - arg-1 + - arg-2 + command: foo-command + provideClusterInfo: true +` + +var fakeNode = &corev1.Node{ + Spec: corev1.NodeSpec{ + Taints: []corev1.Taint{}, + }, +} + +var fakePod = corev1.Pod{ + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + Conditions: []corev1.PodCondition{}, + }, +} + +func TestStartDemoFunc(t *testing.T) { + p1, p2, _ := docker.GetSandboxPorts() + assert.Nil(t, util.SetupFlyteDir()) + assert.Nil(t, os.MkdirAll(f.FilePathJoin(f.UserHomeDir(), ".flyte", "k3s"), os.ModePerm)) + assert.Nil(t, ioutil.WriteFile(docker.Kubeconfig, []byte(content), os.ModePerm)) + + fakePod.SetName("flyte") + + t.Run("Successfully run demo cluster", func(t *testing.T) { + ctx := context.Background() + mockDocker := &mocks.Docker{} + errCh := make(chan error) + sandboxConfig.DefaultConfig.Version = "v0.19.1" + bodyStatus := make(chan container.ContainerWaitOKBody) + image, _, err := githubutil.GetFullyQualifiedImageName(sandboxConfig.DefaultConfig.Version, demoImageName, false) + assert.Nil(t, err) + mockDocker.OnContainerCreate(ctx, &container.Config{ + Env: docker.Environment, + Image: image, + Tty: false, + ExposedPorts: p1, + }, &container.HostConfig{ + Mounts: docker.Volumes, + PortBindings: p2, + Privileged: true, + }, nil, nil, mock.Anything).Return(container.ContainerCreateCreatedBody{ + ID: "Hello", + }, nil) + mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(nil) + mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{}, nil) + mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) + mockDocker.OnContainerLogsMatch(ctx, mock.Anything, types.ContainerLogsOptions{ + ShowStderr: true, + ShowStdout: true, + Timestamps: true, + Follow: true, + }).Return(nil, nil) + mockDocker.OnContainerWaitMatch(ctx, mock.Anything, container.WaitConditionNotRunning).Return(bodyStatus, errCh) + _, err = startDemo(ctx, mockDocker, os.Stdin) + assert.Nil(t, err) + }) + t.Run("Successfully exit when demo cluster exist", func(t *testing.T) { + ctx := context.Background() + mockDocker := &mocks.Docker{} + errCh := make(chan error) + image, _, err := githubutil.GetFullyQualifiedImageName("", demoImageName, false) + assert.Nil(t, err) + bodyStatus := make(chan container.ContainerWaitOKBody) + mockDocker.OnContainerCreate(ctx, &container.Config{ + Env: docker.Environment, + Image: image, + Tty: false, + ExposedPorts: p1, + }, &container.HostConfig{ + Mounts: docker.Volumes, + PortBindings: p2, + Privileged: true, + }, nil, nil, mock.Anything).Return(container.ContainerCreateCreatedBody{ + ID: "Hello", + }, nil) + mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(nil) + mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{ + { + ID: docker.FlyteSandboxClusterName, + Names: []string{ + docker.FlyteSandboxClusterName, + }, + }, + }, nil) + mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) + mockDocker.OnContainerLogsMatch(ctx, mock.Anything, types.ContainerLogsOptions{ + ShowStderr: true, + ShowStdout: true, + Timestamps: true, + Follow: true, + }).Return(nil, nil) + mockDocker.OnContainerWaitMatch(ctx, mock.Anything, container.WaitConditionNotRunning).Return(bodyStatus, errCh) + reader, err := startDemo(ctx, mockDocker, strings.NewReader("n")) + assert.Nil(t, err) + assert.Nil(t, reader) + }) + t.Run("Successfully run demo cluster with source code", func(t *testing.T) { + ctx := context.Background() + errCh := make(chan error) + bodyStatus := make(chan container.ContainerWaitOKBody) + mockDocker := &mocks.Docker{} + sandboxConfig.DefaultConfig.Source = f.UserHomeDir() + sandboxConfig.DefaultConfig.Version = "" + volumes := docker.Volumes + volumes = append(volumes, mount.Mount{ + Type: mount.TypeBind, + Source: sandboxConfig.DefaultConfig.Source, + Target: docker.Source, + }) + image, _, err := githubutil.GetFullyQualifiedImageName("", demoImageName, false) + assert.Nil(t, err) + mockDocker.OnContainerCreate(ctx, &container.Config{ + Env: docker.Environment, + Image: image, + Tty: false, + ExposedPorts: p1, + }, &container.HostConfig{ + Mounts: volumes, + PortBindings: p2, + Privileged: true, + }, nil, nil, mock.Anything).Return(container.ContainerCreateCreatedBody{ + ID: "Hello", + }, nil) + mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(nil) + mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{}, nil) + mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) + mockDocker.OnContainerLogsMatch(ctx, mock.Anything, types.ContainerLogsOptions{ + ShowStderr: true, + ShowStdout: true, + Timestamps: true, + Follow: true, + }).Return(nil, nil) + mockDocker.OnContainerWaitMatch(ctx, mock.Anything, container.WaitConditionNotRunning).Return(bodyStatus, errCh) + _, err = startDemo(ctx, mockDocker, os.Stdin) + assert.Nil(t, err) + }) + t.Run("Successfully run demo cluster with abs path of source code", func(t *testing.T) { + ctx := context.Background() + errCh := make(chan error) + bodyStatus := make(chan container.ContainerWaitOKBody) + mockDocker := &mocks.Docker{} + sandboxConfig.DefaultConfig.Source = "../" + sandboxConfig.DefaultConfig.Version = "" + absPath, err := filepath.Abs(sandboxConfig.DefaultConfig.Source) + assert.Nil(t, err) + volumes := docker.Volumes + volumes = append(volumes, mount.Mount{ + Type: mount.TypeBind, + Source: absPath, + Target: docker.Source, + }) + image, _, err := githubutil.GetFullyQualifiedImageName("", demoImageName, false) + assert.Nil(t, err) + mockDocker.OnContainerCreate(ctx, &container.Config{ + Env: docker.Environment, + Image: image, + Tty: false, + ExposedPorts: p1, + }, &container.HostConfig{ + Mounts: volumes, + PortBindings: p2, + Privileged: true, + }, nil, nil, mock.Anything).Return(container.ContainerCreateCreatedBody{ + ID: "Hello", + }, nil) + mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(nil) + mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{}, nil) + mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) + mockDocker.OnContainerLogsMatch(ctx, mock.Anything, types.ContainerLogsOptions{ + ShowStderr: true, + ShowStdout: true, + Timestamps: true, + Follow: true, + }).Return(nil, nil) + mockDocker.OnContainerWaitMatch(ctx, mock.Anything, container.WaitConditionNotRunning).Return(bodyStatus, errCh) + _, err = startDemo(ctx, mockDocker, os.Stdin) + assert.Nil(t, err) + }) + t.Run("Successfully run demo cluster with specific version", func(t *testing.T) { + ctx := context.Background() + errCh := make(chan error) + bodyStatus := make(chan container.ContainerWaitOKBody) + mockDocker := &mocks.Docker{} + sandboxConfig.DefaultConfig.Version = "v0.18.0" + sandboxConfig.DefaultConfig.Source = "" + + image, _, err := githubutil.GetFullyQualifiedImageName(sandboxConfig.DefaultConfig.Version, demoImageName, false) + assert.Nil(t, err) + volumes := docker.Volumes + mockDocker.OnContainerCreate(ctx, &container.Config{ + Env: docker.Environment, + Image: image, + Tty: false, + ExposedPorts: p1, + }, &container.HostConfig{ + Mounts: volumes, + PortBindings: p2, + Privileged: true, + }, nil, nil, mock.Anything).Return(container.ContainerCreateCreatedBody{ + ID: "Hello", + }, nil) + mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(nil) + mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{}, nil) + mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) + mockDocker.OnContainerLogsMatch(ctx, mock.Anything, types.ContainerLogsOptions{ + ShowStderr: true, + ShowStdout: true, + Timestamps: true, + Follow: true, + }).Return(nil, nil) + mockDocker.OnContainerWaitMatch(ctx, mock.Anything, container.WaitConditionNotRunning).Return(bodyStatus, errCh) + _, err = startDemo(ctx, mockDocker, os.Stdin) + assert.Nil(t, err) + }) + t.Run("Failed run demo cluster with wrong version", func(t *testing.T) { + ctx := context.Background() + errCh := make(chan error) + bodyStatus := make(chan container.ContainerWaitOKBody) + mockDocker := &mocks.Docker{} + sandboxConfig.DefaultConfig.Version = "v0.1444.0" + sandboxConfig.DefaultConfig.Source = "" + image, _, err := githubutil.GetFullyQualifiedImageName("", demoImageName, false) + assert.Nil(t, err) + volumes := docker.Volumes + mockDocker.OnContainerCreate(ctx, &container.Config{ + Env: docker.Environment, + Image: image, + Tty: false, + ExposedPorts: p1, + }, &container.HostConfig{ + Mounts: volumes, + PortBindings: p2, + Privileged: true, + }, nil, nil, mock.Anything).Return(container.ContainerCreateCreatedBody{ + ID: "Hello", + }, nil) + mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(nil) + mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{}, nil) + mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) + mockDocker.OnContainerLogsMatch(ctx, mock.Anything, types.ContainerLogsOptions{ + ShowStderr: true, + ShowStdout: true, + Timestamps: true, + Follow: true, + }).Return(nil, nil) + mockDocker.OnContainerWaitMatch(ctx, mock.Anything, container.WaitConditionNotRunning).Return(bodyStatus, errCh) + _, err = startDemo(ctx, mockDocker, os.Stdin) + assert.NotNil(t, err) + }) + t.Run("Error in pulling image", func(t *testing.T) { + ctx := context.Background() + errCh := make(chan error) + bodyStatus := make(chan container.ContainerWaitOKBody) + mockDocker := &mocks.Docker{} + image, _, err := githubutil.GetFullyQualifiedImageName("", demoImageName, false) + assert.Nil(t, err) + sandboxConfig.DefaultConfig.Source = f.UserHomeDir() + volumes := docker.Volumes + volumes = append(volumes, mount.Mount{ + Type: mount.TypeBind, + Source: sandboxConfig.DefaultConfig.Source, + Target: docker.Source, + }) + mockDocker.OnContainerCreate(ctx, &container.Config{ + Env: docker.Environment, + Image: image, + Tty: false, + ExposedPorts: p1, + }, &container.HostConfig{ + Mounts: volumes, + PortBindings: p2, + Privileged: true, + }, nil, nil, mock.Anything).Return(container.ContainerCreateCreatedBody{ + ID: "Hello", + }, nil) + mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(nil) + mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{}, nil) + mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, fmt.Errorf("error")) + mockDocker.OnContainerLogsMatch(ctx, mock.Anything, types.ContainerLogsOptions{ + ShowStderr: true, + ShowStdout: true, + Timestamps: true, + Follow: true, + }).Return(nil, nil) + mockDocker.OnContainerWaitMatch(ctx, mock.Anything, container.WaitConditionNotRunning).Return(bodyStatus, errCh) + _, err = startDemo(ctx, mockDocker, os.Stdin) + assert.NotNil(t, err) + }) + t.Run("Error in removing existing cluster", func(t *testing.T) { + ctx := context.Background() + errCh := make(chan error) + bodyStatus := make(chan container.ContainerWaitOKBody) + mockDocker := &mocks.Docker{} + sandboxConfig.DefaultConfig.Source = f.UserHomeDir() + volumes := docker.Volumes + volumes = append(volumes, mount.Mount{ + Type: mount.TypeBind, + Source: sandboxConfig.DefaultConfig.Source, + Target: docker.Source, + }) + image, _, err := githubutil.GetFullyQualifiedImageName("", demoImageName, false) + assert.Nil(t, err) + mockDocker.OnContainerCreate(ctx, &container.Config{ + Env: docker.Environment, + Image: image, + Tty: false, + ExposedPorts: p1, + }, &container.HostConfig{ + Mounts: volumes, + PortBindings: p2, + Privileged: true, + }, nil, nil, mock.Anything).Return(container.ContainerCreateCreatedBody{ + ID: "Hello", + }, nil) + mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(nil) + mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{ + { + ID: docker.FlyteSandboxClusterName, + Names: []string{ + docker.FlyteSandboxClusterName, + }, + }, + }, nil) + mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) + mockDocker.OnContainerLogsMatch(ctx, mock.Anything, types.ContainerLogsOptions{ + ShowStderr: true, + ShowStdout: true, + Timestamps: true, + Follow: true, + }).Return(nil, nil) + mockDocker.OnContainerRemove(ctx, mock.Anything, types.ContainerRemoveOptions{Force: true}).Return(fmt.Errorf("error")) + mockDocker.OnContainerWaitMatch(ctx, mock.Anything, container.WaitConditionNotRunning).Return(bodyStatus, errCh) + _, err = startDemo(ctx, mockDocker, strings.NewReader("y")) + assert.NotNil(t, err) + }) + t.Run("Error in start container", func(t *testing.T) { + ctx := context.Background() + errCh := make(chan error) + bodyStatus := make(chan container.ContainerWaitOKBody) + mockDocker := &mocks.Docker{} + sandboxConfig.DefaultConfig.Source = "" + sandboxConfig.DefaultConfig.Version = "" + image, _, err := githubutil.GetFullyQualifiedImageName("", demoImageName, false) + assert.Nil(t, err) + mockDocker.OnContainerCreate(ctx, &container.Config{ + Env: docker.Environment, + Image: image, + Tty: false, + ExposedPorts: p1, + }, &container.HostConfig{ + Mounts: docker.Volumes, + PortBindings: p2, + Privileged: true, + }, nil, nil, mock.Anything).Return(container.ContainerCreateCreatedBody{ + ID: "Hello", + }, fmt.Errorf("error")) + mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(fmt.Errorf("error")) + mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{}, nil) + mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) + mockDocker.OnContainerLogsMatch(ctx, mock.Anything, types.ContainerLogsOptions{ + ShowStderr: true, + ShowStdout: true, + Timestamps: true, + Follow: true, + }).Return(nil, nil) + mockDocker.OnContainerWaitMatch(ctx, mock.Anything, container.WaitConditionNotRunning).Return(bodyStatus, errCh) + _, err = startDemo(ctx, mockDocker, os.Stdin) + assert.NotNil(t, err) + }) + t.Run("Error in reading logs", func(t *testing.T) { + ctx := context.Background() + errCh := make(chan error) + bodyStatus := make(chan container.ContainerWaitOKBody) + mockDocker := &mocks.Docker{} + sandboxConfig.DefaultConfig.Source = f.UserHomeDir() + volumes := docker.Volumes + volumes = append(volumes, mount.Mount{ + Type: mount.TypeBind, + Source: sandboxConfig.DefaultConfig.Source, + Target: docker.Source, + }) + image, _, err := githubutil.GetFullyQualifiedImageName("", demoImageName, false) + assert.Nil(t, err) + mockDocker.OnContainerCreate(ctx, &container.Config{ + Env: docker.Environment, + Image: image, + Tty: false, + ExposedPorts: p1, + }, &container.HostConfig{ + Mounts: volumes, + PortBindings: p2, + Privileged: true, + }, nil, nil, mock.Anything).Return(container.ContainerCreateCreatedBody{ + ID: "Hello", + }, nil) + mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(nil) + mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{}, nil) + mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) + mockDocker.OnContainerLogsMatch(ctx, mock.Anything, types.ContainerLogsOptions{ + ShowStderr: true, + ShowStdout: true, + Timestamps: true, + Follow: true, + }).Return(nil, fmt.Errorf("error")) + mockDocker.OnContainerWaitMatch(ctx, mock.Anything, container.WaitConditionNotRunning).Return(bodyStatus, errCh) + _, err = startDemo(ctx, mockDocker, os.Stdin) + assert.NotNil(t, err) + }) + t.Run("Error in list container", func(t *testing.T) { + ctx := context.Background() + errCh := make(chan error) + bodyStatus := make(chan container.ContainerWaitOKBody) + mockDocker := &mocks.Docker{} + sandboxConfig.DefaultConfig.Source = f.UserHomeDir() + sandboxConfig.DefaultConfig.Version = "" + volumes := docker.Volumes + volumes = append(volumes, mount.Mount{ + Type: mount.TypeBind, + Source: sandboxConfig.DefaultConfig.Source, + Target: docker.Source, + }) + image, _, err := githubutil.GetFullyQualifiedImageName("", demoImageName, false) + assert.Nil(t, err) + mockDocker.OnContainerCreate(ctx, &container.Config{ + Env: docker.Environment, + Image: image, + Tty: false, + ExposedPorts: p1, + }, &container.HostConfig{ + Mounts: volumes, + PortBindings: p2, + Privileged: true, + }, nil, nil, mock.Anything).Return(container.ContainerCreateCreatedBody{ + ID: "Hello", + }, nil) + mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(nil) + mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{}, fmt.Errorf("error")) + mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) + mockDocker.OnContainerLogsMatch(ctx, mock.Anything, types.ContainerLogsOptions{ + ShowStderr: true, + ShowStdout: true, + Timestamps: true, + Follow: true, + }).Return(nil, nil) + mockDocker.OnContainerWaitMatch(ctx, mock.Anything, container.WaitConditionNotRunning).Return(bodyStatus, errCh) + _, err = startDemo(ctx, mockDocker, os.Stdin) + assert.Nil(t, err) + }) + t.Run("Successfully run demo cluster command", func(t *testing.T) { + mockOutStream := new(io.Writer) + ctx := context.Background() + cmdCtx := cmdCore.NewCommandContext(admin.InitializeMockClientset(), *mockOutStream) + mockDocker := &mocks.Docker{} + errCh := make(chan error) + client := testclient.NewSimpleClientset() + k8s.Client = client + _, err := client.CoreV1().Pods("flyte").Create(ctx, &fakePod, v1.CreateOptions{}) + if err != nil { + t.Error(err) + } + fakeNode.SetName("master") + _, err = client.CoreV1().Nodes().Create(ctx, fakeNode, v1.CreateOptions{}) + if err != nil { + t.Error(err) + } + image, _, err := githubutil.GetFullyQualifiedImageName("", demoImageName, false) + assert.Nil(t, err) + bodyStatus := make(chan container.ContainerWaitOKBody) + mockDocker.OnContainerCreate(ctx, &container.Config{ + Env: docker.Environment, + Image: image, + Tty: false, + ExposedPorts: p1, + }, &container.HostConfig{ + Mounts: docker.Volumes, + PortBindings: p2, + Privileged: true, + }, nil, nil, mock.Anything).Return(container.ContainerCreateCreatedBody{ + ID: "Hello", + }, nil) + mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(nil) + mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{}, nil) + mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) + stringReader := strings.NewReader(docker.SuccessMessage) + reader := ioutil.NopCloser(stringReader) + mockDocker.OnContainerLogsMatch(ctx, mock.Anything, types.ContainerLogsOptions{ + ShowStderr: true, + ShowStdout: true, + Timestamps: true, + Follow: true, + }).Return(reader, nil) + mockDocker.OnContainerWaitMatch(ctx, mock.Anything, container.WaitConditionNotRunning).Return(bodyStatus, errCh) + mockK8sContextMgr := &k8sMocks.ContextOps{} + docker.Client = mockDocker + sandboxConfig.DefaultConfig.Source = "" + sandboxConfig.DefaultConfig.Version = "" + k8s.ContextMgr = mockK8sContextMgr + mockK8sContextMgr.OnCopyContextMatch(mock.Anything, mock.Anything, mock.Anything).Return(nil) + err = startDemoCluster(ctx, []string{}, cmdCtx) + assert.Nil(t, err) + }) + t.Run("Error in running demo cluster command", func(t *testing.T) { + mockOutStream := new(io.Writer) + ctx := context.Background() + cmdCtx := cmdCore.NewCommandContext(admin.InitializeMockClientset(), *mockOutStream) + mockDocker := &mocks.Docker{} + errCh := make(chan error) + bodyStatus := make(chan container.ContainerWaitOKBody) + image, _, err := githubutil.GetFullyQualifiedImageName("", demoImageName, false) + assert.Nil(t, err) + mockDocker.OnContainerCreate(ctx, &container.Config{ + Env: docker.Environment, + Image: image, + Tty: false, + ExposedPorts: p1, + }, &container.HostConfig{ + Mounts: docker.Volumes, + PortBindings: p2, + Privileged: true, + }, nil, nil, mock.Anything).Return(container.ContainerCreateCreatedBody{ + ID: "Hello", + }, nil) + mockDocker.OnContainerStart(ctx, "Hello", types.ContainerStartOptions{}).Return(fmt.Errorf("error")) + mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{}, fmt.Errorf("error")) + mockDocker.OnImagePullMatch(ctx, mock.Anything, types.ImagePullOptions{}).Return(os.Stdin, nil) + stringReader := strings.NewReader(docker.SuccessMessage) + reader := ioutil.NopCloser(stringReader) + mockDocker.OnContainerLogsMatch(ctx, mock.Anything, types.ContainerLogsOptions{ + ShowStderr: true, + ShowStdout: true, + Timestamps: true, + Follow: true, + }).Return(reader, nil) + mockDocker.OnContainerWaitMatch(ctx, mock.Anything, container.WaitConditionNotRunning).Return(bodyStatus, errCh) + docker.Client = mockDocker + sandboxConfig.DefaultConfig.Source = "" + err = startDemoCluster(ctx, []string{}, cmdCtx) + assert.NotNil(t, err) + }) +} + +func TestMonitorFlyteDeployment(t *testing.T) { + t.Run("Monitor k8s deployment fail because of storage", func(t *testing.T) { + ctx := context.Background() + client := testclient.NewSimpleClientset() + k8s.Client = client + fakePod.SetName("flyte") + fakePod.SetName("flyte") + + _, err := client.CoreV1().Pods("flyte").Create(ctx, &fakePod, v1.CreateOptions{}) + if err != nil { + t.Error(err) + } + fakeNode.SetName("master") + fakeNode.Spec.Taints = append(fakeNode.Spec.Taints, corev1.Taint{ + Effect: "NoSchedule", + Key: "node.kubernetes.io/disk-pressure", + }) + _, err = client.CoreV1().Nodes().Create(ctx, fakeNode, v1.CreateOptions{}) + if err != nil { + t.Error(err) + } + + err = watchFlyteDeployment(ctx, client.CoreV1()) + assert.NotNil(t, err) + + }) + + t.Run("Monitor k8s deployment success", func(t *testing.T) { + ctx := context.Background() + client := testclient.NewSimpleClientset() + k8s.Client = client + fakePod.SetName("flyte") + fakePod.SetName("flyte") + + _, err := client.CoreV1().Pods("flyte").Create(ctx, &fakePod, v1.CreateOptions{}) + if err != nil { + t.Error(err) + } + fakeNode.SetName("master") + fakeNode.Spec.Taints = []corev1.Taint{} + _, err = client.CoreV1().Nodes().Create(ctx, fakeNode, v1.CreateOptions{}) + if err != nil { + t.Error(err) + } + + err = watchFlyteDeployment(ctx, client.CoreV1()) + assert.Nil(t, err) + + }) + +} + +func TestGetFlyteDeploymentCount(t *testing.T) { + + ctx := context.Background() + client := testclient.NewSimpleClientset() + c, err := getFlyteDeployment(ctx, client.CoreV1()) + assert.Nil(t, err) + assert.Equal(t, 0, len(c.Items)) +} + +func TestGetNodeTaintStatus(t *testing.T) { + t.Run("Check node taint with success", func(t *testing.T) { + ctx := context.Background() + client := testclient.NewSimpleClientset() + fakeNode.SetName("master") + _, err := client.CoreV1().Nodes().Create(ctx, fakeNode, v1.CreateOptions{}) + if err != nil { + t.Error(err) + } + c, err := isNodeTainted(ctx, client.CoreV1()) + assert.Nil(t, err) + assert.Equal(t, false, c) + }) + t.Run("Check node taint with fail", func(t *testing.T) { + ctx := context.Background() + client := testclient.NewSimpleClientset() + fakeNode.SetName("master") + _, err := client.CoreV1().Nodes().Create(ctx, fakeNode, v1.CreateOptions{}) + if err != nil { + t.Error(err) + } + node, err := client.CoreV1().Nodes().Get(ctx, "master", v1.GetOptions{}) + if err != nil { + t.Error(err) + } + node.Spec.Taints = append(node.Spec.Taints, corev1.Taint{ + Effect: taintEffect, + Key: diskPressureTaint, + }) + _, err = client.CoreV1().Nodes().Update(ctx, node, v1.UpdateOptions{}) + if err != nil { + t.Error(err) + } + c, err := isNodeTainted(ctx, client.CoreV1()) + assert.Nil(t, err) + assert.Equal(t, true, c) + }) +} + +func TestGetDemoImage(t *testing.T) { + t.Run("Get Latest demo cluster", func(t *testing.T) { + image, _, err := githubutil.GetFullyQualifiedImageName("", demoImageName, false) + assert.Nil(t, err) + assert.Equal(t, true, strings.HasPrefix(image, "cr.flyte.org/flyteorg/flyte-sandbox-lite:dind-")) + }) + + t.Run("Get demo image with version ", func(t *testing.T) { + image, _, err := githubutil.GetFullyQualifiedImageName("v0.14.0", demoImageName, false) + assert.Nil(t, err) + assert.Equal(t, true, strings.HasPrefix(image, demoImageName)) + }) + t.Run("Get demo image with wrong version ", func(t *testing.T) { + _, _, err := githubutil.GetFullyQualifiedImageName("v100.1.0", demoImageName, false) + assert.NotNil(t, err) + }) + t.Run("Get demo image with wrong version ", func(t *testing.T) { + _, _, err := githubutil.GetFullyQualifiedImageName("aaaaaa", demoImageName, false) + assert.NotNil(t, err) + }) + t.Run("Get demo image with version that is not supported", func(t *testing.T) { + _, _, err := githubutil.GetFullyQualifiedImageName("v0.10.0", demoImageName, false) + assert.NotNil(t, err) + }) + +} diff --git a/flytectl/cmd/demo/status.go b/flytectl/cmd/demo/status.go new file mode 100644 index 0000000000..67a7767afb --- /dev/null +++ b/flytectl/cmd/demo/status.go @@ -0,0 +1,42 @@ +package demo + +import ( + "context" + "fmt" + + "github.com/enescakir/emoji" + cmdCore "github.com/flyteorg/flytectl/cmd/core" + "github.com/flyteorg/flytectl/pkg/docker" +) + +const ( + statusShort = "Gets the status of the demo environment." + statusLong = ` +Retrieves the status of the demo environment. Currently, Flyte demo runs as a local Docker container. + +Usage +:: + + flytectl demo status + +` +) + +func demoClusterStatus(ctx context.Context, args []string, cmdCtx cmdCore.CommandContext) error { + cli, err := docker.GetDockerClient() + if err != nil { + return err + } + + return printStatus(ctx, cli) +} + +func printStatus(ctx context.Context, cli docker.Docker) error { + c := docker.GetSandbox(ctx, cli) + if c == nil { + fmt.Printf("%v no demo cluster found \n", emoji.StopSign) + return nil + } + fmt.Printf("Flyte demo cluster container image [%s] with status [%s] is in state [%s]", c.Image, c.Status, c.State) + return nil +} diff --git a/flytectl/cmd/demo/status_test.go b/flytectl/cmd/demo/status_test.go new file mode 100644 index 0000000000..7fae8bc43a --- /dev/null +++ b/flytectl/cmd/demo/status_test.go @@ -0,0 +1,39 @@ +package demo + +import ( + "testing" + + "github.com/flyteorg/flytectl/cmd/testutils" + + "github.com/docker/docker/api/types" + "github.com/flyteorg/flytectl/pkg/docker" + "github.com/flyteorg/flytectl/pkg/docker/mocks" + "github.com/stretchr/testify/assert" +) + +func TestDemoStatus(t *testing.T) { + t.Run("Demo status with zero result", func(t *testing.T) { + mockDocker := &mocks.Docker{} + s := testutils.Setup() + mockDocker.OnContainerList(s.Ctx, types.ContainerListOptions{All: true}).Return([]types.Container{}, nil) + docker.Client = mockDocker + err := demoClusterStatus(s.Ctx, []string{}, s.CmdCtx) + assert.Nil(t, err) + }) + t.Run("Demo status with running", func(t *testing.T) { + s := testutils.Setup() + ctx := s.Ctx + mockDocker := &mocks.Docker{} + mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return([]types.Container{ + { + ID: docker.FlyteSandboxClusterName, + Names: []string{ + docker.FlyteSandboxClusterName, + }, + }, + }, nil) + docker.Client = mockDocker + err := demoClusterStatus(ctx, []string{}, s.CmdCtx) + assert.Nil(t, err) + }) +} diff --git a/flytectl/cmd/demo/teardown.go b/flytectl/cmd/demo/teardown.go new file mode 100644 index 0000000000..1308b10b14 --- /dev/null +++ b/flytectl/cmd/demo/teardown.go @@ -0,0 +1,62 @@ +package demo + +import ( + "context" + "fmt" + + "github.com/flyteorg/flytectl/pkg/configutil" + + "github.com/flyteorg/flytectl/pkg/docker" + + "github.com/docker/docker/api/types" + "github.com/enescakir/emoji" + + cmdCore "github.com/flyteorg/flytectl/cmd/core" + "github.com/flyteorg/flytectl/pkg/k8s" +) + +const ( + teardownShort = "Cleans up the demo environment" + teardownLong = ` +Removes the demo cluster and all the Flyte config created by 'demo start': +:: + + flytectl demo teardown + + +Usage +` +) + +func teardownDemoCluster(ctx context.Context, args []string, cmdCtx cmdCore.CommandContext) error { + cli, err := docker.GetDockerClient() + if err != nil { + return err + } + + return tearDownDemo(ctx, cli) +} + +func tearDownDemo(ctx context.Context, cli docker.Docker) error { + c := docker.GetSandbox(ctx, cli) + if c != nil { + if err := cli.ContainerRemove(context.Background(), c.ID, types.ContainerRemoveOptions{ + Force: true, + }); err != nil { + return err + } + } + if err := configutil.ConfigCleanup(); err != nil { + fmt.Printf("Config cleanup failed. Which Failed due to %v \n ", err) + } + if err := removeDemoKubeContext(); err != nil { + fmt.Printf("Kubecontext cleanup failed. Which Failed due to %v \n ", err) + } + fmt.Printf("%v %v Demo cluster is removed successfully. \n", emoji.Broom, emoji.Broom) + return nil +} + +func removeDemoKubeContext() error { + k8sCtxMgr := k8s.NewK8sContextManager() + return k8sCtxMgr.RemoveContext(demoContextName) +} diff --git a/flytectl/cmd/demo/teardown_test.go b/flytectl/cmd/demo/teardown_test.go new file mode 100644 index 0000000000..7741272a2b --- /dev/null +++ b/flytectl/cmd/demo/teardown_test.go @@ -0,0 +1,64 @@ +package demo + +import ( + "context" + "fmt" + "testing" + + "github.com/docker/docker/api/types" + "github.com/flyteorg/flytectl/cmd/testutils" + "github.com/flyteorg/flytectl/pkg/configutil" + "github.com/flyteorg/flytectl/pkg/docker" + "github.com/flyteorg/flytectl/pkg/docker/mocks" + "github.com/flyteorg/flytectl/pkg/k8s" + k8sMocks "github.com/flyteorg/flytectl/pkg/k8s/mocks" + "github.com/flyteorg/flytectl/pkg/util" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +var containers []types.Container + +func TestTearDownFunc(t *testing.T) { + container1 := types.Container{ + ID: "FlyteSandboxClusterName", + Names: []string{ + docker.FlyteSandboxClusterName, + }, + } + containers = append(containers, container1) + + t.Run("Success", func(t *testing.T) { + ctx := context.Background() + mockDocker := &mocks.Docker{} + mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return(containers, nil) + mockDocker.OnContainerRemove(ctx, mock.Anything, types.ContainerRemoveOptions{Force: true}).Return(nil) + mockK8sContextMgr := &k8sMocks.ContextOps{} + k8s.ContextMgr = mockK8sContextMgr + mockK8sContextMgr.OnRemoveContextMatch(mock.Anything).Return(nil) + err := tearDownDemo(ctx, mockDocker) + assert.Nil(t, err) + }) + t.Run("Error", func(t *testing.T) { + ctx := context.Background() + mockDocker := &mocks.Docker{} + mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return(containers, nil) + mockDocker.OnContainerRemove(ctx, mock.Anything, types.ContainerRemoveOptions{Force: true}).Return(fmt.Errorf("err")) + err := tearDownDemo(ctx, mockDocker) + assert.NotNil(t, err) + }) + +} + +func TestTearDownClusterFunc(t *testing.T) { + _ = util.SetupFlyteDir() + _ = util.WriteIntoFile([]byte("data"), configutil.FlytectlConfig) + s := testutils.Setup() + ctx := s.Ctx + mockDocker := &mocks.Docker{} + mockDocker.OnContainerList(ctx, types.ContainerListOptions{All: true}).Return(containers, nil) + mockDocker.OnContainerRemove(ctx, mock.Anything, types.ContainerRemoveOptions{Force: true}).Return(nil) + docker.Client = mockDocker + err := teardownDemoCluster(ctx, []string{}, s.CmdCtx) + assert.Nil(t, err) +} diff --git a/flytectl/cmd/root.go b/flytectl/cmd/root.go index ca08c94966..7ebdd46ab2 100644 --- a/flytectl/cmd/root.go +++ b/flytectl/cmd/root.go @@ -10,6 +10,7 @@ import ( cmdCore "github.com/flyteorg/flytectl/cmd/core" "github.com/flyteorg/flytectl/cmd/create" "github.com/flyteorg/flytectl/cmd/delete" + "github.com/flyteorg/flytectl/cmd/demo" "github.com/flyteorg/flytectl/cmd/get" "github.com/flyteorg/flytectl/cmd/register" "github.com/flyteorg/flytectl/cmd/sandbox" @@ -61,6 +62,7 @@ func newRootCmd() *cobra.Command { rootCmd.AddCommand(register.RemoteRegisterCommand()) rootCmd.AddCommand(delete.RemoteDeleteCommand()) rootCmd.AddCommand(sandbox.CreateSandboxCommand()) + rootCmd.AddCommand(demo.CreateDemoCommand()) rootCmd.AddCommand(configuration.CreateConfigCommand()) rootCmd.AddCommand(completionCmd) // Added version command diff --git a/flytectl/cmd/sandbox/start.go b/flytectl/cmd/sandbox/start.go index f45c791f08..ceda4c04cd 100644 --- a/flytectl/cmd/sandbox/start.go +++ b/flytectl/cmd/sandbox/start.go @@ -144,7 +144,7 @@ func startSandboxCluster(ctx context.Context, args []string, cmdCtx cmdCore.Comm if err := watchFlyteDeployment(ctx, k8sClient.CoreV1()); err != nil { return err } - util.PrintSandboxMessage() + util.PrintSandboxMessage(util.SandBoxConsolePort) } return nil } @@ -166,7 +166,7 @@ func startSandbox(ctx context.Context, cli docker.Docker, reader io.Reader) (*bu return nil, err } fmt.Printf("Existing details of your sandbox") - util.PrintSandboxMessage() + util.PrintSandboxMessage(util.SandBoxConsolePort) return nil, nil } diff --git a/flytectl/pkg/docker/docker_util.go b/flytectl/pkg/docker/docker_util.go index 3b6be1c91c..a3825841fc 100644 --- a/flytectl/pkg/docker/docker_util.go +++ b/flytectl/pkg/docker/docker_util.go @@ -94,11 +94,11 @@ func RemoveSandbox(ctx context.Context, cli Docker, reader io.Reader) error { // GetSandboxPorts will return sandbox ports func GetSandboxPorts() (map[nat.Port]struct{}, map[nat.Port][]nat.PortBinding, error) { return nat.ParsePortSpecs([]string{ - "0.0.0.0:30081:30081", // Flyteconsole Port - "0.0.0.0:30082:30082", // Flyteadmin Port + "0.0.0.0:30080:30080", // Flyteconsole Port + "0.0.0.0:30081:30081", // Flyteadmin Port + "0.0.0.0:30082:30082", // K8s Dashboard Port "0.0.0.0:30084:30084", // Minio API Port - "0.0.0.0:30086:30086", // K8s Dashboard Port - "0.0.0.0:30087:30087", // Old Minio Console Port, keeping around for old images + "0.0.0.0:30086:30086", // K8s cluster "0.0.0.0:30088:30088", // Minio Console Port }) } diff --git a/flytectl/pkg/util/util.go b/flytectl/pkg/util/util.go index c5f5470dc0..a34aeba5c4 100644 --- a/flytectl/pkg/util/util.go +++ b/flytectl/pkg/util/util.go @@ -18,7 +18,9 @@ import ( ) const ( - progressSuccessMessage = "Flyte is ready! Flyte UI is available at http://localhost:30081/console" + ProgressSuccessMessage = "Flyte is ready! Flyte UI is available at" + SandBoxConsolePort = 30081 + DemoConsolePort = 30080 ) var Ext string @@ -51,14 +53,14 @@ func SetupFlyteDir() error { } // PrintSandboxMessage will print sandbox success message -func PrintSandboxMessage() { +func PrintSandboxMessage(flyteConsolePort int) { kubeconfig := strings.Join([]string{ "$KUBECONFIG", f.FilePathJoin(f.UserHomeDir(), ".kube", "config"), docker.Kubeconfig, }, ":") - - fmt.Printf("%v %v %v %v %v \n", emoji.ManTechnologist, progressSuccessMessage, emoji.Rocket, emoji.Rocket, emoji.PartyPopper) + successMsg := fmt.Sprintf("%v http://localhost:%v/console", ProgressSuccessMessage, flyteConsolePort) + fmt.Printf("%v %v %v %v %v \n", emoji.ManTechnologist, successMsg, emoji.Rocket, emoji.Rocket, emoji.PartyPopper) fmt.Printf("Add KUBECONFIG and FLYTECTL_CONFIG to your environment variable \n") fmt.Printf("export KUBECONFIG=%v \n", kubeconfig) fmt.Printf("export FLYTECTL_CONFIG=%v \n", configutil.FlytectlConfig) diff --git a/flytectl/pkg/util/util_test.go b/flytectl/pkg/util/util_test.go index 185b20db82..718a244d9c 100644 --- a/flytectl/pkg/util/util_test.go +++ b/flytectl/pkg/util/util_test.go @@ -25,7 +25,7 @@ func TestSetupFlyteDir(t *testing.T) { func TestPrintSandboxMessage(t *testing.T) { t.Run("Print Sandbox Message", func(t *testing.T) { - PrintSandboxMessage() + PrintSandboxMessage(SandBoxConsolePort) }) }