Skip to content

Commit

Permalink
chore(bloom): remove unused code from blooms (#14539)
Browse files Browse the repository at this point in the history
  • Loading branch information
rfratto authored Oct 21, 2024
1 parent dbb3b6e commit bf54cf1
Show file tree
Hide file tree
Showing 51 changed files with 138 additions and 857 deletions.
6 changes: 3 additions & 3 deletions pkg/bloombuild/builder/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func (i *blockLoadingIter) loadNext() bool {
blockRefs := i.overlapping.At()

loader := newBatchedBlockLoader(i.ctx, i.fetcher, blockRefs, i.batchSize)
filtered := iter.NewFilterIter[*bloomshipper.CloseableBlockQuerier](loader, i.filter)
filtered := iter.NewFilterIter(loader, i.filter)

iters := make([]iter.PeekIterator[*v1.SeriesWithBlooms], 0, len(blockRefs))
for filtered.Next() {
Expand Down Expand Up @@ -279,7 +279,7 @@ func (i *blockLoadingIter) loadNext() bool {
// two overlapping blocks can conceivably have the same series, so we need to dedupe,
// preferring the one with the most chunks already indexed since we'll have
// to add fewer chunks to the bloom
i.iter = iter.NewDedupingIter[*v1.SeriesWithBlooms, *v1.SeriesWithBlooms](
i.iter = iter.NewDedupingIter(
func(a, b *v1.SeriesWithBlooms) bool {
return a.Series.Fingerprint == b.Series.Fingerprint
},
Expand Down Expand Up @@ -346,7 +346,7 @@ func overlappingBlocksIter(inputs []bloomshipper.BlockRef) iter.Iterator[[]bloom
// can we assume sorted blocks?
peekIter := iter.NewPeekIter(iter.NewSliceIter(inputs))

return iter.NewDedupingIter[bloomshipper.BlockRef, []bloomshipper.BlockRef](
return iter.NewDedupingIter(
func(a bloomshipper.BlockRef, b []bloomshipper.BlockRef) bool {
minFp := b[0].Bounds.Min
maxFp := slices.MaxFunc(b, func(a, b bloomshipper.BlockRef) int { return int(a.Bounds.Max - b.Bounds.Max) }).Bounds.Max
Expand Down
4 changes: 2 additions & 2 deletions pkg/bloombuild/builder/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,15 +120,15 @@ func TestBatchedLoader(t *testing.T) {
)
}

loader := newBatchedLoader[int, int, int](
loader := newBatchedLoader(
tc.ctx,
fetchers,
tc.inputs,
tc.mapper,
tc.batchSize,
)

got, err := v2.Collect[int](loader)
got, err := v2.Collect(loader)
if tc.err {
require.Error(t, err)
return
Expand Down
15 changes: 4 additions & 11 deletions pkg/bloombuild/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ type Builder struct {
metrics *Metrics
logger log.Logger

tsdbStore common.TSDBStore
bloomStore bloomshipper.Store
chunkLoader ChunkLoader

Expand Down Expand Up @@ -74,18 +73,12 @@ func New(
builderID := uuid.NewString()
logger = log.With(logger, "builder_id", builderID)

tsdbStore, err := common.NewTSDBStores("bloom-builder", schemaCfg, storeCfg, storageMetrics, logger)
if err != nil {
return nil, fmt.Errorf("error creating TSDB store: %w", err)
}

metrics := NewMetrics(r)
b := &Builder{
ID: builderID,
cfg: cfg,
limits: limits,
metrics: metrics,
tsdbStore: tsdbStore,
bloomStore: bloomStore,
chunkLoader: NewStoreChunkLoader(fetcherProvider, metrics),
logger: logger,
Expand Down Expand Up @@ -386,7 +379,7 @@ func (b *Builder) processTask(
// Blocks are built consuming the series iterator. For observability, we wrap the series iterator
// with a counter iterator to count the number of times Next() is called on it.
// This is used to observe the number of series that are being processed.
seriesItrWithCounter := iter.NewCounterIter[*v1.Series](seriesItr)
seriesItrWithCounter := iter.NewCounterIter(seriesItr)

gen := NewSimpleBloomGenerator(
tenant,
Expand Down Expand Up @@ -416,7 +409,7 @@ func (b *Builder) processTask(
return nil, fmt.Errorf("failed to build block: %w", err)
}

logger := log.With(logger, "block", built.BlockRef.String())
logger := log.With(logger, "block", built.String())

if err := client.PutBlock(
ctx,
Expand Down Expand Up @@ -461,7 +454,7 @@ func (b *Builder) processTask(
}
meta.MetaRef = ref

logger = log.With(logger, "meta", meta.MetaRef.String())
logger = log.With(logger, "meta", meta.String())

if err := client.PutMeta(ctx, meta); err != nil {
level.Error(logger).Log("msg", "failed to write meta", "err", err)
Expand Down Expand Up @@ -490,7 +483,7 @@ func (b *Builder) loadWorkForGap(
table config.DayTable,
gap protos.Gap,
) (iter.Iterator[*v1.Series], iter.CloseResetIterator[*v1.SeriesWithBlooms], error) {
seriesItr := iter.NewCancelableIter[*v1.Series](ctx, iter.NewSliceIter[*v1.Series](gap.Series))
seriesItr := iter.NewCancelableIter(ctx, iter.NewSliceIter(gap.Series))

// load a blocks iterator for the gap
fetcher, err := b.bloomStore.Fetcher(table.ModelTime())
Expand Down
28 changes: 0 additions & 28 deletions pkg/bloombuild/builder/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package builder
import (
"context"
"fmt"
"io"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand All @@ -17,29 +16,8 @@ import (
"github.com/grafana/loki/v3/pkg/storage/chunk/fetcher"
"github.com/grafana/loki/v3/pkg/storage/stores"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
)

// inclusive range
type Keyspace struct {
min, max model.Fingerprint
}

func (k Keyspace) Cmp(other Keyspace) v1.BoundsCheck {
if other.max < k.min {
return v1.Before
} else if other.min > k.max {
return v1.After
}
return v1.Overlap
}

// Store is likely bound within. This allows specifying impls like ShardedStore<Store>
// to only request the shard-range needed from the existing store.
type BloomGenerator interface {
Generate(ctx context.Context) (skippedBlocks []v1.BlockMetadata, toClose []io.Closer, results iter.Iterator[*v1.Block], err error)
}

// Simple implementation of a BloomGenerator.
type SimpleBloomGenerator struct {
userID string
Expand Down Expand Up @@ -247,12 +225,6 @@ func (b *LazyBlockBuilderIterator) Err() error {
return b.err
}

// IndexLoader loads an index. This helps us do things like
// load TSDBs for a specific period excluding multitenant (pre-compacted) indices
type indexLoader interface {
Index() (tsdb.Index, error)
}

// ChunkItersByFingerprint models the chunks belonging to a fingerprint
type ChunkItersByFingerprint struct {
fp model.Fingerprint
Expand Down
6 changes: 3 additions & 3 deletions pkg/bloombuild/builder/spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func blocksFromSchemaWithRange(t *testing.T, n int, options v1.BlockOptions, fro

minIdx, maxIdx := i*seriesPerBlock, (i+1)*seriesPerBlock

itr := v2.NewSliceIter[v1.SeriesWithBlooms](data[minIdx:maxIdx])
itr := v2.NewSliceIter(data[minIdx:maxIdx])
_, err = builder.BuildFrom(itr)
require.Nil(t, err)

Expand Down Expand Up @@ -134,8 +134,8 @@ func TestSimpleBloomGenerator(t *testing.T) {
} {
t.Run(fmt.Sprintf("%s/%s", tc.desc, enc), func(t *testing.T) {
sourceBlocks, data, refs := blocksFromSchemaWithRange(t, 2, tc.fromSchema, 0x00000, 0x6ffff)
storeItr := v2.NewMapIter[v1.SeriesWithBlooms, *v1.Series](
v2.NewSliceIter[v1.SeriesWithBlooms](data),
storeItr := v2.NewMapIter(
v2.NewSliceIter(data),
func(swb v1.SeriesWithBlooms) *v1.Series {
return &swb.Series.Series
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/bloombuild/common/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func NewTSDBSeriesIter(ctx context.Context, user string, f sharding.ForSeries, b
case <-ctx.Done():
return iter.NewEmptyIter[*v1.Series](), ctx.Err()
default:
return iter.NewCancelableIter[*v1.Series](ctx, iter.NewSliceIter[*v1.Series](series)), nil
return iter.NewCancelableIter(ctx, iter.NewSliceIter(series)), nil
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/bloombuild/common/tsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TestTSDBSeriesIter(t *testing.T) {
itr, err := NewTSDBSeriesIter(context.Background(), "", forSeriesTestImpl(input), v1.NewBounds(0, math.MaxUint64))
require.NoError(t, err)

v1.EqualIterators[*v1.Series](
v1.EqualIterators(
t,
func(a, b *v1.Series) {
require.Equal(t, a, b)
Expand Down
6 changes: 0 additions & 6 deletions pkg/bloombuild/planner/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/grafana/loki/v3/pkg/queue"
)

const (
Expand Down Expand Up @@ -211,7 +209,3 @@ func NewMetrics(
}),
}
}

func NewQueueMetrics(r prometheus.Registerer) *queue.Metrics {
return queue.NewMetrics(r, metricsNamespace, metricsSubsystem)
}
8 changes: 4 additions & 4 deletions pkg/bloombuild/planner/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func New(
}

// Queue to manage tasks
queueMetrics := NewQueueMetrics(r)
queueMetrics := queue.NewMetrics(r, metricsNamespace, metricsSubsystem)
tasksQueue := queue.NewRequestQueue(cfg.MaxQueuedTasksPerTenant, 0, NewQueueLimits(limits), queueMetrics)

// Clean metrics for inactive users: do not have added tasks to the queue in the last 1 hour
Expand Down Expand Up @@ -591,14 +591,14 @@ func (p *Planner) deleteOutdatedMetasAndBlocks(
err = client.DeleteMetas(ctx, []bloomshipper.MetaRef{meta.MetaRef})
if err != nil {
if client.IsObjectNotFoundErr(err) {
level.Debug(logger).Log("msg", "meta not found while attempting delete, continuing", "meta", meta.MetaRef.String())
level.Debug(logger).Log("msg", "meta not found while attempting delete, continuing", "meta", meta.String())
} else {
level.Error(logger).Log("msg", "failed to delete meta", "err", err, "meta", meta.MetaRef.String())
level.Error(logger).Log("msg", "failed to delete meta", "err", err, "meta", meta.String())
return nil, errors.Wrap(err, "failed to delete meta")
}
}
deletedMetas++
level.Debug(logger).Log("msg", "removed outdated meta", "meta", meta.MetaRef.String())
level.Debug(logger).Log("msg", "removed outdated meta", "meta", meta.String())
}

level.Debug(logger).Log(
Expand Down
8 changes: 4 additions & 4 deletions pkg/bloombuild/planner/strategies/splitkeyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,13 +207,13 @@ func blockPlansForGaps(
return planGap.Blocks[i].Bounds.Less(planGap.Blocks[j].Bounds)
})

peekingBlocks := iter.NewPeekIter[bloomshipper.BlockRef](
iter.NewSliceIter[bloomshipper.BlockRef](
peekingBlocks := iter.NewPeekIter(
iter.NewSliceIter(
planGap.Blocks,
),
)
// dedupe blocks which could be in multiple metas
itr := iter.NewDedupingIter[bloomshipper.BlockRef, bloomshipper.BlockRef](
itr := iter.NewDedupingIter(
func(a, b bloomshipper.BlockRef) bool {
return a == b
},
Expand All @@ -224,7 +224,7 @@ func blockPlansForGaps(
peekingBlocks,
)

deduped, err := iter.Collect[bloomshipper.BlockRef](itr)
deduped, err := iter.Collect(itr)
if err != nil {
return nil, fmt.Errorf("failed to dedupe blocks: %w", err)
}
Expand Down
125 changes: 0 additions & 125 deletions pkg/bloombuild/planner/util.go

This file was deleted.

Loading

0 comments on commit bf54cf1

Please sign in to comment.