Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[aggregator] Update flush times after each flush #3890

Merged
merged 30 commits into master from arnikola/update-flush
Nov 6, 2021
Merged
Changes from 1 commit
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
d16d7a5
[aggregator] Update flush times after each flush
arnikola Oct 29, 2021
d5e2dd8
Merge branch 'master' into arnikola/update-flush
arnikola Nov 4, 2021
b92ad82
Fix up tests, deprecate the flushTimesPersistEvery option
arnikola Nov 4, 2021
3eb513d
Update tests
arnikola Nov 4, 2021
50e6859
Excise remaining FlushTimesPersistEvery
arnikola Nov 4, 2021
24b9e81
Fix tests, lint
arnikola Nov 4, 2021
b0caeaf
Filtering through eager shutdown flag for back-compat
arnikola Nov 4, 2021
4c02198
Add timeout metric for alerting
arnikola Nov 4, 2021
8694a72
lint
arnikola Nov 4, 2021
3e2040b
Merge branch 'master' into arnikola/update-flush
arnikola Nov 4, 2021
c3b92b7
Always eager close aggregator
arnikola Nov 4, 2021
d9e5191
Merge branch 'arnikola/update-flush' of github.com:m3db/m3 into arnikola/update-flush
arnikola Nov 4, 2021
6537fa7
Remove logging
arnikola Nov 4, 2021
7b42594
Re-add FlushTimesPersistEvery option
arnikola Nov 5, 2021
284a47c
PR response
arnikola Nov 5, 2021
932a5ca
Do not have a default defaultFlushTimesPersistEvery, to match config
arnikola Nov 5, 2021
ec4100f
Touchups for clarity/saner defaults
arnikola Nov 5, 2021
6164727
Re-add param (oops)
arnikola Nov 5, 2021
5040f33
responsible
arnikola Nov 5, 2021
cf0c3bb
Merge branch 'master' into arnikola/update-flush
arnikola Nov 5, 2021
aefd482
Close
arnikola Nov 5, 2021
40346ba
Merge branch 'arnikola/update-flush' of github.com:m3db/m3 into arnikola/update-flush
arnikola Nov 5, 2021
c282209
Merge branch 'master' into arnikola/update-flush
arnikola Nov 5, 2021
9ba8ff0
Merge branch 'arnikola/update-flush' of github.com:m3db/m3 into arnikola/update-flush
arnikola Nov 5, 2021
6152b61
response
arnikola Nov 5, 2021
666503f
wip
arnikola Nov 5, 2021
a29e989
Test cleanup
arnikola Nov 5, 2021
278d7d2
Response
arnikola Nov 5, 2021
8b480d3
Merge branch 'master' into arnikola/update-flush
arnikola Nov 5, 2021
86dc55d
Merge branch 'master' into arnikola/update-flush
arnikola Nov 6, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Response
arnikola committed Nov 5, 2021
commit 278d7d2ab8b9bb69c6ec274d06478a23283f9cbd
2 changes: 2 additions & 0 deletions src/aggregator/aggregator/aggregator.go
Original file line number Diff line number Diff line change
@@ -172,6 +172,8 @@ func (agg *aggregator) Open() error {
return err
}
if agg.checkInterval > 0 {
// NB: tick updates some metrics on how many series the aggregator currently
// has of each type, and expires old metrics from the local metric lists.
go agg.tick()
}

