Skip to content

Commit

Permalink
Start sandbox logic shared for demo/sandbox cmds #minor (flyteorg#326)
Browse files Browse the repository at this point in the history
  • Loading branch information
dav009 authored and robert-ulbrich-mercedes-benz committed Jul 2, 2024
1 parent c03fbd7 commit c38d54d
Show file tree
Hide file tree
Showing 13 changed files with 806 additions and 1,416 deletions.
42 changes: 7 additions & 35 deletions flytectl/cmd/config/subcommand/sandbox/sandbox_config.go
Original file line number Diff line number Diff line change
@@ -1,34 +1,6 @@
package sandbox

//go:generate enumer -type=ImagePullPolicy -trimprefix=ImagePullPolicy --json
type ImagePullPolicy int

const (
ImagePullPolicyAlways ImagePullPolicy = iota
ImagePullPolicyIfNotPresent
ImagePullPolicyNever
)

// Set implements PFlag's Value interface to attempt to set the value of the flag from string.
func (i *ImagePullPolicy) Set(val string) error {
policy, err := ImagePullPolicyString(val)
if err != nil {
return err
}

*i = policy
return nil
}

// Type implements PFlag's Value interface to return type name.
func (i ImagePullPolicy) Type() string {
return "ImagePullPolicy"
}

//go:generate pflags Config --default-var DefaultConfig --bind-default-var
var (
DefaultConfig = &Config{}
)
import "github.com/flyteorg/flytectl/pkg/docker"

