Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dbnode] Support checkpointing in cold flusher and preparing index writer per block #3040

Merged
merged 6 commits into from
Dec 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
215 changes: 121 additions & 94 deletions src/dbnode/persist/fs/persist_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,22 +115,106 @@ type dataPersistManager struct {
snapshotID uuid.UUID
}

type indexPersistManager struct {
writer IndexFileSetWriter
segmentWriter m3ninxpersist.MutableSegmentFileSetWriter

type singleUseIndexWriterState struct {
// identifiers required to know which file to open
// after persistence is over
fileSetIdentifier FileSetFileIdentifier
fileSetType persist.FileSetType

// track state of writers
writeErr error
initialized bool
// track state of writer
writeErr error
}

// Support writing to multiple index blocks/filesets during index persist.
// This allows us to prepare an index fileset writer per block start.
type singleUseIndexWriter struct {
	// back-ref to the index persist manager so we can share resources there
	manager *indexPersistManager
	// writer is the per-block index fileset writer; discarded after closeIndex.
	writer IndexFileSetWriter

	// state tracks the fileset identifiers being written and any write error.
	state singleUseIndexWriterState
}

// persistIndex writes the given segment builder out as an index segment
// fileset via the manager's shared segment writer.
//
// Any write error is recorded in state so subsequent calls short-circuit
// rather than persisting after a partial failure.
func (s *singleUseIndexWriter) persistIndex(builder segment.Builder) error {
	// Guard against use-after-close: closeIndex nils out the manager and
	// writer, so a late persist call would otherwise panic on a nil
	// dereference instead of surfacing a usable error.
	if s.manager == nil || s.writer == nil {
		return fmt.Errorf("index writer already closed, cannot persist")
	}

	// Lock the index persist manager as we're sharing its segment writer
	// as a resource.
	s.manager.Lock()
	defer s.manager.Unlock()

	markError := func(err error) {
		s.state.writeErr = err
	}
	if err := s.state.writeErr; err != nil {
		return fmt.Errorf("encountered error: %w, skipping further attempts to persist data", err)
	}

	if err := s.manager.segmentWriter.Reset(builder); err != nil {
		markError(err)
		return err
	}

	if err := s.writer.WriteSegmentFileSet(s.manager.segmentWriter); err != nil {
		markError(err)
		return err
	}

	return nil
}

func (s *singleUseIndexWriter) closeIndex() ([]segment.Segment, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe also hold the s.manager.Lock for same level of lock granularity as the persistIndex method? Even if not required today, someone might add something to the close method that could be used concurrently by mistake later.

s.manager.Lock()
defer s.manager.Unlock()

// This writer will be thrown away after we're done persisting.
defer func() {
s.state = singleUseIndexWriterState{fileSetType: -1}
s.manager = nil
s.writer = nil
}()
Comment on lines +167 to +172
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: If this is the case honestly we could probably just avoid even doing anything at the end here.

Copy link
Contributor Author

@notbdu notbdu Dec 23, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yea, that makes sense, I think we should keep this for hygiene but I suppose I'm being overly defensive here haha.


// i.e. we're done writing all segments for PreparedIndexPersist.
// so we can close the writer.
if err := s.writer.Close(); err != nil {
return nil, err
}

// only attempt to retrieve data if we have not encountered errors during
// any writes.
if err := s.state.writeErr; err != nil {
return nil, err
}

// and then we get persistent segments backed by mmap'd data so the index
// can safely evict the segment's we have just persisted.
result, err := ReadIndexSegments(ReadIndexSegmentsOptions{
ReaderOptions: IndexReaderOpenOptions{
Identifier: s.state.fileSetIdentifier,
FileSetType: s.state.fileSetType,
},
FilesystemOptions: s.manager.opts,
newReaderFn: s.manager.newReaderFn,
newPersistentSegmentFn: s.manager.newPersistentSegmentFn,
})
if err != nil {
return nil, err
}

return result.Segments, nil
}

// indexPersistManager owns the in-memory resources shared across all
// singleUseIndexWriter instances; the embedded Mutex serializes their
// access to the shared segmentWriter.
type indexPersistManager struct {
	sync.Mutex

	// segmentWriter holds the bulk of the re-usable in-mem resources so
	// we want to share this across writers.
	segmentWriter m3ninxpersist.MutableSegmentFileSetWriter

	// hooks used for testing
	newReaderFn            newIndexReaderFn
	newPersistentSegmentFn newPersistentSegmentFn
	newIndexWriterFn       newIndexWriterFn

	// options used by index writers
	opts Options
}

// newIndexReaderFn constructs an IndexFileSetReader from the given Options;
// declared as a named type so tests can stub it (see indexPersistManager hooks).
type newIndexReaderFn func(Options) (IndexFileSetReader, error)
Expand All @@ -140,6 +224,8 @@ type newPersistentSegmentFn func(
m3ninxfs.Options,
) (m3ninxfs.Segment, error)

// newIndexWriterFn constructs an IndexFileSetWriter from the given Options;
// declared as a named type so tests can stub it (see indexPersistManager hooks).
type newIndexWriterFn func(Options) (IndexFileSetWriter, error)

type persistManagerMetrics struct {
writeDurationMs tally.Gauge
throttleDurationMs tally.Gauge
Expand All @@ -163,11 +249,6 @@ func NewPersistManager(opts Options) (persist.Manager, error) {
return nil, err
}

idxWriter, err := NewIndexWriter(opts)
if err != nil {
return nil, err
}

segmentWriter, err := m3ninxpersist.NewMutableSegmentFileSetWriter(
opts.FSTWriterOptions())
if err != nil {
Expand All @@ -186,30 +267,31 @@ func NewPersistManager(opts Options) (persist.Manager, error) {
snapshotMetadataWriter: NewSnapshotMetadataWriter(opts),
},
indexPM: indexPersistManager{
writer: idxWriter,
segmentWriter: segmentWriter,
// fs opts are used by underlying index writers
opts: opts,
},
status: persistManagerIdle,
metrics: newPersistManagerMetrics(scope),
}
pm.indexPM.newReaderFn = NewIndexReader
pm.indexPM.newPersistentSegmentFn = m3ninxpersist.NewSegment
pm.indexPM.newIndexWriterFn = NewIndexWriter
pm.runtimeOptsListener = opts.RuntimeOptionsManager().RegisterListener(pm)

return pm, nil
}

func (pm *persistManager) reset() {
func (pm *persistManager) resetWithLock() error {
pm.status = persistManagerIdle
pm.start = timeZero
pm.count = 0
pm.bytesWritten = 0
pm.worked = 0
pm.slept = 0
pm.indexPM.segmentWriter.Reset(nil)
pm.indexPM.writeErr = nil
pm.indexPM.initialized = false
pm.dataPM.snapshotID = nil

return pm.indexPM.segmentWriter.Reset(nil)
}

// StartIndexPersist is called by the databaseFlushManager to begin the persist process for
Expand Down Expand Up @@ -271,83 +353,32 @@ func (pm *persistManager) PrepareIndex(opts persist.IndexPrepareOptions) (persis
IndexVolumeType: opts.IndexVolumeType,
}

writer, err := pm.indexPM.newIndexWriterFn(pm.opts)
if err != nil {
return prepared, err
}
idxWriter := &singleUseIndexWriter{
manager: &pm.indexPM,
writer: writer,
state: singleUseIndexWriterState{
// track which file we are writing in the persist manager, so we
// know which file to read back on `closeIndex` being called.
fileSetIdentifier: fileSetID,
fileSetType: opts.FileSetType,
},
}
// create writer for required fileset file.
if err := pm.indexPM.writer.Open(idxWriterOpts); err != nil {
if err := idxWriter.writer.Open(idxWriterOpts); err != nil {
return prepared, err
}

// track which file we are writing in the persist manager, so we
// know which file to read back on `closeIndex` being called.
pm.indexPM.fileSetIdentifier = fileSetID
pm.indexPM.fileSetType = opts.FileSetType
pm.indexPM.initialized = true

// provide persistManager hooks into PreparedIndexPersist object
prepared.Persist = pm.persistIndex
prepared.Close = pm.closeIndex
prepared.Persist = idxWriter.persistIndex
prepared.Close = idxWriter.closeIndex

return prepared, nil
}

func (pm *persistManager) persistIndex(builder segment.Builder) error {
// FOLLOWUP(prateek): need to use-rate limiting runtime options in this code path
markError := func(err error) {
pm.indexPM.writeErr = err
}
if err := pm.indexPM.writeErr; err != nil {
return fmt.Errorf("encountered error: %v, skipping further attempts to persist data", err)
}

if err := pm.indexPM.segmentWriter.Reset(builder); err != nil {
markError(err)
return err
}

if err := pm.indexPM.writer.WriteSegmentFileSet(pm.indexPM.segmentWriter); err != nil {
markError(err)
return err
}

return nil
}

func (pm *persistManager) closeIndex() ([]segment.Segment, error) {
// ensure StartIndexPersist was called
if !pm.indexPM.initialized {
return nil, errPersistManagerNotPersisting
}
pm.indexPM.initialized = false

// i.e. we're done writing all segments for PreparedIndexPersist.
// so we can close the writer.
if err := pm.indexPM.writer.Close(); err != nil {
return nil, err
}

// only attempt to retrieve data if we have not encountered errors during
// any writes.
if err := pm.indexPM.writeErr; err != nil {
return nil, err
}

// and then we get persistent segments backed by mmap'd data so the index
// can safely evict the segment's we have just persisted.
result, err := ReadIndexSegments(ReadIndexSegmentsOptions{
ReaderOptions: IndexReaderOpenOptions{
Identifier: pm.indexPM.fileSetIdentifier,
FileSetType: pm.indexPM.fileSetType,
},
FilesystemOptions: pm.opts,
newReaderFn: pm.indexPM.newReaderFn,
newPersistentSegmentFn: pm.indexPM.newPersistentSegmentFn,
})
if err != nil {
return nil, err
}

return result.Segments, nil
}

// DoneIndex is called by the databaseFlushManager to finish the index persist process.
func (pm *persistManager) DoneIndex() error {
pm.Lock()
Expand All @@ -362,9 +393,7 @@ func (pm *persistManager) DoneIndex() error {
pm.metrics.throttleDurationMs.Update(float64(pm.slept / time.Millisecond))

// Reset state
pm.reset()

return nil
return pm.resetWithLock()
}

// StartFlushPersist is called by the databaseFlushManager to begin the persist process.
Expand Down Expand Up @@ -558,7 +587,7 @@ func (pm *persistManager) DoneFlush() error {
return errPersistManagerCannotDoneFlushNotFlush
}

return pm.doneShared()
return pm.doneSharedWithLock()
}

// DoneSnapshot is called by the databaseFlushManager to finish the snapshot persist process.
Expand Down Expand Up @@ -594,23 +623,21 @@ func (pm *persistManager) DoneSnapshot(
return fmt.Errorf("error writing out snapshot metadata file: %v", err)
}

return pm.doneShared()
return pm.doneSharedWithLock()
}

// Close releases all resources held by the persist manager; currently this
// is just the runtime options listener registration.
func (pm *persistManager) Close() {
	pm.runtimeOptsListener.Close()
}

func (pm *persistManager) doneShared() error {
func (pm *persistManager) doneSharedWithLock() error {
// Emit timing metrics
pm.metrics.writeDurationMs.Update(float64(pm.worked / time.Millisecond))
pm.metrics.throttleDurationMs.Update(float64(pm.slept / time.Millisecond))

// Reset state
pm.reset()

return nil
return pm.resetWithLock()
}

func (pm *persistManager) dataFilesetExists(prepareOpts persist.DataPrepareOptions) (bool, error) {
Expand Down
Loading