Skip to content

Commit

Permalink
Implement a rateIntegrator struct.
Browse files Browse the repository at this point in the history
  • Loading branch information
igorpeshansky committed Jul 28, 2024
1 parent e4a0026 commit 57b5017
Show file tree
Hide file tree
Showing 3 changed files with 167 additions and 25 deletions.
56 changes: 31 additions & 25 deletions receiver/dcgmreceiver/scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ type dcgmScraper struct {
mb *metadata.MetricsBuilder
// Aggregate cumulative values.
aggregates struct {
energyConsumptionFallback map[uint]float64 // ...from power usage rate.
pcieTxTotal map[uint]int64 // ...from pcie tx.
pcieRxTotal map[uint]int64 // ...from pcie rx.
nvlinkTxTotal map[uint]int64 // ...from nvlink tx.
nvlinkRxTotal map[uint]int64 // ...from nvlink rx.
energyConsumptionFallback *defaultMap[uint, *rateIntegrator[float64]] // ...from power usage rate.
pcieTxTotal *defaultMap[uint, *rateIntegrator[int64]] // ...from pcie tx.
pcieRxTotal *defaultMap[uint, *rateIntegrator[int64]] // ...from pcie rx.
nvlinkTxTotal *defaultMap[uint, *rateIntegrator[int64]] // ...from nvlink tx.
nvlinkRxTotal *defaultMap[uint, *rateIntegrator[int64]] // ...from nvlink rx.
}
}

Expand Down Expand Up @@ -81,17 +81,23 @@ func (s *dcgmScraper) initClient() error {
return nil
}

func newRateIntegrator[V int64 | float64]() *rateIntegrator[V] {
ri := new(rateIntegrator[V])
ri.Reset()
return ri
}

