[aggregator] Update flush times after each flush #3890

Status: Merged (30 commits, Nov 6, 2021). The changes shown below are from 27 of the 30 commits.

Commits
d16d7a5
[aggregator] Update flush times after each flush
arnikola Oct 29, 2021
d5e2dd8
Merge branch 'master' into arnikola/update-flush
arnikola Nov 4, 2021
b92ad82
Fix up tests, deprecate the flushTimesPersistEvery option
arnikola Nov 4, 2021
3eb513d
Update tests
arnikola Nov 4, 2021
50e6859
Excise remaining FlushTimesPersistEvery
arnikola Nov 4, 2021
24b9e81
Fix tests, lint
arnikola Nov 4, 2021
b0caeaf
Filtering through eager shutdown flag for back-compat
arnikola Nov 4, 2021
4c02198
Add timeout metric for alerting
arnikola Nov 4, 2021
8694a72
lint
arnikola Nov 4, 2021
3e2040b
Merge branch 'master' into arnikola/update-flush
arnikola Nov 4, 2021
c3b92b7
Always eager close aggregator
arnikola Nov 4, 2021
d9e5191
Merge branch 'arnikola/update-flush' of github.com:m3db/m3 into arnik…
arnikola Nov 4, 2021
6537fa7
Remove logging
arnikola Nov 4, 2021
7b42594
Re-add FlushTimesPersistEvery option
arnikola Nov 5, 2021
284a47c
PR response
arnikola Nov 5, 2021
932a5ca
Do not have a default defaultFlushTimesPersistEvery, to match config
arnikola Nov 5, 2021
ec4100f
Touchups for clarity/saner defaults
arnikola Nov 5, 2021
6164727
Re-add param (oops)
arnikola Nov 5, 2021
5040f33
responsible
arnikola Nov 5, 2021
cf0c3bb
Merge branch 'master' into arnikola/update-flush
arnikola Nov 5, 2021
aefd482
Close
arnikola Nov 5, 2021
40346ba
Merge branch 'arnikola/update-flush' of github.com:m3db/m3 into arnik…
arnikola Nov 5, 2021
c282209
Merge branch 'master' into arnikola/update-flush
arnikola Nov 5, 2021
9ba8ff0
Merge branch 'arnikola/update-flush' of github.com:m3db/m3 into arnik…
arnikola Nov 5, 2021
6152b61
response
arnikola Nov 5, 2021
666503f
wip
arnikola Nov 5, 2021
a29e989
Test cleanup
arnikola Nov 5, 2021
278d7d2
Response
arnikola Nov 5, 2021
8b480d3
Merge branch 'master' into arnikola/update-flush
arnikola Nov 5, 2021
86dc55d
Merge branch 'master' into arnikola/update-flush
arnikola Nov 6, 2021
Files changed
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,10 @@
# Changelog

# Upcoming

## Performance
- **M3Aggregator**: Rework close and remove the `flushTimesPersistEvery` semantics in leader flushing in favour of always persisting shard flush times on a successful flush, for optimized graceful failovers. ([#3890](https://github.com/m3db/m3/pull/3890))

# 1.3.0

## Features
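To make the M3Aggregator changelog entry above concrete, here is a minimal Go sketch of the described behaviour: instead of persisting shard flush times on a separate `flushTimesPersistEvery` timer, the leader persists them immediately after each successful flush, so a follower promoted during failover sees up-to-date flush times. The names used here (`flushTimesStore`, `memStore`, `leaderFlusher`, `StoreSync`) are illustrative assumptions, not the actual m3 aggregator API.

```go
package main

import (
	"fmt"
	"time"
)

// flushTimesStore stands in for the etcd-backed flush times manager; only the
// behaviour relevant to this sketch is modelled.
type flushTimesStore interface {
	StoreSync(times map[uint32]time.Time) error
}

// memStore is an in-memory stand-in so the example runs on its own.
type memStore struct{ latest map[uint32]time.Time }

func (m *memStore) StoreSync(times map[uint32]time.Time) error {
	m.latest = times
	return nil
}

type leaderFlusher struct {
	store flushTimesStore
	times map[uint32]time.Time // last successful flush time per shard
}

// flushShard flushes one shard and, on success, records and persists the new
// flush time right away instead of waiting for a periodic persist loop, so a
// follower promoted to leader avoids re-flushing data that was already flushed.
func (f *leaderFlusher) flushShard(shardID uint32, flush func() error) error {
	if err := flush(); err != nil {
		return err // do not advance or persist the flush time on failure
	}
	f.times[shardID] = time.Now()
	return f.store.StoreSync(f.times)
}

func main() {
	f := &leaderFlusher{store: &memStore{}, times: map[uint32]time.Time{}}
	if err := f.flushShard(0, func() error { return nil }); err != nil {
		panic(err)
	}
	fmt.Println("persisted flush times:", f.times)
}
```

The apparent trade-off is one extra persist call per flush, in exchange for persisted flush times that lag at most the flush currently in progress when a failover happens.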
3 changes: 1 addition & 2 deletions scripts/docker-integration-tests/aggregator/m3aggregator.yml
@@ -177,8 +177,7 @@ aggregator:
maxJitterPercent: 0.5
- flushInterval: 1h
maxJitterPercent: 0.25
numWorkersPerCPU: 0.5
flushTimesPersistEvery: 10s
numWorkersPerCPU: 0.5
maxBufferSize: 5m
forcedFlushWindowSize: 10s
flush:
Original file line number Diff line number Diff line change
@@ -205,8 +205,7 @@ aggregator:
maxJitterPercent: 0.5
- flushInterval: 1h
maxJitterPercent: 0.25
numWorkersPerCPU: 0.5
flushTimesPersistEvery: 10s
numWorkersPerCPU: 0.5
maxBufferSize: 5m
forcedFlushWindowSize: 10s
flush:
125 changes: 24 additions & 101 deletions src/aggregator/aggregator/aggregator.go
@@ -23,7 +23,6 @@ package aggregator
import (
"context"
"errors"
"fmt"
"math"
"strconv"
"sync"
@@ -94,20 +93,6 @@ type Aggregator interface {
Close() error
}

type tickShardFn func(
shard *aggregatorShard,
perShardTickDuration time.Duration,
doneCh <-chan struct{},
) tickResult

func tickShard(
shard *aggregatorShard,
perShardTickDuration time.Duration,
doneCh <-chan struct{},
) tickResult {
return shard.Tick(perShardTickDuration, doneCh)
}

// aggregator stores aggregations of different types of metrics (e.g., counter,
// timer, gauges) and periodically flushes them out.
type aggregator struct {
@@ -116,7 +101,6 @@ type aggregator struct {
opts Options
nowFn clock.NowFn
shardFn sharding.ShardFn
tickShardFn tickShardFn
checkInterval time.Duration
placementManager PlacementManager
flushTimesManager FlushTimesManager
@@ -135,8 +119,7 @@ type aggregator struct {
currPlacement placement.Placement
currNumShards atomic.Int32
state aggregatorState
doneCh chan struct{}
wg sync.WaitGroup
sleepFn sleepFn
shardsPendingClose atomic.Int32
metrics aggregatorMetrics
logger *zap.Logger
@@ -163,10 +146,9 @@ func NewAggregator(opts Options) Aggregator {
passthroughWriter: opts.PassthroughWriter(),
adminClient: opts.AdminClient(),
resignTimeout: opts.ResignTimeout(),
doneCh: make(chan struct{}),
sleepFn: time.Sleep,
metrics: newAggregatorMetrics(scope, timerOpts, opts.MaxAllowedForwardingDelayFn()),
logger: logger,
tickShardFn: tickShard,
}

return agg
@@ -190,19 +172,21 @@ func (agg *aggregator) Open() error {
return err
}
if agg.checkInterval > 0 {
agg.wg.Add(1)
go agg.tick()
}

agg.wg.Add(1)
// NB: placement tick watches the placement manager, and initializes a
// topology change if the placement is updated. This changes which shards this
// aggregator is responsible for, and initiates leader elections. In the
// scenario where a placement change is received when this aggregator is
// closed, it's fine to ignore the result of the placement update, as applying
// the change only affects the current aggregator that is being closed anyway.
go agg.placementTick()
agg.state = aggregatorOpen
return nil
}

func (agg *aggregator) placementTick() {
defer agg.wg.Done()

ticker := time.NewTicker(placementCheckInterval)
defer ticker.Stop()

@@ -212,8 +196,6 @@ func (agg *aggregator) placementTick() {
select {
case <-ticker.C:
case <-agg.placementManager.C():
case <-agg.doneCh:
return
}

placement, err := agg.placementManager.Placement()
@@ -402,53 +384,12 @@ func (agg *aggregator) Close() error {
}
agg.state = aggregatorClosed

var (
lastOpCompleted = time.Now()
closeLogger = agg.logger.With(zap.String("closing", "aggregator"))

logCloseOperation = func(op string) {
currTime := time.Now()
closeLogger.Debug(fmt.Sprintf("closed %s", op),
zap.String("took", currTime.Sub(lastOpCompleted).String()))
lastOpCompleted = currTime
}
)

closeLogger.Info("signaling aggregator done")
close(agg.doneCh)

// Waiting for the ticking goroutines to return.
// Doing this outside of agg.Lock to avoid potential deadlocks.
agg.Unlock()
agg.wg.Wait()
agg.Lock()

logCloseOperation("ticking wait groups")
for _, shardID := range agg.shardIDs {
agg.shards[shardID].Close()
}

logCloseOperation("aggregator shards")
if agg.shardSetOpen {
agg.closeShardSetWithLock()
}

logCloseOperation("flush shard sets")

agg.flushHandler.Close()
logCloseOperation("flush handler")

agg.passthroughWriter.Close()
logCloseOperation("passthrough writer")

if agg.adminClient != nil {
closeLogger.Info("closing admin client")
agg.adminClient.Close()
logCloseOperation("admin client")
}

closeLogger.Info("done")
return nil
// NB: closing the flush manager is the only really necessary step for
// gracefully closing an aggregator leader, as this will ensure that any
// currently running flush completes, and updates the shared shard flush
// times map in etcd, allowing the follower that will be promoted to leader
// to avoid re-computing and re-flushing this data.
return agg.flushManager.Close()
}

func (agg *aggregator) shardFor(id id.RawID) (*aggregatorShard, error) {
@@ -486,7 +427,8 @@ func (agg *aggregator) shardFor(id id.RawID) (*aggregatorShard, error) {
func (agg *aggregator) processPlacementWithLock(
newPlacement placement.Placement,
) error {
// If someone has already processed the placement ahead of us, do nothing.
// If someone has already processed the placement ahead of us, or if the
// aggregator was closed before the placement update started, do nothing.
if !agg.shouldProcessPlacementWithLock(newPlacement) {
return nil
}
@@ -533,6 +475,10 @@ func (agg *aggregator) processPlacementWithLock(
func (agg *aggregator) shouldProcessPlacementWithLock(
newPlacement placement.Placement,
) bool {
if agg.state == aggregatorClosed {
return false
}

// If there is no placement yet, or the placement has been updated,
// process this placement.
if agg.currPlacement == nil || agg.currPlacement != newPlacement {
@@ -757,15 +703,8 @@ func (agg *aggregator) closeShardsAsync(shards []*aggregatorShard) {
}

func (agg *aggregator) tick() {
defer agg.wg.Done()

for {
select {
case <-agg.doneCh:
return
default:
agg.tickInternal()
}
agg.tickInternal()
}
}

@@ -777,7 +716,7 @@ func (agg *aggregator) tickInternal() {
agg.metrics.shards.owned.Update(float64(numShards))
agg.metrics.shards.pendingClose.Update(float64(agg.shardsPendingClose.Load()))
if numShards == 0 {
agg.waitForInterval(agg.checkInterval)
agg.sleepFn(agg.checkInterval)
return
}
var (
@@ -786,29 +725,13 @@ func (agg *aggregator) tickInternal() {
tickResult tickResult
)
for _, shard := range ownedShards {
select {
// NB: if doneCh has been signaled, no need to continue ticking shards and
// it's valid to early abort ticking.
case <-agg.doneCh:
agg.logger.Info("recevied interrupt on tick; aborting")
return
default:
}

shardTickResult := agg.tickShardFn(shard, perShardTickDuration, agg.doneCh)
shardTickResult := shard.Tick(perShardTickDuration)
tickResult = tickResult.merge(shardTickResult)
}
tickDuration := agg.nowFn().Sub(start)
agg.metrics.tick.Report(tickResult, tickDuration)
if tickDuration < agg.checkInterval {
agg.waitForInterval(agg.checkInterval - tickDuration)
}
}

func (agg *aggregator) waitForInterval(wait time.Duration) {
select {
case <-agg.doneCh:
case <-time.After(wait):
agg.sleepFn(agg.checkInterval - tickDuration)
}
}

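The comment in the reworked `Close` above notes that closing the flush manager is the one step that really matters for a graceful leader shutdown: it lets any in-flight flush complete and persist the shared shard flush times before the process exits. Below is a rough, self-contained sketch of that shutdown pattern under the assumption that a single mutex guards each flush pass; `simpleFlushManager` and its methods are made-up names, not the real m3 `FlushManager`.

```go
package main

import (
	"sync"
	"time"
)

// simpleFlushManager is a toy stand-in for a flush manager: the mutex is held
// for the duration of each flush pass, so Close naturally waits for any
// in-flight flush to finish before marking the manager closed.
type simpleFlushManager struct {
	mu      sync.Mutex
	closed  bool
	persist func() error // e.g. write shard flush times to a shared store
}

// flushOnce runs a single flush pass and persists flush times on success.
func (m *simpleFlushManager) flushOnce(flush func() error) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.closed {
		return nil
	}
	if err := flush(); err != nil {
		return err
	}
	return m.persist()
}

// Close blocks on the same mutex as flushOnce, so it returns only after the
// currently running flush (if any) has completed and persisted its times, and
// no further flushes start afterwards.
func (m *simpleFlushManager) Close() error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.closed = true
	return nil
}

func main() {
	m := &simpleFlushManager{persist: func() error { return nil }}
	done := make(chan struct{})
	go func() {
		defer close(done)
		_ = m.flushOnce(func() error { time.Sleep(10 * time.Millisecond); return nil })
	}()
	time.Sleep(time.Millisecond) // best-effort: give the flush a chance to start first
	_ = m.Close()                // returns once any in-flight flush is done
	<-done
}
```

The diff above drops the old step-by-step teardown (shards, flush handler, passthrough writer, admin client) along with its `doneCh`/`WaitGroup` plumbing, consistent with that comment.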
14 changes: 14 additions & 0 deletions src/aggregator/aggregator/aggregator_mock.go

Some generated files are not rendered by default.

38 changes: 0 additions & 38 deletions src/aggregator/aggregator/aggregator_test.go
@@ -841,44 +841,6 @@ func TestAggregatorTick(t *testing.T) {
require.NoError(t, agg.Close())
}

func TestAggregatorTickCancelled(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

flushTimesManager := NewMockFlushTimesManager(ctrl)
flushTimesManager.EXPECT().Reset().Return(nil).AnyTimes()
flushTimesManager.EXPECT().Open(gomock.Any()).Return(nil).AnyTimes()
flushTimesManager.EXPECT().Get().Return(nil, nil).AnyTimes()
flushTimesManager.EXPECT().Close().Return(nil).AnyTimes()

agg, _ := testAggregator(t, ctrl)
agg.flushTimesManager = flushTimesManager
require.NoError(t, agg.Open())

var (
tickedCh = make(chan struct{})
numTicked = 0
doneAfterTicks = 2
)

agg.tickShardFn = func(*aggregatorShard, time.Duration, <-chan struct{}) tickResult {
numTicked++
if doneAfterTicks == 2 {
close(tickedCh)
}

time.Sleep(time.Millisecond * 50)
return tickResult{}
}

go func() {
<-tickedCh
require.NoError(t, agg.Close())
}()

require.Equal(t, 2, doneAfterTicks)
}

func TestAggregatorShardSetNotOpenNilInstance(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
28 changes: 12 additions & 16 deletions src/aggregator/aggregator/flush_mgr_options.go
@@ -31,16 +31,13 @@ import (
)

const (
defaultCheckEvery = time.Second
defaultJitterEnabled = true
defaultFlushTimesPersistEvery = 10 * time.Second
defaultMaxBufferSize = 5 * time.Minute
defaultForcedFlushWindowSize = 10 * time.Second
defaultCheckEvery = time.Second
defaultJitterEnabled = true
defaultMaxBufferSize = 5 * time.Minute
defaultForcedFlushWindowSize = 10 * time.Second
)

var (
defaultWorkerPoolSize = int(math.Max(float64(runtime.GOMAXPROCS(0)/8), 1.0))
)
var defaultWorkerPoolSize = int(math.Max(float64(runtime.GOMAXPROCS(0)/8), 1.0))

// FlushJitterFn determines the jitter based on the flush interval.
type FlushJitterFn func(flushInterval time.Duration) time.Duration
@@ -152,14 +149,13 @@ func NewFlushManagerOptions() FlushManagerOptions {
workerPool := sync.NewWorkerPool(defaultWorkerPoolSize)
workerPool.Init()
return &flushManagerOptions{
clockOpts: clock.NewOptions(),
instrumentOpts: instrument.NewOptions(),
checkEvery: defaultCheckEvery,
jitterEnabled: defaultJitterEnabled,
workerPool: workerPool,
flushTimesPersistEvery: defaultFlushTimesPersistEvery,
maxBufferSize: defaultMaxBufferSize,
forcedFlushWindowSize: defaultForcedFlushWindowSize,
clockOpts: clock.NewOptions(),
instrumentOpts: instrument.NewOptions(),
checkEvery: defaultCheckEvery,
jitterEnabled: defaultJitterEnabled,
workerPool: workerPool,
maxBufferSize: defaultMaxBufferSize,
forcedFlushWindowSize: defaultForcedFlushWindowSize,

bufferForPastTimedMetric: defaultTimedMetricBuffer,
}
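The `FlushJitterFn` type in this file maps a flush interval to a jitter duration. As a standalone illustration only (not the jitter function m3 actually ships), a jitter bounded by a maximum percentage of the interval, in the spirit of the `maxJitterPercent` settings in the m3aggregator.yml diffs earlier, could look like this:

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// FlushJitterFn mirrors the signature declared in flush_mgr_options.go: it
// maps a flush interval to a jitter duration.
type FlushJitterFn func(flushInterval time.Duration) time.Duration

// newMaxPercentJitterFn returns a FlushJitterFn that picks a uniformly random
// jitter in [0, maxJitterPercent*flushInterval).
func newMaxPercentJitterFn(maxJitterPercent float64, rnd *rand.Rand) FlushJitterFn {
	return func(flushInterval time.Duration) time.Duration {
		maxJitter := time.Duration(maxJitterPercent * float64(flushInterval))
		if maxJitter <= 0 {
			return 0
		}
		return time.Duration(rnd.Int63n(int64(maxJitter)))
	}
}

func main() {
	jitterFn := newMaxPercentJitterFn(0.25, rand.New(rand.NewSource(42)))
	fmt.Println("jitter for a 1h flush interval:", jitterFn(time.Hour))
}
```

Jitter of this sort spreads buckets that share a flush interval across the interval instead of flushing them all at the same instant.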