Skip to content

Commit

Permalink
add implementation for tombstone cache
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Ye <[email protected]>
  • Loading branch information
Ben Ye committed Jun 20, 2022
1 parent 515e097 commit 00cf8cc
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 191 deletions.
185 changes: 87 additions & 98 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,9 +307,6 @@ type BucketStore struct {
advLabelSets []labelpb.ZLabelSet
enableCompatibilityLabel bool

tombstonesMtx sync.RWMutex
tombstones map[ulid.ULID]*tombstone.Tombstone

// Every how many posting offset entry we pool in heap memory. Default in Prometheus is 32.
postingOffsetsInMemSampling int

Expand Down Expand Up @@ -558,6 +555,10 @@ func (s *BucketStore) InitialSync(ctx context.Context) error {
}
}

if err := s.SyncTombstones(ctx); err != nil {
return errors.Wrap(err, "sync tombstones")
}

return nil
}

Expand Down Expand Up @@ -791,7 +792,7 @@ func blockSeries(
skipChunks bool, // If true, chunks are not loaded.
minTime, maxTime int64, // Series must have data in this time range to be returned.
loadAggregates []storepb.Aggr, // List of aggregates to load when loading chunks.
tombstones []*tombstone.Tombstone,
tombstoneCache *tombstone.MemTombstonesCache,
) (storepb.SeriesSet, *queryStats, error) {
ps, err := indexr.ExpandedPostings(ctx, matchers)
if err != nil {
Expand Down Expand Up @@ -824,6 +825,7 @@ func blockSeries(
)
PostingsLoop:
for _, id := range ps {
intervals := tombstoneCache.GetIntervalsByRef(id)
ok, err := indexr.LoadSeriesForTime(id, &symbolizedLset, &chks, skipChunks, minTime, maxTime)
if err != nil {
return nil, nil, errors.Wrap(err, "read series")
Expand All @@ -834,34 +836,13 @@ PostingsLoop:
}

s := seriesEntry{}
if err := indexr.LookupLabelsSymbols(symbolizedLset, &lset); err != nil {
return nil, nil, errors.Wrap(err, "Lookup labels symbols")
}
var tombstoneIntervals promtombstones.Intervals
for _, ts := range tombstones {
for _, matcher := range *ts.Matchers {
if val := lset.Get(matcher.Name); val != "" {
if !matcher.Matches(val) {
continue
}
if skipChunks {
continue PostingsLoop
}
tombstoneIntervals = tombstoneIntervals.Add(promtombstones.Interval{Mint: ts.MinTime, Maxt: ts.MaxTime})
}
}
}

if !skipChunks {
// Schedule loading chunks.
s.refs = make([]chunks.ChunkRef, 0, len(chks))
s.chks = make([]storepb.AggrChunk, 0, len(chks))
ChunkMetasLoop:
for j, meta := range chks {
for _, it := range tombstoneIntervals {
if meta.OverlapsClosedInterval(it.Mint, it.Maxt) {
continue ChunkMetasLoop
}
if (promtombstones.Interval{Mint: meta.MinTime, Maxt: meta.MaxTime}.IsSubrange(intervals)) {
continue
}

// seriesEntry s is appended to res, but not at every outer loop iteration,
Expand All @@ -887,6 +868,9 @@ PostingsLoop:
return nil, nil, errors.Wrap(err, "exceeded chunks limit")
}
}
if err := indexr.LookupLabelsSymbols(symbolizedLset, &lset); err != nil {
return nil, nil, errors.Wrap(err, "Lookup labels symbols")
}

s.lset = labelpb.ExtendSortedLabels(lset, extLset)
res = append(res, s)
Expand Down Expand Up @@ -1055,37 +1039,11 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
}

s.mtx.RLock()
s.tombstonesMtx.RLock()
tombstones := s.tombstones
s.tombstonesMtx.RUnlock()

// Filter tombstones by request time range.
matchedTombstones := filterTombstonesByTimeRange(tombstones, req.MinTime, req.MaxTime)

for _, bs := range s.blockSets {
blockMatchers, ok := bs.labelMatchers(matchers...)
if !ok {
continue
}
// Filter tombstones by block set external labels.
blockSetTombstones := make([]*tombstone.Tombstone, 0, len(matchedTombstones))
for _, ts := range matchedTombstones {
matchers, ok := ts.MatchLabels(bs.labels)
if !ok {
continue
}
// If no matchers are left after removing external label matchers,
// we drop the tombstone.
if matchers == nil || len(*matchers) == 0 {
continue
}
blockSetTombstones = append(blockSetTombstones, &tombstone.Tombstone{
Matchers: matchers,
MinTime: ts.MinTime,
MaxTime: ts.MaxTime,
ULID: ts.ULID,
})
}

blocks := bs.getFor(req.MinTime, req.MaxTime, req.MaxResolutionWindow, reqBlockMatchers)

Expand All @@ -1097,26 +1055,6 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
b := b
gctx := gctx

// Filter tombstones at block level by block metadata.
blockTombstones := make([]*tombstone.Tombstone, 0, len(blockSetTombstones))
for _, ts := range blockSetTombstones {
matchers, ok := ts.MatchMeta(b.meta)
if !ok {
continue
}
// If no matchers are left after removing external label matchers,
// we drop the tombstone.
if matchers == nil || len(*matchers) == 0 {
continue
}
blockTombstones = append(blockTombstones, &tombstone.Tombstone{
Matchers: matchers,
MinTime: ts.MinTime,
MaxTime: ts.MaxTime,
ULID: ts.ULID,
})
}

if s.enableSeriesResponseHints {
// Keep track of queried blocks.
resHints.AddQueriedBlock(b.meta.ULID)
Expand Down Expand Up @@ -1153,7 +1091,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
req.SkipChunks,
req.MinTime, req.MaxTime,
req.Aggregates,
blockTombstones,
b.tombstoneCache,
)
if err != nil {
return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID)
Expand Down Expand Up @@ -1356,7 +1294,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq

result = strutil.MergeSlices(res, extRes)
} else {
seriesSet, _, err := blockSeries(newCtx, b.extLset, indexr, nil, reqSeriesMatchers, nil, seriesLimiter, true, req.Start, req.End, nil, nil)
seriesSet, _, err := blockSeries(newCtx, b.extLset, indexr, nil, reqSeriesMatchers, nil, seriesLimiter, true, req.Start, req.End, nil, b.tombstoneCache)
if err != nil {
return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID)
}
Expand Down Expand Up @@ -1487,7 +1425,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
}
result = res
} else {
seriesSet, _, err := blockSeries(newCtx, b.extLset, indexr, nil, reqSeriesMatchers, nil, seriesLimiter, true, req.Start, req.End, nil, nil)
seriesSet, _, err := blockSeries(newCtx, b.extLset, indexr, nil, reqSeriesMatchers, nil, seriesLimiter, true, req.Start, req.End, nil, b.tombstoneCache)
if err != nil {
return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID)
}
Expand Down Expand Up @@ -1545,9 +1483,76 @@ func (s *BucketStore) SyncTombstones(ctx context.Context) error {
if err != nil {
return err
}
s.tombstonesMtx.Lock()
s.tombstones = tombstones
s.tombstonesMtx.Unlock()

s.mtx.RLock()
defer s.mtx.RUnlock()
for _, block := range s.blocks {
for _, id := range block.tombstoneCache.GetTombstoneIDs() {
if _, ok := tombstones[id]; !ok {
block.tombstoneCache.Delete(id)
}
}
}
for _, bs := range s.blockSets {
for tid, t := range tombstones {
if _, ok := t.MatchLabels(bs.labels); !ok {
continue
}
blocks := bs.getFor(t.MinTime, t.MaxTime, downsample.ResLevel2, nil)
for _, block := range blocks {
matchers, ok := t.MatchMeta(block.meta)
// Should be impossible, as we only get matching blocks here.
if !ok {
continue
}
if _, ok := block.tombstoneCache.Get(tid); ok {
continue
}

memTombstone := promtombstones.NewMemTombstones()
indexr := block.indexReader()
ps, err := indexr.ExpandedPostings(ctx, *matchers)
if err != nil {
continue
}
if len(ps) == 0 {
block.tombstoneCache.Set(tid, memTombstone)
continue
}
// Preload all series index data.
if err := indexr.PreloadSeries(ctx, ps); err != nil {
continue
}

// Transform all series into the response types and mark their relevant chunks
// for preloading.
var (
symbolizedLset []symbolizedLabel
chks []chunks.Meta
)
PostingsLoop:
for _, id := range ps {
ok, err := indexr.LoadSeriesForTime(id, &symbolizedLset, &chks, false, t.MinTime, t.MaxTime)
if err != nil {
continue
}
if !ok {
// No matching chunks for this time duration, skip series.
continue
}
for _, chk := range chks {
if chk.OverlapsClosedInterval(t.MinTime, t.MaxTime) {
// Delete only until the current values and not beyond.
tmin, tmax := tombstone.ClampInterval(t.MinTime, t.MaxTime, chks[0].MinTime, chks[len(chks)-1].MaxTime)
memTombstone.AddInterval(id, promtombstones.Interval{Mint: tmin, Maxt: tmax})
continue PostingsLoop
}
}
}
block.tombstoneCache.Set(tid, memTombstone)
}
}
}
return nil
}

Expand Down Expand Up @@ -1709,6 +1714,8 @@ type bucketBlock struct {
// Block's labels used by block-level matchers to filter blocks to query. These are used to select blocks using
// request hints' BlockMatchers.
relabelLabels labels.Labels

tombstoneCache *tombstone.MemTombstonesCache
}

func newBucketBlock(
Expand Down Expand Up @@ -1740,6 +1747,7 @@ func newBucketBlock(
Name: block.BlockIDLabel,
Value: meta.ULID.String(),
}),
tombstoneCache: tombstone.NewMemTombstoneCache(),
}
sort.Sort(b.extLset)
sort.Sort(b.relabelLabels)
Expand Down Expand Up @@ -2841,22 +2849,3 @@ func (s queryStats) merge(o *queryStats) *queryStats {
// NewDefaultChunkBytesPool builds a bytes pool via pool.NewBucketedBytes using
// the package-level chunkBytesPoolMinSize and chunkBytesPoolMaxSize values and
// forwards maxChunkPoolBytes unchanged. The pool and any error are returned
// exactly as produced by pool.NewBucketedBytes.
func NewDefaultChunkBytesPool(maxChunkPoolBytes uint64) (pool.Bytes, error) {
	return pool.NewBucketedBytes(
		chunkBytesPoolMinSize,
		chunkBytesPoolMaxSize,
		2, // NOTE(review): presumably the bucket growth factor — confirm against pool.NewBucketedBytes.
		maxChunkPoolBytes,
	)
}

// filterTombstonesByTimeRange selects the tombstones from the given map for
// which OverlapsClosedInterval(mint, maxt) reports true and returns them as a
// list ordered by ascending min time; tombstones sharing the same min time are
// ordered by ascending max time. The returned slice is never nil.
func filterTombstonesByTimeRange(tombstones map[ulid.ULID]*tombstone.Tombstone, mint, maxt int64) []*tombstone.Tombstone {
	overlapping := make([]*tombstone.Tombstone, 0, len(tombstones))
	for _, candidate := range tombstones {
		if candidate.OverlapsClosedInterval(mint, maxt) {
			overlapping = append(overlapping, candidate)
		}
	}

	// Map iteration order is random, so impose a deterministic order here.
	sort.Slice(overlapping, func(a, b int) bool {
		left, right := overlapping[a], overlapping[b]
		if left.MinTime != right.MinTime {
			return left.MinTime < right.MinTime
		}
		return left.MaxTime < right.MaxTime
	})
	return overlapping
}
89 changes: 0 additions & 89 deletions pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2327,92 +2327,3 @@ func BenchmarkDownsampledBlockSeries(b *testing.B) {
}
}
}

