// nowUnixMicro returns the current wall-clock time in microseconds since the
// Unix epoch. It is a variable so that tests can substitute a fixed clock.
var nowUnixMicro = func() int64 { return time.Now().UnixMicro() }

// rateIntegrator converts timestamped values that represent per-second rates
// into a cumulative value. It assumes the rate stays constant since the last
// timestamp. Timestamps are in microseconds.
//
// The running total is kept in real units plus a sub-unit remainder rather
// than as a single sum scaled by 1e6. A scaled sum would exhaust int64 after
// only about 9.2e12 units (roughly 15 minutes at 10 GB/s).
type rateIntegrator[V int64 | float64] struct {
	lastTimestamp int64 // timestamp (us) of the last accepted point, or of the last Reset.
	total         V     // integrated value, in units.
	remainderUs   V     // leftover unit-microseconds not yet worth a whole unit.
}

// newRateIntegrator returns a rateIntegrator that has already been Reset, so
// it starts integrating from the current time with a zero total.
func newRateIntegrator[V int64 | float64]() *rateIntegrator[V] {
	ri := new(rateIntegrator[V])
	ri.Reset()
	return ri
}

// Reset clears the accumulated value and restarts integration from the
// current time as reported by nowUnixMicro.
func (ri *rateIntegrator[V]) Reset() {
	ri.lastTimestamp = nowUnixMicro()
	ri.total = V(0)
	ri.remainderUs = V(0)
}

// Update integrates the per-second rate v over the interval between the last
// accepted timestamp and ts (both in microseconds). Points whose timestamp is
// not strictly newer than the last accepted one are ignored.
//
// NOTE(review): rates are presumed non-negative; with negative int64 rates the
// truncation of Value may differ from a single end-of-stream division.
func (ri *rateIntegrator[V]) Update(ts int64, v V) {
	// Drop stale points.
	if ts <= ri.lastTimestamp {
		return
	}
	const usPerSecond = 1e6
	elapsedUs := ts - ri.lastTimestamp
	ri.lastTimestamp = ts

	// Each whole second contributes exactly v units, so no 1e6 scaling (and
	// none of the overflow risk that comes with it) is needed here.
	ri.total += v * V(elapsedUs/usPerSecond)

	// The sub-second part is still computed in unit-microseconds, but the
	// multiplier is now below 1e6 and the carried remainder stays small.
	perSecond := V(usPerSecond)
	fracUs := ri.remainderUs + v*V(elapsedUs%usPerSecond)
	whole := fracUs / perSecond
	ri.total += whole
	ri.remainderUs = fracUs - whole*perSecond
}

// Value returns the timestamp (in microseconds) of the last accepted point or
// Reset, together with the value integrated so far.
func (ri *rateIntegrator[V]) Value() (int64, V) {
	return ri.lastTimestamp, ri.total
}

// defaultMap is a map that lazily creates missing entries using a factory
// function. It performs no locking of its own.
type defaultMap[K comparable, V any] struct {
	m map[K]V
	f func() V
}

// newDefaultMap returns an empty defaultMap whose missing entries are created
// by calling f.
func newDefaultMap[K comparable, V any](f func() V) *defaultMap[K, V] {
	return &defaultMap[K, V]{
		m: make(map[K]V),
		f: f,
	}
}

// Get returns the value stored under k. If k is absent, the factory is called
// once, its result is stored under k, and that result is returned.
func (m *defaultMap[K, V]) Get(k K) V {
	if v, ok := m.m[k]; ok {
		return v
	}
	v := m.f()
	m.m[k] = v
	return v
}
+ +//go:build gpu +// +build gpu + +package dcgmreceiver + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func testRateIntegrator[V int64 | float64](t *testing.T) { + origNowUnixMicro := nowUnixMicro + nowUnixMicro = func() int64 { return 10 } + defer func() { nowUnixMicro = origNowUnixMicro }() + + type P struct { + ts int64 + v V + } + p := func(ts int64, v V) P { return P{ts, v} } + + var ri rateIntegrator[V] + + ri.Reset() + require.Equal(t, P{10, 0}, p(ri.Value())) + // Ensure updates affect aggregated values. + ri.Update(15, 1e6) + assert.Equal(t, P{15, 5}, p(ri.Value())) + // Ensure stale points are ignored. + ri.Update(12, 1e8) + assert.Equal(t, P{15, 5}, p(ri.Value())) + ri.Update(15, 1.e8) + assert.Equal(t, P{15, 5}, p(ri.Value())) + // Ensure updates affect aggregated values. + ri.Update(20, 2.e6) + assert.Equal(t, P{20, 15}, p(ri.Value())) + // Ensure zero rates don't change the aggregated value. + ri.Update(25, 0) + assert.Equal(t, P{25, 15}, p(ri.Value())) + + // Ensure the value is cleared on reset. + ri.Reset() + assert.Equal(t, P{10, 0}, p(ri.Value())) +} + +func TestRateIntegratorInt64(t *testing.T) { + testRateIntegrator[int64](t) +} + +func TestRateIntegratorFloat64(t *testing.T) { + testRateIntegrator[float64](t) +} + +func TestDefaultMap(t *testing.T) { + called := false + m := newDefaultMap[int, int64](func() int64 { + called = true + return 8 + }) + _, ok := m.m[3] + assert.False(t, ok) + assert.False(t, called) + v := m.Get(3) + assert.True(t, called) + assert.Equal(t, int64(8), v) + _, ok = m.m[3] + assert.True(t, ok) +}