Skip to content

Commit

Permalink
[agg] Use timestamp (not start aligned) for expiring forward versions (
Browse files Browse the repository at this point in the history
…#3922)

The forward_writer uses a version map indexed by timestamp, which is not
start aligned. When the flusher passes the set of times to expire, it
needs to convert the start aligned time to a timestamp.

This manifests as a significant memory leak for sparse workloads where
the timestamp of N != start aligned time of N+1. In those cases the
versions are never GCd when they expire out of the buffer.
  • Loading branch information
ryanhall07 authored Nov 16, 2021
1 parent d2d4306 commit a4f7af1
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 12 deletions.
14 changes: 11 additions & 3 deletions src/aggregator/aggregator/counter_elem_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions src/aggregator/aggregator/elem_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,8 @@ func TestCounterElemConsumeCustomAggregationCustomPipeline(t *testing.T) {
require.False(t, e.Consume(ts, isEarlierThanFn, timestampNanosFn, standardMetricTargetNanos,
localFn, forwardFn, onForwardedFlushedFn, 0, consumeType))
verifyForwardedMetrics(t, expectedForwardedRes, *forwardRes)
expectedOnFlushedRes[0].expiredTimes = append(expectedOnFlushedRes[0].expiredTimes,
xtime.ToUnixNano(time.Unix(220, 0)), xtime.ToUnixNano(time.Unix(230, 0)))
verifyOnForwardedFlushResult(t, expectedOnFlushedRes, *onForwardedFlushedRes)
require.Equal(t, 0, len(*localRes))
require.Equal(t, 1, len(e.values))
Expand All @@ -700,6 +702,7 @@ func TestCounterElemConsumeCustomAggregationCustomPipeline(t *testing.T) {
onForwardedFlushedFn, onForwardedFlushedRes = testOnForwardedFlushedFn()
require.True(t, e.Consume(alignedstartAtNanos[3], isEarlierThanFn, timestampNanosFn, standardMetricTargetNanos,
localFn, forwardFn, onForwardedFlushedFn, 0, consumeType))
expectedOnFlushedRes[0].expiredTimes = nil
verifyOnForwardedFlushResult(t, expectedOnFlushedRes, *onForwardedFlushedRes)
require.Equal(t, 0, len(*localRes))
require.Equal(t, 0, len(*forwardRes))
Expand Down Expand Up @@ -1221,6 +1224,8 @@ func TestTimerElemConsumeCustomAggregationCustomPipeline(t *testing.T) {
require.False(t, e.Consume(alignedstartAtNanos[3], isEarlierThanFn, timestampNanosFn, standardMetricTargetNanos,
localFn, forwardFn, onForwardedFlushedFn, 0, consumeType))
verifyForwardedMetrics(t, expectedForwardedRes, *forwardRes)
expectedOnFlushedRes[0].expiredTimes = append(expectedOnFlushedRes[0].expiredTimes,
xtime.ToUnixNano(time.Unix(220, 0)), xtime.ToUnixNano(time.Unix(230, 0)))
verifyOnForwardedFlushResult(t, expectedOnFlushedRes, *onForwardedFlushedRes)
require.Equal(t, 0, len(*localRes))
require.Equal(t, 1, len(e.values))
Expand All @@ -1237,6 +1242,7 @@ func TestTimerElemConsumeCustomAggregationCustomPipeline(t *testing.T) {
onForwardedFlushedFn, onForwardedFlushedRes = testOnForwardedFlushedFn()
require.True(t, e.Consume(alignedstartAtNanos[3], isEarlierThanFn, timestampNanosFn, standardMetricTargetNanos,
localFn, forwardFn, onForwardedFlushedFn, 0, consumeType))
expectedOnFlushedRes[0].expiredTimes = nil
verifyOnForwardedFlushResult(t, expectedOnFlushedRes, *onForwardedFlushedRes)
require.Equal(t, 0, len(*localRes))
require.Equal(t, 0, len(*forwardRes))
Expand Down Expand Up @@ -1966,6 +1972,8 @@ func TestGaugeElemConsumeCustomAggregationCustomPipeline(t *testing.T) {
require.False(t, e.Consume(alignedstartAtNanos[3], isEarlierThanFn, timestampNanosFn, standardMetricTargetNanos,
localFn, forwardFn, onForwardedFlushedFn, 0, consumeType))
verifyForwardedMetrics(t, expectedForwardedRes, *forwardRes)
expectedOnFlushedRes[0].expiredTimes = append(expectedOnFlushedRes[0].expiredTimes,
xtime.ToUnixNano(time.Unix(220, 0)), xtime.ToUnixNano(time.Unix(230, 0)))
verifyOnForwardedFlushResult(t, expectedOnFlushedRes, *onForwardedFlushedRes)
require.Equal(t, 0, len(*localRes))
require.Equal(t, 1, len(e.values))
Expand All @@ -1982,6 +1990,7 @@ func TestGaugeElemConsumeCustomAggregationCustomPipeline(t *testing.T) {
onForwardedFlushedFn, onForwardedFlushedRes = testOnForwardedFlushedFn()
require.True(t, e.Consume(alignedstartAtNanos[3], isEarlierThanFn, timestampNanosFn, standardMetricTargetNanos,
localFn, forwardFn, onForwardedFlushedFn, 0, consumeType))
expectedOnFlushedRes[0].expiredTimes = nil
verifyOnForwardedFlushResult(t, expectedOnFlushedRes, *onForwardedFlushedRes)
require.Equal(t, 0, len(*localRes))
require.Equal(t, 0, len(*forwardRes))
Expand Down Expand Up @@ -2949,6 +2958,7 @@ type testForwardedMetricWithMetadata struct {

type testOnForwardedFlushedData struct {
aggregationKey aggregationKey
expiredTimes []xtime.UnixNano
}

func testFlushLocalMetricFn() (
Expand Down Expand Up @@ -3010,6 +3020,7 @@ func testOnForwardedFlushedFn() (
) {
result = append(result, testOnForwardedFlushedData{
aggregationKey: aggregationKey,
expiredTimes: expiredTimes,
})
}, &result
}
Expand Down Expand Up @@ -3275,5 +3286,9 @@ func verifyOnForwardedFlushResult(t *testing.T, expected, actual []testOnForward
require.Equal(t, len(expected), len(actual))
for i := 0; i < len(expected); i++ {
require.True(t, expected[i].aggregationKey.Equal(actual[i].aggregationKey))
require.Equal(t, len(expected[i].expiredTimes), len(actual[i].expiredTimes))
for j := 0; j < len(expected[i].expiredTimes); j++ {
require.Equal(t, expected[i].expiredTimes[j], actual[i].expiredTimes[j])
}
}
}
14 changes: 11 additions & 3 deletions src/aggregator/aggregator/gauge_elem_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 11 additions & 3 deletions src/aggregator/aggregator/generic_elem.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,9 @@ type GenericElem struct {
dirty []xtime.UnixNano

// internal/no need for synchronization: small buffers to avoid memory allocations during consumption
toConsume []consumeState
flushStateToExpire []xtime.UnixNano
toConsume []consumeState
flushStateToExpire []xtime.UnixNano
forwardTimesToExpire []xtime.UnixNano
// end internal state

// min time in the values map. allows for iterating through map.
Expand Down Expand Up @@ -460,7 +461,14 @@ func (e *GenericElem) Consume(

if e.parsedPipeline.HasRollup {
forwardedAggregationKey, _ := e.ForwardedAggregationKey()
onForwardedFlushedFn(e.onForwardedAggregationWrittenFn, forwardedAggregationKey, e.flushStateToExpire)
e.forwardTimesToExpire = e.forwardTimesToExpire[:0]
for _, startTime := range e.flushStateToExpire {
// the forward writer uses the timestamp of the aggregation, so need to convert the start aligned time
// to a timestamp.
e.forwardTimesToExpire = append(e.forwardTimesToExpire,
xtime.UnixNano(timestampNanosFn(int64(startTime), resolution)))
}
onForwardedFlushedFn(e.onForwardedAggregationWrittenFn, forwardedAggregationKey, e.forwardTimesToExpire)
}

return canCollect
Expand Down
14 changes: 11 additions & 3 deletions src/aggregator/aggregator/timer_elem_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit a4f7af1

Please sign in to comment.