39 changes: 11 additions & 28 deletions src/aggregator/aggregator/flush_mgr_options.go
Original file line number Diff line number Diff line change
@@ -98,12 +98,6 @@ type FlushManagerOptions interface {
// FlushTimesManager returns the flush times manager.
FlushTimesManager() FlushTimesManager

// SetFlushTimesPersistEvery sets how frequently the flush times are stored in kv.
SetFlushTimesPersistEvery(value time.Duration) FlushManagerOptions

// FlushTimesPersistEvery returns how frequently the flush times are stored in kv.
FlushTimesPersistEvery() time.Duration

// SetMaxBufferSize sets the maximum duration data are buffered for without getting
// flushed or discarded to handle transient KV issues or for backing out of active
// topology changes.
@@ -128,18 +122,17 @@ type FlushManagerOptions interface {
}

type flushManagerOptions struct {
clockOpts clock.Options
instrumentOpts instrument.Options
checkEvery time.Duration
jitterEnabled bool
maxJitterFn FlushJitterFn
workerPool sync.WorkerPool
placementManager PlacementManager
electionManager ElectionManager
flushTimesManager FlushTimesManager
flushTimesPersistEvery time.Duration
maxBufferSize time.Duration
forcedFlushWindowSize time.Duration
clockOpts clock.Options
instrumentOpts instrument.Options
checkEvery time.Duration
jitterEnabled bool
maxJitterFn FlushJitterFn
workerPool sync.WorkerPool
placementManager PlacementManager
electionManager ElectionManager
flushTimesManager FlushTimesManager
maxBufferSize time.Duration
forcedFlushWindowSize time.Duration

bufferForPastTimedMetric time.Duration
}
@@ -251,16 +244,6 @@ func (o *flushManagerOptions) FlushTimesManager() FlushTimesManager {
return o.flushTimesManager
}

func (o *flushManagerOptions) SetFlushTimesPersistEvery(value time.Duration) FlushManagerOptions {
opts := *o
opts.flushTimesPersistEvery = value
return &opts
}

func (o *flushManagerOptions) FlushTimesPersistEvery() time.Duration {
return o.flushTimesPersistEvery
}

func (o *flushManagerOptions) SetMaxBufferSize(value time.Duration) FlushManagerOptions {
opts := *o
opts.maxBufferSize = value
2 changes: 1 addition & 1 deletion src/aggregator/aggregator/leader_flush_mgr.go
Original file line number Diff line number Diff line change
@@ -195,7 +195,7 @@ func (mgr *leaderFlushManager) persistFlushFn(buckets []*flushBucket) func() {

allShards := shards.All()
flushTimes := mgr.prepareFlushTimesWithLock(buckets, allShards)
if err := mgr.flushTimesManager.StoreAsync(flushTimes); err != nil {
if err := mgr.flushTimesManager.StoreSync(flushTimes); err != nil {
mgr.metrics.flushTimesUpdateError.Inc(1)
mgr.logger.Error("unable to store flush times", zap.Error(err))
}
12 changes: 4 additions & 8 deletions src/aggregator/aggregator/leader_flush_mgr_test.go
Original file line number Diff line number Diff line change
@@ -300,7 +300,7 @@ func TestLeaderFlushManagerPrepareWithFlushAndPersist(t *testing.T) {

flushTimesManager := NewMockFlushTimesManager(ctrl)
flushTimesManager.EXPECT().
StoreAsync(gomock.Any()).
StoreSync(gomock.Any()).
DoAndReturn(func(value *schema.ShardSetFlushTimes) error {
storeAsyncCount++
stored = value
@@ -309,9 +309,7 @@ func TestLeaderFlushManagerPrepareWithFlushAndPersist(t *testing.T) {
placementManager := NewMockPlacementManager(ctrl)
placementManager.EXPECT().Shards().Return(shard.NewShards(nil), nil).Times(2)

opts := NewFlushManagerOptions().
SetJitterEnabled(false).
SetFlushTimesPersistEvery(time.Second)
opts := NewFlushManagerOptions().SetJitterEnabled(false)

mgr := newLeaderFlushManager(doneCh, opts).(*leaderFlushManager)
mgr.nowFn = nowFn
@@ -354,7 +352,7 @@ func TestLeaderFlushManagerPrepareWithRedirectedShard(t *testing.T) {

flushTimesManager := NewMockFlushTimesManager(ctrl)
flushTimesManager.EXPECT().
StoreAsync(gomock.Any()).
StoreSync(gomock.Any()).
DoAndReturn(func(value *schema.ShardSetFlushTimes) error {
storeAsyncCount++
stored = value
@@ -365,9 +363,7 @@ func TestLeaderFlushManagerPrepareWithRedirectedShard(t *testing.T) {
Return(shard.NewShards([]shard.Shard{redirectedShard}), nil).
AnyTimes()

opts := NewFlushManagerOptions().
SetJitterEnabled(false).
SetFlushTimesPersistEvery(time.Second)
opts := NewFlushManagerOptions().SetJitterEnabled(false)

mgr := newLeaderFlushManager(doneCh, opts).(*leaderFlushManager)
mgr.nowFn = nowFn
9 changes: 4 additions & 5 deletions src/cmd/services/m3aggregator/config/aggregator.go
Original file line number Diff line number Diff line change
@@ -802,9 +802,9 @@ type flushManagerConfiguration struct {
// Number of workers per CPU.
NumWorkersPerCPU float64 `yaml:"numWorkersPerCPU" validate:"min=0.0,max=1.0"`

// How frequently the flush times are persisted.
// If unset or set to 0, this will update kv flush times after every flush.
FlushTimesPersistEvery time.Duration `yaml:"flushTimesPersistEvery"`
// DeprecatedFlushTimesPersistEvery controlled how often flush times were
// persisted, but is now deprecated.
DeprecatedFlushTimesPersistEvery time.Duration `yaml:"flushTimesPersistEvery"`

// Maximum buffer size.
MaxBufferSize time.Duration `yaml:"maxBufferSize"`
@@ -825,8 +825,7 @@ func (c flushManagerConfiguration) NewFlushManagerOptions(
SetPlacementManager(placementManager).
SetElectionManager(electionManager).
SetFlushTimesManager(flushTimesManager).
SetBufferForPastTimedMetric(bufferForPastTimedMetric).
SetFlushTimesPersistEvery(c.FlushTimesPersistEvery)
SetBufferForPastTimedMetric(bufferForPastTimedMetric)
if c.CheckEvery != 0 {
opts = opts.SetCheckEvery(c.CheckEvery)
}
7 changes: 3 additions & 4 deletions src/cmd/services/m3aggregator/config/defaults.go
Original file line number Diff line number Diff line change
@@ -274,10 +274,9 @@ var (
MaxJitterPercent: 0.25,
},
},
NumWorkersPerCPU: 0.5,
FlushTimesPersistEvery: 10 * time.Second,
MaxBufferSize: 5 * time.Minute,
ForcedFlushWindowSize: 10 * time.Second,
NumWorkersPerCPU: 0.5,
MaxBufferSize: 5 * time.Minute,
ForcedFlushWindowSize: 10 * time.Second,
},
Flush: handler.FlushConfiguration{
Handlers: []handler.FlushHandlerConfiguration{