Skip to content

Commit

Permalink
Reset old logs after snapshot restore
Browse files Browse the repository at this point in the history
This commit makes use MonotonicLogStore type assertion to delete all
entries from the LogStore after snapshot restore.
  • Loading branch information
mpalmi committed Mar 12, 2023
1 parent ec7349b commit f10b599
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 8 deletions.
8 changes: 8 additions & 0 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,14 @@ func (r *Raft) restoreSnapshot() error {
r.setCommittedConfiguration(conf, index)
r.setLatestConfiguration(conf, index)

// Remove old logs if r.logs is a MonotonicLogStore. Log any errors and
// continue.
if logs, ok := r.logs.(MonotonicLogStore); ok && logs.IsMonotonic() {
if err := r.removeOldLogs(); err != nil {
r.logger.Error("failed to reset logs", "error", err)
}
}

// Success!
return nil
}
Expand Down
17 changes: 14 additions & 3 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1122,6 +1122,13 @@ func (r *Raft) restoreUserSnapshot(meta *SnapshotMeta, reader io.Reader) error {
r.setLastApplied(lastIndex)
r.setLastSnapshot(lastIndex, term)

// Remove old logs if r.logs is a MonotonicLogStore. Log any errors and continue.
if logs, ok := r.logs.(MonotonicLogStore); ok && logs.IsMonotonic() {
if err := r.removeOldLogs(); err != nil {
r.logger.Error("failed to remove old logs", "error", err)
}
}

r.logger.Info("restored user snapshot", "index", latestIndex)
return nil
}
Expand Down Expand Up @@ -1790,15 +1797,19 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) {
r.setLatestConfiguration(reqConfiguration, reqConfigurationIndex)
r.setCommittedConfiguration(reqConfiguration, reqConfigurationIndex)

// Compact logs, continue even if this fails
if err := r.compactLogs(req.LastLogIndex); err != nil {
// Clear old logs if r.logs is a MonotonicLogStore. Otherwise compact the
// logs. In both cases, log any errors and continue.
if mlogs, ok := r.logs.(MonotonicLogStore); ok && mlogs.IsMonotonic() {
if err := r.removeOldLogs(); err != nil {
r.logger.Error("failed to reset logs", "error", err)
}
} else if err := r.compactLogs(req.LastLogIndex); err != nil {
r.logger.Error("failed to compact logs", "error", err)
}

r.logger.Info("Installed remote snapshot")
resp.Success = true
r.setLastContact()
return
}

// setLastContact is used to set the last contact time to now
Expand Down
95 changes: 92 additions & 3 deletions raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -1019,6 +1020,79 @@ func TestRaft_SnapshotRestore(t *testing.T) {
}
}

func TestRaft_SnapshotRestore_Monotonic(t *testing.T) {
// Make the cluster
conf := inmemConfig(t)
conf.TrailingLogs = 10
opts := &MakeClusterOpts{
Peers: 1,
Bootstrap: true,
Conf: conf,
MonotonicLogs: true,
}
c := MakeClusterCustom(t, opts)
defer c.Close()

leader := c.Leader()

// Commit a lot of things
var future Future
for i := 0; i < 100; i++ {
future = leader.Apply([]byte(fmt.Sprintf("test%d", i)), 0)
}

// Wait for the last future to apply
if err := future.Error(); err != nil {
t.Fatalf("err: %v", err)
}

// Take a snapshot
snapFuture := leader.Snapshot()
if err := snapFuture.Error(); err != nil {
t.Fatalf("err: %v", err)
}

// Check for snapshot
snaps, _ := leader.snapshots.List()
if len(snaps) != 1 {
t.Fatalf("should have a snapshot")
}
snap := snaps[0]

// Logs should be trimmed
if idx, _ := leader.logs.FirstIndex(); idx != snap.Index-conf.TrailingLogs+1 {
t.Fatalf("should trim logs to %d: but is %d", snap.Index-conf.TrailingLogs+1, idx)
}

// Shutdown
shutdown := leader.Shutdown()
if err := shutdown.Error(); err != nil {
t.Fatalf("err: %v", err)
}

// Restart the Raft
r := leader
// Can't just reuse the old transport as it will be closed
_, trans2 := NewInmemTransport(r.trans.LocalAddr())
cfg := r.config()
r, err := NewRaft(&cfg, r.fsm, r.logs, r.stable, r.snapshots, trans2)
if err != nil {
t.Fatalf("err: %v", err)
}
c.rafts[0] = r

// We should have restored from the snapshot!
if last := r.getLastApplied(); last != snap.Index {
t.Fatalf("bad last index: %d, expecting %d", last, snap.Index)
}

// Verify that logs have been reset
first, _ := r.logs.FirstIndex()
last, _ := r.logs.LastIndex()
assert.Zero(t, first)
assert.Zero(t, last)
}

