From 8ccb4b80fcab1cb11f0e816fa72d3762652d5193 Mon Sep 17 00:00:00 2001 From: Gediminas Date: Wed, 9 Sep 2020 11:56:25 +0300 Subject: [PATCH 1/7] [dbnode] Large tiles writer --- src/dbnode/persist/fs/write.go | 132 +++++---- src/dbnode/persist/fs/write_large_tiles.go | 260 ++++++++++++++++++ .../persist/fs/write_large_tiles_test.go | 224 +++++++++++++++ 3 files changed, 564 insertions(+), 52 deletions(-) create mode 100644 src/dbnode/persist/fs/write_large_tiles.go create mode 100644 src/dbnode/persist/fs/write_large_tiles_test.go diff --git a/src/dbnode/persist/fs/write.go b/src/dbnode/persist/fs/write.go index b7b6b3574a..414bf36b37 100644 --- a/src/dbnode/persist/fs/write.go +++ b/src/dbnode/persist/fs/write.go @@ -369,6 +369,10 @@ func (w *writer) close() error { return err } + return w.closeWOIndex() +} + +func (w *writer) closeWOIndex() error { if err := w.digestFdWithDigestContents.WriteDigests( w.infoFdWithDigest.Digest().Sum32(), w.indexFdWithDigest.Digest().Sum32(), @@ -427,7 +431,7 @@ func (w *writer) writeIndexRelatedFiles() error { return err } - return w.writeInfoFileContents(bloomFilter, summaries) + return w.writeInfoFileContents(bloomFilter, summaries, w.currIdx) } func (w *writer) writeIndexFileContents( @@ -464,40 +468,6 @@ func (w *writer) writeIndexFileContents( return err } - var encodedTags []byte - if numTags := tagsIter.Remaining(); numTags > 0 { - tagsEncoder.Reset() - if err := tagsEncoder.Encode(tagsIter); err != nil { - return err - } - - encodedTagsData, ok := tagsEncoder.Data() - if !ok { - return errWriterEncodeTagsDataNotAccessible - } - - encodedTags = encodedTagsData.Bytes() - } - - entry := schema.IndexEntry{ - Index: entry.index, - ID: id, - Size: int64(entry.size), - Offset: entry.dataFileOffset, - DataChecksum: int64(entry.dataChecksum), - EncodedTags: encodedTags, - } - - w.encoder.Reset() - if err := w.encoder.EncodeIndexEntry(entry); err != nil { - return err - } - - data := w.encoder.Bytes() - if _, err := w.indexFdWithDigest.Write(data); err != nil { - return err - } - // Add to the bloom filter, note this must be zero alloc or else this will // cause heavy GC churn as we flush millions of series at end of each // time window @@ -509,7 +479,10 @@ func (w *writer) writeIndexFileContents( w.indexEntries[i].indexFileOffset = offset } - offset += int64(len(data)) + offset, err = w.writeIndex(id, tagsIter, tagsEncoder, entry, offset) + if err != nil { + return err + } prevID = id } @@ -517,6 +490,51 @@ func (w *writer) writeIndexFileContents( return nil } +func (w *writer) writeIndex( + id []byte, + tagsIter ident.TagIterator, + tagsEncoder serialize.TagEncoder, + entry indexEntry, + offset int64, +) (int64, error) { + var encodedTags []byte + if numTags := tagsIter.Remaining(); numTags > 0 { + tagsEncoder.Reset() + if err := tagsEncoder.Encode(tagsIter); err != nil { + return offset, err + } + + encodedTagsData, ok := tagsEncoder.Data() + if !ok { + return offset, errWriterEncodeTagsDataNotAccessible + } + + encodedTags = encodedTagsData.Bytes() + } + + e := schema.IndexEntry{ + Index: entry.index, + ID: id, + Size: int64(entry.size), + Offset: entry.dataFileOffset, + DataChecksum: int64(entry.dataChecksum), + EncodedTags: encodedTags, + } + + w.encoder.Reset() + if err := w.encoder.EncodeIndexEntry(e); err != nil { + return offset, err + } + + data := w.encoder.Bytes() + if _, err := w.indexFdWithDigest.Write(data); err != nil { + return offset, err + } + + offset += int64(len(data)) + return offset, nil +} + func (w *writer) 
writeSummariesFileContents( summaryEvery int, ) (int, error) { @@ -525,29 +543,38 @@ func (w *writer) writeSummariesFileContents( if i%summaryEvery != 0 { continue } - - summary := schema.IndexSummary{ - Index: w.indexEntries[i].index, - ID: w.indexEntries[i].metadata.BytesID(), - IndexEntryOffset: w.indexEntries[i].indexFileOffset, - } - - w.encoder.Reset() - if err := w.encoder.EncodeIndexSummary(summary); err != nil { - return 0, err - } - - data := w.encoder.Bytes() - if _, err := w.summariesFdWithDigest.Write(data); err != nil { + err := w.writeSummariesEntry(w.indexEntries[i]) + if err != nil { return 0, err } - summaries++ } return summaries, nil } +func (w *writer) writeSummariesEntry( + entry indexEntry, +) error { + summary := schema.IndexSummary{ + Index: entry.index, + ID: entry.metadata.BytesID(), + IndexEntryOffset: entry.indexFileOffset, + } + + w.encoder.Reset() + if err := w.encoder.EncodeIndexSummary(summary); err != nil { + return err + } + + data := w.encoder.Bytes() + if _, err := w.summariesFdWithDigest.Write(data); err != nil { + return err + } + + return nil +} + func (w *writer) writeBloomFilterFileContents( bloomFilter *bloom.BloomFilter, ) error { @@ -557,6 +584,7 @@ func (w *writer) writeBloomFilterFileContents( func (w *writer) writeInfoFileContents( bloomFilter *bloom.BloomFilter, summaries int, + entriesCount int64, ) error { snapshotBytes, err := w.snapshotID.MarshalBinary() if err != nil { @@ -569,7 +597,7 @@ func (w *writer) writeInfoFileContents( SnapshotTime: xtime.ToNanoseconds(w.snapshotTime), SnapshotID: snapshotBytes, BlockSize: int64(w.blockSize), - Entries: w.currIdx, + Entries: entriesCount, MajorVersion: schema.MajorVersion, MinorVersion: schema.MinorVersion, Summaries: schema.IndexSummariesInfo{ diff --git a/src/dbnode/persist/fs/write_large_tiles.go b/src/dbnode/persist/fs/write_large_tiles.go new file mode 100644 index 0000000000..04c132c7ac --- /dev/null +++ b/src/dbnode/persist/fs/write_large_tiles.go @@ -0,0 +1,260 @@ +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package fs + +import ( + "bytes" + "fmt" + "math" + "time" + + "github.com/m3db/bloom/v4" + "github.com/m3db/m3/src/dbnode/encoding" + "github.com/m3db/m3/src/dbnode/persist" + "github.com/m3db/m3/src/x/checked" + "github.com/m3db/m3/src/x/context" + "github.com/m3db/m3/src/x/ident" + "github.com/m3db/m3/src/x/serialize" +) + +type LargeTilesWriter interface { + Open() error + Write(ctx context.Context, encoder encoding.Encoder, id ident.ID, tags ident.TagIterator) error + Close() error +} + +type LargeTilesWriterOptions struct { + Options Options + NamespaceID ident.ID + ShardID uint32 + BlockStart time.Time + BlockSize time.Duration + VolumeIndex int + PlannedRecordsCount uint +} + +type largeTilesWriter struct { + opts LargeTilesWriterOptions + writer *writer + writerOpts DataWriterOpenOptions + data []checked.Bytes + currIdx int64 + prevIDBytes []byte + summaryEvery int64 + bloomFilter *bloom.BloomFilter + indexOffset int64 + tagsEncoder serialize.TagEncoder + summaries int +} + +func NewLargeTilesWriter(opts LargeTilesWriterOptions) (LargeTilesWriter, error) { + w, err := NewWriter(opts.Options) + if err != nil { + return nil, err + } + + writerOpts := DataWriterOpenOptions{ + BlockSize: opts.BlockSize, + Identifier: FileSetFileIdentifier{ + Namespace: opts.NamespaceID, + Shard: opts.ShardID, + BlockStart: opts.BlockStart, + VolumeIndex: opts.VolumeIndex, + }, + FileSetType: persist.FileSetFlushType, + } + + m, k := bloom.EstimateFalsePositiveRate( + opts.PlannedRecordsCount, + opts.Options.IndexBloomFilterFalsePositivePercent(), + ) + bloomFilter := bloom.NewBloomFilter(m, k) + + summariesApprox := float64(opts.PlannedRecordsCount) * opts.Options.IndexSummariesPercent() + summaryEvery := 0 + if summariesApprox > 0 { + summaryEvery = int(math.Floor(float64(opts.PlannedRecordsCount) / summariesApprox)) + } + + return &largeTilesWriter{ + opts: opts, + writer: w.(*writer), + writerOpts: writerOpts, + summaryEvery: int64(summaryEvery), + data: make([]checked.Bytes, 2), + bloomFilter: bloomFilter, + }, nil +} + +func (w *largeTilesWriter) Open() error { + if err := w.writer.Open(w.writerOpts); err != nil { + return err + } + w.tagsEncoder = w.writer.tagEncoderPool.Get() + w.indexOffset = 0 + w.summaries = 0 + w.prevIDBytes = nil + return nil +} + +func (w *largeTilesWriter) Write( + ctx context.Context, + encoder encoding.Encoder, + id ident.ID, + tags ident.TagIterator, +) error { + // Need to check if w.prevIDBytes != nil, otherwise we can never write an empty string ID + if w.prevIDBytes != nil && bytes.Compare(id.Bytes(), w.prevIDBytes) <= 0 { + return fmt.Errorf("ids must be written in lexicographic order, no duplicates, but got %s followed by %s", w.prevIDBytes, id) + } + w.prevIDBytes = append(w.prevIDBytes[:0], id.Bytes()...) + + stream, ok := encoder.Stream(ctx) + if !ok { + // None of the datapoints passed the predicate. 
+ return nil + } + defer stream.Finalize() + segment, err := stream.Segment() + if err != nil { + return err + } + w.data[0] = segment.Head + w.data[1] = segment.Tail + checksum := segment.CalculateChecksum() + entry, err := w.writeData(w.data, checksum) + if err != nil { + return err + } + + if entry != nil { + return w.writeIndexRelated(id, tags, entry) + } + + return nil +} + +func (w *largeTilesWriter) writeData( + data []checked.Bytes, + dataChecksum uint32, +) (*indexEntry, error) { + var size int64 + for _, d := range data { + if d == nil { + continue + } + size += int64(d.Len()) + } + if size == 0 { + return nil, nil + } + + // Warning: metadata is not set here and should not be used anywhere below. + entry := &indexEntry{ + index: w.currIdx, + dataFileOffset: w.writer.currOffset, + size: uint32(size), + dataChecksum: dataChecksum, + } + for _, d := range data { + if d == nil { + continue + } + if err := w.writer.writeData(d.Bytes()); err != nil { + return nil, err + } + } + + w.currIdx++ + + return entry, nil +} + +func (w *largeTilesWriter) writeIndexRelated( + id ident.ID, + tags ident.TagIterator, + entry *indexEntry, +) error { + // Add to the bloom filter, note this must be zero alloc or else this will + // cause heavy GC churn as we flush millions of series at end of each + // time window + w.bloomFilter.Add(id.Bytes()) + + if entry.index%w.summaryEvery == 0 { + // Capture the offset for when we write this summary back, only capture + // for every summary we'll actually write to avoid a few memcopies + entry.indexFileOffset = w.indexOffset + } + + indexOffset, err := w.writer.writeIndex(id.Bytes(), tags, w.tagsEncoder, *entry, w.indexOffset) + if err != nil { + return err + } + w.indexOffset = indexOffset + + if entry.index%w.summaryEvery == 0 { + entry.metadata = persist.NewMetadataFromIDAndTagIterator(id, nil, persist.MetadataOptions{}) + err = w.writer.writeSummariesEntry(*entry) + if err != nil { + return err + } + w.summaries++ + } + + return nil +} + +func (w *largeTilesWriter) Close() error { + w.tagsEncoder.Finalize() + for i := range w.data { + w.data[i] = nil + } + w.prevIDBytes = nil + + // Write the bloom filter bitset out + if err := w.writer.writeBloomFilterFileContents(w.bloomFilter); err != nil { + return err + } + + if err := w.writer.writeInfoFileContents(w.bloomFilter, w.summaries, w.currIdx); err != nil { + return err + } + + err := w.writer.closeWOIndex() + if err != nil { + w.writer.err = err + return err + } + + // NB(xichen): only write out the checkpoint file if there are no errors + // encountered between calling writer.Open() and writer.Close(). + if err := writeCheckpointFile( + w.writer.checkpointFilePath, + w.writer.digestFdWithDigestContents.Digest().Sum32(), + w.writer.digestBuf, + w.writer.newFileMode, + ); err != nil { + w.writer.err = err + return err + } + + return nil +} diff --git a/src/dbnode/persist/fs/write_large_tiles_test.go b/src/dbnode/persist/fs/write_large_tiles_test.go new file mode 100644 index 0000000000..6c1fc43144 --- /dev/null +++ b/src/dbnode/persist/fs/write_large_tiles_test.go @@ -0,0 +1,224 @@ +// Copyright (c) 2020 Uber Technologies, Inc. 
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package fs + +import ( + "os" + "path/filepath" + "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/m3db/m3/src/dbnode/encoding" + "github.com/m3db/m3/src/dbnode/encoding/m3tsz" + "github.com/m3db/m3/src/dbnode/namespace" + "github.com/m3db/m3/src/dbnode/persist" + "github.com/m3db/m3/src/dbnode/ts" + "github.com/m3db/m3/src/x/context" + "github.com/m3db/m3/src/x/ident" + xtime "github.com/m3db/m3/src/x/time" + "github.com/stretchr/testify/require" +) + +type testLargeTileEntry struct { + testEntry + values []float64 +} + +func newTestLargeTilesWriter( + t *testing.T, + filePathPrefix string, + shard uint32, + timestamp time.Time, + nextVersion int, + plannedEntries uint, +) LargeTilesWriter { + writer, err := NewLargeTilesWriter( + LargeTilesWriterOptions{ + NamespaceID: testNs1ID, + ShardID: shard, + BlockStart: timestamp, + BlockSize: testBlockSize, + + VolumeIndex: nextVersion, + PlannedRecordsCount: plannedEntries, + Options: testDefaultOpts. + SetFilePathPrefix(filePathPrefix). 
+ SetWriterBufferSize(testWriterBufferSize), + }, + ) + require.NoError(t, err) + return writer +} + +func TestIdsMustBeSorted(t *testing.T) { + dir := createTempDir(t) + filePathPrefix := filepath.Join(dir, "") + defer os.RemoveAll(dir) + + entries := []testLargeTileEntry{ + {testEntry{"baz", nil, nil}, []float64{65536}}, + {testEntry{"bar", nil, nil}, []float64{4.8, 5.2, 6}}, + } + + w := newTestLargeTilesWriter(t, filePathPrefix, 0, testWriterStart, 0, 5) + defer w.Close() + err := writeTestLargeTilesData(t, w, testWriterStart, entries) + require.Error(t, err) + require.Equal(t, "ids must be written in lexicographic order, no duplicates, but got baz followed by bar", + err.Error()) +} + +func TestDoubleWritesAreNotAllowed(t *testing.T) { + dir := createTempDir(t) + filePathPrefix := filepath.Join(dir, "") + defer os.RemoveAll(dir) + + entries := []testLargeTileEntry{ + {testEntry{"baz", nil, nil}, []float64{65536}}, + {testEntry{"baz", nil, nil}, []float64{4.8, 5.2, 6}}, + } + + w := newTestLargeTilesWriter(t, filePathPrefix, 0, testWriterStart, 0, 5) + defer w.Close() + err := writeTestLargeTilesData(t, w, testWriterStart, entries) + require.Error(t, err) + require.Equal(t, "ids must be written in lexicographic order, no duplicates, but got baz followed by baz", + err.Error()) +} + +func TestSimpleLargeTilesReadWrite(t *testing.T) { + dir := createTempDir(t) + filePathPrefix := filepath.Join(dir, "") + defer os.RemoveAll(dir) + + entries := []testLargeTileEntry{ + {testEntry{"bar", nil, nil}, []float64{4.8, 5.2, 6}}, + {testEntry{"baz", nil, nil}, []float64{65536}}, + {testEntry{"cat", nil, nil}, []float64{100000}}, + {testEntry{"foo", nil, nil}, []float64{1, 2, 3}}, + {testEntry{"foo+bar=baz,qux=qaz", map[string]string{ + "bar": "baz", + "qux": "qaz", + }, nil}, []float64{7, 8, 9}}, + } + + w := newTestLargeTilesWriter(t, filePathPrefix, 0, testWriterStart, 0, 5) + err := writeTestLargeTilesData(t, w, testWriterStart, entries) + require.NoError(t, err) + err = w.Close() + require.NoError(t, err) + + expectEntries := make([]testEntry, 0, len(entries)) + for _, e := range entries { + expectEntries = append(expectEntries, e.testEntry) + } + + r := newTestReader(t, filePathPrefix) + readTestData(t, r, 0, testWriterStart, expectEntries) +} + +func TestLargeTilesInfoReadWrite(t *testing.T) { + dir := createTempDir(t) + filePathPrefix := filepath.Join(dir, "") + defer os.RemoveAll(dir) + + entries := []testLargeTileEntry{ + {testEntry{"bar", nil, nil}, []float64{4.8, 5.2, 6}}, + {testEntry{"baz", nil, nil}, []float64{65536}}, + {testEntry{"cat", nil, nil}, []float64{100000}}, + {testEntry{"foo", nil, nil}, []float64{1, 2, 3}}, + } + + w := newTestLargeTilesWriter(t, filePathPrefix, 0, testWriterStart, 0, 12) + err := writeTestLargeTilesData(t, w, testWriterStart, entries) + require.NoError(t, err) + err = w.Close() + require.NoError(t, err) + + readInfoFileResults := ReadInfoFiles(filePathPrefix, testNs1ID, 0, 16, nil, persist.FileSetFlushType) + require.Equal(t, 1, len(readInfoFileResults)) + for _, result := range readInfoFileResults { + require.NoError(t, result.Err.Error()) + } + + infoFile := readInfoFileResults[0].Info + require.True(t, testWriterStart.Equal(xtime.FromNanoseconds(infoFile.BlockStart))) + require.Equal(t, testBlockSize, time.Duration(infoFile.BlockSize)) + require.Equal(t, int64(len(entries)), infoFile.Entries) +} + +func writeTestLargeTilesData( + t *testing.T, + w LargeTilesWriter, + blockStart time.Time, + entries []testLargeTileEntry, +) error { + return 
writeTestLargeTilesDataWithVolume(t, w, blockStart, entries) +} + +func writeTestLargeTilesDataWithVolume( + t *testing.T, + w LargeTilesWriter, + blockStart time.Time, + entries []testLargeTileEntry, +) error { + if err := w.Open(); err != nil { + return err + } + ctx := context.NewContext() + + encoder := m3tsz.NewEncoder(blockStart, nil, true, encoding.NewOptions()) + defer encoder.Close() + ctrl := gomock.NewController(t) + schema := namespace.NewMockSchemaDescr(ctrl) + encoder.SetSchema(schema) + var dp ts.Datapoint + + for i := range entries { + encoder.Reset(blockStart, 0, schema) + dp.Timestamp = blockStart + + for _, v := range entries[i].values { + dp.Value = v + if err := encoder.Encode(dp, xtime.Second, nil); err != nil { + return err + } + + dp.Timestamp = dp.Timestamp.Add(10 * time.Minute) + } + + stream, ok := encoder.Stream(ctx) + require.True(t, ok) + segment, err := stream.Segment() + if err != nil { + return err + } + entries[i].data = append(segment.Head.Bytes(), segment.Tail.Bytes()...) + stream.Finalize() + + tagIter := ident.NewTagsIterator(entries[i].Tags()) + if err := w.Write(ctx, encoder, ident.StringID(entries[i].id), tagIter); err != nil { + return err + } + } + return nil +} From 7706fd84ab5346668ffe8773bfb161501dc253e9 Mon Sep 17 00:00:00 2001 From: Gediminas Date: Wed, 9 Sep 2020 12:22:59 +0300 Subject: [PATCH 2/7] minor refactorings --- src/dbnode/persist/fs/write_large_tiles_test.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/dbnode/persist/fs/write_large_tiles_test.go b/src/dbnode/persist/fs/write_large_tiles_test.go index 6c1fc43144..fe1c37e672 100644 --- a/src/dbnode/persist/fs/write_large_tiles_test.go +++ b/src/dbnode/persist/fs/write_large_tiles_test.go @@ -26,7 +26,6 @@ import ( "testing" "time" - "github.com/golang/mock/gomock" "github.com/m3db/m3/src/dbnode/encoding" "github.com/m3db/m3/src/dbnode/encoding/m3tsz" "github.com/m3db/m3/src/dbnode/namespace" @@ -35,6 +34,8 @@ import ( "github.com/m3db/m3/src/x/context" "github.com/m3db/m3/src/x/ident" xtime "github.com/m3db/m3/src/x/time" + + "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" ) @@ -156,9 +157,7 @@ func TestLargeTilesInfoReadWrite(t *testing.T) { readInfoFileResults := ReadInfoFiles(filePathPrefix, testNs1ID, 0, 16, nil, persist.FileSetFlushType) require.Equal(t, 1, len(readInfoFileResults)) - for _, result := range readInfoFileResults { - require.NoError(t, result.Err.Error()) - } + require.NoError(t, readInfoFileResults[0].Err.Error()) infoFile := readInfoFileResults[0].Info require.True(t, testWriterStart.Equal(xtime.FromNanoseconds(infoFile.BlockStart))) From d6a22b7b646dadff3fa5e1662db072e6bb0d0e71 Mon Sep 17 00:00:00 2001 From: Gediminas Date: Fri, 11 Sep 2020 14:11:03 +0300 Subject: [PATCH 3/7] minor refactoring --- src/dbnode/persist/fs/write.go | 15 +++++++-------- src/dbnode/persist/fs/write_large_tiles.go | 4 ++-- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/src/dbnode/persist/fs/write.go b/src/dbnode/persist/fs/write.go index 414bf36b37..8c6fb1e556 100644 --- a/src/dbnode/persist/fs/write.go +++ b/src/dbnode/persist/fs/write.go @@ -479,10 +479,11 @@ func (w *writer) writeIndexFileContents( w.indexEntries[i].indexFileOffset = offset } - offset, err = w.writeIndex(id, tagsIter, tagsEncoder, entry, offset) + length, err := w.writeIndex(id, tagsIter, tagsEncoder, entry) if err != nil { return err } + offset += length prevID = id } @@ -495,18 +496,17 @@ func (w *writer) writeIndex( tagsIter ident.TagIterator, 
tagsEncoder serialize.TagEncoder, entry indexEntry, - offset int64, ) (int64, error) { var encodedTags []byte if numTags := tagsIter.Remaining(); numTags > 0 { tagsEncoder.Reset() if err := tagsEncoder.Encode(tagsIter); err != nil { - return offset, err + return 0, err } encodedTagsData, ok := tagsEncoder.Data() if !ok { - return offset, errWriterEncodeTagsDataNotAccessible + return 0, errWriterEncodeTagsDataNotAccessible } encodedTags = encodedTagsData.Bytes() @@ -523,16 +523,15 @@ func (w *writer) writeIndex( w.encoder.Reset() if err := w.encoder.EncodeIndexEntry(e); err != nil { - return offset, err + return 0, err } data := w.encoder.Bytes() if _, err := w.indexFdWithDigest.Write(data); err != nil { - return offset, err + return 0, err } - offset += int64(len(data)) - return offset, nil + return int64(len(data)), nil } func (w *writer) writeSummariesFileContents( diff --git a/src/dbnode/persist/fs/write_large_tiles.go b/src/dbnode/persist/fs/write_large_tiles.go index 04c132c7ac..7637c3cc48 100644 --- a/src/dbnode/persist/fs/write_large_tiles.go +++ b/src/dbnode/persist/fs/write_large_tiles.go @@ -204,11 +204,11 @@ func (w *largeTilesWriter) writeIndexRelated( entry.indexFileOffset = w.indexOffset } - indexOffset, err := w.writer.writeIndex(id.Bytes(), tags, w.tagsEncoder, *entry, w.indexOffset) + length, err := w.writer.writeIndex(id.Bytes(), tags, w.tagsEncoder, *entry) if err != nil { return err } - w.indexOffset = indexOffset + w.indexOffset += length if entry.index%w.summaryEvery == 0 { entry.metadata = persist.NewMetadataFromIDAndTagIterator(id, nil, persist.MetadataOptions{}) From 09136327dc85b2704919c7f9248e11094eab5085 Mon Sep 17 00:00:00 2001 From: Linas Medziunas Date: Mon, 14 Sep 2020 10:01:03 +0300 Subject: [PATCH 4/7] Skip tagsEncoder, use encodedTags directly --- src/dbnode/persist/fs/write.go | 8 ++++++++ src/dbnode/persist/fs/write_large_tiles.go | 14 +++++--------- src/dbnode/persist/fs/write_large_tiles_test.go | 14 ++++++++++++-- 3 files changed, 25 insertions(+), 11 deletions(-) diff --git a/src/dbnode/persist/fs/write.go b/src/dbnode/persist/fs/write.go index 8c6fb1e556..0eb0b37858 100644 --- a/src/dbnode/persist/fs/write.go +++ b/src/dbnode/persist/fs/write.go @@ -512,6 +512,14 @@ func (w *writer) writeIndex( encodedTags = encodedTagsData.Bytes() } + return w.writeIndexWithEncodedTags(id, encodedTags, entry) +} + +func (w *writer) writeIndexWithEncodedTags( + id []byte, + encodedTags []byte, + entry indexEntry, +) (int64, error) { e := schema.IndexEntry{ Index: entry.index, ID: id, diff --git a/src/dbnode/persist/fs/write_large_tiles.go b/src/dbnode/persist/fs/write_large_tiles.go index 7637c3cc48..7b51cae995 100644 --- a/src/dbnode/persist/fs/write_large_tiles.go +++ b/src/dbnode/persist/fs/write_large_tiles.go @@ -32,12 +32,11 @@ import ( "github.com/m3db/m3/src/x/checked" "github.com/m3db/m3/src/x/context" "github.com/m3db/m3/src/x/ident" - "github.com/m3db/m3/src/x/serialize" ) type LargeTilesWriter interface { Open() error - Write(ctx context.Context, encoder encoding.Encoder, id ident.ID, tags ident.TagIterator) error + Write(ctx context.Context, encoder encoding.Encoder, id ident.ID, encodedTags []byte) error Close() error } @@ -61,7 +60,6 @@ type largeTilesWriter struct { summaryEvery int64 bloomFilter *bloom.BloomFilter indexOffset int64 - tagsEncoder serialize.TagEncoder summaries int } @@ -108,7 +106,6 @@ func (w *largeTilesWriter) Open() error { if err := w.writer.Open(w.writerOpts); err != nil { return err } - w.tagsEncoder = 
w.writer.tagEncoderPool.Get() w.indexOffset = 0 w.summaries = 0 w.prevIDBytes = nil @@ -119,7 +116,7 @@ func (w *largeTilesWriter) Write( ctx context.Context, encoder encoding.Encoder, id ident.ID, - tags ident.TagIterator, + encodedTags []byte, ) error { // Need to check if w.prevIDBytes != nil, otherwise we can never write an empty string ID if w.prevIDBytes != nil && bytes.Compare(id.Bytes(), w.prevIDBytes) <= 0 { @@ -146,7 +143,7 @@ func (w *largeTilesWriter) Write( } if entry != nil { - return w.writeIndexRelated(id, tags, entry) + return w.writeIndexRelated(id, encodedTags, entry) } return nil @@ -190,7 +187,7 @@ func (w *largeTilesWriter) writeData( func (w *largeTilesWriter) writeIndexRelated( id ident.ID, - tags ident.TagIterator, + encodedTags []byte, entry *indexEntry, ) error { // Add to the bloom filter, note this must be zero alloc or else this will @@ -204,7 +201,7 @@ func (w *largeTilesWriter) writeIndexRelated( entry.indexFileOffset = w.indexOffset } - length, err := w.writer.writeIndex(id.Bytes(), tags, w.tagsEncoder, *entry) + length, err := w.writer.writeIndexWithEncodedTags(id.Bytes(), encodedTags, *entry) if err != nil { return err } @@ -223,7 +220,6 @@ func (w *largeTilesWriter) writeIndexRelated( } func (w *largeTilesWriter) Close() error { - w.tagsEncoder.Finalize() for i := range w.data { w.data[i] = nil } diff --git a/src/dbnode/persist/fs/write_large_tiles_test.go b/src/dbnode/persist/fs/write_large_tiles_test.go index fe1c37e672..31780cbc1a 100644 --- a/src/dbnode/persist/fs/write_large_tiles_test.go +++ b/src/dbnode/persist/fs/write_large_tiles_test.go @@ -21,6 +21,8 @@ package fs import ( + "github.com/m3db/m3/src/x/pool" + "github.com/m3db/m3/src/x/serialize" "os" "path/filepath" "testing" @@ -192,6 +194,9 @@ func writeTestLargeTilesDataWithVolume( encoder.SetSchema(schema) var dp ts.Datapoint + tagEncodingPool := serialize.NewTagEncoderPool(serialize.NewTagEncoderOptions(), pool.NewObjectPoolOptions()) + tagEncodingPool.Init() + for i := range entries { encoder.Reset(blockStart, 0, schema) dp.Timestamp = blockStart @@ -214,8 +219,13 @@ func writeTestLargeTilesDataWithVolume( entries[i].data = append(segment.Head.Bytes(), segment.Tail.Bytes()...) 
stream.Finalize()
 
-		tagIter := ident.NewTagsIterator(entries[i].Tags())
-		if err := w.Write(ctx, encoder, ident.StringID(entries[i].id), tagIter); err != nil {
+		tagsIter := ident.NewTagsIterator(entries[i].Tags())
+		tagEncoder := tagEncodingPool.Get()
+		err = tagEncoder.Encode(tagsIter)
+		require.NoError(t, err)
+		encodedTags, _ := tagEncoder.Data()
+
+		if err := w.Write(ctx, encoder, ident.StringID(entries[i].id), encodedTags.Bytes()); err != nil {
 			return err
 		}
 	}

From 564e2d6dd06bf62605fada6563ef1d27a2789568 Mon Sep 17 00:00:00 2001
From: Linas Medziunas
Date: Mon, 14 Sep 2020 10:08:02 +0300
Subject: [PATCH 5/7] Rename LargeTilesWriter to StreamingWriter

---
 ...rite_large_tiles.go => streaming_write.go} | 25 +++++----
 ..._tiles_test.go => streaming_write_test.go} | 52 +++++++++----------
 2 files changed, 40 insertions(+), 37 deletions(-)
 rename src/dbnode/persist/fs/{write_large_tiles.go => streaming_write.go} (90%)
 rename src/dbnode/persist/fs/{write_large_tiles_test.go => streaming_write_test.go} (83%)

diff --git a/src/dbnode/persist/fs/write_large_tiles.go b/src/dbnode/persist/fs/streaming_write.go
similarity index 90%
rename from src/dbnode/persist/fs/write_large_tiles.go
rename to src/dbnode/persist/fs/streaming_write.go
index 7b51cae995..7cb3d0f985 100644
--- a/src/dbnode/persist/fs/write_large_tiles.go
+++ b/src/dbnode/persist/fs/streaming_write.go
@@ -34,13 +34,16 @@ import (
 	"github.com/m3db/m3/src/x/ident"
 )
 
-type LargeTilesWriter interface {
+// StreamingWriter writes int data fileset without intermediate buffering.
+// Writes must be lexicographically ordered by the id.
+type StreamingWriter interface {
 	Open() error
 	Write(ctx context.Context, encoder encoding.Encoder, id ident.ID, encodedTags []byte) error
 	Close() error
 }
 
-type LargeTilesWriterOptions struct {
+// StreamingWriterOptions is the options for the StreamingWriter.
+type StreamingWriterOptions struct { Options Options NamespaceID ident.ID ShardID uint32 @@ -50,8 +53,8 @@ type LargeTilesWriterOptions struct { PlannedRecordsCount uint } -type largeTilesWriter struct { - opts LargeTilesWriterOptions +type streamingWriter struct { + opts StreamingWriterOptions writer *writer writerOpts DataWriterOpenOptions data []checked.Bytes @@ -63,7 +66,7 @@ type largeTilesWriter struct { summaries int } -func NewLargeTilesWriter(opts LargeTilesWriterOptions) (LargeTilesWriter, error) { +func NewStreamingWriter(opts StreamingWriterOptions) (StreamingWriter, error) { w, err := NewWriter(opts.Options) if err != nil { return nil, err @@ -92,7 +95,7 @@ func NewLargeTilesWriter(opts LargeTilesWriterOptions) (LargeTilesWriter, error) summaryEvery = int(math.Floor(float64(opts.PlannedRecordsCount) / summariesApprox)) } - return &largeTilesWriter{ + return &streamingWriter{ opts: opts, writer: w.(*writer), writerOpts: writerOpts, @@ -102,7 +105,7 @@ func NewLargeTilesWriter(opts LargeTilesWriterOptions) (LargeTilesWriter, error) }, nil } -func (w *largeTilesWriter) Open() error { +func (w *streamingWriter) Open() error { if err := w.writer.Open(w.writerOpts); err != nil { return err } @@ -112,7 +115,7 @@ func (w *largeTilesWriter) Open() error { return nil } -func (w *largeTilesWriter) Write( +func (w *streamingWriter) Write( ctx context.Context, encoder encoding.Encoder, id ident.ID, @@ -149,7 +152,7 @@ func (w *largeTilesWriter) Write( return nil } -func (w *largeTilesWriter) writeData( +func (w *streamingWriter) writeData( data []checked.Bytes, dataChecksum uint32, ) (*indexEntry, error) { @@ -185,7 +188,7 @@ func (w *largeTilesWriter) writeData( return entry, nil } -func (w *largeTilesWriter) writeIndexRelated( +func (w *streamingWriter) writeIndexRelated( id ident.ID, encodedTags []byte, entry *indexEntry, @@ -219,7 +222,7 @@ func (w *largeTilesWriter) writeIndexRelated( return nil } -func (w *largeTilesWriter) Close() error { +func (w *streamingWriter) Close() error { for i := range w.data { w.data[i] = nil } diff --git a/src/dbnode/persist/fs/write_large_tiles_test.go b/src/dbnode/persist/fs/streaming_write_test.go similarity index 83% rename from src/dbnode/persist/fs/write_large_tiles_test.go rename to src/dbnode/persist/fs/streaming_write_test.go index 31780cbc1a..f2800df6f9 100644 --- a/src/dbnode/persist/fs/write_large_tiles_test.go +++ b/src/dbnode/persist/fs/streaming_write_test.go @@ -41,21 +41,21 @@ import ( "github.com/stretchr/testify/require" ) -type testLargeTileEntry struct { +type testStreamingEntry struct { testEntry values []float64 } -func newTestLargeTilesWriter( +func newTestStreamingWriter( t *testing.T, filePathPrefix string, shard uint32, timestamp time.Time, nextVersion int, plannedEntries uint, -) LargeTilesWriter { - writer, err := NewLargeTilesWriter( - LargeTilesWriterOptions{ +) StreamingWriter { + writer, err := NewStreamingWriter( + StreamingWriterOptions{ NamespaceID: testNs1ID, ShardID: shard, BlockStart: timestamp, @@ -77,14 +77,14 @@ func TestIdsMustBeSorted(t *testing.T) { filePathPrefix := filepath.Join(dir, "") defer os.RemoveAll(dir) - entries := []testLargeTileEntry{ + entries := []testStreamingEntry{ {testEntry{"baz", nil, nil}, []float64{65536}}, {testEntry{"bar", nil, nil}, []float64{4.8, 5.2, 6}}, } - w := newTestLargeTilesWriter(t, filePathPrefix, 0, testWriterStart, 0, 5) + w := newTestStreamingWriter(t, filePathPrefix, 0, testWriterStart, 0, 5) defer w.Close() - err := writeTestLargeTilesData(t, w, testWriterStart, 
entries) + err := streamingWriteTestData(t, w, testWriterStart, entries) require.Error(t, err) require.Equal(t, "ids must be written in lexicographic order, no duplicates, but got baz followed by bar", err.Error()) @@ -95,25 +95,25 @@ func TestDoubleWritesAreNotAllowed(t *testing.T) { filePathPrefix := filepath.Join(dir, "") defer os.RemoveAll(dir) - entries := []testLargeTileEntry{ + entries := []testStreamingEntry{ {testEntry{"baz", nil, nil}, []float64{65536}}, {testEntry{"baz", nil, nil}, []float64{4.8, 5.2, 6}}, } - w := newTestLargeTilesWriter(t, filePathPrefix, 0, testWriterStart, 0, 5) + w := newTestStreamingWriter(t, filePathPrefix, 0, testWriterStart, 0, 5) defer w.Close() - err := writeTestLargeTilesData(t, w, testWriterStart, entries) + err := streamingWriteTestData(t, w, testWriterStart, entries) require.Error(t, err) require.Equal(t, "ids must be written in lexicographic order, no duplicates, but got baz followed by baz", err.Error()) } -func TestSimpleLargeTilesReadWrite(t *testing.T) { +func TestSimpleReadStreamingWrite(t *testing.T) { dir := createTempDir(t) filePathPrefix := filepath.Join(dir, "") defer os.RemoveAll(dir) - entries := []testLargeTileEntry{ + entries := []testStreamingEntry{ {testEntry{"bar", nil, nil}, []float64{4.8, 5.2, 6}}, {testEntry{"baz", nil, nil}, []float64{65536}}, {testEntry{"cat", nil, nil}, []float64{100000}}, @@ -124,8 +124,8 @@ func TestSimpleLargeTilesReadWrite(t *testing.T) { }, nil}, []float64{7, 8, 9}}, } - w := newTestLargeTilesWriter(t, filePathPrefix, 0, testWriterStart, 0, 5) - err := writeTestLargeTilesData(t, w, testWriterStart, entries) + w := newTestStreamingWriter(t, filePathPrefix, 0, testWriterStart, 0, 5) + err := streamingWriteTestData(t, w, testWriterStart, entries) require.NoError(t, err) err = w.Close() require.NoError(t, err) @@ -139,20 +139,20 @@ func TestSimpleLargeTilesReadWrite(t *testing.T) { readTestData(t, r, 0, testWriterStart, expectEntries) } -func TestLargeTilesInfoReadWrite(t *testing.T) { +func TestInfoReadStreamingWrite(t *testing.T) { dir := createTempDir(t) filePathPrefix := filepath.Join(dir, "") defer os.RemoveAll(dir) - entries := []testLargeTileEntry{ + entries := []testStreamingEntry{ {testEntry{"bar", nil, nil}, []float64{4.8, 5.2, 6}}, {testEntry{"baz", nil, nil}, []float64{65536}}, {testEntry{"cat", nil, nil}, []float64{100000}}, {testEntry{"foo", nil, nil}, []float64{1, 2, 3}}, } - w := newTestLargeTilesWriter(t, filePathPrefix, 0, testWriterStart, 0, 12) - err := writeTestLargeTilesData(t, w, testWriterStart, entries) + w := newTestStreamingWriter(t, filePathPrefix, 0, testWriterStart, 0, 12) + err := streamingWriteTestData(t, w, testWriterStart, entries) require.NoError(t, err) err = w.Close() require.NoError(t, err) @@ -167,20 +167,20 @@ func TestLargeTilesInfoReadWrite(t *testing.T) { require.Equal(t, int64(len(entries)), infoFile.Entries) } -func writeTestLargeTilesData( +func streamingWriteTestData( t *testing.T, - w LargeTilesWriter, + w StreamingWriter, blockStart time.Time, - entries []testLargeTileEntry, + entries []testStreamingEntry, ) error { - return writeTestLargeTilesDataWithVolume(t, w, blockStart, entries) + return streamingWriteWithVolume(t, w, blockStart, entries) } -func writeTestLargeTilesDataWithVolume( +func streamingWriteWithVolume( t *testing.T, - w LargeTilesWriter, + w StreamingWriter, blockStart time.Time, - entries []testLargeTileEntry, + entries []testStreamingEntry, ) error { if err := w.Open(); err != nil { return err From 030a7c702e55c6aa7afede2445728232ff4f71d0 
Mon Sep 17 00:00:00 2001
From: Linas Medziunas
Date: Tue, 15 Sep 2020 21:11:12 +0300
Subject: [PATCH 6/7] Add FIXME wrt segment.Tail.Finalize

---
 src/dbnode/persist/fs/streaming_write.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/dbnode/persist/fs/streaming_write.go b/src/dbnode/persist/fs/streaming_write.go
index 7cb3d0f985..018912060c 100644
--- a/src/dbnode/persist/fs/streaming_write.go
+++ b/src/dbnode/persist/fs/streaming_write.go
@@ -138,7 +138,7 @@ func (w *streamingWriter) Write(
 		return err
 	}
 	w.data[0] = segment.Head
-	w.data[1] = segment.Tail
+	w.data[1] = segment.Tail // FIXME we fail to finalize the tail (does not get finalized automatically because FinalizeTail flag not set)
 	checksum := segment.CalculateChecksum()
 	entry, err := w.writeData(w.data, checksum)
 	if err != nil {

From 8ba1de2a87cff634586b06fcda441132c5ec391a Mon Sep 17 00:00:00 2001
From: Linas Medziunas
Date: Thu, 17 Sep 2020 22:18:14 +0300
Subject: [PATCH 7/7] Address PR feedback

---
 src/dbnode/persist/fs/read_write_test.go      |  9 +-
 src/dbnode/persist/fs/streaming_write.go      | 87 +++++++------------
 src/dbnode/persist/fs/streaming_write_test.go | 24 ++++-
 src/dbnode/persist/fs/write.go                | 38 +++++---
 4 files changed, 84 insertions(+), 74 deletions(-)

diff --git a/src/dbnode/persist/fs/read_write_test.go b/src/dbnode/persist/fs/read_write_test.go
index d7b3ccaac1..4978ffe7eb 100644
--- a/src/dbnode/persist/fs/read_write_test.go
+++ b/src/dbnode/persist/fs/read_write_test.go
@@ -29,13 +29,13 @@ import (
 	"testing"
 	"time"
 
-	"github.com/m3db/bloom/v4"
 	"github.com/m3db/m3/src/dbnode/digest"
 	"github.com/m3db/m3/src/dbnode/persist"
 	"github.com/m3db/m3/src/x/checked"
 	"github.com/m3db/m3/src/x/ident"
 	xtime "github.com/m3db/m3/src/x/time"
 
+	"github.com/m3db/bloom/v4"
 	"github.com/pborman/uuid"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -219,8 +219,13 @@ func readTestDataWithStreamingOpt(
 	assert.NoError(t, err)
 	// Make sure the bloom filter doesn't always return true
 	assert.False(t, bloomFilter.Test([]byte("some_random_data")))
+
+	expectedEntries := uint(len(entries))
+	if expectedEntries == 0 {
+		expectedEntries = 1
+	}
 	expectedM, expectedK := bloom.EstimateFalsePositiveRate(
-		uint(len(entries)), defaultIndexBloomFilterFalsePositivePercent)
+		expectedEntries, defaultIndexBloomFilterFalsePositivePercent)
 	assert.Equal(t, expectedK, bloomFilter.K())
 	// EstimateFalsePositiveRate always returns at least 1, so skip this check
 	// if len entries is 0
diff --git a/src/dbnode/persist/fs/streaming_write.go b/src/dbnode/persist/fs/streaming_write.go
index 018912060c..0f43fa4b6f 100644
--- a/src/dbnode/persist/fs/streaming_write.go
+++ b/src/dbnode/persist/fs/streaming_write.go
@@ -26,19 +26,18 @@ import (
 	"math"
 	"time"
 
-	"github.com/m3db/bloom/v4"
-	"github.com/m3db/m3/src/dbnode/encoding"
 	"github.com/m3db/m3/src/dbnode/persist"
-	"github.com/m3db/m3/src/x/checked"
-	"github.com/m3db/m3/src/x/context"
+	"github.com/m3db/m3/src/dbnode/ts"
 	"github.com/m3db/m3/src/x/ident"
+
+	"github.com/m3db/bloom/v4"
 )
 
-// StreamingWriter writes int data fileset without intermediate buffering.
+// StreamingWriter writes into data fileset without intermediate buffering.
 // Writes must be lexicographically ordered by the id.
type StreamingWriter interface { Open() error - Write(ctx context.Context, encoder encoding.Encoder, id ident.ID, encodedTags []byte) error + WriteAll(id ident.BytesID, encodedTags ts.EncodedTags, data [][]byte, dataChecksum uint32) error Close() error } @@ -57,7 +56,6 @@ type streamingWriter struct { opts StreamingWriterOptions writer *writer writerOpts DataWriterOpenOptions - data []checked.Bytes currIdx int64 prevIDBytes []byte summaryEvery int64 @@ -83,8 +81,12 @@ func NewStreamingWriter(opts StreamingWriterOptions) (StreamingWriter, error) { FileSetType: persist.FileSetFlushType, } + plannedRecordsCount := opts.PlannedRecordsCount + if plannedRecordsCount == 0 { + plannedRecordsCount = 1 + } m, k := bloom.EstimateFalsePositiveRate( - opts.PlannedRecordsCount, + plannedRecordsCount, opts.Options.IndexBloomFilterFalsePositivePercent(), ) bloomFilter := bloom.NewBloomFilter(m, k) @@ -100,7 +102,6 @@ func NewStreamingWriter(opts StreamingWriterOptions) (StreamingWriter, error) { writer: w.(*writer), writerOpts: writerOpts, summaryEvery: int64(summaryEvery), - data: make([]checked.Bytes, 2), bloomFilter: bloomFilter, }, nil } @@ -115,37 +116,24 @@ func (w *streamingWriter) Open() error { return nil } -func (w *streamingWriter) Write( - ctx context.Context, - encoder encoding.Encoder, - id ident.ID, - encodedTags []byte, +func (w *streamingWriter) WriteAll( + id ident.BytesID, + encodedTags ts.EncodedTags, + data [][]byte, + dataChecksum uint32, ) error { // Need to check if w.prevIDBytes != nil, otherwise we can never write an empty string ID - if w.prevIDBytes != nil && bytes.Compare(id.Bytes(), w.prevIDBytes) <= 0 { + if w.prevIDBytes != nil && bytes.Compare(id, w.prevIDBytes) <= 0 { return fmt.Errorf("ids must be written in lexicographic order, no duplicates, but got %s followed by %s", w.prevIDBytes, id) } - w.prevIDBytes = append(w.prevIDBytes[:0], id.Bytes()...) + w.prevIDBytes = append(w.prevIDBytes[:0], id...) - stream, ok := encoder.Stream(ctx) - if !ok { - // None of the datapoints passed the predicate. - return nil - } - defer stream.Finalize() - segment, err := stream.Segment() - if err != nil { - return err - } - w.data[0] = segment.Head - w.data[1] = segment.Tail // FIXME we fail to finalize the tail (does not get finalized automatically because FinalizeTail flag not set) - checksum := segment.CalculateChecksum() - entry, err := w.writeData(w.data, checksum) + entry, ok, err := w.writeData(data, dataChecksum) if err != nil { return err } - if entry != nil { + if ok { return w.writeIndexRelated(id, encodedTags, entry) } @@ -153,50 +141,43 @@ func (w *streamingWriter) Write( } func (w *streamingWriter) writeData( - data []checked.Bytes, + data [][]byte, dataChecksum uint32, -) (*indexEntry, error) { +) (indexEntry, bool, error) { var size int64 for _, d := range data { - if d == nil { - continue - } - size += int64(d.Len()) + size += int64(len(d)) } if size == 0 { - return nil, nil + return indexEntry{}, false, nil } - // Warning: metadata is not set here and should not be used anywhere below. 
- entry := &indexEntry{ + entry := indexEntry{ index: w.currIdx, dataFileOffset: w.writer.currOffset, size: uint32(size), dataChecksum: dataChecksum, } for _, d := range data { - if d == nil { - continue - } - if err := w.writer.writeData(d.Bytes()); err != nil { - return nil, err + if err := w.writer.writeData(d); err != nil { + return indexEntry{}, false, err } } w.currIdx++ - return entry, nil + return entry, true, nil } func (w *streamingWriter) writeIndexRelated( - id ident.ID, + id ident.BytesID, encodedTags []byte, - entry *indexEntry, + entry indexEntry, ) error { // Add to the bloom filter, note this must be zero alloc or else this will // cause heavy GC churn as we flush millions of series at end of each // time window - w.bloomFilter.Add(id.Bytes()) + w.bloomFilter.Add(id) if entry.index%w.summaryEvery == 0 { // Capture the offset for when we write this summary back, only capture @@ -204,15 +185,14 @@ func (w *streamingWriter) writeIndexRelated( entry.indexFileOffset = w.indexOffset } - length, err := w.writer.writeIndexWithEncodedTags(id.Bytes(), encodedTags, *entry) + length, err := w.writer.writeIndexWithEncodedTags(id, encodedTags, entry) if err != nil { return err } w.indexOffset += length if entry.index%w.summaryEvery == 0 { - entry.metadata = persist.NewMetadataFromIDAndTagIterator(id, nil, persist.MetadataOptions{}) - err = w.writer.writeSummariesEntry(*entry) + err = w.writer.writeSummariesEntry(id, entry) if err != nil { return err } @@ -223,9 +203,6 @@ func (w *streamingWriter) writeIndexRelated( } func (w *streamingWriter) Close() error { - for i := range w.data { - w.data[i] = nil - } w.prevIDBytes = nil // Write the bloom filter bitset out diff --git a/src/dbnode/persist/fs/streaming_write_test.go b/src/dbnode/persist/fs/streaming_write_test.go index f2800df6f9..ee7663c9fd 100644 --- a/src/dbnode/persist/fs/streaming_write_test.go +++ b/src/dbnode/persist/fs/streaming_write_test.go @@ -21,8 +21,6 @@ package fs import ( - "github.com/m3db/m3/src/x/pool" - "github.com/m3db/m3/src/x/serialize" "os" "path/filepath" "testing" @@ -35,6 +33,8 @@ import ( "github.com/m3db/m3/src/dbnode/ts" "github.com/m3db/m3/src/x/context" "github.com/m3db/m3/src/x/ident" + "github.com/m3db/m3/src/x/pool" + "github.com/m3db/m3/src/x/serialize" xtime "github.com/m3db/m3/src/x/time" "github.com/golang/mock/gomock" @@ -167,6 +167,21 @@ func TestInfoReadStreamingWrite(t *testing.T) { require.Equal(t, int64(len(entries)), infoFile.Entries) } +func TestReadStreamingWriteEmptyFileset(t *testing.T) { + dir := createTempDir(t) + filePathPrefix := filepath.Join(dir, "") + defer os.RemoveAll(dir) + + w := newTestStreamingWriter(t, filePathPrefix, 0, testWriterStart, 0, 0) + err := streamingWriteTestData(t, w, testWriterStart, nil) + require.NoError(t, err) + err = w.Close() + require.NoError(t, err) + + r := newTestReader(t, filePathPrefix) + readTestData(t, r, 0, testWriterStart, nil) +} + func streamingWriteTestData( t *testing.T, w StreamingWriter, @@ -217,6 +232,7 @@ func streamingWriteWithVolume( return err } entries[i].data = append(segment.Head.Bytes(), segment.Tail.Bytes()...) 
+ dataChecksum := segment.CalculateChecksum() stream.Finalize() tagsIter := ident.NewTagsIterator(entries[i].Tags()) @@ -225,7 +241,9 @@ func streamingWriteWithVolume( require.NoError(t, err) encodedTags, _ := tagEncoder.Data() - if err := w.Write(ctx, encoder, ident.StringID(entries[i].id), encodedTags.Bytes()); err != nil { + data := [][]byte{entries[i].data} + + if err := w.WriteAll(ident.BytesID(entries[i].id), encodedTags.Bytes(), data, dataChecksum); err != nil { return err } } diff --git a/src/dbnode/persist/fs/write.go b/src/dbnode/persist/fs/write.go index 0eb0b37858..9155e2cbdf 100644 --- a/src/dbnode/persist/fs/write.go +++ b/src/dbnode/persist/fs/write.go @@ -29,7 +29,6 @@ import ( "sort" "time" - "github.com/m3db/bloom/v4" "github.com/m3db/m3/src/dbnode/digest" "github.com/m3db/m3/src/dbnode/persist" "github.com/m3db/m3/src/dbnode/persist/fs/msgpack" @@ -39,6 +38,7 @@ import ( "github.com/m3db/m3/src/x/serialize" xtime "github.com/m3db/m3/src/x/time" + "github.com/m3db/bloom/v4" "github.com/pborman/uuid" ) @@ -88,14 +88,18 @@ type writer struct { type indexEntry struct { index int64 - metadata persist.Metadata dataFileOffset int64 indexFileOffset int64 size uint32 dataChecksum uint32 } -type indexEntries []indexEntry +type indexEntryWithMetadata struct { + entry indexEntry + metadata persist.Metadata +} + +type indexEntries []indexEntryWithMetadata func (e indexEntries) releaseRefs() { // Close any metadata. @@ -103,7 +107,7 @@ func (e indexEntries) releaseRefs() { elem.metadata.Finalize() } // Apply memset zero loop optimization. - var zeroed indexEntry + var zeroed indexEntryWithMetadata for i := range e { e[i] = zeroed } @@ -297,12 +301,14 @@ func (w *writer) writeAll( return nil } - entry := indexEntry{ - index: w.currIdx, - metadata: metadata, - dataFileOffset: w.currOffset, - size: uint32(size), - dataChecksum: dataChecksum, + entry := indexEntryWithMetadata{ + entry: indexEntry{ + index: w.currIdx, + dataFileOffset: w.currOffset, + size: uint32(size), + dataChecksum: dataChecksum, + }, + metadata: metadata, } for _, d := range data { if d == nil { @@ -406,6 +412,9 @@ func (w *writer) writeIndexRelatedFiles() error { // Write the index entries and calculate the bloom filter n, p := uint(w.currIdx), w.bloomFilterFalsePositivePercent + if n == 0 { + n = 1 + } m, k := bloom.EstimateFalsePositiveRate(n, p) bloomFilter := bloom.NewBloomFilter(m, k) @@ -476,10 +485,10 @@ func (w *writer) writeIndexFileContents( if i%summaryEvery == 0 { // Capture the offset for when we write this summary back, only capture // for every summary we'll actually write to avoid a few memcopies - w.indexEntries[i].indexFileOffset = offset + w.indexEntries[i].entry.indexFileOffset = offset } - length, err := w.writeIndex(id, tagsIter, tagsEncoder, entry) + length, err := w.writeIndex(id, tagsIter, tagsEncoder, entry.entry) if err != nil { return err } @@ -550,7 +559,7 @@ func (w *writer) writeSummariesFileContents( if i%summaryEvery != 0 { continue } - err := w.writeSummariesEntry(w.indexEntries[i]) + err := w.writeSummariesEntry(w.indexEntries[i].metadata.BytesID(), w.indexEntries[i].entry) if err != nil { return 0, err } @@ -561,11 +570,12 @@ func (w *writer) writeSummariesFileContents( } func (w *writer) writeSummariesEntry( + id ident.BytesID, entry indexEntry, ) error { summary := schema.IndexSummary{ Index: entry.index, - ID: entry.metadata.BytesID(), + ID: id, IndexEntryOffset: entry.indexFileOffset, }
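
Taken together, the patches converge on a simple write path: construct a StreamingWriter with a planned record count (used to size the bloom filter and the summaries spacing), Open it, feed it strictly lexicographically ordered series via WriteAll, and Close to flush the bloom filter, info file and checkpoint. Below is a minimal usage sketch of that final API, not part of the patch series. The file path prefix, namespace ID, shard, block size and byte payloads are illustrative assumptions, and digest.Checksum is used on the assumption that it yields the same value as the segment.CalculateChecksum() call the tests rely on.

package main

import (
	"log"
	"time"

	"github.com/m3db/m3/src/dbnode/digest"
	"github.com/m3db/m3/src/dbnode/persist/fs"
	"github.com/m3db/m3/src/x/ident"
)

func writeShardBlock(fsOpts fs.Options, blockStart time.Time) error {
	w, err := fs.NewStreamingWriter(fs.StreamingWriterOptions{
		Options:             fsOpts,
		NamespaceID:         ident.StringID("default"), // assumed namespace
		ShardID:             0,
		BlockStart:          blockStart,
		BlockSize:           2 * time.Hour, // assumed block size
		VolumeIndex:         0,
		PlannedRecordsCount: 2, // sizes the bloom filter and summary spacing
	})
	if err != nil {
		return err
	}
	if err := w.Open(); err != nil {
		return err
	}

	// IDs must be strictly increasing in lexicographic order; duplicates or
	// out-of-order writes make WriteAll return an error, as the tests assert.
	series := []struct {
		id   ident.BytesID
		data []byte
	}{
		// Placeholder payloads, not real encoded segments.
		{id: ident.BytesID("bar"), data: []byte{0x1, 0x2, 0x3}},
		{id: ident.BytesID("foo"), data: []byte{0x4, 0x5}},
	}
	for _, s := range series {
		// Head/tail segments are passed as separate slices and concatenated
		// by the writer. Tags are omitted (nil encoded tags) for brevity.
		segments := [][]byte{s.data}
		if err := w.WriteAll(s.id, nil, segments, digest.Checksum(s.data)); err != nil {
			w.Close() // best-effort cleanup
			return err
		}
	}

	// Close flushes the bloom filter, info file and checkpoint file.
	return w.Close()
}

func main() {
	fsOpts := fs.NewOptions().SetFilePathPrefix("/tmp/m3db-example") // hypothetical prefix
	if err := writeShardBlock(fsOpts, time.Now().Truncate(2*time.Hour)); err != nil {
		log.Fatal(err)
	}
}

On summary spacing: NewStreamingWriter computes summariesApprox = PlannedRecordsCount * IndexSummariesPercent and summaryEvery = floor(PlannedRecordsCount / summariesApprox), so with, say, 1,000,000 planned records and a 3% summaries ratio it writes one summaries-file entry per 33 index entries.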