From 3390d92aebd30bd5d80173abcf6b33652957ab4d Mon Sep 17 00:00:00 2001 From: Tonis Tiigi Date: Wed, 17 Jul 2024 22:34:06 -0700 Subject: [PATCH] bake: use shared session for local sources for multiple targets Detect cases where multiple bake targets would use the same local source. For such cases a separate session request is made in addition to session per target and local source is made available in that source as well. The new sessionID is sent with the request so the frontend can ask associate it with the local source it needs. The sources are still available in the main request session as well. This would be used if frontend ignores the local-sessionid parameter and makes sure that old version continue working. Signed-off-by: Tonis Tiigi --- build/build.go | 158 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 158 insertions(+) diff --git a/build/build.go b/build/build.go index 64e16998616c..d5d651d95c3d 100644 --- a/build/build.go +++ b/build/build.go @@ -9,6 +9,7 @@ import ( "fmt" "io" "os" + "slices" "strconv" "strings" "sync" @@ -34,6 +35,7 @@ import ( gateway "github.com/moby/buildkit/frontend/gateway/client" "github.com/moby/buildkit/identity" "github.com/moby/buildkit/session" + "github.com/moby/buildkit/session/filesync" "github.com/moby/buildkit/solver/errdefs" "github.com/moby/buildkit/solver/pb" spb "github.com/moby/buildkit/sourcepolicy/pb" @@ -44,6 +46,8 @@ import ( specs "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" "github.com/sirupsen/logrus" + "github.com/tonistiigi/fsutil" + fstypes "github.com/tonistiigi/fsutil/types" "go.opentelemetry.io/otel/trace" "golang.org/x/sync/errgroup" ) @@ -296,6 +300,12 @@ func BuildWithResultHandler(ctx context.Context, nodes []builder.Node, opt map[s } } + sharedSessions, err := detectSharedMounts(ctx, reqForNodes) + if err != nil { + return nil, err + } + sharedSessionsWG := map[string]*sync.WaitGroup{} + resp = map[string]*client.SolveResponse{} var respMu sync.Mutex results := waitmap.New() @@ -360,7 +370,37 @@ func BuildWithResultHandler(ctx context.Context, nodes []builder.Node, opt map[s if err != nil { return err } + + var done func() + if sessions, ok := sharedSessions[node.Name]; ok { + wg, ok := sharedSessionsWG[node.Name] + if ok { + wg.Add(1) + } else { + wg = &sync.WaitGroup{} + wg.Add(1) + sharedSessionsWG[node.Name] = wg + for _, s := range sessions { + s := s + eg.Go(func() error { + return s.Run(baseCtx, c.Dialer()) + }) + } + go func() { + wg.Wait() + for _, s := range sessions { + s.Close() + } + }() + } + done = wg.Done + } + eg2.Go(func() error { + if done != nil { + defer done() + } + pw = progress.ResetTime(pw) if err := waitContextDeps(ctx, dp.driverIndex, results, so); err != nil { @@ -791,6 +831,124 @@ func resultKey(index int, name string) string { return fmt.Sprintf("%d-%s", index, name) } +// detectSharedMounts looks for same local mounts used by multiple requests to the same node +// and creates a separate session that will be used by all detected requests. +func detectSharedMounts(ctx context.Context, reqs map[string][]*reqForNode) (_ map[string][]*session.Session, err error) { + type fsTracker struct { + fs fsutil.FS + so []*client.SolveOpt + } + type fsKey struct { + name string + dir string + } + + m := map[string]map[fsKey]*fsTracker{} + for _, reqs := range reqs { + for _, req := range reqs { + nodeName := req.resolvedNode.Node().Name + if _, ok := m[nodeName]; !ok { + m[nodeName] = map[fsKey]*fsTracker{} + } + fsMap := m[nodeName] + for name, m := range req.so.LocalMounts { + fs, ok := m.(*fs) + if !ok { + continue + } + key := fsKey{name: name, dir: fs.dir} + if _, ok := fsMap[key]; !ok { + fsMap[key] = &fsTracker{fs: fs.FS} + } + fsMap[key].so = append(fsMap[key].so, req.so) + } + } + } + + type sharedSession struct { + *session.Session + fsMap map[string]fsutil.FS + } + + sessionMap := map[string][]*sharedSession{} + + defer func() { + if err != nil { + for _, sessions := range sessionMap { + for _, s := range sessions { + s.Close() + } + } + } + }() + + for node, fsMap := range m { + for key, fs := range fsMap { + if len(fs.so) <= 1 { + continue + } + + sessions := sessionMap[node] + + // find session that doesn't have the fs name reserved + idx := slices.IndexFunc(sessions, func(s *sharedSession) bool { + _, ok := s.fsMap[key.name] + return !ok + }) + + var ss *sharedSession + if idx == -1 { + s, err := session.NewSession(ctx, "", fs.so[0].SharedKey) + if err != nil { + return nil, err + } + ss = &sharedSession{Session: s, fsMap: map[string]fsutil.FS{}} + sessions = append(sessions, ss) + sessionMap[node] = sessions + } else { + ss = sessions[idx] + } + + ss.fsMap[key.name] = fs.fs + for _, so := range fs.so { + if so.FrontendAttrs == nil { + so.FrontendAttrs = map[string]string{} + } + so.FrontendAttrs["local-sessionid:"+key.name] = ss.ID() + } + } + } + + resetUIDAndGID := func(p string, st *fstypes.Stat) fsutil.MapResult { + st.Uid = 0 + st.Gid = 0 + return fsutil.MapResultKeep + } + + // convert back to regular sessions + sessions := map[string][]*session.Session{} + for n, ss := range sessionMap { + arr := make([]*session.Session, 0, len(ss)) + for _, s := range ss { + arr = append(arr, s.Session) + + src := make(filesync.StaticDirSource, len(s.fsMap)) + for name, fs := range s.fsMap { + fs, err := fsutil.NewFilterFS(fs, &fsutil.FilterOpt{ + Map: resetUIDAndGID, + }) + if err != nil { + return nil, err + } + src[name] = fs + } + s.Allow(filesync.NewFSSyncProvider(src)) + } + sessions[n] = arr + } + return sessions, nil +} + // calculateChildTargets returns all the targets that depend on current target for reverse index func calculateChildTargets(reqs map[string][]*reqForNode, opt map[string]Options) map[string][]string { out := make(map[string][]string)