Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dbnode] Streaming writer #2618

Merged
merged 13 commits into from
Sep 18, 2020
132 changes: 80 additions & 52 deletions src/dbnode/persist/fs/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,10 @@ func (w *writer) close() error {
return err
}

return w.closeWOIndex()
}

func (w *writer) closeWOIndex() error {
if err := w.digestFdWithDigestContents.WriteDigests(
w.infoFdWithDigest.Digest().Sum32(),
w.indexFdWithDigest.Digest().Sum32(),
Expand Down Expand Up @@ -427,7 +431,7 @@ func (w *writer) writeIndexRelatedFiles() error {
return err
}

return w.writeInfoFileContents(bloomFilter, summaries)
return w.writeInfoFileContents(bloomFilter, summaries, w.currIdx)
}

func (w *writer) writeIndexFileContents(
Expand Down Expand Up @@ -464,40 +468,6 @@ func (w *writer) writeIndexFileContents(
return err
}

var encodedTags []byte
if numTags := tagsIter.Remaining(); numTags > 0 {
tagsEncoder.Reset()
if err := tagsEncoder.Encode(tagsIter); err != nil {
return err
}

encodedTagsData, ok := tagsEncoder.Data()
if !ok {
return errWriterEncodeTagsDataNotAccessible
}

encodedTags = encodedTagsData.Bytes()
}

entry := schema.IndexEntry{
Index: entry.index,
ID: id,
Size: int64(entry.size),
Offset: entry.dataFileOffset,
DataChecksum: int64(entry.dataChecksum),
EncodedTags: encodedTags,
}

w.encoder.Reset()
if err := w.encoder.EncodeIndexEntry(entry); err != nil {
return err
}

data := w.encoder.Bytes()
if _, err := w.indexFdWithDigest.Write(data); err != nil {
return err
}

// Add to the bloom filter, note this must be zero alloc or else this will
// cause heavy GC churn as we flush millions of series at end of each
// time window
Expand All @@ -509,14 +479,62 @@ func (w *writer) writeIndexFileContents(
w.indexEntries[i].indexFileOffset = offset
}

offset += int64(len(data))
offset, err = w.writeIndex(id, tagsIter, tagsEncoder, entry, offset)
robskillington marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}

prevID = id
}

return nil
}

func (w *writer) writeIndex(
id []byte,
tagsIter ident.TagIterator,
tagsEncoder serialize.TagEncoder,
entry indexEntry,
offset int64,
) (int64, error) {
var encodedTags []byte
if numTags := tagsIter.Remaining(); numTags > 0 {
tagsEncoder.Reset()
if err := tagsEncoder.Encode(tagsIter); err != nil {
return offset, err
}

encodedTagsData, ok := tagsEncoder.Data()
if !ok {
return offset, errWriterEncodeTagsDataNotAccessible
}

encodedTags = encodedTagsData.Bytes()
}

e := schema.IndexEntry{
Index: entry.index,
ID: id,
Size: int64(entry.size),
Offset: entry.dataFileOffset,
DataChecksum: int64(entry.dataChecksum),
EncodedTags: encodedTags,
}

w.encoder.Reset()
if err := w.encoder.EncodeIndexEntry(e); err != nil {
return offset, err
}

data := w.encoder.Bytes()
if _, err := w.indexFdWithDigest.Write(data); err != nil {
return offset, err
}

offset += int64(len(data))
return offset, nil
linasm marked this conversation as resolved.
Show resolved Hide resolved
}

func (w *writer) writeSummariesFileContents(
summaryEvery int,
) (int, error) {
Expand All @@ -525,29 +543,38 @@ func (w *writer) writeSummariesFileContents(
if i%summaryEvery != 0 {
continue
}

summary := schema.IndexSummary{
Index: w.indexEntries[i].index,
ID: w.indexEntries[i].metadata.BytesID(),
IndexEntryOffset: w.indexEntries[i].indexFileOffset,
}

w.encoder.Reset()
if err := w.encoder.EncodeIndexSummary(summary); err != nil {
return 0, err
}

data := w.encoder.Bytes()
if _, err := w.summariesFdWithDigest.Write(data); err != nil {
err := w.writeSummariesEntry(w.indexEntries[i])
if err != nil {
return 0, err
}

summaries++
}

return summaries, nil
}

func (w *writer) writeSummariesEntry(
entry indexEntry,
) error {
summary := schema.IndexSummary{
Index: entry.index,
ID: entry.metadata.BytesID(),
IndexEntryOffset: entry.indexFileOffset,
}

w.encoder.Reset()
if err := w.encoder.EncodeIndexSummary(summary); err != nil {
return err
}

data := w.encoder.Bytes()
if _, err := w.summariesFdWithDigest.Write(data); err != nil {
return err
}

return nil
}

func (w *writer) writeBloomFilterFileContents(
bloomFilter *bloom.BloomFilter,
) error {
Expand All @@ -557,6 +584,7 @@ func (w *writer) writeBloomFilterFileContents(
func (w *writer) writeInfoFileContents(
bloomFilter *bloom.BloomFilter,
summaries int,
entriesCount int64,
) error {
snapshotBytes, err := w.snapshotID.MarshalBinary()
if err != nil {
Expand All @@ -569,7 +597,7 @@ func (w *writer) writeInfoFileContents(
SnapshotTime: xtime.ToNanoseconds(w.snapshotTime),
SnapshotID: snapshotBytes,
BlockSize: int64(w.blockSize),
Entries: w.currIdx,
Entries: entriesCount,
MajorVersion: schema.MajorVersion,
MinorVersion: schema.MinorVersion,
Summaries: schema.IndexSummariesInfo{
Expand Down
Loading