-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
Copy pathplan.go
244 lines (228 loc) · 6.9 KB
/
plan.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package scplan
import (
"sort"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scgraph"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scop"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scpb"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scplan/deprules"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scplan/opgen"
"github.com/cockroachdb/cockroach/pkg/util/iterutil"
"github.com/cockroachdb/errors"
)
// Params holds the arguments for planning.
type Params struct {
// ExecutionPhase indicates the phase that the plan should be constructed for.
ExecutionPhase scop.Phase
// CreatedDescriptorIDs contains IDs for new descriptors created by the same
// schema changer (i.e., earlier in the same transaction). New descriptors
// can have most of their schema changes fully executed in the same
// transaction.
//
// This doesn't do anything right now.
CreatedDescriptorIDs catalog.DescriptorIDSet
}
// A Plan is a schema change plan, primarily containing ops to be executed that
// are partitioned into stages.
type Plan struct {
Params Params
Initial scpb.State
Graph *scgraph.Graph
Stages []Stage
}
// A Stage is a sequence of ops to be executed "together" as part of a schema
// change.
//
// Stages also contain the state before and after the execution of the ops in
// the stage, reflecting the fact that any set of ops can be thought of as a
// transition from one state to another.
type Stage struct {
Before, After scpb.State
Ops scop.Ops
Revertible bool
}
// MakePlan generates a Plan for a particular phase of a schema change, given
// the initial state for a set of targets.
func MakePlan(initial scpb.State, params Params) (_ Plan, err error) {
defer func() {
if r := recover(); r != nil {
rAsErr, ok := r.(error)
if !ok {
rAsErr = errors.Errorf("panic during MakePlan: %v", r)
}
err = errors.CombineErrors(err, rAsErr)
}
}()
g, err := opgen.BuildGraph(params.ExecutionPhase, initial)
if err != nil {
return Plan{}, err
}
if err := deprules.Apply(g); err != nil {
return Plan{}, err
}
stages := buildStages(initial, g, params)
return Plan{
Params: params,
Initial: initial,
Graph: g,
Stages: stages,
}, nil
}
// validateStages sanity checks stages to ensure no
// invalid execution plans are made.
func validateStages(stages []Stage) {
revertibleAllowed := true
for idx, stage := range stages {
if !stage.Revertible {
revertibleAllowed = false
}
if stage.Revertible && !revertibleAllowed {
panic(errors.AssertionFailedf(
"invalid execution plan revertability flipped at stage (%d): %v", idx, stage))
}
}
}
func buildStages(init scpb.State, g *scgraph.Graph, params Params) []Stage {
// Fetch the order of the graph, which will be used to
// evaluating edges in topological order.
nodeRanks := g.GetNodeRanks()
// TODO(ajwerner): deal with the case where the target status was
// fulfilled by something that preceded the initial state.
cur := init
fulfilled := map[*scpb.Node]struct{}{}
filterUnsatisfiedEdgesStep := func(edges []*scgraph.OpEdge) ([]*scgraph.OpEdge, bool) {
candidates := make(map[*scpb.Node]struct{})
for _, e := range edges {
candidates[e.To()] = struct{}{}
}
// Check to see if the current set of edges will have their dependencies met
// if they are all run. Any which will not must be pruned. This greedy
// algorithm works, but a justification is in order.
failed := map[*scgraph.OpEdge]struct{}{}
for _, e := range edges {
_ = g.ForEachDepEdgeFrom(e.To(), func(de *scgraph.DepEdge) error {
_, isFulfilled := fulfilled[de.To()]
_, isCandidate := candidates[de.To()]
if isFulfilled || isCandidate {
return nil
}
failed[e] = struct{}{}
return iterutil.StopIteration()
})
}
if len(failed) == 0 {
return edges, true
}
truncated := edges[:0]
for _, e := range edges {
if _, found := failed[e]; !found {
truncated = append(truncated, e)
}
}
return truncated, false
}
filterUnsatisfiedEdges := func(edges []*scgraph.OpEdge) ([]*scgraph.OpEdge, bool) {
for len(edges) > 0 {
if filtered, done := filterUnsatisfiedEdgesStep(edges); !done {
edges = filtered
} else {
return filtered, true
}
}
return edges, false
}
buildStageType := func(edges []*scgraph.OpEdge) (Stage, bool) {
edges, ok := filterUnsatisfiedEdges(edges)
if !ok {
return Stage{}, false
}
sort.SliceStable(edges,
func(i, j int) bool {
// Higher ranked edges should go first.
return nodeRanks[edges[i].To()] > nodeRanks[edges[j].To()]
})
next := append(cur[:0:0], cur...)
isStageRevertible := true
var ops []scop.Op
for revertible := 1; revertible >= 0; revertible-- {
isStageRevertible = revertible == 1
// FIXME: Flip the loop
for _, e := range edges {
for i, ts := range cur {
if e.From() == ts && isStageRevertible == e.Revertible() {
next[i] = e.To()
ops = append(ops, e.Op()...)
break
}
}
}
// If we added non-revertible stages
// then this stage is done
if len(ops) != 0 {
break
}
}
return Stage{
Before: cur,
After: next,
Ops: scop.MakeOps(ops...),
Revertible: isStageRevertible,
}, true
}
var stages []Stage
for {
// Note that the current nodes are fulfilled for the sake of dependency
// checking.
for _, ts := range cur {
fulfilled[ts] = struct{}{}
}
// Extract the set of op edges for the current stage.
var opEdges []*scgraph.OpEdge
for _, t := range cur {
// TODO(ajwerner): improve the efficiency of this lookup.
// Look for an opEdge from this node. Then, for the other side
// of the opEdge, look for dependencies.
if oe, ok := g.GetOpEdgeFrom(t); ok {
opEdges = append(opEdges, oe)
}
}
// Group the op edges a per-type basis.
opTypes := make(map[scop.Type][]*scgraph.OpEdge)
for _, oe := range opEdges {
for _, op := range oe.Op() {
opTypes[op.Type()] = append(opTypes[op.Type()], oe)
}
}
// Greedily attempt to find a stage which can be executed. This is sane
// because once a dependency is met, it never becomes unmet.
var didSomething bool
var s Stage
for _, typ := range []scop.Type{
scop.MutationType,
scop.BackfillType,
scop.ValidationType,
} {
if s, didSomething = buildStageType(opTypes[typ]); didSomething {
break
}
}
if !didSomething {
break
}
// Sort ops based on graph dependencies.
//SortOps(g, s.Ops.Slice())
stages = append(stages, s)
cur = s.After
}
validateStages(stages)
return stages
}