Skip to content

Commit

Permalink
Implement a cumulativeTracker struct.
Browse files Browse the repository at this point in the history
Rearrange aggregates in scraper.
  • Loading branch information
igorpeshansky committed Jul 28, 2024
1 parent 57b5017 commit 4706b7a
Show file tree
Hide file tree
Showing 3 changed files with 176 additions and 31 deletions.
122 changes: 91 additions & 31 deletions receiver/dcgmreceiver/scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,32 @@ type dcgmScraper struct {
mb *metadata.MetricsBuilder
// Aggregate cumulative values.
aggregates struct {
energyConsumptionFallback *defaultMap[uint, *rateIntegrator[float64]] // ...from power usage rate.
pcieTxTotal *defaultMap[uint, *rateIntegrator[int64]] // ...from pcie tx.
pcieRxTotal *defaultMap[uint, *rateIntegrator[int64]] // ...from pcie rx.
nvlinkTxTotal *defaultMap[uint, *rateIntegrator[int64]] // ...from nvlink tx.
nvlinkRxTotal *defaultMap[uint, *rateIntegrator[int64]] // ...from nvlink rx.
energyConsumption struct {
total *defaultMap[uint, *cumulativeTracker[int64]]
fallback *defaultMap[uint, *rateIntegrator[float64]] // ...from power usage rate.
}
pcieTotal struct {
tx *defaultMap[uint, *rateIntegrator[int64]] // ...from pcie tx.
rx *defaultMap[uint, *rateIntegrator[int64]] // ...from pcie rx.
}
nvlinkTotal struct {
tx *defaultMap[uint, *rateIntegrator[int64]] // ...from nvlink tx.
rx *defaultMap[uint, *rateIntegrator[int64]] // ...from nvlink rx.
}
throttleDuration struct {
powerViolation *defaultMap[uint, *cumulativeTracker[int64]]
thermalViolation *defaultMap[uint, *cumulativeTracker[int64]]
syncBoostViolation *defaultMap[uint, *cumulativeTracker[int64]]
boardLimitViolation *defaultMap[uint, *cumulativeTracker[int64]]
lowUtilViolation *defaultMap[uint, *cumulativeTracker[int64]]
reliabilityViolation *defaultMap[uint, *cumulativeTracker[int64]]
totalAppClocksViolation *defaultMap[uint, *cumulativeTracker[int64]]
totalBaseClocksViolation *defaultMap[uint, *cumulativeTracker[int64]]
}
eccTotal struct {
sbe *defaultMap[uint, *cumulativeTracker[int64]]
dbe *defaultMap[uint, *cumulativeTracker[int64]]
}
}
}

Expand Down Expand Up @@ -87,17 +108,34 @@ func newRateIntegrator[V int64 | float64]() *rateIntegrator[V] {
return ri
}

func newCumulativeTracker[V int64 | float64]() *cumulativeTracker[V] {
ct := new(cumulativeTracker[V])
ct.Reset()
return ct
}