func (s *dcgmScraper) start(_ context.Context, _ component.Host) error {
startTime := pcommon.NewTimestampFromTime(time.Now())
mbConfig := metadata.DefaultMetricsBuilderConfig()
mbConfig.Metrics = s.config.Metrics
s.mb = metadata.NewMetricsBuilder(
mbConfig, s.settings, metadata.WithStartTime(startTime))
s.aggregates.energyConsumptionFallback = make(map[uint]float64)
s.aggregates.pcieTxTotal = make(map[uint]int64)
s.aggregates.pcieRxTotal = make(map[uint]int64)
s.aggregates.nvlinkTxTotal = make(map[uint]int64)
s.aggregates.nvlinkRxTotal = make(map[uint]int64)
s.aggregates.energyConsumptionFallback = newDefaultMap[uint](newRateIntegrator[float64])
s.aggregates.pcieTxTotal = newDefaultMap[uint](newRateIntegrator[int64])
s.aggregates.pcieRxTotal = newDefaultMap[uint](newRateIntegrator[int64])
s.aggregates.nvlinkTxTotal = newDefaultMap[uint](newRateIntegrator[int64])
s.aggregates.nvlinkRxTotal = newDefaultMap[uint](newRateIntegrator[int64])

return nil
}
Expand Down Expand Up @@ -260,32 +266,32 @@ func (s *dcgmScraper) scrape(_ context.Context) (pmetric.Metrics, error) {
s.mb.RecordGpuDcgmMemoryBandwidthUtilizationDataPoint(now, memCopyUtil)
}
if metric, ok := metrics["DCGM_FI_PROF_PCIE_TX_BYTES"]; ok {
pcieTx := int64(float64(metric.asInt64()) * (s.config.CollectionInterval.Seconds())) /* rate to delta */
s.aggregates.pcieTxTotal[gpuIndex] += pcieTx /* delta to cumulative */
s.mb.RecordGpuDcgmPcieIoDataPoint(now, s.aggregates.pcieTxTotal[gpuIndex], metadata.AttributeNetworkIoDirectionTransmit)
s.aggregates.pcieTxTotal.Get(gpuIndex).Update(metric.timestamp, metric.asInt64())
_, pcieTx := s.aggregates.pcieTxTotal.Get(gpuIndex).Value()
s.mb.RecordGpuDcgmPcieIoDataPoint(now, pcieTx, metadata.AttributeNetworkIoDirectionTransmit)
}
if metric, ok := metrics["DCGM_FI_PROF_PCIE_RX_BYTES"]; ok {
pcieRx := int64(float64(metric.asInt64()) * (s.config.CollectionInterval.Seconds())) /* rate to delta */
s.aggregates.pcieRxTotal[gpuIndex] += pcieRx /* delta to cumulative */
s.mb.RecordGpuDcgmPcieIoDataPoint(now, s.aggregates.pcieRxTotal[gpuIndex], metadata.AttributeNetworkIoDirectionReceive)
s.aggregates.pcieRxTotal.Get(gpuIndex).Update(metric.timestamp, metric.asInt64())
_, pcieRx := s.aggregates.pcieRxTotal.Get(gpuIndex).Value()
s.mb.RecordGpuDcgmPcieIoDataPoint(now, pcieRx, metadata.AttributeNetworkIoDirectionReceive)
}
if metric, ok := metrics["DCGM_FI_PROF_NVLINK_TX_BYTES"]; ok {
nvlinkTx := int64(float64(metric.asInt64()) * (s.config.CollectionInterval.Seconds())) /* rate to delta */
s.aggregates.nvlinkTxTotal[gpuIndex] += nvlinkTx /* delta to cumulative */
s.mb.RecordGpuDcgmNvlinkIoDataPoint(now, s.aggregates.nvlinkTxTotal[gpuIndex], metadata.AttributeNetworkIoDirectionTransmit)
s.aggregates.nvlinkTxTotal.Get(gpuIndex).Update(metric.timestamp, metric.asInt64())
_, nvlinkTx := s.aggregates.nvlinkTxTotal.Get(gpuIndex).Value()
s.mb.RecordGpuDcgmNvlinkIoDataPoint(now, nvlinkTx, metadata.AttributeNetworkIoDirectionTransmit)
}
if metric, ok := metrics["DCGM_FI_PROF_NVLINK_RX_BYTES"]; ok {
nvlinkRx := int64(float64(metric.asInt64()) * (s.config.CollectionInterval.Seconds())) /* rate to delta */
s.aggregates.nvlinkRxTotal[gpuIndex] += nvlinkRx /* delta to cumulative */
s.mb.RecordGpuDcgmNvlinkIoDataPoint(now, s.aggregates.nvlinkRxTotal[gpuIndex], metadata.AttributeNetworkIoDirectionReceive)
s.aggregates.nvlinkRxTotal.Get(gpuIndex).Update(metric.timestamp, metric.asInt64())
_, nvlinkRx := s.aggregates.nvlinkRxTotal.Get(gpuIndex).Value()
s.mb.RecordGpuDcgmNvlinkIoDataPoint(now, nvlinkRx, metadata.AttributeNetworkIoDirectionReceive)
}
if metric, ok := metrics["DCGM_FI_DEV_TOTAL_ENERGY_CONSUMPTION"]; ok {
energyUsed := float64(metric.asInt64()) / 1e3 /* mJ to J */
s.mb.RecordGpuDcgmEnergyConsumptionDataPoint(now, energyUsed)
} else if metric, ok := metrics["DCGM_FI_DEV_POWER_USAGE"]; ok { // fallback
powerUsage := metric.asFloat64() * (s.config.CollectionInterval.Seconds()) /* rate to delta */
s.aggregates.energyConsumptionFallback[gpuIndex] += powerUsage /* delta to cumulative */
s.mb.RecordGpuDcgmEnergyConsumptionDataPoint(now, s.aggregates.energyConsumptionFallback[gpuIndex])
s.aggregates.energyConsumptionFallback.Get(gpuIndex).Update(metric.timestamp, metric.asFloat64())
_, energyUsed := s.aggregates.energyConsumptionFallback.Get(gpuIndex).Value()
s.mb.RecordGpuDcgmEnergyConsumptionDataPoint(now, energyUsed)
}
if metric, ok := metrics["DCGM_FI_DEV_GPU_TEMP"]; ok {
s.mb.RecordGpuDcgmTemperatureDataPoint(now, float64(metric.asInt64()))
Expand Down
52 changes: 52 additions & 0 deletions receiver/dcgmreceiver/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,62 @@ package dcgmreceiver

import (
"fmt"
"time"

"github.com/NVIDIA/go-dcgm/pkg/dcgm"
)

var nowUnixMicro = func() int64 { return time.Now().UnixNano() / 1e3 }

// rateIntegrator converts timestamped values that represent rates into
// cumulative values. It assumes the rate stays constant since the last
// timestamp.
type rateIntegrator[V int64 | float64] struct {
lastTimestamp int64
aggregatedRateUs V // the integration of the rate over microsecond timestamps.
}

func (ri *rateIntegrator[V]) Reset() {
ri.lastTimestamp = nowUnixMicro()
ri.aggregatedRateUs = V(0)
}

func (ri *rateIntegrator[V]) Update(ts int64, v V) {
// Drop stale points.
if ts <= ri.lastTimestamp {
return
}
// v is the rate per second, and timestamps are in microseconds, so the
// delta will be 1e6 times the actual increment.
ri.aggregatedRateUs += v * V(ts-ri.lastTimestamp)
ri.lastTimestamp = ts
}

func (ri *rateIntegrator[V]) Value() (int64, V) {
return ri.lastTimestamp, ri.aggregatedRateUs / V(1e6)
}

type defaultMap[K comparable, V any] struct {
m map[K]V
f func() V
}

func newDefaultMap[K comparable, V any](f func() V) *defaultMap[K, V] {
return &defaultMap[K, V]{
m: make(map[K]V),
f: f,
}
}

func (m *defaultMap[K, V]) Get(k K) V {
if v, ok := m.m[k]; ok {
return v
}
v := m.f()
m.m[k] = v
return v
}

var (
errBlankValue = fmt.Errorf("unspecified blank value")
errDataNotFound = fmt.Errorf("data not found")
Expand Down
84 changes: 84 additions & 0 deletions receiver/dcgmreceiver/util_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build gpu
// +build gpu

package dcgmreceiver

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func testRateIntegrator[V int64 | float64](t *testing.T) {
origNowUnixMicro := nowUnixMicro
nowUnixMicro = func() int64 { return 10 }
defer func() { nowUnixMicro = origNowUnixMicro }()

type P struct {
ts int64
v V
}
p := func(ts int64, v V) P { return P{ts, v} }

var ri rateIntegrator[V]

ri.Reset()
require.Equal(t, P{10, 0}, p(ri.Value()))
// Ensure updates affect aggregated values.
ri.Update(15, 1e6)
assert.Equal(t, P{15, 5}, p(ri.Value()))
// Ensure stale points are ignored.
ri.Update(12, 1e8)
assert.Equal(t, P{15, 5}, p(ri.Value()))
ri.Update(15, 1.e8)
assert.Equal(t, P{15, 5}, p(ri.Value()))
// Ensure updates affect aggregated values.
ri.Update(20, 2.e6)
assert.Equal(t, P{20, 15}, p(ri.Value()))
// Ensure zero rates don't change the aggregated value.
ri.Update(25, 0)
assert.Equal(t, P{25, 15}, p(ri.Value()))

// Ensure the value is cleared on reset.
ri.Reset()
assert.Equal(t, P{10, 0}, p(ri.Value()))
}

func TestRateIntegratorInt64(t *testing.T) {
testRateIntegrator[int64](t)
}

func TestRateIntegratorFloat64(t *testing.T) {
testRateIntegrator[float64](t)
}

func TestDefaultMap(t *testing.T) {
called := false
m := newDefaultMap[int, int64](func() int64 {
called = true
return 8
})
_, ok := m.m[3]
assert.False(t, ok)
assert.False(t, called)
v := m.Get(3)
assert.True(t, called)
assert.Equal(t, int64(8), v)
_, ok = m.m[3]
assert.True(t, ok)
}

0 comments on commit 57b5017

Please sign in to comment.