-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
task.go
363 lines (331 loc) · 14.9 KB
/
task.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package apply
import (
"context"
"github.com/cockroachdb/errors"
"go.etcd.io/raft/v3/raftpb"
)
// StateMachine represents an instance of a replicated state machine being
// driven by a replication group. The state machine accepts Commands that
// have been committed to the replication group's log and applies them to
// advance to a new state.
//
// All state transitions performed by the state machine are expected to be
// deterministic, which ensures that if each instance is driven from the
// same consistent shared log, they will all stay in sync.
//
// Implementations are not required to be thread safe (and commonly are
// not), so concurrent use must be synchronized by the caller.
type StateMachine interface {
	// NewEphemeralBatch creates an EphemeralBatch. This kind of batch is not
	// able to make changes to the StateMachine, but can be used to check
	// commands to determine whether they will be rejected or not when staged
	// in a real Batch. The principal user of ephemeral batches is
	// Task.AckCommittedEntriesBeforeApplication.
	//
	// Only a single EphemeralBatch *or* Batch may be open at any given point
	// in time.
	NewEphemeralBatch() EphemeralBatch
	// NewBatch creates a new batch that is suitable for accumulating the
	// effects that a group of Commands will have on the replicated state
	// machine. Commands are staged in the batch one-by-one and then the
	// entire batch is applied to the StateMachine at once via its
	// ApplyToStateMachine method.
	//
	// Only a single EphemeralBatch *or* Batch may be open at any given point
	// in time.
	NewBatch() Batch
	// ApplySideEffects applies the in-memory side-effects of a Command to
	// the replicated state machine. The method will be called in the order
	// that the commands are committed to the state machine's log. Once the
	// in-memory side-effects of the Command are applied, an AppliedCommand
	// is returned so that it can be finished and acknowledged.
	//
	// The method will always be called with a Command that has been checked
	// and whose persistent state transition has been applied to the state
	// machine. Because this method is called after applying the persistent
	// state transition for a Command, it may not be called in the case of
	// an untimely crash. This means that applying these side-effects will
	// typically update the in-memory representation of the state machine
	// to the same state that it would be in if the process restarted.
	//
	// Returning ErrRemoved stops the Task from processing further commands.
	ApplySideEffects(context.Context, CheckedCommand) (AppliedCommand, error)
}
// ErrRemoved can be returned from ApplySideEffects to stop the task from
// processing more commands. When Task.ApplyCommittedEntries observes it, the
// commands still remaining in its iterator are acknowledged with this error
// and finished, and the error is returned immediately. The error should only
// be returned by non-trivial commands.
var ErrRemoved = errors.New("replica removed")
// An EphemeralBatch can stage a number of commands, but lacks the ability
// to apply them to a state machine. It is used to determine whether
// commands would be rejected without applying them.
type EphemeralBatch interface {
	// Stage inserts a Command into the Batch. In doing so, the Command is
	// checked for rejection and a CheckedCommand is returned.
	//
	// TODO(tbg): consider renaming this to Add, so that in implementations
	// of this interface the word "staging" unambiguously refers to staging
	// commands into the pebble batch.
	Stage(context.Context, Command) (CheckedCommand, error)
	// Close closes the batch and releases any resources that it holds.
	Close()
}
// Batch accumulates a series of updates from Commands and performs them
// all at once to its StateMachine when applied. Groups of Commands will be
// staged in the Batch such that either one or more trivial Commands are
// staged, or exactly one non-trivial Command is staged.
//
// A Batch embeds EphemeralBatch, so it also provides Stage and Close.
type Batch interface {
	EphemeralBatch
	// ApplyToStateMachine applies the persistent state transitions staged
	// in the Batch to the StateMachine, atomically.
	//
	// Command application can not resume in the event of an error, that is,
	// the surrounding StateMachine must be considered defunct.
	ApplyToStateMachine(context.Context) error
}
// Decoder is capable of decoding a list of committed raft entries and
// binding any that were locally proposed to their local proposals.
type Decoder interface {
	// DecodeAndBind decodes each of the provided raft entries into commands
	// and binds any that were proposed locally to their local proposals.
	// The method must only be called once per Decoder. It returns whether
	// any of the commands were bound to local proposals waiting for
	// acknowledgement.
	DecodeAndBind(context.Context, []raftpb.Entry) (anyLocal bool, _ error)
	// NewCommandIter creates an iterator over the replicated commands that
	// were passed to DecodeAndBind. The method must not be called until
	// after DecodeAndBind is called. It may be called more than once; each
	// call returns a new iterator.
	NewCommandIter() CommandIterator
	// Reset resets the Decoder and releases any resources that it holds.
	// Task.Close calls this.
	Reset()
}
// Task is an object capable of coordinating the application of commands to
// a replicated state machine after they have been durably committed to a
// raft log.
//
// Committed raft entries are provided to the task through its Decode
// method. The task will then apply these entries to the provided state
// machine when ApplyCommittedEntries is called.
type Task struct {
	// sm is the state machine that commands are staged in and applied to.
	sm StateMachine
	// dec decodes committed entries and iterates over the resulting commands.
	dec Decoder
	// Have entries been decoded yet? Set by Decode and checked by
	// assertDecoded before acknowledging or applying commands.
	decoded bool
	// Were any of the decoded commands locally proposed? When false,
	// AckCommittedEntriesBeforeApplication has nothing to acknowledge.
	anyLocal bool
	// The maximum number of trivial commands that can be applied in a batch.
	// A value <= 0 places no limit on the batch size.
	batchSize int32
}
// MakeTask returns a Task that applies the commands decoded by dec to sm.
// The returned Task places no limit on batch size until SetMaxBatchSize is
// called, and Decode must be called before any entries can be acknowledged
// or applied.
func MakeTask(sm StateMachine, dec Decoder) Task {
	var t Task
	t.sm = sm
	t.dec = dec
	return t
}
// Decode decodes the committed raft entries into commands and prepared for the
// commands to be applied to the replicated state machine.
func (t *Task) Decode(ctx context.Context, committedEntries []raftpb.Entry) error {
var err error
t.anyLocal, err = t.dec.DecodeAndBind(ctx, committedEntries)
t.decoded = true
return err
}
// assertDecoded panics unless Decode has already been called on the Task.
// It guards the methods that iterate over decoded commands.
func (t *Task) assertDecoded() {
	if t.decoded {
		return
	}
	panic("Task.Decode not called yet")
}
// AckCommittedEntriesBeforeApplication attempts to acknowledge the success of
// raft entries that have been durably committed to the raft log but have not
// yet been applied to the proposer replica's replicated state machine.
//
// Doing so is safe because a proposal through raft is known to have succeeded
// as soon as it is durably replicated to a quorum of replicas (i.e. once it
// has committed in the raft log). The proposal does not need to wait for its
// effects to be applied to know whether its changes will succeed or fail,
// because the raft log, not the replicated state machine, is what provides
// atomicity and durability for replicated writes (ignoring log truncation).
//
// There are, however, a few complications to acknowledging a proposal at
// this stage:
//
//  1. Committing an entry in the raft log and having the command in that
//     entry succeed are similar but not equivalent. Even after the entry has
//     replicated to a quorum, its command may still be rejected "beneath
//     raft": a (deterministic) check after replication can decide that the
//     command will not be applied to the replicated state machine. In that
//     case the waiting client must not be told that the command succeeded.
//     Luckily this check is cheap, so it is performed both here and again
//     when the command is applied.
//
//     The rejection status is determined using an ephemeral batch. Commands
//     are staged in it to obtain CheckedCommands, which can be acknowledged
//     immediately even though the ephemeral batch itself can never update
//     the durable state machine. Successful commands that permit it (see
//     CanAckBeforeApplication) are acknowledged, and the ephemeral batch is
//     then thrown away.
//
//  2. Some commands perform non-trivial work, such as updating Replica
//     configuration state or performing Range splits. Their clients likely
//     care not only that the change was sequenced in the raft log, but also
//     about when it took effect. There is currently no hook for requesting
//     an acknowledgement only after application, so for simplicity only
//     transactional writes are ever acked before they take effect; all other
//     commands wait until they have been applied.
//
//  3. Even though success can be determined without applying a command, its
//     effect is not visible to conflicting commands until it is applied. So
//     although the client may be told about a write's success here, the
//     write's latches cannot be released until it has applied. See
//     ProposalData.signalProposalResult/finishApplication.
//
//  4. When catching up a follower that is behind, the (etcd/raft) leader
//     emits an MsgApp whose commit index covers the entries in that MsgApp,
//     and Ready() exposes those entries in both the Entries and
//     CommittedEntries slices (i.e. append and apply). These are not acked
//     early: the caller passes the "old" last index as maxIndex, and no
//     command with an index above maxIndex is considered here.
func (t *Task) AckCommittedEntriesBeforeApplication(ctx context.Context, maxIndex uint64) error {
	t.assertDecoded()
	if !t.anyLocal {
		// Fast path: nothing was proposed locally, so no client is waiting.
		return nil
	}

	// A scratch batch: it only tells us whether each command would be
	// rejected when staged for real; it can never modify the state machine.
	scratch := t.sm.NewEphemeralBatch()
	defer scratch.Close()

	cmds := t.dec.NewCommandIter()
	defer cmds.Close()

	// Only the leading run of trivial commands at or below maxIndex is
	// eligible; stop at the first non-trivial command or the first command
	// above maxIndex.
	inAckablePrefix := func(cmd Command) bool {
		return cmd.Index() <= maxIndex && cmd.IsTrivial()
	}

	// Ack early only if the command staged without rejection, was proposed
	// locally, and may be acknowledged before application. Rejected proposals
	// are never acked early because the StateMachine may prefer to retry the
	// command rather than return the error to the client. Each ack uses the
	// command's own context.
	ackEarly := func(cmd CheckedCommand, _ context.Context) error {
		if cmd.Rejected() || !cmd.IsLocal() || !cmd.CanAckBeforeApplication() {
			return nil
		}
		return cmd.AckSuccess(cmd.Ctx())
	}

	staged, err := mapCmdIter(takeWhileCmdIter(cmds, inAckablePrefix), scratch.Stage)
	if err != nil {
		return err
	}
	return forEachCheckedCmdIter(ctx, staged, ackEarly)
}
// SetMaxBatchSize sets the maximum number of trivial commands that may be
// applied in a single batch. A size of 0 (or any negative value) places no
// limit on the number of commands that can be applied in a batch.
//
// The limit is stored as an int32. Values outside [0, MaxInt32] are clamped
// into that range rather than converted directly. A plain int32 conversion
// would wrap on 64-bit platforms: a large limit could become negative, i.e.
// silently "no limit", and a very negative value could become a small
// positive cap.
func (t *Task) SetMaxBatchSize(size int) {
	const maxInt32 = 1<<31 - 1
	switch {
	case size < 0:
		// Any non-positive limit means "unlimited"; normalize to 0.
		size = 0
	case size > maxInt32:
		size = maxInt32
	}
	t.batchSize = int32(size)
}
// ApplyCommittedEntries applies raft entries that have been committed to the
// raft log but have not yet been applied to the replicated state machine.
// Commands are applied batch by batch (see applyOneBatch) until the decoded
// commands are exhausted.
//
// If applying a batch fails, application stops and the error is returned.
// The StateMachine must be considered defunct after such an error (see
// Batch.ApplyToStateMachine), so later commands must not be applied on top
// of it. If the error is ErrRemoved, every command still remaining in the
// iterator is first acknowledged with that error and finished.
func (t *Task) ApplyCommittedEntries(ctx context.Context) error {
	t.assertDecoded()
	iter := t.dec.NewCommandIter()
	for iter.Valid() {
		err := t.applyOneBatch(ctx, iter)
		if err == nil {
			continue
		}
		if errors.Is(err, ErrRemoved) {
			// On ErrRemoved, we know that the replica has been destroyed and in
			// particular, the Replica's proposals map has already been cleared out.
			// But there may be unfinished proposals that are only known to the
			// current Task (because we remove proposals we're about to apply from the
			// map). To avoid leaking resources and/or leaving proposers hanging,
			// finish them here. Note that it is important that we know that the
			// proposals map is (and always will be, due to replicaGC setting the
			// destroy status) empty at this point, since there is an invariant
			// that all proposals in the map are unfinished, and the Task has only
			// removed a subset[^1] of the proposals that might be finished below.
			// But since it's empty, we can finish them all without having to
			// check which ones are no longer in the map.
			//
			// NOTE: forEachCmdIter closes iter.
			//
			// [^1]: (*replicaDecoder).retrieveLocalProposals
			if rejectErr := forEachCmdIter(ctx, iter, func(cmd Command, ctx context.Context) error {
				return cmd.AckErrAndFinish(ctx, err)
			}); rejectErr != nil {
				return rejectErr
			}
		}
		// Any batch error is terminal: command application cannot resume, so
		// we must not continue applying later commands. Previously, errors
		// other than ErrRemoved were silently dropped and the loop kept going.
		return err
	}
	iter.Close()
	return nil
}
// applyOneBatch pulls the next batch-worth of commands off iter and applies
// them to the StateMachine as one atomic unit. Following trivialPolicy, a
// batch is made up of either:
//
//	a) one or more trivial commands (at most the Task's batch size, if set), or
//	b) exactly one non-trivial command.
//
// The commands move through four steps: staging, applying the persistent
// state transitions, applying in-memory side effects, and finally
// acknowledging and finishing. The first error from any step is returned.
func (t *Task) applyOneBatch(ctx context.Context, iter CommandIterator) error {
	b := t.sm.NewBatch()
	defer b.Close()

	// The policy is stateful: it counts what has been admitted so far and is
	// updated by the closure each time takeWhileCmdIter asks about a command.
	policy := trivialPolicy{maxCount: t.batchSize}
	admit := func(cmd Command) bool {
		return policy.maybeAdd(cmd.IsTrivial())
	}

	// Stage every admitted command in the batch, checking it for rejection.
	checked, err := mapCmdIter(takeWhileCmdIter(iter, admit), b.Stage)
	if err != nil {
		return err
	}

	// Write the batch's persistent state transitions atomically.
	if err = b.ApplyToStateMachine(ctx); err != nil {
		return err
	}

	// With the durable state written, apply each command's side effects.
	applied, err := mapCheckedCmdIter(checked, t.sm.ApplySideEffects)
	if err != nil {
		return err
	}

	// Acknowledge each command's outcome and finish it.
	return forEachAppliedCmdIter(ctx, applied, AppliedCommand.AckOutcomeAndFinish)
}
// trivialPolicy is a stateful batching policy. A batch built under it holds
// either one or more trivial commands, optionally capped at maxCount, or
// exactly one non-trivial command.
type trivialPolicy struct {
	// maxCount caps the number of trivial commands; <= 0 means no cap.
	maxCount int32
	// trivialCount is the number of trivial commands admitted so far.
	trivialCount int32
	// nonTrivialCount is the number of non-trivial commands admitted so far
	// (never more than one).
	nonTrivialCount int32
}

// maybeAdd reports whether a command of the given triviality may join the
// batch under this policy. A true result means the command has been counted
// as admitted.
func (p *trivialPolicy) maybeAdd(trivial bool) bool {
	switch {
	case !trivial && p.trivialCount+p.nonTrivialCount > 0:
		// A non-trivial command must be the only command in its batch.
		return false
	case !trivial:
		p.nonTrivialCount++
		return true
	case p.nonTrivialCount > 0:
		// Trivial commands may not join a batch holding a non-trivial one.
		return false
	case p.maxCount > 0 && p.trivialCount == p.maxCount:
		// The batch already holds the maximum number of trivial commands.
		return false
	default:
		p.trivialCount++
		return true
	}
}
// Close ends the task. It first resets the Decoder, releasing any resources
// the Decoder holds, and then clears every field of the Task. The Task cannot
// be used again after being closed.
func (t *Task) Close() {
	dec := t.dec
	dec.Reset()
	*t = Task{}
}