From eef47299f471b63425e9ce08ca8d7c0d21e6236f Mon Sep 17 00:00:00 2001 From: Igor Peshansky Date: Fri, 26 Jul 2024 19:53:21 -0400 Subject: [PATCH] Implement a rateIntegrator struct. --- receiver/dcgmreceiver/scraper.go | 50 ++++++------- receiver/dcgmreceiver/util.go | 32 +++++++++ receiver/dcgmreceiver/util_test.go | 111 +++++++++++++++++++++++++++++ 3 files changed, 168 insertions(+), 25 deletions(-) create mode 100644 receiver/dcgmreceiver/util_test.go diff --git a/receiver/dcgmreceiver/scraper.go b/receiver/dcgmreceiver/scraper.go index 69fd033bf..e6daeda28 100644 --- a/receiver/dcgmreceiver/scraper.go +++ b/receiver/dcgmreceiver/scraper.go @@ -40,11 +40,11 @@ type dcgmScraper struct { mb *metadata.MetricsBuilder // Aggregate cumulative values. aggregates struct { - energyConsumptionFallback float64 // ...from power usage rate. - pcieTxTotal int64 // ...from pcie tx. - pcieRxTotal int64 // ...from pcie rx. - nvlinkTxTotal int64 // ...from nvlink tx. - nvlinkRxTotal int64 // ...from nvlink rx. + energyConsumptionFallback rateIntegrator[float64] // ...from power usage rate. + pcieTxTotal rateIntegrator[int64] // ...from pcie tx. + pcieRxTotal rateIntegrator[int64] // ...from pcie rx. + nvlinkTxTotal rateIntegrator[int64] // ...from nvlink tx. + nvlinkRxTotal rateIntegrator[int64] // ...from nvlink rx. } } @@ -87,11 +87,11 @@ func (s *dcgmScraper) start(_ context.Context, _ component.Host) error { mbConfig.Metrics = s.config.Metrics s.mb = metadata.NewMetricsBuilder( mbConfig, s.settings, metadata.WithStartTime(startTime)) - s.aggregates.energyConsumptionFallback = 0.0 - s.aggregates.pcieTxTotal = 0 - s.aggregates.pcieRxTotal = 0 - s.aggregates.nvlinkTxTotal = 0 - s.aggregates.nvlinkRxTotal = 0 + s.aggregates.energyConsumptionFallback.Reset() + s.aggregates.pcieTxTotal.Reset() + s.aggregates.pcieRxTotal.Reset() + s.aggregates.nvlinkTxTotal.Reset() + s.aggregates.nvlinkRxTotal.Reset() return nil } @@ -260,32 +260,32 @@ func (s *dcgmScraper) scrape(_ context.Context) (pmetric.Metrics, error) { s.mb.RecordGpuDcgmMemoryBandwidthUtilizationDataPoint(now, memCopyUtil) } if metric, ok := metrics["DCGM_FI_PROF_PCIE_TX_BYTES"]; ok { - pcieTx := int64(float64(metric.asInt64()) * (s.config.CollectionInterval.Seconds())) /* rate to delta */ - s.aggregates.pcieTxTotal += pcieTx /* delta to cumulative */ - s.mb.RecordGpuDcgmPcieIoDataPoint(now, s.aggregates.pcieTxTotal, metadata.AttributeNetworkIoDirectionTransmit) + s.aggregates.pcieTxTotal.Update(metric.timestamp, metric.asInt64()) + _, pcieTx := s.aggregates.pcieTxTotal.Value() + s.mb.RecordGpuDcgmPcieIoDataPoint(now, pcieTx, metadata.AttributeNetworkIoDirectionTransmit) } if metric, ok := metrics["DCGM_FI_PROF_PCIE_RX_BYTES"]; ok { - pcieRx := int64(float64(metric.asInt64()) * (s.config.CollectionInterval.Seconds())) /* rate to delta */ - s.aggregates.pcieRxTotal += pcieRx /* delta to cumulative */ - s.mb.RecordGpuDcgmPcieIoDataPoint(now, s.aggregates.pcieRxTotal, metadata.AttributeNetworkIoDirectionReceive) + s.aggregates.pcieRxTotal.Update(metric.timestamp, metric.asInt64()) + _, pcieRx := s.aggregates.pcieRxTotal.Value() + s.mb.RecordGpuDcgmPcieIoDataPoint(now, pcieRx, metadata.AttributeNetworkIoDirectionReceive) } if metric, ok := metrics["DCGM_FI_PROF_NVLINK_TX_BYTES"]; ok { - nvlinkTx := int64(float64(metric.asInt64()) * (s.config.CollectionInterval.Seconds())) /* rate to delta */ - s.aggregates.nvlinkTxTotal += nvlinkTx /* delta to cumulative */ - s.mb.RecordGpuDcgmNvlinkIoDataPoint(now, s.aggregates.nvlinkTxTotal, metadata.AttributeNetworkIoDirectionTransmit) + s.aggregates.nvlinkTxTotal.Update(metric.timestamp, metric.asInt64()) + _, nvlinkTx := s.aggregates.nvlinkTxTotal.Value() + s.mb.RecordGpuDcgmNvlinkIoDataPoint(now, nvlinkTx, metadata.AttributeNetworkIoDirectionTransmit) } if metric, ok := metrics["DCGM_FI_PROF_NVLINK_RX_BYTES"]; ok { - nvlinkRx := int64(float64(metric.asInt64()) * (s.config.CollectionInterval.Seconds())) /* rate to delta */ - s.aggregates.nvlinkRxTotal += nvlinkRx /* delta to cumulative */ - s.mb.RecordGpuDcgmNvlinkIoDataPoint(now, s.aggregates.nvlinkRxTotal, metadata.AttributeNetworkIoDirectionReceive) + s.aggregates.nvlinkRxTotal.Update(metric.timestamp, metric.asInt64()) + _, nvlinkRx := s.aggregates.nvlinkRxTotal.Value() + s.mb.RecordGpuDcgmNvlinkIoDataPoint(now, nvlinkRx, metadata.AttributeNetworkIoDirectionReceive) } if metric, ok := metrics["DCGM_FI_DEV_TOTAL_ENERGY_CONSUMPTION"]; ok { energyUsed := float64(metric.asInt64()) / 1e3 /* mJ to J */ s.mb.RecordGpuDcgmEnergyConsumptionDataPoint(now, energyUsed) } else if metric, ok := metrics["DCGM_FI_DEV_POWER_USAGE"]; ok { // fallback - powerUsage := metric.asFloat64() * (s.config.CollectionInterval.Seconds()) /* rate to delta */ - s.aggregates.energyConsumptionFallback += powerUsage /* delta to cumulative */ - s.mb.RecordGpuDcgmEnergyConsumptionDataPoint(now, s.aggregates.energyConsumptionFallback) + s.aggregates.energyConsumptionFallback.Update(metric.timestamp, metric.asFloat64()) + _, energyUsed := s.aggregates.energyConsumptionFallback.Value() + s.mb.RecordGpuDcgmEnergyConsumptionDataPoint(now, energyUsed) } if metric, ok := metrics["DCGM_FI_DEV_GPU_TEMP"]; ok { s.mb.RecordGpuDcgmTemperatureDataPoint(now, float64(metric.asInt64())) diff --git a/receiver/dcgmreceiver/util.go b/receiver/dcgmreceiver/util.go index 1f6de1233..1615ffde6 100644 --- a/receiver/dcgmreceiver/util.go +++ b/receiver/dcgmreceiver/util.go @@ -19,10 +19,42 @@ package dcgmreceiver import ( "fmt" + "time" "github.com/NVIDIA/go-dcgm/pkg/dcgm" ) +var nowUnixMicro = func() int64 { return time.Now().UnixNano() / 1e3 } + +// rateIntegrator converts timestamped values that represent rates into +// cumulative values. It assumes the rate stays constant since the last +// timestamp. +type rateIntegrator[V int64 | float64] struct { + lastTimestamp int64 + aggregatedRateUs V // the integration of the rate over microsecond timestamps. +} + +func (i *rateIntegrator[V]) Reset() { + i.lastTimestamp = nowUnixMicro() + i.aggregatedRateUs = V(0) +} + +func (i *rateIntegrator[V]) Update(ts int64, v V) { + // Drop stale points. + if ts <= i.lastTimestamp { + return + } + // v is the rate per second, and timestamps are in microseconds, so the + // delta will be 1e6 times the actual increment. + i.aggregatedRateUs += v * V(ts-i.lastTimestamp) + i.lastTimestamp = ts +} + +func (i *rateIntegrator[V]) Value() (int64, V) { + return i.lastTimestamp, i.aggregatedRateUs / V(1e6) +} + + var ( errBlankValue = fmt.Errorf("unspecified blank value") errDataNotFound = fmt.Errorf("data not found") diff --git a/receiver/dcgmreceiver/util_test.go b/receiver/dcgmreceiver/util_test.go new file mode 100644 index 000000000..2cb5cf2e0 --- /dev/null +++ b/receiver/dcgmreceiver/util_test.go @@ -0,0 +1,111 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build gpu +// +build gpu + +package dcgmreceiver + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestRateIntegratorInt64(t *testing.T) { + origNowUnixMicro := nowUnixMicro + nowUnixMicro = func() int64 { return 10 } + defer func() { nowUnixMicro = origNowUnixMicro }() + + var i rateIntegrator[int64] + + i.Reset() + ts, iv := i.Value() + assert.Equal(t, int64(10), ts) + assert.Equal(t, int64(0), iv) + // Ensure updates affect aggregated values. + i.Update(15, 1e6) + ts, iv = i.Value() + require.Equal(t, int64(15), ts) + require.Equal(t, int64(5), iv) + // Ensure stale points are ignored. + i.Update(12, 1e8) + ts, iv = i.Value() + require.Equal(t, int64(15), ts) + require.Equal(t, int64(5), iv) + i.Update(15, 1e8) + ts, iv = i.Value() + require.Equal(t, int64(15), ts) + require.Equal(t, int64(5), iv) + // Ensure updates affect aggregated values. + i.Update(20, 2e6) + ts, iv = i.Value() + require.Equal(t, int64(20), ts) + require.Equal(t, int64(15), iv) + // Ensure zero rates don't change the aggregated value. + i.Update(25, 0) + ts, iv = i.Value() + require.Equal(t, int64(25), ts) + require.Equal(t, int64(15), iv) + + // Ensure the value is cleared on reset. + i.Reset() + ts, iv = i.Value() + assert.Equal(t, int64(10), ts) + assert.Equal(t, int64(0), iv) +} + +func TestRateIntegratorFloat64(t *testing.T) { + origNowUnixMicro := nowUnixMicro + nowUnixMicro = func() int64 { return 10 } + defer func() { nowUnixMicro = origNowUnixMicro }() + + var i rateIntegrator[float64] + + i.Reset() + ts, fv := i.Value() + assert.Equal(t, int64(10), ts) + assert.Equal(t, float64(0), fv) + // Ensure updates affect aggregated values. + i.Update(15, 1.e6) + ts, fv = i.Value() + require.Equal(t, int64(15), ts) + require.Equal(t, float64(5), fv) + // Ensure stale points are ignored. + i.Update(12, 1.e8) + ts, fv = i.Value() + require.Equal(t, int64(15), ts) + require.Equal(t, float64(5), fv) + i.Update(15, 1.e8) + ts, fv = i.Value() + require.Equal(t, int64(15), ts) + require.Equal(t, float64(5), fv) + // Ensure updates affect aggregated values. + i.Update(20, 2.e6) + ts, fv = i.Value() + require.Equal(t, int64(20), ts) + require.Equal(t, float64(15), fv) + // Ensure zero rates don't change the aggregated value. + i.Update(25, 0) + ts, fv = i.Value() + require.Equal(t, int64(25), ts) + require.Equal(t, float64(15), fv) + + // Ensure the value is cleared on reset. + i.Reset() + ts, fv = i.Value() + assert.Equal(t, int64(10), ts) + assert.Equal(t, float64(0), fv) +}