Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

solver: prevent edge merge to inactive states #4887

Merged
merged 3 commits into from
May 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 58 additions & 13 deletions solver/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,13 +176,60 @@ func (s *state) setEdge(index Index, targetEdge *edge, targetState *state) {
targetEdge.takeOwnership(e)

if targetState != nil {
targetState.addJobs(s, map[*state]struct{}{})

if _, ok := targetState.allPw[s.mpw]; !ok {
targetState.mpw.Add(s.mpw)
targetState.allPw[s.mpw] = struct{}{}
}
}
}

// addJobs recursively adds jobs to state and all its ancestors. currently
// only used during edge merges to add jobs from the source of the merge to the
// target and its ancestors.
// requires that Solver.mu is read-locked and srcState.mu is locked
func (s *state) addJobs(srcState *state, memo map[*state]struct{}) {
if _, ok := memo[s]; ok {
return
}
memo[s] = struct{}{}

s.mu.Lock()
defer s.mu.Unlock()

for j := range srcState.jobs {
s.jobs[j] = struct{}{}
}

for _, inputEdge := range s.vtx.Inputs() {
inputState, ok := s.solver.actives[inputEdge.Vertex.Digest()]
if !ok {
bklog.G(context.TODO()).
WithField("vertex_digest", inputEdge.Vertex.Digest()).
Error("input vertex not found during addJobs")
continue
}
inputState.addJobs(srcState, memo)

// tricky case: if the inputState's edge was *already* merged we should
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this case actually produces inputState != mergedInputState, is there even a need to bother with the inputState?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Provided that the whole codebase strictly uses getEdge/getState (and thus always uses the merge target if a merge happened), then yeah you could probably skip inputState.addJobs and just unconditionally call inputState.getEdge and use the return of that.

I'm not sure if that's true (haven't audited everything), do you happen to know if it is?

// also add jobs to the merged edge's state
mergedInputEdge := inputState.getEdge(inputEdge.Index)
if mergedInputEdge == nil || mergedInputEdge.edge.Vertex.Digest() == inputEdge.Vertex.Digest() {
// not merged
continue
}
mergedInputState, ok := s.solver.actives[mergedInputEdge.edge.Vertex.Digest()]
if !ok {
bklog.G(context.TODO()).
WithField("vertex_digest", mergedInputEdge.edge.Vertex.Digest()).
Error("merged input vertex not found during addJobs")
continue
}
mergedInputState.addJobs(srcState, memo)
}
}

func (s *state) combinedCacheManager() CacheManager {
s.mu.Lock()
cms := make([]CacheManager, 0, len(s.cache)+1)
Expand Down Expand Up @@ -470,16 +517,25 @@ func (jl *Solver) loadUnlocked(ctx context.Context, v, parent Vertex, j *Job, ca
if debugScheduler {
lg := bklog.G(ctx).
WithField("vertex_name", v.Name()).
WithField("vertex_digest", v.Digest())
WithField("vertex_digest", v.Digest()).
WithField("actives_digest_key", dgst)
if j != nil {
lg = lg.WithField("job", j.id)
}
lg.Debug("adding active vertex")
for i, inp := range v.Inputs() {
lg.WithField("input_index", i).
WithField("input_vertex_name", inp.Vertex.Name()).
WithField("input_vertex_digest", inp.Vertex.Digest()).
WithField("input_edge_index", inp.Index).
Debug("new active vertex input")
}
}
} else if debugScheduler {
lg := bklog.G(ctx).
WithField("vertex_name", v.Name()).
WithField("vertex_digest", v.Digest())
WithField("vertex_digest", v.Digest()).
WithField("actives_digest_key", dgst)
if j != nil {
lg = lg.WithField("job", j.id)
}
Expand All @@ -499,17 +555,6 @@ func (jl *Solver) loadUnlocked(ctx context.Context, v, parent Vertex, j *Job, ca
if _, ok := st.jobs[j]; !ok {
st.jobs[j] = struct{}{}
}
if debugScheduler {
jobIDs := make([]string, 0, len(st.jobs))
for j := range st.jobs {
jobIDs = append(jobIDs, j.id)
}
bklog.G(ctx).
WithField("vertex_name", v.Name()).
WithField("vertex_digest", v.Digest()).
WithField("jobs", jobIDs).
Debug("current jobs for vertex")
}
}
st.mu.Unlock()

Expand Down
72 changes: 60 additions & 12 deletions solver/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,38 +423,77 @@ func (pf *pipeFactory) NewFuncRequest(f func(context.Context) (interface{}, erro
}

func debugSchedulerPreUnpark(e *edge, inc []pipe.Sender, updates, allPipes []pipe.Receiver) {
log := bklog.G(context.TODO())

log.Debugf(">> unpark %s req=%d upt=%d out=%d state=%s %s", e.edge.Vertex.Name(), len(inc), len(updates), len(allPipes), e.state, e.edge.Vertex.Digest())
log := bklog.G(context.TODO()).
WithField("edge_vertex_name", e.edge.Vertex.Name()).
WithField("edge_vertex_digest", e.edge.Vertex.Digest()).
WithField("edge_index", e.edge.Index)

log.
WithField("edge_state", e.state).
WithField("req", len(inc)).
WithField("upt", len(updates)).
WithField("out", len(allPipes)).
Debug(">> unpark")

for i, dep := range e.deps {
des := edgeStatusInitial
if dep.req != nil {
des = dep.req.Request().(*edgeRequest).desiredState
}
log.Debugf(":: dep%d %s state=%s des=%s keys=%d hasslowcache=%v preprocessfunc=%v", i, e.edge.Vertex.Inputs()[i].Vertex.Name(), dep.state, des, len(dep.keys), e.slowCacheFunc(dep) != nil, e.preprocessFunc(dep) != nil)
log.
WithField("dep_index", i).
WithField("dep_vertex_name", e.edge.Vertex.Inputs()[i].Vertex.Name()).
WithField("dep_vertex_digest", e.edge.Vertex.Inputs()[i].Vertex.Digest()).
WithField("dep_state", dep.state).
WithField("dep_desired_state", des).
WithField("dep_keys", len(dep.keys)).
WithField("dep_has_slow_cache", e.slowCacheFunc(dep) != nil).
WithField("dep_preprocess_func", e.preprocessFunc(dep) != nil).
Debug(":: dep")
}

for i, in := range inc {
req := in.Request()
log.Debugf("> incoming-%d: %p dstate=%s canceled=%v", i, in, req.Payload.(*edgeRequest).desiredState, req.Canceled)
log.
WithField("incoming_index", i).
WithField("incoming_pointer", in).
WithField("incoming_desired_state", req.Payload.(*edgeRequest).desiredState).
WithField("incoming_canceled", req.Canceled).
Debug("> incoming")
}

for i, up := range updates {
if up == e.cacheMapReq {
log.Debugf("> update-%d: %p cacheMapReq complete=%v", i, up, up.Status().Completed)
log.
WithField("update_index", i).
WithField("update_pointer", up).
WithField("update_complete", up.Status().Completed).
Debug("> update cacheMapReq")
} else if up == e.execReq {
log.Debugf("> update-%d: %p execReq complete=%v", i, up, up.Status().Completed)
log.
WithField("update_index", i).
WithField("update_pointer", up).
WithField("update_complete", up.Status().Completed).
Debug("> update execReq")
} else {
st, ok := up.Status().Value.(*edgeState)
if ok {
index := -1
if dep, ok := e.depRequests[up]; ok {
index = int(dep.index)
}
log.Debugf("> update-%d: %p input-%d keys=%d state=%s", i, up, index, len(st.keys), st.state)
log.
WithField("update_index", i).
WithField("update_pointer", up).
WithField("update_complete", up.Status().Completed).
WithField("update_input_index", index).
WithField("update_keys", len(st.keys)).
WithField("update_state", st.state).
Debugf("> update edgeState")
} else {
log.Debugf("> update-%d: unknown", i)
log.
WithField("update_index", i).
Debug("> update unknown")
}
}
}
Expand All @@ -463,7 +502,16 @@ func debugSchedulerPreUnpark(e *edge, inc []pipe.Sender, updates, allPipes []pip
func debugSchedulerPostUnpark(e *edge, inc []pipe.Sender) {
log := bklog.G(context.TODO())
for i, in := range inc {
log.Debugf("< incoming-%d: %p completed=%v", i, in, in.Status().Completed)
}
log.Debugf("<< unpark %s\n", e.edge.Vertex.Name())
log.
WithField("incoming_index", i).
WithField("incoming_pointer", in).
WithField("incoming_complete", in.Status().Completed).
Debug("< incoming")
}
log.
WithField("edge_vertex_name", e.edge.Vertex.Name()).
WithField("edge_vertex_digest", e.edge.Vertex.Digest()).
WithField("edge_index", e.edge.Index).
WithField("edge_state", e.state).
Debug("<< unpark")
}
134 changes: 134 additions & 0 deletions solver/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3447,6 +3447,140 @@ func TestUnknownBuildID(t *testing.T) {
require.Contains(t, err.Error(), "no such job")
}

// TestStaleEdgeMerge exercises edge merging across multiple jobs: it builds
// three structurally identical graphs under separate jobs (so their edges
// merge), then discards the jobs one by one, asserting at each step that
// merged states retain references to every job that depends on them and are
// only removed from the actives map once no job references them.
func TestStaleEdgeMerge(t *testing.T) {
	// should not be possible to merge to an edge no longer in the actives map
	t.Parallel()
	ctx := context.TODO()

	s := NewSolver(SolverOpt{
		ResolveOpFunc: testOpResolver,
	})
	defer s.Close()

	// Identical const vertices under distinct names: same cache key, which is
	// what makes their consumers candidates for edge merging.
	depV0 := vtxConst(1, vtxOpt{name: "depV0"})
	depV1 := vtxConst(1, vtxOpt{name: "depV1"})
	depV2 := vtxConst(1, vtxOpt{name: "depV2"})

	// These should all end up edge merged
	v0 := vtxAdd(2, vtxOpt{name: "v0", inputs: []Edge{
		{Vertex: depV0},
	}})
	v1 := vtxAdd(2, vtxOpt{name: "v1", inputs: []Edge{
		{Vertex: depV1},
	}})
	v2 := vtxAdd(2, vtxOpt{name: "v2", inputs: []Edge{
		{Vertex: depV2},
	}})

	// Build v0 under j0; this creates the states that later jobs merge into.
	j0, err := s.NewJob("job0")
	require.NoError(t, err)
	g0 := Edge{Vertex: v0}
	res, err := j0.Build(ctx, g0)
	require.NoError(t, err)
	require.NotNil(t, res)

	require.Contains(t, s.actives, v0.Digest())
	require.Contains(t, s.actives[v0.Digest()].jobs, j0)
	require.Contains(t, s.actives, depV0.Digest())
	require.Contains(t, s.actives[depV0.Digest()].jobs, j0)

	// this edge should be merged with the one from j0
	j1, err := s.NewJob("job1")
	require.NoError(t, err)
	g1 := Edge{Vertex: v1}
	res, err = j1.Build(ctx, g1)
	require.NoError(t, err)
	require.NotNil(t, res)

	// After the merge, the merge target's states (v0/depV0) must reference
	// both jobs, while j1's own states reference only j1.
	require.Contains(t, s.actives, v0.Digest())
	require.Contains(t, s.actives[v0.Digest()].jobs, j0)
	require.Contains(t, s.actives[v0.Digest()].jobs, j1)
	require.Contains(t, s.actives, depV0.Digest())
	require.Contains(t, s.actives[depV0.Digest()].jobs, j0)
	require.Contains(t, s.actives[depV0.Digest()].jobs, j1)

	require.Contains(t, s.actives, v1.Digest())
	require.NotContains(t, s.actives[v1.Digest()].jobs, j0)
	require.Contains(t, s.actives[v1.Digest()].jobs, j1)
	require.Contains(t, s.actives, depV1.Digest())
	require.NotContains(t, s.actives[depV1.Digest()].jobs, j0)
	require.Contains(t, s.actives[depV1.Digest()].jobs, j1)

	// discard j0, verify that v0 is still active and its state contains j1 since j1's
	// edge was merged to v0's state
	require.NoError(t, j0.Discard())

	require.Contains(t, s.actives, v0.Digest())
	require.NotContains(t, s.actives[v0.Digest()].jobs, j0)
	require.Contains(t, s.actives[v0.Digest()].jobs, j1)
	require.Contains(t, s.actives, depV0.Digest())
	require.NotContains(t, s.actives[depV0.Digest()].jobs, j0)
	require.Contains(t, s.actives[depV0.Digest()].jobs, j1)

	require.Contains(t, s.actives, v1.Digest())
	require.NotContains(t, s.actives[v1.Digest()].jobs, j0)
	require.Contains(t, s.actives[v1.Digest()].jobs, j1)
	require.Contains(t, s.actives, depV1.Digest())
	require.NotContains(t, s.actives[depV1.Digest()].jobs, j0)
	require.Contains(t, s.actives[depV1.Digest()].jobs, j1)

	// verify another job can still merge
	j2, err := s.NewJob("job2")
	require.NoError(t, err)
	g2 := Edge{Vertex: v2}
	res, err = j2.Build(ctx, g2)
	require.NoError(t, err)
	require.NotNil(t, res)

	require.Contains(t, s.actives, v0.Digest())
	require.Contains(t, s.actives[v0.Digest()].jobs, j1)
	require.Contains(t, s.actives[v0.Digest()].jobs, j2)
	require.Contains(t, s.actives, depV0.Digest())
	require.Contains(t, s.actives[depV0.Digest()].jobs, j1)
	require.Contains(t, s.actives[depV0.Digest()].jobs, j2)

	require.Contains(t, s.actives, v1.Digest())
	require.Contains(t, s.actives[v1.Digest()].jobs, j1)
	require.NotContains(t, s.actives[v1.Digest()].jobs, j2)
	require.Contains(t, s.actives, depV1.Digest())
	require.Contains(t, s.actives[depV1.Digest()].jobs, j1)
	require.NotContains(t, s.actives[depV1.Digest()].jobs, j2)

	require.Contains(t, s.actives, v2.Digest())
	require.NotContains(t, s.actives[v2.Digest()].jobs, j1)
	require.Contains(t, s.actives[v2.Digest()].jobs, j2)
	require.Contains(t, s.actives, depV2.Digest())
	require.NotContains(t, s.actives[depV2.Digest()].jobs, j1)
	require.Contains(t, s.actives[depV2.Digest()].jobs, j2)

	// discard j1, verify only referenced edges still exist
	require.NoError(t, j1.Discard())

	require.Contains(t, s.actives, v0.Digest())
	require.NotContains(t, s.actives[v0.Digest()].jobs, j1)
	require.Contains(t, s.actives[v0.Digest()].jobs, j2)
	require.Contains(t, s.actives, depV0.Digest())
	require.NotContains(t, s.actives[depV0.Digest()].jobs, j1)
	require.Contains(t, s.actives[depV0.Digest()].jobs, j2)

	// v1/depV1 were only referenced by j1, so they must be gone now.
	require.NotContains(t, s.actives, v1.Digest())
	require.NotContains(t, s.actives, depV1.Digest())

	require.Contains(t, s.actives, v2.Digest())
	require.Contains(t, s.actives[v2.Digest()].jobs, j2)
	require.Contains(t, s.actives, depV2.Digest())
	require.Contains(t, s.actives[depV2.Digest()].jobs, j2)

	// discard the last job and verify everything was removed now
	require.NoError(t, j2.Discard())
	require.NotContains(t, s.actives, v0.Digest())
	require.NotContains(t, s.actives, v1.Digest())
	require.NotContains(t, s.actives, v2.Digest())
	require.NotContains(t, s.actives, depV0.Digest())
	require.NotContains(t, s.actives, depV1.Digest())
	require.NotContains(t, s.actives, depV2.Digest())
}

func generateSubGraph(nodes int) (Edge, int) {
if nodes == 1 {
value := rand.Int() % 500 //nolint:gosec
Expand Down