diff --git a/manager/state/raft/storage.go b/manager/state/raft/storage.go index 95074ffc19..e47fe01b2f 100644 --- a/manager/state/raft/storage.go +++ b/manager/state/raft/storage.go @@ -10,6 +10,7 @@ import ( "sort" "strings" + "github.com/coreos/etcd/pkg/fileutil" "github.com/coreos/etcd/raft" "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/snap" @@ -80,7 +81,7 @@ func (n *Node) createWAL(nodeID string) (raft.Peer, error) { } n.wal, err = wal.Create(n.walDir(), metadata) if err != nil { - return raft.Peer{}, fmt.Errorf("create wal error: %v", err) + return raft.Peer{}, fmt.Errorf("create WAL error: %v", err) } n.cluster.AddMember(&membership.Member{RaftMember: raftNode}) @@ -127,7 +128,7 @@ func (n *Node) readWAL(ctx context.Context, snapshot *raftpb.Snapshot, forceNewC repaired := false for { if n.wal, err = wal.Open(n.walDir(), walsnap); err != nil { - return fmt.Errorf("open wal error: %v", err) + return fmt.Errorf("open WAL error: %v", err) } if metadata, st, ents, err = n.wal.ReadAll(); err != nil { if err := n.wal.Close(); err != nil { @@ -135,7 +136,7 @@ func (n *Node) readWAL(ctx context.Context, snapshot *raftpb.Snapshot, forceNewC } // we can only repair ErrUnexpectedEOF and we never repair twice. if repaired || err != io.ErrUnexpectedEOF { - return fmt.Errorf("read wal error (%v) and cannot be repaired", err) + return fmt.Errorf("read WAL error (%v) and cannot be repaired", err) } if !wal.Repair(n.walDir()) { return fmt.Errorf("WAL error (%v) cannot be repaired", err) @@ -157,7 +158,7 @@ func (n *Node) readWAL(ctx context.Context, snapshot *raftpb.Snapshot, forceNewC var raftNode api.RaftMember if err := raftNode.Unmarshal(metadata); err != nil { - return fmt.Errorf("error unmarshalling wal metadata: %v", err) + return fmt.Errorf("error unmarshalling WAL metadata: %v", err) } n.Config.ID = raftNode.RaftID @@ -274,25 +275,103 @@ func (n *Node) saveSnapshot(snapshot raftpb.Snapshot, keepOldSnapshots uint64) e // This means that if the current snapshot doesn't appear in the // directory for some strange reason, we won't delete anything, which // is the safe behavior. + curSnapshotIdx := -1 var ( - afterCurSnapshot bool - removeErr error + removeErr error + oldestSnapshot string ) + for i, snapFile := range snapshots { - if afterCurSnapshot { - if uint64(len(snapshots)-i) <= keepOldSnapshots { - return removeErr - } - err := os.Remove(filepath.Join(n.snapDir(), snapFile)) - if err != nil && removeErr == nil { - removeErr = err + if curSnapshotIdx >= 0 && i > curSnapshotIdx { + if uint64(i-curSnapshotIdx) > keepOldSnapshots { + err := os.Remove(filepath.Join(n.snapDir(), snapFile)) + if err != nil && removeErr == nil { + removeErr = err + } + continue } } else if snapFile == curSnapshot { - afterCurSnapshot = true + curSnapshotIdx = i + } + oldestSnapshot = snapFile + } + + if removeErr != nil { + return removeErr + } + + // Remove any WAL files that only contain data from before the oldest + // remaining snapshot. + + if oldestSnapshot == "" { + return nil + } + + // Parse index out of oldest snapshot's filename + var snapTerm, snapIndex uint64 + _, err = fmt.Sscanf(oldestSnapshot, "%016x-%016x.snap", &snapTerm, &snapIndex) + if err != nil { + return fmt.Errorf("malformed snapshot filename %s: %v", oldestSnapshot, err) + } + + // List the WALs + dirents, err = ioutil.ReadDir(n.walDir()) + if err != nil { + return err + } + + var wals []string + for _, dirent := range dirents { + if strings.HasSuffix(dirent.Name(), ".wal") { + wals = append(wals, dirent.Name()) } } - return removeErr + // Sort WAL filenames in lexical order + sort.Sort(sort.StringSlice(wals)) + + found := false + deleteUntil := -1 + + for i, walName := range wals { + var walSeq, walIndex uint64 + _, err = fmt.Sscanf(walName, "%016x-%016x.wal", &walSeq, &walIndex) + if err != nil { + return fmt.Errorf("could not parse WAL name %s: %v", walName, err) + } + + if walIndex >= snapIndex { + deleteUntil = i - 1 + found = true + break + } + } + + // If all WAL files started with indices below the oldest snapshot's + // index, we can delete all but the newest WAL file. + if !found && len(wals) != 0 { + deleteUntil = len(wals) - 1 + } + + for i := 0; i < deleteUntil; i++ { + walPath := filepath.Join(n.walDir(), wals[i]) + l, err := fileutil.NewLock(walPath) + if err != nil { + continue + } + err = l.TryLock() + if err != nil { + return fmt.Errorf("could not lock old WAL file %s for removal: %v", wals[i], err) + } + err = os.Remove(walPath) + l.Unlock() + l.Destroy() + if err != nil { + return fmt.Errorf("error removing old WAL file %s: %v", wals[i], err) + } + } + + return nil } func (n *Node) doSnapshot(raftConfig *api.RaftConfig) { diff --git a/manager/state/raft/storage_test.go b/manager/state/raft/storage_test.go index b24c18123d..977036f97a 100644 --- a/manager/state/raft/storage_test.go +++ b/manager/state/raft/storage_test.go @@ -5,11 +5,14 @@ import ( "io/ioutil" "path/filepath" "testing" + "time" "github.com/docker/swarmkit/api" raftutils "github.com/docker/swarmkit/manager/state/raft/testutils" + "github.com/docker/swarmkit/manager/state/store" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "golang.org/x/net/context" ) func TestRaftSnapshot(t *testing.T) { @@ -243,3 +246,180 @@ func TestRaftSnapshotRestart(t *testing.T) { require.NoError(t, err) raftutils.CheckValuesOnNodes(t, clockSource, nodes, nodeIDs, values) } + +func TestGCWAL(t *testing.T) { + if testing.Short() { + t.Skip("TestGCWAL skipped with -short because it's resource-intensive") + } + t.Parallel() + + // Additional log entries from cluster setup, leader election + extraLogEntries := 5 + // Number of large entries to propose + proposals := 47 + + // Bring up a 3 node cluster + nodes, clockSource := raftutils.NewRaftCluster(t, tc, &api.RaftConfig{SnapshotInterval: uint64(proposals + extraLogEntries), LogEntriesForSlowFollowers: 0}) + + for i := 0; i != proposals; i++ { + _, err := proposeHugeValue(t, nodes[1], DefaultProposalTime, fmt.Sprintf("id%d", i)) + assert.NoError(t, err, "failed to propose value") + } + + time.Sleep(250 * time.Millisecond) + + // Snapshot should have been triggered just before the WAL rotated, so + // both WAL files should be preserved + assert.NoError(t, raftutils.PollFunc(clockSource, func() error { + dirents, err := ioutil.ReadDir(filepath.Join(nodes[1].StateDir, "snap")) + if err != nil { + return err + } + if len(dirents) != 1 { + return fmt.Errorf("expected 1 snapshot, found %d", len(dirents)) + } + + dirents, err = ioutil.ReadDir(filepath.Join(nodes[1].StateDir, "wal")) + if err != nil { + return err + } + if len(dirents) != 2 { + return fmt.Errorf("expected 2 WAL files, found %d", len(dirents)) + } + return nil + })) + + raftutils.TeardownCluster(t, nodes) + + // Repeat this test, but trigger the snapshot after the WAL has rotated + proposals++ + nodes, clockSource = raftutils.NewRaftCluster(t, tc, &api.RaftConfig{SnapshotInterval: uint64(proposals + extraLogEntries), LogEntriesForSlowFollowers: 0}) + defer raftutils.TeardownCluster(t, nodes) + + for i := 0; i != proposals; i++ { + _, err := proposeHugeValue(t, nodes[1], DefaultProposalTime, fmt.Sprintf("id%d", i)) + assert.NoError(t, err, "failed to propose value") + } + + time.Sleep(250 * time.Millisecond) + + // This time only one WAL file should be saved. + assert.NoError(t, raftutils.PollFunc(clockSource, func() error { + dirents, err := ioutil.ReadDir(filepath.Join(nodes[1].StateDir, "snap")) + if err != nil { + return err + } + if len(dirents) != 1 { + return fmt.Errorf("expected 1 snapshot, found %d", len(dirents)) + } + + dirents, err = ioutil.ReadDir(filepath.Join(nodes[1].StateDir, "wal")) + if err != nil { + return err + } + if len(dirents) != 1 { + return fmt.Errorf("expected 1 WAL file, found %d", len(dirents)) + } + return nil + })) + + // Restart the whole cluster + for _, node := range nodes { + node.Server.Stop() + node.Shutdown() + } + + raftutils.AdvanceTicks(clockSource, 5) + + i := 0 + for k, node := range nodes { + nodes[k] = raftutils.RestartNode(t, clockSource, node, false) + i++ + } + raftutils.WaitForCluster(t, clockSource, nodes) + + // Is the data intact after restart? + for _, node := range nodes { + assert.NoError(t, raftutils.PollFunc(clockSource, func() error { + var err error + node.MemoryStore().View(func(tx store.ReadTx) { + var allNodes []*api.Node + allNodes, err = store.FindNodes(tx, store.All) + if err != nil { + return + } + if len(allNodes) != proposals { + err = fmt.Errorf("expected %d nodes, got %d", proposals, len(allNodes)) + return + } + }) + return err + })) + } + + // It should still be possible to propose values + _, err := raftutils.ProposeValue(t, raftutils.Leader(nodes), DefaultProposalTime, "newnode") + assert.NoError(t, err, "failed to propose value") + + for _, node := range nodes { + assert.NoError(t, raftutils.PollFunc(clockSource, func() error { + var err error + node.MemoryStore().View(func(tx store.ReadTx) { + var allNodes []*api.Node + allNodes, err = store.FindNodes(tx, store.All) + if err != nil { + return + } + if len(allNodes) != proposals+1 { + err = fmt.Errorf("expected %d nodes, got %d", proposals, len(allNodes)) + return + } + }) + return err + })) + } +} + +// proposeHugeValue proposes a 1.4MB value to a raft test cluster +func proposeHugeValue(t *testing.T, raftNode *raftutils.TestNode, time time.Duration, nodeID ...string) (*api.Node, error) { + nodeIDStr := "id1" + if len(nodeID) != 0 { + nodeIDStr = nodeID[0] + } + a := make([]byte, 1400000) + for i := 0; i != len(a); i++ { + a[i] = 'a' + } + node := &api.Node{ + ID: nodeIDStr, + Spec: api.NodeSpec{ + Annotations: api.Annotations{ + Name: nodeIDStr, + Labels: map[string]string{ + "largestring": string(a), + }, + }, + }, + } + + storeActions := []*api.StoreAction{ + { + Action: api.StoreActionKindCreate, + Target: &api.StoreAction_Node{ + Node: node, + }, + }, + } + + ctx, _ := context.WithTimeout(context.Background(), time) + + err := raftNode.ProposeValue(ctx, storeActions, func() { + err := raftNode.MemoryStore().ApplyStoreActions(storeActions) + assert.NoError(t, err, "error applying actions") + }) + if err != nil { + return nil, err + } + + return node, nil +}