Skip to content

Commit

Permalink
Merge pull request #258 from 0x5d/rpk-fix-container
Browse files Browse the repository at this point in the history
Redpanda runs under the 'redpanda' user in the containers.
When rpk container start was run, it mounted the configuration
dir as a volume, messing up the permissions of the volume's
target path (/etc/redpanda)inside the container, which made
rpk fail when it tried to write the config to /etc/redpanda/redpanda.yaml.

Use the --seeds, --kafka-addr, --rpc-addr, --advertise-kafka-addr,
--advertise-rpc-addr & --node-id flags to remove the need for a config file.
  • Loading branch information
0x5d authored Dec 10, 2020
2 parents f04c685 + ca5c6b6 commit d3411f2
Show file tree
Hide file tree
Showing 14 changed files with 505 additions and 468 deletions.
8 changes: 6 additions & 2 deletions src/go/rpk/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,15 @@

version=$1
rev=${2:-$(git rev-parse --short HEAD)}
img_tag=${version:-latest}

out_dir="$(go env GOOS)-$(go env GOARCH)"

mkdir -p ${out_dir}

pkg='vectorized/pkg/cli/cmd/version'
ver_pkg='vectorized/pkg/cli/cmd/version'
cont_pkg='vectorized/pkg/cli/cmd/container/common'

go build -ldflags "-X ${pkg}.version=${version} -X ${pkg}.rev=${rev}" -o ${out_dir} ./...
go build \
-ldflags "-X ${ver_pkg}.version=${version} -X ${ver_pkg}.rev=${rev} -X ${cont_pkg}.tag=${img_tag}" \
-o ${out_dir} ./...
12 changes: 4 additions & 8 deletions src/go/rpk/pkg/cli/cmd/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,26 +232,22 @@ func createAdmin(
}

