Skip to content

Commit

Permalink
fix: add lock timeout to reduce lock failure cases
Browse files Browse the repository at this point in the history
Signed-off-by: Allen Sun <[email protected]>
  • Loading branch information
allencloud committed Sep 29, 2018
1 parent 8be6286 commit 1e822b7
Show file tree
Hide file tree
Showing 8 changed files with 124 additions and 136 deletions.
23 changes: 19 additions & 4 deletions ctrd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"syscall"
"time"

"github.com/alibaba/pouch/pkg/kmutex"
"github.com/alibaba/pouch/pkg/scheduler"
"github.com/alibaba/pouch/pkg/utils"

Expand All @@ -29,6 +30,10 @@ const (
defaultGrpcClientPoolCapacity = 5
defaultMaxStreamsClient = 100
containerdShutdownTimeout = 15 * time.Second

// Locks on containerd objects in ctrd are acquired with a timeout: a plain trylock
// would return immediately on contention and a plain lock could block forever,
// so we wait for the object lock for at most this long before giving up.
containerdObjectLockTimeout = time.Duration(5 * time.Second)
)

// ErrGetCtrdClient is an error returned when failed to get a containerd grpc client from clients pool.
Expand All @@ -38,7 +43,9 @@ var ErrGetCtrdClient = errors.New("failed to get a containerd grpc client")
type Client struct {
mu sync.RWMutex
watch *watch
lock *containerLock

// lock ensures that only one caller of client can get access to the same object in containerd.
lock *kmutex.KMutex

daemonPid int
homeDir string
Expand Down Expand Up @@ -72,9 +79,7 @@ func NewClient(homeDir string, opts ...ClientOpt) (APIClient, error) {
}

client := &Client{
lock: &containerLock{
ids: make(map[string]struct{}),
},
lock: kmutex.New(),
watch: &watch{
containers: make(map[string]*containerPack),
},
Expand Down Expand Up @@ -416,3 +421,13 @@ func (c *Client) collectContainerdEvents() {
}
}
}

// Trylock attempts to take the lock keyed by id on the client's per-object
// lock, passing containerdObjectLockTimeout as the timeout.
// The boolean result of LockWithTimeout is returned unchanged; callers in this
// package treat false as a lock failure.
func (c *Client) Trylock(id string) bool {
	acquired := c.lock.LockWithTimeout(id, containerdObjectLockTimeout)
	return acquired
}

