Skip to content

Commit

Permalink
Implement a rateIntegrator struct.
Browse files Browse the repository at this point in the history
  • Loading branch information
igorpeshansky committed Jul 27, 2024
1 parent 1488eed commit eef4729
Show file tree
Hide file tree
Showing 3 changed files with 168 additions and 25 deletions.
50 changes: 25 additions & 25 deletions receiver/dcgmreceiver/scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ type dcgmScraper struct {
mb *metadata.MetricsBuilder
// Aggregate cumulative values.
aggregates struct {
energyConsumptionFallback float64 // ...from power usage rate.
pcieTxTotal int64 // ...from pcie tx.
pcieRxTotal int64 // ...from pcie rx.
nvlinkTxTotal int64 // ...from nvlink tx.
nvlinkRxTotal int64 // ...from nvlink rx.
energyConsumptionFallback rateIntegrator[float64] // ...from power usage rate.
pcieTxTotal rateIntegrator[int64] // ...from pcie tx.
pcieRxTotal rateIntegrator[int64] // ...from pcie rx.
nvlinkTxTotal rateIntegrator[int64] // ...from nvlink tx.
nvlinkRxTotal rateIntegrator[int64] // ...from nvlink rx.
}
}

Expand Down Expand Up @@ -87,11 +87,11 @@ func (s *dcgmScraper) start(_ context.Context, _ component.Host) error {
mbConfig.Metrics = s.config.Metrics
s.mb = metadata.NewMetricsBuilder(
mbConfig, s.settings, metadata.WithStartTime(startTime))
s.aggregates.energyConsumptionFallback = 0.0
s.aggregates.pcieTxTotal = 0
s.aggregates.pcieRxTotal = 0
s.aggregates.nvlinkTxTotal = 0
s.aggregates.nvlinkRxTotal = 0
s.aggregates.energyConsumptionFallback.Reset()
s.aggregates.pcieTxTotal.Reset()
s.aggregates.pcieRxTotal.Reset()
s.aggregates.nvlinkTxTotal.Reset()
s.aggregates.nvlinkRxTotal.Reset()

return nil
}
Expand Down Expand Up @@ -260,32 +260,32 @@ func (s *dcgmScraper) scrape(_ context.Context) (pmetric.Metrics, error) {
s.mb.RecordGpuDcgmMemoryBandwidthUtilizationDataPoint(now, memCopyUtil)
}
if metric, ok := metrics["DCGM_FI_PROF_PCIE_TX_BYTES"]; ok {
pcieTx := int64(float64(metric.asInt64()) * (s.config.CollectionInterval.Seconds())) /* rate to delta */
s.aggregates.pcieTxTotal += pcieTx /* delta to cumulative */
s.mb.RecordGpuDcgmPcieIoDataPoint(now, s.aggregates.pcieTxTotal, metadata.AttributeNetworkIoDirectionTransmit)
s.aggregates.pcieTxTotal.Update(metric.timestamp, metric.asInt64())
_, pcieTx := s.aggregates.pcieTxTotal.Value()
s.mb.RecordGpuDcgmPcieIoDataPoint(now, pcieTx, metadata.AttributeNetworkIoDirectionTransmit)
}
if metric, ok := metrics["DCGM_FI_PROF_PCIE_RX_BYTES"]; ok {
pcieRx := int64(float64(metric.asInt64()) * (s.config.CollectionInterval.Seconds())) /* rate to delta */
s.aggregates.pcieRxTotal += pcieRx /* delta to cumulative */
s.mb.RecordGpuDcgmPcieIoDataPoint(now, s.aggregates.pcieRxTotal, metadata.AttributeNetworkIoDirectionReceive)
s.aggregates.pcieRxTotal.Update(metric.timestamp, metric.asInt64())
_, pcieRx := s.aggregates.pcieRxTotal.Value()
s.mb.RecordGpuDcgmPcieIoDataPoint(now, pcieRx, metadata.AttributeNetworkIoDirectionReceive)
}
if metric, ok := metrics["DCGM_FI_PROF_NVLINK_TX_BYTES"]; ok {
nvlinkTx := int64(float64(metric.asInt64()) * (s.config.CollectionInterval.Seconds())) /* rate to delta */
s.aggregates.nvlinkTxTotal += nvlinkTx /* delta to cumulative */
s.mb.RecordGpuDcgmNvlinkIoDataPoint(now, s.aggregates.nvlinkTxTotal, metadata.AttributeNetworkIoDirectionTransmit)
s.aggregates.nvlinkTxTotal.Update(metric.timestamp, metric.asInt64())
_, nvlinkTx := s.aggregates.nvlinkTxTotal.Value()
s.mb.RecordGpuDcgmNvlinkIoDataPoint(now, nvlinkTx, metadata.AttributeNetworkIoDirectionTransmit)
}
if metric, ok := metrics["DCGM_FI_PROF_NVLINK_RX_BYTES"]; ok {
nvlinkRx := int64(float64(metric.asInt64()) * (s.config.CollectionInterval.Seconds())) /* rate to delta */
s.aggregates.nvlinkRxTotal += nvlinkRx /* delta to cumulative */
s.mb.RecordGpuDcgmNvlinkIoDataPoint(now, s.aggregates.nvlinkRxTotal, metadata.AttributeNetworkIoDirectionReceive)
s.aggregates.nvlinkRxTotal.Update(metric.timestamp, metric.asInt64())
_, nvlinkRx := s.aggregates.nvlinkRxTotal.Value()
s.mb.RecordGpuDcgmNvlinkIoDataPoint(now, nvlinkRx, metadata.AttributeNetworkIoDirectionReceive)
}
if metric, ok := metrics["DCGM_FI_DEV_TOTAL_ENERGY_CONSUMPTION"]; ok {
energyUsed := float64(metric.asInt64()) / 1e3 /* mJ to J */
s.mb.RecordGpuDcgmEnergyConsumptionDataPoint(now, energyUsed)
} else if metric, ok := metrics["DCGM_FI_DEV_POWER_USAGE"]; ok { // fallback
powerUsage := metric.asFloat64() * (s.config.CollectionInterval.Seconds()) /* rate to delta */
s.aggregates.energyConsumptionFallback += powerUsage /* delta to cumulative */
s.mb.RecordGpuDcgmEnergyConsumptionDataPoint(now, s.aggregates.energyConsumptionFallback)
s.aggregates.energyConsumptionFallback.Update(metric.timestamp, metric.asFloat64())
_, energyUsed := s.aggregates.energyConsumptionFallback.Value()
s.mb.RecordGpuDcgmEnergyConsumptionDataPoint(now, energyUsed)
}
if metric, ok := metrics["DCGM_FI_DEV_GPU_TEMP"]; ok {
s.mb.RecordGpuDcgmTemperatureDataPoint(now, float64(metric.asInt64()))
Expand Down
32 changes: 32 additions & 0 deletions receiver/dcgmreceiver/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,42 @@ package dcgmreceiver

import (
"fmt"
"time"

"github.com/NVIDIA/go-dcgm/pkg/dcgm"
)

// nowUnixMicro returns the current wall-clock time in microseconds since the
// Unix epoch. It is a variable so that tests can substitute a fixed clock.
var nowUnixMicro = func() int64 { return time.Now().UnixMicro() }

// rateIntegrator converts timestamped values that represent rates into
// cumulative values. It assumes the rate stays constant since the last
// timestamp.
//
// Timestamps are in microseconds and rates are per second. The running total
// is kept in whole units, with the sub-unit remainder tracked separately, so
// that integer instantiations do not overflow: accumulating rate*microseconds
// directly in an int64 wraps around within seconds at rates of hundreds of
// gigabytes per second. With this representation an int64 integrator is safe
// for rates up to roughly 9e12 per second.
//
// Reset must be called before the first Update; otherwise the first point is
// integrated starting from timestamp zero.
//
// NOTE(review): rates are presumed to be non-negative; confirm against the
// DCGM fields that are fed into this type.
type rateIntegrator[V int64 | float64] struct {
	lastTimestamp int64 // timestamp (in µs) of the last accepted point or of the last Reset.
	total         V     // the integration of the rate over time, in whole units.
	residueUs     V     // the part of the integration not yet in total, scaled by 1e6.
}

// Reset clears the accumulated value and restarts the integration from the
// current time, as reported by nowUnixMicro.
func (i *rateIntegrator[V]) Reset() {
	i.lastTimestamp = nowUnixMicro()
	i.total = V(0)
	i.residueUs = V(0)
}

// Update integrates the per-second rate v over the interval between the last
// recorded timestamp and ts (both in microseconds). Points whose timestamp is
// not strictly newer than the last recorded one are ignored.
func (i *rateIntegrator[V]) Update(ts int64, v V) {
	const usPerSecond int64 = 1_000_000

	// Drop stale points.
	if ts <= i.lastTimestamp {
		return
	}
	elapsed := ts - i.lastTimestamp
	i.lastTimestamp = ts

	// Whole seconds contribute whole units directly. Only the sub-second part
	// of the interval is multiplied in microsecond scale, which bounds the
	// intermediate product by v*1e6 instead of v*elapsed.
	i.total += v * V(elapsed/usPerSecond)
	i.residueUs += v * V(elapsed%usPerSecond)

	// Move every complete unit out of the residue so that it stays small.
	carry := i.residueUs / V(usPerSecond)
	i.total += carry
	i.residueUs -= carry * V(usPerSecond)
}

// Value returns the timestamp (in microseconds) of the last accepted point or
// Reset, and the cumulative value integrated so far.
func (i *rateIntegrator[V]) Value() (int64, V) {
	return i.lastTimestamp, i.total
}


var (
errBlankValue = fmt.Errorf("unspecified blank value")
errDataNotFound = fmt.Errorf("data not found")
Expand Down
111 changes: 111 additions & 0 deletions receiver/dcgmreceiver/util_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build gpu
// +build gpu

package dcgmreceiver

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// TestRateIntegratorInt64 exercises the int64 instantiation of rateIntegrator
// against a fixed clock: reset, regular updates, stale points and zero rates.
func TestRateIntegratorInt64(t *testing.T) {
	// Pin the clock so that Reset records a known timestamp.
	savedClock := nowUnixMicro
	defer func() { nowUnixMicro = savedClock }()
	nowUnixMicro = func() int64 { return 10 }

	var integ rateIntegrator[int64]

	integ.Reset()
	gotTS, gotValue := integ.Value()
	assert.Equal(t, int64(10), gotTS)
	assert.Equal(t, int64(0), gotValue)

	steps := []struct {
		ts        int64
		rate      int64
		wantTS    int64
		wantValue int64
	}{
		// Ensure updates affect aggregated values.
		{ts: 15, rate: 1e6, wantTS: 15, wantValue: 5},
		// Ensure stale points are ignored.
		{ts: 12, rate: 1e8, wantTS: 15, wantValue: 5},
		{ts: 15, rate: 1e8, wantTS: 15, wantValue: 5},
		// Ensure updates affect aggregated values.
		{ts: 20, rate: 2e6, wantTS: 20, wantValue: 15},
		// Ensure zero rates don't change the aggregated value.
		{ts: 25, rate: 0, wantTS: 25, wantValue: 15},
	}
	for _, step := range steps {
		integ.Update(step.ts, step.rate)
		gotTS, gotValue = integ.Value()
		require.Equal(t, step.wantTS, gotTS)
		require.Equal(t, step.wantValue, gotValue)
	}

	// Ensure the value is cleared on reset.
	integ.Reset()
	gotTS, gotValue = integ.Value()
	assert.Equal(t, int64(10), gotTS)
	assert.Equal(t, int64(0), gotValue)
}

// TestRateIntegratorFloat64 exercises the float64 instantiation of
// rateIntegrator against a fixed clock: reset, regular updates, stale points
// and zero rates.
func TestRateIntegratorFloat64(t *testing.T) {
	// Pin the clock so that Reset records a known timestamp.
	savedClock := nowUnixMicro
	defer func() { nowUnixMicro = savedClock }()
	nowUnixMicro = func() int64 { return 10 }

	var integ rateIntegrator[float64]

	integ.Reset()
	gotTS, gotValue := integ.Value()
	assert.Equal(t, int64(10), gotTS)
	assert.Equal(t, float64(0), gotValue)

	steps := []struct {
		ts        int64
		rate      float64
		wantTS    int64
		wantValue float64
	}{
		// Ensure updates affect aggregated values.
		{ts: 15, rate: 1.e6, wantTS: 15, wantValue: 5},
		// Ensure stale points are ignored.
		{ts: 12, rate: 1.e8, wantTS: 15, wantValue: 5},
		{ts: 15, rate: 1.e8, wantTS: 15, wantValue: 5},
		// Ensure updates affect aggregated values.
		{ts: 20, rate: 2.e6, wantTS: 20, wantValue: 15},
		// Ensure zero rates don't change the aggregated value.
		{ts: 25, rate: 0, wantTS: 25, wantValue: 15},
	}
	for _, step := range steps {
		integ.Update(step.ts, step.rate)
		gotTS, gotValue = integ.Value()
		require.Equal(t, step.wantTS, gotTS)
		require.Equal(t, step.wantValue, gotValue)
	}

	// Ensure the value is cleared on reset.
	integ.Reset()
	gotTS, gotValue = integ.Value()
	assert.Equal(t, int64(10), gotTS)
	assert.Equal(t, float64(0), gotValue)
}

0 comments on commit eef4729

Please sign in to comment.