func TestRaft_SnapshotRestore_Progress(t *testing.T) {
// Make the cluster
conf := inmemConfig(t)
Expand Down Expand Up @@ -1342,7 +1416,7 @@ func TestRaft_UserSnapshot(t *testing.T) {

// snapshotAndRestore does a snapshot and restore sequence and applies the given
// offset to the snapshot index, so we can try out different situations.
func snapshotAndRestore(t *testing.T, offset uint64) {
func snapshotAndRestore(t *testing.T, offset uint64, monotonicLogStore bool) {
// Make the cluster.
conf := inmemConfig(t)

Expand All @@ -1352,7 +1426,18 @@ func snapshotAndRestore(t *testing.T, offset uint64) {
conf.ElectionTimeout = 500 * time.Millisecond
conf.LeaderLeaseTimeout = 500 * time.Millisecond

c := MakeCluster(3, t, conf)
var c *cluster
if monotonicLogStore {
opts := &MakeClusterOpts{
Peers: 3,
Bootstrap: true,
Conf: conf,
MonotonicLogs: true,
}
c = MakeClusterCustom(t, opts)
} else {
c = MakeCluster(3, t, conf)
}
defer c.Close()

// Wait for things to get stable and commit some things.
Expand Down Expand Up @@ -1448,7 +1533,10 @@ func TestRaft_UserRestore(t *testing.T) {

for _, c := range cases {
t.Run(fmt.Sprintf("case %v", c), func(t *testing.T) {
snapshotAndRestore(t, c)
snapshotAndRestore(t, c, false)
})
t.Run(fmt.Sprintf("monotonic case %v", c), func(t *testing.T) {
snapshotAndRestore(t, c, true)
})
}
}
Expand Down Expand Up @@ -2380,6 +2468,7 @@ func TestRaft_LeadershipTransferStopRightAway(t *testing.T) {
t.Errorf("leadership shouldn't have started, but instead it error with: %v", err)
}
}

func TestRaft_GetConfigurationNoBootstrap(t *testing.T) {
c := MakeCluster(2, t, nil)
defer c.Close()
Expand Down
26 changes: 26 additions & 0 deletions snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,3 +249,29 @@ func (r *Raft) compactLogs(snapIdx uint64) error {
}
return nil
}

// removeOldLogs removes all old logs from the store. This is used for
// MonotonicLogStores after restore. Callers should verify that the store
// implementation is monotonic prior to calling.
func (r *Raft) removeOldLogs() error {
defer metrics.MeasureSince([]string{"raft", "removeOldLogs"}, time.Now())

// Determine log ranges to truncate
firstLogIdx, err := r.logs.FirstIndex()
if err != nil {
return fmt.Errorf("failed to get first log index: %w", err)
}

lastLogIdx, err := r.logs.LastIndex()
if err != nil {
return fmt.Errorf("failed to get last log index: %w", err)
}

r.logger.Info("removing all old logs from log store", "first", firstLogIdx, "last", lastLogIdx)

if err := r.logs.DeleteRange(firstLogIdx, lastLogIdx); err != nil {
return fmt.Errorf("log truncation failed: %v", err)
}

return nil
}
10 changes: 8 additions & 2 deletions testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (m *MockSnapshot) Persist(sink SnapshotSink) error {
func (m *MockSnapshot) Release() {
}

// MockMonotonicLogStore is a stubbed LogStore wrapper for testing the
// MockMonotonicLogStore is a LogStore wrapper for testing the
// MonotonicLogStore interface.
type MockMonotonicLogStore struct {
s LogStore
Expand Down Expand Up @@ -714,6 +714,7 @@ type MakeClusterOpts struct {
ConfigStoreFSM bool
MakeFSMFunc func() FSM
LongstopTimeout time.Duration
MonotonicLogs bool
}

// makeCluster will return a cluster with the given config and number of peers.
Expand Down Expand Up @@ -789,11 +790,16 @@ func makeCluster(t *testing.T, opts *MakeClusterOpts) *cluster {
// Create all the rafts
c.startTime = time.Now()
for i := 0; i < opts.Peers; i++ {
logs := c.stores[i]
var logs LogStore
logs = c.stores[i]
store := c.stores[i]
snap := c.snaps[i]
trans := c.trans[i]

if opts.MonotonicLogs {
logs = &MockMonotonicLogStore{s: logs}
}

peerConf := opts.Conf
peerConf.LocalID = configuration.Servers[i].ID
peerConf.Logger = newTestLoggerWithPrefix(t, string(configuration.Servers[i].ID))
Expand Down

0 comments on commit f10b599

Please sign in to comment.