Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store: add ability to limit max num of samples / concurrent queries #798

Merged
merged 57 commits into from
Mar 23, 2019
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
1bc1f59
store: add ability to limit max samples / conc. queries
Feb 1, 2019
e87f763
store/bucket: account for the RawChunk case
Feb 5, 2019
1ab1dc6
store/bucket_e2e_test: adjust sample limit size
Feb 5, 2019
d7c3ade
store/bucket: add metric thanos_bucket_store_queries_limited_total
Feb 5, 2019
12db24a
store/bucket: register queriesLimited metric
Feb 5, 2019
9d0b8a7
store: make changes according to the review comments
Feb 8, 2019
9727072
docs/store: update
Feb 8, 2019
d9c733a
store: gating naming changes, add span/extra metric
Feb 8, 2019
c4ce735
store: improve error messages
Feb 8, 2019
30eef19
store/limiter: improve error messages
Feb 8, 2019
194394d
store/gate: time -> seconds
Feb 8, 2019
2e51c2e
store/bucket_e2e_test: narrow down the first query
Feb 8, 2019
58a14fa
store/bucket: check for negative maxConcurrent
Feb 8, 2019
4d1b7ed
cmd/store: clarify help message
Feb 8, 2019
3ae3733
Merge remote-tracking branch 'origin/master' into smpl
Feb 8, 2019
b149f74
pkg/store: hook thanos_bucket_store_queries_limited into Limiter
Feb 8, 2019
e79c56d
store/bucket_test: fix NewBucketStore call
Feb 8, 2019
1d07515
docs: update again
Feb 8, 2019
38a093b
store/gate: spelling fix
Feb 9, 2019
7b13f7e
store/gate: spelling fix #2
Feb 9, 2019
4540394
Merge remote-tracking branch 'origin/master' into feature/store_sampl…
GiedriusS Feb 15, 2019
3e532fe
store/bucket: remove pointless newline
GiedriusS Feb 15, 2019
cff979c
store/gate: generalize gate timing
Mar 14, 2019
e7ea64b
store/gate: convert the g.gateTiming metric into a histogram
Mar 14, 2019
23c7368
store/bucket: change comment wording
Mar 14, 2019
da575e4
store/bucket: remove type from maxSamplesPerChunk
Mar 14, 2019
e390846
store/bucket: rename metric into thanos_bucket_store_queries_dropped
Mar 14, 2019
24e8e1f
thanos/store: clarify help message
Mar 14, 2019
4012eca
store/gate: rename metric to thanos_bucket_store_queries_in_flight
Mar 14, 2019
e7be55d
store/gate: fix MustRegister() call
Mar 14, 2019
3e8150d
docs: update
Mar 14, 2019
5ec5ce9
store/bucket: clarify the name of the span
Mar 14, 2019
810a131
store/bucket: inline calculation into the function call
Mar 14, 2019
541f180
Merge remote-tracking branch 'origin' into fork_store_sample_limit
Mar 14, 2019
ae8e425
CHANGELOG: add item about this
Mar 14, 2019
de8a234
store/gate: reduce number of buckets
Mar 15, 2019
07b4658
store/bucket: rename metric to thanos_bucket_store_queries_dropped_total
Mar 15, 2019
61d6ecd
store/bucket: move defer out of code block
Mar 15, 2019
70b115d
store/gate: generalize gate for different kinds of subsystems
Mar 15, 2019
36f1153
store/limiter: remove non-nil check
Mar 15, 2019
9b74bbe
CHANGELOG: fixes
Mar 15, 2019
82bdb3c
store/limiter: convert failedCounter to non-ptr
Mar 15, 2019
4d8420f
store/limiter: remove invalid comment
Mar 15, 2019
590b9a6
*: update according to review comments
Mar 18, 2019
3f40bac
CHANGELOG: update
Mar 18, 2019
f4734e5
*: fix according to review
Mar 18, 2019
d6c1534
*: fix according to review
Mar 18, 2019
1147acd
*: make docs
Mar 18, 2019
1d0fad3
CHANGELOG: clean up
Mar 18, 2019
ef4a51e
CHANGELOG: update
GiedriusS Mar 18, 2019
48141fd
Merge remote-tracking branch 'origin' into feature/store_sample_limit
Mar 20, 2019
c9a7d83
*: queries_in_flight_total -> queries_in_flight
GiedriusS Mar 21, 2019
d71f1d8
Merge branch 'master' into smpl_limit
Mar 22, 2019
280a8ca
store/bucket: do not wraper samplesLimiter error
Mar 22, 2019
11c4b18
store/bucket: err -> errors.Wrap
Mar 22, 2019
31a8346
store: make store.grpc.series-max-concurrency 20 by default
GiedriusS Mar 23, 2019
6e98dfd
CHANGELOG: add warning about new limit
GiedriusS Mar 23, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
chunkPoolSize := cmd.Flag("chunk-pool-size", "Maximum size of concurrently allocatable bytes for chunks.").
Default("2GB").Bytes()