// Unlock unlocks the object in containerd.
// It releases the lock keyed by id by delegating to the client's KMutex;
// callers in this package defer it right after a successful Trylock.
func (c *Client) Unlock(id string) {
c.lock.Unlock(id)
}
54 changes: 27 additions & 27 deletions ctrd/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,10 @@ func (c *Client) ContainerStats(ctx context.Context, id string) (*containerdtype

// containerStats returns stats of the container.
func (c *Client) containerStats(ctx context.Context, id string) (*containerdtypes.Metric, error) {
if !c.lock.Trylock(id) {
return nil, errtypes.ErrLockfailed
if !c.Trylock(id) {
return nil, LockFailedError(id)
}
defer c.lock.Unlock(id)
defer c.Unlock(id)

pack, err := c.watch.get(id)
if err != nil {
Expand Down Expand Up @@ -196,10 +196,10 @@ func (c *Client) ContainerPIDs(ctx context.Context, id string) ([]int, error) {

// containerPIDs returns the all processes's ids inside the container.
func (c *Client) containerPIDs(ctx context.Context, id string) ([]int, error) {
if !c.lock.Trylock(id) {
return nil, errtypes.ErrLockfailed
if !c.Trylock(id) {
return nil, LockFailedError(id)
}
defer c.lock.Unlock(id)
defer c.Unlock(id)

pack, err := c.watch.get(id)
if err != nil {
Expand Down Expand Up @@ -255,10 +255,10 @@ func (c *Client) recoverContainer(ctx context.Context, id string, io *containeri
return fmt.Errorf("failed to get a containerd grpc client: %v", err)
}

if !c.lock.Trylock(id) {
return errtypes.ErrLockfailed
if !c.Trylock(id) {
return LockFailedError(id)
}
defer c.lock.Unlock(id)
defer c.Unlock(id)

lc, err := wrapperCli.client.LoadContainer(ctx, id)
if err != nil {
Expand Down Expand Up @@ -317,10 +317,10 @@ func (c *Client) destroyContainer(ctx context.Context, id string, timeout int64)

ctx = leases.WithLease(ctx, wrapperCli.lease.ID())

if !c.lock.Trylock(id) {
return nil, errtypes.ErrLockfailed
if !c.Trylock(id) {
return nil, LockFailedError(id)
}
defer c.lock.Unlock(id)
defer c.Unlock(id)

pack, err := c.watch.get(id)
if err != nil {
Expand Down Expand Up @@ -385,10 +385,10 @@ func (c *Client) PauseContainer(ctx context.Context, id string) error {

// pauseContainer pause container.
func (c *Client) pauseContainer(ctx context.Context, id string) error {
if !c.lock.Trylock(id) {
return errtypes.ErrLockfailed
if !c.Trylock(id) {
return LockFailedError(id)
}
defer c.lock.Unlock(id)
defer c.Unlock(id)

pack, err := c.watch.get(id)
if err != nil {
Expand Down Expand Up @@ -416,10 +416,10 @@ func (c *Client) UnpauseContainer(ctx context.Context, id string) error {

// unpauseContainer unpauses a container.
func (c *Client) unpauseContainer(ctx context.Context, id string) error {
if !c.lock.Trylock(id) {
return errtypes.ErrLockfailed
if !c.Trylock(id) {
return LockFailedError(id)
}
defer c.lock.Unlock(id)
defer c.Unlock(id)

pack, err := c.watch.get(id)
if err != nil {
Expand All @@ -444,10 +444,10 @@ func (c *Client) CreateContainer(ctx context.Context, container *Container, chec
id = container.ID
)

if !c.lock.Trylock(id) {
return errtypes.ErrLockfailed
if !c.Trylock(id) {
return LockFailedError(id)
}
defer c.lock.Unlock(id)
defer c.Unlock(id)

if err := c.createContainer(ctx, ref, id, checkpointDir, container); err != nil {
return convertCtrdErr(err)
Expand Down Expand Up @@ -596,10 +596,10 @@ func (c *Client) UpdateResources(ctx context.Context, id string, resources types

// updateResources updates the configurations of a container.
func (c *Client) updateResources(ctx context.Context, id string, resources types.Resources) error {
if !c.lock.Trylock(id) {
return errtypes.ErrLockfailed
if !c.Trylock(id) {
return LockFailedError(id)
}
defer c.lock.Unlock(id)
defer c.Unlock(id)

pack, err := c.watch.get(id)
if err != nil {
Expand All @@ -626,10 +626,10 @@ func (c *Client) ResizeContainer(ctx context.Context, id string, opts types.Resi
// resizeContainer changes the size of the TTY of the init process running
// in the container to the given height and width.
func (c *Client) resizeContainer(ctx context.Context, id string, opts types.ResizeOptions) error {
if !c.lock.Trylock(id) {
return errtypes.ErrLockfailed
if !c.Trylock(id) {
return LockFailedError(id)
}
defer c.lock.Unlock(id)
defer c.Unlock(id)

pack, err := c.watch.get(id)
if err != nil {
Expand Down
29 changes: 0 additions & 29 deletions ctrd/container_lock.go

This file was deleted.

53 changes: 0 additions & 53 deletions ctrd/container_lock_test.go

This file was deleted.

5 changes: 5 additions & 0 deletions ctrd/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,3 +197,8 @@ func convertCtrdErr(err error) error {

return err
}

// LockFailedError wraps errtypes.ErrLockfailed with a message naming the
// container involved, so the caller gets a more readable lock-failed error.
func LockFailedError(containerID string) error {
	const format = "container %s is accessed by other request and please try again"
	return errors.Wrapf(errtypes.ErrLockfailed, format, containerID)
}
27 changes: 26 additions & 1 deletion ctrd/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"testing"

"github.com/alibaba/pouch/pkg/errtypes"

"github.com/containerd/containerd/errdefs"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -81,3 +80,29 @@ func Test_convertCtrdErr(t *testing.T) {
})
}
}

// TestLockFailedError checks that LockFailedError yields a non-nil error for
// a given container ID.
func TestLockFailedError(t *testing.T) {
	cases := []struct {
		name        string
		containerID string
		wantErr     bool
	}{
		{name: "normal test case", containerID: "asdfghjkl", wantErr: true},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			err := LockFailedError(tc.containerID)
			gotErr := err != nil
			if gotErr != tc.wantErr {
				t.Errorf("LockFailedError() error = %v, wantErr %v", err, tc.wantErr)
			}
		})
	}
}
Loading

0 comments on commit 1e822b7

Please sign in to comment.