This repository has been archived by the owner on Apr 4, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 4
/
persistent_storage.go
228 lines (186 loc) · 5.46 KB
/
persistent_storage.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
package canoe
import (
"encoding/json"
"fmt"
"github.com/pkg/errors"
"os"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/snap"
"github.com/coreos/etcd/wal"
"github.com/coreos/etcd/wal/walpb"
)
// walMetadata is the JSON-encoded payload written as the WAL's metadata
// blob when a new WAL is created, and decoded again when restoring a node
// from an existing WAL.
type walMetadata struct {
	// NodeID is taken from the node's id field when the WAL is created.
	NodeID uint64 `json:"node_id"`
	// ClusterID is taken from the node's cid field when the WAL is created.
	ClusterID uint64 `json:"cluster_id"`
}
// initPersistentStorage prepares the node's on-disk state. It sets up the
// snapshotter through initSnap, loads the latest snapshot if one exists, and
// then hands that snapshot's index and term to initWAL.
//
// A missing or empty snapshot (snap.ErrNoSnapshot / snap.ErrEmptySnapshot) is
// not treated as an error; the WAL is then initialized with a zero-valued
// walpb.Snapshot. Any other failure is returned wrapped with context.
func (rn *Node) initPersistentStorage() error {
	if err := rn.initSnap(); err != nil {
		return errors.Wrap(err, "Error initializing snapshot")
	}

	var walSnap walpb.Snapshot

	// initSnap returns without assigning rn.ss when snapDir() is empty, so the
	// snapshotter may legitimately be nil here. Calling Load on it would
	// dereference a nil pointer, so only consult it when it exists.
	if rn.ss != nil {
		raftSnap, err := rn.ss.Load()
		if err != nil && err != snap.ErrNoSnapshot && err != snap.ErrEmptySnapshot {
			return errors.Wrap(err, "Error loading latest snapshot")
		}
		if raftSnap != nil {
			walSnap.Index, walSnap.Term = raftSnap.Metadata.Index, raftSnap.Metadata.Term
		}
	}

	if err := rn.initWAL(walSnap); err != nil {
		return errors.Wrap(err, "Error initializing WAL")
	}

	return nil
}
// restoreRaft rebuilds the node from its persisted snapshot and WAL.
//
// Correct order of ops
// 1: Restore Metadata from WAL
// 2: Apply any persisted snapshot to FSM
// 3: Apply any Snapshot to raft storage
// 4: Apply any hardstate to raft storage
// 5: Apply any WAL Entries to raft storage
//
// The transport is attached and started between steps 1 and 2, once the
// node and cluster IDs have been restored from the WAL metadata.
//
// NOTE(review): rn.ss and rn.wal are used unconditionally here, so this
// presumably must only be called after initPersistentStorage has set both up
// with a non-empty data directory — confirm against the caller.
func (rn *Node) restoreRaft() error {
	raftSnap, err := rn.ss.Load()
	if err != nil && err != snap.ErrNoSnapshot && err != snap.ErrEmptySnapshot {
		return errors.Wrap(err, "Error loading latest snapshot")
	}
	// With no snapshot on disk, fall back to an empty one so the restore
	// steps below can take it by value.
	if raftSnap == nil {
		raftSnap = &raftpb.Snapshot{}
	}

	wMetadata, hState, ents, err := rn.wal.ReadAll()
	if err != nil {
		return errors.Wrap(err, "Error reading WAL")
	}

	// NOTE: Step 1
	if err := rn.restoreMetadata(wMetadata); err != nil {
		return errors.Wrap(err, "Error restoring from WAL metadata")
	}

	// We can do this now that we restored the metadata
	if err := rn.attachTransport(); err != nil {
		return errors.Wrap(err, "Error attaching raft Transport layer")
	}

	if err := rn.transport.Start(); err != nil {
		return errors.Wrap(err, "Error starting raft transport layer")
	}

	// NOTE: Step 2
	if err := rn.restoreFSMFromSnapshot(*raftSnap); err != nil {
		return errors.Wrap(err, "Error restoring FSM from snapshot")
	}

	// NOTE: Step 3, 4, 5
	if err := rn.restoreMemoryStorage(*raftSnap, hState, ents); err != nil {
		return errors.Wrap(err, "Error restoring raft memory storage")
	}

	return nil
}
// initSnap creates the snapshot directory (mode 0750) if needed and installs
// a snapshotter rooted there on rn.ss. When snapDir() is empty it does
// nothing and leaves rn.ss untouched.
func (rn *Node) initSnap() error {
	dir := rn.snapDir()
	if dir == "" {
		return nil
	}

	err := os.MkdirAll(dir, 0750)
	if err != nil && !os.IsExist(err) {
		return errors.Wrap(err, "Error trying to create directory for snapshots")
	}

	rn.ss = snap.New(dir)
	return nil
}
// persistSnapshot writes raftSnap to whichever persistent stores are
// configured. If a snapshotter is present the snapshot is saved through it
// first; if a WAL is present, the snapshot's index and term are then recorded
// in the WAL and WAL locks are released up to the snapshot index. Either
// store may be nil, in which case that step is skipped.
func (rn *Node) persistSnapshot(raftSnap raftpb.Snapshot) error {
	if rn.ss != nil {
		if err := rn.ss.SaveSnap(raftSnap); err != nil {
			return errors.Wrap(err, "Error saving snapshot to persistent storage")
		}
	}

	// Nothing further to do without a WAL.
	if rn.wal == nil {
		return nil
	}

	meta := raftSnap.Metadata
	marker := walpb.Snapshot{Index: meta.Index, Term: meta.Term}
	if err := rn.wal.SaveSnapshot(marker); err != nil {
		return errors.Wrap(err, "Error updating WAL with snapshot")
	}

	if err := rn.wal.ReleaseLockTo(meta.Index); err != nil {
		return errors.Wrap(err, "Error releasing WAL locks")
	}

	return nil
}
// initWAL sets rn.wal. If a WAL already exists in walDir() it is opened at
// walSnap; otherwise a new WAL is created whose metadata is the JSON-encoded
// node ID and cluster ID. When walDir() is empty the function does nothing.
func (rn *Node) initWAL(walSnap walpb.Snapshot) error {
	dir := rn.walDir()
	if dir == "" {
		return nil
	}

	if wal.Exist(dir) {
		// This assumes we WILL be reading this once elsewhere
		existing, err := wal.Open(dir, walSnap)
		if err != nil {
			return errors.Wrap(err, "Error opening existing WAL")
		}
		rn.wal = existing
		return nil
	}

	// No WAL on disk yet: create one, stamping it with this node's identity.
	encoded, err := json.Marshal(&walMetadata{
		NodeID:    rn.id,
		ClusterID: rn.cid,
	})
	if err != nil {
		return errors.Wrap(err, "Error marshaling WAL metadata")
	}

	created, err := wal.Create(dir, encoded)
	if err != nil {
		return errors.Wrap(err, "Error creating new WAL")
	}
	rn.wal = created

	return nil
}
// restoreMetadata decodes the JSON WAL metadata in wMetadata and copies the
// stored node ID and cluster ID onto the node, also setting the raft config
// ID to the stored node ID. It returns a wrapped error if decoding fails, in
// which case the node is left unmodified.
func (rn *Node) restoreMetadata(wMetadata []byte) error {
	decoded := walMetadata{}
	err := json.Unmarshal(wMetadata, &decoded)
	if err != nil {
		return errors.Wrap(err, "Error unmarshaling WAL metadata")
	}

	rn.id = decoded.NodeID
	rn.cid = decoded.ClusterID
	rn.raftConfig.ID = decoded.NodeID

	return nil
}
// restoreMemoryStorage loads previously persisted state into rn.raftStorage.
// A non-empty raftSnap is applied first. Then, only if the node has a WAL,
// the hard state is set and ents are appended, in that order.
func (rn *Node) restoreMemoryStorage(raftSnap raftpb.Snapshot, hState raftpb.HardState, ents []raftpb.Entry) error {
	if hasSnap := !raft.IsEmptySnap(raftSnap); hasSnap {
		if err := rn.raftStorage.ApplySnapshot(raftSnap); err != nil {
			return errors.Wrap(err, "Error applying snapshot to raft memory storage")
		}
	}

	// Hard state and entries are only restored when a WAL is in use.
	if rn.wal == nil {
		return nil
	}

	if err := rn.raftStorage.SetHardState(hState); err != nil {
		return errors.Wrap(err, "Error setting memory hardstate")
	}

	if err := rn.raftStorage.Append(ents); err != nil {
		return errors.Wrap(err, "Error appending entries to memory storage")
	}

	return nil
}
// deletePersistentData removes the snapshot directory and then the WAL
// directory, skipping whichever path is empty. It stops at the first
// removal that fails and returns that error wrapped with context.
func (rn *Node) deletePersistentData() error {
	// TODO: confirm whether both walDir() and snapDir() should be deleted here.
	targets := []struct {
		dir    string
		errMsg string
	}{
		{rn.snapDir(), "Error deleting snapshot directory"},
		{rn.walDir(), "Error deleting WAL directory"},
	}

	for _, target := range targets {
		if target.dir == "" {
			continue
		}
		if err := os.RemoveAll(target.dir); err != nil {
			return errors.Wrap(err, target.errMsg)
		}
	}

	return nil
}
// walDir returns the WAL directory path, formed by appending
// walDirExtension to the node's data directory. It returns "" when no data
// directory is configured.
func (rn *Node) walDir() string {
	if base := rn.dataDir; base != "" {
		return fmt.Sprintf("%s%s", base, walDirExtension)
	}
	return ""
}
// snapDir returns the snapshot directory path, formed by appending
// snapDirExtension to the node's data directory. It returns "" when no data
// directory is configured.
func (rn *Node) snapDir() string {
	if base := rn.dataDir; base != "" {
		return fmt.Sprintf("%s%s", base, snapDirExtension)
	}
	return ""
}