maxSampleCount := cmd.Flag("grpc-sample-limit",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe align the flags with how prom has flags? grpc-sample-limit > {query|store|storage}.grpc.read-sample-limit?

See storage.remote.read-sample-limit https://github.com/prometheus/prometheus/blob/master/cmd/prometheus/main.go#L206

"Maximum amount of samples returned via a single Series call. 0 means no limit. NOTE: may overestimate the number of samples that would be needed to respond to a query.").
GiedriusS marked this conversation as resolved.
Show resolved Hide resolved
Default("50000000").Uint()
GiedriusS marked this conversation as resolved.
Show resolved Hide resolved

maxConcurrent := cmd.Flag("grpc-concurrent-limit", "Maximum number of concurrent Series calls. 0 means no limit.").Default("20").Int()
GiedriusS marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above grpc-concurrent-limit > query.grpc.max-concurrency?

See query.max-concurrency https://github.com/prometheus/prometheus/blob/master/cmd/prometheus/main.go#L233

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will rename to store.grpc.max-concurrency as we are talking about Thanos Store here :P


objStoreConfig := regCommonObjStoreFlags(cmd, "", true)

syncInterval := cmd.Flag("sync-block-duration", "Repeat interval for syncing the blocks between local and remote view.").
Expand Down Expand Up @@ -63,6 +69,8 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
peer,
uint64(*indexCacheSize),
uint64(*chunkPoolSize),
uint64(*maxSampleCount),
int(*maxConcurrent),
name,
debugLogging,
*syncInterval,
Expand All @@ -87,6 +95,8 @@ func runStore(
peer cluster.Peer,
indexCacheSizeBytes uint64,
chunkPoolSizeBytes uint64,
maxSampleCount uint64,
maxConcurrent int,
component string,
verbose bool,
syncInterval time.Duration,
Expand Down Expand Up @@ -117,6 +127,8 @@ func runStore(
dataDir,
indexCacheSizeBytes,
chunkPoolSizeBytes,
maxSampleCount,
maxConcurrent,
verbose,
blockSyncConcurrency,
)
Expand Down
8 changes: 8 additions & 0 deletions docs/components/store.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,14 @@ Flags:
--index-cache-size=250MB Maximum size of items held in the index cache.
--chunk-pool-size=2GB Maximum size of concurrently allocatable bytes
for chunks.
--grpc-sample-limit=50000000
Maximum amount of samples returned via a single
Series call. 0 means no limit. NOTE: may
overestimate the number of samples that would
be needed to respond to a query.
--grpc-concurrent-limit=20
Maximum number of concurrent Series calls. 0
means no limit.
--objstore.config-file=<bucket.config-yaml-path>
Path to YAML file that contains object store
configuration.
Expand Down
76 changes: 67 additions & 9 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@ import (
"google.golang.org/grpc/status"
)

// Approximately this is the max number of samples that we may have in any given chunk. This is needed
GiedriusS marked this conversation as resolved.
Show resolved Hide resolved
// for precalculating the number of samples that we may have to retrieve and decode for any given query
// without downloading them. Please take a look at https://github.com/prometheus/tsdb/pull/397 to know
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Someone could tell that's too verbose, but I actually like this 👍

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 someone will definitely start wondering why 120 is here just like I have at the beginning

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Think this is fine 👍

// where this number comes from. Long story short: TSDB is made in such a way, and it is made in such a way
// because you barely get any improvements in compression when the number of samples is beyond this.
// Take a look at Figure 6 in this whitepaper http://www.vldb.org/pvldb/vol8/p1816-teller.pdf.
const maxSamplesPerChunk uint64 = 120
GiedriusS marked this conversation as resolved.
Show resolved Hide resolved

type bucketStoreMetrics struct {
blocksLoaded prometheus.Gauge
blockLoads prometheus.Counter
Expand All @@ -57,6 +65,8 @@ type bucketStoreMetrics struct {
seriesMergeDuration prometheus.Histogram
resultSeriesCount prometheus.Summary
chunkSizeBytes prometheus.Histogram
queriesLimited prometheus.Counter
queriesLimit prometheus.Gauge
}

func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
Expand Down Expand Up @@ -132,6 +142,15 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
},
})

