diff --git a/src/dbnode/persist/fs/streaming_write.go b/src/dbnode/persist/fs/streaming_write.go index 010021a464..4120ff4711 100644 --- a/src/dbnode/persist/fs/streaming_write.go +++ b/src/dbnode/persist/fs/streaming_write.go @@ -52,11 +52,14 @@ type StreamingWriter interface { // StreamingWriterOpenOptions in the options for the StreamingWriter. type StreamingWriterOpenOptions struct { - NamespaceID ident.ID - ShardID uint32 - BlockStart time.Time - BlockSize time.Duration - VolumeIndex int + NamespaceID ident.ID + ShardID uint32 + BlockStart time.Time + BlockSize time.Duration + VolumeIndex int + + // PlannedRecordsCount is an estimate of the number of series to be written. + // Must be greater than 0. PlannedRecordsCount uint } @@ -83,6 +86,11 @@ func NewStreamingWriter(opts Options) (StreamingWriter, error) { } func (w *streamingWriter) Open(opts StreamingWriterOpenOptions) error { + if opts.PlannedRecordsCount == 0 { + return fmt.Errorf( + "PlannedRecordsCount must be positive, got %d", opts.PlannedRecordsCount) + } + writerOpts := DataWriterOpenOptions{ BlockSize: opts.BlockSize, Identifier: FileSetFileIdentifier{ @@ -105,9 +113,10 @@ func (w *streamingWriter) Open(opts StreamingWriterOpenOptions) error { w.bloomFilter = bloom.NewBloomFilter(m, k) summariesApprox := float64(opts.PlannedRecordsCount) * w.options.IndexSummariesPercent() - w.summaryEvery = 0 + w.summaryEvery = 1 if summariesApprox > 0 { - w.summaryEvery = int64(math.Floor(float64(opts.PlannedRecordsCount) / summariesApprox)) + w.summaryEvery = int64(math.Max(1, + math.Floor(float64(opts.PlannedRecordsCount)/summariesApprox))) } if err := w.writer.Open(writerOpts); err != nil { @@ -182,7 +191,8 @@ func (w *streamingWriter) writeIndexRelated( // time window w.bloomFilter.Add(id) - if entry.index%w.summaryEvery == 0 { + writeSummary := w.summaryEvery == 0 || entry.index%w.summaryEvery == 0 + if writeSummary { // Capture the offset for when we write this summary back, only capture // for every 
summary we'll actually write to avoid a few memcopies entry.indexFileOffset = w.indexOffset @@ -194,7 +204,7 @@ func (w *streamingWriter) writeIndexRelated( } w.indexOffset += length - if entry.index%w.summaryEvery == 0 { + if writeSummary { err = w.writer.writeSummariesEntry(id, entry) if err != nil { return err diff --git a/src/dbnode/persist/fs/streaming_write_test.go b/src/dbnode/persist/fs/streaming_write_test.go index bb43793853..6482f2a2d2 100644 --- a/src/dbnode/persist/fs/streaming_write_test.go +++ b/src/dbnode/persist/fs/streaming_write_test.go @@ -60,14 +60,14 @@ func newTestStreamingWriter( require.NoError(t, err) writerOpenOpts := StreamingWriterOpenOptions{ - NamespaceID: testNs1ID, - ShardID: shard, - BlockStart: timestamp, - BlockSize: testBlockSize, + NamespaceID: testNs1ID, + ShardID: shard, + BlockStart: timestamp, + BlockSize: testBlockSize, - VolumeIndex: nextVersion, - PlannedRecordsCount: plannedEntries, - } + VolumeIndex: nextVersion, + PlannedRecordsCount: plannedEntries, + } err = writer.Open(writerOpenOpts) require.NoError(t, err) @@ -174,7 +174,7 @@ func TestReadStreamingWriteEmptyFileset(t *testing.T) { filePathPrefix := filepath.Join(dir, "") defer os.RemoveAll(dir) - w := newTestStreamingWriter(t, filePathPrefix, 0, testWriterStart, 0, 0) + w := newTestStreamingWriter(t, filePathPrefix, 0, testWriterStart, 0, 1) err := streamingWriteTestData(t, w, testWriterStart, nil) require.NoError(t, err) err = w.Close() @@ -184,12 +184,31 @@ func TestReadStreamingWriteEmptyFileset(t *testing.T) { readTestData(t, r, 0, testWriterStart, nil) } +func TestReadStreamingWriteReject0PlannedRecordsCount(t *testing.T) { + dir := createTempDir(t) + filePathPrefix := filepath.Join(dir, "") + defer os.RemoveAll(dir) // nolint: errcheck + + writer, err := NewStreamingWriter(testDefaultOpts. + SetFilePathPrefix(filePathPrefix). 
+ SetWriterBufferSize(testWriterBufferSize)) + require.NoError(t, err) + + writerOpenOpts := StreamingWriterOpenOptions{ + NamespaceID: testNs1ID, + BlockSize: testBlockSize, + PlannedRecordsCount: 0, + } + err = writer.Open(writerOpenOpts) + require.EqualError(t, err, "PlannedRecordsCount must be positive, got 0") +} + func TestStreamingWriterAbort(t *testing.T) { dir := createTempDir(t) filePathPrefix := filepath.Join(dir, "") defer os.RemoveAll(dir) - w := newTestStreamingWriter(t, filePathPrefix, 0, testWriterStart, 0, 0) + w := newTestStreamingWriter(t, filePathPrefix, 0, testWriterStart, 0, 1) err := streamingWriteTestData(t, w, testWriterStart, nil) require.NoError(t, err) err = w.Abort() diff --git a/src/dbnode/storage/shard.go b/src/dbnode/storage/shard.go index 56d61d776d..f8e39f7084 100644 --- a/src/dbnode/storage/shard.go +++ b/src/dbnode/storage/shard.go @@ -2688,8 +2688,8 @@ func (s *dbShard) AggregateTiles( }() var ( - sourceNsID = sourceNs.ID() - maxEntries = 0 + sourceNsID = sourceNs.ID() + plannedSeriesCount = 1 ) for sourceBlockPos, blockReader := range blockReaders { @@ -2718,8 +2718,8 @@ func (s *dbShard) AggregateTiles( } entries := blockReader.Entries() - if entries > maxEntries { - maxEntries = entries + if entries > plannedSeriesCount { + plannedSeriesCount = entries } openBlockReaders = append(openBlockReaders, blockReader) @@ -2737,7 +2737,7 @@ func (s *dbShard) AggregateTiles( BlockStart: opts.Start, BlockSize: s.namespace.Options().RetentionOptions().BlockSize(), VolumeIndex: nextVolume, - PlannedRecordsCount: uint(maxEntries), + PlannedRecordsCount: uint(plannedSeriesCount), } if err = writer.Open(writerOpenOpts); err != nil { return 0, err