Skip to content

Commit

Permalink
Tags NoFinalize; additional comments
Browse files Browse the repository at this point in the history
  • Loading branch information
justinjc committed Jun 7, 2019
1 parent d318ad9 commit a115331
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 10 deletions.
12 changes: 11 additions & 1 deletion src/dbnode/storage/fs_merge_with_mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ func (m *fsMergeWithMem) fetchBlocks(
return nil, false, nil
}

// The data passed to ForEachRemaining (through the fs.ForEachRemainingFn) is
// basically a copy that will be finalized when the context is closed, but the
// ID and tags are expected to live for as long as the caller of the MergeWith
// requires them, so they should either be NoFinalize() or passed as copies.
func (m *fsMergeWithMem) ForEachRemaining(
ctx context.Context,
blockStart xtime.UnixNano,
Expand All @@ -109,10 +113,16 @@ func (m *fsMergeWithMem) ForEachRemaining(

for seriesElement := seriesList.Front(); seriesElement != nil; seriesElement = seriesElement.Next() {
seriesID := seriesElement.Value
tags, err := m.shard.TagsFromSeriesID(seriesID)
tags, ok, err := m.shard.TagsFromSeriesID(seriesID)
if err != nil {
return err
}
if !ok {
// Receiving not ok means that the series was not found, for some
// reason like it falling out of retention, therefore we skip this
// series and continue.
continue
}

mergeWithData, hasData, err := m.fetchBlocks(ctx, seriesID, blockStart, nsCtx)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion src/dbnode/storage/series/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ func (b *dbBuffer) Snapshot(
mergedStream = streams[0]
} else {
// We may need to merge again here because the regular merge method does
// not merge buckets that have different versions.
// not merge warm and cold buckets or buckets that have different versions.
sr := make([]xio.SegmentReader, 0, numStreams)
for _, stream := range streams {
sr = append(sr, stream)
Expand Down
15 changes: 11 additions & 4 deletions src/dbnode/storage/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,10 @@ func (s *dbShard) Stream(

// IsBlockRetrievable implements series.QueryableBlockRetriever
func (s *dbShard) IsBlockRetrievable(blockStart time.Time) bool {
return s.hasWarmFlushed(blockStart)
}

func (s *dbShard) hasWarmFlushed(blockStart time.Time) bool {
flushState := s.FlushState(blockStart)
return statusIsRetrievable(flushState.WarmStatus)
}
Expand Down Expand Up @@ -1091,6 +1095,9 @@ func (s *dbShard) newShardEntry(
default:
return nil, errNewShardEntryTagsTypeInvalid
}
// Don't put tags back in a pool since the merge logic may still have a
// handle on these.
seriesTags.NoFinalize()

series := s.seriesPool.Get()
series.Reset(seriesID, seriesTags, s.seriesBlockRetriever,
Expand Down Expand Up @@ -1959,7 +1966,7 @@ func (s *dbShard) ColdFlush(
// Cold flushes can only happen on blockStarts that have been
// warm flushed, because warm flush logic does not currently
// perform any merging logic.
if !s.IsBlockRetrievable(t.ToTime()) {
if !s.hasWarmFlushed(t.ToTime()) {
return
}

Expand Down Expand Up @@ -2160,15 +2167,15 @@ func (s *dbShard) Repair(
return repairer.Repair(ctx, nsCtx, tr, s)
}

func (s *dbShard) TagsFromSeriesID(seriesID ident.ID) (ident.Tags, error) {
func (s *dbShard) TagsFromSeriesID(seriesID ident.ID) (ident.Tags, bool, error) {
s.RLock()
entry, _, err := s.lookupEntryWithLock(seriesID)
s.RUnlock()
if entry == nil || err != nil {
return ident.Tags{}, err
return ident.Tags{}, false, err
}

return entry.Series.Tags(), nil
return entry.Series.Tags(), true, nil
}

func (s *dbShard) BootstrapState() BootstrapState {
Expand Down
7 changes: 4 additions & 3 deletions src/dbnode/storage/storage_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/dbnode/storage/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ type databaseShard interface {
) (repair.MetadataComparisonResult, error)

// TagsFromSeriesID returns the series tags from a series ID.
TagsFromSeriesID(seriesID ident.ID) (ident.Tags, error)
TagsFromSeriesID(seriesID ident.ID) (ident.Tags, bool, error)
}

// namespaceIndex indexes namespace writes.
Expand Down

0 comments on commit a115331

Please sign in to comment.