Skip to content

Commit

Permalink
feature: support take over old containerd instance when pouchd restart
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Wan <[email protected]>
  • Loading branch information
HusterWan committed May 9, 2018
1 parent 819c4d3 commit 7bf5abc
Show file tree
Hide file tree
Showing 9 changed files with 375 additions and 66 deletions.
206 changes: 182 additions & 24 deletions ctrd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,37 +3,43 @@ package ctrd
import (
"context"
"fmt"
"io"
"os"
"os/exec"
"path"
"strconv"
"strings"
"sync"
"syscall"
"time"

"github.com/alibaba/pouch/pkg/scheduler"
"github.com/alibaba/pouch/pkg/utils"

"github.com/containerd/containerd"
"github.com/sirupsen/logrus"
)

const (
// unixSocketPath is the default containerd address used when the caller
// does not supply one.
unixSocketPath = "/run/containerd/containerd.sock"
// containerdPidFileName is the name of the pid file kept under the
// containerd state directory for a daemon started by this package.
containerdPidFileName = "containerd.pid"
// defaultGrpcClientPoolCapacity is the default number of containerd
// clients created for the pool.
defaultGrpcClientPoolCapacity = 5
// defaultMaxStreamsClient is the default max-streams value handed to each
// containerd client.
defaultMaxStreamsClient = 100
// containerdShutdownTimeout bounds how long Cleanup waits for containerd
// to exit after SIGTERM before it sends SIGKILL.
containerdShutdownTimeout = 15 * time.Second
)

// Config represents the config used to communicate with containerd.
type Config struct {
// Address is the containerd address the clients connect to.
Address string
// GrpcClientPoolCapacity is the capacity of grpc client pool.
GrpcClientPoolCapacity int
// MaxStreamsClient records the max number of concurrent streams
MaxStreamsClient int
}

