Skip to content

Commit

Permalink
Merge pull request #2403 from tonistiigi/push-deadlock
Browse files Browse the repository at this point in the history
limited: fix possible deadlock when pushhandler calls fetcher
  • Loading branch information
AkihiroSuda authored Oct 7, 2021
2 parents b2ff444 + 7153f5a commit b8462c3
Showing 1 changed file with 25 additions and 59 deletions.
84 changes: 25 additions & 59 deletions util/resolver/limited/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,15 @@ import (
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/remotes"
"github.com/docker/distribution/reference"
digest "github.com/opencontainers/go-digest"
ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/sirupsen/logrus"
"golang.org/x/sync/semaphore"
)

type contextKeyT string

var contextKey = contextKeyT("buildkit/util/resolver/limited")

var Default = New(4)

type Group struct {
Expand All @@ -30,7 +33,13 @@ type req struct {
ref string
}

func (r *req) acquire(ctx context.Context, desc ocispecs.Descriptor) (func(), error) {
func (r *req) acquire(ctx context.Context, desc ocispecs.Descriptor) (context.Context, func(), error) {
if v := ctx.Value(contextKey); v != nil {
return ctx, func() {}, nil
}

ctx = context.WithValue(ctx, contextKey, struct{}{})

// json request get one additional connection
highPriority := strings.HasSuffix(desc.MediaType, "+json")

Expand All @@ -46,16 +55,16 @@ func (r *req) acquire(ctx context.Context, desc ocispecs.Descriptor) (func(), er
r.g.mu.Unlock()
if !highPriority {
if err := s[0].Acquire(ctx, 1); err != nil {
return nil, err
return ctx, nil, err
}
}
if err := s[1].Acquire(ctx, 1); err != nil {
if !highPriority {
s[0].Release(1)
}
return nil, err
return ctx, nil, err
}
return func() {
return ctx, func() {
s[1].Release(1)
if !highPriority {
s[0].Release(1)
Expand All @@ -78,60 +87,17 @@ func (g *Group) WrapFetcher(f remotes.Fetcher, ref string) remotes.Fetcher {
return &fetcher{Fetcher: f, req: g.req(ref)}
}

func (g *Group) WrapPusher(p remotes.Pusher, ref string) remotes.Pusher {
return &pusher{Pusher: p, req: g.req(ref)}
}

type pusher struct {
remotes.Pusher
req *req
}

func (p *pusher) Push(ctx context.Context, desc ocispecs.Descriptor) (content.Writer, error) {
release, err := p.req.acquire(ctx, desc)
if err != nil {
return nil, err
}
w, err := p.Pusher.Push(ctx, desc)
if err != nil {
release()
return nil, err
}
ww := &writer{Writer: w}
closer := func() {
if !ww.closed {
logrus.Warnf("writer not closed cleanly: %s", desc.Digest)
func (g *Group) PushHandler(pusher remotes.Pusher, provider content.Provider, ref string) images.HandlerFunc {
ph := remotes.PushHandler(pusher, provider)
req := g.req(ref)
return func(ctx context.Context, desc ocispecs.Descriptor) ([]ocispecs.Descriptor, error) {
ctx, release, err := req.acquire(ctx, desc)
if err != nil {
return nil, err
}
release()
defer release()
return ph(ctx, desc)
}
ww.release = closer
runtime.SetFinalizer(ww, func(rc *writer) {
rc.close()
})
return ww, nil
}

type writer struct {
content.Writer
once sync.Once
release func()
closed bool
}

func (w *writer) Close() error {
w.closed = true
w.close()
return w.Writer.Close()
}

func (w *writer) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
w.closed = true
w.close()
return w.Writer.Commit(ctx, size, expected, opts...)
}

func (w *writer) close() {
w.once.Do(w.release)
}

type fetcher struct {
Expand All @@ -140,7 +106,7 @@ type fetcher struct {
}

func (f *fetcher) Fetch(ctx context.Context, desc ocispecs.Descriptor) (io.ReadCloser, error) {
release, err := f.req.acquire(ctx, desc)
ctx, release, err := f.req.acquire(ctx, desc)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -196,7 +162,7 @@ func FetchHandler(ingester content.Ingester, fetcher remotes.Fetcher, ref string
}

func PushHandler(pusher remotes.Pusher, provider content.Provider, ref string) images.HandlerFunc {
return remotes.PushHandler(Default.WrapPusher(pusher, ref), provider)
return Default.PushHandler(pusher, provider, ref)
}

func domain(ref string) string {
Expand Down

0 comments on commit b8462c3

Please sign in to comment.