diff --git a/CHANGELOG.md b/CHANGELOG.md index 3fe7cac41..5ff1a6dd3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -53,6 +53,7 @@ * [FEATURE] Add support for waiting on the rate limiter using the new `WaitN` method. #279 * [FEATURE] Add `log.BufferedLogger` type. #338 * [FEATURE] Add `flagext.ParseFlagsAndArguments()` and `flagext.ParseFlagsWithoutArguments()` utilities. #341 +* [FEATURE] Add `log.RateLimitedLogger` for limiting the rate of logging. The `logger_rate_limit_discarded_log_lines_total` metrics traces the total number of discarded log lines per level. #352 * [ENHANCEMENT] Add configuration to customize backoff for the gRPC clients. * [ENHANCEMENT] Use `SecretReader` interface to fetch secrets when configuring TLS. #274 * [ENHANCEMENT] Add middleware package. #38 diff --git a/log/ratelimit.go b/log/ratelimit.go new file mode 100644 index 000000000..9fc592f4a --- /dev/null +++ b/log/ratelimit.go @@ -0,0 +1,128 @@ +package log + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "golang.org/x/time/rate" +) + +const ( + infoLevel = "info" + debugLevel = "debug" + warnLevel = "warning" + errorLevel = "error" +) + +type RateLimitedLogger struct { + next Interface + limiter *rate.Limiter + + discardedInfoLogLinesCounter prometheus.Counter + discardedDebugLogLinesCounter prometheus.Counter + discardedWarnLogLinesCounter prometheus.Counter + discardedErrorLogLinesCounter prometheus.Counter +} + +// NewRateLimitedLogger returns a logger.Interface that is limited to the given number of logs per second, +// with the given burst size. +func NewRateLimitedLogger(logger Interface, logsPerSecond rate.Limit, burstSize int, reg prometheus.Registerer) Interface { + discardedLogLinesCounter := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "logger_rate_limit_discarded_log_lines_total", + Help: "Total number of discarded log lines per level.", + }, []string{"level"}) + + return &RateLimitedLogger{ + next: logger, + limiter: rate.NewLimiter(logsPerSecond, burstSize), + discardedInfoLogLinesCounter: discardedLogLinesCounter.WithLabelValues(infoLevel), + discardedDebugLogLinesCounter: discardedLogLinesCounter.WithLabelValues(debugLevel), + discardedWarnLogLinesCounter: discardedLogLinesCounter.WithLabelValues(warnLevel), + discardedErrorLogLinesCounter: discardedLogLinesCounter.WithLabelValues(errorLevel), + } +} + +func (l *RateLimitedLogger) Debugf(format string, args ...interface{}) { + if l.limiter.Allow() { + l.next.Debugf(format, args...) + } else { + l.discardedDebugLogLinesCounter.Inc() + } +} + +func (l *RateLimitedLogger) Debugln(args ...interface{}) { + if l.limiter.Allow() { + l.next.Debugln(args...) + } else { + l.discardedDebugLogLinesCounter.Inc() + } +} + +func (l *RateLimitedLogger) Infof(format string, args ...interface{}) { + if l.limiter.Allow() { + l.next.Infof(format, args...) + } else { + l.discardedInfoLogLinesCounter.Inc() + } +} + +func (l *RateLimitedLogger) Infoln(args ...interface{}) { + if l.limiter.Allow() { + l.next.Infoln(args...) + } else { + l.discardedInfoLogLinesCounter.Inc() + } +} + +func (l *RateLimitedLogger) Errorf(format string, args ...interface{}) { + if l.limiter.Allow() { + l.next.Errorf(format, args...) + } else { + l.discardedErrorLogLinesCounter.Inc() + } +} + +func (l *RateLimitedLogger) Errorln(args ...interface{}) { + if l.limiter.Allow() { + l.next.Errorln(args...) + } else { + l.discardedErrorLogLinesCounter.Inc() + } +} + +func (l *RateLimitedLogger) Warnf(format string, args ...interface{}) { + if l.limiter.Allow() { + l.next.Warnf(format, args...) + } else { + l.discardedWarnLogLinesCounter.Inc() + } +} + +func (l *RateLimitedLogger) Warnln(args ...interface{}) { + if l.limiter.Allow() { + l.next.Warnln(args...) + } else { + l.discardedWarnLogLinesCounter.Inc() + } +} + +func (l *RateLimitedLogger) WithField(key string, value interface{}) Interface { + return &RateLimitedLogger{ + next: l.next.WithField(key, value), + limiter: l.limiter, + discardedInfoLogLinesCounter: l.discardedInfoLogLinesCounter, + discardedDebugLogLinesCounter: l.discardedDebugLogLinesCounter, + discardedWarnLogLinesCounter: l.discardedWarnLogLinesCounter, + discardedErrorLogLinesCounter: l.discardedErrorLogLinesCounter, + } +} + +func (l *RateLimitedLogger) WithFields(f Fields) Interface { + return &RateLimitedLogger{ + next: l.next.WithFields(f), + limiter: l.limiter, + discardedInfoLogLinesCounter: l.discardedInfoLogLinesCounter, + discardedDebugLogLinesCounter: l.discardedDebugLogLinesCounter, + discardedWarnLogLinesCounter: l.discardedWarnLogLinesCounter, + discardedErrorLogLinesCounter: l.discardedErrorLogLinesCounter, + } +} diff --git a/log/ratelimit_test.go b/log/ratelimit_test.go new file mode 100644 index 000000000..fa0282a0d --- /dev/null +++ b/log/ratelimit_test.go @@ -0,0 +1,193 @@ +package log + +import ( + "bytes" + "strings" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestRateLimitedLoggerLogs(t *testing.T) { + buf := bytes.NewBuffer(nil) + c := newCounterLogger(buf) + reg := prometheus.NewPedanticRegistry() + r := NewRateLimitedLogger(c, 1, 1, reg) + + r.Errorln("Error will be logged") + assert.Equal(t, 1, c.count) + + logContains := []string{"error", "Error will be logged"} + c.assertContains(t, logContains) +} + +func TestRateLimitedLoggerLimits(t *testing.T) { + buf := bytes.NewBuffer(nil) + c := newCounterLogger(buf) + reg := prometheus.NewPedanticRegistry() + r := NewRateLimitedLogger(c, 2, 2, reg) + + r.Errorln("error 1 will be logged") + assert.Equal(t, 1, c.count) + c.assertContains(t, []string{"error", "error 1 will be logged"}) + + r.Infoln("info 1 will be logged") + assert.Equal(t, 2, c.count) + c.assertContains(t, []string{"info", "info 1 will be logged"}) + + r.Debugln("debug 1 will be discarded") + assert.Equal(t, 2, c.count) + c.assertNotContains(t, "debug 1 will be discarded") + + r.Warnln("warning 1 will be discarded") + assert.Equal(t, 2, c.count) + c.assertNotContains(t, "warning 1 will be discarded") + + require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` + # HELP logger_rate_limit_discarded_log_lines_total Total number of discarded log lines per level. + # TYPE logger_rate_limit_discarded_log_lines_total counter + logger_rate_limit_discarded_log_lines_total{level="info"} 0 + logger_rate_limit_discarded_log_lines_total{level="debug"} 1 + logger_rate_limit_discarded_log_lines_total{level="warning"} 1 + logger_rate_limit_discarded_log_lines_total{level="error"} 0 + `))) + + // we wait 1 second, so the next group of lines can be logged + time.Sleep(time.Second) + r.Debugln("debug 2 will be logged") + assert.Equal(t, 3, c.count) + c.assertContains(t, []string{"debug", "debug 2 will be logged"}) + + r.Infoln("info 2 will be logged") + assert.Equal(t, 4, c.count) + c.assertContains(t, []string{"info", "info 2 will be logged"}) + + r.Errorln("error 2 will be discarded") + assert.Equal(t, 4, c.count) + c.assertNotContains(t, "error 2 will be discarded") + + r.Warnln("warning 2 will be discarded") + assert.Equal(t, 4, c.count) + c.assertNotContains(t, "warning 2 will be discarded") + + require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` + # HELP logger_rate_limit_discarded_log_lines_total Total number of discarded log lines per level. + # TYPE logger_rate_limit_discarded_log_lines_total counter + logger_rate_limit_discarded_log_lines_total{level="info"} 0 + logger_rate_limit_discarded_log_lines_total{level="debug"} 1 + logger_rate_limit_discarded_log_lines_total{level="error"} 1 + logger_rate_limit_discarded_log_lines_total{level="warning"} 2 + `))) +} + +func TestRateLimitedLoggerWithFields(t *testing.T) { + buf := bytes.NewBuffer(nil) + c := newCounterLogger(buf) + reg := prometheus.NewPedanticRegistry() + logger := NewRateLimitedLogger(c, 0.0001, 1, reg) + loggerWithFields := logger.WithField("key", "value") + + loggerWithFields.Errorln("Error will be logged") + assert.Equal(t, 1, c.count) + c.assertContains(t, []string{"key", "value", "error", "Error will be logged"}) + + logger.Infoln("Info will not be logged") + c.assertNotContains(t, "Info will not be logged") + + loggerWithFields.Debugln("Debug will not be logged") + c.assertNotContains(t, "Debug will not be logged") + + loggerWithFields.Warnln("Warning will not be logged") + c.assertNotContains(t, "Warning will not be logged") + assert.Equal(t, 1, c.count) + + require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` + # HELP logger_rate_limit_discarded_log_lines_total Total number of discarded log lines per level. + # TYPE logger_rate_limit_discarded_log_lines_total counter + logger_rate_limit_discarded_log_lines_total{level="info"} 1 + logger_rate_limit_discarded_log_lines_total{level="debug"} 1 + logger_rate_limit_discarded_log_lines_total{level="warning"} 1 + logger_rate_limit_discarded_log_lines_total{level="error"} 0 + `))) +} + +type counterLogger struct { + logger Interface + buf *bytes.Buffer + count int +} + +func (c *counterLogger) Debugf(format string, args ...interface{}) { + c.logger.Debugf(format, args...) + c.count++ +} + +func (c *counterLogger) Debugln(args ...interface{}) { + c.logger.Debugln(args...) + c.count++ +} + +func (c *counterLogger) Infof(format string, args ...interface{}) { + c.logger.Infof(format, args...) + c.count++ +} + +func (c *counterLogger) Infoln(args ...interface{}) { + c.logger.Infoln(args...) + c.count++ +} + +func (c *counterLogger) Warnf(format string, args ...interface{}) { + c.logger.Warnf(format, args...) + c.count++ +} + +func (c *counterLogger) Warnln(args ...interface{}) { + c.logger.Warnln(args...) + c.count++ +} + +func (c *counterLogger) Errorf(format string, args ...interface{}) { + c.logger.Errorf(format, args...) + c.count++ +} + +func (c *counterLogger) Errorln(args ...interface{}) { + c.logger.Errorln(args...) + c.count++ +} + +func (c *counterLogger) WithField(key string, value interface{}) Interface { + c.logger = c.logger.WithField(key, value) + return c +} + +func (c *counterLogger) WithFields(fields Fields) Interface { + c.logger = c.logger.WithFields(fields) + return c +} + +func (c *counterLogger) assertContains(t *testing.T, logContains []string) { + for _, content := range logContains { + require.True(t, bytes.Contains(c.buf.Bytes(), []byte(content))) + } +} + +func (c *counterLogger) assertNotContains(t *testing.T, content string) { + require.False(t, bytes.Contains(c.buf.Bytes(), []byte(content))) +} + +func newCounterLogger(buf *bytes.Buffer) *counterLogger { + logrusLogger := logrus.New() + logrusLogger.Out = buf + logrusLogger.Level = logrus.DebugLevel + return &counterLogger{ + logger: Logrus(logrusLogger), + buf: buf, + } +}