Skip to content

Commit

Permalink
[dbnode] Ensure index data consistency (#2399)
Browse files Browse the repository at this point in the history
  • Loading branch information
notbdu authored Jun 19, 2020
1 parent 967f455 commit 6a1b046
Show file tree
Hide file tree
Showing 13 changed files with 296 additions and 128 deletions.
54 changes: 35 additions & 19 deletions src/dbnode/persist/fs/fs_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 16 additions & 15 deletions src/dbnode/persist/fs/merger.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (m *merger) Merge(
flushPreparer persist.FlushPreparer,
nsCtx namespace.Context,
onFlush persist.OnFlushSeries,
) (err error) {
) (persist.DataCloser, error) {
var (
reader = m.reader
blockAllocSize = m.blockAllocSize
Expand All @@ -114,10 +114,12 @@ func (m *merger) Merge(
},
FileSetType: persist.FileSetFlushType,
}
closer persist.DataCloser
err error
)

if err := reader.Open(openOpts); err != nil {
return err
return closer, err
}
defer func() {
// Only set the error here if not set by the end of the function, since
Expand All @@ -129,7 +131,7 @@ func (m *merger) Merge(

nsMd, err := namespace.NewMetadata(nsID, nsOpts)
if err != nil {
return err
return closer, err
}
prepareOpts := persist.DataPrepareOptions{
NamespaceMetadata: nsMd,
Expand All @@ -141,7 +143,7 @@ func (m *merger) Merge(
}
prepared, err := flushPreparer.PrepareData(prepareOpts)
if err != nil {
return err
return closer, err
}

var (
Expand Down Expand Up @@ -191,15 +193,15 @@ func (m *merger) Merge(

// The merge is performed in two stages. The first stage is to loop through
// series on disk and merge it with what's in the merge target. Looping
// through disk in the first stage is done intentionally to read disk
// through disk in the first stage is prepared intentionally to read disk
// sequentially to optimize for spinning disk access. The second stage is to
// persist the rest of the series in the merge target that were not
// persisted in the first stage.

// First stage: loop through series on disk.
for id, tagsIter, data, checksum, err := reader.Read(); err != io.EOF; id, tagsIter, data, checksum, err = reader.Read() {
if err != nil {
return err
return closer, err
}
idsToFinalize = append(idsToFinalize, id)

Expand All @@ -211,7 +213,7 @@ func (m *merger) Merge(
ctx.Reset()
mergeWithData, hasInMemoryData, err := mergeWith.Read(ctx, id, blockStart, nsCtx)
if err != nil {
return err
return closer, err
}
if hasInMemoryData {
segmentReaders = appendBlockReadersToSegmentReaders(segmentReaders, mergeWithData)
Expand All @@ -222,7 +224,7 @@ func (m *merger) Merge(
tags, err := convert.TagsFromTagsIter(id, tagsIter, identPool)
tagsIter.Close()
if err != nil {
return err
return closer, err
}
tagsToFinalize = append(tagsToFinalize, tags)

Expand All @@ -232,15 +234,15 @@ func (m *merger) Merge(
if len(segmentReaders) == 1 && hasInMemoryData == false {
segment, err := segmentReaders[0].Segment()
if err != nil {
return err
return closer, err
}

if err := persistSegmentWithChecksum(id, tags, segment, checksum, prepared.Persist); err != nil {
return err
return closer, err
}
} else {
if err := persistSegmentReaders(id, tags, segmentReaders, iterResources, prepared.Persist); err != nil {
return err
return closer, err
}
}
// Closing the context will finalize the data returned from
Expand Down Expand Up @@ -278,12 +280,11 @@ func (m *merger) Merge(
return err
}, nsCtx)
if err != nil {
return err
return closer, err
}

// Close the flush preparer, which writes the rest of the files in the
// fileset.
return prepared.Close()
// NB(bodu): Return a deferred closer so that we can guarantee that cold index writes are persisted first.
return prepared.DeferClose()
}

func appendBlockReadersToSegmentReaders(segReaders []xio.SegmentReader, brs []xio.BlockReader) []xio.SegmentReader {
Expand Down
14 changes: 12 additions & 2 deletions src/dbnode/persist/fs/merger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,7 @@ func testMergeWith(
reader := mockReaderFromData(ctrl, diskData)

var persisted []persistedData
var deferClosed bool
preparer := persist.NewMockFlushPreparer(ctrl)
preparer.EXPECT().PrepareData(gomock.Any()).Return(
persist.PreparedDataPersist{
Expand All @@ -450,7 +451,13 @@ func testMergeWith(
})
return nil
},
Close: func() error { return nil },
DeferClose: func() (persist.DataCloser, error) {
return func() error {
require.False(t, deferClosed)
deferClosed = true
return nil
}, nil
},
}, nil)
nsCtx := namespace.Context{}

Expand All @@ -463,8 +470,11 @@ func testMergeWith(
BlockStart: startTime,
}
mergeWith := mockMergeWithFromData(t, ctrl, diskData, mergeTargetData)
err := merger.Merge(fsID, mergeWith, 1, preparer, nsCtx, &persist.NoOpColdFlushNamespace{})
close, err := merger.Merge(fsID, mergeWith, 1, preparer, nsCtx, &persist.NoOpColdFlushNamespace{})
require.NoError(t, err)
require.False(t, deferClosed)
require.NoError(t, close())
require.True(t, deferClosed)

assertPersistedAsExpected(t, persisted, expectedData)
}
Expand Down
7 changes: 6 additions & 1 deletion src/dbnode/persist/fs/persist_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func NewPersistManager(opts Options) (persist.Manager, error) {

func (pm *persistManager) reset() {
pm.status = persistManagerIdle
pm.start = timeZero
pm.start = timeZero
pm.count = 0
pm.bytesWritten = 0
pm.worked = 0
Expand Down Expand Up @@ -490,6 +490,7 @@ func (pm *persistManager) PrepareData(opts persist.DataPrepareOptions) (persist.

prepared.Persist = pm.persist
prepared.Close = pm.closeData
prepared.DeferClose = pm.deferCloseData

return prepared, nil
}
Expand Down Expand Up @@ -544,6 +545,10 @@ func (pm *persistManager) closeData() error {
return pm.dataPM.writer.Close()
}

func (pm *persistManager) deferCloseData() (persist.DataCloser, error) {
return pm.dataPM.writer.DeferClose()
}

// DoneFlush is called by the databaseFlushManager to finish the data persist process.
func (pm *persistManager) DoneFlush() error {
pm.Lock()
Expand Down
5 changes: 4 additions & 1 deletion src/dbnode/persist/fs/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ type DataFileSetWriter interface {
// WriteAll will write the id and all byte slices and returns an error on a write error.
// Callers must not call this method with a given ID more than once.
WriteAll(id ident.ID, tags ident.Tags, data []checked.Bytes, checksum uint32) error

// DeferClose returns a DataCloser that defers writing of a checkpoint file.
DeferClose() (persist.DataCloser, error)
}

// SnapshotMetadataFileWriter writes out snapshot metadata files.
Expand Down Expand Up @@ -557,7 +560,7 @@ type Merger interface {
flushPreparer persist.FlushPreparer,
nsCtx namespace.Context,
onFlush persist.OnFlushSeries,
) error
) (persist.DataCloser, error)
}

// NewMergerFn is the function to call to get a new Merger.
Expand Down
Loading

0 comments on commit 6a1b046

Please sign in to comment.