// Client is the client side the daemon holds to communicate with containerd.
type Client struct {
mu sync.RWMutex
Config
mu sync.RWMutex
watch *watch
lock *containerLock

daemonPid int
homeDir string
rpcAddr string
oomScoreAdjust int
debugLog bool

// containerd grpc pool
pool []scheduler.Factory
scheduler scheduler.Scheduler
Expand All @@ -42,38 +48,50 @@ type Client struct {
}

// NewClient connect to containerd.
func NewClient(cfg Config) (APIClient, error) {
if cfg.Address == "" {
cfg.Address = unixSocketPath
}

if cfg.GrpcClientPoolCapacity <= 0 {
cfg.GrpcClientPoolCapacity = defaultGrpcClientPoolCapacity
func NewClient(homeDir string, opts ...ClientOpt) (APIClient, error) {
// set default value for parameters
copts := clientOpts{
rpcAddr: unixSocketPath,
grpcClientPoolCapacity: defaultGrpcClientPoolCapacity,
maxStreamsClient: defaultMaxStreamsClient,
}

if cfg.MaxStreamsClient <= 0 {
cfg.MaxStreamsClient = defaultMaxStreamsClient
for _, opt := range opts {
if err := opt(&copts); err != nil {
return nil, err
}
}

client := &Client{
Config: cfg,
lock: &containerLock{
ids: make(map[string]struct{}),
},
watch: &watch{
containers: make(map[string]*containerPack),
},
daemonPid: -1,
homeDir: homeDir,
oomScoreAdjust: copts.oomScoreAdjust,
debugLog: copts.debugLog,
rpcAddr: copts.rpcAddr,
}

// start new containerd instance.
if copts.startDaemon {
if err := client.runContainerdDaemon(homeDir, copts); err != nil {
return nil, err
}
}

for i := 0; i < cfg.GrpcClientPoolCapacity; i++ {
cli, err := newWrapperClient(cfg)
for i := 0; i < copts.grpcClientPoolCapacity; i++ {
cli, err := newWrapperClient(copts.rpcAddr, copts.maxStreamsClient)
if err != nil {
return nil, fmt.Errorf("failed to create containerd client: %v", err)
}
client.pool = append(client.pool, cli)
}

logrus.Infof("success to create %d containerd clients, connect to: %s", cfg.GrpcClientPoolCapacity, cfg.Address)
logrus.Infof("success to create %d containerd clients, connect to: %s", copts.grpcClientPoolCapacity, copts.rpcAddr)

scheduler, err := scheduler.NewLRUScheduler(client.pool)
if err != nil {
Expand Down Expand Up @@ -166,3 +184,143 @@ func (c *Client) Version(ctx context.Context) (containerd.Version, error) {

return cli.client.Version(ctx)
}

// runContainerdDaemon makes sure a containerd daemon is available for this
// client and records its pid in c.daemonPid.
//
// The pid file <homeDir>/containerd/state/containerd.pid is consulted first:
// when it names a process that is still alive, that instance is taken over
// and nothing new is started. Otherwise the pid file is emptied, an existing
// socket file at c.rpcAddr is removed, a new containerd is launched through
// newContainerdCmd, its OOM score is adjusted, and its pid is written to the
// pid file.
//
// An error is returned when homeDir is empty, the containerd binary cannot
// be located, the pid file cannot be read, parsed or rewritten, or the new
// daemon cannot be started or configured. A daemon started by this call is
// killed again if a later step fails.
func (c *Client) runContainerdDaemon(homeDir string, copts clientOpts) error {
	if homeDir == "" {
		return fmt.Errorf("ctrd: containerd home dir should not be empty")
	}

	containerdPath, err := exec.LookPath(copts.containerdBinary)
	if err != nil {
		return fmt.Errorf("failed to find containerd binary %s: %v", copts.containerdBinary, err)
	}

	// MkdirAll succeeds when the directory already exists, so no Stat is
	// needed. A directory needs the search (execute) bit to be usable; a
	// mode such as 0666 lacks it.
	stateDir := path.Join(homeDir, "containerd/state")
	if err := os.MkdirAll(stateDir, 0711); err != nil {
		return fmt.Errorf("failed to mkdir %s: %v", stateDir, err)
	}

	pidFileName := path.Join(stateDir, containerdPidFileName)
	f, err := os.OpenFile(pidFileName, os.O_RDWR|os.O_CREATE, 0600)
	if err != nil {
		return err
	}
	defer f.Close()

	buf := make([]byte, 32)
	num, err := f.Read(buf)
	if err != nil && err != io.EOF {
		return err
	}

	// Tolerate surrounding whitespace (e.g. a trailing newline) in the pid
	// file; an empty or blank file simply means there is no previous pid.
	if pidStr := strings.TrimSpace(string(buf[:num])); pidStr != "" {
		pid, err := strconv.ParseUint(pidStr, 10, 64)
		if err != nil {
			return fmt.Errorf("failed to parse containerd pid file %s: %v", pidFileName, err)
		}
		if utils.IsProcessAlive(int(pid)) {
			logrus.Infof("ctrd: previous instance of containerd still alive (%d)", pid)
			c.daemonPid = int(pid)
			return nil
		}
	}

	// The recorded pid (if any) is stale: empty the pid file before reuse.
	if _, err := f.Seek(0, io.SeekStart); err != nil {
		return err
	}
	if err := f.Truncate(0); err != nil {
		return err
	}

	// if socket file exists, delete it.
	if _, err := os.Stat(c.rpcAddr); err == nil {
		if err := os.RemoveAll(c.rpcAddr); err != nil {
			logrus.Warnf("ctrd: failed to remove stale containerd socket %s: %v", c.rpcAddr, err)
		}
	}

	cmd, err := c.newContainerdCmd(containerdPath)
	if err != nil {
		return err
	}

	// abort kills the freshly started daemon and reaps it in the background
	// so that a failed start does not leave a zombie process behind.
	abort := func() {
		utils.KillProcess(cmd.Process.Pid)
		go cmd.Wait()
	}

	if err := utils.SetOOMScore(cmd.Process.Pid, c.oomScoreAdjust); err != nil {
		abort()
		return err
	}

	if _, err := f.WriteString(strconv.Itoa(cmd.Process.Pid)); err != nil {
		abort()
		return err
	}

	// Reap the daemon whenever it eventually exits.
	go cmd.Wait()

	c.daemonPid = cmd.Process.Pid
	return nil
}

// newContainerdCmd starts a new containerd process from containerdPath and
// returns the started command.
//
// containerd is told to listen on c.rpcAddr, to keep its root and state
// directories under c.homeDir, and to log at "debug" or "info" level
// depending on c.debugLog. Its stdout and stderr are connected to those of
// the current process. The child receives the current environment without
// the NOTIFY_SOCKET variable.
//
// An error is returned when the process cannot be started.
func (c *Client) newContainerdCmd(containerdPath string) (*exec.Cmd, error) {
	// Start a new containerd instance
	args := []string{
		"-a", c.rpcAddr,
		"--root", path.Join(c.homeDir, "containerd/root"),
		"--state", path.Join(c.homeDir, "containerd/state"),
		"-l", utils.If(c.debugLog, "debug", "info").(string),
	}

	cmd := exec.Command(containerdPath, args...)
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr
	// NOTE(review): Pdeathsig requests SIGKILL for containerd when its parent
	// dies, which looks at odds with taking over a surviving containerd
	// after a restart — confirm this is intended.
	cmd.SysProcAttr = &syscall.SysProcAttr{Setsid: true, Pdeathsig: syscall.SIGKILL}

	// clear the NOTIFY_SOCKET from the env when starting containerd.
	// Match on "NOTIFY_SOCKET=" so that only that exact variable is dropped,
	// and keep the slice non-nil: a nil Env would make the child inherit the
	// full environment, NOTIFY_SOCKET included.
	environ := os.Environ()
	cmd.Env = make([]string, 0, len(environ))
	for _, e := range environ {
		if !strings.HasPrefix(e, "NOTIFY_SOCKET=") {
			cmd.Env = append(cmd.Env, e)
		}
	}

	if err := cmd.Start(); err != nil {
		return nil, err
	}

	logrus.Infof("ctrd: new containerd process, pid: %d", cmd.Process.Pid)
	return cmd, nil
}

// Cleanup stops the containerd daemon tracked by this client and removes its
// pid file and socket file. It is a no-op when no daemon pid was recorded
// (daemonPid == -1).
//
// The client is closed first; if that fails the error is returned and the
// daemon is left untouched. The daemon is then sent SIGTERM and polled once
// per second for up to containerdShutdownTimeout; if it is still alive
// afterwards it is sent SIGKILL. Failures while signalling or removing files
// are logged as warnings and do not make Cleanup fail.
func (c *Client) Cleanup() error {
	if c.daemonPid == -1 {
		return nil
	}

	if err := c.Close(); err != nil {
		return err
	}

	// Ask the daemon to quit. ESRCH only means it is already gone.
	if err := syscall.Kill(c.daemonPid, syscall.SIGTERM); err != nil && err != syscall.ESRCH {
		logrus.Warnf("ctrd: failed to send SIGTERM to containerd (%d): %v", c.daemonPid, err)
	}

	// Wait up to containerdShutdownTimeout for it to stop, checking once per
	// second.
	for waited := time.Duration(0); waited < containerdShutdownTimeout; waited += time.Second {
		if !utils.IsProcessAlive(c.daemonPid) {
			break
		}
		time.Sleep(time.Second)
	}

	if utils.IsProcessAlive(c.daemonPid) {
		logrus.Warnf("ctrd: containerd (%d) didn't stop within %v, killing it", c.daemonPid, containerdShutdownTimeout)
		if err := syscall.Kill(c.daemonPid, syscall.SIGKILL); err != nil && err != syscall.ESRCH {
			logrus.Warnf("ctrd: failed to send SIGKILL to containerd (%d): %v", c.daemonPid, err)
		}
	}

	// cleanup some files; a file that is already missing is not worth a
	// warning.
	pidFile := path.Join(c.homeDir, "containerd/state", containerdPidFileName)
	if err := os.Remove(pidFile); err != nil && !os.IsNotExist(err) {
		logrus.Warnf("ctrd: failed to remove containerd pid file %s: %v", pidFile, err)
	}
	if err := os.Remove(c.rpcAddr); err != nil && !os.IsNotExist(err) {
		logrus.Warnf("ctrd: failed to remove containerd socket %s: %v", c.rpcAddr, err)
	}

	return nil
}
109 changes: 109 additions & 0 deletions ctrd/client_opts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package ctrd

import "fmt"

// clientOpts collects the settings that ClientOpt functions apply before a
// containerd client is created.
type clientOpts struct {
// startDaemon makes NewClient run runContainerdDaemon before creating the
// grpc clients.
startDaemon bool
// debugLog selects the "debug" instead of the "info" log level for a
// containerd started by this package.
debugLog bool
// rpcAddr is the containerd address: the clients connect to it and a
// started containerd is told to listen on it.
rpcAddr string
// homeDir is set by WithHomeDir.
// NOTE(review): the visible NewClient uses its own homeDir argument rather
// than this field — confirm whether this field is still read anywhere.
homeDir string
// containerdBinary is the binary name or path resolved with exec.LookPath
// when a daemon is started.
// NOTE(review): no default is visible in NewClient — confirm callers always
// set it when startDaemon is true.
containerdBinary string
// grpcClientPoolCapacity is the number of containerd clients created for
// the pool.
grpcClientPoolCapacity int
// maxStreamsClient is passed to newWrapperClient for every pooled client.
maxStreamsClient int
// oomScoreAdjust is the OOM score applied to a containerd started by this
// package.
oomScoreAdjust int
}

// ClientOpt allows caller to set options for containerd client.
// An option receives the clientOpts being assembled and may return an error
// to reject an invalid value; NewClient stops at the first such error.
type ClientOpt func(c *clientOpts) error

// WithStartDaemon returns an option that stores the startDaemon flag.
// The flag decides whether creating a containerd client also starts a
// containerd instance.
func WithStartDaemon(startDaemon bool) ClientOpt {
	apply := func(opts *clientOpts) error {
		opts.startDaemon = startDaemon
		return nil
	}
	return apply
}

// WithRPCAddr returns an option that sets the containerd listen address.
// Applying the option fails, leaving the options untouched, when rpcAddr is
// empty.
func WithRPCAddr(rpcAddr string) ClientOpt {
	return func(opts *clientOpts) error {
		if rpcAddr != "" {
			opts.rpcAddr = rpcAddr
			return nil
		}

		return fmt.Errorf("rpc socket path is empty")
	}
}

// WithDebugLog returns an option that stores the debugLog flag.
// The flag decides the containerd log level.
func WithDebugLog(debugLog bool) ClientOpt {
	return ClientOpt(func(opts *clientOpts) error {
		opts.debugLog = debugLog
		return nil
	})
}

// WithHomeDir returns an option that sets the home dir for containerd.
// Applying the option fails, leaving the options untouched, when homeDir is
// empty.
func WithHomeDir(homeDir string) ClientOpt {
	setHomeDir := func(opts *clientOpts) error {
		if len(homeDir) == 0 {
			return fmt.Errorf("containerd home Dir is empty")
		}
		opts.homeDir = homeDir
		return nil
	}
	return setHomeDir
}

// WithContainerdBinary returns an option that sets the containerd binary
// path. Applying the option fails, leaving the options untouched, when
// containerdBinary is empty.
func WithContainerdBinary(containerdBinary string) ClientOpt {
	return func(opts *clientOpts) error {
		switch containerdBinary {
		case "":
			return fmt.Errorf("containerd binary path is empty")
		default:
			opts.containerdBinary = containerdBinary
			return nil
		}
	}
}

// WithGrpcClientPoolCapacity returns an option that sets the containerd
// clients pool capacity. Applying the option fails, leaving the options
// untouched, when grpcClientPoolCapacity is zero or negative.
func WithGrpcClientPoolCapacity(grpcClientPoolCapacity int) ClientOpt {
	return func(c *clientOpts) error {
		if grpcClientPoolCapacity <= 0 {
			// Wording matches the error returned by WithMaxStreamsClient.
			return fmt.Errorf("containerd clients pool capacity should be positive number")
		}

		c.grpcClientPoolCapacity = grpcClientPoolCapacity
		return nil
	}
}

// WithMaxStreamsClient returns an option that sets how many streams one
// containerd grpc client can hold. Applying the option fails, leaving the
// options untouched, when maxStreamsClient is zero or negative.
func WithMaxStreamsClient(maxStreamsClient int) ClientOpt {
	return func(opts *clientOpts) error {
		if maxStreamsClient > 0 {
			opts.maxStreamsClient = maxStreamsClient
			return nil
		}

		return fmt.Errorf("containerd max streams client should be positive number")
	}
}

// WithOOMScoreAdjust returns an option that sets the oom-score for the
// containerd instance. Applying the option fails, leaving the options
// untouched, when oomScore lies outside [-1000, 1000].
func WithOOMScoreAdjust(oomScore int) ClientOpt {
	return func(opts *clientOpts) error {
		switch {
		case oomScore < -1000, oomScore > 1000:
			return fmt.Errorf("oom-score range should be [-1000, 1000]")
		default:
			opts.oomScoreAdjust = oomScore
			return nil
		}
	}
}
5 changes: 3 additions & 2 deletions ctrd/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import (

func TestNewClient(t *testing.T) {
type args struct {
cfg Config
homeDir string
opts []ClientOpt
}
tests := []struct {
name string
Expand All @@ -19,7 +20,7 @@ func TestNewClient(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := NewClient(tt.args.cfg)
got, err := NewClient(tt.args.homeDir, tt.args.opts...)
if (err != nil) != tt.wantErr {
t.Errorf("NewClient() error = %v, wantErr %v", err, tt.wantErr)
return
Expand Down
1 change: 1 addition & 0 deletions ctrd/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type APIClient interface {
SnapshotAPIClient

Version(ctx context.Context) (containerd.Version, error)
Cleanup() error
}

// ContainerAPIClient provides access to containerd container features.
Expand Down
Loading

0 comments on commit 7bf5abc

Please sign in to comment.