m.queriesLimited = prometheus.NewCounter(prometheus.CounterOpts{
Name: "thanos_bucket_store_queries_limited",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this metric name is off. I think dropped should be somewhere, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, will rename :)

Help: "Number of queries that were dropped due to the sample limit.",
})
m.queriesLimit = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "thanos_bucket_store_queries_limit",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prometheus' team use prometheus_engine_queries_concurrent_max. Naming is hard but I figure we could be consistent with them ?

https://github.com/prometheus/prometheus/blob/8155cc49924cd48ec552506b4e5519ebafa8b722/promql/engine.go#L234-L239

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Should rename this to thanos_bucket_store_queries_concurrent_max.

Help: "Number of maximum concurrent queries.",
})

if reg != nil {
reg.MustRegister(
m.blockLoads,
Expand All @@ -148,6 +167,8 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
m.seriesMergeDuration,
m.resultSeriesCount,
m.chunkSizeBytes,
m.queriesLimited,
m.queriesLimit,
)
}
return &m
Expand All @@ -173,7 +194,12 @@ type BucketStore struct {
// Number of goroutines to use when syncing blocks from object storage.
blockSyncConcurrency int

partitioner partitioner
// Query gate which limits the maximum amount of concurrent queries.
GiedriusS marked this conversation as resolved.
Show resolved Hide resolved
queryGate *Gate

// Samples limiter which limits the number of samples per each Series() call.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Samples limiter which > samplesLimiter

samplesLimiter *Limiter
partitioner partitioner
}

