
[Enhancement]: improve image loading performance #826

Merged: 19 commits, Nov 16, 2021
Changes from 12 commits

Commits (19)
661b665
feat: initial fast, tool-node-less loading facilities for container i…
sbaier1 Oct 25, 2021
0735474
feat: add parallel loading to nodes, add tar loading
sbaier1 Oct 26, 2021
b0ac288
feat: tar + runtime image loading working again, fixed cluster node i…
sbaier1 Oct 26, 2021
2f540d0
fix: remove outdated TODO
sbaier1 Oct 26, 2021
ba58de1
feat: allow importing images from stdin as well
sbaier1 Oct 26, 2021
6e2d8b0
fix: multiplex the readers so we can copy to multiple nodes in any ca…
sbaier1 Oct 26, 2021
5a65aff
refactor: remove no longer valid TODOs
sbaier1 Oct 26, 2021
f365ace
fix: ensure initial ReadCloser stream gets closed properly after copying
sbaier1 Oct 26, 2021
052d691
feat: initial implementation of switchable loading modes with support…
sbaier1 Oct 27, 2021
af56f80
fix: invalid shortcode for command
sbaier1 Oct 27, 2021
71ffb19
doc: update docs + add info about loading modes
sbaier1 Oct 27, 2021
772e991
fix: remote docker daemon detection
sbaier1 Oct 28, 2021
56921af
Apply suggestions from code review
sbaier1 Nov 14, 2021
4589999
fix: some minor syntax error from PR suggestions, move doc to new, no…
sbaier1 Nov 14, 2021
9bc54b5
Merge branch 'main' into feat/fast-load
iwilltry42 Nov 15, 2021
63c8668
fix: add error-handling for stream loading runtime errors, fix initia…
sbaier1 Nov 16, 2021
e5ec230
Use ClusterGet to retrieve cluster information/nodes
iwilltry42 Nov 16, 2021
3224f1b
Merge branch 'main' into feat/fast-load
iwilltry42 Nov 16, 2021
1f1b7b5
capture node in loop to avoid using the same node in every iteration
iwilltry42 Nov 16, 2021
42 changes: 34 additions & 8 deletions cmd/image/imageImport.go
@@ -59,16 +59,28 @@ So if a file './rancher/k3d-tools' exists, k3d will try to import it instead of
Args: cobra.MinimumNArgs(1),
Run: func(cmd *cobra.Command, args []string) {
images, clusters := parseLoadImageCmd(cmd, args)

loadModeStr, err := cmd.Flags().GetString("load-mode")
if err != nil {
l.Log().Errorln("No load-mode specified")
l.Log().Fatalln(err)
}
if mode, ok := k3d.LoadModes[loadModeStr]; !ok {
l.Log().Fatalf("Unknown image loading mode '%s'\n", loadModeStr)
} else {
loadImageOpts.LoadingMode = mode
}

l.Log().Debugf("Importing image(s) [%+v] from runtime [%s] into cluster(s) [%+v]...", images, runtimes.SelectedRuntime, clusters)
errOccured := false
errOccurred := false
for _, cluster := range clusters {
l.Log().Infof("Importing image(s) into cluster '%s'", cluster.Name)
if err := client.ImageImportIntoClusterMulti(cmd.Context(), runtimes.SelectedRuntime, images, &cluster, loadImageOpts); err != nil {
if err := client.ImageImportIntoClusterMulti(cmd.Context(), runtimes.SelectedRuntime, images, cluster, loadImageOpts); err != nil {
l.Log().Errorf("Failed to import image(s) into cluster '%s': %+v", cluster.Name, err)
errOccured = true
errOccurred = true
}
}
if errOccured {
if errOccurred {
l.Log().Warnln("At least one error occured while trying to import the image(s) into the selected cluster(s)")
os.Exit(1)
}
@@ -86,24 +98,38 @@ So if a file './rancher/k3d-tools' exists, k3d will try to import it instead of

cmd.Flags().BoolVarP(&loadImageOpts.KeepTar, "keep-tarball", "k", false, "Do not delete the tarball containing the saved images from the shared volume")
cmd.Flags().BoolVarP(&loadImageOpts.KeepToolsNode, "keep-tools", "t", false, "Do not delete the tools node after import")

cmd.Flags().StringP("load-mode", "m", string(k3d.AutoDetect), "Which method to use to load images to the cluster [auto, direct, tools-node].")
/* Subcommands */

// done
return cmd
}

// parseLoadImageCmd parses the command input into variables required to create a cluster
func parseLoadImageCmd(cmd *cobra.Command, args []string) ([]string, []k3d.Cluster) {
func parseLoadImageCmd(cmd *cobra.Command, args []string) ([]string, []*k3d.Cluster) {

// --cluster
clusterNames, err := cmd.Flags().GetStringArray("cluster")
if err != nil {
l.Log().Fatalln(err)
}
clusters := []k3d.Cluster{}
clusters := []*k3d.Cluster{}
for _, clusterName := range clusterNames {
clusters = append(clusters, k3d.Cluster{Name: clusterName})
clusters = append(clusters, &k3d.Cluster{Name: clusterName})
}

// TODO is this actually necessary? looks like it never worked as intended at first glance.
// Figure out the nodes for each cluster
nodeList, err := client.NodeList(cmd.Context(), runtimes.SelectedRuntime)
if err != nil {
l.Log().Fatalf("Failed to list clusters %v", err)
}
for _, node := range nodeList {
for _, cluster := range clusters {
if cluster.Name == node.RuntimeLabels[k3d.LabelClusterName] {
cluster.Nodes = append(cluster.Nodes, node)
}
}
}

// images
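
The command resolves the `--load-mode` flag string through `k3d.LoadModes` into a mode constant before importing. The types file defining that lookup is not part of this diff; the sketch below shows what it could look like, where the type name `ImportMode` is an assumption and only the flag values ("auto", "direct", "tools-node") are taken from the flag definition above.

```go
// Hypothetical sketch (not part of this diff): the mode type and lookup table
// that imageImport.go resolves the --load-mode flag against. The type name
// ImportMode is an assumption; the flag values come from the diff above.
package types

// ImportMode selects how images are transferred into the cluster nodes.
type ImportMode string

const (
	AutoDetect ImportMode = "auto"       // choose direct or tools-node based on the runtime endpoint
	Direct     ImportMode = "direct"     // stream images straight into the node containers
	ToolsNode  ImportMode = "tools-node" // stage images via a k3d-tools container
)

// LoadModes maps the CLI flag value to its mode constant.
var LoadModes = map[string]ImportMode{
	string(AutoDetect): AutoDetect,
	string(Direct):     Direct,
	string(ToolsNode):  ToolsNode,
}
```

Keying the map by the flag string keeps validation in one place: an unknown value simply misses the map and hits the `Fatalf` branch in the command.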
20 changes: 20 additions & 0 deletions docs/usage/commands/k3d_image_import.md
@@ -29,6 +29,7 @@ k3d image import [IMAGE | ARCHIVE [IMAGE | ARCHIVE...]] [flags]
-h, --help help for import
-k, --keep-tarball Do not delete the tarball containing the saved images from the shared volume
-t, --keep-tools Do not delete the tools node after import
-m, --load-mode Which method to use to load images to the cluster [auto, direct, tools-node]. (default "auto")
```

### Options inherited from parent commands
@@ -39,6 +40,25 @@ k3d image import [IMAGE | ARCHIVE [IMAGE | ARCHIVE...]] [flags]
--verbose Enable verbose output (debug logging)
```


### Loading modes

#### Auto

Auto-determine whether to use `direct` or `tools-node`.

For remote container runtimes, `tools-node` is faster because it incurs less network overhead, so it is selected automatically for remote runtimes.

Otherwise, `direct` is used.

#### Direct

Directly load the given images to the k3s nodes. No separate container is spawned, no intermediate files are written.

#### Tools Node

Start a `k3d-tools` container in the container runtime, copy images to that runtime, then load the images to k3s nodes from there.

### SEE ALSO

* [k3d image](k3d_image.md) - Handle container images.
141 changes: 135 additions & 6 deletions pkg/client/tools.go
@@ -25,6 +25,7 @@ package client
import (
"context"
"fmt"
"io"
"os"
"path"
"strings"
@@ -39,20 +40,42 @@ import (
// ImageImportIntoClusterMulti starts up a k3d tools container for the selected cluster and uses it to export
// images from the runtime to import them into the nodes of the selected cluster
func ImageImportIntoClusterMulti(ctx context.Context, runtime runtimes.Runtime, images []string, cluster *k3d.Cluster, opts k3d.ImageImportOpts) error {

// stdin case
if len(images) == 1 && images[0] == "-" {
loadImageFromStream(ctx, runtime, os.Stdin, cluster)
return nil
}

imagesFromRuntime, imagesFromTar, err := findImages(ctx, runtime, images)
if err != nil {
return fmt.Errorf("failed to find images: %w", err)
}

// no images found to load -> exit early
if len(imagesFromRuntime)+len(imagesFromTar) == 0 {
return fmt.Errorf("No valid images specified")
return fmt.Errorf("no valid images specified")
}

// create tools node to export images
toolsNode, err := EnsureToolsNode(ctx, runtime, cluster)
if err != nil {
return fmt.Errorf("failed to ensure that tools node is running: %w", err)
loadWithToolsNode := false

switch opts.LoadingMode {
case k3d.AutoDetect:
if err != nil {
return fmt.Errorf("failed to retrieve container runtime information: %w", err)
}
if err != nil {
return fmt.Errorf("failed to compile remote runtime endpoint regexp: %w", err)
}
runtimeHost := runtime.GetHost()
if runtimeHost != "" && runtimeHost != "localhost" && runtimeHost != "127.0.0.1" {
l.Log().Infof("Auto-detected a remote docker daemon, using tools node for loading images")
loadWithToolsNode = true
}
case k3d.ToolsNode:
loadWithToolsNode = true
case k3d.Direct:
loadWithToolsNode = false
}

/* TODO:
@@ -63,6 +86,26 @@ func ImageImportIntoClusterMulti(ctx context.Context, runtime runtimes.Runtime,
* 3. From stdin: save to tar -> import
* Note: temporary storage location is always the shared image volume and actions are always executed by the tools node
*/
if loadWithToolsNode {
err = importWithToolsNode(ctx, runtime, err, cluster, imagesFromRuntime, imagesFromTar, opts)
} else {
err = importWithStream(ctx, runtime, imagesFromRuntime, cluster, imagesFromTar)
}
if err != nil {
return err
}

l.Log().Infoln("Successfully imported image(s)")
return nil
}

func importWithToolsNode(ctx context.Context, runtime runtimes.Runtime, err error, cluster *k3d.Cluster, imagesFromRuntime []string, imagesFromTar []string, opts k3d.ImageImportOpts) error {
// create tools node to export images
toolsNode, err := EnsureToolsNode(ctx, runtime, cluster)
if err != nil {
return fmt.Errorf("failed to ensure that tools node is running: %w", err)
}

var importTarNames []string

if len(imagesFromRuntime) > 0 {
@@ -123,11 +166,97 @@ func ImageImportIntoClusterMulti(ctx context.Context, runtime runtimes.Runtime,
l.Log().Errorf("failed to delete tools node '%s' (try to delete it manually): %v", toolsNode.Name, err)
}
}
return nil
}

l.Log().Infoln("Successfully imported image(s)")
func importWithStream(ctx context.Context, runtime runtimes.Runtime, imagesFromRuntime []string, cluster *k3d.Cluster, imagesFromTar []string) error {
if len(imagesFromRuntime) > 0 {
l.Log().Infof("Loading %d image(s) from runtime into nodes...", len(imagesFromRuntime))
// open a stream to all given images
stream, err := runtime.GetImageStream(ctx, imagesFromRuntime)
loadImageFromStream(ctx, runtime, stream, cluster)
if err != nil {
return fmt.Errorf("Could not open image stream for given images %s: %w", imagesFromRuntime, err)
}
// load the images directly into the nodes

}

if len(imagesFromTar) > 0 {
// copy tarfiles to shared volume
l.Log().Infof("Saving %d tarball(s) to shared image volume...", len(imagesFromTar))

files := make([]*os.File, len(imagesFromTar))
readers := make([]io.Reader, len(imagesFromTar))
failedFiles := 0
for i, fileName := range imagesFromTar {
file, err := os.Open(fileName)
if err != nil {
l.Log().Errorf("failed to read file '%s', skipping. Error below:\n%+v", fileName, err)
failedFiles++
continue
}
files[i] = file
readers[i] = file
}
multiReader := io.MultiReader(readers...)
loadImageFromStream(ctx, runtime, io.NopCloser(multiReader), cluster)
for _, file := range files {
err := file.Close()
if err != nil {
l.Log().Errorf("Failed to close file '%s' after reading. Error below:\n%+v", file.Name(), err)
}
}
}
return nil
}

func loadImageFromStream(ctx context.Context, runtime runtimes.Runtime, stream io.ReadCloser, cluster *k3d.Cluster) {
var importWaitgroup sync.WaitGroup

numNodes := 0
for _, node := range cluster.Nodes {
// only import image in server and agent nodes (i.e. ignoring auxiliary nodes like the server loadbalancer)
if node.Role == k3d.ServerRole || node.Role == k3d.AgentRole {
numNodes++
}
}
// multiplex the stream so we can write to multiple nodes
pipeReaders := make([]*io.PipeReader, numNodes)
pipeWriters := make([]io.Writer, numNodes)
for i := 0; i < numNodes; i++ {
reader, writer := io.Pipe()
pipeReaders[i] = reader
pipeWriters[i] = writer
}

go func() {
_, err := io.Copy(io.MultiWriter(pipeWriters...), stream)
if err != nil {
l.Log().Errorf("Failed to copy read stream. %v", err)
}
err = stream.Close()
if err != nil {
l.Log().Errorf("Failed to close stream. %v", err)
}
}()

pipeId := 0
for _, node := range cluster.Nodes {
// only import image in server and agent nodes (i.e. ignoring auxiliary nodes like the server loadbalancer)
if node.Role == k3d.ServerRole || node.Role == k3d.AgentRole {
importWaitgroup.Add(1)
go func(node *k3d.Node, wg *sync.WaitGroup, stream io.ReadCloser) {
l.Log().Infof("Importing images into node '%s'...", node.Name)
if err := runtime.ExecInNodeWithStdin(ctx, node, []string{"ctr", "image", "import", "-"}, stream); err != nil {
l.Log().Errorf("failed to import images in node '%s': %v", node.Name, err)
}
wg.Done()
}(node, &importWaitgroup, pipeReaders[pipeId])
pipeId++
}
}
importWaitgroup.Wait()
}

type runtimeImageGetter interface {
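
`loadImageFromStream` above fans a single tar stream out to every server and agent node via `io.Pipe` and `io.MultiWriter`. The standalone sketch below isolates that fan-out pattern; the `fanOut` helper, its consumer callback, and the string source are illustrative placeholders, not k3d APIs.

```go
// Standalone illustration of the io.Pipe / io.MultiWriter fan-out used by
// loadImageFromStream; fanOut, its callback, and the source are placeholders.
package main

import (
	"io"
	"log"
	"strings"
	"sync"
)

// fanOut duplicates src to n concurrent consumers.
func fanOut(src io.ReadCloser, n int, consume func(id int, r io.Reader)) {
	readers := make([]*io.PipeReader, n)
	writers := make([]io.Writer, n)
	for i := 0; i < n; i++ {
		readers[i], writers[i] = io.Pipe()
	}

	// Copy the source into all pipes; closing the writers signals EOF (or the
	// copy error) to every reader.
	go func() {
		_, err := io.Copy(io.MultiWriter(writers...), src)
		for _, w := range writers {
			w.(*io.PipeWriter).CloseWithError(err)
		}
		src.Close()
	}()

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(id int, r io.Reader) {
			defer wg.Done()
			consume(id, r)
		}(i, readers[i])
	}
	wg.Wait()
}

func main() {
	src := io.NopCloser(strings.NewReader("pretend this is an image tarball"))
	fanOut(src, 3, func(id int, r io.Reader) {
		b, _ := io.ReadAll(r)
		log.Printf("consumer %d received %d bytes", id, len(b))
	})
}
```

Because each `PipeWriter` blocks until its reader drains it, the copy advances at the pace of the slowest consumer; the per-node `ctr image import` calls above are throttled the same way.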
49 changes: 44 additions & 5 deletions pkg/runtimes/docker/node.go
@@ -309,7 +309,7 @@ func (d Docker) GetNodeLogs(ctx context.Context, node *k3d.Node, since time.Time

// ExecInNodeGetLogs executes a command inside a node and returns the logs to the caller, e.g. to parse them
func (d Docker) ExecInNodeGetLogs(ctx context.Context, node *k3d.Node, cmd []string) (*bufio.Reader, error) {
resp, err := executeInNode(ctx, node, cmd)
resp, err := executeInNode(ctx, node, cmd, nil)
if resp != nil {
defer resp.Close()
}
@@ -322,9 +322,23 @@ func (d Docker) ExecInNodeGetLogs(ctx context.Context, node *k3d.Node, cmd []str
return resp.Reader, nil
}

// GetImageStream creates a tar stream for the given images, to be read (and closed) by the caller
func (d Docker) GetImageStream(ctx context.Context, image []string) (io.ReadCloser, error) {
docker, err := GetDockerClient()
if err != nil {
return nil, err
}
reader, err := docker.ImageSave(ctx, image)
return reader, err
}

// ExecInNode execs a command inside a node
func (d Docker) ExecInNode(ctx context.Context, node *k3d.Node, cmd []string) error {
execConnection, err := executeInNode(ctx, node, cmd)
return execInNode(ctx, node, cmd, nil)
}

func execInNode(ctx context.Context, node *k3d.Node, cmd []string, stdin io.ReadCloser) error {
execConnection, err := executeInNode(ctx, node, cmd, stdin)
if execConnection != nil {
defer execConnection.Close()
}
@@ -340,7 +354,11 @@ func (d Docker) ExecInNode(ctx context.Context, node *k3d.Node, cmd []string) er
return err
}

func executeInNode(ctx context.Context, node *k3d.Node, cmd []string) (*types.HijackedResponse, error) {
func (d Docker) ExecInNodeWithStdin(ctx context.Context, node *k3d.Node, cmd []string, stdin io.ReadCloser) error {
return execInNode(ctx, node, cmd, stdin)
}

func executeInNode(ctx context.Context, node *k3d.Node, cmd []string, stdin io.ReadCloser) (*types.HijackedResponse, error) {

l.Log().Debugf("Executing command '%+v' in node '%s'", cmd, node.Name)

@@ -357,12 +375,19 @@ func executeInNode(ctx context.Context, node *k3d.Node, cmd []string) (*types.Hi
}
defer docker.Close()

attachStdin := false
if stdin != nil {
attachStdin = true
}

// exec
exec, err := docker.ContainerExecCreate(ctx, container.ID, types.ExecConfig{
Privileged: true,
Tty: true,
Privileged: true,
// Don't use tty true when piping stdin.
Tty: !attachStdin,
AttachStderr: true,
AttachStdout: true,
AttachStdin: attachStdin,
Cmd: cmd,
})
if err != nil {
@@ -380,6 +405,20 @@ func executeInNode(ctx context.Context, node *k3d.Node, cmd []string) (*types.Hi
return nil, fmt.Errorf("docker failed to start exec process in node '%s': %w", node.Name, err)
}

// If we need to write to stdin pipe, start a new goroutine that writes the stream to stdin
if stdin != nil {
go func() {
_, err := io.Copy(execConnection.Conn, stdin)
if err != nil {
l.Log().Errorf("Failed to copy read stream. %v", err)
}
err = stdin.Close()
if err != nil {
l.Log().Errorf("Failed to close stdin stream. %v", err)
}
}()
}

for {
// get info about exec process inside container
execInfo, err := docker.ContainerExecInspect(ctx, exec.ID)
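
The stdin support added to `executeInNode` amounts to creating the exec with `AttachStdin` and the TTY disabled, attaching, and copying the stream into the hijacked connection. Below is a condensed, self-contained sketch of that Docker SDK pattern; the container ID, command, and stream are placeholders, and calling `CloseWrite` to signal end-of-input is one option rather than the exact sequence used in this PR.

```go
// Condensed sketch of the Docker SDK exec-with-stdin pattern implemented in
// executeInNode above; containerID, cmd, and stdin are placeholders.
package example

import (
	"context"
	"io"

	"github.com/docker/docker/api/types"
	"github.com/docker/docker/client"
)

func execWithStdin(ctx context.Context, containerID string, cmd []string, stdin io.ReadCloser) error {
	cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
	if err != nil {
		return err
	}
	defer cli.Close()

	// Keep Tty false when piping binary data on stdin (see the diff's own
	// comment above); a TTY would mangle the stream.
	exec, err := cli.ContainerExecCreate(ctx, containerID, types.ExecConfig{
		Cmd:          cmd,
		AttachStdin:  true,
		AttachStdout: true,
		AttachStderr: true,
		Tty:          false,
	})
	if err != nil {
		return err
	}

	attach, err := cli.ContainerExecAttach(ctx, exec.ID, types.ExecStartCheck{Tty: false})
	if err != nil {
		return err
	}
	defer attach.Close()

	// Pipe the stream into the exec process, then signal end-of-input so the
	// command (e.g. `ctr image import -`) can terminate.
	_, copyErr := io.Copy(attach.Conn, stdin)
	_ = attach.CloseWrite()
	_ = stdin.Close()
	return copyErr
}
```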
2 changes: 2 additions & 0 deletions pkg/runtimes/runtime.go
@@ -65,8 +65,10 @@ type Runtime interface {
CreateVolume(context.Context, string, map[string]string) error
DeleteVolume(context.Context, string) error
GetVolume(string) (string, error)
GetImageStream(context.Context, []string) (io.ReadCloser, error)
GetRuntimePath() string // returns e.g. '/var/run/docker.sock' for a default docker setup
ExecInNode(context.Context, *k3d.Node, []string) error
ExecInNodeWithStdin(context.Context, *k3d.Node, []string, io.ReadCloser) error
ExecInNodeGetLogs(context.Context, *k3d.Node, []string) (*bufio.Reader, error)
GetNodeLogs(context.Context, *k3d.Node, time.Time, *runtimeTypes.NodeLogsOpts) (io.ReadCloser, error)
GetImages(context.Context) ([]string, error)
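
Taken together, the two new `Runtime` methods let a caller stream images from the local runtime into a node without writing intermediate tarballs. A hedged sketch of such a caller follows; the module import paths and the `streamImageDirect` helper are assumptions for illustration, not code from this PR.

```go
// Hedged sketch of a caller combining the new Runtime methods; the module
// import paths and this helper are assumptions for illustration only.
package example

import (
	"context"
	"fmt"

	"github.com/rancher/k3d/v5/pkg/runtimes"
	k3d "github.com/rancher/k3d/v5/pkg/types"
)

// streamImageDirect opens a tar stream for the requested images and feeds it
// to containerd inside a single node, with no intermediate files on disk.
func streamImageDirect(ctx context.Context, rt runtimes.Runtime, node *k3d.Node, images []string) error {
	stream, err := rt.GetImageStream(ctx, images) // e.g. the output of `docker save`
	if err != nil {
		return fmt.Errorf("failed to open image stream for %v: %w", images, err)
	}
	// `ctr image import -` reads the image tarball from stdin inside the node.
	return rt.ExecInNodeWithStdin(ctx, node, []string{"ctr", "image", "import", "-"}, stream)
}
```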