From 6a1b046b7e3ec17314f99a4bf6601cfbdf60903d Mon Sep 17 00:00:00 2001 From: Bo Du Date: Thu, 18 Jun 2020 20:28:04 -0400 Subject: [PATCH] [dbnode] Ensure index data consistency (#2399) --- src/dbnode/persist/fs/fs_mock.go | 54 ++++++---- src/dbnode/persist/fs/merger.go | 31 +++--- src/dbnode/persist/fs/merger_test.go | 14 ++- src/dbnode/persist/fs/persist_manager.go | 7 +- src/dbnode/persist/fs/types.go | 5 +- src/dbnode/persist/fs/write.go | 65 +++++++++--- src/dbnode/persist/types.go | 8 +- src/dbnode/storage/README.md | 5 + src/dbnode/storage/namespace.go | 43 +++++--- src/dbnode/storage/shard.go | 128 ++++++++++++++--------- src/dbnode/storage/shard_test.go | 12 ++- src/dbnode/storage/storage_mock.go | 44 +++++++- src/dbnode/storage/types.go | 8 +- 13 files changed, 296 insertions(+), 128 deletions(-) diff --git a/src/dbnode/persist/fs/fs_mock.go b/src/dbnode/persist/fs/fs_mock.go index 2e55fa6edd..635cc05875 100644 --- a/src/dbnode/persist/fs/fs_mock.go +++ b/src/dbnode/persist/fs/fs_mock.go @@ -30,9 +30,10 @@ import ( "time" "github.com/m3db/m3/src/dbnode/namespace" + persist "github.com/m3db/m3/src/dbnode/persist" "github.com/m3db/m3/src/dbnode/sharding" "github.com/m3db/m3/src/dbnode/x/xio" - "github.com/m3db/m3/src/m3ninx/persist" + persist0 "github.com/m3db/m3/src/m3ninx/persist" "github.com/m3db/m3/src/x/checked" "github.com/m3db/m3/src/x/context" "github.com/m3db/m3/src/x/ident" @@ -78,6 +79,21 @@ func (mr *MockDataFileSetWriterMockRecorder) Close() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockDataFileSetWriter)(nil).Close)) } +// DeferClose mocks base method +func (m *MockDataFileSetWriter) DeferClose() (persist.DataCloser, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeferClose") + ret0, _ := ret[0].(persist.DataCloser) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// DeferClose indicates an expected call of DeferClose +func (mr *MockDataFileSetWriterMockRecorder) DeferClose() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeferClose", reflect.TypeOf((*MockDataFileSetWriter)(nil).DeferClose)) +} + // Open mocks base method func (m *MockDataFileSetWriter) Open(arg0 DataWriterOpenOptions) error { m.ctrl.T.Helper() @@ -525,7 +541,7 @@ func (mr *MockIndexFileSetWriterMockRecorder) Open(arg0 interface{}) *gomock.Cal } // WriteSegmentFileSet mocks base method -func (m *MockIndexFileSetWriter) WriteSegmentFileSet(arg0 persist.IndexSegmentFileSetWriter) error { +func (m *MockIndexFileSetWriter) WriteSegmentFileSet(arg0 persist0.IndexSegmentFileSetWriter) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "WriteSegmentFileSet", arg0) ret0, _ := ret[0].(error) @@ -576,10 +592,10 @@ func (mr *MockIndexFileSetReaderMockRecorder) Close() *gomock.Call { } // IndexVolumeType mocks base method -func (m *MockIndexFileSetReader) IndexVolumeType() persist.IndexVolumeType { +func (m *MockIndexFileSetReader) IndexVolumeType() persist0.IndexVolumeType { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "IndexVolumeType") - ret0, _ := ret[0].(persist.IndexVolumeType) + ret0, _ := ret[0].(persist0.IndexVolumeType) return ret0 } @@ -605,10 +621,10 @@ func (mr *MockIndexFileSetReaderMockRecorder) Open(arg0 interface{}) *gomock.Cal } // ReadSegmentFileSet mocks base method -func (m *MockIndexFileSetReader) ReadSegmentFileSet() (persist.IndexSegmentFileSet, error) { +func (m *MockIndexFileSetReader) ReadSegmentFileSet() (persist0.IndexSegmentFileSet, error) { m.ctrl.T.Helper() ret := 
m.ctrl.Call(m, "ReadSegmentFileSet") - ret0, _ := ret[0].(persist.IndexSegmentFileSet) + ret0, _ := ret[0].(persist0.IndexSegmentFileSet) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -671,10 +687,10 @@ func (m *MockIndexSegmentFileSetWriter) EXPECT() *MockIndexSegmentFileSetWriterM } // Files mocks base method -func (m *MockIndexSegmentFileSetWriter) Files() []persist.IndexSegmentFileType { +func (m *MockIndexSegmentFileSetWriter) Files() []persist0.IndexSegmentFileType { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Files") - ret0, _ := ret[0].([]persist.IndexSegmentFileType) + ret0, _ := ret[0].([]persist0.IndexSegmentFileType) return ret0 } @@ -727,10 +743,10 @@ func (mr *MockIndexSegmentFileSetWriterMockRecorder) SegmentMetadata() *gomock.C } // SegmentType mocks base method -func (m *MockIndexSegmentFileSetWriter) SegmentType() persist.IndexSegmentType { +func (m *MockIndexSegmentFileSetWriter) SegmentType() persist0.IndexSegmentType { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "SegmentType") - ret0, _ := ret[0].(persist.IndexSegmentType) + ret0, _ := ret[0].(persist0.IndexSegmentType) return ret0 } @@ -741,7 +757,7 @@ func (mr *MockIndexSegmentFileSetWriterMockRecorder) SegmentType() *gomock.Call } // WriteFile mocks base method -func (m *MockIndexSegmentFileSetWriter) WriteFile(arg0 persist.IndexSegmentFileType, arg1 io.Writer) error { +func (m *MockIndexSegmentFileSetWriter) WriteFile(arg0 persist0.IndexSegmentFileType, arg1 io.Writer) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "WriteFile", arg0, arg1) ret0, _ := ret[0].(error) @@ -778,10 +794,10 @@ func (m *MockIndexSegmentFileSet) EXPECT() *MockIndexSegmentFileSetMockRecorder } // Files mocks base method -func (m *MockIndexSegmentFileSet) Files() []persist.IndexSegmentFile { +func (m *MockIndexSegmentFileSet) Files() []persist0.IndexSegmentFile { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Files") - ret0, _ := ret[0].([]persist.IndexSegmentFile) + ret0, _ := ret[0].([]persist0.IndexSegmentFile) return ret0 } @@ -834,10 +850,10 @@ func (mr *MockIndexSegmentFileSetMockRecorder) SegmentMetadata() *gomock.Call { } // SegmentType mocks base method -func (m *MockIndexSegmentFileSet) SegmentType() persist.IndexSegmentType { +func (m *MockIndexSegmentFileSet) SegmentType() persist0.IndexSegmentType { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "SegmentType") - ret0, _ := ret[0].(persist.IndexSegmentType) + ret0, _ := ret[0].(persist0.IndexSegmentType) return ret0 } @@ -871,10 +887,10 @@ func (m *MockIndexSegmentFile) EXPECT() *MockIndexSegmentFileMockRecorder { } // Files mocks base method -func (m *MockIndexSegmentFile) Files() []persist.IndexSegmentFile { +func (m *MockIndexSegmentFile) Files() []persist0.IndexSegmentFile { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Files") - ret0, _ := ret[0].([]persist.IndexSegmentFile) + ret0, _ := ret[0].([]persist0.IndexSegmentFile) return ret0 } @@ -927,10 +943,10 @@ func (mr *MockIndexSegmentFileMockRecorder) SegmentMetadata() *gomock.Call { } // SegmentType mocks base method -func (m *MockIndexSegmentFile) SegmentType() persist.IndexSegmentType { +func (m *MockIndexSegmentFile) SegmentType() persist0.IndexSegmentType { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "SegmentType") - ret0, _ := ret[0].(persist.IndexSegmentType) + ret0, _ := ret[0].(persist0.IndexSegmentType) return ret0 } diff --git a/src/dbnode/persist/fs/merger.go b/src/dbnode/persist/fs/merger.go index 5b0a86fe46..1b5aeb2736 100644 --- a/src/dbnode/persist/fs/merger.go +++ b/src/dbnode/persist/fs/merger.go @@ -89,7 +89,7 @@ func (m *merger) 
Merge( flushPreparer persist.FlushPreparer, nsCtx namespace.Context, onFlush persist.OnFlushSeries, -) (err error) { +) (persist.DataCloser, error) { var ( reader = m.reader blockAllocSize = m.blockAllocSize @@ -114,10 +114,12 @@ func (m *merger) Merge( }, FileSetType: persist.FileSetFlushType, } + closer persist.DataCloser + err error ) if err := reader.Open(openOpts); err != nil { - return err + return closer, err } defer func() { // Only set the error here if not set by the end of the function, since @@ -129,7 +131,7 @@ func (m *merger) Merge( nsMd, err := namespace.NewMetadata(nsID, nsOpts) if err != nil { - return err + return closer, err } prepareOpts := persist.DataPrepareOptions{ NamespaceMetadata: nsMd, @@ -141,7 +143,7 @@ func (m *merger) Merge( } prepared, err := flushPreparer.PrepareData(prepareOpts) if err != nil { - return err + return closer, err } var ( @@ -191,7 +193,7 @@ func (m *merger) Merge( // The merge is performed in two stages. The first stage is to loop through // series on disk and merge it with what's in the merge target. Looping - // through disk in the first stage is done intentionally to read disk + // through disk in the first stage is prepared intentionally to read disk // sequentially to optimize for spinning disk access. The second stage is to // persist the rest of the series in the merge target that were not // persisted in the first stage. @@ -199,7 +201,7 @@ func (m *merger) Merge( // First stage: loop through series on disk. for id, tagsIter, data, checksum, err := reader.Read(); err != io.EOF; id, tagsIter, data, checksum, err = reader.Read() { if err != nil { - return err + return closer, err } idsToFinalize = append(idsToFinalize, id) @@ -211,7 +213,7 @@ func (m *merger) Merge( ctx.Reset() mergeWithData, hasInMemoryData, err := mergeWith.Read(ctx, id, blockStart, nsCtx) if err != nil { - return err + return closer, err } if hasInMemoryData { segmentReaders = appendBlockReadersToSegmentReaders(segmentReaders, mergeWithData) @@ -222,7 +224,7 @@ func (m *merger) Merge( tags, err := convert.TagsFromTagsIter(id, tagsIter, identPool) tagsIter.Close() if err != nil { - return err + return closer, err } tagsToFinalize = append(tagsToFinalize, tags) @@ -232,15 +234,15 @@ func (m *merger) Merge( if len(segmentReaders) == 1 && hasInMemoryData == false { segment, err := segmentReaders[0].Segment() if err != nil { - return err + return closer, err } if err := persistSegmentWithChecksum(id, tags, segment, checksum, prepared.Persist); err != nil { - return err + return closer, err } } else { if err := persistSegmentReaders(id, tags, segmentReaders, iterResources, prepared.Persist); err != nil { - return err + return closer, err } } // Closing the context will finalize the data returned from @@ -278,12 +280,11 @@ func (m *merger) Merge( return err }, nsCtx) if err != nil { - return err + return closer, err } - // Close the flush preparer, which writes the rest of the files in the - // fileset. - return prepared.Close() + // NB(bodu): Return a deferred closer so that we can guarantee that cold index writes are persisted first. 
+ return prepared.DeferClose() } func appendBlockReadersToSegmentReaders(segReaders []xio.SegmentReader, brs []xio.BlockReader) []xio.SegmentReader { diff --git a/src/dbnode/persist/fs/merger_test.go b/src/dbnode/persist/fs/merger_test.go index 59c26b6dfc..4077bf0d8c 100644 --- a/src/dbnode/persist/fs/merger_test.go +++ b/src/dbnode/persist/fs/merger_test.go @@ -437,6 +437,7 @@ func testMergeWith( reader := mockReaderFromData(ctrl, diskData) var persisted []persistedData + var deferClosed bool preparer := persist.NewMockFlushPreparer(ctrl) preparer.EXPECT().PrepareData(gomock.Any()).Return( persist.PreparedDataPersist{ @@ -450,7 +451,13 @@ func testMergeWith( }) return nil }, - Close: func() error { return nil }, + DeferClose: func() (persist.DataCloser, error) { + return func() error { + require.False(t, deferClosed) + deferClosed = true + return nil + }, nil + }, }, nil) nsCtx := namespace.Context{} @@ -463,8 +470,11 @@ func testMergeWith( BlockStart: startTime, } mergeWith := mockMergeWithFromData(t, ctrl, diskData, mergeTargetData) - err := merger.Merge(fsID, mergeWith, 1, preparer, nsCtx, &persist.NoOpColdFlushNamespace{}) + close, err := merger.Merge(fsID, mergeWith, 1, preparer, nsCtx, &persist.NoOpColdFlushNamespace{}) require.NoError(t, err) + require.False(t, deferClosed) + require.NoError(t, close()) + require.True(t, deferClosed) assertPersistedAsExpected(t, persisted, expectedData) } diff --git a/src/dbnode/persist/fs/persist_manager.go b/src/dbnode/persist/fs/persist_manager.go index 61563cd608..05a32d9799 100644 --- a/src/dbnode/persist/fs/persist_manager.go +++ b/src/dbnode/persist/fs/persist_manager.go @@ -200,7 +200,7 @@ func NewPersistManager(opts Options) (persist.Manager, error) { func (pm *persistManager) reset() { pm.status = persistManagerIdle - pm.start = timeZero + pm.start = timeZero pm.count = 0 pm.bytesWritten = 0 pm.worked = 0 @@ -490,6 +490,7 @@ func (pm *persistManager) PrepareData(opts persist.DataPrepareOptions) (persist. prepared.Persist = pm.persist prepared.Close = pm.closeData + prepared.DeferClose = pm.deferCloseData return prepared, nil } @@ -544,6 +545,10 @@ func (pm *persistManager) closeData() error { return pm.dataPM.writer.Close() } +func (pm *persistManager) deferCloseData() (persist.DataCloser, error) { + return pm.dataPM.writer.DeferClose() +} + // DoneFlush is called by the databaseFlushManager to finish the data persist process. func (pm *persistManager) DoneFlush() error { pm.Lock() diff --git a/src/dbnode/persist/fs/types.go b/src/dbnode/persist/fs/types.go index 4b33e6ce99..e17d453635 100644 --- a/src/dbnode/persist/fs/types.go +++ b/src/dbnode/persist/fs/types.go @@ -92,6 +92,9 @@ type DataFileSetWriter interface { // WriteAll will write the id and all byte slices and returns an error on a write error. // Callers must not call this method with a given ID more than once. WriteAll(id ident.ID, tags ident.Tags, data []checked.Bytes, checksum uint32) error + + // DeferClose returns a DataCloser that defers writing of a checkpoint file. + DeferClose() (persist.DataCloser, error) } // SnapshotMetadataFileWriter writes out snapshot metadata files. @@ -557,7 +560,7 @@ type Merger interface { flushPreparer persist.FlushPreparer, nsCtx namespace.Context, onFlush persist.OnFlushSeries, - ) error + ) (persist.DataCloser, error) } // NewMergerFn is the function to call to get a new Merger. 
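The DeferClose plumbing added above (DataFileSetWriter.DeferClose, Merger.Merge returning a persist.DataCloser) boils down to one pattern: write everything except the checkpoint file now, and hand back a closure that lays down the checkpoint later. A minimal, self-contained Go sketch of that pattern follows; the dataWriter type, file names, and payload are hypothetical stand-ins rather than the fs package's real API (the real implementation is in write.go below).

// Illustration only, not part of the patch: a toy version of the
// "defer the checkpoint write" pattern introduced by DeferClose.
package main

import (
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
)

// DataCloser mirrors persist.DataCloser from the patch.
type DataCloser func() error

// dataWriter is a toy writer: it persists the data file immediately but
// defers writing the checkpoint file that marks the fileset as complete.
type dataWriter struct {
	dir string
}

// WriteData writes the data payload right away.
func (w *dataWriter) WriteData(payload []byte) error {
	return ioutil.WriteFile(filepath.Join(w.dir, "data.db"), payload, 0644)
}

// DeferClose returns a closure that writes the checkpoint file when invoked,
// so the caller decides exactly when the fileset becomes "complete".
func (w *dataWriter) DeferClose() (DataCloser, error) {
	checkpointPath := filepath.Join(w.dir, "checkpoint.db")
	return func() error {
		return ioutil.WriteFile(checkpointPath, []byte("done"), 0644)
	}, nil
}

func main() {
	dir, err := ioutil.TempDir("", "deferclose")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)

	w := &dataWriter{dir: dir}
	if err := w.WriteData([]byte("tsdb block")); err != nil {
		panic(err)
	}

	// Data is on disk, but there is no checkpoint yet: a crash here leaves
	// a fileset that is not considered complete on restart.
	closer, err := w.DeferClose()
	if err != nil {
		panic(err)
	}

	// ... the dependent work (the index cold flush) would happen here ...

	// Only after that work succeeds is the checkpoint laid down.
	if err := closer(); err != nil {
		panic(err)
	}
	fmt.Println("checkpoint written, fileset complete")
}

The useful property is that a crash between WriteData and the closer call leaves no checkpoint file, so the partially written fileset is not treated as complete on restart, which is the guarantee the rest of the patch builds on.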
diff --git a/src/dbnode/persist/fs/write.go b/src/dbnode/persist/fs/write.go index cb05a82c3b..df4e70a8a7 100644 --- a/src/dbnode/persist/fs/write.go +++ b/src/dbnode/persist/fs/write.go @@ -60,6 +60,7 @@ type writer struct { summariesPercent float64 bloomFilterFalsePositivePercent float64 + bufferSize int infoFdWithDigest digest.FdWithDigestWriter indexFdWithDigest digest.FdWithDigestWriter @@ -128,6 +129,7 @@ func NewWriter(opts Options) (DataFileSetWriter, error) { newDirectoryMode: opts.NewDirectoryMode(), summariesPercent: opts.IndexSummariesPercent(), bloomFilterFalsePositivePercent: opts.IndexBloomFilterFalsePositivePercent(), + bufferSize: bufferSize, infoFdWithDigest: digest.NewFdWithDigestWriter(bufferSize), indexFdWithDigest: digest.NewFdWithDigestWriter(bufferSize), summariesFdWithDigest: digest.NewFdWithDigestWriter(bufferSize), @@ -327,13 +329,40 @@ func (w *writer) Close() error { } // NB(xichen): only write out the checkpoint file if there are no errors // encountered between calling writer.Open() and writer.Close(). - if err := w.writeCheckpointFile(); err != nil { + if err := writeCheckpointFile( + w.checkpointFilePath, + w.digestFdWithDigestContents.Digest().Sum32(), + w.digestBuf, + w.newFileMode, + ); err != nil { w.err = err return err } return nil } +func (w *writer) DeferClose() (persist.DataCloser, error) { + err := w.close() + if w.err != nil { + return nil, w.err + } + if err != nil { + w.err = err + return nil, err + } + checkpointFilePath := w.checkpointFilePath + digestChecksum := w.digestFdWithDigestContents.Digest().Sum32() + newFileMode := w.newFileMode + return func() error { + return writeCheckpointFile( + checkpointFilePath, + digestChecksum, + digest.NewBuffer(), + newFileMode, + ) + }, nil +} + func (w *writer) close() error { if err := w.writeIndexRelatedFiles(); err != nil { return err @@ -359,21 +388,6 @@ func (w *writer) close() error { ) } -func (w *writer) writeCheckpointFile() error { - fd, err := w.openWritable(w.checkpointFilePath) - if err != nil { - return err - } - digestChecksum := w.digestFdWithDigestContents.Digest().Sum32() - if err := w.digestBuf.WriteDigestToFile(fd, digestChecksum); err != nil { - // NB(prateek): intentionally skipping fd.Close() error, as failure - // to write takes precedence over failure to close the file - fd.Close() - return err - } - return fd.Close() -} - func (w *writer) openWritable(filePath string) (*os.File, error) { return OpenWritable(filePath, w.newFileMode) } @@ -566,3 +580,22 @@ func (w *writer) writeInfoFileContents( _, err = w.infoFdWithDigest.Write(w.encoder.Bytes()) return err } + +func writeCheckpointFile( + checkpointFilePath string, + digestChecksum uint32, + digestBuf digest.Buffer, + newFileMode os.FileMode, +) error { + fd, err := OpenWritable(checkpointFilePath, newFileMode) + if err != nil { + return err + } + if err := digestBuf.WriteDigestToFile(fd, digestChecksum); err != nil { + // NB(prateek): intentionally skipping fd.Close() error, as failure + // to write takes precedence over failure to close the file + fd.Close() + return err + } + return fd.Close() +} diff --git a/src/dbnode/persist/types.go b/src/dbnode/persist/types.go index 6d69713a15..932d256909 100644 --- a/src/dbnode/persist/types.go +++ b/src/dbnode/persist/types.go @@ -40,10 +40,14 @@ type DataFn func(id ident.ID, tags ident.Tags, segment ts.Segment, checksum uint // blocks for a (shard, blockStart) combination. 
type DataCloser func() error +// DeferCloser returns a DataCloser that persists the data checkpoint file when called. +type DeferCloser func() (DataCloser, error) + // PreparedDataPersist is an object that wraps holds a persist function and a closer. type PreparedDataPersist struct { - Persist DataFn - Close DataCloser + Persist DataFn + Close DataCloser + DeferClose DeferCloser } // CommitLogFiles represents a slice of commitlog files. diff --git a/src/dbnode/storage/README.md b/src/dbnode/storage/README.md index 3a7359b315..0c54d521f1 100644 --- a/src/dbnode/storage/README.md +++ b/src/dbnode/storage/README.md @@ -8,6 +8,11 @@ Flush occurs in the following steps: - data warm flush - rotate commit log - data cold flush + - rotate cold mutable index segments + - flush cold tsdb data and write most files to disk (except checkpoint files) + - flush cold index data to disk and reload + - evict rotated cold mutable index segments + - write tsdb checkpoint files (completes the tsdb cold flush lifecycle) - data snapshot - drops rotated commit log when we are done - index flush diff --git a/src/dbnode/storage/namespace.go b/src/dbnode/storage/namespace.go index a45065acbe..46764245fc 100644 --- a/src/dbnode/storage/namespace.go +++ b/src/dbnode/storage/namespace.go @@ -1168,11 +1168,11 @@ func (n *dbNamespace) ColdFlush(flushPersist persist.FlushPreparer) error { return nil } - multiErr := xerrors.NewMultiError() shards := n.OwnedShards() resources, err := newColdFlushReuseableResources(n.opts) if err != nil { + n.metrics.flushColdData.ReportError(n.nowFn().Sub(callStart)) return err } @@ -1187,36 +1187,47 @@ func (n *dbNamespace) ColdFlush(flushPersist persist.FlushPreparer) error { if n.reverseIndex != nil { onColdFlushDone, err = n.reverseIndex.ColdFlush(shards) if err != nil { + n.metrics.flushColdData.ReportError(n.nowFn().Sub(callStart)) return err } } onColdFlushNs, err := n.opts.OnColdFlush().ColdFlushNamespace(n) if err != nil { + n.metrics.flushColdData.ReportError(n.nowFn().Sub(callStart)) return err } + // NB(bodu): Deferred shard cold flushes so that we can ensure that cold flush index data is + // persisted before persisting TSDB data to ensure crash consistency. + multiErr := xerrors.NewMultiError() + shardColdFlushes := make([]ShardColdFlush, 0, len(shards)) for _, shard := range shards { - err := shard.ColdFlush(flushPersist, resources, nsCtx, onColdFlushNs) + shardColdFlush, err := shard.ColdFlush(flushPersist, resources, nsCtx, onColdFlushNs) if err != nil { detailedErr := fmt.Errorf("shard %d failed to compact: %v", shard.ID(), err) multiErr = multiErr.Add(detailedErr) - // Continue with remaining shards. + continue + } + shardColdFlushes = append(shardColdFlushes, shardColdFlush) + } + + // We go through this error checking process to allow for partially successful flushes. + indexColdFlushError := onColdFlushNs.Done() + if indexColdFlushError == nil && onColdFlushDone != nil { + // Only evict rotated cold mutable index segments if the index cold flush was sucessful + // or we will lose queryability of data that's still in mem. + indexColdFlushError = onColdFlushDone() + } + if indexColdFlushError == nil { + // NB(bodu): We only want to complete data cold flushes if the index cold flush + // is successful. If index cold flush is successful, we want to attempt writing + // of checkpoint files to complete the cold data flush lifecycle for successful shards. 
+ for _, shardColdFlush := range shardColdFlushes { + multiErr = multiErr.Add(shardColdFlush.Done()) } } - - if err := onColdFlushNs.Done(); err != nil { - multiErr = multiErr.Add(err) - } - if onColdFlushDone != nil { - multiErr = multiErr.Add(onColdFlushDone()) - } - - // TODO: Don't write checkpoints for shards until this time, - // otherwise it's possible to cold flush data but then not - // have the OnColdFlush processor actually successfully finish. - // Should only lay down files once onColdFlush.ColdFlushDone() - // finishes without an error. + multiErr = multiErr.Add(indexColdFlushError) res := multiErr.FinalError() n.metrics.flushColdData.ReportSuccessOrError(res, n.nowFn().Sub(callStart)) diff --git a/src/dbnode/storage/shard.go b/src/dbnode/storage/shard.go index 17ffc5d5d6..5fc2256b6e 100644 --- a/src/dbnode/storage/shard.go +++ b/src/dbnode/storage/shard.go @@ -2211,12 +2211,12 @@ func (s *dbShard) ColdFlush( resources coldFlushReuseableResources, nsCtx namespace.Context, onFlush persist.OnFlushSeries, -) error { +) (ShardColdFlush, error) { // We don't flush data when the shard is still bootstrapping. s.RLock() if s.bootstrapState != Bootstrapped { s.RUnlock() - return errShardNotBootstrappedToFlush + return shardColdFlush{}, errShardNotBootstrappedToFlush } // Use blockStatesSnapshotWithRLock to avoid having to re-acquire read lock. blockStates := s.blockStatesSnapshotWithRLock() @@ -2232,7 +2232,7 @@ func (s *dbShard) ColdFlush( blockStatesSnapshot, bootstrapped := blockStates.UnwrapValue() if !bootstrapped { - return errFlushStateIsNotInitialized + return shardColdFlush{}, errFlushStateIsNotInitialized } var ( @@ -2274,7 +2274,7 @@ func (s *dbShard) ColdFlush( return true }) if loopErr != nil { - return loopErr + return shardColdFlush{}, loopErr } if dirtySeries.Len() == 0 { @@ -2282,9 +2282,13 @@ func (s *dbShard) ColdFlush( // may be non-empty when dirtySeries is empty because we purposely // leave empty seriesLists in the dirtySeriesToWrite map to avoid having // to reallocate them in subsequent usages of the shared resource. - return nil + return shardColdFlush{}, nil } + flush := shardColdFlush{ + shard: s, + doneFns: make([]shardColdFlushDone, 0, len(dirtySeriesToWrite)), + } merger := s.newMergerFn(resources.fsReader, s.opts.DatabaseBlockOptions().DatabaseBlockAllocSize(), s.opts.SegmentReaderPool(), s.opts.MultiReaderIteratorPool(), s.opts.IdentifierPool(), s.opts.EncoderPool(), s.opts.ContextPool(), s.namespace.Options()) @@ -2308,54 +2312,18 @@ func (s *dbShard) ColdFlush( } nextVersion := coldVersion + 1 - err = merger.Merge(fsID, mergeWithMem, nextVersion, flushPreparer, nsCtx, onFlush) - if err != nil { - multiErr = multiErr.Add(err) - continue - } - - // After writing the full block successfully update the ColdVersionFlushed number. This will - // allow the SeekerManager to open a lease on the latest version of the fileset files because - // the BlockLeaseVerifier will check the ColdVersionFlushed value, but the buffer only looks at - // ColdVersionRetrievable so a concurrent tick will not yet cause the blocks in memory to be - // evicted (which is the desired behavior because we haven't updated the open leases yet which - // means the newly written data is not available for querying via the SeekerManager yet.) - s.setFlushStateColdVersionFlushed(startTime, nextVersion) - - // Notify all block leasers that a new volume for the namespace/shard/blockstart - // has been created. This will block until all leasers have relinquished their - // leases. 
- _, err = s.opts.BlockLeaseManager().UpdateOpenLeases(block.LeaseDescriptor{ - Namespace: s.namespace.ID(), - Shard: s.ID(), - BlockStart: startTime, - }, block.LeaseState{Volume: nextVersion}) - // After writing the full block successfully **and** propagating the new lease to the - // BlockLeaseManager, update the ColdVersionRetrievable in the flush state. Once this function - // completes concurrent ticks will be able to evict the data from memory that was just flushed - // (which is now safe to do since the SeekerManager has been notified of the presence of new - // files). - // - // NB(rartoul): Ideally the ColdVersionRetrievable would only be updated if the call to UpdateOpenLeases - // succeeded, but that would allow the ColdVersionRetrievable and ColdVersionFlushed numbers to drift - // which would increase the complexity of the code to address a situation that is probably not - // recoverable (failure to UpdateOpenLeases is an invariant violated error). - s.setFlushStateColdVersionRetrievable(startTime, nextVersion) + close, err := merger.Merge(fsID, mergeWithMem, nextVersion, flushPreparer, nsCtx, onFlush) if err != nil { - instrument.EmitAndLogInvariantViolation(s.opts.InstrumentOptions(), func(l *zap.Logger) { - l.With( - zap.String("namespace", s.namespace.ID().String()), - zap.Uint32("shard", s.ID()), - zap.Time("blockStart", startTime), - zap.Int("nextVersion", nextVersion), - ).Error("failed to update open leases after updating flush state cold version") - }) multiErr = multiErr.Add(err) continue } + flush.doneFns = append(flush.doneFns, shardColdFlushDone{ + startTime: startTime, + nextVersion: nextVersion, + close: close, + }) } - - return multiErr.FinalError() + return flush, multiErr.FinalError() } func (s *dbShard) Snapshot( @@ -2587,6 +2555,70 @@ func (s *dbShard) logFlushResult(r dbShardFlushResult) { ) } +type shardColdFlushDone struct { + startTime time.Time + nextVersion int + close persist.DataCloser +} + +type shardColdFlush struct { + shard *dbShard + doneFns []shardColdFlushDone +} + +func (s shardColdFlush) Done() error { + multiErr := xerrors.NewMultiError() + for _, done := range s.doneFns { + startTime := done.startTime + nextVersion := done.nextVersion + + if err := done.close(); err != nil { + multiErr = multiErr.Add(err) + continue + } + // After writing the full block successfully update the ColdVersionFlushed number. This will + // allow the SeekerManager to open a lease on the latest version of the fileset files because + // the BlockLeaseVerifier will check the ColdVersionFlushed value, but the buffer only looks at + // ColdVersionRetrievable so a concurrent tick will not yet cause the blocks in memory to be + // evicted (which is the desired behavior because we haven't updated the open leases yet which + // means the newly written data is not available for querying via the SeekerManager yet.) + s.shard.setFlushStateColdVersionFlushed(startTime, nextVersion) + + // Notify all block leasers that a new volume for the namespace/shard/blockstart + // has been created. This will block until all leasers have relinquished their + // leases. + _, err := s.shard.opts.BlockLeaseManager().UpdateOpenLeases(block.LeaseDescriptor{ + Namespace: s.shard.namespace.ID(), + Shard: s.shard.ID(), + BlockStart: startTime, + }, block.LeaseState{Volume: nextVersion}) + // After writing the full block successfully **and** propagating the new lease to the + // BlockLeaseManager, update the ColdVersionRetrievable in the flush state. 
Once this function + // completes concurrent ticks will be able to evict the data from memory that was just flushed + // (which is now safe to do since the SeekerManager has been notified of the presence of new + // files). + // + // NB(rartoul): Ideally the ColdVersionRetrievable would only be updated if the call to UpdateOpenLeases + // succeeded, but that would allow the ColdVersionRetrievable and ColdVersionFlushed numbers to drift + // which would increase the complexity of the code to address a situation that is probably not + // recoverable (failure to UpdateOpenLeases is an invariant violated error). + s.shard.setFlushStateColdVersionRetrievable(startTime, nextVersion) + if err != nil { + instrument.EmitAndLogInvariantViolation(s.shard.opts.InstrumentOptions(), func(l *zap.Logger) { + l.With( + zap.String("namespace", s.shard.namespace.ID().String()), + zap.Uint32("shard", s.shard.ID()), + zap.Time("blockStart", startTime), + zap.Int("nextVersion", nextVersion), + ).Error("failed to update open leases after updating flush state cold version") + }) + multiErr = multiErr.Add(err) + continue + } + } + return multiErr.FinalError() +} + // dbShardFlushResult is a helper struct for keeping track of the result of flushing all the // series in the shard. type dbShardFlushResult struct { diff --git a/src/dbnode/storage/shard_test.go b/src/dbnode/storage/shard_test.go index b151b6db5f..6c675ba4fc 100644 --- a/src/dbnode/storage/shard_test.go +++ b/src/dbnode/storage/shard_test.go @@ -635,8 +635,9 @@ func TestShardColdFlush(t *testing.T) { require.NoError(t, err) require.Equal(t, 0, coldVersion) } - err = shard.ColdFlush(preparer, resources, nsCtx, &persist.NoOpColdFlushNamespace{}) + shardColdFlush, err := shard.ColdFlush(preparer, resources, nsCtx, &persist.NoOpColdFlushNamespace{}) require.NoError(t, err) + require.NoError(t, shardColdFlush.Done()) // After a cold flush, t0-t6 previously dirty block starts should be updated // to version 1. for i := t0; i.Before(t6.Add(blockSize)); i = i.Add(blockSize) { @@ -701,7 +702,9 @@ func TestShardColdFlushNoMergeIfNothingDirty(t *testing.T) { } nsCtx := namespace.Context{} - shard.ColdFlush(preparer, resources, nsCtx, &persist.NoOpColdFlushNamespace{}) + shardColdFlush, err := shard.ColdFlush(preparer, resources, nsCtx, &persist.NoOpColdFlushNamespace{}) + require.NoError(t, err) + require.NoError(t, shardColdFlush.Done()) // After a cold flush, t0-t3 should remain version 0, since nothing should // actually be merged. 
for i := t0; i.Before(t3.Add(blockSize)); i = i.Add(blockSize) { @@ -733,8 +736,9 @@ func (m *noopMerger) Merge( flushPreparer persist.FlushPreparer, nsCtx namespace.Context, onFlush persist.OnFlushSeries, -) error { - return nil +) (persist.DataCloser, error) { + closer := func() error { return nil } + return closer, nil } func newFSMergeWithMemTestFn( diff --git a/src/dbnode/storage/storage_mock.go b/src/dbnode/storage/storage_mock.go index d499d03697..f456ebe850 100644 --- a/src/dbnode/storage/storage_mock.go +++ b/src/dbnode/storage/storage_mock.go @@ -1880,11 +1880,12 @@ func (mr *MockdatabaseShardMockRecorder) WarmFlush(blockStart, flush, nsCtx inte } // ColdFlush mocks base method -func (m *MockdatabaseShard) ColdFlush(flush persist.FlushPreparer, resources coldFlushReuseableResources, nsCtx namespace.Context, onFlush persist.OnFlushSeries) error { +func (m *MockdatabaseShard) ColdFlush(flush persist.FlushPreparer, resources coldFlushReuseableResources, nsCtx namespace.Context, onFlush persist.OnFlushSeries) (ShardColdFlush, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ColdFlush", flush, resources, nsCtx, onFlush) - ret0, _ := ret[0].(error) - return ret0 + ret0, _ := ret[0].(ShardColdFlush) + ret1, _ := ret[1].(error) + return ret0, ret1 } // ColdFlush indicates an expected call of ColdFlush @@ -1996,6 +1997,43 @@ func (mr *MockdatabaseShardMockRecorder) SeriesReadWriteRef(id, tags, opts inter return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SeriesReadWriteRef", reflect.TypeOf((*MockdatabaseShard)(nil).SeriesReadWriteRef), id, tags, opts) } +// MockShardColdFlush is a mock of ShardColdFlush interface +type MockShardColdFlush struct { + ctrl *gomock.Controller + recorder *MockShardColdFlushMockRecorder +} + +// MockShardColdFlushMockRecorder is the mock recorder for MockShardColdFlush +type MockShardColdFlushMockRecorder struct { + mock *MockShardColdFlush +} + +// NewMockShardColdFlush creates a new mock instance +func NewMockShardColdFlush(ctrl *gomock.Controller) *MockShardColdFlush { + mock := &MockShardColdFlush{ctrl: ctrl} + mock.recorder = &MockShardColdFlushMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockShardColdFlush) EXPECT() *MockShardColdFlushMockRecorder { + return m.recorder +} + +// Done mocks base method +func (m *MockShardColdFlush) Done() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Done") + ret0, _ := ret[0].(error) + return ret0 +} + +// Done indicates an expected call of Done +func (mr *MockShardColdFlushMockRecorder) Done() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Done", reflect.TypeOf((*MockShardColdFlush)(nil).Done)) +} + // MockNamespaceIndex is a mock of NamespaceIndex interface type MockNamespaceIndex struct { ctrl *gomock.Controller diff --git a/src/dbnode/storage/types.go b/src/dbnode/storage/types.go index cc695dc026..a3c5c22c3e 100644 --- a/src/dbnode/storage/types.go +++ b/src/dbnode/storage/types.go @@ -527,7 +527,7 @@ type databaseShard interface { resources coldFlushReuseableResources, nsCtx namespace.Context, onFlush persist.OnFlushSeries, - ) error + ) (ShardColdFlush, error) // Snapshot snapshot's the unflushed WarmWrites in this shard. Snapshot( @@ -572,6 +572,12 @@ type databaseShard interface { ) (SeriesReadWriteRef, error) } +// ShardColdFlush exposes a done method to finalize shard cold flush +// by persisting data and updating shard state/block leases. 
+type ShardColdFlush interface { + Done() error +} + // ShardSeriesReadWriteRefOptions are options for SeriesReadWriteRef // for the shard. type ShardSeriesReadWriteRefOptions struct {
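The ShardColdFlush interface above is what lets the namespace sequence the two phases of a cold flush: every shard flushes its TSDB data first while holding back its checkpoint files, and Done() is only invoked once the index cold flush has succeeded. Below is a minimal Go sketch of that ordering; fakeShardFlush and coldFlushIndex are hypothetical stand-ins, not the real shard or index types.

// Illustration only, not part of the patch: the ordering that ShardColdFlush
// enables at the namespace level.
package main

import (
	"errors"
	"fmt"
)

// ShardColdFlush mirrors the interface added in types.go above.
type ShardColdFlush interface {
	Done() error
}

type fakeShardFlush struct{ id uint32 }

// Done stands in for writing the shard's checkpoint files and updating
// flush state and block leases.
func (f fakeShardFlush) Done() error {
	fmt.Printf("shard %d: checkpoint written, leases updated\n", f.id)
	return nil
}

// coldFlushIndex stands in for persisting cold index segments and evicting
// the rotated cold mutable segments.
func coldFlushIndex(fail bool) error {
	if fail {
		return errors.New("index cold flush failed")
	}
	return nil
}

func run(indexFails bool) error {
	// Stage 1: each shard flushes its data but defers its checkpoint write.
	flushes := []ShardColdFlush{fakeShardFlush{id: 1}, fakeShardFlush{id: 2}}

	// Stage 2: persist the cold index data.
	if err := coldFlushIndex(indexFails); err != nil {
		// Index data is not durable, so the data filesets are never
		// completed; they stay without a checkpoint and the cold flush
		// can be retried later.
		return err
	}

	// Stage 3: index data is durable, so it is now safe to finish each shard.
	for _, f := range flushes {
		if err := f.Done(); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	fmt.Println("index flush ok, err:", run(false))
	fmt.Println("index flush fails, err:", run(true))
}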