// NewBucketStore creates a new bucket backed store that implements the store API against
Expand All @@ -185,12 +211,19 @@ func NewBucketStore(
dir string,
indexCacheSizeBytes uint64,
maxChunkPoolBytes uint64,
maxSampleCount uint64,
maxConcurrent int,
debugLogging bool,
blockSyncConcurrency int,
) (*BucketStore, error) {
if logger == nil {
logger = log.NewNopLogger()
}

if maxConcurrent < 0 {
return nil, errors.Errorf("max concurrency value cannot be lower than 0 (got %v)", maxConcurrent)
}

indexCache, err := newIndexCache(reg, indexCacheSizeBytes)
if err != nil {
return nil, errors.Wrap(err, "create index cache")
Expand All @@ -202,6 +235,7 @@ func NewBucketStore(

const maxGapSize = 512 * 1024

metrics := newBucketStoreMetrics(reg)
s := &BucketStore{
logger: logger,
bucket: bucket,
Expand All @@ -212,14 +246,18 @@ func NewBucketStore(
blockSets: map[uint64]*bucketBlockSet{},
debugLogging: debugLogging,
blockSyncConcurrency: blockSyncConcurrency,
queryGate: NewGate(maxConcurrent, reg),
samplesLimiter: NewLimiter(maxSampleCount, &metrics.queriesLimited),
partitioner: gapBasedPartitioner{maxGapSize: maxGapSize},
}
s.metrics = newBucketStoreMetrics(reg)
s.metrics = metrics

if err := os.MkdirAll(dir, 0777); err != nil {
return nil, errors.Wrap(err, "create dir")
}

s.metrics.queriesLimit.Set(float64(maxConcurrent))

return s, nil
}

Expand Down Expand Up @@ -472,14 +510,15 @@ func (s *bucketSeriesSet) Err() error {
return s.err
}

func (s *BucketStore) blockSeries(
func blockSeries(
ctx context.Context,
ulid ulid.ULID,
extLset map[string]string,
indexr *bucketIndexReader,
chunkr *bucketChunkReader,
matchers []labels.Matcher,
req *storepb.SeriesRequest,
samplesLimiter *Limiter,
) (storepb.SeriesSet, *queryStats, error) {
ps, err := indexr.ExpandedPostings(matchers)
if err != nil {
Expand Down Expand Up @@ -557,7 +596,7 @@ func (s *BucketStore) blockSeries(
}

// Preload all chunks that were marked in the previous stage.
if err := chunkr.preload(); err != nil {
if err := chunkr.preload(samplesLimiter); err != nil {
return nil, nil, errors.Wrap(err, "preload chunks")
}

Expand Down Expand Up @@ -661,10 +700,17 @@ func debugFoundBlockSetOverview(logger log.Logger, mint, maxt int64, lset labels
}

// Series implements the storepb.StoreServer interface.
// TODO(bwplotka): It buffers all chunks in memory and only then streams to client.
// 1. Either count chunk sizes and error out too big query.
// 2. Stream posting -> series -> chunk all together.
func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
{
span, _ := tracing.StartSpan(srv.Context(), "bucket_store_gate")
GiedriusS marked this conversation as resolved.
Show resolved Hide resolved
err := s.queryGate.IsMyTurn(srv.Context())
span.Finish()
if err != nil {
return errors.Wrapf(err, "failed to wait for turn")
}
defer s.queryGate.Done()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you want to actually do this. As defer will end until the whole function is done?

Copy link
Member Author

@GiedriusS GiedriusS Mar 13, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But then the counter would go down at exactly this point and then another query could come through even though the old one is still being parsed. Or am I wrong? Imagine this situation:

query -> gate -> downloading/parsing/etc.
               query -> gate -> downloading/parsing/etc.

We could start parsing another query if it came just after the gate had been passed in another query. Perhaps I am missing something :?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ignore me, I confused this with span.finish. Can you at least put this defer outside of code block? It is quite confusing as at first glance it shows the intent of this defer to be end at }. (:

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

still not resolved:

Can you at least put this defer outside of code block?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do, missed that one

}

matchers, err := translateMatchers(req.Matchers)
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
Expand Down Expand Up @@ -703,13 +749,14 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
defer runutil.CloseWithLogOnErr(s.logger, chunkr, "series block")

g.Add(func() error {
part, pstats, err := s.blockSeries(ctx,
part, pstats, err := blockSeries(ctx,
b.meta.ULID,
b.meta.Thanos.Labels,
indexr,
chunkr,
blockMatchers,
req,
s.samplesLimiter,
)
if err != nil {
return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID)
Expand Down Expand Up @@ -1585,11 +1632,22 @@ func (r *bucketChunkReader) addPreload(id uint64) error {
}

// preload all added chunk IDs. Must be called before the first call to Chunk is made.
func (r *bucketChunkReader) preload() error {
func (r *bucketChunkReader) preload(samplesLimiter *Limiter) error {
const maxChunkSize = 16000

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we could perhaps move this to the top along with maxSamplesPerChunk?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No necessarily as it only is used here


var g run.Group

numChunks := uint64(0)
for _, offsets := range r.preloads {
for range offsets {
numChunks++
}
}
numSamples := numChunks * maxSamplesPerChunk
GiedriusS marked this conversation as resolved.
Show resolved Hide resolved
if err := samplesLimiter.Check(numSamples); err != nil {
return errors.Wrapf(err, "exceeded samples limit")
GiedriusS marked this conversation as resolved.
Show resolved Hide resolved
}

for seq, offsets := range r.preloads {
sort.Slice(offsets, func(i, j int) bool {
return offsets[i] < offsets[j]
Expand Down
8 changes: 4 additions & 4 deletions pkg/store/bucket_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (s *storeSuite) Close() {
s.wg.Wait()
}

func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool) *storeSuite {
func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, maxSampleCount uint64) *storeSuite {
series := []labels.Labels{
labels.FromStrings("a", "1", "b", "1"),
labels.FromStrings("a", "1", "b", "2"),
Expand Down Expand Up @@ -87,7 +87,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
testutil.Ok(t, os.RemoveAll(dir2))
}

store, err := NewBucketStore(log.NewLogfmtLogger(os.Stderr), nil, bkt, dir, 100, 0, false, 20)
store, err := NewBucketStore(log.NewLogfmtLogger(os.Stderr), nil, bkt, dir, 100, 0, maxSampleCount, 20, false, 20)
testutil.Ok(t, err)

s.store = store
Expand Down Expand Up @@ -321,7 +321,7 @@ func TestBucketStore_e2e(t *testing.T) {
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

s := prepareStoreWithTestBlocks(t, dir, bkt, false)
s := prepareStoreWithTestBlocks(t, dir, bkt, false, 0)
defer s.Close()

testBucketStore_e2e(t, ctx, s)
Expand Down Expand Up @@ -350,7 +350,7 @@ func TestBucketStore_ManyParts_e2e(t *testing.T) {
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

s := prepareStoreWithTestBlocks(t, dir, bkt, true)
s := prepareStoreWithTestBlocks(t, dir, bkt, true, 0)
defer s.Close()

GiedriusS marked this conversation as resolved.
Show resolved Hide resolved
testBucketStore_e2e(t, ctx, s)
Expand Down
2 changes: 1 addition & 1 deletion pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ func TestBucketStore_Info(t *testing.T) {
dir, err := ioutil.TempDir("", "prometheus-test")
testutil.Ok(t, err)

bucketStore, err := NewBucketStore(nil, nil, nil, dir, 2e5, 2e5, false, 20)
bucketStore, err := NewBucketStore(nil, nil, nil, dir, 2e5, 2e5, 0, 0, false, 20)
testutil.Ok(t, err)

resp, err := bucketStore.Info(ctx, &storepb.InfoRequest{})
Expand Down
54 changes: 54 additions & 0 deletions pkg/store/gate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package store

import (
"context"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/pkg/gate"
)

// Gate wraps the Prometheus gate with extra metrics.
type Gate struct {
g *gate.Gate
currentQueries prometheus.Gauge
gateTiming prometheus.Summary
}

// NewGate returns a new gate.
func NewGate(maxConcurrent int, reg prometheus.Registerer) *Gate {
GiedriusS marked this conversation as resolved.
Show resolved Hide resolved
g := &Gate{
g: gate.New(maxConcurrent),
}
g.currentQueries = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "thanos_bucket_store_queries_total",
GiedriusS marked this conversation as resolved.
Show resolved Hide resolved
GiedriusS marked this conversation as resolved.
Show resolved Hide resolved
Help: "Total number of currently executing queries.",
GiedriusS marked this conversation as resolved.
Show resolved Hide resolved
})
g.gateTiming = prometheus.NewSummary(prometheus.SummaryOpts{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Summaries should be banned. They are quite expensive and non aggregatable. Can we use histogram? (:

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do.

Name: "thanos_bucket_store_gate_seconds",
Help: "How many seconds it took for a query to pass through the gate.",
})

if reg != nil {
reg.MustRegister(g.currentQueries, g.gateTiming)
}

return g
}

// IsMyTurn iniates a new query and waits until it's our turn to fulfill a query request.
func (g *Gate) IsMyTurn(ctx context.Context) error {
g.currentQueries.Inc()
start := time.Now()
if err := g.g.Start(ctx); err != nil {
return err
GiedriusS marked this conversation as resolved.
Show resolved Hide resolved
}
g.gateTiming.Observe(float64(time.Now().Sub(start)))
return nil
}

// Done finishes a query.
func (g *Gate) Done() {
g.currentQueries.Dec()
g.g.Done()
}
33 changes: 33 additions & 0 deletions pkg/store/limiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package store

import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
)

// Limiter is a simple mechanism for checking if something has passed a certain threshold.
type Limiter struct {
limit uint64

// A ptr to a counter metric which we will increase if Check() fails.
failedCounter *prometheus.Counter
}

// NewLimiter returns a new limiter with a specified limit. 0 disables the limit.
func NewLimiter(limit uint64, ctr *prometheus.Counter) *Limiter {
GiedriusS marked this conversation as resolved.
Show resolved Hide resolved
return &Limiter{limit: limit, failedCounter: ctr}
}

// Check checks if the passed number exceeds the limits or not.
func (l *Limiter) Check(num uint64) error {
if l.limit == 0 {
return nil
}
if num > l.limit {
if l.failedCounter != nil {
(*l.failedCounter).Inc()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why casting?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will remove.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correction: failedCounter is a pointer so this needs to be derefenced before Inc() is called. We need a pointer here because in this way we decouple the counter from the metrics registry, and it makes it much easier to use in the future if the need will arise.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

still not get this, will ask offline

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it know - we hold pointer to interface. It's interesting as this interface holds always *counter, so I think we are good without this extra ref pointer: result := &counter{desc: desc, labelPairs: desc.constLabelPairs}

Also can we get rid of nil checking everytime we do Check? I don't think it's necessary - I think we can ensure it's always set

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 yeah, we should just remove the nil checks since the caller ensures that. This happens also in some other places, will clean this up.

}
return errors.Errorf("limit %v violated (got %v)", l.limit, num)
}
return nil
}