-
Notifications
You must be signed in to change notification settings - Fork 455
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[dbnode] Support checkpointing in cold flusher and preparing index writer per block #3040
Changes from all commits
6d8b1e8
c2b1a92
ea6977d
9740105
72e2297
750e7de
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -115,22 +115,106 @@ type dataPersistManager struct { | |
snapshotID uuid.UUID | ||
} | ||
|
||
type indexPersistManager struct { | ||
writer IndexFileSetWriter | ||
segmentWriter m3ninxpersist.MutableSegmentFileSetWriter | ||
|
||
type singleUseIndexWriterState struct { | ||
// identifiers required to know which file to open | ||
// after persistence is over | ||
fileSetIdentifier FileSetFileIdentifier | ||
fileSetType persist.FileSetType | ||
|
||
// track state of writers | ||
writeErr error | ||
initialized bool | ||
// track state of writer | ||
writeErr error | ||
} | ||
|
||
// Support writing to multiple index blocks/filesets during index persist. | ||
// This allows us to prepare an index fileset writer per block start. | ||
type singleUseIndexWriter struct { | ||
// back-ref to the index persist manager so we can share resources there | ||
manager *indexPersistManager | ||
writer IndexFileSetWriter | ||
|
||
state singleUseIndexWriterState | ||
} | ||
|
||
func (s *singleUseIndexWriter) persistIndex(builder segment.Builder) error { | ||
// Lock the index persist manager as we're sharing the segment builder as a resource. | ||
s.manager.Lock() | ||
defer s.manager.Unlock() | ||
|
||
markError := func(err error) { | ||
s.state.writeErr = err | ||
} | ||
if err := s.state.writeErr; err != nil { | ||
return fmt.Errorf("encountered error: %w, skipping further attempts to persist data", err) | ||
} | ||
|
||
if err := s.manager.segmentWriter.Reset(builder); err != nil { | ||
markError(err) | ||
return err | ||
} | ||
|
||
if err := s.writer.WriteSegmentFileSet(s.manager.segmentWriter); err != nil { | ||
markError(err) | ||
return err | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (s *singleUseIndexWriter) closeIndex() ([]segment.Segment, error) { | ||
s.manager.Lock() | ||
defer s.manager.Unlock() | ||
|
||
// This writer will be thrown away after we're done persisting. | ||
defer func() { | ||
s.state = singleUseIndexWriterState{fileSetType: -1} | ||
s.manager = nil | ||
s.writer = nil | ||
}() | ||
Comment on lines
+167
to
+172
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: If this is the case honestly we could probably just avoid even doing anything at the end here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah yea, that makes sense, I think we should keep this for hygiene but I suppose I'm being overly defensive here haha. |
||
|
||
// s.e. we're done writing all segments for PreparedIndexPersist. | ||
// so we can close the writer. | ||
if err := s.writer.Close(); err != nil { | ||
return nil, err | ||
} | ||
|
||
// only attempt to retrieve data if we have not encountered errors during | ||
// any writes. | ||
if err := s.state.writeErr; err != nil { | ||
return nil, err | ||
} | ||
|
||
// and then we get persistent segments backed by mmap'd data so the index | ||
// can safely evict the segment's we have just persisted. | ||
result, err := ReadIndexSegments(ReadIndexSegmentsOptions{ | ||
ReaderOptions: IndexReaderOpenOptions{ | ||
Identifier: s.state.fileSetIdentifier, | ||
FileSetType: s.state.fileSetType, | ||
}, | ||
FilesystemOptions: s.manager.opts, | ||
newReaderFn: s.manager.newReaderFn, | ||
newPersistentSegmentFn: s.manager.newPersistentSegmentFn, | ||
}) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
return result.Segments, nil | ||
} | ||
|
||
type indexPersistManager struct { | ||
sync.Mutex | ||
|
||
// segmentWriter holds the bulk of the re-usable in-mem resources so | ||
// we want to share this across writers. | ||
segmentWriter m3ninxpersist.MutableSegmentFileSetWriter | ||
|
||
// hooks used for testing | ||
newReaderFn newIndexReaderFn | ||
newPersistentSegmentFn newPersistentSegmentFn | ||
newIndexWriterFn newIndexWriterFn | ||
|
||
// options used by index writers | ||
opts Options | ||
} | ||
|
||
type newIndexReaderFn func(Options) (IndexFileSetReader, error) | ||
|
@@ -140,6 +224,8 @@ type newPersistentSegmentFn func( | |
m3ninxfs.Options, | ||
) (m3ninxfs.Segment, error) | ||
|
||
type newIndexWriterFn func(Options) (IndexFileSetWriter, error) | ||
|
||
type persistManagerMetrics struct { | ||
writeDurationMs tally.Gauge | ||
throttleDurationMs tally.Gauge | ||
|
@@ -163,11 +249,6 @@ func NewPersistManager(opts Options) (persist.Manager, error) { | |
return nil, err | ||
} | ||
|
||
idxWriter, err := NewIndexWriter(opts) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
segmentWriter, err := m3ninxpersist.NewMutableSegmentFileSetWriter( | ||
opts.FSTWriterOptions()) | ||
if err != nil { | ||
|
@@ -186,30 +267,31 @@ func NewPersistManager(opts Options) (persist.Manager, error) { | |
snapshotMetadataWriter: NewSnapshotMetadataWriter(opts), | ||
}, | ||
indexPM: indexPersistManager{ | ||
writer: idxWriter, | ||
segmentWriter: segmentWriter, | ||
// fs opts are used by underlying index writers | ||
opts: opts, | ||
}, | ||
status: persistManagerIdle, | ||
metrics: newPersistManagerMetrics(scope), | ||
} | ||
pm.indexPM.newReaderFn = NewIndexReader | ||
pm.indexPM.newPersistentSegmentFn = m3ninxpersist.NewSegment | ||
pm.indexPM.newIndexWriterFn = NewIndexWriter | ||
pm.runtimeOptsListener = opts.RuntimeOptionsManager().RegisterListener(pm) | ||
|
||
return pm, nil | ||
} | ||
|
||
func (pm *persistManager) reset() { | ||
func (pm *persistManager) resetWithLock() error { | ||
pm.status = persistManagerIdle | ||
pm.start = timeZero | ||
pm.count = 0 | ||
pm.bytesWritten = 0 | ||
pm.worked = 0 | ||
pm.slept = 0 | ||
pm.indexPM.segmentWriter.Reset(nil) | ||
pm.indexPM.writeErr = nil | ||
pm.indexPM.initialized = false | ||
pm.dataPM.snapshotID = nil | ||
|
||
return pm.indexPM.segmentWriter.Reset(nil) | ||
} | ||
|
||
// StartIndexPersist is called by the databaseFlushManager to begin the persist process for | ||
|
@@ -271,83 +353,32 @@ func (pm *persistManager) PrepareIndex(opts persist.IndexPrepareOptions) (persis | |
IndexVolumeType: opts.IndexVolumeType, | ||
} | ||
|
||
writer, err := pm.indexPM.newIndexWriterFn(pm.opts) | ||
if err != nil { | ||
return prepared, err | ||
} | ||
idxWriter := &singleUseIndexWriter{ | ||
manager: &pm.indexPM, | ||
writer: writer, | ||
state: singleUseIndexWriterState{ | ||
// track which file we are writing in the persist manager, so we | ||
// know which file to read back on `closeIndex` being called. | ||
fileSetIdentifier: fileSetID, | ||
fileSetType: opts.FileSetType, | ||
}, | ||
} | ||
// create writer for required fileset file. | ||
if err := pm.indexPM.writer.Open(idxWriterOpts); err != nil { | ||
if err := idxWriter.writer.Open(idxWriterOpts); err != nil { | ||
return prepared, err | ||
} | ||
|
||
// track which file we are writing in the persist manager, so we | ||
// know which file to read back on `closeIndex` being called. | ||
pm.indexPM.fileSetIdentifier = fileSetID | ||
pm.indexPM.fileSetType = opts.FileSetType | ||
pm.indexPM.initialized = true | ||
|
||
// provide persistManager hooks into PreparedIndexPersist object | ||
prepared.Persist = pm.persistIndex | ||
prepared.Close = pm.closeIndex | ||
prepared.Persist = idxWriter.persistIndex | ||
prepared.Close = idxWriter.closeIndex | ||
|
||
return prepared, nil | ||
} | ||
|
||
func (pm *persistManager) persistIndex(builder segment.Builder) error { | ||
// FOLLOWUP(prateek): need to use-rate limiting runtime options in this code path | ||
markError := func(err error) { | ||
pm.indexPM.writeErr = err | ||
} | ||
if err := pm.indexPM.writeErr; err != nil { | ||
return fmt.Errorf("encountered error: %v, skipping further attempts to persist data", err) | ||
} | ||
|
||
if err := pm.indexPM.segmentWriter.Reset(builder); err != nil { | ||
markError(err) | ||
return err | ||
} | ||
|
||
if err := pm.indexPM.writer.WriteSegmentFileSet(pm.indexPM.segmentWriter); err != nil { | ||
markError(err) | ||
return err | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (pm *persistManager) closeIndex() ([]segment.Segment, error) { | ||
// ensure StartIndexPersist was called | ||
if !pm.indexPM.initialized { | ||
return nil, errPersistManagerNotPersisting | ||
} | ||
pm.indexPM.initialized = false | ||
|
||
// i.e. we're done writing all segments for PreparedIndexPersist. | ||
// so we can close the writer. | ||
if err := pm.indexPM.writer.Close(); err != nil { | ||
return nil, err | ||
} | ||
|
||
// only attempt to retrieve data if we have not encountered errors during | ||
// any writes. | ||
if err := pm.indexPM.writeErr; err != nil { | ||
return nil, err | ||
} | ||
|
||
// and then we get persistent segments backed by mmap'd data so the index | ||
// can safely evict the segment's we have just persisted. | ||
result, err := ReadIndexSegments(ReadIndexSegmentsOptions{ | ||
ReaderOptions: IndexReaderOpenOptions{ | ||
Identifier: pm.indexPM.fileSetIdentifier, | ||
FileSetType: pm.indexPM.fileSetType, | ||
}, | ||
FilesystemOptions: pm.opts, | ||
newReaderFn: pm.indexPM.newReaderFn, | ||
newPersistentSegmentFn: pm.indexPM.newPersistentSegmentFn, | ||
}) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
return result.Segments, nil | ||
} | ||
|
||
// DoneIndex is called by the databaseFlushManager to finish the index persist process. | ||
func (pm *persistManager) DoneIndex() error { | ||
pm.Lock() | ||
|
@@ -362,9 +393,7 @@ func (pm *persistManager) DoneIndex() error { | |
pm.metrics.throttleDurationMs.Update(float64(pm.slept / time.Millisecond)) | ||
|
||
// Reset state | ||
pm.reset() | ||
|
||
return nil | ||
return pm.resetWithLock() | ||
} | ||
|
||
// StartFlushPersist is called by the databaseFlushManager to begin the persist process. | ||
|
@@ -558,7 +587,7 @@ func (pm *persistManager) DoneFlush() error { | |
return errPersistManagerCannotDoneFlushNotFlush | ||
} | ||
|
||
return pm.doneShared() | ||
return pm.doneSharedWithLock() | ||
} | ||
|
||
// DoneSnapshot is called by the databaseFlushManager to finish the snapshot persist process. | ||
|
@@ -594,23 +623,21 @@ func (pm *persistManager) DoneSnapshot( | |
return fmt.Errorf("error writing out snapshot metadata file: %v", err) | ||
} | ||
|
||
return pm.doneShared() | ||
return pm.doneSharedWithLock() | ||
} | ||
|
||
// Close all resources. | ||
func (pm *persistManager) Close() { | ||
pm.runtimeOptsListener.Close() | ||
} | ||
|
||
func (pm *persistManager) doneShared() error { | ||
func (pm *persistManager) doneSharedWithLock() error { | ||
// Emit timing metrics | ||
pm.metrics.writeDurationMs.Update(float64(pm.worked / time.Millisecond)) | ||
pm.metrics.throttleDurationMs.Update(float64(pm.slept / time.Millisecond)) | ||
|
||
// Reset state | ||
pm.reset() | ||
|
||
return nil | ||
return pm.resetWithLock() | ||
} | ||
|
||
func (pm *persistManager) dataFilesetExists(prepareOpts persist.DataPrepareOptions) (bool, error) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe also hold the
s.manager.Lock
for same level of lock granularity as the persistIndex method? Even if not required today, someone might add something to the close method that could be used concurrently by mistake later.