Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dbnode] Support checkpointing in cold flusher and preparing index writer per block #3040

Merged
merged 6 commits into from
Dec 23, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
225 changes: 138 additions & 87 deletions src/dbnode/persist/fs/persist_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/m3db/m3/src/x/clock"
"github.com/m3db/m3/src/x/instrument"
xresource "github.com/m3db/m3/src/x/resource"
xtime "github.com/m3db/m3/src/x/time"

"github.com/pborman/uuid"
"github.com/uber-go/tally"
Expand All @@ -63,6 +64,7 @@ var (
errPersistManagerFileSetAlreadyExists = errors.New("persist manager cannot prepare, fileset already exists")
errPersistManagerCannotDoneSnapshotNotSnapshot = errors.New("persist manager cannot done snapshot, file set type is not snapshot")
errPersistManagerCannotDoneFlushNotFlush = errors.New("persist manager cannot done flush, file set type is not flush")
errPersistManagerIndexWriterAlreadyExists = errors.New("persist manager cannot create index writer, already exists")
)

type sleepFn func(time.Duration)
Expand Down Expand Up @@ -115,22 +117,100 @@ type dataPersistManager struct {
snapshotID uuid.UUID
}

type indexPersistManager struct {
writer IndexFileSetWriter
segmentWriter m3ninxpersist.MutableSegmentFileSetWriter
// Support writing to multiple index blocks/filesets during index persist.
// This allows us to prepare an index fileset writer per block start.
type singleUseIndexWriter struct {
// back-ref to the index persist manager so we can share resources there
manager *indexPersistManager
writer IndexFileSetWriter

// identifiers required to know which file to open
// after persistence is over
fileSetIdentifier FileSetFileIdentifier
fileSetType persist.FileSetType

// track state of writers
writeErr error
initialized bool
// track state of writer
writeErr error
}

func (s *singleUseIndexWriter) persistIndex(builder segment.Builder) error {
// Lock the index persist manager as we're sharing the segment builder as a resource.
s.manager.Lock()
defer s.manager.Unlock()

markError := func(err error) {
s.writeErr = err
}
if err := s.writeErr; err != nil {
return fmt.Errorf("encountered error: %v, skipping further attempts to persist data", err)
}

if err := s.manager.segmentWriter.Reset(builder); err != nil {
markError(err)
return err
}

if err := s.writer.WriteSegmentFileSet(s.manager.segmentWriter); err != nil {
markError(err)
return err
}

return nil
}

func (s *singleUseIndexWriter) closeIndex() ([]segment.Segment, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe also hold the s.manager.Lock for same level of lock granularity as the persistIndex method? Even if not required today, someone might add something to the close method that could be used concurrently by mistake later.

// This writer will be thrown away after we're done persisting.
defer func() {
s.writeErr = nil
s.fileSetType = -1
s.fileSetIdentifier = FileSetFileIdentifier{}
Copy link
Collaborator

@robskillington robskillington Dec 23, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it worth packing these into a single struct so can totally zero out?

i.e.

type singleUseIndexWriter struct {
	// back-ref to the index persist manager so we can share resources there
	manager *indexPersistManager
	writer  IndexFileSetWriter

	// state holds the mutable state that is cleared on reset
	state singleUseIndexWriterState
}

type singleUseIndexWriterState struct {
	// identifiers required to know which file to open
	// after persistence is over
	fileSetIdentifier FileSetFileIdentifier
	fileSetType       persist.FileSetType

	// track state of writer
	writeErr error
}

Then can reset with (and any new fields will auto get reset to default values):

defer func() {
	s.state = singleUseIndexWriterState{fileSetType: -1}
}()

}()

// s.e. we're done writing all segments for PreparedIndexPersist.
// so we can close the writer.
if err := s.writer.Close(); err != nil {
return nil, err
}

// only attempt to retrieve data if we have not encountered errors during
// any writes.
if err := s.writeErr; err != nil {
return nil, err
}

// and then we get persistent segments backed by mmap'd data so the index
// can safely evict the segment's we have just persisted.
result, err := ReadIndexSegments(ReadIndexSegmentsOptions{
ReaderOptions: IndexReaderOpenOptions{
Identifier: s.fileSetIdentifier,
FileSetType: s.fileSetType,
},
FilesystemOptions: s.manager.opts,
newReaderFn: s.manager.newReaderFn,
newPersistentSegmentFn: s.manager.newPersistentSegmentFn,
})
if err != nil {
return nil, err
}

return result.Segments, nil
}

type indexPersistManager struct {
sync.Mutex

writersByBlockStart map[xtime.UnixNano]*singleUseIndexWriter
// segmentWriter holds the bulk of the re-usable in-mem resources so
// we want to share this across writers.
segmentWriter m3ninxpersist.MutableSegmentFileSetWriter

// hooks used for testing
newReaderFn newIndexReaderFn
newPersistentSegmentFn newPersistentSegmentFn
newIndexWriterFn newIndexWriterFn

// options used by index writers
opts Options
}

type newIndexReaderFn func(Options) (IndexFileSetReader, error)
Expand All @@ -140,6 +220,8 @@ type newPersistentSegmentFn func(
m3ninxfs.Options,
) (m3ninxfs.Segment, error)

type newIndexWriterFn func(Options) (IndexFileSetWriter, error)

type persistManagerMetrics struct {
writeDurationMs tally.Gauge
throttleDurationMs tally.Gauge
Expand All @@ -163,11 +245,6 @@ func NewPersistManager(opts Options) (persist.Manager, error) {
return nil, err
}

idxWriter, err := NewIndexWriter(opts)
if err != nil {
return nil, err
}

segmentWriter, err := m3ninxpersist.NewMutableSegmentFileSetWriter(
opts.FSTWriterOptions())
if err != nil {
Expand All @@ -186,30 +263,35 @@ func NewPersistManager(opts Options) (persist.Manager, error) {
snapshotMetadataWriter: NewSnapshotMetadataWriter(opts),
},
indexPM: indexPersistManager{
writer: idxWriter,
segmentWriter: segmentWriter,
writersByBlockStart: make(map[xtime.UnixNano]*singleUseIndexWriter),
segmentWriter: segmentWriter,
// fs opts are used by underlying index writers
opts: opts,
},
status: persistManagerIdle,
metrics: newPersistManagerMetrics(scope),
}
pm.indexPM.newReaderFn = NewIndexReader
pm.indexPM.newPersistentSegmentFn = m3ninxpersist.NewSegment
pm.indexPM.newIndexWriterFn = NewIndexWriter
pm.runtimeOptsListener = opts.RuntimeOptionsManager().RegisterListener(pm)

return pm, nil
}

func (pm *persistManager) reset() {
func (pm *persistManager) resetWithLock() {
pm.status = persistManagerIdle
pm.start = timeZero
pm.count = 0
pm.bytesWritten = 0
pm.worked = 0
pm.slept = 0
pm.indexPM.segmentWriter.Reset(nil)
pm.indexPM.writeErr = nil
pm.indexPM.initialized = false
pm.dataPM.snapshotID = nil

pm.indexPM.segmentWriter.Reset(nil)
for blockStart := range pm.indexPM.writersByBlockStart {
delete(pm.indexPM.writersByBlockStart, blockStart)
}
}

// StartIndexPersist is called by the databaseFlushManager to begin the persist process for
Expand Down Expand Up @@ -271,83 +353,27 @@ func (pm *persistManager) PrepareIndex(opts persist.IndexPrepareOptions) (persis
IndexVolumeType: opts.IndexVolumeType,
}

idxWriter, err := pm.ensureSingleIndexWriterForBlock(blockStart)
if err != nil {
return prepared, err
}
// create writer for required fileset file.
if err := pm.indexPM.writer.Open(idxWriterOpts); err != nil {
if err := idxWriter.writer.Open(idxWriterOpts); err != nil {
return prepared, err
}

// track which file we are writing in the persist manager, so we
// know which file to read back on `closeIndex` being called.
pm.indexPM.fileSetIdentifier = fileSetID
pm.indexPM.fileSetType = opts.FileSetType
pm.indexPM.initialized = true
idxWriter.fileSetIdentifier = fileSetID
idxWriter.fileSetType = opts.FileSetType

// provide persistManager hooks into PreparedIndexPersist object
prepared.Persist = pm.persistIndex
prepared.Close = pm.closeIndex
prepared.Persist = idxWriter.persistIndex
prepared.Close = idxWriter.closeIndex

return prepared, nil
}

func (pm *persistManager) persistIndex(builder segment.Builder) error {
// FOLLOWUP(prateek): need to use-rate limiting runtime options in this code path
markError := func(err error) {
pm.indexPM.writeErr = err
}
if err := pm.indexPM.writeErr; err != nil {
return fmt.Errorf("encountered error: %v, skipping further attempts to persist data", err)
}

if err := pm.indexPM.segmentWriter.Reset(builder); err != nil {
markError(err)
return err
}

if err := pm.indexPM.writer.WriteSegmentFileSet(pm.indexPM.segmentWriter); err != nil {
markError(err)
return err
}

return nil
}

func (pm *persistManager) closeIndex() ([]segment.Segment, error) {
// ensure StartIndexPersist was called
if !pm.indexPM.initialized {
return nil, errPersistManagerNotPersisting
}
pm.indexPM.initialized = false

// i.e. we're done writing all segments for PreparedIndexPersist.
// so we can close the writer.
if err := pm.indexPM.writer.Close(); err != nil {
return nil, err
}

// only attempt to retrieve data if we have not encountered errors during
// any writes.
if err := pm.indexPM.writeErr; err != nil {
return nil, err
}

// and then we get persistent segments backed by mmap'd data so the index
// can safely evict the segment's we have just persisted.
result, err := ReadIndexSegments(ReadIndexSegmentsOptions{
ReaderOptions: IndexReaderOpenOptions{
Identifier: pm.indexPM.fileSetIdentifier,
FileSetType: pm.indexPM.fileSetType,
},
FilesystemOptions: pm.opts,
newReaderFn: pm.indexPM.newReaderFn,
newPersistentSegmentFn: pm.indexPM.newPersistentSegmentFn,
})
if err != nil {
return nil, err
}

return result.Segments, nil
}

// DoneIndex is called by the databaseFlushManager to finish the index persist process.
func (pm *persistManager) DoneIndex() error {
pm.Lock()
Expand All @@ -362,7 +388,7 @@ func (pm *persistManager) DoneIndex() error {
pm.metrics.throttleDurationMs.Update(float64(pm.slept / time.Millisecond))

// Reset state
pm.reset()
pm.resetWithLock()

return nil
}
Expand Down Expand Up @@ -558,7 +584,7 @@ func (pm *persistManager) DoneFlush() error {
return errPersistManagerCannotDoneFlushNotFlush
}

return pm.doneShared()
return pm.doneSharedWithLock()
}

// DoneSnapshot is called by the databaseFlushManager to finish the snapshot persist process.
Expand Down Expand Up @@ -594,21 +620,21 @@ func (pm *persistManager) DoneSnapshot(
return fmt.Errorf("error writing out snapshot metadata file: %v", err)
}

return pm.doneShared()
return pm.doneSharedWithLock()
}

// Close all resources.
func (pm *persistManager) Close() {
pm.runtimeOptsListener.Close()
}

func (pm *persistManager) doneShared() error {
func (pm *persistManager) doneSharedWithLock() error {
// Emit timing metrics
pm.metrics.writeDurationMs.Update(float64(pm.worked / time.Millisecond))
pm.metrics.throttleDurationMs.Update(float64(pm.slept / time.Millisecond))

// Reset state
pm.reset()
pm.resetWithLock()

return nil
}
Expand Down Expand Up @@ -644,3 +670,28 @@ func (pm *persistManager) SetRuntimeOptions(value runtime.Options) {
pm.currRateLimitOpts = value.PersistRateLimitOptions()
pm.Unlock()
}

// Ensure only a single index writer is ever created for a block, returns an error if an index writer
// already exists.
func (pm *persistManager) ensureSingleIndexWriterForBlock(blockStart time.Time) (*singleUseIndexWriter, error) {
pm.Lock()
defer pm.Unlock()

idxWriter, ok := pm.indexPM.writersByBlockStart[xtime.ToUnixNano(blockStart)]
// Ensure that we have not already created an index writer for this block.
if ok {
return nil, errPersistManagerIndexWriterAlreadyExists
}

writer, err := pm.indexPM.newIndexWriterFn(pm.opts)
if err != nil {
return nil, err
}
idxWriter = &singleUseIndexWriter{
manager: &pm.indexPM,
writer: writer,
}
pm.indexPM.writersByBlockStart[xtime.ToUnixNano(blockStart)] = idxWriter
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of caching these per block (which would require us to age out and invalidate results from cache) should we just allocate each time?

It doesn't look like these are that big, they just have a pointer to the writer and manager, the only thing they really create is the index writer which is fairly small.


return idxWriter, nil
}
Loading