-
Notifications
You must be signed in to change notification settings - Fork 455
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Initial query limit overriding #3090
Changes from 42 commits
9e3b280
d5ef8b4
5ecdb41
ab8384b
06ad766
6390dcf
c9b0ad0
0bba45e
b90cc5c
7916470
5c6ae76
3285a62
49b6513
6df725b
e7e23aa
6c4bca4
8f281b9
7b0d797
520f2de
2ad8c09
bc50860
f2476c5
9f4d9b1
33a7bf4
32f148f
66a0a42
a39700f
0e56187
2dbef2c
fb29267
90bef8a
d3747ec
03f987f
053739b
2272d5e
dddecfb
0bc4306
8e249e6
c8507c0
027aaae
40948e0
8b9b602
98efb93
6be4a30
db34398
02fcd56
8351c89
731ec5f
ffcea5e
6c42ae2
a86d55f
b86fdcf
7dc8689
3ec7673
6a8f9ed
c05b6c9
79cd2e4
0a31c53
38cabbd
5f81a75
a9621f7
f488d68
c5e0683
316a7d3
e3480a8
6e15b82
450acce
5208b2c
f880627
4cdea4c
9fc90b8
9b422ad
a3c0857
34a5e3e
bdd3ad4
aa89830
314e43f
24b7e87
8596169
bbc474e
0967ae6
6f540d6
21732ef
4a4d5cd
5e00b23
a943da0
b659237
7fc778a
e07440d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,13 +22,15 @@ package limits | |
|
||
import ( | ||
"fmt" | ||
"sync" | ||
"time" | ||
|
||
xerrors "github.com/m3db/m3/src/x/errors" | ||
"github.com/m3db/m3/src/x/instrument" | ||
|
||
"github.com/uber-go/tally" | ||
"go.uber.org/atomic" | ||
"go.uber.org/zap" | ||
) | ||
|
||
const ( | ||
|
@@ -45,15 +47,19 @@ type lookbackLimit struct { | |
name string | ||
options LookbackLimitOptions | ||
metrics lookbackLimitMetrics | ||
logger *zap.Logger | ||
recent *atomic.Int64 | ||
stopCh chan struct{} | ||
lock sync.RWMutex | ||
} | ||
|
||
type lookbackLimitMetrics struct { | ||
recentCount tally.Gauge | ||
recentMax tally.Gauge | ||
total tally.Counter | ||
exceeded tally.Counter | ||
optionsMax tally.Gauge | ||
optionsLookback tally.Gauge | ||
recentCount tally.Gauge | ||
recentMax tally.Gauge | ||
total tally.Counter | ||
exceeded tally.Counter | ||
|
||
sourceLogger SourceLogger | ||
} | ||
|
@@ -67,7 +73,7 @@ var ( | |
func DefaultLookbackLimitOptions() LookbackLimitOptions { | ||
return LookbackLimitOptions{ | ||
// Default to no limit. | ||
Limit: 0, | ||
Limit: nil, | ||
Lookback: defaultLookback, | ||
} | ||
} | ||
|
@@ -110,6 +116,7 @@ func newLookbackLimit( | |
name: name, | ||
options: opts, | ||
metrics: newLookbackLimitMetrics(instrumentOpts, name, sourceLoggerBuilder), | ||
logger: instrumentOpts.Logger(), | ||
recent: atomic.NewInt64(0), | ||
stopCh: make(chan struct{}), | ||
} | ||
|
@@ -171,6 +178,36 @@ func (q *queryLimits) AnyExceeded() error { | |
return q.bytesReadLimit.exceeded() | ||
} | ||
|
||
func (q *lookbackLimit) Options() LookbackLimitOptions { | ||
return q.options | ||
} | ||
|
||
// Override overrides the limit set on construction. | ||
rallen090 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
func (q *lookbackLimit) Update(opts LookbackLimitOptions) error { | ||
if err := opts.validate(); err != nil { | ||
return err | ||
} | ||
|
||
q.lock.Lock() | ||
defer q.lock.Unlock() | ||
|
||
old := q.options | ||
q.options = opts | ||
|
||
// If the lookback changed, replace the background goroutine that manages the periodic resetting. | ||
if q.options.Lookback != old.Lookback { | ||
q.stop() | ||
q.start() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. could q.start possibly fail? Scary to imagine if the resetting goroutine doesn't get started, meaning the limits will run amok There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. looks like it shouldn't be able to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why don't we change the tick value of the existing goroutine? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I had tried to do this but There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. just create a new ticker? don't really care, just throwing out ideas There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah that's what we're doing instead of |
||
} | ||
|
||
q.logger.Info("query limit options updated", | ||
zap.String("name", q.name), | ||
zap.Any("new", opts), | ||
zap.Any("old", old)) | ||
|
||
return nil | ||
} | ||
|
||
// Inc increments the current value and returns an error if above the limit. | ||
func (q *lookbackLimit) Inc(val int, source []byte) error { | ||
if val < 0 { | ||
|
@@ -199,7 +236,15 @@ func (q *lookbackLimit) exceeded() error { | |
} | ||
|
||
func (q *lookbackLimit) checkLimit(recent int64) error { | ||
if q.options.Limit > 0 && recent > q.options.Limit { | ||
q.lock.RLock() | ||
limit := q.options.Limit | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is a lock necessary here if we grab a snapshot and use the snapshot? I imagine there will be a ton of activity on this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it's just a read lock, which should be fine. and that's basically what is happening, snapshoting the current value with the lock. |
||
q.lock.RUnlock() | ||
|
||
if limit == nil { | ||
return nil | ||
} | ||
|
||
if recent >= *limit { | ||
q.metrics.exceeded.Inc(1) | ||
return xerrors.NewInvalidParamsError(NewQueryLimitExceededError(fmt.Sprintf( | ||
"query aborted due to limit: name=%s, limit=%d, current=%d, within=%s", | ||
|
@@ -210,7 +255,9 @@ func (q *lookbackLimit) checkLimit(recent int64) error { | |
|
||
func (q *lookbackLimit) start() { | ||
ticker := time.NewTicker(q.options.Lookback) | ||
ticker.Reset(q.options.Lookback) | ||
go func() { | ||
q.logger.Info("query limit interval started", zap.String("name", q.name)) | ||
for { | ||
select { | ||
case <-ticker.C: | ||
|
@@ -221,10 +268,16 @@ func (q *lookbackLimit) start() { | |
} | ||
} | ||
}() | ||
|
||
q.metrics.optionsMax.Update(float64(*q.options.Limit)) | ||
q.metrics.optionsLookback.Update(q.options.Lookback.Seconds()) | ||
} | ||
|
||
func (q *lookbackLimit) stop() { | ||
close(q.stopCh) | ||
q.stopCh = make(chan struct{}) | ||
|
||
q.logger.Info("query limit interval stopped", zap.String("name", q.name)) | ||
} | ||
|
||
func (q *lookbackLimit) current() int64 { | ||
|
@@ -244,8 +297,8 @@ func (q *lookbackLimit) reset() { | |
} | ||
|
||
func (opts LookbackLimitOptions) validate() error { | ||
if opts.Limit < 0 { | ||
return fmt.Errorf("query limit requires limit >= 0 (%d)", opts.Limit) | ||
if opts.Limit != nil && *opts.Limit < 0 { | ||
return fmt.Errorf("query limit requires limit >= 0 or nil (%d)", *opts.Limit) | ||
} | ||
if opts.Lookback <= 0 { | ||
return fmt.Errorf("query limit requires lookback > 0 (%d)", opts.Lookback) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
migrating these will probably be more annoying than migrating static config when we want to rename these. Should we land on the desired names now?