bake: use shared session for local sources for multiple targets
Detect cases where multiple bake targets would use the same
local source. In such cases, a separate session request is made
in addition to the per-target sessions, and the local source is
made available through that shared session as well.

The new session ID is sent with the request so the frontend
can associate it with the local source it needs.

The sources are still available in the main request session as
well. These are used if the frontend ignores the local-sessionid
parameter, which makes sure that old frontend versions continue
working.
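
For illustration, a gateway frontend that honors this hint could resolve
the session for a named local source roughly as in the sketch below. This
is not part of the commit: the package and function names are made up, and
only the "local-sessionid:<name>" build opt and the existing BuildKit
gateway/llb APIs are taken from the change. If the hint is absent, the
source is read from the main request session as before.

package example

import (
	"github.com/moby/buildkit/client/llb"
	gwclient "github.com/moby/buildkit/frontend/gateway/client"
)

// localSourceState returns an llb.Local state for the named local source,
// pointing it at the shared session when bake advertised one.
func localSourceState(c gwclient.Client, name string) llb.State {
	opts := []llb.LocalOption{}
	if sid, ok := c.BuildOpts().Opts["local-sessionid:"+name]; ok && sid != "" {
		opts = append(opts, llb.SessionID(sid))
	}
	return llb.Local(name, opts...)
}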

Signed-off-by: Tonis Tiigi <[email protected]>
tonistiigi committed Jul 19, 2024
1 parent bf41d2f commit 3390d92
Showing 1 changed file with 158 additions and 0 deletions.
158 changes: 158 additions & 0 deletions build/build.go
@@ -9,6 +9,7 @@ import (
	"fmt"
	"io"
	"os"
	"slices"
	"strconv"
	"strings"
	"sync"
@@ -34,6 +35,7 @@ import (
	gateway "github.com/moby/buildkit/frontend/gateway/client"
	"github.com/moby/buildkit/identity"
	"github.com/moby/buildkit/session"
	"github.com/moby/buildkit/session/filesync"
	"github.com/moby/buildkit/solver/errdefs"
	"github.com/moby/buildkit/solver/pb"
	spb "github.com/moby/buildkit/sourcepolicy/pb"
@@ -44,6 +46,8 @@ import (
	specs "github.com/opencontainers/image-spec/specs-go/v1"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
	"github.com/tonistiigi/fsutil"
	fstypes "github.com/tonistiigi/fsutil/types"
	"go.opentelemetry.io/otel/trace"
	"golang.org/x/sync/errgroup"
)
@@ -296,6 +300,12 @@ func BuildWithResultHandler(ctx context.Context, nodes []builder.Node, opt map[s
		}
	}

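	// Pre-create shared sessions for local directories used by multiple
	// targets on the same node. sharedSessionsWG counts, per node, the
	// targets still using these sessions so they can be closed once the
	// last one is done.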
	sharedSessions, err := detectSharedMounts(ctx, reqForNodes)
	if err != nil {
		return nil, err
	}
	sharedSessionsWG := map[string]*sync.WaitGroup{}

	resp = map[string]*client.SolveResponse{}
	var respMu sync.Mutex
	results := waitmap.New()
@@ -360,7 +370,37 @@ func BuildWithResultHandler(ctx context.Context, nodes []builder.Node, opt map[s
				if err != nil {
					return err
				}

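				// The first target to reach this node starts the shared sessions
				// and arms a goroutine that closes them after every target sharing
				// them has called done().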
				var done func()
				if sessions, ok := sharedSessions[node.Name]; ok {
					wg, ok := sharedSessionsWG[node.Name]
					if ok {
						wg.Add(1)
					} else {
						wg = &sync.WaitGroup{}
						wg.Add(1)
						sharedSessionsWG[node.Name] = wg
						for _, s := range sessions {
							s := s
							eg.Go(func() error {
								return s.Run(baseCtx, c.Dialer())
							})
						}
						go func() {
							wg.Wait()
							for _, s := range sessions {
								s.Close()
							}
						}()
					}
					done = wg.Done
				}

				eg2.Go(func() error {
					if done != nil {
						defer done()
					}

					pw = progress.ResetTime(pw)

					if err := waitContextDeps(ctx, dp.driverIndex, results, so); err != nil {
@@ -791,6 +831,124 @@ func resultKey(index int, name string) string {
	return fmt.Sprintf("%d-%s", index, name)
}

// detectSharedMounts looks for the same local mounts used by multiple requests to the same node
// and creates a separate session that will be used by all detected requests.
func detectSharedMounts(ctx context.Context, reqs map[string][]*reqForNode) (_ map[string][]*session.Session, err error) {
	type fsTracker struct {
		fs fsutil.FS
		so []*client.SolveOpt
	}
	type fsKey struct {
		name string
		dir  string
	}

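	// Group requests by node and by (mount name, directory) so that local
	// directories referenced by more than one target collect multiple
	// SolveOpts under the same key.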
	m := map[string]map[fsKey]*fsTracker{}
	for _, reqs := range reqs {
		for _, req := range reqs {
			nodeName := req.resolvedNode.Node().Name
			if _, ok := m[nodeName]; !ok {
				m[nodeName] = map[fsKey]*fsTracker{}
			}
			fsMap := m[nodeName]
			for name, m := range req.so.LocalMounts {
				fs, ok := m.(*fs)
				if !ok {
					continue
				}
				key := fsKey{name: name, dir: fs.dir}
				if _, ok := fsMap[key]; !ok {
					fsMap[key] = &fsTracker{fs: fs.FS}
				}
				fsMap[key].so = append(fsMap[key].so, req.so)
			}
		}
	}

	type sharedSession struct {
		*session.Session
		fsMap map[string]fsutil.FS
	}

	sessionMap := map[string][]*sharedSession{}

	defer func() {
		if err != nil {
			for _, sessions := range sessionMap {
				for _, s := range sessions {
					s.Close()
				}
			}
		}
	}()

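	// For every local directory shared by more than one target, reuse (or
	// create) a session on that node that does not yet serve the same mount
	// name, attach the directory to it, and advertise the session ID to the
	// frontend via the "local-sessionid:<name>" attribute.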
	for node, fsMap := range m {
		for key, fs := range fsMap {
			if len(fs.so) <= 1 {
				continue
			}

			sessions := sessionMap[node]

			// find session that doesn't have the fs name reserved
			idx := slices.IndexFunc(sessions, func(s *sharedSession) bool {
				_, ok := s.fsMap[key.name]
				return !ok
			})

			var ss *sharedSession
			if idx == -1 {
				s, err := session.NewSession(ctx, "", fs.so[0].SharedKey)
				if err != nil {
					return nil, err
				}
				ss = &sharedSession{Session: s, fsMap: map[string]fsutil.FS{}}
				sessions = append(sessions, ss)
				sessionMap[node] = sessions
			} else {
				ss = sessions[idx]
			}

			ss.fsMap[key.name] = fs.fs
			for _, so := range fs.so {
				if so.FrontendAttrs == nil {
					so.FrontendAttrs = map[string]string{}
				}
				so.FrontendAttrs["local-sessionid:"+key.name] = ss.ID()
			}
		}
	}

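	// Reset file ownership to 0:0 on everything synced through the shared
	// sessions.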
	resetUIDAndGID := func(p string, st *fstypes.Stat) fsutil.MapResult {
		st.Uid = 0
		st.Gid = 0
		return fsutil.MapResultKeep
	}

	// convert back to regular sessions
	sessions := map[string][]*session.Session{}
	for n, ss := range sessionMap {
		arr := make([]*session.Session, 0, len(ss))
		for _, s := range ss {
			arr = append(arr, s.Session)

			src := make(filesync.StaticDirSource, len(s.fsMap))
			for name, fs := range s.fsMap {
				fs, err := fsutil.NewFilterFS(fs, &fsutil.FilterOpt{
					Map: resetUIDAndGID,
				})
				if err != nil {
					return nil, err
				}
				src[name] = fs
			}
			s.Allow(filesync.NewFSSyncProvider(src))
		}
		sessions[n] = arr
	}
	return sessions, nil
}

// calculateChildTargets returns all the targets that depend on current target for reverse index
func calculateChildTargets(reqs map[string][]*reqForNode, opt map[string]Options) map[string][]string {
	out := make(map[string][]string)