func (s *dcgmScraper) start(_ context.Context, _ component.Host) error {
startTime := pcommon.NewTimestampFromTime(time.Now())
mbConfig := metadata.DefaultMetricsBuilderConfig()
mbConfig.Metrics = s.config.Metrics
s.mb = metadata.NewMetricsBuilder(
mbConfig, s.settings, metadata.WithStartTime(startTime))
s.aggregates.energyConsumptionFallback = newDefaultMap[uint](newRateIntegrator[float64])
s.aggregates.pcieTxTotal = newDefaultMap[uint](newRateIntegrator[int64])
s.aggregates.pcieRxTotal = newDefaultMap[uint](newRateIntegrator[int64])
s.aggregates.nvlinkTxTotal = newDefaultMap[uint](newRateIntegrator[int64])
s.aggregates.nvlinkRxTotal = newDefaultMap[uint](newRateIntegrator[int64])
s.aggregates.energyConsumption.total = newDefaultMap[uint](newCumulativeTracker[int64])
s.aggregates.energyConsumption.fallback = newDefaultMap[uint](newRateIntegrator[float64])
s.aggregates.pcieTotal.tx = newDefaultMap[uint](newRateIntegrator[int64])
s.aggregates.pcieTotal.rx = newDefaultMap[uint](newRateIntegrator[int64])
s.aggregates.nvlinkTotal.tx = newDefaultMap[uint](newRateIntegrator[int64])
s.aggregates.nvlinkTotal.rx = newDefaultMap[uint](newRateIntegrator[int64])
s.aggregates.throttleDuration.powerViolation = newDefaultMap[uint](newCumulativeTracker[int64])
s.aggregates.throttleDuration.thermalViolation = newDefaultMap[uint](newCumulativeTracker[int64])
s.aggregates.throttleDuration.syncBoostViolation = newDefaultMap[uint](newCumulativeTracker[int64])
s.aggregates.throttleDuration.boardLimitViolation = newDefaultMap[uint](newCumulativeTracker[int64])
s.aggregates.throttleDuration.lowUtilViolation = newDefaultMap[uint](newCumulativeTracker[int64])
s.aggregates.throttleDuration.reliabilityViolation = newDefaultMap[uint](newCumulativeTracker[int64])
s.aggregates.throttleDuration.totalAppClocksViolation = newDefaultMap[uint](newCumulativeTracker[int64])
s.aggregates.throttleDuration.totalBaseClocksViolation = newDefaultMap[uint](newCumulativeTracker[int64])
s.aggregates.eccTotal.sbe = newDefaultMap[uint](newCumulativeTracker[int64])
s.aggregates.eccTotal.dbe = newDefaultMap[uint](newCumulativeTracker[int64])

return nil
}
Expand Down Expand Up @@ -266,31 +304,33 @@ func (s *dcgmScraper) scrape(_ context.Context) (pmetric.Metrics, error) {
s.mb.RecordGpuDcgmMemoryBandwidthUtilizationDataPoint(now, memCopyUtil)
}
if metric, ok := metrics["DCGM_FI_PROF_PCIE_TX_BYTES"]; ok {
s.aggregates.pcieTxTotal.Get(gpuIndex).Update(metric.timestamp, metric.asInt64())
_, pcieTx := s.aggregates.pcieTxTotal.Get(gpuIndex).Value()
s.aggregates.pcieTotal.tx.Get(gpuIndex).Update(metric.timestamp, metric.asInt64())
_, pcieTx := s.aggregates.pcieTotal.tx.Get(gpuIndex).Value()
s.mb.RecordGpuDcgmPcieIoDataPoint(now, pcieTx, metadata.AttributeNetworkIoDirectionTransmit)
}
if metric, ok := metrics["DCGM_FI_PROF_PCIE_RX_BYTES"]; ok {
s.aggregates.pcieRxTotal.Get(gpuIndex).Update(metric.timestamp, metric.asInt64())
_, pcieRx := s.aggregates.pcieRxTotal.Get(gpuIndex).Value()
s.aggregates.pcieTotal.rx.Get(gpuIndex).Update(metric.timestamp, metric.asInt64())
_, pcieRx := s.aggregates.pcieTotal.rx.Get(gpuIndex).Value()
s.mb.RecordGpuDcgmPcieIoDataPoint(now, pcieRx, metadata.AttributeNetworkIoDirectionReceive)
}
if metric, ok := metrics["DCGM_FI_PROF_NVLINK_TX_BYTES"]; ok {
s.aggregates.nvlinkTxTotal.Get(gpuIndex).Update(metric.timestamp, metric.asInt64())
_, nvlinkTx := s.aggregates.nvlinkTxTotal.Get(gpuIndex).Value()
s.aggregates.nvlinkTotal.tx.Get(gpuIndex).Update(metric.timestamp, metric.asInt64())
_, nvlinkTx := s.aggregates.nvlinkTotal.tx.Get(gpuIndex).Value()
s.mb.RecordGpuDcgmNvlinkIoDataPoint(now, nvlinkTx, metadata.AttributeNetworkIoDirectionTransmit)
}
if metric, ok := metrics["DCGM_FI_PROF_NVLINK_RX_BYTES"]; ok {
s.aggregates.nvlinkRxTotal.Get(gpuIndex).Update(metric.timestamp, metric.asInt64())
_, nvlinkRx := s.aggregates.nvlinkRxTotal.Get(gpuIndex).Value()
s.aggregates.nvlinkTotal.rx.Get(gpuIndex).Update(metric.timestamp, metric.asInt64())
_, nvlinkRx := s.aggregates.nvlinkTotal.rx.Get(gpuIndex).Value()
s.mb.RecordGpuDcgmNvlinkIoDataPoint(now, nvlinkRx, metadata.AttributeNetworkIoDirectionReceive)
}
if metric, ok := metrics["DCGM_FI_DEV_TOTAL_ENERGY_CONSUMPTION"]; ok {
energyUsed := float64(metric.asInt64()) / 1e3 /* mJ to J */
s.aggregates.energyConsumption.total.Get(gpuIndex).Update(metric.timestamp, metric.asInt64())
_, value := s.aggregates.energyConsumption.total.Get(gpuIndex).Value()
energyUsed := float64(value) / 1e3 /* mJ to J */
s.mb.RecordGpuDcgmEnergyConsumptionDataPoint(now, energyUsed)
} else if metric, ok := metrics["DCGM_FI_DEV_POWER_USAGE"]; ok { // fallback
s.aggregates.energyConsumptionFallback.Get(gpuIndex).Update(metric.timestamp, metric.asFloat64())
_, energyUsed := s.aggregates.energyConsumptionFallback.Get(gpuIndex).Value()
s.aggregates.energyConsumption.fallback.Get(gpuIndex).Update(metric.timestamp, metric.asFloat64())
_, energyUsed := s.aggregates.energyConsumption.fallback.Get(gpuIndex).Value()
s.mb.RecordGpuDcgmEnergyConsumptionDataPoint(now, energyUsed)
}
if metric, ok := metrics["DCGM_FI_DEV_GPU_TEMP"]; ok {
Expand All @@ -301,42 +341,62 @@ func (s *dcgmScraper) scrape(_ context.Context) (pmetric.Metrics, error) {
s.mb.RecordGpuDcgmClockFrequencyDataPoint(now, clockFreq)
}
if metric, ok := metrics["DCGM_FI_DEV_POWER_VIOLATION"]; ok {
violationTime := float64(metric.asInt64()) / 1e6 /* us to s */
s.aggregates.throttleDuration.powerViolation.Get(gpuIndex).Update(metric.timestamp, metric.asInt64())
_, value := s.aggregates.throttleDuration.powerViolation.Get(gpuIndex).Value()
violationTime := float64(value) / 1e6 /* us to s */
s.mb.RecordGpuDcgmClockThrottleDurationTimeDataPoint(now, violationTime, metadata.AttributeGpuClockViolationPower)
}
if metric, ok := metrics["DCGM_FI_DEV_THERMAL_VIOLATION"]; ok {
violationTime := float64(metric.asInt64()) / 1e6 /* us to s */
s.aggregates.throttleDuration.thermalViolation.Get(gpuIndex).Update(metric.timestamp, metric.asInt64())
_, value := s.aggregates.throttleDuration.thermalViolation.Get(gpuIndex).Value()
violationTime := float64(value) / 1e6 /* us to s */
s.mb.RecordGpuDcgmClockThrottleDurationTimeDataPoint(now, violationTime, metadata.AttributeGpuClockViolationThermal)
}
if metric, ok := metrics["DCGM_FI_DEV_SYNC_BOOST_VIOLATION"]; ok {
violationTime := float64(metric.asInt64()) / 1e6 /* us to s */
s.aggregates.throttleDuration.syncBoostViolation.Get(gpuIndex).Update(metric.timestamp, metric.asInt64())
_, value := s.aggregates.throttleDuration.syncBoostViolation.Get(gpuIndex).Value()
violationTime := float64(value) / 1e6 /* us to s */
s.mb.RecordGpuDcgmClockThrottleDurationTimeDataPoint(now, violationTime, metadata.AttributeGpuClockViolationSyncBoost)
}
if metric, ok := metrics["DCGM_FI_DEV_BOARD_LIMIT_VIOLATION"]; ok {
violationTime := float64(metric.asInt64()) / 1e6 /* us to s */
s.aggregates.throttleDuration.boardLimitViolation.Get(gpuIndex).Update(metric.timestamp, metric.asInt64())
_, value := s.aggregates.throttleDuration.boardLimitViolation.Get(gpuIndex).Value()
violationTime := float64(value) / 1e6 /* us to s */
s.mb.RecordGpuDcgmClockThrottleDurationTimeDataPoint(now, violationTime, metadata.AttributeGpuClockViolationBoardLimit)
}
if metric, ok := metrics["DCGM_FI_DEV_LOW_UTIL_VIOLATION"]; ok {
violationTime := float64(metric.asInt64()) / 1e6 /* us to s */
s.aggregates.throttleDuration.lowUtilViolation.Get(gpuIndex).Update(metric.timestamp, metric.asInt64())
_, value := s.aggregates.throttleDuration.lowUtilViolation.Get(gpuIndex).Value()
violationTime := float64(value) / 1e6 /* us to s */
s.mb.RecordGpuDcgmClockThrottleDurationTimeDataPoint(now, violationTime, metadata.AttributeGpuClockViolationLowUtil)
}
if metric, ok := metrics["DCGM_FI_DEV_RELIABILITY_VIOLATION"]; ok {
violationTime := float64(metric.asInt64()) / 1e6 /* us to s */
s.aggregates.throttleDuration.reliabilityViolation.Get(gpuIndex).Update(metric.timestamp, metric.asInt64())
_, value := s.aggregates.throttleDuration.reliabilityViolation.Get(gpuIndex).Value()
violationTime := float64(value) / 1e6 /* us to s */
s.mb.RecordGpuDcgmClockThrottleDurationTimeDataPoint(now, violationTime, metadata.AttributeGpuClockViolationReliability)
}
if metric, ok := metrics["DCGM_FI_DEV_TOTAL_APP_CLOCKS_VIOLATION"]; ok {
violationTime := float64(metric.asInt64()) / 1e6 /* us to s */
s.aggregates.throttleDuration.totalAppClocksViolation.Get(gpuIndex).Update(metric.timestamp, metric.asInt64())
_, value := s.aggregates.throttleDuration.totalAppClocksViolation.Get(gpuIndex).Value()
violationTime := float64(value) / 1e6 /* us to s */
s.mb.RecordGpuDcgmClockThrottleDurationTimeDataPoint(now, violationTime, metadata.AttributeGpuClockViolationAppClock)
}
if metric, ok := metrics["DCGM_FI_DEV_TOTAL_BASE_CLOCKS_VIOLATION"]; ok {
violationTime := float64(metric.asInt64()) / 1e6 /* us to s */
s.aggregates.throttleDuration.totalBaseClocksViolation.Get(gpuIndex).Update(metric.timestamp, metric.asInt64())
_, value := s.aggregates.throttleDuration.totalBaseClocksViolation.Get(gpuIndex).Value()
violationTime := float64(value) / 1e6 /* us to s */
s.mb.RecordGpuDcgmClockThrottleDurationTimeDataPoint(now, violationTime, metadata.AttributeGpuClockViolationBaseClock)
}
if metric, ok := metrics["DCGM_FI_DEV_ECC_SBE_VOL_TOTAL"]; ok {
s.mb.RecordGpuDcgmEccErrorsDataPoint(now, metric.asInt64(), metadata.AttributeGpuErrorTypeSbe)
s.aggregates.eccTotal.sbe.Get(gpuIndex).Update(metric.timestamp, metric.asInt64())
_, sbeErrors := s.aggregates.eccTotal.sbe.Get(gpuIndex).Value()
s.mb.RecordGpuDcgmEccErrorsDataPoint(now, sbeErrors, metadata.AttributeGpuErrorTypeSbe)
}
if metric, ok := metrics["DCGM_FI_DEV_ECC_DBE_VOL_TOTAL"]; ok {
s.mb.RecordGpuDcgmEccErrorsDataPoint(now, metric.asInt64(), metadata.AttributeGpuErrorTypeDbe)
s.aggregates.eccTotal.dbe.Get(gpuIndex).Update(metric.timestamp, metric.asInt64())
_, dbeErrors := s.aggregates.eccTotal.dbe.Get(gpuIndex).Value()
s.mb.RecordGpuDcgmEccErrorsDataPoint(now, dbeErrors, metadata.AttributeGpuErrorTypeDbe)
}
// TODO: XID errors.
// s.mb.RecordGpuDcgmXidErrorsDataPoint(now, metric.asInt64(), xid)
Expand Down
35 changes: 35 additions & 0 deletions receiver/dcgmreceiver/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,41 @@ func (m *defaultMap[K, V]) Get(k K) V {
return v
}

// cumulativeTracker records cumulative values since last reset.
type cumulativeTracker[V int64 | float64] struct {
baseTimestamp int64
baseline V // the value seen at baseTimestamp.
lastTimestamp int64
lastValue V // the value seen at lastTimestamp.
}

func (i *cumulativeTracker[V]) Reset() {
i.baseTimestamp = 0
i.lastTimestamp = nowUnixMicro()
i.baseline = V(0)
i.lastValue = V(0)
}

func (i *cumulativeTracker[V]) Update(ts int64, v V) {
// On first update, record the value as the baseline.
if i.baseTimestamp == 0 {
i.baseTimestamp, i.baseline = ts, v
}
// Drop stale points.
if ts <= i.lastTimestamp {
return
}
i.lastTimestamp, i.lastValue = ts, v
}

func (i *cumulativeTracker[V]) Value() (int64, V) {
return i.lastTimestamp, i.lastValue - i.baseline
}

func (i *cumulativeTracker[V]) Baseline() (int64, V) {
return i.baseTimestamp, i.baseline
}

var (
errBlankValue = fmt.Errorf("unspecified blank value")
errDataNotFound = fmt.Errorf("data not found")
Expand Down
50 changes: 50 additions & 0 deletions receiver/dcgmreceiver/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,56 @@ func TestRateIntegratorFloat64(t *testing.T) {
testRateIntegrator[float64](t)
}

func testCumulativeTracker[V int64 | float64](t *testing.T) {
origNowUnixMicro := nowUnixMicro
nowUnixMicro = func() int64 { return 10 }
defer func() { nowUnixMicro = origNowUnixMicro }()

type P struct {
ts int64
v V
}
p := func(ts int64, v V) P { return P{ts, v} }

var ct cumulativeTracker[V]

ct.Reset()
require.Equal(t, P{0, 0}, p(ct.Baseline()))
require.Equal(t, P{10, 0}, p(ct.Value()))
// Ensure first updates sets the baseline.
ct.Update(15, 50)
require.Equal(t, P{15, 50}, p(ct.Baseline()))
assert.Equal(t, P{15, 0}, p(ct.Value()))
// Ensure updates affect values, but not the baseline.
ct.Update(20, 80)
assert.Equal(t, P{15, 50}, p(ct.Baseline()))
assert.Equal(t, P{20, 30}, p(ct.Value()))
// Ensure stale points are ignored.
ct.Update(18, 1e8)
assert.Equal(t, P{20, 30}, p(ct.Value()))
ct.Update(20, 1e8)
assert.Equal(t, P{20, 30}, p(ct.Value()))
// Ensure updates affect values.
ct.Update(25, 100)
assert.Equal(t, P{25, 50}, p(ct.Value()))
// Ensure same inputs don't affect values.
ct.Update(30, 100)
assert.Equal(t, P{30, 50}, p(ct.Value()))

// Ensure the value and baseline are cleared on reset.
ct.Reset()
assert.Equal(t, P{0, 0}, p(ct.Baseline()))
assert.Equal(t, P{10, 0}, p(ct.Value()))
}

func TestCumulativeTrackerInt64(t *testing.T) {
testCumulativeTracker[int64](t)
}

func TestCumulativeTrackerFloat64(t *testing.T) {
testCumulativeTracker[float64](t)
}

func TestDefaultMap(t *testing.T) {
called := false
m := newDefaultMap[int, int64](func() int64 {
Expand Down

0 comments on commit 4706b7a

Please sign in to comment.