From 661b6656029d6a83d797f7e9f7eb12b7f9b146ff Mon Sep 17 00:00:00 2001 From: Simon Baier Date: Tue, 26 Oct 2021 00:31:29 +0200 Subject: [PATCH 01/17] feat: initial fast, tool-node-less loading facilities for container images in the runtime. Add necessary interface methods + implementations for docker runtime for streaming images directly from runtime to containerd in k3d nodes --- pkg/client/tools.go | 56 ++++++++++++++++--------------------- pkg/runtimes/docker/node.go | 56 +++++++++++++++++++++++++++++++++++-- pkg/runtimes/runtime.go | 2 ++ 3 files changed, 79 insertions(+), 35 deletions(-) diff --git a/pkg/client/tools.go b/pkg/client/tools.go index 8e1dd5032..eb48c7f3a 100644 --- a/pkg/client/tools.go +++ b/pkg/client/tools.go @@ -25,11 +25,10 @@ package client import ( "context" "fmt" + "io" "os" - "path" "strings" "sync" - "time" l "github.com/rancher/k3d/v5/pkg/logger" "github.com/rancher/k3d/v5/pkg/runtimes" @@ -49,12 +48,6 @@ func ImageImportIntoClusterMulti(ctx context.Context, runtime runtimes.Runtime, return fmt.Errorf("No valid images specified") } - // create tools node to export images - toolsNode, err := EnsureToolsNode(ctx, runtime, cluster) - if err != nil { - return fmt.Errorf("failed to ensure that tools node is running: %w", err) - } - /* TODO: * Loop over list of images and check, whether they are files (tar archives) and sort them respectively * Special case: '-' means "read from stdin" @@ -66,16 +59,19 @@ func ImageImportIntoClusterMulti(ctx context.Context, runtime runtimes.Runtime, var importTarNames []string if len(imagesFromRuntime) > 0 { - // save image to tarfile in shared volume - l.Log().Infof("Saving %d image(s) from runtime...", len(imagesFromRuntime)) - tarName := fmt.Sprintf("%s/k3d-%s-images-%s.tar", k3d.DefaultImageVolumeMountPath, cluster.Name, time.Now().Format("20060102150405")) - if err := runtime.ExecInNode(ctx, toolsNode, append([]string{"./k3d-tools", "save-image", "-d", tarName}, imagesFromRuntime...)); err != nil { - return fmt.Errorf("failed to save image(s) in tools container for cluster '%s': %w", cluster.Name, err) + l.Log().Infof("Loading %d image(s) from runtime into nodes...", len(imagesFromRuntime)) + // open a stream to all given images + stream, err := runtime.GetImageStream(ctx, imagesFromRuntime) + loadImageFromStream(ctx, runtime, stream, cluster) + if err != nil { + return fmt.Errorf("Could not open image stream for given images %s: %w", imagesFromRuntime, err) } - importTarNames = append(importTarNames, tarName) + // load the images directly into the nodes + } - if len(imagesFromTar) > 0 { + // TODO: stdin for tar images as well + /*if len(imagesFromTar) > 0 { // copy tarfiles to shared volume l.Log().Infof("Saving %d tarball(s) to shared image volume...", len(imagesFromTar)) for _, file := range imagesFromTar { @@ -86,7 +82,7 @@ func ImageImportIntoClusterMulti(ctx context.Context, runtime runtimes.Runtime, } importTarNames = append(importTarNames, tarName) } - } + }*/ // import image in each node l.Log().Infoln("Importing images into nodes...") @@ -108,28 +104,24 @@ func ImageImportIntoClusterMulti(ctx context.Context, runtime runtimes.Runtime, } importWaitgroup.Wait() - // remove tarball - if !opts.KeepTar && len(importTarNames) > 0 { - l.Log().Infoln("Removing the tarball(s) from image volume...") - if err := runtime.ExecInNode(ctx, toolsNode, []string{"rm", "-f", strings.Join(importTarNames, " ")}); err != nil { - l.Log().Errorf("failed to delete one or more tarballs from '%+v': %v", importTarNames, err) - } - } - - // delete tools container - if !opts.KeepToolsNode { - l.Log().Infoln("Removing k3d-tools node...") - if err := runtime.DeleteNode(ctx, toolsNode); err != nil { - l.Log().Errorf("failed to delete tools node '%s' (try to delete it manually): %v", toolsNode.Name, err) - } - } - l.Log().Infoln("Successfully imported image(s)") return nil } +func loadImageFromStream(ctx context.Context, runtime runtimes.Runtime, stream io.ReadCloser, cluster *k3d.Cluster) { + for _, node := range cluster.Nodes { + // only import image in server and agent nodes (i.e. ignoring auxiliary nodes like the server loadbalancer) + if node.Role == k3d.ServerRole || node.Role == k3d.AgentRole { + l.Log().Infof("Importing images into node '%s'...", node.Name) + if err := runtime.ExecInNodeWithStdin(ctx, node, []string{"ctr", "image", "import", "-"}, stream); err != nil { + l.Log().Errorf("failed to import images in node '%s': %v", node.Name, err) + } + } + } +} + type runtimeImageGetter interface { GetImages(context.Context) ([]string, error) } diff --git a/pkg/runtimes/docker/node.go b/pkg/runtimes/docker/node.go index f5d5539fc..c8a5ad9d7 100644 --- a/pkg/runtimes/docker/node.go +++ b/pkg/runtimes/docker/node.go @@ -309,7 +309,7 @@ func (d Docker) GetNodeLogs(ctx context.Context, node *k3d.Node, since time.Time // ExecInNodeGetLogs executes a command inside a node and returns the logs to the caller, e.g. to parse them func (d Docker) ExecInNodeGetLogs(ctx context.Context, node *k3d.Node, cmd []string) (*bufio.Reader, error) { - resp, err := executeInNode(ctx, node, cmd) + resp, err := executeInNode(ctx, node, cmd, nil) if resp != nil { defer resp.Close() } @@ -322,9 +322,23 @@ func (d Docker) ExecInNodeGetLogs(ctx context.Context, node *k3d.Node, cmd []str return resp.Reader, nil } +// GetImageStream creates a tar stream for the given images, to be read (and closed) by the caller +func (d Docker) GetImageStream(ctx context.Context, image []string) (io.ReadCloser, error) { + docker, err := GetDockerClient() + if err != nil { + return nil, err + } + reader, err := docker.ImageSave(ctx, image) + return reader, err +} + // ExecInNode execs a command inside a node func (d Docker) ExecInNode(ctx context.Context, node *k3d.Node, cmd []string) error { - execConnection, err := executeInNode(ctx, node, cmd) + return execInNode(ctx, node, cmd, nil) +} + +func execInNode(ctx context.Context, node *k3d.Node, cmd []string, stdin io.ReadCloser) error { + execConnection, err := executeInNode(ctx, node, cmd, stdin) if execConnection != nil { defer execConnection.Close() } @@ -340,7 +354,11 @@ func (d Docker) ExecInNode(ctx context.Context, node *k3d.Node, cmd []string) er return err } -func executeInNode(ctx context.Context, node *k3d.Node, cmd []string) (*types.HijackedResponse, error) { +func (d Docker) ExecInNodeWithStdin(ctx context.Context, node *k3d.Node, cmd []string, stdin io.ReadCloser) error { + return execInNode(ctx, node, cmd, stdin) +} + +func executeInNode(ctx context.Context, node *k3d.Node, cmd []string, stdin io.ReadCloser) (*types.HijackedResponse, error) { l.Log().Debugf("Executing command '%+v' in node '%s'", cmd, node.Name) @@ -357,12 +375,18 @@ func executeInNode(ctx context.Context, node *k3d.Node, cmd []string) (*types.Hi } defer docker.Close() + var attachStdin bool = false + if stdin != nil { + attachStdin = true + } + // exec exec, err := docker.ContainerExecCreate(ctx, container.ID, types.ExecConfig{ Privileged: true, Tty: true, AttachStderr: true, AttachStdout: true, + AttachStdin: attachStdin, Cmd: cmd, }) if err != nil { @@ -380,6 +404,32 @@ func executeInNode(ctx context.Context, node *k3d.Node, cmd []string) (*types.Hi return nil, fmt.Errorf("docker failed to start exec process in node '%s': %w", node.Name, err) } + // If we need to write to stdin pipe, start a new goroutine that writes the stream to stdin + if stdin != nil { + go func() { + // read in 4K blocks + buffer := make([]byte, 4096) + defer stdin.Close() + for { + n, err := stdin.Read(buffer) + if err != nil { + l.Log().Tracef("Stopping read from stdin stream due to error: %w", err) + return + } + if n == 0 { + l.Log().Tracef("stdin stream appears to have ended, stopping write to exec connection") + return + } + + _, err = execConnection.Conn.Write(buffer) + if err != nil { + l.Log().Debugf("Failed to write stdin to exec stream, stopping write to exec connection: %w", err) + return + } + } + }() + } + for { // get info about exec process inside container execInfo, err := docker.ContainerExecInspect(ctx, exec.ID) diff --git a/pkg/runtimes/runtime.go b/pkg/runtimes/runtime.go index 3e925e6a0..0234be4ce 100644 --- a/pkg/runtimes/runtime.go +++ b/pkg/runtimes/runtime.go @@ -65,8 +65,10 @@ type Runtime interface { CreateVolume(context.Context, string, map[string]string) error DeleteVolume(context.Context, string) error GetVolume(string) (string, error) + GetImageStream(context.Context, []string) (io.ReadCloser, error) GetRuntimePath() string // returns e.g. '/var/run/docker.sock' for a default docker setup ExecInNode(context.Context, *k3d.Node, []string) error + ExecInNodeWithStdin(context.Context, *k3d.Node, []string, io.ReadCloser) error ExecInNodeGetLogs(context.Context, *k3d.Node, []string) (*bufio.Reader, error) GetNodeLogs(context.Context, *k3d.Node, time.Time, *runtimeTypes.NodeLogsOpts) (io.ReadCloser, error) GetImages(context.Context) ([]string, error) From 07354742ff05315380a73555f34dc0491a59b59d Mon Sep 17 00:00:00 2001 From: sbaier Date: Tue, 26 Oct 2021 09:29:42 +0200 Subject: [PATCH 02/17] feat: add parallel loading to nodes, add tar loading --- pkg/client/tools.go | 59 ++++++++++++++++++++++----------------------- 1 file changed, 29 insertions(+), 30 deletions(-) diff --git a/pkg/client/tools.go b/pkg/client/tools.go index eb48c7f3a..945efd938 100644 --- a/pkg/client/tools.go +++ b/pkg/client/tools.go @@ -56,12 +56,12 @@ func ImageImportIntoClusterMulti(ctx context.Context, runtime runtimes.Runtime, * 3. From stdin: save to tar -> import * Note: temporary storage location is always the shared image volume and actions are always executed by the tools node */ - var importTarNames []string if len(imagesFromRuntime) > 0 { l.Log().Infof("Loading %d image(s) from runtime into nodes...", len(imagesFromRuntime)) // open a stream to all given images stream, err := runtime.GetImageStream(ctx, imagesFromRuntime) + // TODO this isn't going to work for multiple streams. Wrap in nopcloser and close here afterwards or something like that loadImageFromStream(ctx, runtime, stream, cluster) if err != nil { return fmt.Errorf("Could not open image stream for given images %s: %w", imagesFromRuntime, err) @@ -71,55 +71,54 @@ func ImageImportIntoClusterMulti(ctx context.Context, runtime runtimes.Runtime, } // TODO: stdin for tar images as well - /*if len(imagesFromTar) > 0 { + if len(imagesFromTar) > 0 { // copy tarfiles to shared volume l.Log().Infof("Saving %d tarball(s) to shared image volume...", len(imagesFromTar)) - for _, file := range imagesFromTar { - tarName := fmt.Sprintf("%s/k3d-%s-images-%s-file-%s", k3d.DefaultImageVolumeMountPath, cluster.Name, time.Now().Format("20060102150405"), path.Base(file)) - if err := runtime.CopyToNode(ctx, file, tarName, toolsNode); err != nil { - l.Log().Errorf("failed to copy image tar '%s' to tools node! Error below:\n%+v", file, err) + + files := make([]*os.File, len(imagesFromTar)) + readers := make([]io.Reader, len(imagesFromTar)) + failedFiles := 0 + for i, fileName := range imagesFromTar { + file, err := os.Open(fileName) + if err != nil { + l.Log().Errorf("failed to read file '%s', skipping. Error below:\n%+v", fileName, err) + failedFiles++ continue } - importTarNames = append(importTarNames, tarName) + files[i] = file + readers[i] = file } - }*/ - - // import image in each node - l.Log().Infoln("Importing images into nodes...") - var importWaitgroup sync.WaitGroup - for _, tarName := range importTarNames { - for _, node := range cluster.Nodes { - // only import image in server and agent nodes (i.e. ignoring auxiliary nodes like the server loadbalancer) - if node.Role == k3d.ServerRole || node.Role == k3d.AgentRole { - importWaitgroup.Add(1) - go func(node *k3d.Node, wg *sync.WaitGroup, tarPath string) { - l.Log().Infof("Importing images from tarball '%s' into node '%s'...", tarPath, node.Name) - if err := runtime.ExecInNode(ctx, node, []string{"ctr", "image", "import", tarPath}); err != nil { - l.Log().Errorf("failed to import images in node '%s': %v", node.Name, err) - } - wg.Done() - }(node, &importWaitgroup, tarName) + multiReader := io.MultiReader(readers...) + loadImageFromStream(ctx, runtime, io.NopCloser(multiReader), cluster) + for _, file := range files { + err := file.Close() + if err != nil { + l.Log().Errorf("Failed to close file '%s' after reading. Error below:\n%+v", file.Name(), err) } } } - importWaitgroup.Wait() + // TODO: stdin reading from args l.Log().Infoln("Successfully imported image(s)") - return nil } func loadImageFromStream(ctx context.Context, runtime runtimes.Runtime, stream io.ReadCloser, cluster *k3d.Cluster) { + var importWaitgroup sync.WaitGroup for _, node := range cluster.Nodes { // only import image in server and agent nodes (i.e. ignoring auxiliary nodes like the server loadbalancer) if node.Role == k3d.ServerRole || node.Role == k3d.AgentRole { - l.Log().Infof("Importing images into node '%s'...", node.Name) - if err := runtime.ExecInNodeWithStdin(ctx, node, []string{"ctr", "image", "import", "-"}, stream); err != nil { - l.Log().Errorf("failed to import images in node '%s': %v", node.Name, err) - } + importWaitgroup.Add(1) + go func(node *k3d.Node, wg *sync.WaitGroup, stream io.ReadCloser) { + l.Log().Infof("Importing images into node '%s'...", node.Name) + if err := runtime.ExecInNodeWithStdin(ctx, node, []string{"ctr", "image", "import", "-"}, stream); err != nil { + l.Log().Errorf("failed to import images in node '%s': %v", node.Name, err) + } + }(node, &importWaitgroup, stream) } } + importWaitgroup.Wait() } type runtimeImageGetter interface { From b0ac288b75d7a85f703bdc0d4ecdec4b1bcbee04 Mon Sep 17 00:00:00 2001 From: sbaier Date: Tue, 26 Oct 2021 12:32:26 +0200 Subject: [PATCH 03/17] feat: tar + runtime image loading working again, fixed cluster node initialization --- cmd/image/imageImport.go | 22 ++++++++++++++++++---- pkg/client/tools.go | 1 + pkg/runtimes/docker/node.go | 35 ++++++++++++++++++++++------------- 3 files changed, 41 insertions(+), 17 deletions(-) diff --git a/cmd/image/imageImport.go b/cmd/image/imageImport.go index e3b847f55..d8a4e2d6a 100644 --- a/cmd/image/imageImport.go +++ b/cmd/image/imageImport.go @@ -63,7 +63,7 @@ So if a file './rancher/k3d-tools' exists, k3d will try to import it instead of errOccured := false for _, cluster := range clusters { l.Log().Infof("Importing image(s) into cluster '%s'", cluster.Name) - if err := client.ImageImportIntoClusterMulti(cmd.Context(), runtimes.SelectedRuntime, images, &cluster, loadImageOpts); err != nil { + if err := client.ImageImportIntoClusterMulti(cmd.Context(), runtimes.SelectedRuntime, images, cluster, loadImageOpts); err != nil { l.Log().Errorf("Failed to import image(s) into cluster '%s': %+v", cluster.Name, err) errOccured = true } @@ -94,16 +94,30 @@ So if a file './rancher/k3d-tools' exists, k3d will try to import it instead of } // parseLoadImageCmd parses the command input into variables required to create a cluster -func parseLoadImageCmd(cmd *cobra.Command, args []string) ([]string, []k3d.Cluster) { +func parseLoadImageCmd(cmd *cobra.Command, args []string) ([]string, []*k3d.Cluster) { // --cluster clusterNames, err := cmd.Flags().GetStringArray("cluster") if err != nil { l.Log().Fatalln(err) } - clusters := []k3d.Cluster{} + clusters := []*k3d.Cluster{} for _, clusterName := range clusterNames { - clusters = append(clusters, k3d.Cluster{Name: clusterName}) + clusters = append(clusters, &k3d.Cluster{Name: clusterName}) + } + + // TODO is this actually necessary? looks like it never worked as intended at first glance. + // Figure out the nodes for each cluster + nodeList, err := client.NodeList(cmd.Context(), runtimes.SelectedRuntime) + if err != nil { + l.Log().Fatalf("Failed to list clusters %v", err) + } + for _, node := range nodeList { + for _, cluster := range clusters { + if cluster.Name == node.RuntimeLabels[k3d.LabelClusterName] { + cluster.Nodes = append(cluster.Nodes, node) + } + } } // images diff --git a/pkg/client/tools.go b/pkg/client/tools.go index 945efd938..afcf123e6 100644 --- a/pkg/client/tools.go +++ b/pkg/client/tools.go @@ -115,6 +115,7 @@ func loadImageFromStream(ctx context.Context, runtime runtimes.Runtime, stream i if err := runtime.ExecInNodeWithStdin(ctx, node, []string{"ctr", "image", "import", "-"}, stream); err != nil { l.Log().Errorf("failed to import images in node '%s': %v", node.Name, err) } + wg.Done() }(node, &importWaitgroup, stream) } } diff --git a/pkg/runtimes/docker/node.go b/pkg/runtimes/docker/node.go index c8a5ad9d7..3f681ab2b 100644 --- a/pkg/runtimes/docker/node.go +++ b/pkg/runtimes/docker/node.go @@ -375,15 +375,16 @@ func executeInNode(ctx context.Context, node *k3d.Node, cmd []string, stdin io.R } defer docker.Close() - var attachStdin bool = false + attachStdin := false if stdin != nil { attachStdin = true } // exec exec, err := docker.ContainerExecCreate(ctx, container.ID, types.ExecConfig{ - Privileged: true, - Tty: true, + Privileged: true, + // Don't use tty true when piping stdin. + Tty: !attachStdin, AttachStderr: true, AttachStdout: true, AttachStdin: attachStdin, @@ -406,28 +407,36 @@ func executeInNode(ctx context.Context, node *k3d.Node, cmd []string, stdin io.R // If we need to write to stdin pipe, start a new goroutine that writes the stream to stdin if stdin != nil { - go func() { - // read in 4K blocks - buffer := make([]byte, 4096) - defer stdin.Close() + go func(stdin io.ReadCloser) { + // pipe in 8K blocks + buffer := make([]byte, 8192) + defer func(stdin io.ReadCloser) { + err := stdin.Close() + if err != nil { + l.Log().Tracef("Failed to close stdin stream: %+v", err) + } + }(stdin) + nWritten := 0 for { n, err := stdin.Read(buffer) - if err != nil { - l.Log().Tracef("Stopping read from stdin stream due to error: %w", err) + + if n == 0 || (err != nil && err.Error() == "EOF") { + l.Log().Tracef("stdin stream appears to have ended, stopping write to exec connection") return } - if n == 0 { - l.Log().Tracef("stdin stream appears to have ended, stopping write to exec connection") + if err != nil { + l.Log().Tracef("Stopping read from stdin stream due to error: %w", err) return } - _, err = execConnection.Conn.Write(buffer) + nWrite, err := execConnection.Conn.Write(buffer) if err != nil { l.Log().Debugf("Failed to write stdin to exec stream, stopping write to exec connection: %w", err) return } + nWritten = nWritten + nWrite } - }() + }(stdin) } for { From 2f540d003c5f451b46fe61e1ab265b3d14308a86 Mon Sep 17 00:00:00 2001 From: sbaier Date: Tue, 26 Oct 2021 18:12:45 +0200 Subject: [PATCH 04/17] fix: remove outdated TODO --- pkg/client/tools.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/client/tools.go b/pkg/client/tools.go index afcf123e6..e716757c8 100644 --- a/pkg/client/tools.go +++ b/pkg/client/tools.go @@ -61,7 +61,6 @@ func ImageImportIntoClusterMulti(ctx context.Context, runtime runtimes.Runtime, l.Log().Infof("Loading %d image(s) from runtime into nodes...", len(imagesFromRuntime)) // open a stream to all given images stream, err := runtime.GetImageStream(ctx, imagesFromRuntime) - // TODO this isn't going to work for multiple streams. Wrap in nopcloser and close here afterwards or something like that loadImageFromStream(ctx, runtime, stream, cluster) if err != nil { return fmt.Errorf("Could not open image stream for given images %s: %w", imagesFromRuntime, err) @@ -101,7 +100,6 @@ func ImageImportIntoClusterMulti(ctx context.Context, runtime runtimes.Runtime, l.Log().Infoln("Successfully imported image(s)") return nil - } func loadImageFromStream(ctx context.Context, runtime runtimes.Runtime, stream io.ReadCloser, cluster *k3d.Cluster) { From ba58de1a1ca7b571c446ff18e3f9d3214b45189e Mon Sep 17 00:00:00 2001 From: sbaier Date: Tue, 26 Oct 2021 18:12:58 +0200 Subject: [PATCH 05/17] feat: allow importing images from stdin as well --- pkg/client/tools.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/pkg/client/tools.go b/pkg/client/tools.go index e716757c8..f4d86f2fe 100644 --- a/pkg/client/tools.go +++ b/pkg/client/tools.go @@ -38,6 +38,13 @@ import ( // ImageImportIntoClusterMulti starts up a k3d tools container for the selected cluster and uses it to export // images from the runtime to import them into the nodes of the selected cluster func ImageImportIntoClusterMulti(ctx context.Context, runtime runtimes.Runtime, images []string, cluster *k3d.Cluster, opts k3d.ImageImportOpts) error { + + // stdin case + if len(images) == 1 && images[0] == "-" { + loadImageFromStream(ctx, runtime, os.Stdin, cluster) + return nil + } + imagesFromRuntime, imagesFromTar, err := findImages(ctx, runtime, images) if err != nil { return fmt.Errorf("failed to find images: %w", err) From 6e2d8b012cf7d484cb6c2c664b287c8d1cc27500 Mon Sep 17 00:00:00 2001 From: sbaier Date: Tue, 26 Oct 2021 20:16:43 +0200 Subject: [PATCH 06/17] fix: multiplex the readers so we can copy to multiple nodes in any case, refactor stdin writing in docker exec to also use io.Copy instead --- pkg/client/tools.go | 28 ++++++++++++++++++++++++++- pkg/runtimes/docker/node.go | 38 +++++++++---------------------------- 2 files changed, 36 insertions(+), 30 deletions(-) diff --git a/pkg/client/tools.go b/pkg/client/tools.go index f4d86f2fe..ee68edc67 100644 --- a/pkg/client/tools.go +++ b/pkg/client/tools.go @@ -111,6 +111,31 @@ func ImageImportIntoClusterMulti(ctx context.Context, runtime runtimes.Runtime, func loadImageFromStream(ctx context.Context, runtime runtimes.Runtime, stream io.ReadCloser, cluster *k3d.Cluster) { var importWaitgroup sync.WaitGroup + + numNodes := 0 + for _, node := range cluster.Nodes { + // only import image in server and agent nodes (i.e. ignoring auxiliary nodes like the server loadbalancer) + if node.Role == k3d.ServerRole || node.Role == k3d.AgentRole { + numNodes++ + } + } + // multiplex the stream so we can write to multiple nodes + pipeReaders := make([]*io.PipeReader, numNodes) + pipeWriters := make([]io.Writer, numNodes) + for i := 0; i < numNodes; i++ { + reader, writer := io.Pipe() + pipeReaders[i] = reader + pipeWriters[i] = writer + } + + go func() { + _, err := io.Copy(io.MultiWriter(pipeWriters...), stream) + if err != nil { + l.Log().Errorf("Failed to copy read stream. %v", err) + } + }() + + pipeId := 0 for _, node := range cluster.Nodes { // only import image in server and agent nodes (i.e. ignoring auxiliary nodes like the server loadbalancer) if node.Role == k3d.ServerRole || node.Role == k3d.AgentRole { @@ -121,7 +146,8 @@ func loadImageFromStream(ctx context.Context, runtime runtimes.Runtime, stream i l.Log().Errorf("failed to import images in node '%s': %v", node.Name, err) } wg.Done() - }(node, &importWaitgroup, stream) + }(node, &importWaitgroup, pipeReaders[pipeId]) + pipeId++ } } importWaitgroup.Wait() diff --git a/pkg/runtimes/docker/node.go b/pkg/runtimes/docker/node.go index 3f681ab2b..820ddc581 100644 --- a/pkg/runtimes/docker/node.go +++ b/pkg/runtimes/docker/node.go @@ -407,36 +407,16 @@ func executeInNode(ctx context.Context, node *k3d.Node, cmd []string, stdin io.R // If we need to write to stdin pipe, start a new goroutine that writes the stream to stdin if stdin != nil { - go func(stdin io.ReadCloser) { - // pipe in 8K blocks - buffer := make([]byte, 8192) - defer func(stdin io.ReadCloser) { - err := stdin.Close() - if err != nil { - l.Log().Tracef("Failed to close stdin stream: %+v", err) - } - }(stdin) - nWritten := 0 - for { - n, err := stdin.Read(buffer) - - if n == 0 || (err != nil && err.Error() == "EOF") { - l.Log().Tracef("stdin stream appears to have ended, stopping write to exec connection") - return - } - if err != nil { - l.Log().Tracef("Stopping read from stdin stream due to error: %w", err) - return - } - - nWrite, err := execConnection.Conn.Write(buffer) - if err != nil { - l.Log().Debugf("Failed to write stdin to exec stream, stopping write to exec connection: %w", err) - return - } - nWritten = nWritten + nWrite + go func() { + _, err := io.Copy(execConnection.Conn, stdin) + if err != nil { + l.Log().Errorf("Failed to copy read stream. %v", err) + } + err = stdin.Close() + if err != nil { + l.Log().Errorf("Failed to close stdin stream. %v", err) } - }(stdin) + }() } for { From 5a65aff05d3e02ce4fb4388abd086eba87ec5bed Mon Sep 17 00:00:00 2001 From: sbaier Date: Tue, 26 Oct 2021 20:20:34 +0200 Subject: [PATCH 07/17] refactor: remove no longer valid TODOs --- pkg/client/tools.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/client/tools.go b/pkg/client/tools.go index ee68edc67..d8ac81565 100644 --- a/pkg/client/tools.go +++ b/pkg/client/tools.go @@ -76,7 +76,6 @@ func ImageImportIntoClusterMulti(ctx context.Context, runtime runtimes.Runtime, } - // TODO: stdin for tar images as well if len(imagesFromTar) > 0 { // copy tarfiles to shared volume l.Log().Infof("Saving %d tarball(s) to shared image volume...", len(imagesFromTar)) @@ -103,7 +102,6 @@ func ImageImportIntoClusterMulti(ctx context.Context, runtime runtimes.Runtime, } } } - // TODO: stdin reading from args l.Log().Infoln("Successfully imported image(s)") return nil From f365ace320e0ddf15c7d62f2cb6db7ed1d4c49b4 Mon Sep 17 00:00:00 2001 From: sbaier Date: Tue, 26 Oct 2021 20:24:53 +0200 Subject: [PATCH 08/17] fix: ensure initial ReadCloser stream gets closed properly after copying --- pkg/client/tools.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/client/tools.go b/pkg/client/tools.go index d8ac81565..6b58fa388 100644 --- a/pkg/client/tools.go +++ b/pkg/client/tools.go @@ -131,6 +131,10 @@ func loadImageFromStream(ctx context.Context, runtime runtimes.Runtime, stream i if err != nil { l.Log().Errorf("Failed to copy read stream. %v", err) } + err = stream.Close() + if err != nil { + l.Log().Errorf("Failed to close stream. %v", err) + } }() pipeId := 0 From 052d6914e7a79b0c062bd3a65ed6760586f80a6b Mon Sep 17 00:00:00 2001 From: sbaier Date: Thu, 28 Oct 2021 01:02:21 +0200 Subject: [PATCH 09/17] feat: initial implementation of switchable loading modes with support for all methods --- cmd/image/imageImport.go | 20 ++++- docs/usage/commands/k3d_image_import.md | 1 + pkg/client/tools.go | 113 +++++++++++++++++++++++- pkg/types/types.go | 17 ++++ 4 files changed, 144 insertions(+), 7 deletions(-) diff --git a/cmd/image/imageImport.go b/cmd/image/imageImport.go index d8a4e2d6a..7199ece7f 100644 --- a/cmd/image/imageImport.go +++ b/cmd/image/imageImport.go @@ -59,16 +59,28 @@ So if a file './rancher/k3d-tools' exists, k3d will try to import it instead of Args: cobra.MinimumNArgs(1), Run: func(cmd *cobra.Command, args []string) { images, clusters := parseLoadImageCmd(cmd, args) + + loadModeStr, err := cmd.Flags().GetString("load-mode") + if err != nil { + l.Log().Errorln("No load-mode specified") + l.Log().Fatalln(err) + } + if mode, ok := k3d.LoadModes[loadModeStr]; !ok { + l.Log().Fatalf("Unknown image loading mode '%s'\n", loadModeStr) + } else { + loadImageOpts.LoadingMode = mode + } + l.Log().Debugf("Importing image(s) [%+v] from runtime [%s] into cluster(s) [%+v]...", images, runtimes.SelectedRuntime, clusters) - errOccured := false + errOccurred := false for _, cluster := range clusters { l.Log().Infof("Importing image(s) into cluster '%s'", cluster.Name) if err := client.ImageImportIntoClusterMulti(cmd.Context(), runtimes.SelectedRuntime, images, cluster, loadImageOpts); err != nil { l.Log().Errorf("Failed to import image(s) into cluster '%s': %+v", cluster.Name, err) - errOccured = true + errOccurred = true } } - if errOccured { + if errOccurred { l.Log().Warnln("At least one error occured while trying to import the image(s) into the selected cluster(s)") os.Exit(1) } @@ -86,7 +98,7 @@ So if a file './rancher/k3d-tools' exists, k3d will try to import it instead of cmd.Flags().BoolVarP(&loadImageOpts.KeepTar, "keep-tarball", "k", false, "Do not delete the tarball containing the saved images from the shared volume") cmd.Flags().BoolVarP(&loadImageOpts.KeepToolsNode, "keep-tools", "t", false, "Do not delete the tools node after import") - + cmd.Flags().StringP("load-mode", "lm", string(k3d.AutoDetect), "Which method to use to load images to the cluster [auto, direct, tools-node]. Default: auto") /* Subcommands */ // done diff --git a/docs/usage/commands/k3d_image_import.md b/docs/usage/commands/k3d_image_import.md index c11dada67..5a9fb20a2 100644 --- a/docs/usage/commands/k3d_image_import.md +++ b/docs/usage/commands/k3d_image_import.md @@ -29,6 +29,7 @@ k3d image import [IMAGE | ARCHIVE [IMAGE | ARCHIVE...]] [flags] -h, --help help for import -k, --keep-tarball Do not delete the tarball containing the saved images from the shared volume -t, --keep-tools Do not delete the tools node after import + -lm, --load-mode Which method to use to load images to the cluster [auto, direct, tools-node]. Default: auto ``` ### Options inherited from parent commands diff --git a/pkg/client/tools.go b/pkg/client/tools.go index 6b58fa388..9397f93c2 100644 --- a/pkg/client/tools.go +++ b/pkg/client/tools.go @@ -27,8 +27,11 @@ import ( "fmt" "io" "os" + "path" + "regexp" "strings" "sync" + "time" l "github.com/rancher/k3d/v5/pkg/logger" "github.com/rancher/k3d/v5/pkg/runtimes" @@ -52,7 +55,30 @@ func ImageImportIntoClusterMulti(ctx context.Context, runtime runtimes.Runtime, // no images found to load -> exit early if len(imagesFromRuntime)+len(imagesFromTar) == 0 { - return fmt.Errorf("No valid images specified") + return fmt.Errorf("no valid images specified") + } + + loadWithToolsNode := false + + switch opts.LoadingMode { + case k3d.AutoDetect: + info, err := runtimes.SelectedRuntime.Info() + if err != nil { + return fmt.Errorf("failed to retrieve container runtime information: %w", err) + } + // Regexp matches for ssh:// and tcp:// runtime endpoints which are not well-known local addresses + regex, err := regexp.Compile("(ssh|tcp):\\/\\/(?!localhost|127.0.0.1).+") + if err != nil { + return fmt.Errorf("failed to compile remote runtime endpoint regexp: %w", err) + } + if regex.MatchString(info.Endpoint) { + l.Log().Infof("Auto-detected a remote docker daemon, using tools node for loading images") + loadWithToolsNode = true + } + case k3d.ToolsNode: + loadWithToolsNode = true + case k3d.Direct: + loadWithToolsNode = false } /* TODO: @@ -63,7 +89,90 @@ func ImageImportIntoClusterMulti(ctx context.Context, runtime runtimes.Runtime, * 3. From stdin: save to tar -> import * Note: temporary storage location is always the shared image volume and actions are always executed by the tools node */ + if loadWithToolsNode { + err = importWithToolsNode(ctx, runtime, err, cluster, imagesFromRuntime, imagesFromTar, opts) + } else { + err = importWithStream(ctx, runtime, imagesFromRuntime, cluster, imagesFromTar) + } + if err != nil { + return err + } + + l.Log().Infoln("Successfully imported image(s)") + return nil +} + +func importWithToolsNode(ctx context.Context, runtime runtimes.Runtime, err error, cluster *k3d.Cluster, imagesFromRuntime []string, imagesFromTar []string, opts k3d.ImageImportOpts) error { + // create tools node to export images + toolsNode, err := EnsureToolsNode(ctx, runtime, cluster) + if err != nil { + return fmt.Errorf("failed to ensure that tools node is running: %w", err) + } + + var importTarNames []string + + if len(imagesFromRuntime) > 0 { + // save image to tarfile in shared volume + l.Log().Infof("Saving %d image(s) from runtime...", len(imagesFromRuntime)) + tarName := fmt.Sprintf("%s/k3d-%s-images-%s.tar", k3d.DefaultImageVolumeMountPath, cluster.Name, time.Now().Format("20060102150405")) + if err := runtime.ExecInNode(ctx, toolsNode, append([]string{"./k3d-tools", "save-image", "-d", tarName}, imagesFromRuntime...)); err != nil { + return fmt.Errorf("failed to save image(s) in tools container for cluster '%s': %w", cluster.Name, err) + } + importTarNames = append(importTarNames, tarName) + } + + if len(imagesFromTar) > 0 { + // copy tarfiles to shared volume + l.Log().Infof("Saving %d tarball(s) to shared image volume...", len(imagesFromTar)) + for _, file := range imagesFromTar { + tarName := fmt.Sprintf("%s/k3d-%s-images-%s-file-%s", k3d.DefaultImageVolumeMountPath, cluster.Name, time.Now().Format("20060102150405"), path.Base(file)) + if err := runtime.CopyToNode(ctx, file, tarName, toolsNode); err != nil { + l.Log().Errorf("failed to copy image tar '%s' to tools node! Error below:\n%+v", file, err) + continue + } + importTarNames = append(importTarNames, tarName) + } + } + + // import image in each node + l.Log().Infoln("Importing images into nodes...") + var importWaitgroup sync.WaitGroup + for _, tarName := range importTarNames { + for _, node := range cluster.Nodes { + // only import image in server and agent nodes (i.e. ignoring auxiliary nodes like the server loadbalancer) + if node.Role == k3d.ServerRole || node.Role == k3d.AgentRole { + importWaitgroup.Add(1) + go func(node *k3d.Node, wg *sync.WaitGroup, tarPath string) { + l.Log().Infof("Importing images from tarball '%s' into node '%s'...", tarPath, node.Name) + if err := runtime.ExecInNode(ctx, node, []string{"ctr", "image", "import", tarPath}); err != nil { + l.Log().Errorf("failed to import images in node '%s': %v", node.Name, err) + } + wg.Done() + }(node, &importWaitgroup, tarName) + } + } + } + importWaitgroup.Wait() + + // remove tarball + if !opts.KeepTar && len(importTarNames) > 0 { + l.Log().Infoln("Removing the tarball(s) from image volume...") + if err := runtime.ExecInNode(ctx, toolsNode, []string{"rm", "-f", strings.Join(importTarNames, " ")}); err != nil { + l.Log().Errorf("failed to delete one or more tarballs from '%+v': %v", importTarNames, err) + } + } + + // delete tools container + if !opts.KeepToolsNode { + l.Log().Infoln("Removing k3d-tools node...") + if err := runtime.DeleteNode(ctx, toolsNode); err != nil { + l.Log().Errorf("failed to delete tools node '%s' (try to delete it manually): %v", toolsNode.Name, err) + } + } + return nil +} +func importWithStream(ctx context.Context, runtime runtimes.Runtime, imagesFromRuntime []string, cluster *k3d.Cluster, imagesFromTar []string) error { if len(imagesFromRuntime) > 0 { l.Log().Infof("Loading %d image(s) from runtime into nodes...", len(imagesFromRuntime)) // open a stream to all given images @@ -102,8 +211,6 @@ func ImageImportIntoClusterMulti(ctx context.Context, runtime runtimes.Runtime, } } } - - l.Log().Infoln("Successfully imported image(s)") return nil } diff --git a/pkg/types/types.go b/pkg/types/types.go index de819b359..a0a8eb606 100644 --- a/pkg/types/types.go +++ b/pkg/types/types.go @@ -178,10 +178,27 @@ type NodeHookAction interface { Run(ctx context.Context, node *Node) error } +// LoadMode describes how images are loaded into the cluster +type LoadMode string + +const ( + AutoDetect LoadMode = "auto" + Direct LoadMode = "direct" + ToolsNode LoadMode = "tools-node" +) + +// LoadModes defines the loading methods for image loading +var LoadModes = map[string]LoadMode{ + string(AutoDetect): AutoDetect, + string(Direct): Direct, + string(ToolsNode): ToolsNode, +} + // ImageImportOpts describes a set of options one can set for loading image(s) into cluster(s) type ImageImportOpts struct { KeepTar bool KeepToolsNode bool + LoadingMode LoadMode } type IPAM struct { From af56f804e2dcd25127976004b37423e1d8535580 Mon Sep 17 00:00:00 2001 From: sbaier Date: Thu, 28 Oct 2021 01:08:35 +0200 Subject: [PATCH 10/17] fix: invalid shortcode for command --- cmd/image/imageImport.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/image/imageImport.go b/cmd/image/imageImport.go index 7199ece7f..44033d30a 100644 --- a/cmd/image/imageImport.go +++ b/cmd/image/imageImport.go @@ -98,7 +98,7 @@ So if a file './rancher/k3d-tools' exists, k3d will try to import it instead of cmd.Flags().BoolVarP(&loadImageOpts.KeepTar, "keep-tarball", "k", false, "Do not delete the tarball containing the saved images from the shared volume") cmd.Flags().BoolVarP(&loadImageOpts.KeepToolsNode, "keep-tools", "t", false, "Do not delete the tools node after import") - cmd.Flags().StringP("load-mode", "lm", string(k3d.AutoDetect), "Which method to use to load images to the cluster [auto, direct, tools-node]. Default: auto") + cmd.Flags().StringP("load-mode", "m", string(k3d.AutoDetect), "Which method to use to load images to the cluster [auto, direct, tools-node].") /* Subcommands */ // done From 71ffb19630663f5be002097df0e08c6feb67fc64 Mon Sep 17 00:00:00 2001 From: sbaier Date: Thu, 28 Oct 2021 01:08:50 +0200 Subject: [PATCH 11/17] doc: update docs + add info about loading modes --- docs/usage/commands/k3d_image_import.md | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/docs/usage/commands/k3d_image_import.md b/docs/usage/commands/k3d_image_import.md index 5a9fb20a2..b9f8bb1c5 100644 --- a/docs/usage/commands/k3d_image_import.md +++ b/docs/usage/commands/k3d_image_import.md @@ -29,7 +29,7 @@ k3d image import [IMAGE | ARCHIVE [IMAGE | ARCHIVE...]] [flags] -h, --help help for import -k, --keep-tarball Do not delete the tarball containing the saved images from the shared volume -t, --keep-tools Do not delete the tools node after import - -lm, --load-mode Which method to use to load images to the cluster [auto, direct, tools-node]. Default: auto + -m, --load-mode Which method to use to load images to the cluster [auto, direct, tools-node]. (default "auto") ``` ### Options inherited from parent commands @@ -40,6 +40,25 @@ k3d image import [IMAGE | ARCHIVE [IMAGE | ARCHIVE...]] [flags] --verbose Enable verbose output (debug logging) ``` + +### Loading modes + +#### Auto + +Auto-determine whether to use `direct` or `tools-node`. + +For remote container runtimes, `tools-node` is faster due to less network overhead, thus it is automatically selected for remote runtimes. + +Otherwise direct is used. + +#### Direct + +Directly load the given images to the k3s nodes. No separate container is spawned, no intermediate files are written. + +#### Tools Node + +Start a `k3d-tools` container in the container runtime, copy images to that runtime, then load the images to k3s nodes from there. + ### SEE ALSO * [k3d image](k3d_image.md) - Handle container images. From 772e9917954b3a6bd70c7b60b8b528c437336351 Mon Sep 17 00:00:00 2001 From: sbaier Date: Thu, 28 Oct 2021 10:58:39 +0200 Subject: [PATCH 12/17] fix: remote docker daemon detection --- pkg/client/tools.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/pkg/client/tools.go b/pkg/client/tools.go index 9397f93c2..7e31fa1cd 100644 --- a/pkg/client/tools.go +++ b/pkg/client/tools.go @@ -28,7 +28,6 @@ import ( "io" "os" "path" - "regexp" "strings" "sync" "time" @@ -62,16 +61,14 @@ func ImageImportIntoClusterMulti(ctx context.Context, runtime runtimes.Runtime, switch opts.LoadingMode { case k3d.AutoDetect: - info, err := runtimes.SelectedRuntime.Info() if err != nil { return fmt.Errorf("failed to retrieve container runtime information: %w", err) } - // Regexp matches for ssh:// and tcp:// runtime endpoints which are not well-known local addresses - regex, err := regexp.Compile("(ssh|tcp):\\/\\/(?!localhost|127.0.0.1).+") if err != nil { return fmt.Errorf("failed to compile remote runtime endpoint regexp: %w", err) } - if regex.MatchString(info.Endpoint) { + runtimeHost := runtime.GetHost() + if runtimeHost != "" && runtimeHost != "localhost" && runtimeHost != "127.0.0.1" { l.Log().Infof("Auto-detected a remote docker daemon, using tools node for loading images") loadWithToolsNode = true } From 56921af5ffaa5ced58e46253392006ec33bef549 Mon Sep 17 00:00:00 2001 From: Simon Baier Date: Sun, 14 Nov 2021 22:34:42 +0000 Subject: [PATCH 13/17] Apply suggestions from code review Co-authored-by: Thorsten Klein --- cmd/image/imageImport.go | 4 ++-- pkg/client/tools.go | 8 ++++---- pkg/types/types.go | 20 ++++++++++---------- 3 files changed, 16 insertions(+), 16 deletions(-) diff --git a/cmd/image/imageImport.go b/cmd/image/imageImport.go index 44033d30a..745da7c28 100644 --- a/cmd/image/imageImport.go +++ b/cmd/image/imageImport.go @@ -60,7 +60,7 @@ So if a file './rancher/k3d-tools' exists, k3d will try to import it instead of Run: func(cmd *cobra.Command, args []string) { images, clusters := parseLoadImageCmd(cmd, args) - loadModeStr, err := cmd.Flags().GetString("load-mode") + loadModeStr, err := cmd.Flags().GetString("mode") if err != nil { l.Log().Errorln("No load-mode specified") l.Log().Fatalln(err) @@ -98,7 +98,7 @@ So if a file './rancher/k3d-tools' exists, k3d will try to import it instead of cmd.Flags().BoolVarP(&loadImageOpts.KeepTar, "keep-tarball", "k", false, "Do not delete the tarball containing the saved images from the shared volume") cmd.Flags().BoolVarP(&loadImageOpts.KeepToolsNode, "keep-tools", "t", false, "Do not delete the tools node after import") - cmd.Flags().StringP("load-mode", "m", string(k3d.AutoDetect), "Which method to use to load images to the cluster [auto, direct, tools-node].") + cmd.Flags().StringP("mode", "m", string(k3d.ImportModeAutoDetect), "Which method to use to import images into the cluster [auto, direct, tools].") /* Subcommands */ // done diff --git a/pkg/client/tools.go b/pkg/client/tools.go index 7e31fa1cd..2f7d1676e 100644 --- a/pkg/client/tools.go +++ b/pkg/client/tools.go @@ -60,7 +60,7 @@ func ImageImportIntoClusterMulti(ctx context.Context, runtime runtimes.Runtime, loadWithToolsNode := false switch opts.LoadingMode { - case k3d.AutoDetect: + case k3d.ImportModeAutoDetect: if err != nil { return fmt.Errorf("failed to retrieve container runtime information: %w", err) } @@ -72,9 +72,9 @@ func ImageImportIntoClusterMulti(ctx context.Context, runtime runtimes.Runtime, l.Log().Infof("Auto-detected a remote docker daemon, using tools node for loading images") loadWithToolsNode = true } - case k3d.ToolsNode: + case k3d.ImportModeTools: loadWithToolsNode = true - case k3d.Direct: + case k3d.ImportModeDirect: loadWithToolsNode = false } @@ -184,7 +184,7 @@ func importWithStream(ctx context.Context, runtime runtimes.Runtime, imagesFromR if len(imagesFromTar) > 0 { // copy tarfiles to shared volume - l.Log().Infof("Saving %d tarball(s) to shared image volume...", len(imagesFromTar)) + l.Log().Infof("Importing images from %d tarball(s)...", len(imagesFromTar)) files := make([]*os.File, len(imagesFromTar)) readers := make([]io.Reader, len(imagesFromTar)) diff --git a/pkg/types/types.go b/pkg/types/types.go index a0a8eb606..3aac49798 100644 --- a/pkg/types/types.go +++ b/pkg/types/types.go @@ -179,26 +179,26 @@ type NodeHookAction interface { } // LoadMode describes how images are loaded into the cluster -type LoadMode string +type ImportMode string const ( - AutoDetect LoadMode = "auto" - Direct LoadMode = "direct" - ToolsNode LoadMode = "tools-node" + ImportModeAutoDetect ImportMode = "auto" + ImportModeDirect ImportMode = "direct" + ImportModeToolsNode ImportMode = "tools-node" ) -// LoadModes defines the loading methods for image loading -var LoadModes = map[string]LoadMode{ - string(AutoDetect): AutoDetect, - string(Direct): Direct, - string(ToolsNode): ToolsNode, +// ImportModes defines the loading methods for image loading +var ImportModes = map[string]ImportMode{ + string(ImportModeAutoDetect): ImportModeAutoDetect, + string(ImportModeDirect): ImportModeDirect, + string(ImportModeToolsNode): ImportModeToolsNode, } // ImageImportOpts describes a set of options one can set for loading image(s) into cluster(s) type ImageImportOpts struct { KeepTar bool KeepToolsNode bool - LoadingMode LoadMode + Mode ImportMode } type IPAM struct { From 4589999b1059b1809f5c422c4d6bd799de9eb3b4 Mon Sep 17 00:00:00 2001 From: sbaier Date: Mon, 15 Nov 2021 00:22:27 +0100 Subject: [PATCH 14/17] fix: some minor syntax error from PR suggestions, move doc to new, non-auto-generated section --- cmd/image/imageImport.go | 6 +++--- docs/usage/.pages | 1 + docs/usage/commands/k3d_image_import.md | 21 +-------------------- docs/usage/importing_images.md | 18 ++++++++++++++++++ pkg/client/tools.go | 12 ++++++------ 5 files changed, 29 insertions(+), 29 deletions(-) create mode 100644 docs/usage/importing_images.md diff --git a/cmd/image/imageImport.go b/cmd/image/imageImport.go index 745da7c28..5c30dd66a 100644 --- a/cmd/image/imageImport.go +++ b/cmd/image/imageImport.go @@ -65,10 +65,10 @@ So if a file './rancher/k3d-tools' exists, k3d will try to import it instead of l.Log().Errorln("No load-mode specified") l.Log().Fatalln(err) } - if mode, ok := k3d.LoadModes[loadModeStr]; !ok { + if mode, ok := k3d.ImportModes[loadModeStr]; !ok { l.Log().Fatalf("Unknown image loading mode '%s'\n", loadModeStr) } else { - loadImageOpts.LoadingMode = mode + loadImageOpts.Mode = mode } l.Log().Debugf("Importing image(s) [%+v] from runtime [%s] into cluster(s) [%+v]...", images, runtimes.SelectedRuntime, clusters) @@ -98,7 +98,7 @@ So if a file './rancher/k3d-tools' exists, k3d will try to import it instead of cmd.Flags().BoolVarP(&loadImageOpts.KeepTar, "keep-tarball", "k", false, "Do not delete the tarball containing the saved images from the shared volume") cmd.Flags().BoolVarP(&loadImageOpts.KeepToolsNode, "keep-tools", "t", false, "Do not delete the tools node after import") - cmd.Flags().StringP("mode", "m", string(k3d.ImportModeAutoDetect), "Which method to use to import images into the cluster [auto, direct, tools].") + cmd.Flags().StringP("mode", "m", string(k3d.ImportModeAutoDetect), "Which method to use to import images into the cluster [auto, direct, tools]. See https://k3d.io/usage/guides/importing_images/") /* Subcommands */ // done diff --git a/docs/usage/.pages b/docs/usage/.pages index 051903554..1d7baa526 100644 --- a/docs/usage/.pages +++ b/docs/usage/.pages @@ -5,5 +5,6 @@ nav: - multiserver.md - registries.md - exposing_services.md + - importing_images.md - advanced - commands diff --git a/docs/usage/commands/k3d_image_import.md b/docs/usage/commands/k3d_image_import.md index b9f8bb1c5..04c147e92 100644 --- a/docs/usage/commands/k3d_image_import.md +++ b/docs/usage/commands/k3d_image_import.md @@ -29,7 +29,7 @@ k3d image import [IMAGE | ARCHIVE [IMAGE | ARCHIVE...]] [flags] -h, --help help for import -k, --keep-tarball Do not delete the tarball containing the saved images from the shared volume -t, --keep-tools Do not delete the tools node after import - -m, --load-mode Which method to use to load images to the cluster [auto, direct, tools-node]. (default "auto") + -m, --mode string Which method to use to import images into the cluster [auto, direct, tools]. (default "auto") ``` ### Options inherited from parent commands @@ -40,25 +40,6 @@ k3d image import [IMAGE | ARCHIVE [IMAGE | ARCHIVE...]] [flags] --verbose Enable verbose output (debug logging) ``` - -### Loading modes - -#### Auto - -Auto-determine whether to use `direct` or `tools-node`. - -For remote container runtimes, `tools-node` is faster due to less network overhead, thus it is automatically selected for remote runtimes. - -Otherwise direct is used. - -#### Direct - -Directly load the given images to the k3s nodes. No separate container is spawned, no intermediate files are written. - -#### Tools Node - -Start a `k3d-tools` container in the container runtime, copy images to that runtime, then load the images to k3s nodes from there. - ### SEE ALSO * [k3d image](k3d_image.md) - Handle container images. diff --git a/docs/usage/importing_images.md b/docs/usage/importing_images.md new file mode 100644 index 000000000..06ab1918c --- /dev/null +++ b/docs/usage/importing_images.md @@ -0,0 +1,18 @@ +# Importing modes + +## Auto + +Auto-determine whether to use `direct` or `tools-node`. + +For remote container runtimes, `tools-node` is faster due to less network overhead, thus it is automatically selected for remote runtimes. + +Otherwise direct is used. + +## Direct + +Directly load the given images to the k3s nodes. No separate container is spawned, no intermediate files are written. + +## Tools Node + +Start a `k3d-tools` container in the container runtime, copy images to that runtime, then load the images to k3s nodes from there. + diff --git a/pkg/client/tools.go b/pkg/client/tools.go index 2f7d1676e..df307eeb9 100644 --- a/pkg/client/tools.go +++ b/pkg/client/tools.go @@ -59,7 +59,7 @@ func ImageImportIntoClusterMulti(ctx context.Context, runtime runtimes.Runtime, loadWithToolsNode := false - switch opts.LoadingMode { + switch opts.Mode { case k3d.ImportModeAutoDetect: if err != nil { return fmt.Errorf("failed to retrieve container runtime information: %w", err) @@ -72,7 +72,7 @@ func ImageImportIntoClusterMulti(ctx context.Context, runtime runtimes.Runtime, l.Log().Infof("Auto-detected a remote docker daemon, using tools node for loading images") loadWithToolsNode = true } - case k3d.ImportModeTools: + case k3d.ImportModeToolsNode: loadWithToolsNode = true case k3d.ImportModeDirect: loadWithToolsNode = false @@ -87,9 +87,9 @@ func ImageImportIntoClusterMulti(ctx context.Context, runtime runtimes.Runtime, * Note: temporary storage location is always the shared image volume and actions are always executed by the tools node */ if loadWithToolsNode { - err = importWithToolsNode(ctx, runtime, err, cluster, imagesFromRuntime, imagesFromTar, opts) + err = importWithToolsNode(ctx, runtime, cluster, imagesFromRuntime, imagesFromTar, opts) } else { - err = importWithStream(ctx, runtime, imagesFromRuntime, cluster, imagesFromTar) + err = importWithStream(ctx, runtime, cluster, imagesFromRuntime, imagesFromTar) } if err != nil { return err @@ -99,7 +99,7 @@ func ImageImportIntoClusterMulti(ctx context.Context, runtime runtimes.Runtime, return nil } -func importWithToolsNode(ctx context.Context, runtime runtimes.Runtime, err error, cluster *k3d.Cluster, imagesFromRuntime []string, imagesFromTar []string, opts k3d.ImageImportOpts) error { +func importWithToolsNode(ctx context.Context, runtime runtimes.Runtime, cluster *k3d.Cluster, imagesFromRuntime []string, imagesFromTar []string, opts k3d.ImageImportOpts) error { // create tools node to export images toolsNode, err := EnsureToolsNode(ctx, runtime, cluster) if err != nil { @@ -169,7 +169,7 @@ func importWithToolsNode(ctx context.Context, runtime runtimes.Runtime, err erro return nil } -func importWithStream(ctx context.Context, runtime runtimes.Runtime, imagesFromRuntime []string, cluster *k3d.Cluster, imagesFromTar []string) error { +func importWithStream(ctx context.Context, runtime runtimes.Runtime, cluster *k3d.Cluster, imagesFromRuntime []string, imagesFromTar []string) error { if len(imagesFromRuntime) > 0 { l.Log().Infof("Loading %d image(s) from runtime into nodes...", len(imagesFromRuntime)) // open a stream to all given images From 63c86680397704d1c6a2a6a862bde87eb0b1c08f Mon Sep 17 00:00:00 2001 From: sbaier Date: Tue, 16 Nov 2021 10:07:18 +0100 Subject: [PATCH 15/17] fix: add error-handling for stream loading runtime errors, fix initial node role determination --- cmd/image/imageImport.go | 3 +-- pkg/client/tools.go | 51 +++++++++++++++++++++++----------------- 2 files changed, 31 insertions(+), 23 deletions(-) diff --git a/cmd/image/imageImport.go b/cmd/image/imageImport.go index 5c30dd66a..d8582a56e 100644 --- a/cmd/image/imageImport.go +++ b/cmd/image/imageImport.go @@ -118,7 +118,6 @@ func parseLoadImageCmd(cmd *cobra.Command, args []string) ([]string, []*k3d.Clus clusters = append(clusters, &k3d.Cluster{Name: clusterName}) } - // TODO is this actually necessary? looks like it never worked as intended at first glance. // Figure out the nodes for each cluster nodeList, err := client.NodeList(cmd.Context(), runtimes.SelectedRuntime) if err != nil { @@ -126,7 +125,7 @@ func parseLoadImageCmd(cmd *cobra.Command, args []string) ([]string, []*k3d.Clus } for _, node := range nodeList { for _, cluster := range clusters { - if cluster.Name == node.RuntimeLabels[k3d.LabelClusterName] { + if cluster.Name == node.RuntimeLabels[k3d.LabelClusterName] && node.Role == k3d.ServerRole { cluster.Nodes = append(cluster.Nodes, node) } } diff --git a/pkg/client/tools.go b/pkg/client/tools.go index df307eeb9..32a1cc065 100644 --- a/pkg/client/tools.go +++ b/pkg/client/tools.go @@ -25,6 +25,7 @@ package client import ( "context" "fmt" + "golang.org/x/sync/errgroup" "io" "os" "path" @@ -43,8 +44,8 @@ func ImageImportIntoClusterMulti(ctx context.Context, runtime runtimes.Runtime, // stdin case if len(images) == 1 && images[0] == "-" { - loadImageFromStream(ctx, runtime, os.Stdin, cluster) - return nil + err := loadImageFromStream(ctx, runtime, os.Stdin, cluster) + return fmt.Errorf("failed to load image to cluster from stdin: %v", err) } imagesFromRuntime, imagesFromTar, err := findImages(ctx, runtime, images) @@ -64,9 +65,6 @@ func ImageImportIntoClusterMulti(ctx context.Context, runtime runtimes.Runtime, if err != nil { return fmt.Errorf("failed to retrieve container runtime information: %w", err) } - if err != nil { - return fmt.Errorf("failed to compile remote runtime endpoint regexp: %w", err) - } runtimeHost := runtime.GetHost() if runtimeHost != "" && runtimeHost != "localhost" && runtimeHost != "127.0.0.1" { l.Log().Infof("Auto-detected a remote docker daemon, using tools node for loading images") @@ -174,9 +172,12 @@ func importWithStream(ctx context.Context, runtime runtimes.Runtime, cluster *k3 l.Log().Infof("Loading %d image(s) from runtime into nodes...", len(imagesFromRuntime)) // open a stream to all given images stream, err := runtime.GetImageStream(ctx, imagesFromRuntime) - loadImageFromStream(ctx, runtime, stream, cluster) if err != nil { - return fmt.Errorf("Could not open image stream for given images %s: %w", imagesFromRuntime, err) + return fmt.Errorf("could not open image stream for given images %s: %w", imagesFromRuntime, err) + } + err = loadImageFromStream(ctx, runtime, stream, cluster) + if err != nil { + return fmt.Errorf("could not load image to cluster from stream %s: %w", imagesFromRuntime, err) } // load the images directly into the nodes @@ -200,7 +201,10 @@ func importWithStream(ctx context.Context, runtime runtimes.Runtime, cluster *k3 readers[i] = file } multiReader := io.MultiReader(readers...) - loadImageFromStream(ctx, runtime, io.NopCloser(multiReader), cluster) + err := loadImageFromStream(ctx, runtime, io.NopCloser(multiReader), cluster) + if err != nil { + return fmt.Errorf("could not load image to cluster from stream %s: %w", imagesFromTar, err) + } for _, file := range files { err := file.Close() if err != nil { @@ -211,8 +215,8 @@ func importWithStream(ctx context.Context, runtime runtimes.Runtime, cluster *k3 return nil } -func loadImageFromStream(ctx context.Context, runtime runtimes.Runtime, stream io.ReadCloser, cluster *k3d.Cluster) { - var importWaitgroup sync.WaitGroup +func loadImageFromStream(ctx context.Context, runtime runtimes.Runtime, stream io.ReadCloser, cluster *k3d.Cluster) error { + var errorGroup errgroup.Group numNodes := 0 for _, node := range cluster.Nodes { @@ -230,33 +234,38 @@ func loadImageFromStream(ctx context.Context, runtime runtimes.Runtime, stream i pipeWriters[i] = writer } - go func() { + errorGroup.Go(func() error { _, err := io.Copy(io.MultiWriter(pipeWriters...), stream) if err != nil { - l.Log().Errorf("Failed to copy read stream. %v", err) + return fmt.Errorf("failed to copy read stream. %v", err) } err = stream.Close() if err != nil { - l.Log().Errorf("Failed to close stream. %v", err) + return fmt.Errorf("failed to close stream. %v", err) } - }() + return nil + }) pipeId := 0 for _, node := range cluster.Nodes { // only import image in server and agent nodes (i.e. ignoring auxiliary nodes like the server loadbalancer) if node.Role == k3d.ServerRole || node.Role == k3d.AgentRole { - importWaitgroup.Add(1) - go func(node *k3d.Node, wg *sync.WaitGroup, stream io.ReadCloser) { + pipeReader := pipeReaders[pipeId] + errorGroup.Go(func() error { l.Log().Infof("Importing images into node '%s'...", node.Name) - if err := runtime.ExecInNodeWithStdin(ctx, node, []string{"ctr", "image", "import", "-"}, stream); err != nil { - l.Log().Errorf("failed to import images in node '%s': %v", node.Name, err) + if err := runtime.ExecInNodeWithStdin(ctx, node, []string{"ctr", "image", "import", "-"}, pipeReader); err != nil { + return fmt.Errorf("failed to import images in node '%s': %v", node.Name, err) } - wg.Done() - }(node, &importWaitgroup, pipeReaders[pipeId]) + return nil + }) pipeId++ } } - importWaitgroup.Wait() + err := errorGroup.Wait() + if err != nil { + return fmt.Errorf("error loading image to cluster, first error: %v", err) + } + return nil } type runtimeImageGetter interface { From e5ec230564339c0ed55d1b312be5b88cb97cb6e6 Mon Sep 17 00:00:00 2001 From: Thorsten Klein Date: Tue, 16 Nov 2021 11:45:20 +0100 Subject: [PATCH 16/17] Use ClusterGet to retrieve cluster information/nodes --- cmd/image/imageImport.go | 17 ++++------------- 1 file changed, 4 insertions(+), 13 deletions(-) diff --git a/cmd/image/imageImport.go b/cmd/image/imageImport.go index d8582a56e..694bae245 100644 --- a/cmd/image/imageImport.go +++ b/cmd/image/imageImport.go @@ -115,20 +115,11 @@ func parseLoadImageCmd(cmd *cobra.Command, args []string) ([]string, []*k3d.Clus } clusters := []*k3d.Cluster{} for _, clusterName := range clusterNames { - clusters = append(clusters, &k3d.Cluster{Name: clusterName}) - } - - // Figure out the nodes for each cluster - nodeList, err := client.NodeList(cmd.Context(), runtimes.SelectedRuntime) - if err != nil { - l.Log().Fatalf("Failed to list clusters %v", err) - } - for _, node := range nodeList { - for _, cluster := range clusters { - if cluster.Name == node.RuntimeLabels[k3d.LabelClusterName] && node.Role == k3d.ServerRole { - cluster.Nodes = append(cluster.Nodes, node) - } + cluster, err := client.ClusterGet(cmd.Context(), runtimes.SelectedRuntime, &k3d.Cluster{Name: clusterName}) + if err != nil { + l.Log().Fatalf("failed to get cluster %s: %v", clusterName, err) } + clusters = append(clusters, cluster) } // images From 1f1b7b58bc5e5a1d962bce16af2a29805ea3dc02 Mon Sep 17 00:00:00 2001 From: Thorsten Klein Date: Tue, 16 Nov 2021 13:02:34 +0100 Subject: [PATCH 17/17] capture node in loop to avoid using the same node in every iteration --- pkg/client/tools.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/client/tools.go b/pkg/client/tools.go index 32a1cc065..bb3348c79 100644 --- a/pkg/client/tools.go +++ b/pkg/client/tools.go @@ -247,7 +247,8 @@ func loadImageFromStream(ctx context.Context, runtime runtimes.Runtime, stream i }) pipeId := 0 - for _, node := range cluster.Nodes { + for _, n := range cluster.Nodes { + node := n // only import image in server and agent nodes (i.e. ignoring auxiliary nodes like the server loadbalancer) if node.Role == k3d.ServerRole || node.Role == k3d.AgentRole { pipeReader := pipeReaders[pipeId]