From 6c3ae557cbc9b31c6f332fb5417e0c7632e02a0b Mon Sep 17 00:00:00 2001 From: David Castillo Date: Wed, 9 Dec 2020 17:08:34 -0500 Subject: [PATCH 1/6] rpk/start: Add --node-id, --kafka-addr & --rpc-addr flags --- src/go/rpk/pkg/cli/cmd/start.go | 50 +++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/src/go/rpk/pkg/cli/cmd/start.go b/src/go/rpk/pkg/cli/cmd/start.go index d9cf1d34345b..68614b79c771 100644 --- a/src/go/rpk/pkg/cli/cmd/start.go +++ b/src/go/rpk/pkg/cli/cmd/start.go @@ -80,7 +80,10 @@ func NewStartCommand(fs afero.Fs, mgr config.Manager) *cobra.Command { prestartCfg := prestartConfig{} var ( configFile string + nodeID uint seeds []string + kafkaAddr string + rpcAddr string advertisedKafka string advertisedRPC string installDirFlag string @@ -118,6 +121,33 @@ func NewStartCommand(fs afero.Fs, mgr config.Manager) *cobra.Command { if len(seedServers) != 0 { conf.Redpanda.SeedServers = seedServers } + + kafkaAddr = stringOr( + kafkaAddr, + os.Getenv("REDPANDA_KAFKA_ADDRESS"), + ) + kafkaApi, err := parseAddress(kafkaAddr) + if err != nil { + sendEnv(mgr, env, conf, err) + return err + } + if kafkaApi != nil { + conf.Redpanda.KafkaApi = *kafkaApi + } + + rpcAddr = stringOr( + rpcAddr, + os.Getenv("REDPANDA_RPC_ADDRESS"), + ) + rpcServer, err := parseAddress(rpcAddr) + if err != nil { + sendEnv(mgr, env, conf, err) + return err + } + if rpcServer != nil { + conf.Redpanda.RPCServer = *rpcServer + } + advertisedKafka = stringOr( advertisedKafka, os.Getenv("REDPANDA_ADVERTISE_KAFKA_ADDRESS"), @@ -193,6 +223,14 @@ func NewStartCommand(fs afero.Fs, mgr config.Manager) *cobra.Command { " in the default locations", ) mgr.BindFlag("config_file", command.Flags().Lookup("config")) + command.Flags().UintVar( + &nodeID, + "node-id", + 0, + "The node ID. Must be an integer and must be unique"+ + " within a cluster", + ) + mgr.BindFlag("redpanda.node_id", command.Flags().Lookup("node-id")) command.Flags().StringSliceVarP( &seeds, "seeds", @@ -201,6 +239,18 @@ func NewStartCommand(fs afero.Fs, mgr config.Manager) *cobra.Command { "A list of seed nodes to connect to, in the format "+ seedFormat, ) + command.Flags().StringVar( + &kafkaAddr, + "kafka-addr", + "", + "The Kafka address to bind to (:)", + ) + command.Flags().StringVar( + &rpcAddr, + "rpc-addr", + "", + "The RPC address to bind to (:)", + ) command.Flags().StringVar( &advertisedKafka, "advertise-kafka-addr", From 5eed257a47cbc06828b3f53d35613fe3eb00b4e8 Mon Sep 17 00:00:00 2001 From: David Castillo Date: Wed, 9 Dec 2020 18:29:47 -0500 Subject: [PATCH 2/6] rpk/container/client: Add ContainerList --- src/go/rpk/pkg/cli/cmd/container/common/client.go | 5 +++++ src/go/rpk/pkg/cli/cmd/container/common/test.go | 14 ++++++++++++++ 2 files changed, 19 insertions(+) diff --git a/src/go/rpk/pkg/cli/cmd/container/common/client.go b/src/go/rpk/pkg/cli/cmd/container/common/client.go index 899a55bd4993..965d09775148 100644 --- a/src/go/rpk/pkg/cli/cmd/container/common/client.go +++ b/src/go/rpk/pkg/cli/cmd/container/common/client.go @@ -56,6 +56,11 @@ type Client interface { timeout *time.Duration, ) error + ContainerList( + ctx context.Context, + options types.ContainerListOptions, + ) ([]types.Container, error) + ContainerInspect( ctx context.Context, containerID string, diff --git a/src/go/rpk/pkg/cli/cmd/container/common/test.go b/src/go/rpk/pkg/cli/cmd/container/common/test.go index 529c6823c05b..d691b71175d8 100644 --- a/src/go/rpk/pkg/cli/cmd/container/common/test.go +++ b/src/go/rpk/pkg/cli/cmd/container/common/test.go @@ -56,6 +56,11 @@ type MockClient struct { timeout *time.Duration, ) error + MockContainerList func( + ctx context.Context, + options types.ContainerListOptions, + ) ([]types.Container, error) + MockContainerInspect func( ctx context.Context, containerID string, @@ -158,6 +163,15 @@ func (c *MockClient) ContainerStop( return nil } +func (c *MockClient) ContainerList( + ctx context.Context, options types.ContainerListOptions, +) ([]types.Container, error) { + if c.MockContainerList != nil { + return c.MockContainerList(ctx, options) + } + return []types.Container{}, nil +} + func (c *MockClient) ContainerInspect( ctx context.Context, containerID string, ) (types.ContainerJSON, error) { From 408ab33284f5c8359d47924fe545b27b7960c528 Mon Sep 17 00:00:00 2001 From: David Castillo Date: Wed, 9 Dec 2020 18:33:47 -0500 Subject: [PATCH 3/6] rpk/container/common: Make GetExistingNodes return []*NodeState --- src/go/rpk/pkg/cli/cmd/api.go | 12 +- .../pkg/cli/cmd/container/common/common.go | 54 ++++--- src/go/rpk/pkg/cli/cmd/container/purge.go | 8 +- .../rpk/pkg/cli/cmd/container/purge_test.go | 146 +++++++++++++++++- src/go/rpk/pkg/cli/cmd/container/start.go | 23 +-- .../rpk/pkg/cli/cmd/container/start_test.go | 54 +++---- src/go/rpk/pkg/cli/cmd/container/stop.go | 28 ++-- src/go/rpk/pkg/cli/cmd/container/stop_test.go | 44 +++++- 8 files changed, 269 insertions(+), 100 deletions(-) diff --git a/src/go/rpk/pkg/cli/cmd/api.go b/src/go/rpk/pkg/cli/cmd/api.go index 161ab1f6491a..69c2f441a712 100644 --- a/src/go/rpk/pkg/cli/cmd/api.go +++ b/src/go/rpk/pkg/cli/cmd/api.go @@ -232,12 +232,12 @@ func createAdmin( } func containerBrokers(fs afero.Fs) []string { - nodeIDs, err := common.GetExistingNodes(fs) + c, err := common.NewDockerClient() if err != nil { log.Debug(err) return []string{} } - c, err := common.NewDockerClient() + nodes, err := common.GetExistingNodes(c) if err != nil { log.Debug(err) return []string{} @@ -245,13 +245,9 @@ func containerBrokers(fs afero.Fs) []string { grp := errgroup.Group{} mu := sync.Mutex{} addrs := []string{} - for _, nodeID := range nodeIDs { - id := nodeID + for _, node := range nodes { + s := node grp.Go(func() error { - s, err := common.GetState(c, id) - if err != nil { - return err - } mu.Lock() defer mu.Unlock() addrs = append( diff --git a/src/go/rpk/pkg/cli/cmd/container/common/common.go b/src/go/rpk/pkg/cli/cmd/container/common/common.go index 4c3a64c8f5f7..e6e2b0e413b0 100644 --- a/src/go/rpk/pkg/cli/cmd/container/common/common.go +++ b/src/go/rpk/pkg/cli/cmd/container/common/common.go @@ -16,7 +16,6 @@ import ( "fmt" "os" "path/filepath" - "regexp" "strconv" "strings" "time" @@ -83,32 +82,41 @@ func ClusterDir() string { return filepath.Join(home, ".rpk", "cluster") } -func GetExistingNodes(fs afero.Fs) ([]uint, error) { - nodeIDs := []uint{} - nodeDirs, err := afero.ReadDir(fs, ClusterDir()) +func GetExistingNodes(c Client) ([]*NodeState, error) { + regExp := `^/rp-node-[\d]+` + filters := filters.NewArgs() + filters.Add("name", regExp) + ctx, _ := DefaultCtx() + containers, err := c.ContainerList( + ctx, + types.ContainerListOptions{ + All: true, + Filters: filters, + }, + ) if err != nil { - if os.IsNotExist(err) { - return nodeIDs, nil + return nil, err + } + if len(containers) == 0 { + return []*NodeState{}, nil + } + + nodes := make([]*NodeState, len(containers)) + for i, cont := range containers { + nodeIDStr := cont.Labels["node-id"] + nodeID, err := strconv.ParseUint(nodeIDStr, 10, 64) + if err != nil { + return nil, fmt.Errorf( + "Couldn't parse node ID: '%s'", + nodeIDStr, + ) } - return nodeIDs, err - } - nameRegExp := regexp.MustCompile(`node-[\d]+`) - for _, nodeDir := range nodeDirs { - if nodeDir.IsDir() && nameRegExp.Match([]byte(nodeDir.Name())) { - nameParts := strings.Split(nodeDir.Name(), "-") - nodeIDStr := nameParts[len(nameParts)-1] - nodeID, err := strconv.ParseUint(nodeIDStr, 10, 64) - if err != nil { - return nodeIDs, fmt.Errorf( - "Couldn't parse node ID '%s': %v", - nodeIDStr, - err, - ) - } - nodeIDs = append(nodeIDs, uint(nodeID)) + nodes[i], err = GetState(c, uint(nodeID)) + if err != nil { + return nil, err } } - return nodeIDs, nil + return nodes, nil } func GetState(c Client, nodeID uint) (*NodeState, error) { diff --git a/src/go/rpk/pkg/cli/cmd/container/purge.go b/src/go/rpk/pkg/cli/cmd/container/purge.go index e7366e432212..c12dcc6e1b4c 100644 --- a/src/go/rpk/pkg/cli/cmd/container/purge.go +++ b/src/go/rpk/pkg/cli/cmd/container/purge.go @@ -39,11 +39,11 @@ func Purge(fs afero.Fs) *cobra.Command { } func purgeCluster(fs afero.Fs, c common.Client) error { - nodeIDs, err := common.GetExistingNodes(fs) + nodes, err := common.GetExistingNodes(c) if err != nil { return err } - if len(nodeIDs) == 0 { + if len(nodes) == 0 { log.Info( `No nodes to remove. You may start a new local cluster with 'rpk container start'`, @@ -55,8 +55,8 @@ You may start a new local cluster with 'rpk container start'`, return err } grp, _ := errgroup.WithContext(context.Background()) - for _, nodeID := range nodeIDs { - id := nodeID + for _, node := range nodes { + id := node.ID grp.Go(func() error { err := common.RemoveNodeDir(fs, id) if err != nil { diff --git a/src/go/rpk/pkg/cli/cmd/container/purge_test.go b/src/go/rpk/pkg/cli/cmd/container/purge_test.go index 32a61c56dd50..08f624a8635d 100644 --- a/src/go/rpk/pkg/cli/cmd/container/purge_test.go +++ b/src/go/rpk/pkg/cli/cmd/container/purge_test.go @@ -36,6 +36,32 @@ func TestPurge(t *testing.T) { name: "it should log if the containers can't be stopped", client: func() (common.Client, error) { return &common.MockClient{ + MockContainerInspect: common.MockContainerInspect, + MockContainerList: func( + _ context.Context, + _ types.ContainerListOptions, + ) ([]types.Container, error) { + return []types.Container{ + { + ID: "a", + Labels: map[string]string{ + "node-id": "0", + }, + }, + { + ID: "b", + Labels: map[string]string{ + "node-id": "1", + }, + }, + { + ID: "c", + Labels: map[string]string{ + "node-id": "2", + }, + }, + }, nil + }, MockContainerStop: func( _ context.Context, _ string, @@ -66,7 +92,34 @@ func TestPurge(t *testing.T) { { name: "it should stop the current cluster", client: func() (common.Client, error) { - return &common.MockClient{}, nil + return &common.MockClient{ + MockContainerInspect: common.MockContainerInspect, + MockContainerList: func( + _ context.Context, + _ types.ContainerListOptions, + ) ([]types.Container, error) { + return []types.Container{ + { + ID: "a", + Labels: map[string]string{ + "node-id": "0", + }, + }, + { + ID: "b", + Labels: map[string]string{ + "node-id": "1", + }, + }, + { + ID: "c", + Labels: map[string]string{ + "node-id": "2", + }, + }, + }, nil + }, + }, nil }, before: func(fs afero.Fs) error { err := fs.MkdirAll(common.ConfDir(0), 0755) @@ -93,6 +146,20 @@ func TestPurge(t *testing.T) { name: "it should fail if it fails to remove a container", client: func() (common.Client, error) { return &common.MockClient{ + MockContainerInspect: common.MockContainerInspect, + MockContainerList: func( + _ context.Context, + _ types.ContainerListOptions, + ) ([]types.Container, error) { + return []types.Container{ + { + ID: "a", + Labels: map[string]string{ + "node-id": "0", + }, + }, + }, nil + }, MockContainerRemove: func( context.Context, string, @@ -111,6 +178,20 @@ func TestPurge(t *testing.T) { name: "it should fail if it fails to delete the network", client: func() (common.Client, error) { return &common.MockClient{ + MockContainerInspect: common.MockContainerInspect, + MockContainerList: func( + _ context.Context, + _ types.ContainerListOptions, + ) ([]types.Container, error) { + return []types.Container{ + { + ID: "a", + Labels: map[string]string{ + "node-id": "0", + }, + }, + }, nil + }, MockNetworkRemove: func( context.Context, string, @@ -128,6 +209,20 @@ func TestPurge(t *testing.T) { name: "it should succeed if the network has been removed", client: func() (common.Client, error) { return &common.MockClient{ + MockContainerInspect: common.MockContainerInspect, + MockContainerList: func( + _ context.Context, + _ types.ContainerListOptions, + ) ([]types.Container, error) { + return []types.Container{ + { + ID: "a", + Labels: map[string]string{ + "node-id": "0", + }, + }, + }, nil + }, MockNetworkRemove: func( context.Context, string, @@ -144,6 +239,55 @@ func TestPurge(t *testing.T) { }, expectedOutput: []string{"Deleted cluster data."}, }, + { + name: "it should fail if it fails to list the containers", + client: func() (common.Client, error) { + return &common.MockClient{ + MockContainerInspect: common.MockContainerInspect, + MockContainerList: func( + _ context.Context, + _ types.ContainerListOptions, + ) ([]types.Container, error) { + return nil, errors.New("Can't list") + }, + }, nil + }, + before: func(fs afero.Fs) error { + return fs.MkdirAll(common.ConfDir(0), 0755) + }, + expectedErrMsg: "Can't list", + }, + { + name: "it should fail if it fails to inspect a container", + client: func() (common.Client, error) { + return &common.MockClient{ + MockContainerInspect: func( + _ context.Context, + _ string, + ) (types.ContainerJSON, error) { + return types.ContainerJSON{}, + errors.New("Can't inspect") + }, + MockContainerList: func( + _ context.Context, + _ types.ContainerListOptions, + ) ([]types.Container, error) { + return []types.Container{ + { + ID: "a", + Labels: map[string]string{ + "node-id": "0", + }, + }, + }, nil + }, + }, nil + }, + before: func(fs afero.Fs) error { + return fs.MkdirAll(common.ConfDir(0), 0755) + }, + expectedErrMsg: "Can't inspect", + }, } for _, tt := range tests { t.Run(tt.name, func(st *testing.T) { diff --git a/src/go/rpk/pkg/cli/cmd/container/start.go b/src/go/rpk/pkg/cli/cmd/container/start.go index 2258dfd46895..c39babd76fb8 100644 --- a/src/go/rpk/pkg/cli/cmd/container/start.go +++ b/src/go/rpk/pkg/cli/cmd/container/start.go @@ -249,33 +249,20 @@ func startCluster( func restartCluster(fs afero.Fs, c common.Client) ([]node, error) { // Check if a cluster is running - nodeIDs, err := common.GetExistingNodes(fs) + states, err := common.GetExistingNodes(c) if err != nil { return nil, err } // If there isn't an existing cluster, there's nothing to restart. - if len(nodeIDs) == 0 { + if len(states) == 0 { return nil, nil } grp, _ := errgroup.WithContext(context.Background()) mu := sync.Mutex{} nodes := []node{} - for _, nodeID := range nodeIDs { - id := nodeID + for _, s := range states { + state := s grp.Go(func() error { - state, err := common.GetState(c, id) - if err != nil { - if c.IsErrNotFound(err) { - msg := "Found data for an existing" + - " cluster, but the container" + - " for node %d was removed.\n" + - "Please run 'rpk container" + - " purge' to delete all" + - " remaining data." - return fmt.Errorf(msg, id) - } - return err - } if !state.Running { ctx, _ := common.DefaultCtx() err = c.ContainerStart( @@ -286,7 +273,7 @@ func restartCluster(fs afero.Fs, c common.Client) ([]node, error) { if err != nil { return err } - state, err = common.GetState(c, id) + state, err = common.GetState(c, state.ID) if err != nil { return err } diff --git a/src/go/rpk/pkg/cli/cmd/container/start_test.go b/src/go/rpk/pkg/cli/cmd/container/start_test.go index 3618b9b16e5c..bec4a6ebb6a3 100644 --- a/src/go/rpk/pkg/cli/cmd/container/start_test.go +++ b/src/go/rpk/pkg/cli/cmd/container/start_test.go @@ -371,37 +371,33 @@ Please check your internet connection and try again.`, { name: "it should do nothing if there's an existing running cluster", nodes: 1, - client: func() (common.Client, error) { - return &common.MockClient{}, nil - }, - before: func(fs afero.Fs) error { - return fs.MkdirAll(common.ConfDir(0), 0755) - }, - check: func(fs afero.Fs, st *testing.T) { - ok, err := common.CheckFiles( - fs, - st, - true, - common.ConfDir(0), - ) - require.NoError(st, err) - require.True(st, ok) - }, - }, - { - name: "it should fail if there's an existing cluster but the containers were removed", - nodes: 1, client: func() (common.Client, error) { return &common.MockClient{ - MockContainerInspect: func( + MockContainerInspect: common.MockContainerInspect, + MockContainerList: func( _ context.Context, - _ string, - ) (types.ContainerJSON, error) { - return types.ContainerJSON{}, - errors.New("Image not found") - }, - MockIsErrNotFound: func(_ error) bool { - return true + _ types.ContainerListOptions, + ) ([]types.Container, error) { + return []types.Container{ + { + ID: "a", + Labels: map[string]string{ + "node-id": "0", + }, + }, + { + ID: "b", + Labels: map[string]string{ + "node-id": "1", + }, + }, + { + ID: "c", + Labels: map[string]string{ + "node-id": "2", + }, + }, + }, nil }, }, nil }, @@ -418,8 +414,6 @@ Please check your internet connection and try again.`, require.NoError(st, err) require.True(st, ok) }, - expectedErrMsg: `Found data for an existing cluster, but the container for node 0 was removed. -Please run 'rpk container purge' to delete all remaining data.`, }, } diff --git a/src/go/rpk/pkg/cli/cmd/container/stop.go b/src/go/rpk/pkg/cli/cmd/container/stop.go index 0ab1dc3aac37..8a4495461dd3 100644 --- a/src/go/rpk/pkg/cli/cmd/container/stop.go +++ b/src/go/rpk/pkg/cli/cmd/container/stop.go @@ -37,11 +37,11 @@ func Stop(fs afero.Fs) *cobra.Command { } func stopCluster(fs afero.Fs, c common.Client) error { - nodeIDs, err := common.GetExistingNodes(fs) + nodes, err := common.GetExistingNodes(c) if err != nil { return err } - if len(nodeIDs) == 0 { + if len(nodes) == 0 { log.Info( `No cluster available. You may start a new cluster with 'rpk container start'`, @@ -49,36 +49,34 @@ You may start a new cluster with 'rpk container start'`, } wg := sync.WaitGroup{} - wg.Add(len(nodeIDs)) - for _, nodeID := range nodeIDs { - go func(id uint) { + wg.Add(len(nodes)) + for _, node := range nodes { + go func(state *common.NodeState) { defer wg.Done() - state, err := common.GetState(c, id) - if err != nil { - log.Errorf("Couldn't get node %d's state", id) - return - } // If the node was stopped already, do nothing. if !state.Running { - log.Infof("Node %d was stopped already.", id) + log.Infof( + "Node %d was stopped already.", + state.ID, + ) return } - log.Infof("Stopping node %d", id) + log.Infof("Stopping node %d", state.ID) ctx := context.Background() // Redpanda sometimes takes a while to stop, so 20 // seconds is a safe estimate timeout := 20 * time.Second err = c.ContainerStop( ctx, - common.Name(id), + common.Name(state.ID), &timeout, ) if err != nil { - log.Errorf("Couldn't stop node %d", id) + log.Errorf("Couldn't stop node %d", state.ID) log.Debug(err) return } - }(nodeID) + }(node) } wg.Wait() return nil diff --git a/src/go/rpk/pkg/cli/cmd/container/stop_test.go b/src/go/rpk/pkg/cli/cmd/container/stop_test.go index 6630ef6cd573..fb52233233ca 100644 --- a/src/go/rpk/pkg/cli/cmd/container/stop_test.go +++ b/src/go/rpk/pkg/cli/cmd/container/stop_test.go @@ -17,6 +17,7 @@ import ( "time" "vectorized/pkg/cli/cmd/container/common" + "github.com/docker/docker/api/types" "github.com/sirupsen/logrus" "github.com/spf13/afero" "github.com/stretchr/testify/require" @@ -35,6 +36,32 @@ func TestStop(t *testing.T) { name: "it should log if the container can't be stopped", client: func() (common.Client, error) { return &common.MockClient{ + MockContainerInspect: common.MockContainerInspect, + MockContainerList: func( + _ context.Context, + _ types.ContainerListOptions, + ) ([]types.Container, error) { + return []types.Container{ + { + ID: "a", + Labels: map[string]string{ + "node-id": "0", + }, + }, + { + ID: "b", + Labels: map[string]string{ + "node-id": "1", + }, + }, + { + ID: "c", + Labels: map[string]string{ + "node-id": "2", + }, + }, + }, nil + }, MockContainerStop: func( _ context.Context, _ string, @@ -65,7 +92,22 @@ func TestStop(t *testing.T) { { name: "it should stop the current cluster", client: func() (common.Client, error) { - return &common.MockClient{}, nil + return &common.MockClient{ + MockContainerInspect: common.MockContainerInspect, + MockContainerList: func( + _ context.Context, + _ types.ContainerListOptions, + ) ([]types.Container, error) { + return []types.Container{ + { + ID: "a", + Labels: map[string]string{ + "node-id": "0", + }, + }, + }, nil + }, + }, nil }, before: func(fs afero.Fs) error { return fs.MkdirAll(common.ConfDir(0), 0755) From 5f8b6f3c9b6ec4ec1e466c24595f8fd8850c01ea Mon Sep 17 00:00:00 2001 From: David Castillo Date: Wed, 9 Dec 2020 19:50:14 -0500 Subject: [PATCH 4/6] rpk/container: Don't rely on volumes Redpanda runs under the 'redpanda' user in the containers. When `rpk container start` was run, it mounted the configuration dir as a volume, messing up the permissions of the volume's target path (/etc/redpanda)inside the container, which made rpk fail when it tried to write the config to /etc/redpanda/redpanda.yaml. Use the --seeds, --kafka-addr, --rpc-addr, --advertise-kafka-addr, --advertise-rpc-addr & --node-id flags to remove the need for a config file. --- src/go/rpk/pkg/cli/cmd/container.go | 10 +- .../pkg/cli/cmd/container/common/common.go | 113 +++----------- .../rpk/pkg/cli/cmd/container/common/test.go | 17 --- src/go/rpk/pkg/cli/cmd/container/purge.go | 17 +-- .../rpk/pkg/cli/cmd/container/purge_test.go | 48 +----- src/go/rpk/pkg/cli/cmd/container/start.go | 77 +++++----- .../rpk/pkg/cli/cmd/container/start_test.go | 141 ++++++------------ src/go/rpk/pkg/cli/cmd/container/stop.go | 7 +- src/go/rpk/pkg/cli/cmd/container/stop_test.go | 60 ++++++-- src/go/rpk/pkg/cli/cmd/root.go | 2 +- 10 files changed, 160 insertions(+), 332 deletions(-) diff --git a/src/go/rpk/pkg/cli/cmd/container.go b/src/go/rpk/pkg/cli/cmd/container.go index 553042fa51b3..620e48b90db3 100644 --- a/src/go/rpk/pkg/cli/cmd/container.go +++ b/src/go/rpk/pkg/cli/cmd/container.go @@ -11,21 +11,19 @@ package cmd import ( "vectorized/pkg/cli/cmd/container" - "vectorized/pkg/config" - "github.com/spf13/afero" "github.com/spf13/cobra" ) -func NewContainerCommand(fs afero.Fs, mgr config.Manager) *cobra.Command { +func NewContainerCommand() *cobra.Command { command := &cobra.Command{ Use: "container", Short: "Manage a local container cluster", } - command.AddCommand(container.Start(fs, mgr)) - command.AddCommand(container.Stop(fs)) - command.AddCommand(container.Purge(fs)) + command.AddCommand(container.Start()) + command.AddCommand(container.Stop()) + command.AddCommand(container.Purge()) return command } diff --git a/src/go/rpk/pkg/cli/cmd/container/common/common.go b/src/go/rpk/pkg/cli/cmd/container/common/common.go index e6e2b0e413b0..8e5a27d8e1c2 100644 --- a/src/go/rpk/pkg/cli/cmd/container/common/common.go +++ b/src/go/rpk/pkg/cli/cmd/container/common/common.go @@ -14,8 +14,6 @@ import ( "context" "errors" "fmt" - "os" - "path/filepath" "strconv" "strings" "time" @@ -24,12 +22,10 @@ import ( "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/filters" - "github.com/docker/docker/api/types/mount" "github.com/docker/docker/api/types/network" "github.com/docker/docker/client" "github.com/docker/go-connections/nat" log "github.com/sirupsen/logrus" - "github.com/spf13/afero" ) const ( @@ -61,27 +57,6 @@ func DefaultCtx() (context.Context, context.CancelFunc) { return context.WithTimeout(context.Background(), defaultDockerClientTimeout) } -func ConfPath(nodeID uint) string { - return filepath.Join(ConfDir(nodeID), "redpanda.yaml") -} - -func DataDir(nodeID uint) string { - return filepath.Join(NodeDir(nodeID), "data") -} - -func ConfDir(nodeID uint) string { - return filepath.Join(NodeDir(nodeID), "conf") -} - -func NodeDir(nodeID uint) string { - return filepath.Join(ClusterDir(), fmt.Sprintf("node-%d", nodeID)) -} - -func ClusterDir() string { - home, _ := os.UserHomeDir() - return filepath.Join(home, ".rpk", "cluster") -} - func GetExistingNodes(c Client) ([]*NodeState, error) { regExp := `^/rp-node-[\d]+` filters := filters.NewArgs() @@ -148,7 +123,6 @@ func GetState(c Client, nodeID uint) (*NodeState, error) { return &NodeState{ Running: containerJSON.State.Running, Status: containerJSON.State.Status, - ConfigFile: ConfPath(nodeID), ContainerID: containerJSON.ID, ContainerIP: ipAddress, HostKafkaPort: hostKafkaPort, @@ -216,8 +190,8 @@ func RemoveNetwork(c Client) error { } func CreateNode( - fs afero.Fs, c Client, nodeID, - kafkaPort, rpcPort uint, netID string, + c Client, nodeID, + kafkaPort, rpcPort uint, netID string, args ...string, ) (*NodeState, error) { rPort, err := nat.NewPort( "tcp", @@ -233,10 +207,28 @@ func CreateNode( if err != nil { return nil, err } + ip, err := nodeIP(c, netID, nodeID) + if err != nil { + return nil, err + } hostname := Name(nodeID) + cmd := []string{ + "start", + "--node-id", + fmt.Sprintf("%d", nodeID), + "--kafka-addr", + fmt.Sprintf("%s:%d", ip, config.Default().Redpanda.KafkaApi.Port), + "--rpc-addr", + fmt.Sprintf("%s:%d", ip, config.Default().Redpanda.RPCServer.Port), + "--advertise-kafka-addr", + fmt.Sprintf("127.0.0.1:%d", kafkaPort), + "--advertise-rpc-addr", + fmt.Sprintf("%s:%d", ip, config.Default().Redpanda.RPCServer.Port), + } containerConfig := container.Config{ Image: redpandaImageBase, Hostname: hostname, + Cmd: append(cmd, args...), ExposedPorts: nat.PortSet{ rPort: {}, kPort: {}, @@ -247,18 +239,6 @@ func CreateNode( }, } hostConfig := container.HostConfig{ - Mounts: []mount.Mount{ - mount.Mount{ - Type: mount.TypeBind, - Target: "/opt/redpanda/data", - Source: DataDir(nodeID), - }, - mount.Mount{ - Type: mount.TypeBind, - Target: "/etc/redpanda", - Source: ConfDir(nodeID), - }, - }, PortBindings: nat.PortMap{ rPort: []nat.PortBinding{nat.PortBinding{ HostPort: fmt.Sprint(rpcPort), @@ -268,10 +248,6 @@ func CreateNode( }}, }, } - ip, err := nodeIP(c, netID, nodeID) - if err != nil { - return nil, err - } networkConfig := network.NetworkingConfig{ EndpointsConfig: map[string]*network.EndpointSettings{ redpandaNetwork: &network.EndpointSettings{ @@ -283,12 +259,6 @@ func CreateNode( }, } - err = createNodeDirs(fs, nodeID) - if err != nil { - RemoveNodeDir(fs, nodeID) - return nil, err - } - ctx, _ := DefaultCtx() container, err := c.ContainerCreate( ctx, @@ -298,11 +268,9 @@ func CreateNode( hostname, ) if err != nil { - RemoveNodeDir(fs, nodeID) return nil, err } return &NodeState{ - ConfigFile: ConfPath(nodeID), HostKafkaPort: kafkaPort, ID: nodeID, ContainerID: container.ID, @@ -388,47 +356,6 @@ func nodeIP(c Client, netID string, id uint) (string, error) { return strings.Join(octets, "."), nil } -func createNodeDirs(fs afero.Fs, nodeID uint) error { - dir := DataDir(nodeID) - // If it doesn't exist already, create a directory for the node's data. - exists, err := afero.DirExists(fs, dir) - if err != nil { - return err - } - if !exists { - err = fs.MkdirAll(dir, 0755) - if err != nil { - return err - } - } - - dir = ConfDir(nodeID) - // If it doesn't exist already, create a directory for the node's config. - exists, err = afero.DirExists(fs, dir) - if err != nil { - return err - } - if !exists { - err = fs.MkdirAll(dir, 0755) - if err != nil { - return err - } - } - return nil -} - -func RemoveNodeDir(fs afero.Fs, nodeID uint) error { - err := fs.RemoveAll(NodeDir(nodeID)) - if err != nil { - log.Debugf( - "Got an error while removing node %d's dir: %v", - nodeID, - err, - ) - } - return err -} - func WrapIfConnErr(err error) error { if client.IsErrConnectionFailed(err) { msg := `Couldn't connect to docker. diff --git a/src/go/rpk/pkg/cli/cmd/container/common/test.go b/src/go/rpk/pkg/cli/cmd/container/common/test.go index d691b71175d8..b67e4bfa77b0 100644 --- a/src/go/rpk/pkg/cli/cmd/container/common/test.go +++ b/src/go/rpk/pkg/cli/cmd/container/common/test.go @@ -12,14 +12,12 @@ package common import ( "context" "io" - "testing" "time" "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/network" "github.com/docker/go-connections/nat" - "github.com/spf13/afero" ) type MockClient struct { @@ -238,21 +236,6 @@ func (c *MockClient) IsErrConnectionFailed(err error) bool { return false } -func CheckFiles( - fs afero.Fs, st *testing.T, shouldExist bool, paths ...string, -) (bool, error) { - for _, p := range paths { - exists, err := afero.Exists(fs, p) - if err != nil { - return false, err - } - if exists != shouldExist { - return false, nil - } - } - return true, nil -} - func MockContainerInspect( _ context.Context, _ string, ) (types.ContainerJSON, error) { diff --git a/src/go/rpk/pkg/cli/cmd/container/purge.go b/src/go/rpk/pkg/cli/cmd/container/purge.go index c12dcc6e1b4c..eaf90ed46ff9 100644 --- a/src/go/rpk/pkg/cli/cmd/container/purge.go +++ b/src/go/rpk/pkg/cli/cmd/container/purge.go @@ -11,17 +11,15 @@ package container import ( "context" - "os" "vectorized/pkg/cli/cmd/container/common" "github.com/docker/docker/api/types" log "github.com/sirupsen/logrus" - "github.com/spf13/afero" "github.com/spf13/cobra" "golang.org/x/sync/errgroup" ) -func Purge(fs afero.Fs) *cobra.Command { +func Purge() *cobra.Command { command := &cobra.Command{ Use: "purge", Short: "Stop and remove an existing local container cluster's data", @@ -31,14 +29,14 @@ func Purge(fs afero.Fs) *cobra.Command { return err } defer c.Close() - return common.WrapIfConnErr(purgeCluster(fs, c)) + return common.WrapIfConnErr(purgeCluster(c)) }, } return command } -func purgeCluster(fs afero.Fs, c common.Client) error { +func purgeCluster(c common.Client) error { nodes, err := common.GetExistingNodes(c) if err != nil { return err @@ -50,7 +48,7 @@ You may start a new local cluster with 'rpk container start'`, ) return nil } - err = stopCluster(fs, c) + err = stopCluster(c) if err != nil { return err } @@ -58,13 +56,6 @@ You may start a new local cluster with 'rpk container start'`, for _, node := range nodes { id := node.ID grp.Go(func() error { - err := common.RemoveNodeDir(fs, id) - if err != nil { - if !os.IsNotExist(err) { - return err - } - } - log.Infof("Deleted data for node %d", id) ctx, _ := common.DefaultCtx() name := common.Name(id) err = c.ContainerRemove( diff --git a/src/go/rpk/pkg/cli/cmd/container/purge_test.go b/src/go/rpk/pkg/cli/cmd/container/purge_test.go index 08f624a8635d..463ba48f360c 100644 --- a/src/go/rpk/pkg/cli/cmd/container/purge_test.go +++ b/src/go/rpk/pkg/cli/cmd/container/purge_test.go @@ -19,7 +19,6 @@ import ( "github.com/docker/docker/api/types" "github.com/sirupsen/logrus" - "github.com/spf13/afero" "github.com/stretchr/testify/require" ) @@ -29,8 +28,6 @@ func TestPurge(t *testing.T) { client func() (common.Client, error) expectedErrMsg string expectedOutput []string - before func(afero.Fs) error - check func(afero.Fs, *testing.T) }{ { name: "it should log if the containers can't be stopped", @@ -71,10 +68,11 @@ func TestPurge(t *testing.T) { }, }, nil }, - before: func(fs afero.Fs) error { - return fs.MkdirAll(common.ConfDir(0), 0755) - }, expectedOutput: []string{ + "Stopping node 2", + "Couldn't stop node 2", + "Stopping node 1", + "Couldn't stop node 1", "Stopping node 0", "Couldn't stop node 0", "Don't stop me now", @@ -121,24 +119,10 @@ func TestPurge(t *testing.T) { }, }, nil }, - before: func(fs afero.Fs) error { - err := fs.MkdirAll(common.ConfDir(0), 0755) - if err != nil { - return err - } - err = fs.MkdirAll(common.ConfDir(1), 0755) - if err != nil { - return err - } - return fs.MkdirAll(common.ConfDir(2), 0755) - }, expectedOutput: []string{ "Stopping node 0", - "Deleted data for node 0", "Removed container 'rp-node-0'", - "Deleted data for node 1", "Removed container 'rp-node-1'", - "Deleted data for node 2", "Removed container 'rp-node-2'", }, }, @@ -169,9 +153,6 @@ func TestPurge(t *testing.T) { }, }, nil }, - before: func(fs afero.Fs) error { - return fs.MkdirAll(common.ConfDir(0), 0755) - }, expectedErrMsg: "Not going anywhere!", }, { @@ -200,9 +181,6 @@ func TestPurge(t *testing.T) { }, }, nil }, - before: func(fs afero.Fs) error { - return fs.MkdirAll(common.ConfDir(0), 0755) - }, expectedErrMsg: "Can't delete network", }, { @@ -234,9 +212,6 @@ func TestPurge(t *testing.T) { }, }, nil }, - before: func(fs afero.Fs) error { - return fs.MkdirAll(common.ConfDir(0), 0755) - }, expectedOutput: []string{"Deleted cluster data."}, }, { @@ -252,9 +227,6 @@ func TestPurge(t *testing.T) { }, }, nil }, - before: func(fs afero.Fs) error { - return fs.MkdirAll(common.ConfDir(0), 0755) - }, expectedErrMsg: "Can't list", }, { @@ -283,24 +255,17 @@ func TestPurge(t *testing.T) { }, }, nil }, - before: func(fs afero.Fs) error { - return fs.MkdirAll(common.ConfDir(0), 0755) - }, expectedErrMsg: "Can't inspect", }, } for _, tt := range tests { t.Run(tt.name, func(st *testing.T) { var out bytes.Buffer - fs := afero.NewMemMapFs() - if tt.before != nil { - require.NoError(st, tt.before(fs)) - } c, err := tt.client() require.NoError(st, err) logrus.SetOutput(&out) logrus.SetLevel(logrus.DebugLevel) - err = purgeCluster(fs, c) + err = purgeCluster(c) if tt.expectedErrMsg != "" { require.EqualError(st, err, tt.expectedErrMsg) return @@ -316,9 +281,6 @@ func TestPurge(t *testing.T) { ) } } - if tt.check != nil { - tt.check(fs, st) - } }) } } diff --git a/src/go/rpk/pkg/cli/cmd/container/start.go b/src/go/rpk/pkg/cli/cmd/container/start.go index c39babd76fb8..7825adc7edba 100644 --- a/src/go/rpk/pkg/cli/cmd/container/start.go +++ b/src/go/rpk/pkg/cli/cmd/container/start.go @@ -23,7 +23,6 @@ import ( "github.com/docker/docker/api/types" log "github.com/sirupsen/logrus" - "github.com/spf13/afero" "github.com/spf13/cobra" "golang.org/x/sync/errgroup" ) @@ -33,7 +32,7 @@ type node struct { addr string } -func Start(fs afero.Fs, mgr config.Manager) *cobra.Command { +func Start() *cobra.Command { var ( nodes uint ) @@ -53,8 +52,6 @@ func Start(fs afero.Fs, mgr config.Manager) *cobra.Command { defer c.Close() return common.WrapIfConnErr(startCluster( - fs, - mgr, c, nodes, )) @@ -72,11 +69,9 @@ func Start(fs afero.Fs, mgr config.Manager) *cobra.Command { return command } -func startCluster( - fs afero.Fs, mgr config.Manager, c common.Client, n uint, -) error { +func startCluster(c common.Client, n uint) error { // Check if cluster exists and start it again. - restarted, err := restartCluster(fs, c) + restarted, err := restartCluster(c) if err != nil { return err } @@ -116,19 +111,6 @@ func startCluster( } } - dir := common.ClusterDir() - // If it doesn't exist already, create a directory for the cluster. - exists, err := afero.DirExists(fs, dir) - if err != nil { - return err - } - if !exists { - err = fs.MkdirAll(dir, 0755) - if err != nil { - return err - } - } - // Create the docker network if it doesn't exist already netID, err := common.CreateNetwork(c) if err != nil { @@ -146,7 +128,6 @@ func startCluster( return err } seedState, err := common.CreateNode( - fs, c, seedID, seedKafkaPort, @@ -160,13 +141,12 @@ func startCluster( coreCount := int(math.Max(1, float64(runtime.NumCPU())/float64(n))) log.Info("Starting cluster") - seedIP, seedKafkaPort, err := startNode( - mgr, + err = startNode( c, seedID, seedKafkaPort, seedRPCPort, - 0, + seedID, seedState.ContainerID, seedState.ContainerIP, "", @@ -176,7 +156,10 @@ func startCluster( return err } - seedNode := node{0, fmt.Sprintf("%s:%d", seedIP, seedKafkaPort)} + seedNode := node{ + seedID, + fmt.Sprintf("%s:%d", seedState.ContainerIP, seedKafkaPort), + } nodes := []node{seedNode} @@ -195,7 +178,23 @@ func startCluster( if err != nil { return err } - state, err := common.CreateNode(fs, c, id, kafkaPort, rpcPort, netID) + args := []string{ + "--seeds", + fmt.Sprintf( + "%s:%d+%d", + seedState.ContainerIP, + config.Default().Redpanda.RPCServer.Port, + seedID, + ), + } + state, err := common.CreateNode( + c, + id, + kafkaPort, + rpcPort, + netID, + args..., + ) if err != nil { return err } @@ -205,8 +204,7 @@ func startCluster( state.ContainerIP, state.ContainerID, ) - ip, port, err := startNode( - mgr, + err = startNode( c, id, kafkaPort, @@ -225,8 +223,8 @@ func startCluster( id, fmt.Sprintf( "%s:%d", - ip, - port, + state.ContainerIP, + state.HostKafkaPort, ), }) mu.Unlock() @@ -247,7 +245,7 @@ func startCluster( return nil } -func restartCluster(fs afero.Fs, c common.Client) ([]node, error) { +func restartCluster(c common.Client) ([]node, error) { // Check if a cluster is running states, err := common.GetExistingNodes(c) if err != nil { @@ -296,20 +294,14 @@ func restartCluster(fs afero.Fs, c common.Client) ([]node, error) { } func startNode( - mgr config.Manager, c common.Client, nodeID, kafkaPort, rpcPort, seedRPCPort uint, containerID, ip, seedIP string, cores int, -) (string, uint, error) { - conf, err := writeNodeConfig(mgr, nodeID, kafkaPort, rpcPort, seedRPCPort, ip, seedIP, common.ConfPath(nodeID), cores) - if err != nil { - return "", 0, err - } +) error { ctx, _ := common.DefaultCtx() - err = c.ContainerStart(ctx, containerID, types.ContainerStartOptions{}) - advert := conf.Redpanda.AdvertisedKafkaApi - return advert.Address, uint(advert.Port), err + err := c.ContainerStart(ctx, containerID, types.ContainerStartOptions{}) + return err } func writeNodeConfig( @@ -353,12 +345,11 @@ func renderClusterInfo(nodes []node) { t := ui.NewRpkTable(log.StandardLogger().Out) t.SetColWidth(80) t.SetAutoWrapText(true) - t.SetHeader([]string{"Node ID", "Address", "Config"}) + t.SetHeader([]string{"Node ID", "Address"}) for _, node := range nodes { t.Append([]string{ fmt.Sprint(node.id), node.addr, - common.ConfPath(node.id), }) } diff --git a/src/go/rpk/pkg/cli/cmd/container/start_test.go b/src/go/rpk/pkg/cli/cmd/container/start_test.go index bec4a6ebb6a3..eca61bfb77e1 100644 --- a/src/go/rpk/pkg/cli/cmd/container/start_test.go +++ b/src/go/rpk/pkg/cli/cmd/container/start_test.go @@ -16,13 +16,11 @@ import ( "io" "testing" "vectorized/pkg/cli/cmd/container/common" - "vectorized/pkg/config" "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/network" "github.com/sirupsen/logrus" - "github.com/spf13/afero" "github.com/stretchr/testify/require" ) @@ -33,8 +31,6 @@ func TestStart(t *testing.T) { nodes uint expectedErrMsg string expectedOutput string - before func(afero.Fs) error - check func(afero.Fs, *testing.T) }{ { name: "it should fail if the img can't be pulled and imgs can't be listed", @@ -57,11 +53,6 @@ func TestStart(t *testing.T) { }, expectedErrMsg: "Couldn't pull image and a local one" + " wasn't found either.", - check: func(fs afero.Fs, st *testing.T) { - ok, err := common.CheckFiles(fs, st, false, common.ClusterDir()) - require.NoError(st, err) - require.True(st, ok) - }, }, { name: "it should fail if the img couldn't be pulled bc of internet conn issues", @@ -87,11 +78,6 @@ func TestStart(t *testing.T) { }, expectedErrMsg: `Couldn't pull image and a local one wasn't found either. Please check your internet connection and try again.`, - check: func(fs afero.Fs, st *testing.T) { - ok, err := common.CheckFiles(fs, st, false, common.ClusterDir()) - require.NoError(st, err) - require.True(st, ok) - }, }, { name: "it should fail if the img can't be pulled and it isn't avail. locally", @@ -114,11 +100,6 @@ Please check your internet connection and try again.`, }, expectedErrMsg: "Couldn't pull image and a local one" + " wasn't found either.", - check: func(fs afero.Fs, st *testing.T) { - ok, err := common.CheckFiles(fs, st, false, common.ClusterDir()) - require.NoError(st, err) - require.True(st, ok) - }, }, { name: "it should fail if creating the network fails", @@ -137,14 +118,6 @@ Please check your internet connection and try again.`, }, nil }, expectedErrMsg: "Network create go boom", - check: func(fs afero.Fs, st *testing.T) { - ok, err := common.CheckFiles(fs, st, true, common.ClusterDir()) - require.NoError(st, err) - require.True(st, ok) - ok, err = common.CheckFiles(fs, st, false, common.NodeDir(0)) - require.NoError(st, err) - require.True(st, ok) - }, }, { name: "it should fail if inspecting the network fails", @@ -163,15 +136,6 @@ Please check your internet connection and try again.`, }, nil }, expectedErrMsg: "Can't inspect the network", - check: func(fs afero.Fs, st *testing.T) { - ok, err := common.CheckFiles(fs, st, true, common.ClusterDir()) - require.NoError(st, err) - require.True(st, ok) - ok, err = common.CheckFiles(fs, st, false, common.NodeDir(0)) - require.NoError(st, err) - require.True(st, ok) - - }, }, { name: "it should fail if the network config is corrupted", @@ -194,14 +158,48 @@ Please check your internet connection and try again.`, }, nil }, expectedErrMsg: "'rpnet' network config is corrupted", - check: func(fs afero.Fs, st *testing.T) { - ok, err := common.CheckFiles(fs, st, true, common.ClusterDir()) - require.NoError(st, err) - require.True(st, ok) - ok, err = common.CheckFiles(fs, st, false, common.NodeDir(0)) - require.NoError(st, err) - require.True(st, ok) + }, + { + name: "it should fail if listing the containers fails", + client: func() (common.Client, error) { + return &common.MockClient{ + MockContainerList: func( + _ context.Context, + _ types.ContainerListOptions, + ) ([]types.Container, error) { + return nil, errors.New("Can't list") + }, + }, nil + }, + expectedErrMsg: "Can't list", + }, + { + name: "it should fail if inspecting existing containers fails", + client: func() (common.Client, error) { + return &common.MockClient{ + MockContainerInspect: func( + _ context.Context, + _ string, + ) (types.ContainerJSON, error) { + return types.ContainerJSON{}, + errors.New("Can't inspect") + }, + MockContainerList: func( + _ context.Context, + _ types.ContainerListOptions, + ) ([]types.Container, error) { + return []types.Container{ + { + ID: "a", + Labels: map[string]string{ + "node-id": "0", + }, + }, + }, nil + }, + }, nil }, + expectedErrMsg: "Can't inspect", }, { name: "it should fail if creating the container fails", @@ -245,14 +243,6 @@ Please check your internet connection and try again.`, }, nil }, expectedErrMsg: "Can't create container", - check: func(fs afero.Fs, st *testing.T) { - ok, err := common.CheckFiles(fs, st, true, common.ClusterDir()) - require.NoError(st, err) - require.True(st, ok) - ok, err = common.CheckFiles(fs, st, false, common.NodeDir(0)) - require.NoError(st, err) - require.True(st, ok) - }, }, { name: "it should allow creating a single container", @@ -297,17 +287,6 @@ Please check your internet connection and try again.`, }, nil }, expectedOutput: `Cluster started! You may use 'rpk api' to interact with the cluster. E.g:\n\nrpk api status`, - check: func(fs afero.Fs, st *testing.T) { - ok, err := common.CheckFiles( - fs, - st, - true, - common.DataDir(0), - common.ConfPath(0), - ) - require.NoError(st, err) - require.True(st, ok) - }, }, { name: "it should allow creating multiple containers", @@ -352,21 +331,6 @@ Please check your internet connection and try again.`, }, nil }, expectedOutput: `Cluster started! You may use 'rpk api' to interact with the cluster. E.g:\n\nrpk api status`, - check: func(fs afero.Fs, st *testing.T) { - ok, err := common.CheckFiles( - fs, - st, - true, - common.DataDir(0), - common.ConfPath(0), - common.DataDir(1), - common.ConfPath(1), - common.DataDir(2), - common.ConfPath(2), - ) - require.NoError(st, err) - require.True(st, ok) - }, }, { name: "it should do nothing if there's an existing running cluster", @@ -401,34 +365,16 @@ Please check your internet connection and try again.`, }, }, nil }, - before: func(fs afero.Fs) error { - return fs.MkdirAll(common.ConfDir(0), 0755) - }, - check: func(fs afero.Fs, st *testing.T) { - ok, err := common.CheckFiles( - fs, - st, - true, - common.ConfDir(0), - ) - require.NoError(st, err) - require.True(st, ok) - }, }, } for _, tt := range tests { t.Run(tt.name, func(st *testing.T) { var out bytes.Buffer - fs := afero.NewMemMapFs() - mgr := config.NewManager(fs) - if tt.before != nil { - require.NoError(st, tt.before(fs)) - } c, err := tt.client() require.NoError(st, err) logrus.SetOutput(&out) - err = startCluster(fs, mgr, c, tt.nodes) + err = startCluster(c, tt.nodes) if tt.expectedErrMsg != "" { require.EqualError(st, err, tt.expectedErrMsg) } else { @@ -438,9 +384,6 @@ Please check your internet connection and try again.`, require.Contains(st, out.String(), tt.expectedOutput) } } - if tt.check != nil { - tt.check(fs, st) - } }) } } diff --git a/src/go/rpk/pkg/cli/cmd/container/stop.go b/src/go/rpk/pkg/cli/cmd/container/stop.go index 8a4495461dd3..82629d9dbfbf 100644 --- a/src/go/rpk/pkg/cli/cmd/container/stop.go +++ b/src/go/rpk/pkg/cli/cmd/container/stop.go @@ -16,11 +16,10 @@ import ( "vectorized/pkg/cli/cmd/container/common" log "github.com/sirupsen/logrus" - "github.com/spf13/afero" "github.com/spf13/cobra" ) -func Stop(fs afero.Fs) *cobra.Command { +func Stop() *cobra.Command { command := &cobra.Command{ Use: "stop", Short: "Stop an existing local container cluster", @@ -30,13 +29,13 @@ func Stop(fs afero.Fs) *cobra.Command { return err } defer c.Close() - return common.WrapIfConnErr(stopCluster(fs, c)) + return common.WrapIfConnErr(stopCluster(c)) }, } return command } -func stopCluster(fs afero.Fs, c common.Client) error { +func stopCluster(c common.Client) error { nodes, err := common.GetExistingNodes(c) if err != nil { return err diff --git a/src/go/rpk/pkg/cli/cmd/container/stop_test.go b/src/go/rpk/pkg/cli/cmd/container/stop_test.go index fb52233233ca..144b895186ce 100644 --- a/src/go/rpk/pkg/cli/cmd/container/stop_test.go +++ b/src/go/rpk/pkg/cli/cmd/container/stop_test.go @@ -19,7 +19,6 @@ import ( "github.com/docker/docker/api/types" "github.com/sirupsen/logrus" - "github.com/spf13/afero" "github.com/stretchr/testify/require" ) @@ -30,7 +29,6 @@ func TestStop(t *testing.T) { client func() (common.Client, error) expectedErrMsg string expectedOutput []string - before func(afero.Fs) error }{ { name: "it should log if the container can't be stopped", @@ -71,15 +69,58 @@ func TestStop(t *testing.T) { }, }, nil }, - before: func(fs afero.Fs) error { - return fs.MkdirAll(common.ConfDir(0), 0755) - }, expectedOutput: []string{ + "Stopping node 2", + "Couldn't stop node 2", + "Stopping node 1", + "Couldn't stop node 1", "Stopping node 0", "Couldn't stop node 0", "Don't stop me now", }, }, + { + name: "it should fail if the containers can't be listed", + client: func() (common.Client, error) { + return &common.MockClient{ + MockContainerList: func( + _ context.Context, + _ types.ContainerListOptions, + ) ([]types.Container, error) { + return nil, errors.New("Can't list") + }, + }, nil + }, + expectedErrMsg: "Can't list", + }, + { + name: "it should fail if the containers can't be inspected", + client: func() (common.Client, error) { + return &common.MockClient{ + MockContainerInspect: func( + _ context.Context, + _ string, + ) (types.ContainerJSON, error) { + return types.ContainerJSON{}, + errors.New("Can't inspect") + }, + MockContainerList: func( + _ context.Context, + _ types.ContainerListOptions, + ) ([]types.Container, error) { + return []types.Container{ + { + ID: "a", + Labels: map[string]string{ + "node-id": "0", + }, + }, + }, nil + }, + }, nil + }, + expectedErrMsg: "Can't inspect", + }, { name: "it should do nothing if there's no cluster", client: func() (common.Client, error) { @@ -109,9 +150,6 @@ func TestStop(t *testing.T) { }, }, nil }, - before: func(fs afero.Fs) error { - return fs.MkdirAll(common.ConfDir(0), 0755) - }, expectedOutput: []string{ "Stopping node 0", }, @@ -120,15 +158,11 @@ func TestStop(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(st *testing.T) { var out bytes.Buffer - fs := afero.NewMemMapFs() - if tt.before != nil { - require.NoError(st, tt.before(fs)) - } c, err := tt.client() require.NoError(st, err) logrus.SetOutput(&out) logrus.SetLevel(logrus.DebugLevel) - err = stopCluster(fs, c) + err = stopCluster(c) if tt.expectedErrMsg != "" { require.EqualError(st, err, tt.expectedErrMsg) return diff --git a/src/go/rpk/pkg/cli/cmd/root.go b/src/go/rpk/pkg/cli/cmd/root.go index 0c85bf2482cd..be610268cb05 100644 --- a/src/go/rpk/pkg/cli/cmd/root.go +++ b/src/go/rpk/pkg/cli/cmd/root.go @@ -71,7 +71,7 @@ func Execute() { rootCmd.AddCommand(NewVersionCommand()) rootCmd.AddCommand(NewApiCommand(fs, mgr)) rootCmd.AddCommand(NewWasmCommand(fs)) - rootCmd.AddCommand(NewContainerCommand(fs, mgr)) + rootCmd.AddCommand(NewContainerCommand()) addPlatformDependentCmds(fs, mgr, rootCmd) From 1cdf975101b97bfa6be959a2c699408b6e3818ef Mon Sep 17 00:00:00 2001 From: David Castillo Date: Wed, 9 Dec 2020 19:55:58 -0500 Subject: [PATCH 5/6] rpk/container/common: Remove unused func --- src/go/rpk/pkg/cli/cmd/container/start.go | 37 ----------------------- 1 file changed, 37 deletions(-) diff --git a/src/go/rpk/pkg/cli/cmd/container/start.go b/src/go/rpk/pkg/cli/cmd/container/start.go index 7825adc7edba..74adeff82edc 100644 --- a/src/go/rpk/pkg/cli/cmd/container/start.go +++ b/src/go/rpk/pkg/cli/cmd/container/start.go @@ -304,43 +304,6 @@ func startNode( return err } -func writeNodeConfig( - mgr config.Manager, - nodeID, kafkaPort, rpcPort, seedRPCPort uint, - ip, seedIP, path string, - cores int, -) (*config.Config, error) { - localhost := "127.0.0.1" - conf := config.Default() - conf.Redpanda.Id = int(nodeID) - conf.ConfigFile = path - - conf.Redpanda.KafkaApi.Address = ip - conf.Redpanda.RPCServer.Address = ip - - conf.Redpanda.AdvertisedKafkaApi = &config.SocketAddress{ - Address: localhost, - Port: int(kafkaPort), - } - conf.Redpanda.AdvertisedRPCAPI = &config.SocketAddress{ - Address: ip, - Port: conf.Redpanda.RPCServer.Port, - } - - if seedIP != "" { - conf.Redpanda.SeedServers = []config.SeedServer{{ - Id: 0, - Host: config.SocketAddress{ - Address: seedIP, - Port: conf.Redpanda.RPCServer.Port, - }, - }} - } - config.SetMode(config.ModeDev, conf) - conf.Rpk.SMP = &cores - return conf, mgr.Write(conf) -} - func renderClusterInfo(nodes []node) { t := ui.NewRpkTable(log.StandardLogger().Out) t.SetColWidth(80) From ca5c6b6a6a45c1e95835d85c5d102bb7e4a907a1 Mon Sep 17 00:00:00 2001 From: David Castillo Date: Wed, 9 Dec 2020 21:02:05 -0500 Subject: [PATCH 6/6] rpk/container: Use the image's next version, not latest The 'latest' tag is a moving target. Therefore, allow setting the image version that rpk will pull. --- src/go/rpk/build.sh | 8 ++++++-- src/go/rpk/pkg/cli/cmd/container/common/common.go | 10 +++++++--- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/src/go/rpk/build.sh b/src/go/rpk/build.sh index f1b074fe317f..67db9c0ecf7b 100755 --- a/src/go/rpk/build.sh +++ b/src/go/rpk/build.sh @@ -10,11 +10,15 @@ version=$1 rev=${2:-$(git rev-parse --short HEAD)} +img_tag=${version:-latest} out_dir="$(go env GOOS)-$(go env GOARCH)" mkdir -p ${out_dir} -pkg='vectorized/pkg/cli/cmd/version' +ver_pkg='vectorized/pkg/cli/cmd/version' +cont_pkg='vectorized/pkg/cli/cmd/container/common' -go build -ldflags "-X ${pkg}.version=${version} -X ${pkg}.rev=${rev}" -o ${out_dir} ./... +go build \ + -ldflags "-X ${ver_pkg}.version=${version} -X ${ver_pkg}.rev=${rev} -X ${cont_pkg}.tag=${img_tag}" \ + -o ${out_dir} ./... diff --git a/src/go/rpk/pkg/cli/cmd/container/common/common.go b/src/go/rpk/pkg/cli/cmd/container/common/common.go index 8e5a27d8e1c2..f2d12f8a4075 100644 --- a/src/go/rpk/pkg/cli/cmd/container/common/common.go +++ b/src/go/rpk/pkg/cli/cmd/container/common/common.go @@ -28,11 +28,15 @@ import ( log "github.com/sirupsen/logrus" ) -const ( - redpandaNetwork = "redpanda" +var ( registry = "docker.io" - redpandaImageBase = "vectorized/redpanda" + tag = "latest" + redpandaImageBase = "vectorized/redpanda:" + tag redpandaImage = registry + "/" + redpandaImageBase +) + +const ( + redpandaNetwork = "redpanda" defaultDockerClientTimeout = 10 * time.Second )