//Config holds configuration flags for sandbox command.
type Config struct {
Expand All @@ -52,12 +24,12 @@ type Config struct {

// Optionally it is possible to use local sandbox image
// Flytectl will not pull the image from the registry if the local flag passes. It is usually useful while testing your local images without pushing them to a registry.
ImagePullPolicy ImagePullPolicy `json:"imagePullPolicy" pflag:",Optional. Defines the image pull behavior [Always/IfNotPresent/Never]"`
ImagePullPolicy docker.ImagePullPolicy `json:"imagePullPolicy" pflag:",Optional. Defines the image pull behavior [Always/IfNotPresent/Never]"`

ImagePullOptions ImagePullOptions `json:"imagePullOptions" pflag:",Optional. Defines image pull options (e.g. auth)"`
ImagePullOptions docker.ImagePullOptions `json:"imagePullOptions" pflag:",Optional. Defines image pull options (e.g. auth)"`
}

type ImagePullOptions struct {
RegistryAuth string `json:"registryAuth" pflag:",The base64 encoded credentials for the registry."`
Platform string `json:"platform" pflag:",Forces a specific platform's image to be pulled.'"`
}
//go:generate pflags Config --default-var DefaultConfig --bind-default-var
var (
DefaultConfig = &Config{}
)
4 changes: 2 additions & 2 deletions flytectl/cmd/demo/demo.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package demo

import (
sandboxConfig "github.com/flyteorg/flytectl/cmd/config/subcommand/sandbox"
sandboxCmdConfig "github.com/flyteorg/flytectl/cmd/config/subcommand/sandbox"
cmdcore "github.com/flyteorg/flytectl/cmd/core"
"github.com/spf13/cobra"
)
Expand Down Expand Up @@ -46,7 +46,7 @@ func CreateDemoCommand() *cobra.Command {
demoResourcesFuncs := map[string]cmdcore.CommandEntry{
"start": {CmdFunc: startDemoCluster, Aliases: []string{}, ProjectDomainNotRequired: true,
Short: startShort,
Long: startLong, PFlagProvider: sandboxConfig.DefaultConfig},
Long: startLong, PFlagProvider: sandboxCmdConfig.DefaultConfig},
"teardown": {CmdFunc: teardownDemoCluster, Aliases: []string{}, ProjectDomainNotRequired: true,
Short: teardownShort,
Long: teardownLong},
Expand Down
272 changes: 5 additions & 267 deletions flytectl/cmd/demo/start.go
Original file line number Diff line number Diff line change
@@ -1,33 +1,12 @@
package demo

import (
"bufio"
"context"
"fmt"
"io"
"os"
"path/filepath"
"time"

"github.com/flyteorg/flytectl/clierrors"
"github.com/flyteorg/flytectl/pkg/github"
"github.com/flyteorg/flytectl/pkg/sandbox"

"github.com/avast/retry-go"
"github.com/olekukonko/tablewriter"
corev1api "k8s.io/api/core/v1"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"

"github.com/docker/docker/api/types/mount"
"github.com/flyteorg/flytectl/pkg/configutil"
"github.com/flyteorg/flytectl/pkg/k8s"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/enescakir/emoji"
sandboxConfig "github.com/flyteorg/flytectl/cmd/config/subcommand/sandbox"
sandboxCmdConfig "github.com/flyteorg/flytectl/cmd/config/subcommand/sandbox"
cmdCore "github.com/flyteorg/flytectl/cmd/core"
"github.com/flyteorg/flytectl/pkg/docker"
"github.com/flyteorg/flytectl/pkg/util"
"k8s.io/client-go/tools/clientcmd"
)

const (
Expand Down Expand Up @@ -93,252 +72,11 @@ eg : for passing multiple environment variables
Usage
`
k8sEndpoint = "https://127.0.0.1:30086"
flyteNamespace = "flyte"
diskPressureTaint = "node.kubernetes.io/disk-pressure"
taintEffect = "NoSchedule"
demoContextName = "flyte-sandbox"
demoDockerContext = "default"
demoImageName = "cr.flyte.org/flyteorg/flyte-sandbox-lite"
demoContextName = "flyte-sandbox"
)

type ExecResult struct {
StdOut string
StdErr string
ExitCode int
}

func primeFlytekitPod(ctx context.Context, podService corev1.PodInterface) {
_, err := podService.Create(ctx, &corev1api.Pod{
ObjectMeta: v1.ObjectMeta{
Name: "py39-cacher",
},
Spec: corev1api.PodSpec{
RestartPolicy: corev1api.RestartPolicyNever,
Containers: []corev1api.Container{
{
Name: "flytekit",
Image: "ghcr.io/flyteorg/flytekit:py3.9-latest",
Command: []string{"echo"},
Args: []string{"Flyte"},
},
},
},
}, v1.CreateOptions{})
if err != nil {
fmt.Printf("Failed to create primer pod - %s", err)
}
}

func startDemoCluster(ctx context.Context, args []string, cmdCtx cmdCore.CommandContext) error {
cli, err := docker.GetDockerClient()
if err != nil {
return err
}

ghRepo := github.GetGHRepoService()

reader, err := startDemo(ctx, cli, ghRepo, os.Stdin)
if err != nil {
return err
}
if reader != nil {
docker.WaitForSandbox(reader, docker.SuccessMessage)
}

if reader != nil {
var k8sClient k8s.K8s
err = retry.Do(
func() error {
k8sClient, err = k8s.GetK8sClient(docker.Kubeconfig, k8sEndpoint)
return err
},
retry.Attempts(10),
)
if err != nil {
return err
}
if err = updateLocalKubeContext(); err != nil {
return err
}

if err := watchFlyteDeployment(ctx, k8sClient.CoreV1()); err != nil {
return err
}
primeFlytekitPod(ctx, k8sClient.CoreV1().Pods("default"))
util.PrintDemoMessage(util.DemoConsolePort)
}
return nil
}

func updateLocalKubeContext() error {
srcConfigAccess := &clientcmd.PathOptions{
GlobalFile: docker.Kubeconfig,
LoadingRules: clientcmd.NewDefaultClientConfigLoadingRules(),
}
k8sCtxMgr := k8s.NewK8sContextManager()
return k8sCtxMgr.CopyContext(srcConfigAccess, demoDockerContext, demoContextName)
}

func startDemo(ctx context.Context, cli docker.Docker, g github.GHRepoService, reader io.Reader) (*bufio.Scanner, error) {
fmt.Printf("%v Bootstrapping a brand new flyte cluster... %v %v\n", emoji.FactoryWorker, emoji.Hammer, emoji.Wrench)

if err := docker.RemoveSandbox(ctx, cli, reader); err != nil {
if err.Error() != clierrors.ErrSandboxExists {
return nil, err
}
fmt.Printf("Existing details of your demo cluster")
util.PrintDemoMessage(util.DemoConsolePort)
return nil, nil
}

if err := util.SetupFlyteDir(); err != nil {
return nil, err
}

templateValues := configutil.ConfigTemplateSpec{
Host: "localhost:30081",
Insecure: true,
}
if err := configutil.SetupConfig(configutil.ConfigFile, configutil.GetTemplate(), templateValues); err != nil {
return nil, err
}

volumes := docker.Volumes
sandboxDefaultConfig := sandboxConfig.DefaultConfig
if vol, err := mountVolume(sandboxDefaultConfig.Source, docker.Source); err != nil {
return nil, err
} else if vol != nil {
volumes = append(volumes, *vol)
}
demoImage := sandboxConfig.DefaultConfig.Image
if len(demoImage) == 0 {
image, version, err := github.GetFullyQualifiedImageName("sha", sandboxConfig.DefaultConfig.Version, demoImageName, sandboxConfig.DefaultConfig.Prerelease, g)
if err != nil {
return nil, err
}
demoImage = image
fmt.Printf("%v Running Flyte %s release\n", emoji.Whale, version)
}
fmt.Printf("%v pulling docker image for release %s\n", emoji.Whale, demoImage)
if err := docker.PullDockerImage(ctx, cli, demoImage, sandboxConfig.DefaultConfig.ImagePullPolicy, sandboxConfig.DefaultConfig.ImagePullOptions); err != nil {
return nil, err
}

fmt.Printf("%v booting flyte-demo container\n", emoji.FactoryWorker)
exposedPorts, portBindings, _ := docker.GetDemoPorts()
ID, err := docker.StartContainer(ctx, cli, volumes, exposedPorts, portBindings, docker.FlyteSandboxClusterName,
demoImage, sandboxDefaultConfig.Env)

if err != nil {
fmt.Printf("%v Something went wrong: Failed to start demo container %v, Please check your docker client and try again. \n", emoji.GrimacingFace, emoji.Whale)
return nil, err
}

logReader, err := docker.ReadLogs(ctx, cli, ID)
if err != nil {
return nil, err
}

return logReader, nil
}

func mountVolume(file, destination string) (*mount.Mount, error) {
if len(file) > 0 {
source, err := filepath.Abs(file)
if err != nil {
return nil, err
}
return &mount.Mount{
Type: mount.TypeBind,
Source: source,
Target: destination,
}, nil
}
return nil, nil
}

func watchFlyteDeployment(ctx context.Context, appsClient corev1.CoreV1Interface) error {
var data = os.Stdout
table := tablewriter.NewWriter(data)
table.SetHeader([]string{"Service", "Status", "Namespace"})
table.SetRowLine(true)

for {
isTaint, err := isNodeTainted(ctx, appsClient)
if err != nil {
return err
}
if isTaint {
return fmt.Errorf("docker sandbox doesn't have sufficient memory available. Please run docker system prune -a --volumes")
}

pods, err := getFlyteDeployment(ctx, appsClient)
if err != nil {
return err
}
table.ClearRows()
table.SetAutoWrapText(false)
table.SetAutoFormatHeaders(true)

// Clear os.Stdout
_, _ = data.WriteString("\x1b[3;J\x1b[H\x1b[2J")

var total, ready int
total = len(pods.Items)
ready = 0
if total > 0 {
for _, v := range pods.Items {
if isPodReady(v) {
ready++
}
if len(v.Status.Conditions) > 0 {
table.Append([]string{v.GetName(), string(v.Status.Phase), v.GetNamespace()})
}
}
table.Render()
if total == ready {
return nil
}
} else {
table.Append([]string{"k8s: This might take a little bit", "Bootstrapping", ""})
table.Render()
}

time.Sleep(10 * time.Second)
}
}

func isPodReady(v corev1api.Pod) bool {
if (v.Status.Phase == corev1api.PodRunning) || (v.Status.Phase == corev1api.PodSucceeded) {
return true
}
return false
}

func getFlyteDeployment(ctx context.Context, client corev1.CoreV1Interface) (*corev1api.PodList, error) {
pods, err := client.Pods(flyteNamespace).List(ctx, v1.ListOptions{})
if err != nil {
return nil, err
}
return pods, nil
}
sandboxDefaultConfig := sandboxCmdConfig.DefaultConfig
return sandbox.StartDemoCluster(ctx, args, sandboxDefaultConfig)

func isNodeTainted(ctx context.Context, client corev1.CoreV1Interface) (bool, error) {
nodes, err := client.Nodes().List(ctx, v1.ListOptions{})
if err != nil {
return false, err
}
match := 0
for _, node := range nodes.Items {
for _, c := range node.Spec.Taints {
if c.Key == diskPressureTaint && c.Effect == taintEffect {
match++
}
}
}
if match > 0 {
return true, nil
}
return false, nil
}
Loading

0 comments on commit c38d54d

Please sign in to comment.