// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package raftchunking

import (
	"fmt"
	"io"

	"github.com/hashicorp/go-raftchunking/types"
	"github.com/hashicorp/raft"
	"google.golang.org/protobuf/proto"
)

var _ raft.FSM = (*ChunkingFSM)(nil)
var _ raft.ConfigurationStore = (*ChunkingConfigurationStore)(nil)
var _ raft.BatchingFSM = (*ChunkingBatchingFSM)(nil)

// ChunkingSuccess wraps the underlying FSM's response to a completed chunked
// operation so callers can distinguish it from a nil (partial or abandoned)
// result.
type ChunkingSuccess struct {
	Response interface{}
}
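
// Illustrative sketch (not part of the original file): on the leader, the
// value returned from the FSM's Apply surfaces through raft's ApplyFuture,
// so a client that submitted the final chunk might unwrap it like this
// ("future" is a hypothetical raft.ApplyFuture):
//
//	if cs, ok := future.Response().(ChunkingSuccess); ok {
//		appResponse := cs.Response // the underlying FSM's return value
//		_ = appResponse
//	}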

// ChunkingFSM is an FSM that implements chunking; it's the sister of
// ChunkingApply.
//
// N.B.: If a term change happens, the final apply from the client will have
// a nil result and will not be passed through to the underlying FSM. To
// detect this, the final apply to the underlying FSM is wrapped in
// ChunkingSuccess.
type ChunkingFSM struct {
	underlying raft.FSM
	store      ChunkStorage
	lastTerm   uint64
}

type ChunkingBatchingFSM struct {
	*ChunkingFSM
	underlyingBatchingFSM raft.BatchingFSM
}

type ChunkingConfigurationStore struct {
	*ChunkingFSM
	underlyingConfigurationStore raft.ConfigurationStore
}

func NewChunkingFSM(underlying raft.FSM, store ChunkStorage) *ChunkingFSM {
	ret := &ChunkingFSM{
		underlying: underlying,
		store:      store,
	}
	if store == nil {
		ret.store = NewInmemChunkStorage()
	}
	return ret
}
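
// Usage sketch (illustrative, not from this file): wrapping an application
// FSM before handing it to raft. "appFSM" is a hypothetical raft.FSM, and
// the store/transport arguments are elided placeholders. Passing a nil
// ChunkStorage selects the in-memory store, as above.
//
//	fsm := NewChunkingFSM(appFSM, nil)
//	r, err := raft.NewRaft(raft.DefaultConfig(), fsm, logStore, stableStore, snapshotStore, transport)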

func NewChunkingBatchingFSM(underlying raft.BatchingFSM, store ChunkStorage) *ChunkingBatchingFSM {
	ret := &ChunkingBatchingFSM{
		ChunkingFSM: &ChunkingFSM{
			underlying: underlying,
			store:      store,
		},
		underlyingBatchingFSM: underlying,
	}
	if store == nil {
		ret.ChunkingFSM.store = NewInmemChunkStorage()
	}
	return ret
}

func NewChunkingConfigurationStore(underlying raft.ConfigurationStore, store ChunkStorage) *ChunkingConfigurationStore {
	ret := &ChunkingConfigurationStore{
		ChunkingFSM: &ChunkingFSM{
			underlying: underlying,
			store:      store,
		},
		underlyingConfigurationStore: underlying,
	}
	if store == nil {
		ret.ChunkingFSM.store = NewInmemChunkStorage()
	}
	return ret
}

func (c *ChunkingFSM) applyChunk(l *raft.Log) (*raft.Log, error) {
	if l.Term != c.lastTerm {
		// Term has changed. A raft library client that was applying chunks
		// should get an error that it's no longer the leader and bail, and
		// then any client (Consul, Vault, etc.) should retry the full
		// chunking operation automatically, which will be under a different
		// opnum. So it should be safe in this case to clear the map.
		if err := c.store.RestoreChunks(nil); err != nil {
			return nil, err
		}
		c.lastTerm = l.Term
	}

	// Get chunk info from extensions
	var ci types.ChunkInfo
	if err := proto.Unmarshal(l.Extensions, &ci); err != nil {
		return nil, fmt.Errorf("error unmarshaling chunk info: %w", err)
	}

	// Store the current chunk and find out if all chunks have arrived
	done, err := c.store.StoreChunk(&ChunkInfo{
		OpNum:       ci.OpNum,
		SequenceNum: ci.SequenceNum,
		NumChunks:   ci.NumChunks,
		Term:        l.Term,
		Data:        l.Data,
	})
	if err != nil {
		return nil, err
	}
	if !done {
		return nil, nil
	}

	// All chunks are here; get the full set and clear storage of the op
	chunks, err := c.store.FinalizeOp(ci.OpNum)
	if err != nil {
		return nil, err
	}

	finalData := make([]byte, 0, len(chunks)*raft.SuggestedMaxDataSize)
	for _, chunk := range chunks {
		finalData = append(finalData, chunk.Data...)
	}

	// Use the latest log's values with the final data
	logToApply := &raft.Log{
		Index:      l.Index,
		Term:       l.Term,
		Type:       l.Type,
		Data:       finalData,
		Extensions: ci.NextExtensions,
	}

	return logToApply, nil
}

// Apply applies the log, handling chunking as needed. For non-chunked logs
// the underlying Apply's return value is passed through unchanged. For
// chunked logs the return value is an error, nil (an intermediate chunk that
// was stored but did not complete the operation), or the underlying Apply's
// response wrapped in ChunkingSuccess (the final chunk of a completed
// operation).
func (c *ChunkingFSM) Apply(l *raft.Log) interface{} {
	// Not chunking or wrong type, pass through
	if l.Type != raft.LogCommand || l.Extensions == nil {
		return c.underlying.Apply(l)
	}

	logToApply, err := c.applyChunk(l)
	if err != nil {
		return err
	}

	if logToApply != nil {
		return ChunkingSuccess{Response: c.underlying.Apply(logToApply)}
	}

	return nil
}
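
// Illustrative sketch (not from this file): for a chunked log, the three
// outcomes described above can be distinguished with a type switch, e.g. in
// a test harness driving Apply directly ("entry" and "use" are hypothetical):
//
//	switch resp := fsm.Apply(entry).(type) {
//	case nil:
//		// intermediate chunk; the operation is still being reassembled
//	case ChunkingSuccess:
//		use(resp.Response) // the underlying FSM's answer for the full op
//	case error:
//		return resp
//	}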

func (c *ChunkingFSM) Snapshot() (raft.FSMSnapshot, error) {
	return c.underlying.Snapshot()
}

func (c *ChunkingFSM) Restore(rc io.ReadCloser) error {
	return c.underlying.Restore(rc)
}

// Note: this is used in tests via the Raft package test helper functions,
// even though it's not used in client code.
func (c *ChunkingFSM) Underlying() raft.FSM {
	return c.underlying
}

// CurrentState returns the in-flight chunk state held by the underlying
// ChunkStorage, e.g. so it can be persisted alongside a snapshot.
func (c *ChunkingFSM) CurrentState() (*State, error) {
	chunks, err := c.store.GetChunks()
	if err != nil {
		return nil, err
	}
	return &State{
		ChunkMap: chunks,
	}, nil
}

// RestoreState replaces the chunk state in the underlying ChunkStorage with
// the given state.
func (c *ChunkingFSM) RestoreState(state *State) error {
	// If nil we'll restore to blank, so create a new state with a nil map
	if state == nil {
		state = new(State)
	}
	return c.store.RestoreChunks(state.ChunkMap)
}
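
// Illustrative sketch (not from this file): an application can carry chunk
// state across restarts by pairing these two methods, assuming State
// round-trips through the chosen encoding (JSON here, as an example):
//
//	state, err := fsm.CurrentState()
//	if err != nil {
//		return err
//	}
//	buf, err := json.Marshal(state) // persist buf with the app snapshot
//	...
//	var restored State
//	if err := json.Unmarshal(buf, &restored); err != nil {
//		return err
//	}
//	return fsm.RestoreState(&restored)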

// StoreConfiguration satisfies raft.ConfigurationStore by delegating to the
// wrapped store.
func (c *ChunkingConfigurationStore) StoreConfiguration(index uint64, configuration raft.Configuration) {
	c.underlyingConfigurationStore.StoreConfiguration(index, configuration)
}

// ApplyBatch applies the logs, handling chunking as needed. The return value
// is a slice with one entry per log: an error, nil (for an intermediate
// chunk), or whatever is returned from the underlying Apply for that log.
func (c *ChunkingBatchingFSM) ApplyBatch(logs []*raft.Log) []interface{} {
	// responses has a response for each log; their slice index should match.
	responses := make([]interface{}, len(logs))

	// sentLogs keeps track of which logs we sent. The key is the raft Index
	// associated with the log and the value is true if this is a finalized
	// set of chunks.
	sentLogs := make(map[uint64]bool)

	// sendLogs is the subset of logs that we need to pass onto the underlying
	// FSM.
	sendLogs := make([]*raft.Log, 0, len(logs))

	for i, l := range logs {
		// Not chunking or wrong type, pass through
		if l.Type != raft.LogCommand || l.Extensions == nil {
			sendLogs = append(sendLogs, l)
			sentLogs[l.Index] = false
			continue
		}

		logToApply, err := c.applyChunk(l)
		if err != nil {
			responses[i] = err
			continue
		}

		if logToApply != nil {
			sendLogs = append(sendLogs, logToApply)
			sentLogs[l.Index] = true
		}
	}

	// Send remaining logs to the underlying FSM.
	var sentResponses []interface{}
	if len(sendLogs) > 0 {
		sentResponses = c.underlyingBatchingFSM.ApplyBatch(sendLogs)
	}

	var sentCounter int
	for j, l := range logs {
		// If the response is already set we errored above and should continue
		// onto the next.
		if responses[j] != nil {
			continue
		}

		var resp interface{}
		if chunked, ok := sentLogs[l.Index]; ok {
			resp = sentResponses[sentCounter]
			if chunked {
				resp = ChunkingSuccess{Response: sentResponses[sentCounter]}
			}
			sentCounter++
		}

		responses[j] = resp
	}

	return responses
}
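
// Illustrative sketch (not from this file): responses align index-for-index
// with the input logs, so a caller can walk them together. Unlike the
// single-log Apply case, a batch can also contain pass-through logs, handled
// by the default branch below ("batch", "use", and "report" are
// hypothetical):
//
//	for i, resp := range fsm.ApplyBatch(batch) {
//		switch r := resp.(type) {
//		case nil:
//			// batch[i] was an intermediate chunk
//		case ChunkingSuccess:
//			use(r.Response) // batch[i] completed a chunked op
//		case error:
//			report(batch[i], r)
//		default:
//			use(r) // pass-through response from the underlying FSM
//		}
//	}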