storage/raft: Fix memory allocation issue and Metadata tracking issues with snapshots #8793

Merged Apr 23, 2020 · 6 commits (diff below shows changes from 4 commits)
1 change: 1 addition & 0 deletions go.sum
@@ -150,6 +150,7 @@ github.com/coreos/go-oidc v2.1.0+incompatible h1:sdJrfw8akMnCuUlaZU3tE/uYXFgfqom
github.com/coreos/go-oidc v2.1.0+incompatible/go.mod h1:CgnwVTmzoESiwO9qyAFEMiHoZ1nMCKZlZ9V6mm3/LKc=
github.com/coreos/go-semver v0.2.0 h1:3Jm3tLmsgAYcjC+4Up7hJrFBPr+n7rAqYeSw/SZazuY=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7 h1:u9SHYsPQNyt5tgDm3YN7+9dYrpK96E5wFilTFWIDZOM=
github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/go-systemd v0.0.0-20181012123002-c6f51f82210d h1:t5Wuyh53qYyg9eqn4BbnlIT+vmhyww0TatL+zT3uWgI=
github.com/coreos/go-systemd v0.0.0-20181012123002-c6f51f82210d/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
124 changes: 94 additions & 30 deletions physical/raft/fsm.go
@@ -88,6 +88,10 @@ type FSM struct {
storeLatestState bool

chunker *raftchunking.ChunkingBatchingFSM

// testSnapshotRestoreError is used in tests to simulate an error while
// restoring a snapshot.
testSnapshotRestoreError bool
}

// NewFSM constructs a FSM using the given directory
@@ -193,20 +197,20 @@ func (f *FSM) witnessIndex(i *IndexValue) {
}
}

func (f *FSM) witnessSnapshot(index, term, configurationIndex uint64, configuration raft.Configuration) error {
func (f *FSM) witnessSnapshot(metadata *raft.SnapshotMeta) error {
var indexBytes []byte
latestIndex, _ := f.LatestState()

latestIndex.Index = index
latestIndex.Term = term
latestIndex.Index = metadata.Index
latestIndex.Term = metadata.Term

var err error
indexBytes, err = proto.Marshal(latestIndex)
if err != nil {
return err
}

protoConfig := raftConfigurationToProtoConfiguration(configurationIndex, configuration)
protoConfig := raftConfigurationToProtoConfiguration(metadata.ConfigurationIndex, metadata.Configuration)
configBytes, err := proto.Marshal(protoConfig)
if err != nil {
return err
@@ -232,16 +236,16 @@ func (f *FSM) witnessSnapshot(index, term, configurationIndex uint64, configurat
}
}

atomic.StoreUint64(f.latestIndex, index)
atomic.StoreUint64(f.latestTerm, term)
atomic.StoreUint64(f.latestIndex, metadata.Index)
atomic.StoreUint64(f.latestTerm, metadata.Term)
f.latestConfig.Store(protoConfig)

return nil
}

// Delete deletes the given key from the bolt file.
func (f *FSM) Delete(ctx context.Context, path string) error {
defer metrics.MeasureSince([]string{"raft", "delete"}, time.Now())
defer metrics.MeasureSince([]string{"raft_storage", "fsm", "delete"}, time.Now())
Contributor: These renames seem like a breaking change?

briankassouf (Contributor Author): Yes, you're correct. However, the FSM's Delete, DeletePrefix, and Put functions aren't called during normal Vault operation, so those metrics shouldn't have been emitted in the first place. Get and List will be a change, but there is already another function that emits these same metrics with more accuracy. Really, the name here is a bug, since it scopes the metric to an incorrect system.

briankassouf (Contributor Author), Apr 23, 2020: I went ahead and put the metrics for Get and List back so as not to break any existing dashboards. We are now emitting both the old and new names.
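A minimal sketch of the dual-emission compatibility approach described in the last comment, assuming the armon/go-metrics package Vault uses for MeasureSince; the helper measureWithLegacyName is hypothetical, not the PR's code:

package main

import (
	"time"

	metrics "github.com/armon/go-metrics"
)

// measureWithLegacyName records the elapsed time under both the legacy key
// and the corrected raft_storage-scoped key, so existing dashboards keep
// working while new ones can adopt the accurate name.
func measureWithLegacyName(start time.Time, legacyKey, newKey []string) {
	metrics.MeasureSince(legacyKey, start)
	metrics.MeasureSince(newKey, start)
}

func exampleGet() {
	defer measureWithLegacyName(time.Now(),
		[]string{"raft", "get"},                // legacy name
		[]string{"raft_storage", "fsm", "get"}, // corrected scope
	)
	// ... perform the read ...
}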


f.l.RLock()
defer f.l.RUnlock()
@@ -253,7 +257,7 @@ func (f *FSM) Delete(ctx context.Context, path string) error {

// DeletePrefix deletes all keys with the given prefix from the bolt file.
func (f *FSM) DeletePrefix(ctx context.Context, prefix string) error {
defer metrics.MeasureSince([]string{"raft", "delete_prefix"}, time.Now())
defer metrics.MeasureSince([]string{"raft_storage", "fsm", "delete_prefix"}, time.Now())

f.l.RLock()
defer f.l.RUnlock()
@@ -277,7 +281,7 @@ func (f *FSM) DeletePrefix(ctx context.Context, prefix string) error {

// Get retrieves the value at the given path from the bolt file.
func (f *FSM) Get(ctx context.Context, path string) (*physical.Entry, error) {
defer metrics.MeasureSince([]string{"raft", "get"}, time.Now())
defer metrics.MeasureSince([]string{"raft_storage", "fsm", "get"}, time.Now())

f.l.RLock()
defer f.l.RUnlock()
@@ -311,7 +315,7 @@ func (f *FSM) Get(ctx context.Context, path string) (*physical.Entry, error) {

// Put writes the given entry to the bolt file.
func (f *FSM) Put(ctx context.Context, entry *physical.Entry) error {
defer metrics.MeasureSince([]string{"raft", "put"}, time.Now())
defer metrics.MeasureSince([]string{"raft_storage", "fsm", "put"}, time.Now())

f.l.RLock()
defer f.l.RUnlock()
@@ -324,7 +328,7 @@ func (f *FSM) Put(ctx context.Context, entry *physical.Entry) error {

// List retrieves the set of keys with the given prefix from the bolt file.
func (f *FSM) List(ctx context.Context, prefix string) ([]string, error) {
defer metrics.MeasureSince([]string{"raft", "list"}, time.Now())
defer metrics.MeasureSince([]string{"raft_storage", "fsm", "list"}, time.Now())

f.l.RLock()
defer f.l.RUnlock()
@@ -531,6 +535,8 @@ type writeErrorCloser interface {
// (size, checksum, etc) and a second for the sink of the data. We also use a
// proto delimited writer so we can stream proto messages to the sink.
func (f *FSM) writeTo(ctx context.Context, metaSink writeErrorCloser, sink writeErrorCloser) {
defer metrics.MeasureSince([]string{"raft_storage", "fsm", "write_snapshot"}, time.Now())

protoWriter := protoio.NewDelimitedWriter(sink)
metadataProtoWriter := protoio.NewDelimitedWriter(metaSink)
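For reference, a minimal standalone sketch of the delimited streaming described in the comment above, assuming the gogo/protobuf io package that provides NewDelimitedWriter and NewDelimitedReader; helper names are illustrative, not the PR's code:

package main

import (
	"io"
	"math"

	protoio "github.com/gogo/protobuf/io"
	"github.com/gogo/protobuf/proto"
)

// streamMsgs writes each message with a length prefix so the reader can
// recover message boundaries from a plain byte stream.
func streamMsgs(w io.Writer, msgs []proto.Message) error {
	pw := protoio.NewDelimitedWriter(w)
	for _, m := range msgs {
		if err := pw.WriteMsg(m); err != nil {
			return err
		}
	}
	return pw.Close()
}

// drainMsgs reads length-prefixed messages until EOF, mirroring the read
// loop in Restore below.
func drainMsgs(r io.Reader, newMsg func() proto.Message, handle func(proto.Message) error) error {
	pr := protoio.NewDelimitedReader(r, math.MaxInt32)
	defer pr.Close()
	for {
		m := newMsg()
		if err := pr.ReadMsg(m); err != nil {
			if err == io.EOF {
				return nil
			}
			return err
		}
		if err := handle(m); err != nil {
			return err
		}
	}
}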

@@ -573,7 +579,9 @@

// Snapshot implements the FSM interface. It returns a noop snapshot object.
func (f *FSM) Snapshot() (raft.FSMSnapshot, error) {
return &noopSnapshotter{}, nil
return &noopSnapshotter{
fsm: f,
}, nil
}

// SetNoopRestore is used to disable restore operations on raft startup. Because
@@ -589,48 +597,91 @@ func (f *FSM) SetNoopRestore(enabled bool) {
// first deletes the existing bucket to clear all existing data, then recreates
// it so we can copy in the snapshot.
func (f *FSM) Restore(r io.ReadCloser) error {
defer metrics.MeasureSince([]string{"raft_storage", "fsm", "restore_snapshot"}, time.Now())

if f.noopRestore == true {
return nil
}

snapMeta := r.(*boltSnapshotMetadataReader).Metadata()

protoReader := protoio.NewDelimitedReader(r, math.MaxInt32)
defer protoReader.Close()

f.l.Lock()
defer f.l.Unlock()

// Start a write transaction.
// Delete the existing data bucket and create a new one.
f.logger.Debug("snapshot restore: deleting bucket")
err := f.db.Update(func(tx *bolt.Tx) error {
err := tx.DeleteBucket(dataBucketName)
if err != nil {
return err
}

b, err := tx.CreateBucket(dataBucketName)
_, err = tx.CreateBucket(dataBucketName)
if err != nil {
return err
}

for {
return nil
})
if err != nil {
f.logger.Error("could not restore snapshot: could not clear existing bucket", "error", err)
return err
}

// If we are testing a failed snapshot, return an error here.
if f.testSnapshotRestoreError {
return errors.New("Test error")
}

f.logger.Debug("snapshot restore: deleting bucket done")
f.logger.Debug("snapshot restore: writing keys")

var done bool
var keys int
for !done {
err := f.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(dataBucketName)
s := new(pb.StorageEntry)
err := protoReader.ReadMsg(s)
if err != nil {
if err == io.EOF {
return nil

// Commit in batches of 50k. Bolt holds all the data in memory and
// doesn't split the pages until commit, so we do incremental writes.
// This is safe since we hold the FSM's write lock.
for i := 0; i < 50000; i++ {
Contributor: Mostly curious here, but how was 50k chosen as the batch value?

briankassouf (Contributor Author): The boltdb docs say we shouldn't go over 100,000 items in a batch update, so I just went a good bit below that.

(See the standalone sketch of this batching pattern at the end of this file's diff.)

err := protoReader.ReadMsg(s)
if err != nil {
if err == io.EOF {
done = true
return nil
}
return err
}
return err
}

err = b.Put([]byte(s.Key), s.Value)
if err != nil {
return err
err = b.Put([]byte(s.Key), s.Value)
if err != nil {
return err
}
keys += 1
}

return nil
})
if err != nil {
f.logger.Error("could not restore snapshot", "error", err)
return err
}

return nil
})
if err != nil {
f.logger.Error("could not restore snapshot", "error", err)
f.logger.Trace("snapshot restore: writing keys", "num_written", keys)
}

f.logger.Debug("snapshot restore: writing keys done")

// Write the metadata after we have applied all the snapshot data
f.logger.Debug("snapshot restore: writing metadata")
if err := f.witnessSnapshot(snapMeta); err != nil {
f.logger.Error("could not write metadata", "error", err)
return err
}

@@ -639,10 +690,23 @@ func (f *FSM) Restore(r io.ReadCloser) error {

// noopSnapshotter implements the fsm.Snapshot interface. It doesn't do anything
// since our SnapshotStore reads data out of the FSM on Open().
type noopSnapshotter struct{}
type noopSnapshotter struct {
fsm *FSM
}

// Persist doesn't do anything.
// Persist implements the fsm.Snapshot interface. It doesn't need to persist any
// state data, but it does persist the raft metadata. This is necessary so we
// can be sure to capture indexes for operation types that are not sent to the
// FSM.
func (s *noopSnapshotter) Persist(sink raft.SnapshotSink) error {
boltSnapshotSink := sink.(*BoltSnapshotSink)

// We are processing a snapshot; fast-forward the index, term, and
// configuration to the latest seen by the raft system.
if err := s.fsm.witnessSnapshot(&boltSnapshotSink.meta); err != nil {
return err
}

return nil
}

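For reference, a self-contained sketch of the 50k batching pattern discussed in the review comments above, assuming the bbolt API (the import path and helper names are illustrative, not the PR's code). Bolt keeps a write transaction's pages in memory until commit, so committing every batchSize puts bounds memory use during a large restore:

package main

import (
	bolt "go.etcd.io/bbolt" // assumed import; the boltdb/bolt API is identical
)

const batchSize = 50000 // well below the ~100,000-item batch guidance in the bolt docs

// restoreEntries drains next(), writing keys in batches so each bolt write
// transaction stays small. next returns ok=false once the stream is exhausted.
func restoreEntries(db *bolt.DB, bucket []byte, next func() (key, value []byte, ok bool)) error {
	done := false
	for !done {
		err := db.Update(func(tx *bolt.Tx) error {
			b := tx.Bucket(bucket)
			for i := 0; i < batchSize; i++ {
				k, v, ok := next()
				if !ok {
					done = true
					return nil
				}
				if err := b.Put(k, v); err != nil {
					return err
				}
			}
			// Returning nil commits this batch; the outer loop opens a
			// fresh transaction for the next one.
			return nil
		})
		if err != nil {
			return err
		}
	}
	return nil
}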
67 changes: 62 additions & 5 deletions physical/raft/raft_test.go
@@ -76,22 +76,77 @@ func getRaftWithDir(t testing.TB, bootstrap bool, noStoreState bool, raftDir str
return backend, raftDir
}

func connectPeers(nodes ...*RaftBackend) {
for _, node := range nodes {
for _, peer := range nodes {
if node == peer {
continue
}

node.raftTransport.(*raft.InmemTransport).Connect(raft.ServerAddress(peer.NodeID()), peer.raftTransport)
peer.raftTransport.(*raft.InmemTransport).Connect(raft.ServerAddress(node.NodeID()), node.raftTransport)
}
}
}

func stepDownLeader(t *testing.T, node *RaftBackend) {
t.Helper()

if err := node.raft.LeadershipTransfer().Error(); err != nil {
t.Fatal(err)
}

timeout := time.Now().Add(time.Second * 10)
for !time.Now().After(timeout) {
if err := node.raft.VerifyLeader().Error(); err != nil {
return
}
time.Sleep(100 * time.Millisecond)
}

t.Fatal("still leader")
}

func waitForLeader(t *testing.T, nodes ...*RaftBackend) *RaftBackend {
t.Helper()
timeout := time.Now().Add(time.Second * 10)
for !time.Now().After(timeout) {
for _, node := range nodes {
if node.raft.Leader() == raft.ServerAddress(node.NodeID()) {
return node
}
}
time.Sleep(100 * time.Millisecond)
}

t.Fatal("no leader")
return nil
}

func compareFSMs(t *testing.T, fsm1, fsm2 *FSM) {
t.Helper()
if err := compareFSMsWithErr(t, fsm1, fsm2); err != nil {
t.Fatal(err)
}
}

func compareFSMsWithErr(t *testing.T, fsm1, fsm2 *FSM) error {
t.Helper()
index1, config1 := fsm1.LatestState()
index2, config2 := fsm2.LatestState()

if !proto.Equal(index1, index2) {
t.Fatalf("indexes did not match: %+v != %+v", index1, index2)
return fmt.Errorf("indexes did not match: %+v != %+v", index1, index2)
}
if !proto.Equal(config1, config2) {
t.Fatalf("configs did not match: %+v != %+v", config1, config2)
return fmt.Errorf("configs did not match: %+v != %+v", config1, config2)
}

compareDBs(t, fsm1.db, fsm2.db)
return compareDBs(t, fsm1.db, fsm2.db)
}

func compareDBs(t *testing.T, boltDB1, boltDB2 *bolt.DB) {
func compareDBs(t *testing.T, boltDB1, boltDB2 *bolt.DB) error {
t.Helper()
db1 := make(map[string]string)
db2 := make(map[string]string)

@@ -135,8 +190,10 @@ func compareDBs(t *testing.T, boltDB1, boltDB2 *bolt.DB) {
}

if diff := deep.Equal(db1, db2); diff != nil {
t.Fatal(diff)
return fmt.Errorf("%+v", diff)
}

return nil
}

func TestRaft_Backend(t *testing.T) {
21 changes: 14 additions & 7 deletions physical/raft/snapshot.go
@@ -104,13 +104,6 @@ func (f *BoltSnapshotStore) Create(version raft.SnapshotVersion, index, term uin
return nil, fmt.Errorf("unsupported snapshot version %d", version)
}

// We are processing a snapshot, fastforward the index, term, and
// configuration to the latest seen by the raft system. This could include
// log indexes for operation types that are never sent to the FSM.
if err := f.fsm.witnessSnapshot(index, term, configurationIndex, configuration); err != nil {
return nil, err
}

// Create the sink
sink := &BoltSnapshotSink{
store: f,
@@ -208,6 +201,11 @@ func (f *BoltSnapshotStore) Open(id string) (*raft.SnapshotMeta, io.ReadCloser,
if err != nil {
return nil, nil, err
}

readCloser = &boltSnapshotMetadataReader{
meta: meta,
ReadCloser: readCloser,
}
}

return meta, readCloser, nil
@@ -286,3 +284,12 @@ func (s *BoltSnapshotSink) Cancel() error {

return nil
}

type boltSnapshotMetadataReader struct {
io.ReadCloser
meta *raft.SnapshotMeta
}

func (r *boltSnapshotMetadataReader) Metadata() *raft.SnapshotMeta {
return r.meta
}
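A short sketch of how this wrapper is consumed; the helper name metadataFrom is illustrative, not part of the PR. Open returns a plain io.ReadCloser, and the FSM's Restore type-asserts it back to recover the snapshot metadata alongside the data stream. Embedding io.ReadCloser keeps the wrapper transparent to callers that only need Read and Close.

// metadataFrom recovers the snapshot metadata attached by
// BoltSnapshotStore.Open; ok is false if the reader was not wrapped.
func metadataFrom(r io.ReadCloser) (*raft.SnapshotMeta, bool) {
	mr, ok := r.(*boltSnapshotMetadataReader)
	if !ok {
		return nil, false
	}
	return mr.Metadata(), true
}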