Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ratelimits: Remove a metric and some labels that we're not finding useful #7902

Merged
merged 6 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 4 additions & 17 deletions ratelimits/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ type Limiter struct {
source Source
clk clock.Clock

spendLatency *prometheus.HistogramVec
overrideUsageGauge *prometheus.GaugeVec
spendLatency *prometheus.HistogramVec
}

// NewLimiter returns a new *Limiter. The provided source must be safe for
Expand All @@ -52,17 +51,10 @@ func NewLimiter(clk clock.Clock, source Source, stats prometheus.Registerer) (*L
}, []string{"limit", "decision"})
stats.MustRegister(spendLatency)

overrideUsageGauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "ratelimits_override_usage",
Help: "Proportion of override limit used, by limit name and bucket key.",
}, []string{"limit", "bucket_key"})
stats.MustRegister(overrideUsageGauge)

return &Limiter{
source: source,
clk: clk,
spendLatency: spendLatency,
overrideUsageGauge: overrideUsageGauge,
source: source,
clk: clk,
spendLatency: spendLatency,
}, nil
}

Expand Down Expand Up @@ -284,11 +276,6 @@ func (l *Limiter) BatchSpend(ctx context.Context, txns []Transaction) (*Decision
storedTAT, bucketExists := tats[txn.bucketKey]
d := maybeSpend(l.clk, txn, storedTAT)

if txn.limit.isOverride() {
utilization := float64(txn.limit.Burst-d.remaining) / float64(txn.limit.Burst)
l.overrideUsageGauge.WithLabelValues(txn.limit.name.String(), txn.limit.overrideKey).Set(utilization)
beautifulentropy marked this conversation as resolved.
Show resolved Hide resolved
}

if d.allowed && (storedTAT != d.newTAT) && txn.spend {
if !bucketExists {
newBuckets[txn.bucketKey] = d.newTAT
Expand Down
13 changes: 0 additions & 13 deletions ratelimits/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"time"

"github.com/jmhodges/clock"
"github.com/prometheus/client_golang/prometheus"

"github.com/letsencrypt/boulder/config"
berrors "github.com/letsencrypt/boulder/errors"
Expand Down Expand Up @@ -60,12 +59,6 @@ func TestLimiter_CheckWithLimitOverrides(t *testing.T) {
testCtx, limiters, txnBuilder, clk, testIP := setup(t)
for name, l := range limiters {
t.Run(name, func(t *testing.T) {
// Verify our overrideUsageGauge is being set correctly. 0.0 == 0%
// of the bucket has been consumed.
test.AssertMetricWithLabelsEquals(t, l.overrideUsageGauge, prometheus.Labels{
"limit": NewRegistrationsPerIPAddress.String(),
"bucket_key": joinWithColon(NewRegistrationsPerIPAddress.EnumString(), tenZeroZeroTwo)}, 0)

overriddenBucketKey, err := newIPAddressBucketKey(NewRegistrationsPerIPAddress, net.ParseIP(tenZeroZeroTwo))
test.AssertNotError(t, err, "should not error")
overriddenLimit, err := txnBuilder.getLimit(NewRegistrationsPerIPAddress, overriddenBucketKey)
Expand All @@ -87,12 +80,6 @@ func TestLimiter_CheckWithLimitOverrides(t *testing.T) {
test.AssertEquals(t, d.remaining, int64(0))
test.AssertEquals(t, d.resetIn, time.Second)

// Verify our overrideUsageGauge is being set correctly. 1.0 == 100%
// of the bucket has been consumed.
test.AssertMetricWithLabelsEquals(t, l.overrideUsageGauge, prometheus.Labels{
"limit_name": NewRegistrationsPerIPAddress.String(),
"bucket_key": joinWithColon(NewRegistrationsPerIPAddress.EnumString(), tenZeroZeroTwo)}, 1.0)

// Verify our RetryIn is correct. 1 second == 1000 milliseconds and
// 1000/40 = 25 milliseconds per request.
test.AssertEquals(t, d.retryIn, time.Millisecond*25)
Expand Down
131 changes: 4 additions & 127 deletions ratelimits/source_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package ratelimits
import (
"context"
"errors"
"net"
"time"

"github.com/jmhodges/clock"
Expand All @@ -16,76 +15,22 @@ var _ Source = (*RedisSource)(nil)

// RedisSource is a ratelimits source backed by sharded Redis.
type RedisSource struct {
client *redis.Ring
clk clock.Clock
latency *prometheus.HistogramVec
client *redis.Ring
clk clock.Clock
}

// NewRedisSource builds a RedisSource around the provided *redis.Ring client
// and clock. It creates the "ratelimits_latency" histogram and registers it
// with stats via MustRegister.
func NewRedisSource(client *redis.Ring, clk clock.Clock, stats prometheus.Registerer) *RedisSource {
	opts := prometheus.HistogramOpts{
		Name: "ratelimits_latency",
		Help: "Histogram of Redis call latencies labeled by call=[set|get|delete|ping] and result=[success|error]",
		// Exponential buckets ranging from 0.0005s to 3s.
		Buckets: prometheus.ExponentialBucketsRange(0.0005, 3, 8),
	}

	src := &RedisSource{
		client:  client,
		clk:     clk,
		latency: prometheus.NewHistogramVec(opts, []string{"call", "result"}),
	}
	stats.MustRegister(src.latency)
	return src
}

var errMixedSuccess = errors.New("some keys not found")

// resultForError maps a non-nil error from a Redis operation to the short
// string used as the "result" label value when recording call latency. Errors
// that match none of the known categories are reported as "failed".
func resultForError(err error) string {
	// Note the argument order: errors.Is(err, target) walks err's wrap chain
	// looking for target, so wrapped errors are classified correctly too.
	switch {
	case errors.Is(err, errMixedSuccess):
		// Some, but not all, of the keys in a batch operation were not found.
		return "mixedSuccess"
	case errors.Is(err, redis.Nil):
		// Bucket key does not exist.
		return "notFound"
	case errors.Is(err, context.DeadlineExceeded):
		// Client read or write deadline exceeded.
		return "deadlineExceeded"
	case errors.Is(err, context.Canceled):
		// Caller canceled the operation.
		return "canceled"
	}

	var netErr net.Error
	if errors.As(err, &netErr) && netErr.Timeout() {
		// Dialer timed out connecting to Redis.
		return "timeout"
	}

	// redis.Error is matched by type rather than by value, so errors.As is
	// required here; errors.Is against a nil target can never match a non-nil
	// err.
	var redisErr redis.Error
	if errors.As(err, &redisErr) {
		// An internal error was returned by the Redis server.
		return "redisError"
	}
	return "failed"
}

func (r *RedisSource) observeLatency(call string, latency time.Duration, err error) {
result := "success"
if err != nil {
result = resultForError(err)
client: client,
clk: clk,
}
r.latency.With(prometheus.Labels{"call": call, "result": result}).Observe(latency.Seconds())
}

// BatchSet stores TATs at the specified bucketKeys using a pipelined Redis
// Transaction in order to reduce the number of round-trips to each Redis shard.
func (r *RedisSource) BatchSet(ctx context.Context, buckets map[string]time.Time) error {
start := r.clk.Now()

pipeline := r.client.Pipeline()
for bucketKey, tat := range buckets {
// Set a TTL of TAT + 10 minutes to account for clock skew.
Expand All @@ -94,25 +39,14 @@ func (r *RedisSource) BatchSet(ctx context.Context, buckets map[string]time.Time
}
_, err := pipeline.Exec(ctx)
if err != nil {
r.observeLatency("batchset", r.clk.Since(start), err)
return err
}

totalLatency := r.clk.Since(start)
perSetLatency := totalLatency / time.Duration(len(buckets))
for range buckets {
r.observeLatency("batchset_entry", perSetLatency, nil)
}

r.observeLatency("batchset", totalLatency, nil)
return nil
}

// BatchSetNotExisting attempts to set TATs for the specified bucketKeys if they
// do not already exist. Returns a map indicating which keys already existed.
func (r *RedisSource) BatchSetNotExisting(ctx context.Context, buckets map[string]time.Time) (map[string]bool, error) {
start := r.clk.Now()

pipeline := r.client.Pipeline()
cmds := make(map[string]*redis.BoolCmd, len(buckets))
for bucketKey, tat := range buckets {
Expand All @@ -122,74 +56,49 @@ func (r *RedisSource) BatchSetNotExisting(ctx context.Context, buckets map[strin
}
_, err := pipeline.Exec(ctx)
if err != nil {
r.observeLatency("batchsetnotexisting", r.clk.Since(start), err)
return nil, err
}

alreadyExists := make(map[string]bool, len(buckets))
totalLatency := r.clk.Since(start)
perSetLatency := totalLatency / time.Duration(len(buckets))
for bucketKey, cmd := range cmds {
success, err := cmd.Result()
if err != nil {
r.observeLatency("batchsetnotexisting_entry", perSetLatency, err)
return nil, err
}
if !success {
alreadyExists[bucketKey] = true
}
r.observeLatency("batchsetnotexisting_entry", perSetLatency, nil)
}

r.observeLatency("batchsetnotexisting", totalLatency, nil)
return alreadyExists, nil
}

// BatchIncrement updates TATs for the specified bucketKeys using a pipelined
// Redis Transaction in order to reduce the number of round-trips to each Redis
// shard. For every bucket it queues an IncrBy of the increment's cost (in
// nanoseconds) followed by an Expire using the increment's TTL. An empty
// buckets map is a no-op that returns nil.
func (r *RedisSource) BatchIncrement(ctx context.Context, buckets map[string]increment) error {
	if len(buckets) == 0 {
		// Nothing to write. Returning here also keeps the per-entry latency
		// computation below from dividing by zero.
		return nil
	}

	start := r.clk.Now()

	pipeline := r.client.Pipeline()
	for bucketKey, incr := range buckets {
		pipeline.IncrBy(ctx, bucketKey, incr.cost.Nanoseconds())
		pipeline.Expire(ctx, bucketKey, incr.ttl)
	}
	_, err := pipeline.Exec(ctx)
	if err != nil {
		r.observeLatency("batchincrby", r.clk.Since(start), err)
		return err
	}

	// Attribute an equal share of the total round-trip time to each entry.
	totalLatency := r.clk.Since(start)
	perSetLatency := totalLatency / time.Duration(len(buckets))
	for range buckets {
		r.observeLatency("batchincrby_entry", perSetLatency, nil)
	}

	r.observeLatency("batchincrby", totalLatency, nil)
	return nil
}

// Get retrieves the TAT at the specified bucketKey. If the bucketKey does not
// exist, ErrBucketNotFound is returned.
func (r *RedisSource) Get(ctx context.Context, bucketKey string) (time.Time, error) {
start := r.clk.Now()

tatNano, err := r.client.Get(ctx, bucketKey).Int64()
if err != nil {
if errors.Is(err, redis.Nil) {
// Bucket key does not exist.
r.observeLatency("get", r.clk.Since(start), err)
return time.Time{}, ErrBucketNotFound
}
// An error occurred while retrieving the TAT.
beautifulentropy marked this conversation as resolved.
Show resolved Hide resolved
r.observeLatency("get", r.clk.Since(start), err)
return time.Time{}, err
}

r.observeLatency("get", r.clk.Since(start), nil)
return time.Unix(0, tatNano).UTC(), nil
}

Expand All @@ -198,82 +107,50 @@ func (r *RedisSource) Get(ctx context.Context, bucketKey string) (time.Time, err
// shard. If a bucketKey does not exist, it WILL NOT be included in the returned
// map.
func (r *RedisSource) BatchGet(ctx context.Context, bucketKeys []string) (map[string]time.Time, error) {
	if len(bucketKeys) == 0 {
		// Nothing to fetch. Returning here also keeps the per-entry latency
		// computation below from dividing by zero.
		return map[string]time.Time{}, nil
	}

	start := r.clk.Now()

	pipeline := r.client.Pipeline()
	for _, bucketKey := range bucketKeys {
		pipeline.Get(ctx, bucketKey)
	}
	results, err := pipeline.Exec(ctx)
	// redis.Nil only means that at least one key was missing; that case is
	// handled per entry below rather than treated as a failure of the batch.
	if err != nil && !errors.Is(err, redis.Nil) {
		r.observeLatency("batchget", r.clk.Since(start), err)
		return nil, err
	}

	// Attribute an equal share of the total round-trip time to each entry.
	totalLatency := r.clk.Since(start)
	perEntryLatency := totalLatency / time.Duration(len(bucketKeys))

	tats := make(map[string]time.Time, len(bucketKeys))
	notFoundCount := 0
	for i, result := range results {
		tatNano, err := result.(*redis.StringCmd).Int64()
		if err != nil {
			if !errors.Is(err, redis.Nil) {
				// This should never happen as any errors should have been
				// caught after the pipeline.Exec() call.
				r.observeLatency("batchget", r.clk.Since(start), err)
				return nil, err
			}
			// Bucket key does not exist.
			r.observeLatency("batchget_entry", perEntryLatency, err)
			notFoundCount++
			continue
		}
		tats[bucketKeys[i]] = time.Unix(0, tatNano).UTC()
		r.observeLatency("batchget_entry", perEntryLatency, nil)
	}

	// Pick the result recorded for the batch as a whole. batchErr is used only
	// for the latency observation; the caller always receives a nil error here.
	var batchErr error
	switch {
	case notFoundCount == 0:
		// Every key was found; record the batch as a success.
	case notFoundCount < len(results):
		// Some keys were not found.
		batchErr = errMixedSuccess
	default:
		// All keys were not found.
		batchErr = redis.Nil
	}

	r.observeLatency("batchget", totalLatency, batchErr)
	return tats, nil
}

// Delete removes the TAT stored at the specified bucketKey ('name:id') and
// records the call's latency under "delete". A nil return value does not
// indicate that the bucketKey existed.
func (r *RedisSource) Delete(ctx context.Context, bucketKey string) error {
	began := r.clk.Now()

	err := r.client.Del(ctx, bucketKey).Err()

	// err is nil on success, so one observation covers both outcomes.
	r.observeLatency("delete", r.clk.Since(began), err)
	return err
}

// Ping issues the PING command to each shard of the *redis.Ring via
// ForEachShard and returns the resulting error, if any. The latency of the
// whole check is recorded under "ping".
func (r *RedisSource) Ping(ctx context.Context) error {
	began := r.clk.Now()

	pingShard := func(ctx context.Context, shard *redis.Client) error {
		return shard.Ping(ctx).Err()
	}
	err := r.client.ForEachShard(ctx, pingShard)

	// err is nil on success, so one observation covers both outcomes.
	r.observeLatency("ping", r.clk.Since(began), err)
	return err
}
Loading