// TestFilterTombstonesByTimeRange feeds filterTombstonesByTimeRange maps of
// tombstones with known time ranges and checks, for the query range [0, 10],
// which tombstones come back and in what order.
func TestFilterTombstonesByTimeRange(t *testing.T) {
	var (
		id1 = ulid.MustNew(0, nil)
		id2 = ulid.MustNew(5, nil)
		id3 = ulid.MustNew(10, nil)
		id4 = ulid.MustNew(15, nil)

		first  = &tombstone.Tombstone{ULID: id1, MinTime: 0, MaxTime: 1}
		second = &tombstone.Tombstone{ULID: id2, MinTime: 5, MaxTime: 10}
		third  = &tombstone.Tombstone{ULID: id3, MinTime: 8, MaxTime: 20}
		// Starts at 11, i.e. after the queried range used by every case below.
		fourth = &tombstone.Tombstone{ULID: id4, MinTime: 11, MaxTime: 20}
	)

	type testCase struct {
		name       string
		input      map[ulid.ULID]*tombstone.Tombstone
		mint, maxt int64
		want       []*tombstone.Tombstone
	}

	cases := []testCase{
		{
			name:  "empty tombstones",
			input: map[ulid.ULID]*tombstone.Tombstone{},
			mint:  0,
			maxt:  10,
			want:  []*tombstone.Tombstone{},
		},
		{
			name:  "one tombstone in timeperiod",
			input: map[ulid.ULID]*tombstone.Tombstone{id1: first},
			mint:  0,
			maxt:  10,
			want:  []*tombstone.Tombstone{first},
		},
		{
			name:  "multiple tombstones in timeperiod",
			input: map[ulid.ULID]*tombstone.Tombstone{id1: first, id2: second},
			mint:  0,
			maxt:  10,
			want:  []*tombstone.Tombstone{first, second},
		},
		{
			name:  "multiple tombstones overlapped in timeperiod",
			input: map[ulid.ULID]*tombstone.Tombstone{id1: first, id2: second, id3: third},
			mint:  0,
			maxt:  10,
			want:  []*tombstone.Tombstone{first, second, third},
		},
		{
			name:  "ulid4 not in period",
			input: map[ulid.ULID]*tombstone.Tombstone{id1: first, id2: second, id3: third, id4: fourth},
			mint:  0,
			maxt:  10,
			want:  []*tombstone.Tombstone{first, second, third},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			testutil.Equals(t, tc.want, filterTombstonesByTimeRange(tc.input, tc.mint, tc.maxt))
		})
	}
}
Loading

0 comments on commit 00cf8cc

Please sign in to comment.