diff --git a/src/dbnode/storage/flush.go b/src/dbnode/storage/flush.go index aff1314faf..a2c50d5d9c 100644 --- a/src/dbnode/storage/flush.go +++ b/src/dbnode/storage/flush.go @@ -194,7 +194,8 @@ func (m *flushManager) dataWarmFlush( // Flush first because we will only snapshot if there are no outstanding flushes. flushTimes, err := m.namespaceFlushTimes(ns, startTime) if err != nil { - return err + multiErr = multiErr.Add(err) + continue } err = m.flushNamespaceWithTimes(ns, flushTimes, flushPersist) if err != nil { diff --git a/src/dbnode/storage/flush_test.go b/src/dbnode/storage/flush_test.go index d5fbe2c72b..a15849948c 100644 --- a/src/dbnode/storage/flush_test.go +++ b/src/dbnode/storage/flush_test.go @@ -23,6 +23,7 @@ package storage import ( "errors" "sort" + "strings" "sync" "testing" "time" @@ -198,6 +199,53 @@ func TestFlushManagerFlushDoneFlushError(t *testing.T) { require.EqualError(t, fakeErr, fm.Flush(now).Error()) } +// TestFlushManagerNamespaceFlushTimesErr makes sure that namespaceFlushTimes errors do +// not leave the persist manager in an invalid state. +func TestFlushManagerNamespaceFlushTimesErr(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + var ( + fakeErr = errors.New("some-err") + mockPersistManager = persist.NewMockManager(ctrl) + mockFlushPersist = persist.NewMockFlushPreparer(ctrl) + mockSnapshotPersist = persist.NewMockSnapshotPreparer(ctrl) + ) + + // Make sure DoneFlush is called despite encountering an error, once for snapshot and once for warm flush. + mockFlushPersist.EXPECT().DoneFlush().Return(nil).Times(2) + mockPersistManager.EXPECT().StartFlushPersist().Return(mockFlushPersist, nil).Times(2) + + mockSnapshotPersist.EXPECT().DoneSnapshot(gomock.Any(), testCommitlogFile).Return(nil) + mockPersistManager.EXPECT().StartSnapshotPersist(gomock.Any()).Return(mockSnapshotPersist, nil) + + mockIndexFlusher := persist.NewMockIndexFlush(ctrl) + mockIndexFlusher.EXPECT().DoneIndex().Return(nil) + mockPersistManager.EXPECT().StartIndexPersist().Return(mockIndexFlusher, nil) + + testOpts := DefaultTestOptions().SetPersistManager(mockPersistManager) + db := newMockdatabase(ctrl) + db.EXPECT().Options().Return(testOpts).AnyTimes() + + nsOpts := defaultTestNs1Opts.SetIndexOptions(namespace.NewIndexOptions().SetEnabled(false)) + ns := NewMockdatabaseNamespace(ctrl) + ns.EXPECT().Options().Return(nsOpts).AnyTimes() + ns.EXPECT().ID().Return(defaultTestNs1ID).AnyTimes() + ns.EXPECT().NeedsFlush(gomock.Any(), gomock.Any()).Return(false, fakeErr).AnyTimes() + ns.EXPECT().ColdFlush(gomock.Any()).Return(nil).AnyTimes() + ns.EXPECT().Snapshot(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + db.EXPECT().GetOwnedNamespaces().Return([]databaseNamespace{ns}, nil) + + cl := commitlog.NewMockCommitLog(ctrl) + cl.EXPECT().RotateLogs().Return(testCommitlogFile, nil).AnyTimes() + + fm := newFlushManager(db, cl, tally.NoopScope).(*flushManager) + fm.pm = mockPersistManager + + now := time.Unix(0, 0) + require.True(t, strings.Contains(fm.Flush(now).Error(), fakeErr.Error())) +} + // TestFlushManagerFlushDoneSnapshotError makes sure that snapshot errors do not // impact flushing or index operations. func TestFlushManagerFlushDoneSnapshotError(t *testing.T) { @@ -508,45 +556,6 @@ func TestFlushManagerFlushSnapshot(t *testing.T) { require.Equal(t, now, lastSuccessfulSnapshot) } -// func TestFlushManagerFlushSnapshotHonorsMinimumInterval(t *testing.T) { -// ctrl := gomock.NewController(t) -// defer ctrl.Finish() - -// var ( -// fm, ns1, ns2, _ = newMultipleFlushManagerNeedsFlush(t, ctrl) -// now = time.Now() -// ) -// fm.lastSuccessfulSnapshotStartTime = now - -// for _, ns := range []*MockdatabaseNamespace{ns1, ns2} { -// // Expect flushes but not snapshots. -// var ( -// rOpts = ns.Options().RetentionOptions() -// blockSize = rOpts.BlockSize() -// start = retention.FlushTimeStart(ns.Options().RetentionOptions(), now) -// flushEnd = retention.FlushTimeEnd(ns.Options().RetentionOptions(), now) -// num = numIntervals(start, flushEnd, blockSize) -// ) - -// for i := 0; i < num; i++ { -// st := start.Add(time.Duration(i) * blockSize) -// ns.EXPECT().NeedsFlush(st, st).Return(false) -// } -// } - -// bootstrapStates := DatabaseBootstrapState{ -// NamespaceBootstrapStates: map[string]ShardBootstrapStates{ -// ns1.ID().String(): ShardBootstrapStates{}, -// ns2.ID().String(): ShardBootstrapStates{}, -// }, -// } -// require.NoError(t, fm.Flush(now, bootstrapStates)) - -// lastSuccessfulSnapshot, ok := fm.LastSuccessfulSnapshotStartTime() -// require.True(t, ok) -// require.Equal(t, now, lastSuccessfulSnapshot) -// } - type timesInOrder []time.Time func (a timesInOrder) Len() int { return len(a) }