Skip to content

Commit

Permalink
llb: avoid concurrent map write on parallel marshal
Browse files Browse the repository at this point in the history
Calling marshal changes the internal state of the op, for example
addCap() helper adds capability constraints. These can race with
same map being read by another Marshal call. Locking the Marshal
function itself also makes sure that the cache is not recomputed
in this case.

Signed-off-by: Tonis Tiigi <[email protected]>
  • Loading branch information
tonistiigi committed Dec 7, 2024
1 parent 409f7b4 commit 30413b5
Show file tree
Hide file tree
Showing 9 changed files with 74 additions and 26 deletions.
1 change: 0 additions & 1 deletion client/llb/definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
// For example, after marshalling a LLB state and sending over the wire, the
// LLB state can be reconstructed from the definition.
type DefinitionOp struct {
MarshalCache
mu sync.Mutex
ops map[digest.Digest]*pb.Op
defs map[digest.Digest][]byte
Expand Down
9 changes: 6 additions & 3 deletions client/llb/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

type DiffOp struct {
MarshalCache
cache MarshalCache
lower Output
upper Output
output Output
Expand All @@ -31,7 +31,10 @@ func (m *DiffOp) Validate(ctx context.Context, constraints *Constraints) error {
}

func (m *DiffOp) Marshal(ctx context.Context, constraints *Constraints) (digest.Digest, []byte, *pb.OpMetadata, []*SourceLocation, error) {
if dgst, dt, md, srcs, err := m.Load(constraints); err == nil {
cache := m.cache.Acquire()
defer cache.Release()

if dgst, dt, md, srcs, err := cache.Load(constraints); err == nil {
return dgst, dt, md, srcs, nil
}
if err := m.Validate(ctx, constraints); err != nil {
Expand Down Expand Up @@ -72,7 +75,7 @@ func (m *DiffOp) Marshal(ctx context.Context, constraints *Constraints) (digest.
return "", nil, nil, nil, err
}

return m.Store(dt, md, m.constraints.SourceLocations, constraints)
return cache.Store(dt, md, m.constraints.SourceLocations, constraints)
}

func (m *DiffOp) Output() Output {
Expand Down
14 changes: 10 additions & 4 deletions client/llb/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type mount struct {
}

type ExecOp struct {
MarshalCache
cache MarshalCache
proxyEnv *ProxyEnv
root Output
mounts []*mount
Expand All @@ -63,6 +63,9 @@ type ExecOp struct {
}

func (e *ExecOp) AddMount(target string, source Output, opt ...MountOption) Output {
cache := e.cache.Acquire()
defer cache.Release()

m := &mount{
target: target,
source: source,
Expand All @@ -84,7 +87,7 @@ func (e *ExecOp) AddMount(target string, source Output, opt ...MountOption) Outp
}
m.output = o
}
e.Store(nil, nil, nil, nil)
cache.Store(nil, nil, nil, nil)
e.isValidated = false
return m.output
}
Expand Down Expand Up @@ -128,7 +131,10 @@ func (e *ExecOp) Validate(ctx context.Context, c *Constraints) error {
}

func (e *ExecOp) Marshal(ctx context.Context, c *Constraints) (digest.Digest, []byte, *pb.OpMetadata, []*SourceLocation, error) {
if dgst, dt, md, srcs, err := e.Load(c); err == nil {
cache := e.cache.Acquire()
defer cache.Release()

if dgst, dt, md, srcs, err := cache.Load(c); err == nil {
return dgst, dt, md, srcs, nil
}

Expand Down Expand Up @@ -446,7 +452,7 @@ func (e *ExecOp) Marshal(ctx context.Context, c *Constraints) (digest.Digest, []
if err != nil {
return "", nil, nil, nil, err
}
return e.Store(dt, md, e.constraints.SourceLocations, c)
return cache.Store(dt, md, e.constraints.SourceLocations, c)
}

func (e *ExecOp) Output() Output {
Expand Down
7 changes: 5 additions & 2 deletions client/llb/fileop.go
Original file line number Diff line number Diff line change
Expand Up @@ -746,7 +746,10 @@ func (ms *marshalState) add(fa *FileAction, c *Constraints) (*fileActionState, e
}

func (f *FileOp) Marshal(ctx context.Context, c *Constraints) (digest.Digest, []byte, *pb.OpMetadata, []*SourceLocation, error) {
if dgst, dt, md, srcs, err := f.Load(c); err == nil {
cache := f.Acquire()
defer cache.Release()

if dgst, dt, md, srcs, err := cache.Load(c); err == nil {
return dgst, dt, md, srcs, nil
}

Expand Down Expand Up @@ -816,7 +819,7 @@ func (f *FileOp) Marshal(ctx context.Context, c *Constraints) (digest.Digest, []
if err != nil {
return "", nil, nil, nil, err
}
return f.Store(dt, md, f.constraints.SourceLocations, c)
return cache.Store(dt, md, f.constraints.SourceLocations, c)
}

func normalizePath(parent, p string, keepSlash bool) string {
Expand Down
13 changes: 13 additions & 0 deletions client/llb/fileop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/moby/buildkit/solver/pb"
digest "github.com/opencontainers/go-digest"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)

func TestFileMkdir(t *testing.T) {
Expand Down Expand Up @@ -737,3 +738,15 @@ func TestFileOpMarshalConsistency(t *testing.T) {
prevDef = def.Def
}
}

func TestParallelMarshal(t *testing.T) {
st := Scratch().File(Mkfile("/tmp", 0644, []byte("tmp 1")))
eg, ctx := errgroup.WithContext(context.Background())
for i := 0; i < 100; i++ {
eg.Go(func() error {
_, err := st.Marshal(ctx)
return err
})
}
require.NoError(t, eg.Wait())
}
9 changes: 6 additions & 3 deletions client/llb/llbbuild/llbbuild.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func NewBuildOp(source llb.Output, opt ...BuildOption) llb.Vertex {
}

type build struct {
llb.MarshalCache
cache llb.MarshalCache
source llb.Output
info *BuildInfo
constraints llb.Constraints
Expand All @@ -47,7 +47,10 @@ func (b *build) Validate(context.Context, *llb.Constraints) error {
}

func (b *build) Marshal(ctx context.Context, c *llb.Constraints) (digest.Digest, []byte, *pb.OpMetadata, []*llb.SourceLocation, error) {
if dgst, dt, md, srcs, err := b.Load(c); err == nil {
cache := b.cache.Acquire()
defer cache.Release()

if dgst, dt, md, srcs, err := cache.Load(c); err == nil {
return dgst, dt, md, srcs, nil
}

Expand Down Expand Up @@ -85,7 +88,7 @@ func (b *build) Marshal(ctx context.Context, c *llb.Constraints) (digest.Digest,
if err != nil {
return "", nil, nil, nil, err
}
return b.Store(dt, md, b.constraints.SourceLocations, c)
return cache.Store(dt, md, b.constraints.SourceLocations, c)
}

func (b *build) Output() llb.Output {
Expand Down
29 changes: 22 additions & 7 deletions client/llb/marshal.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,30 +117,45 @@ func MarshalConstraints(base, override *Constraints) (*pb.Op, *pb.OpMetadata) {
}

type MarshalCache struct {
cache sync.Map
mu sync.Mutex
cache map[*Constraints]*marshalCacheResult
}

func (mc *MarshalCache) Load(c *Constraints) (digest.Digest, []byte, *pb.OpMetadata, []*SourceLocation, error) {
v, ok := mc.cache.Load(c)
type MarshalCacheInstance struct {
*MarshalCache
}

func (mc *MarshalCache) Acquire() *MarshalCacheInstance {
mc.mu.Lock()
return &MarshalCacheInstance{mc}
}

func (mc *MarshalCacheInstance) Load(c *Constraints) (digest.Digest, []byte, *pb.OpMetadata, []*SourceLocation, error) {
res, ok := mc.cache[c]
if !ok {
return "", nil, nil, nil, cerrdefs.ErrNotFound
}

res := v.(*marshalCacheResult)
return res.digest, res.dt, res.md, res.srcs, nil
}

func (mc *MarshalCache) Store(dt []byte, md *pb.OpMetadata, srcs []*SourceLocation, c *Constraints) (digest.Digest, []byte, *pb.OpMetadata, []*SourceLocation, error) {
func (mc *MarshalCacheInstance) Store(dt []byte, md *pb.OpMetadata, srcs []*SourceLocation, c *Constraints) (digest.Digest, []byte, *pb.OpMetadata, []*SourceLocation, error) {
res := &marshalCacheResult{
digest: digest.FromBytes(dt),
dt: dt,
md: md,
srcs: srcs,
}
mc.cache.Store(c, res)
if mc.cache == nil {
mc.cache = make(map[*Constraints]*marshalCacheResult)
}
mc.cache[c] = res
return res.digest, res.dt, res.md, res.srcs, nil
}

func (mc *MarshalCacheInstance) Release() {
mc.mu.Unlock()
}

type marshalCacheResult struct {
digest digest.Digest
dt []byte
Expand Down
9 changes: 6 additions & 3 deletions client/llb/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

type MergeOp struct {
MarshalCache
cache MarshalCache
inputs []Output
output Output
constraints Constraints
Expand All @@ -32,7 +32,10 @@ func (m *MergeOp) Validate(ctx context.Context, constraints *Constraints) error
}

func (m *MergeOp) Marshal(ctx context.Context, constraints *Constraints) (digest.Digest, []byte, *pb.OpMetadata, []*SourceLocation, error) {
if dgst, dt, md, srcs, err := m.Load(constraints); err == nil {
cache := m.cache.Acquire()
defer cache.Release()

if dgst, dt, md, srcs, err := cache.Load(constraints); err == nil {
return dgst, dt, md, srcs, nil
}

Expand All @@ -59,7 +62,7 @@ func (m *MergeOp) Marshal(ctx context.Context, constraints *Constraints) (digest
return "", nil, nil, nil, err
}

return m.Store(dt, md, m.constraints.SourceLocations, constraints)
return cache.Store(dt, md, m.constraints.SourceLocations, constraints)
}

func (m *MergeOp) Output() Output {
Expand Down
9 changes: 6 additions & 3 deletions client/llb/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
)

type SourceOp struct {
MarshalCache
cache MarshalCache
id string
attrs map[string]string
output Output
Expand Down Expand Up @@ -49,7 +49,10 @@ func (s *SourceOp) Validate(ctx context.Context, c *Constraints) error {
}

func (s *SourceOp) Marshal(ctx context.Context, constraints *Constraints) (digest.Digest, []byte, *pb.OpMetadata, []*SourceLocation, error) {
if dgst, dt, md, srcs, err := s.Load(constraints); err == nil {
cache := s.cache.Acquire()
defer cache.Release()

if dgst, dt, md, srcs, err := cache.Load(constraints); err == nil {
return dgst, dt, md, srcs, nil
}

Expand Down Expand Up @@ -82,7 +85,7 @@ func (s *SourceOp) Marshal(ctx context.Context, constraints *Constraints) (diges
return "", nil, nil, nil, err
}

return s.Store(dt, md, s.constraints.SourceLocations, constraints)
return cache.Store(dt, md, s.constraints.SourceLocations, constraints)
}

func (s *SourceOp) Output() Output {
Expand Down

0 comments on commit 30413b5

Please sign in to comment.