func containerBrokers(fs afero.Fs) []string {
nodeIDs, err := common.GetExistingNodes(fs)
c, err := common.NewDockerClient()
if err != nil {
log.Debug(err)
return []string{}
}
c, err := common.NewDockerClient()
nodes, err := common.GetExistingNodes(c)
if err != nil {
log.Debug(err)
return []string{}
}
grp := errgroup.Group{}
mu := sync.Mutex{}
addrs := []string{}
for _, nodeID := range nodeIDs {
id := nodeID
for _, node := range nodes {
s := node
grp.Go(func() error {
s, err := common.GetState(c, id)
if err != nil {
return err
}
mu.Lock()
defer mu.Unlock()
addrs = append(
Expand Down
10 changes: 4 additions & 6 deletions src/go/rpk/pkg/cli/cmd/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,19 @@ package cmd

import (
"vectorized/pkg/cli/cmd/container"
"vectorized/pkg/config"

"github.com/spf13/afero"
"github.com/spf13/cobra"
)

func NewContainerCommand(fs afero.Fs, mgr config.Manager) *cobra.Command {
func NewContainerCommand() *cobra.Command {
command := &cobra.Command{
Use: "container",
Short: "Manage a local container cluster",
}

command.AddCommand(container.Start(fs, mgr))
command.AddCommand(container.Stop(fs))
command.AddCommand(container.Purge(fs))
command.AddCommand(container.Start())
command.AddCommand(container.Stop())
command.AddCommand(container.Purge())

return command
}
5 changes: 5 additions & 0 deletions src/go/rpk/pkg/cli/cmd/container/common/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ type Client interface {
timeout *time.Duration,
) error

ContainerList(
ctx context.Context,
options types.ContainerListOptions,
) ([]types.Container, error)

ContainerInspect(
ctx context.Context,
containerID string,
Expand Down
177 changes: 58 additions & 119 deletions src/go/rpk/pkg/cli/cmd/container/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@ import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
"regexp"
"strconv"
"strings"
"time"
Expand All @@ -25,19 +22,21 @@ import (
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/api/types/mount"
"github.com/docker/docker/api/types/network"
"github.com/docker/docker/client"
"github.com/docker/go-connections/nat"
log "github.com/sirupsen/logrus"
"github.com/spf13/afero"
)

const (
redpandaNetwork = "redpanda"
var (
registry = "docker.io"
redpandaImageBase = "vectorized/redpanda"
tag = "latest"
redpandaImageBase = "vectorized/redpanda:" + tag
redpandaImage = registry + "/" + redpandaImageBase
)

const (
redpandaNetwork = "redpanda"

defaultDockerClientTimeout = 10 * time.Second
)
Expand All @@ -62,53 +61,41 @@ func DefaultCtx() (context.Context, context.CancelFunc) {
return context.WithTimeout(context.Background(), defaultDockerClientTimeout)
}

func ConfPath(nodeID uint) string {
return filepath.Join(ConfDir(nodeID), "redpanda.yaml")
}

func DataDir(nodeID uint) string {
return filepath.Join(NodeDir(nodeID), "data")
}

func ConfDir(nodeID uint) string {
return filepath.Join(NodeDir(nodeID), "conf")
}

func NodeDir(nodeID uint) string {
return filepath.Join(ClusterDir(), fmt.Sprintf("node-%d", nodeID))
}

func ClusterDir() string {
home, _ := os.UserHomeDir()
return filepath.Join(home, ".rpk", "cluster")
}

func GetExistingNodes(fs afero.Fs) ([]uint, error) {
nodeIDs := []uint{}
nodeDirs, err := afero.ReadDir(fs, ClusterDir())
func GetExistingNodes(c Client) ([]*NodeState, error) {
regExp := `^/rp-node-[\d]+`
filters := filters.NewArgs()
filters.Add("name", regExp)
ctx, _ := DefaultCtx()
containers, err := c.ContainerList(
ctx,
types.ContainerListOptions{
All: true,
Filters: filters,
},
)
if err != nil {
if os.IsNotExist(err) {
return nodeIDs, nil
}
return nodeIDs, err
return nil, err
}
nameRegExp := regexp.MustCompile(`node-[\d]+`)
for _, nodeDir := range nodeDirs {
if nodeDir.IsDir() && nameRegExp.Match([]byte(nodeDir.Name())) {
nameParts := strings.Split(nodeDir.Name(), "-")
nodeIDStr := nameParts[len(nameParts)-1]
nodeID, err := strconv.ParseUint(nodeIDStr, 10, 64)
if err != nil {
return nodeIDs, fmt.Errorf(
"Couldn't parse node ID '%s': %v",
nodeIDStr,
err,
)
}
nodeIDs = append(nodeIDs, uint(nodeID))
if len(containers) == 0 {
return []*NodeState{}, nil
}

nodes := make([]*NodeState, len(containers))
for i, cont := range containers {
nodeIDStr := cont.Labels["node-id"]
nodeID, err := strconv.ParseUint(nodeIDStr, 10, 64)
if err != nil {
return nil, fmt.Errorf(
"Couldn't parse node ID: '%s'",
nodeIDStr,
)
}
nodes[i], err = GetState(c, uint(nodeID))
if err != nil {
return nil, err
}
}
return nodeIDs, nil
return nodes, nil
}

func GetState(c Client, nodeID uint) (*NodeState, error) {
Expand Down Expand Up @@ -140,7 +127,6 @@ func GetState(c Client, nodeID uint) (*NodeState, error) {
return &NodeState{
Running: containerJSON.State.Running,
Status: containerJSON.State.Status,
ConfigFile: ConfPath(nodeID),
ContainerID: containerJSON.ID,
ContainerIP: ipAddress,
HostKafkaPort: hostKafkaPort,
Expand Down Expand Up @@ -208,8 +194,8 @@ func RemoveNetwork(c Client) error {
}

func CreateNode(
fs afero.Fs, c Client, nodeID,
kafkaPort, rpcPort uint, netID string,
c Client, nodeID,
kafkaPort, rpcPort uint, netID string, args ...string,
) (*NodeState, error) {
rPort, err := nat.NewPort(
"tcp",
Expand All @@ -225,10 +211,28 @@ func CreateNode(
if err != nil {
return nil, err
}
ip, err := nodeIP(c, netID, nodeID)
if err != nil {
return nil, err
}
hostname := Name(nodeID)
cmd := []string{
"start",
"--node-id",
fmt.Sprintf("%d", nodeID),
"--kafka-addr",
fmt.Sprintf("%s:%d", ip, config.Default().Redpanda.KafkaApi.Port),
"--rpc-addr",
fmt.Sprintf("%s:%d", ip, config.Default().Redpanda.RPCServer.Port),
"--advertise-kafka-addr",
fmt.Sprintf("127.0.0.1:%d", kafkaPort),
"--advertise-rpc-addr",
fmt.Sprintf("%s:%d", ip, config.Default().Redpanda.RPCServer.Port),
}
containerConfig := container.Config{
Image: redpandaImageBase,
Hostname: hostname,
Cmd: append(cmd, args...),
ExposedPorts: nat.PortSet{
rPort: {},
kPort: {},
Expand All @@ -239,18 +243,6 @@ func CreateNode(
},
}
hostConfig := container.HostConfig{
Mounts: []mount.Mount{
mount.Mount{
Type: mount.TypeBind,
Target: "/opt/redpanda/data",
Source: DataDir(nodeID),
},
mount.Mount{
Type: mount.TypeBind,
Target: "/etc/redpanda",
Source: ConfDir(nodeID),
},
},
PortBindings: nat.PortMap{
rPort: []nat.PortBinding{nat.PortBinding{
HostPort: fmt.Sprint(rpcPort),
Expand All @@ -260,10 +252,6 @@ func CreateNode(
}},
},
}
ip, err := nodeIP(c, netID, nodeID)
if err != nil {
return nil, err
}
networkConfig := network.NetworkingConfig{
EndpointsConfig: map[string]*network.EndpointSettings{
redpandaNetwork: &network.EndpointSettings{
Expand All @@ -275,12 +263,6 @@ func CreateNode(
},
}

err = createNodeDirs(fs, nodeID)
if err != nil {
RemoveNodeDir(fs, nodeID)
return nil, err
}

ctx, _ := DefaultCtx()
container, err := c.ContainerCreate(
ctx,
Expand All @@ -290,11 +272,9 @@ func CreateNode(
hostname,
)
if err != nil {
RemoveNodeDir(fs, nodeID)
return nil, err
}
return &NodeState{
ConfigFile: ConfPath(nodeID),
HostKafkaPort: kafkaPort,
ID: nodeID,
ContainerID: container.ID,
Expand Down Expand Up @@ -380,47 +360,6 @@ func nodeIP(c Client, netID string, id uint) (string, error) {
return strings.Join(octets, "."), nil
}

func createNodeDirs(fs afero.Fs, nodeID uint) error {
dir := DataDir(nodeID)
// If it doesn't exist already, create a directory for the node's data.
exists, err := afero.DirExists(fs, dir)
if err != nil {
return err
}
if !exists {
err = fs.MkdirAll(dir, 0755)
if err != nil {
return err
}
}

dir = ConfDir(nodeID)
// If it doesn't exist already, create a directory for the node's config.
exists, err = afero.DirExists(fs, dir)
if err != nil {
return err
}
if !exists {
err = fs.MkdirAll(dir, 0755)
if err != nil {
return err
}
}
return nil
}

func RemoveNodeDir(fs afero.Fs, nodeID uint) error {
err := fs.RemoveAll(NodeDir(nodeID))
if err != nil {
log.Debugf(
"Got an error while removing node %d's dir: %v",
nodeID,
err,
)
}
return err
}

func WrapIfConnErr(err error) error {
if client.IsErrConnectionFailed(err) {
msg := `Couldn't connect to docker.
Expand Down
Loading

0 comments on commit d3411f2

Please sign in to comment.