Skip to content

Commit

Permalink
Merge branch 'master' into nb/convert-prom
Browse files Browse the repository at this point in the history
  • Loading branch information
nbroyles authored Nov 21, 2021
2 parents 0a5f62e + 5051d07 commit 65a2017
Show file tree
Hide file tree
Showing 32 changed files with 867 additions and 351 deletions.
20 changes: 18 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,25 @@
# Changelog

# Upcoming
# 1.4.1

## Bug Fixes
- **M3Coordinator**: Do not Close singleton MessageProcessors when closing connections. This fixes a previously introduced panic that affects M3Coordinator -> M3Aggregator communication. ([#3934](https://github.com/m3db/m3/pull/3934))

# 1.4.0

## Features
- **M3Query**: Add write endpoint support for M3-Map-Tags-JSON header in InfluxDB path ([#3816](https://github.com/m3db/m3/pull/3816))
- **M3Query**: Add support for `last_over_time` in M3Query engine ([#3884](https://github.com/m3db/m3/pull/3884))
- **M3Aggregator**: Add p75/p25 as aggregation options ([#3867](https://github.com/m3db/m3/pull/3867))

## Bug Fixes
- **M3DB**: Fix M3TSZ to be deterministic when encoding high precision values ([#3872](https://github.com/m3db/m3/pull/3872))
- **M3DB**: Gracefully handle reads including documents with stale index state ([#3905](https://github.com/m3db/m3/pull/3905))

## Performance
- **M3Aggregator**: Rework close and remove `persitFlushTimesEvery` semantics in leader flushing in favour of always persisting shard flush times on a successful flush for optimized graceful failovers. ([#3890](https://github.com/m3db/m3/pull/3890))
- **M3Aggregator**: Rework close and remove `persitFlushTimesEvery` semantics in leader flushing in favour of always persisting shard flush times on a successful flush for optimized graceful failovers ([#3890](https://github.com/m3db/m3/pull/3890))
- **M3DB**: Optimize `filesetFiles` function during bootstrapping for namespaces with long retentions to prevent CPU spikes ([#3900](https://github.com/m3db/m3/pull/3900))
- **M3DB**: Avoid loading blocks in memory for namespaces with snapshots disabled during bootstrapping to reduce memory usage ([#3919](https://github.com/m3db/m3/pull/3919))

# 1.3.0

Expand Down
57 changes: 25 additions & 32 deletions src/aggregator/aggregator/counter_elem_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 10 additions & 8 deletions src/aggregator/aggregator/elem_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,11 @@ const (
)

var (
nan = math.NaN()
errElemClosed = errors.New("element is closed")
errAggregationClosed = errors.New("aggregation is closed")
errDuplicateForwardingSource = errors.New("duplicate forwarding source")
nan = math.NaN()
errElemClosed = errors.New("element is closed")
errAggregationClosed = errors.New("aggregation is closed")
errClosedBeforeResendEnabledMigration = errors.New("aggregation closed before resendEnabled migration")
errDuplicateForwardingSource = errors.New("duplicate forwarding source")
)

// isEarlierThanFn determines whether the timestamps of the metrics in a given
Expand All @@ -81,7 +82,6 @@ type timestampNanosFn func(windowStartNanos int64, resolution time.Duration) int
type createAggregationOptions struct {
// initSourceSet determines whether to initialize the source set.
initSourceSet bool
resendEnabled bool
}

// IDPrefixSuffixType configs if the id should be added with prefix or suffix
Expand Down Expand Up @@ -206,7 +206,7 @@ type consumeState struct {
prevStartTime xtime.UnixNano
// the dirty bit copied from the lockedAgg.
dirty bool
// copied from the timedAggregation
// the resendEnabled bit copied from the lockedAgg
resendEnabled bool
}

Expand All @@ -221,6 +221,10 @@ type flushState struct {
emittedValues []float64
// true if this aggregation has ever been flushed.
flushed bool
// true if the aggregation was flushed with resendEnabled. this is copied from the lockedAggregation at the time
// of flush. this value can change on a lockedAggregation while it's still open, so this only represents the state
// at the time of the last flush.
latestResendEnabled bool
}

var isDirty = func(state consumeState) bool {
Expand All @@ -236,7 +240,6 @@ func (f *flushState) close() {
type elemMetrics struct {
scope tally.Scope
updatedValues tally.Counter
retriedValues tally.Counter
flush map[flushKey]flushMetrics
mtx sync.RWMutex
}
Expand Down Expand Up @@ -364,7 +367,6 @@ func NewElemOptions(aggregatorOpts Options) ElemOptions {
aggregationOpts: raggregation.NewOptions(aggregatorOpts.InstrumentOptions()),
elemMetrics: &elemMetrics{
updatedValues: scope.Counter("updated-values"),
retriedValues: scope.Counter("retried-values"),
scope: scope,
flush: make(map[flushKey]flushMetrics),
},
Expand Down
17 changes: 9 additions & 8 deletions src/aggregator/aggregator/elem_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2909,9 +2909,10 @@ func TestExpireValues(t *testing.T) {

// Add test values.
for _, v := range test.values {
_, err := e.findOrCreate(int64(v), createAggregationOptions{
resendEnabled: test.resendEnabled,
})
_, err := e.findOrCreate(int64(v), createAggregationOptions{})
// need to manually seed the flush state since we don't call Consume(), which takes care of setting
// the flush state for expireValuesWithLock to use.
e.flushState[v] = flushState{latestResendEnabled: test.resendEnabled}
require.NoError(t, err)
}

Expand Down Expand Up @@ -3118,17 +3119,17 @@ func testGaugeElemWithData(
require.NoError(t, e.ResetSetData(data))
for i, aligned := range alignedstartAtNanos {
gauge := &lockedGaugeAggregation{
aggregation: newGaugeAggregation(raggregation.NewGauge(e.aggOpts)),
sourcesSeen: make(map[uint32]*bitset.BitSet),
aggregation: newGaugeAggregation(raggregation.NewGauge(e.aggOpts)),
sourcesSeen: make(map[uint32]*bitset.BitSet),
resendEnabled: resendEnabled,
}
gauge.dirty = true
// offset the timestamp by 1 so the gauge value can be updated using the aligned timestamp later.
gauge.aggregation.Update(time.Unix(0, aligned-1), gaugeVals[i], nil)
startAligned := xtime.UnixNano(aligned)
e.values[startAligned] = timedGauge{
startAt: startAligned,
lockedAgg: gauge,
resendEnabled: resendEnabled,
startAt: startAligned,
lockedAgg: gauge,
}
e.dirty = append(e.dirty, startAligned)
}
Expand Down
58 changes: 40 additions & 18 deletions src/aggregator/aggregator/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ func newForwardedEntryMetrics(scope tally.Scope) forwardedEntryMetrics {

type entryMetrics struct {
resendEnabled tally.Counter
retriedValues tally.Counter
untimed untimedEntryMetrics
timed timedEntryMetrics
forwarded forwardedEntryMetrics
Expand All @@ -191,6 +192,7 @@ func NewEntryMetrics(scope tally.Scope) *entryMetrics {
forwardedEntryScope := scope.Tagged(map[string]string{"entry-type": "forwarded"})
return &entryMetrics{
resendEnabled: scope.Counter("resend-enabled"),
retriedValues: scope.Counter("retried-values"),
untimed: newUntimedEntryMetrics(untimedEntryScope),
timed: newTimedEntryMetrics(timedEntryScope),
forwarded: newForwardedEntryMetrics(forwardedEntryScope),
Expand Down Expand Up @@ -707,31 +709,51 @@ func (e *Entry) updateStagedMetadatasWithLock(
return nil
}

func (e *Entry) addUntimedWithLock(timestamp time.Time, mu unaggregated.MetricUnion) error {
func (e *Entry) addUntimedWithLock(serverTimestamp time.Time, mu unaggregated.MetricUnion) error {
var err error
for i := range e.aggregations {
ts := timestamp
resendEnabled := e.aggregations[i].resendEnabled
multierr.AppendInto(&err, e.addUntimedValueWithLock(
e.aggregations[i], serverTimestamp, mu, e.aggregations[i].resendEnabled, false))
}
return err
}

// addUntimedValueWithLock adds the untimed value to the aggregationValue.
// this method handles all the various cases of switching to use a client timestamp if resendEnabled is set for the
// rollup rule.
func (e *Entry) addUntimedValueWithLock(
aggValue aggregationValue,
serverTimestamp time.Time,
mu unaggregated.MetricUnion,
resendEnabled bool,
retry bool) error {
elem := aggValue.elem.Value.(metricElem)
resolution := aggValue.key.storagePolicy.Resolution().Window
if resendEnabled && mu.ClientTimeNanos > 0 {
// Migrate an originally untimed metric (server timestamp) to a "timed" metric (client timestamp) if
// resendEnabled is set on the rollup rule. Continuing to use untimed allows for a seamless transition since
// the Entry does not change.
if mu.ClientTimeNanos == 0 {
resendEnabled = false
e.metrics.resendEnabled.Inc(1)
err := e.checkTimestampForMetric(int64(mu.ClientTimeNanos), e.nowFn().UnixNano(), resolution)
if err != nil {
return err
}
if resendEnabled {
e.metrics.resendEnabled.Inc(1)
ts = mu.ClientTimeNanos.ToTime()
if multierr.AppendInto(
&err,
e.checkTimestampForMetric(
int64(mu.ClientTimeNanos),
e.nowFn().UnixNano(),
e.aggregations[i].key.storagePolicy.Resolution().Window),
) {
continue
}
err = elem.AddUnion(mu.ClientTimeNanos.ToTime(), mu, true)
if xerrors.Is(err, errClosedBeforeResendEnabledMigration) {
// this handles a race where the rule was just migrated to resendEnabled. if the client timestamp is
// delayed, most likely the aggregation has already been closed, since it did not previously have
// resendEnabled set. continue using the serverTimestamp and this will eventually resolve itself for future
// aggregations.
e.metrics.retriedValues.Inc(1)
return e.addUntimedValueWithLock(aggValue, serverTimestamp, mu, false, false)
}
multierr.AppendInto(&err, e.aggregations[i].elem.Value.(metricElem).AddUnion(ts, mu, resendEnabled))
return err
}
err := elem.AddUnion(serverTimestamp, mu, false)
if xerrors.Is(err, errAggregationClosed) && !retry {
// the aggregation just closed and we lost the race. roll the value into the next aggregation.
e.metrics.retriedValues.Inc(1)
return e.addUntimedValueWithLock(aggValue, serverTimestamp.Add(resolution), mu, false, true)
}
return err
}
Expand Down
Loading

0 comments on commit 65a2017

Please sign in to comment.