Skip to content

Commit

Permalink
Fix bug in flush.go that would cause persist manager to get stuck in …
Browse files Browse the repository at this point in the history
…`idle` state (#2007)
  • Loading branch information
Richard Artoul authored Oct 17, 2019
1 parent 351e12d commit 1eb7e3f
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 40 deletions.
3 changes: 2 additions & 1 deletion src/dbnode/storage/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,8 @@ func (m *flushManager) dataWarmFlush(
// Flush first because we will only snapshot if there are no outstanding flushes.
flushTimes, err := m.namespaceFlushTimes(ns, startTime)
if err != nil {
return err
multiErr = multiErr.Add(err)
continue
}
err = m.flushNamespaceWithTimes(ns, flushTimes, flushPersist)
if err != nil {
Expand Down
87 changes: 48 additions & 39 deletions src/dbnode/storage/flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package storage
import (
"errors"
"sort"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -198,6 +199,53 @@ func TestFlushManagerFlushDoneFlushError(t *testing.T) {
require.EqualError(t, fakeErr, fm.Flush(now).Error())
}

// TestFlushManagerNamespaceFlushTimesErr makes sure that namespaceFlushTimes errors do
// not leave the persist manager in an invalid state.
func TestFlushManagerNamespaceFlushTimesErr(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

var (
fakeErr = errors.New("some-err")
mockPersistManager = persist.NewMockManager(ctrl)
mockFlushPersist = persist.NewMockFlushPreparer(ctrl)
mockSnapshotPersist = persist.NewMockSnapshotPreparer(ctrl)
)

// Make sure DoneFlush is called despite encountering an error, once for snapshot and once for warm flush.
mockFlushPersist.EXPECT().DoneFlush().Return(nil).Times(2)
mockPersistManager.EXPECT().StartFlushPersist().Return(mockFlushPersist, nil).Times(2)

mockSnapshotPersist.EXPECT().DoneSnapshot(gomock.Any(), testCommitlogFile).Return(nil)
mockPersistManager.EXPECT().StartSnapshotPersist(gomock.Any()).Return(mockSnapshotPersist, nil)

mockIndexFlusher := persist.NewMockIndexFlush(ctrl)
mockIndexFlusher.EXPECT().DoneIndex().Return(nil)
mockPersistManager.EXPECT().StartIndexPersist().Return(mockIndexFlusher, nil)

testOpts := DefaultTestOptions().SetPersistManager(mockPersistManager)
db := newMockdatabase(ctrl)
db.EXPECT().Options().Return(testOpts).AnyTimes()

nsOpts := defaultTestNs1Opts.SetIndexOptions(namespace.NewIndexOptions().SetEnabled(false))
ns := NewMockdatabaseNamespace(ctrl)
ns.EXPECT().Options().Return(nsOpts).AnyTimes()
ns.EXPECT().ID().Return(defaultTestNs1ID).AnyTimes()
ns.EXPECT().NeedsFlush(gomock.Any(), gomock.Any()).Return(false, fakeErr).AnyTimes()
ns.EXPECT().ColdFlush(gomock.Any()).Return(nil).AnyTimes()
ns.EXPECT().Snapshot(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
db.EXPECT().GetOwnedNamespaces().Return([]databaseNamespace{ns}, nil)

cl := commitlog.NewMockCommitLog(ctrl)
cl.EXPECT().RotateLogs().Return(testCommitlogFile, nil).AnyTimes()

fm := newFlushManager(db, cl, tally.NoopScope).(*flushManager)
fm.pm = mockPersistManager

now := time.Unix(0, 0)
require.True(t, strings.Contains(fm.Flush(now).Error(), fakeErr.Error()))
}

// TestFlushManagerFlushDoneSnapshotError makes sure that snapshot errors do not
// impact flushing or index operations.
func TestFlushManagerFlushDoneSnapshotError(t *testing.T) {
Expand Down Expand Up @@ -508,45 +556,6 @@ func TestFlushManagerFlushSnapshot(t *testing.T) {
require.Equal(t, now, lastSuccessfulSnapshot)
}

// func TestFlushManagerFlushSnapshotHonorsMinimumInterval(t *testing.T) {
// ctrl := gomock.NewController(t)
// defer ctrl.Finish()

// var (
// fm, ns1, ns2, _ = newMultipleFlushManagerNeedsFlush(t, ctrl)
// now = time.Now()
// )
// fm.lastSuccessfulSnapshotStartTime = now

// for _, ns := range []*MockdatabaseNamespace{ns1, ns2} {
// // Expect flushes but not snapshots.
// var (
// rOpts = ns.Options().RetentionOptions()
// blockSize = rOpts.BlockSize()
// start = retention.FlushTimeStart(ns.Options().RetentionOptions(), now)
// flushEnd = retention.FlushTimeEnd(ns.Options().RetentionOptions(), now)
// num = numIntervals(start, flushEnd, blockSize)
// )

// for i := 0; i < num; i++ {
// st := start.Add(time.Duration(i) * blockSize)
// ns.EXPECT().NeedsFlush(st, st).Return(false)
// }
// }

// bootstrapStates := DatabaseBootstrapState{
// NamespaceBootstrapStates: map[string]ShardBootstrapStates{
// ns1.ID().String(): ShardBootstrapStates{},
// ns2.ID().String(): ShardBootstrapStates{},
// },
// }
// require.NoError(t, fm.Flush(now, bootstrapStates))

// lastSuccessfulSnapshot, ok := fm.LastSuccessfulSnapshotStartTime()
// require.True(t, ok)
// require.Equal(t, now, lastSuccessfulSnapshot)
// }

type timesInOrder []time.Time

func (a timesInOrder) Len() int { return len(a) }
Expand Down

0 comments on commit 1eb7e3f

Please sign in to comment.