Skip to content

Commit

Permalink
Merge pull request #1636 from tonistiigi/resolver-pool
Browse files Browse the repository at this point in the history
resolver: add better pooling and custom authenticator
  • Loading branch information
sipsma authored Aug 14, 2020
2 parents 8861f58 + e650092 commit 661cafc
Show file tree
Hide file tree
Showing 29 changed files with 1,056 additions and 367 deletions.
2 changes: 1 addition & 1 deletion cache/remotecache/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func getContentStore(ctx context.Context, sm *session.Manager, g session.Group,
timeoutCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

caller, err := sm.Get(timeoutCtx, sessionID)
caller, err := sm.Get(timeoutCtx, sessionID, false)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions cache/remotecache/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func ResolveCacheExporterFunc(sm *session.Manager, hosts docker.RegistryHosts) r
if err != nil {
return nil, err
}
remote := resolver.New(hosts, resolver.NewSessionAuthenticator(sm, g))
remote := resolver.DefaultPool.GetResolver(hosts, ref, "push", sm, g)
pusher, err := remote.Pusher(ctx, ref)
if err != nil {
return nil, err
Expand All @@ -52,7 +52,7 @@ func ResolveCacheImporterFunc(sm *session.Manager, cs content.Store, hosts docke
if err != nil {
return nil, specs.Descriptor{}, err
}
remote := resolver.New(hosts, resolver.NewSessionAuthenticator(sm, g))
remote := resolver.DefaultPool.GetResolver(hosts, ref, "pull", sm, g)
xref, desc, err := remote.Resolve(ctx, ref)
if err != nil {
return nil, specs.Descriptor{}, err
Expand Down
2 changes: 1 addition & 1 deletion exporter/local/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (e *localExporterInstance) Export(ctx context.Context, inp exporter.Source,
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

caller, err := e.opt.SessionManager.Get(timeoutCtx, sessionID)
caller, err := e.opt.SessionManager.Get(timeoutCtx, sessionID, false)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion exporter/oci/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (e *imageExporterInstance) Export(ctx context.Context, src exporter.Source,
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

caller, err := e.opt.SessionManager.Get(timeoutCtx, sessionID)
caller, err := e.opt.SessionManager.Get(timeoutCtx, sessionID, false)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion exporter/tar/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (e *localExporterInstance) Export(ctx context.Context, inp exporter.Source,
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

caller, err := e.opt.SessionManager.Get(timeoutCtx, sessionID)
caller, err := e.opt.SessionManager.Get(timeoutCtx, sessionID, false)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/codahale/hdrhistogram v0.0.0-20160425231609-f8ad88b59a58 // indirect
github.com/containerd/cgroups v0.0.0-20200710171044-318312a37340 // indirect
github.com/containerd/console v1.0.0
github.com/containerd/containerd v1.4.0-beta.2.0.20200728183644-eb6354a11860
github.com/containerd/containerd v1.4.0-beta.2.0.20200730150746-fa1220fce33f
github.com/containerd/continuity v0.0.0-20200413184840-d3ef23f19fbb
github.com/containerd/fifo v0.0.0-20200410184934-f15a3290365b // indirect
github.com/containerd/go-cni v1.0.0
Expand Down Expand Up @@ -71,7 +71,7 @@ require (
)

replace (
github.com/containerd/containerd => github.com/containerd/containerd v1.4.0-beta.2.0.20200728183644-eb6354a11860
github.com/containerd/containerd => github.com/containerd/containerd v1.4.0-beta.2.0.20200730150746-fa1220fce33f
github.com/docker/docker => github.com/docker/docker v17.12.0-ce-rc1.0.20200310163718-4634ce647cf2+incompatible
github.com/hashicorp/go-immutable-radix => github.com/tonistiigi/go-immutable-radix v0.0.0-20170803185627-826af9ccf0fe
github.com/jaguilar/vt100 => github.com/tonistiigi/vt100 v0.0.0-20190402012908-ad4c4a574305
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ github.com/containerd/console v0.0.0-20180822173158-c12b1e7919c1/go.mod h1:Tj/on
github.com/containerd/console v0.0.0-20191206165004-02ecf6a7291e/go.mod h1:8Pf4gM6VEbTNRIT26AyyU7hxdQU3MvAvxVI0sc00XBE=
github.com/containerd/console v1.0.0 h1:fU3UuQapBs+zLJu82NhR11Rif1ny2zfMMAyPJzSN5tQ=
github.com/containerd/console v1.0.0/go.mod h1:8Pf4gM6VEbTNRIT26AyyU7hxdQU3MvAvxVI0sc00XBE=
github.com/containerd/containerd v1.4.0-beta.2.0.20200728183644-eb6354a11860 h1:30UR1cinmvhqtKTpQWOJada+p36BgAuXf5w+aHEJOto=
github.com/containerd/containerd v1.4.0-beta.2.0.20200728183644-eb6354a11860/go.mod h1:bC6axHOhabU15QhwfG7w5PipXdVtMXFTttgp+kVtyUA=
github.com/containerd/containerd v1.4.0-beta.2.0.20200730150746-fa1220fce33f h1:eSl1h+oob8CkeY7TXMs6yrs6eXzgZDbqvsOM0l+EOgk=
github.com/containerd/containerd v1.4.0-beta.2.0.20200730150746-fa1220fce33f/go.mod h1:bC6axHOhabU15QhwfG7w5PipXdVtMXFTttgp+kVtyUA=
github.com/containerd/continuity v0.0.0-20190426062206-aaeac12a7ffc/go.mod h1:GL3xCUCBDV3CZiTSEKksMWbLE66hEyuu9qyDOOqM47Y=
github.com/containerd/continuity v0.0.0-20200228182428-0f16d7a0959c/go.mod h1:Dq467ZllaHgAtVp4p1xUQWBrFXR9s/wyoTpG8zOJGkY=
github.com/containerd/continuity v0.0.0-20200413184840-d3ef23f19fbb h1:nXPkFq8X1a9ycY3GYQpFNxHh3j2JgY7zDZfq2EXMIzk=
Expand Down
13 changes: 7 additions & 6 deletions session/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import (
"google.golang.org/grpc/codes"
)

func CredentialsFunc(sm *session.Manager, g session.Group) func(string) (string, string, error) {
return func(host string) (string, string, error) {
var user, secret string
err := sm.Any(context.TODO(), g, func(ctx context.Context, _ string, c session.Caller) error {
func CredentialsFunc(sm *session.Manager, g session.Group) func(string) (session, username, secret string, err error) {
return func(host string) (string, string, string, error) {
var sessionID, user, secret string
err := sm.Any(context.TODO(), g, func(ctx context.Context, id string, c session.Caller) error {
client := NewAuthClient(c.Conn())

resp, err := client.Credentials(ctx, &CredentialsRequest{
Expand All @@ -23,13 +23,14 @@ func CredentialsFunc(sm *session.Manager, g session.Group) func(string) (string,
}
return err
}
sessionID = id
user = resp.Username
secret = resp.Secret
return nil
})
if err != nil {
return "", "", err
return "", "", "", err
}
return user, secret, nil
return sessionID, user, secret, nil
}
}
2 changes: 1 addition & 1 deletion session/content/content_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestContentAttachable(t *testing.T) {
})

g.Go(func() error {
c, err := m.Get(ctx, s.ID())
c, err := m.Get(ctx, s.ID(), false)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion session/filesync/filesync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestFileSyncIncludePatterns(t *testing.T) {
})

g.Go(func() (reterr error) {
c, err := m.Get(ctx, s.ID())
c, err := m.Get(ctx, s.ID(), false)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion session/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (sm *Manager) Any(ctx context.Context, g Group, f func(context.Context, str

timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
c, err := sm.Get(timeoutCtx, id)
c, err := sm.Get(timeoutCtx, id, false)
if err != nil {
lastErr = err
continue
Expand Down
8 changes: 6 additions & 2 deletions session/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (sm *Manager) handleConn(ctx context.Context, conn net.Conn, opts map[strin
}

// Get returns a session by ID
func (sm *Manager) Get(ctx context.Context, id string) (Caller, error) {
func (sm *Manager) Get(ctx context.Context, id string, noWait bool) (Caller, error) {
// session prefix is used to identify vertexes with different contexts so
// they would not collide, but for lookup we don't need the prefix
if p := strings.SplitN(id, ":", 2); len(p) == 2 && len(p[1]) > 0 {
Expand Down Expand Up @@ -180,14 +180,18 @@ func (sm *Manager) Get(ctx context.Context, id string) (Caller, error) {
}
var ok bool
c, ok = sm.sessions[id]
if !ok || c.closed() {
if (!ok || c.closed()) && !noWait {
sm.updateCondition.Wait()
continue
}
sm.mu.Unlock()
break
}

if c == nil {
return nil, nil
}

return c, nil
}

Expand Down
57 changes: 23 additions & 34 deletions source/containerimage/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,8 @@ func (is *Source) ResolveImageConfig(ctx context.Context, ref string, opt llb.Re
}

res, err := is.g.Do(ctx, key, func(ctx context.Context) (interface{}, error) {
dgst, dt, err := imageutil.Config(ctx, ref, pull.NewResolver(g, pull.ResolverOpt{
Hosts: is.RegistryHosts,
Auth: resolver.NewSessionAuthenticator(sm, g),
ImageStore: is.ImageStore,
Mode: rm,
Ref: ref,
}), is.ContentStore, is.LeaseManager, opt.Platform)
res := resolver.DefaultPool.GetResolver(is.RegistryHosts, ref, "pull", sm, g).WithImageStore(is.ImageStore, rm)
dgst, dt, err := imageutil.Config(ctx, ref, res, is.ContentStore, is.LeaseManager, opt.Platform)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -117,28 +112,30 @@ func (is *Source) Resolve(ctx context.Context, id source.Identifier, sm *session
Src: imageIdentifier.Reference,
}
p := &puller{
CacheAccessor: is.CacheAccessor,
LeaseManager: is.LeaseManager,
Puller: pullerUtil,
id: imageIdentifier,
ResolverOpt: pull.ResolverOpt{
Hosts: is.RegistryHosts,
Auth: resolver.NewSessionAuthenticator(sm, nil),
ImageStore: is.ImageStore,
Mode: imageIdentifier.ResolveMode,
Ref: imageIdentifier.Reference.String(),
},
vtx: vtx,
CacheAccessor: is.CacheAccessor,
LeaseManager: is.LeaseManager,
Puller: pullerUtil,
id: imageIdentifier,
RegistryHosts: is.RegistryHosts,
ImageStore: is.ImageStore,
Mode: imageIdentifier.ResolveMode,
Ref: imageIdentifier.Reference.String(),
SessionManager: sm,
vtx: vtx,
}
return p, nil
}

type puller struct {
CacheAccessor cache.Accessor
LeaseManager leases.Manager
ResolverOpt pull.ResolverOpt
id *source.ImageIdentifier
vtx solver.Vertex
CacheAccessor cache.Accessor
LeaseManager leases.Manager
RegistryHosts docker.RegistryHosts
ImageStore images.Store
Mode source.ResolveMode
Ref string
SessionManager *session.Manager
id *source.ImageIdentifier
vtx solver.Vertex

cacheKeyOnce sync.Once
cacheKeyErr error
Expand Down Expand Up @@ -169,11 +166,7 @@ func mainManifestKey(ctx context.Context, desc specs.Descriptor, platform specs.
}

func (p *puller) CacheKey(ctx context.Context, g session.Group, index int) (cacheKey string, cacheOpts solver.CacheOpts, cacheDone bool, err error) {
if p.Puller.Resolver == nil {
p.Puller.Resolver = pull.NewResolver(g, p.ResolverOpt)
} else {
p.ResolverOpt.Auth.AddSession(g)
}
p.Puller.Resolver = resolver.DefaultPool.GetResolver(p.RegistryHosts, p.Ref, "pull", p.SessionManager, g).WithImageStore(p.ImageStore, p.id.ResolveMode)

p.cacheKeyOnce.Do(func() {
ctx, done, err := leaseutil.WithLease(ctx, p.LeaseManager, leases.WithExpiration(5*time.Minute), leaseutil.MakeTemporary)
Expand Down Expand Up @@ -253,11 +246,7 @@ func (p *puller) CacheKey(ctx context.Context, g session.Group, index int) (cach
}

func (p *puller) Snapshot(ctx context.Context, g session.Group) (ir cache.ImmutableRef, err error) {
if p.Puller.Resolver == nil {
p.Puller.Resolver = pull.NewResolver(g, p.ResolverOpt)
} else {
p.ResolverOpt.Auth.AddSession(g)
}
p.Puller.Resolver = resolver.DefaultPool.GetResolver(p.RegistryHosts, p.Ref, "pull", p.SessionManager, g).WithImageStore(p.ImageStore, p.id.ResolveMode)

if len(p.manifest.Remote.Descriptors) == 0 {
return nil, nil
Expand Down
3 changes: 0 additions & 3 deletions util/pull/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,6 @@ func (p *Puller) PullManifests(ctx context.Context) (*PulledManifests, error) {
return nil, err
}

// workaround for gcr, authentication not supported on blob endpoints
EnsureManifestRequested(ctx, p.Resolver, p.ref)

platform := platforms.Only(p.Platform)

var mu sync.Mutex // images.Dispatch calls handlers in parallel
Expand Down
Loading

0 comments on commit 661cafc

Please sign in to comment.