-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
mock_store_liveness.go
318 lines (269 loc) · 9.28 KB
/
mock_store_liveness.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.
package raftstoreliveness
import (
pb "github.com/cockroachdb/cockroach/pkg/raft/raftpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
)
// livenessEntry is an entry in the MockStoreLiveness state.
type livenessEntry struct {
// isSupporting controls whether supportFor returns true or false.
isSupporting bool
supportForEpoch pb.Epoch
// isSupported controls whether supportFrom returns true or false.
isSupported bool
supportFromEpoch pb.Epoch
}
// initLivenessEntry is the initial state entry placed in MockStoreLiveness.
var initLivenessEntry = livenessEntry{
// Initially, the peer is giving support to all other peers.
isSupporting: true,
supportForEpoch: 1,
// Initially, the peer is receiving support from all other peers.
isSupported: true,
supportFromEpoch: 1,
}
// MockStoreLiveness is a mock implementation of StoreLiveness. It initially
// treats all store to store connections as live, but it can be configured to
// withdraw support, grant support, and bump the supported epoch to/from any two
// peers.
//
// Each peer's state can be altered independently This makes it possible to
// construct a uni-directional partition.
type MockStoreLiveness struct {
id pb.PeerID
// state is a map, where state[i] represents the liveness entry for peer i.
state map[pb.PeerID]livenessEntry
// supportExpired controls whether this peer considers the leader support
// expired or not.
supportExpired bool
}
var _ StoreLiveness = &MockStoreLiveness{}
func NewMockStoreLiveness(id pb.PeerID) *MockStoreLiveness {
return &MockStoreLiveness{
id: id,
state: make(map[pb.PeerID]livenessEntry),
supportExpired: false,
}
}
// createStoreLivenessEntry creates a new state entry for the given peer.
func (m *MockStoreLiveness) createStoreLivenessEntry(id pb.PeerID) {
if _, exists := m.state[id]; exists {
panic("attempting to create a store liveness entry that already exists")
}
m.state[id] = initLivenessEntry
}
// SupportFor implements the StoreLiveness interface.
func (m *MockStoreLiveness) SupportFor(id pb.PeerID) (pb.Epoch, bool) {
entry, exists := m.state[id]
if !exists {
panic("attempting to call SupportFor() for a non-existing entry")
}
return entry.supportForEpoch, entry.isSupporting
}
// SupportFrom implements the StoreLiveness interface.
func (m *MockStoreLiveness) SupportFrom(id pb.PeerID) (pb.Epoch, hlc.Timestamp) {
entry, exists := m.state[id]
if !exists {
panic("attempting to call SupportFrom() for a non-existing entry")
}
if entry.isSupported {
return entry.supportFromEpoch, hlc.MaxTimestamp
}
return 0, hlc.Timestamp{}
}
// SupportFromEnabled implements the StoreLiveness interface.
func (*MockStoreLiveness) SupportFromEnabled() bool {
return true
}
// SupportExpired implements the StoreLiveness interface.
func (m *MockStoreLiveness) SupportExpired(ts hlc.Timestamp) bool {
if m.supportExpired {
return true
}
// If not configured explicitly, infer from the supplied timestamp.
switch ts {
case hlc.Timestamp{}:
return true
case hlc.MaxTimestamp:
return false
default:
panic("unexpected timestamp")
}
}
// bumpSupportForEpoch bumps the supportFor epoch for the given peer.
func (m *MockStoreLiveness) bumpSupportForEpoch(id pb.PeerID) {
entry, exists := m.state[id]
if !exists {
panic("attempting to call bumpSupportForEpoch() for a non-existing entry")
}
entry.supportForEpoch++
m.state[id] = entry
}
// bumpSupportFromEpoch bumps the supportFrom epoch for the given peer.
func (m *MockStoreLiveness) bumpSupportFromEpoch(id pb.PeerID) {
entry, exists := m.state[id]
if !exists {
panic("attempting to call bumpSupportFromEpoch() for a non-existing entry")
}
entry.supportFromEpoch++
m.state[id] = entry
}
// grantSupportFor grants support for the given peer.
func (m *MockStoreLiveness) grantSupportFor(id pb.PeerID) {
entry, exists := m.state[id]
if !exists {
panic("attempting to call grantSupportFor() for a non-existing entry")
}
entry.isSupporting = true
m.state[id] = entry
}
// grantSupportFrom grants support from the given peer.
func (m *MockStoreLiveness) grantSupportFrom(id pb.PeerID) {
entry, exists := m.state[id]
if !exists {
panic("attempting to call grantSupportFrom() for a non-existing entry")
}
entry.isSupported = true
m.state[id] = entry
}
// withdrawSupportFor withdraws support for the given peer.
func (m *MockStoreLiveness) withdrawSupportFor(id pb.PeerID) {
entry, exists := m.state[id]
if !exists {
panic("attempting to call withdrawSupportFor() for a non-existing entry")
}
entry.isSupporting = false
m.state[id] = entry
}
// withdrawSupportFrom withdraws support from the given peer.
func (m *MockStoreLiveness) withdrawSupportFrom(id pb.PeerID) {
entry, exists := m.state[id]
if !exists {
panic("attempting to call withdrawSupportFrom() for a non-existing entry")
}
entry.isSupported = false
m.state[id] = entry
}
// setSupportExpired explicitly controls what SupportExpired returns regardless
// of the timestamp.
func (m *MockStoreLiveness) setSupportExpired(expired bool) {
m.supportExpired = expired
}
// LivenessFabric is a global view of the store liveness state.
type LivenessFabric struct {
// state is an array, where state[i] represents the MockStoreLiveness entry
// for peer i.
state map[pb.PeerID]*MockStoreLiveness
}
// NewLivenessFabric initializes and returns a LivenessFabric.
func NewLivenessFabric() *LivenessFabric {
state := make(map[pb.PeerID]*MockStoreLiveness)
return &LivenessFabric{
state: state,
}
}
// AddPeer adds a peer to the liveness fabric.
func (l *LivenessFabric) AddPeer(id pb.PeerID) {
if _, exists := l.state[id]; exists {
panic("attempting to add a peer that already exists")
}
l.state[id] = NewMockStoreLiveness(id)
l.state[id].createStoreLivenessEntry(id)
// Iterate over all liveness stores in the fabric, and add the new peer to
// their state.
for _, storeLiveness := range l.state {
// We added our self above.
if storeLiveness.id == id {
continue
}
storeLiveness.createStoreLivenessEntry(id)
l.state[id].createStoreLivenessEntry(storeLiveness.id)
}
}
// GetStoreLiveness return the MockStoreLiveness for the given peer.
func (l *LivenessFabric) GetStoreLiveness(id pb.PeerID) *MockStoreLiveness {
entry, exists := l.state[id]
if !exists {
panic("attempting to call GetStoreLiveness() for a non-existing id")
}
return entry
}
// BumpEpoch bumps the epoch supported by "fromID" for "forID" and starts
// supporting the new epoch. We also update state on "forID" to reflect support
// at this new epoch.
func (l *LivenessFabric) BumpEpoch(fromID pb.PeerID, forID pb.PeerID) {
fromEntry, exists := l.state[fromID]
if !exists {
panic("attempting to call BumpEpoch() for a non-existing fromID entry")
}
fromEntry.bumpSupportForEpoch(forID)
fromEntry.grantSupportFor(forID)
forEntry, exists := l.state[forID]
if !exists {
panic("attempting to call BumpEpoch() for a non-existing forID entry")
}
forEntry.bumpSupportFromEpoch(fromID)
forEntry.grantSupportFrom(fromID)
}
// WithdrawSupport withdraws the support by "fromID" for "forID". We also update
// state on "forID" to reflect the withdrawal of support.
func (l *LivenessFabric) WithdrawSupport(fromID pb.PeerID, forID pb.PeerID) {
fromEntry, exists := l.state[fromID]
if !exists {
panic("attempting to call WithdrawSupport() for a non-existing fromID entry")
}
fromEntry.withdrawSupportFor(forID)
forEntry, exists := l.state[forID]
if !exists {
panic("attempting to call WithdrawSupport() for a non-existing forID entry")
}
forEntry.withdrawSupportFrom(fromID)
}
// GrantSupport grants the support by "fromID" for "forID". We also update state
// on "forID" to reflect the withdrawal of support.
func (l *LivenessFabric) GrantSupport(fromID pb.PeerID, forID pb.PeerID) {
fromEntry, exists := l.state[fromID]
if !exists {
panic("attempting to call GrantSupport() for a non-existing fromID entry")
}
fromEntry.grantSupportFor(forID)
forEntry, exists := l.state[forID]
if !exists {
panic("attempting to call GrantSupport() for a non-existing forID entry")
}
forEntry.grantSupportFrom(fromID)
}
// SetSupportExpired explicitly controls what SupportExpired returns regardless
// of the timestamp supplied to it.
func (l *LivenessFabric) SetSupportExpired(peer pb.PeerID, expired bool) {
entry, exists := l.state[peer]
if !exists {
panic("attempting to call SetSupportExpired() for a non-existing peer entry")
}
entry.setSupportExpired(expired)
}
// WithdrawSupportForPeerFromAllPeers withdraws support for the target peer from
// all peers in the liveness fabric.
func (l *LivenessFabric) WithdrawSupportForPeerFromAllPeers(target pb.PeerID) {
for curID := range l.state {
l.WithdrawSupport(curID, target)
}
}
// GrantSupportForPeerFromAllPeers grants support for the target peer from
// // all peers in the liveness fabric.
func (l *LivenessFabric) GrantSupportForPeerFromAllPeers(targetID pb.PeerID) {
for curID := range l.state {
l.GrantSupport(curID, targetID)
}
}
// BumpAllSupportEpochs bumps all the support epochs in the liveness fabric.
func (l *LivenessFabric) BumpAllSupportEpochs() {
for s1ID, storeLiveness := range l.state {
for s2ID := range storeLiveness.state {
l.BumpEpoch(s1ID, s2ID)
}
}
}