Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Collect metrics asynchronously #223

Merged
merged 31 commits into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
e4e0d4b
Checkpoint (refactor scraping to a separate goroutine)
quentinmit Aug 5, 2024
795f60a
scraper.go now compiles
quentinmit Aug 5, 2024
ede6247
goimports
quentinmit Aug 5, 2024
69982a6
client_gpu_test compiles
quentinmit Aug 5, 2024
1ffd000
Update scraper_gpu_test
quentinmit Aug 6, 2024
7cdb057
Only use points after the start of the client
quentinmit Aug 6, 2024
039d832
Wait for GPU metrics to be present
quentinmit Aug 6, 2024
03dc917
Make shutdown behavior reliable
quentinmit Aug 7, 2024
3a803c4
Adapt RateIntegrator tests for metricStats
quentinmit Aug 7, 2024
81a6cc6
Remove unused rateIntegrator type
quentinmit Aug 7, 2024
0d9fb28
Update cumulativeTracker test
quentinmit Aug 7, 2024
3a6e6af
Remove cumulativeTracker implementation
quentinmit Aug 7, 2024
36fbe39
Ensure a client connection is open when trying to pause/resume profiling
quentinmit Aug 7, 2024
131d921
Remove more unused code
quentinmit Aug 7, 2024
43ac034
Address review comments
quentinmit Aug 7, 2024
0243a5e
goimports
quentinmit Aug 7, 2024
6d7147c
Review comment
quentinmit Aug 7, 2024
0cfaee1
Add comment on how integration works
quentinmit Aug 7, 2024
e84a661
Calculate buffer duration since we no longer get a full buffer, and e…
quentinmit Aug 8, 2024
5513b42
goimports
quentinmit Aug 8, 2024
48fc30b
Explicitly test scrape collection interval
quentinmit Aug 9, 2024
82acc2d
Initialize the dcgm library in testprofilepause
quentinmit Aug 10, 2024
1a0c33c
Reuse client constructor's device group
quentinmit Aug 10, 2024
6cb898d
Merge branch 'igorpeshansky-dcgm-new-metrics' into quentin-dcgm-new-m…
igorpeshansky Aug 14, 2024
ff45911
getSupportedRegularFields is unnecessary now that we expect blank values
quentinmit Aug 27, 2024
f249d19
Violation timers are in ns, not µs
quentinmit Aug 27, 2024
26a7362
Suppress blank value warnings
quentinmit Aug 28, 2024
395c2f7
Sort field lists before checking goldens
quentinmit Sep 3, 2024
5c3bdd5
Generate golden metric lists from actual scraping, to detect metrics …
quentinmit Sep 4, 2024
76ef57f
Use a different error message and rate limit specifically for 'field …
quentinmit Sep 9, 2024
79e19d6
Add missing continue
quentinmit Sep 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
213 changes: 105 additions & 108 deletions receiver/dcgmreceiver/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ package dcgmreceiver
import (
"errors"
"fmt"
"math"
"time"

"github.com/NVIDIA/go-dcgm/pkg/dcgm"
"go.opentelemetry.io/collector/receiver/scrapererror"
"go.uber.org/zap"
)

Expand All @@ -41,41 +41,40 @@ type dcgmClientSettings struct {
fields []string
}

// deviceMetrics bundles the identity of a single GPU with the metric state
// collected for it.
type deviceMetrics struct {
// ModelName is the device model string taken from the DCGM device info
// (Identifiers.Model).
ModelName string
// UUID is the device UUID taken from the DCGM device info.
UUID string
// Metrics holds the per-field state for this device, keyed by DCGM field name.
Metrics MetricsMap
}

type dcgmClient struct {
logger *zap.SugaredLogger
handleCleanup func()
enabledFieldIDs []dcgm.Short
enabledFieldGroup dcgm.FieldHandle
deviceIndices []uint
devicesModelName []string
devicesUUID []string
logger *zap.SugaredLogger
handleCleanup func()
enabledFieldIDs []dcgm.Short
enabledFieldGroup dcgm.FieldHandle
deviceGroup dcgm.GroupHandle

devices map[uint]deviceMetrics
lastSuccessfulPoll time.Time

deviceMetricToFailedQueryCount map[string]uint64
pollingInterval time.Duration
retryBlankValues bool
maxRetries int
}

type dcgmMetric struct {
timestamp int64
name string
value interface{}
}

// dcgmInit initializes DCGM via dcgm.Init, always in standalone mode, and
// forwards any extra string arguments unchanged. The mode is fixed here
// because its type (dcgm.mode) is unexported and so cannot appear in this
// function's own parameter list.
var dcgmInit = func(args ...string) (func(), error) {
	cleanup, err := dcgm.Init(dcgm.Standalone, args...)
	return cleanup, err
}

var dcgmGetLatestValuesForFields = dcgm.GetLatestValuesForFields
var dcgmGetValuesSince = dcgm.GetValuesSince

func newClient(settings *dcgmClientSettings, logger *zap.Logger) (*dcgmClient, error) {
dcgmCleanup, err := initializeDcgm(settings.endpoint, logger)
if err != nil {
return nil, errors.Join(ErrDcgmInitialization, err)
}
deviceIndices := make([]uint, 0)
names := make([]string, 0)
UUIDs := make([]string, 0)
enabledFieldGroup := dcgm.FieldHandle{}
requestedFieldIDs := toFieldIDs(settings.fields)
supportedRegularFieldIDs, err := getSupportedRegularFields(requestedFieldIDs, logger)
Expand All @@ -92,12 +91,15 @@ func newClient(settings *dcgmClientSettings, logger *zap.Logger) (*dcgmClient, e
for _, f := range unavailableFields {
logger.Sugar().Warnf("Field '%s' is not supported. Metric '%s' will not be collected", dcgmIDToName[f], dcgmIDToName[f])
}
var deviceGroup dcgm.GroupHandle
if len(enabledFields) != 0 {
deviceIndices, names, UUIDs, err = discoverDevices(logger)
supportedDeviceIndices, err := dcgm.GetSupportedDevices()
if err != nil {
return nil, err
return nil, fmt.Errorf("Unable to discover supported GPUs on %w", err)
}
deviceGroup, err := createDeviceGroup(logger, deviceIndices)
logger.Sugar().Infof("Discovered %d supported GPU devices", len(supportedDeviceIndices))

deviceGroup, err = createDeviceGroup(logger, supportedDeviceIndices)
if err != nil {
return nil, err
}
Expand All @@ -112,9 +114,9 @@ func newClient(settings *dcgmClientSettings, logger *zap.Logger) (*dcgmClient, e
handleCleanup: dcgmCleanup,
enabledFieldIDs: enabledFields,
enabledFieldGroup: enabledFieldGroup,
deviceIndices: deviceIndices,
devicesModelName: names,
devicesUUID: UUIDs,
deviceGroup: deviceGroup,
devices: map[uint]deviceMetrics{},
lastSuccessfulPoll: time.Now(),
deviceMetricToFailedQueryCount: make(map[string]uint64),
pollingInterval: settings.pollingInterval,
retryBlankValues: settings.retryBlankValues,
Expand All @@ -139,30 +141,20 @@ func initializeDcgm(endpoint string, logger *zap.Logger) (func(), error) {
return dcgmCleanup, nil
}

func discoverDevices(logger *zap.Logger) ([]uint, []string, []string, error) {
supportedDeviceIndices, err := dcgm.GetSupportedDevices()
func newDeviceMetrics(logger *zap.SugaredLogger, gpuIndex uint) (deviceMetrics, error) {
quentinmit marked this conversation as resolved.
Show resolved Hide resolved
deviceInfo, err := dcgm.GetDeviceInfo(gpuIndex)
if err != nil {
return nil, nil, nil, fmt.Errorf("Unable to discover supported GPUs on %w", err)
logger.Warnf("Unable to query device info for NVIDIA device %d on '%w'", gpuIndex, err)
return deviceMetrics{}, err
}
logger.Sugar().Infof("Discovered %d supported GPU devices", len(supportedDeviceIndices))

devices := make([]uint, 0, len(supportedDeviceIndices))
names := make([]string, 0, len(supportedDeviceIndices))
UUIDs := make([]string, 0, len(supportedDeviceIndices))
for _, gpuIndex := range supportedDeviceIndices {
deviceInfo, err := dcgm.GetDeviceInfo(gpuIndex)
if err != nil {
logger.Sugar().Warnf("Unable to query device info for NVIDIA device %d on '%w'", gpuIndex, err)
continue
}

devices = append(devices, gpuIndex)
names = append(names, deviceInfo.Identifiers.Model)
UUIDs = append(UUIDs, deviceInfo.UUID)
logger.Sugar().Infof("Discovered NVIDIA device %s with UUID %s", names[gpuIndex], UUIDs[gpuIndex])
device := deviceMetrics{
ModelName: deviceInfo.Identifiers.Model,
UUID: deviceInfo.UUID,
Metrics: MetricsMap{},
}

return devices, names, UUIDs, nil
logger.Infof("Discovered NVIDIA device %s with UUID %s (DCGM GPU ID %d)", device.ModelName, device.UUID, gpuIndex)
return device, nil
}

func createDeviceGroup(logger *zap.Logger, deviceIndices []uint) (dcgm.GroupHandle, error) {
Expand Down Expand Up @@ -289,7 +281,7 @@ func getSupportedRegularFields(requestedFields []dcgm.Short, logger *zap.Logger)
}
found := make(map[dcgm.Short]bool)
for _, gpuIndex := range deviceIndices {
fieldValues, pollErr := dcgm.GetLatestValuesForFields(gpuIndex, regularFields)
fieldValues, pollErr := dcgm.EntitiesGetLatestValues([]dcgm.GroupEntityPair{{dcgm.FE_GPU, gpuIndex}}, regularFields, 0)
if pollErr != nil {
continue
}
Expand Down Expand Up @@ -352,103 +344,108 @@ func setWatchesOnFields(logger *zap.Logger, deviceGroup dcgm.GroupHandle, fieldI
return fieldGroup, nil
}

const maxKeepSamples = 100 // TODO: Is this enough?

func setWatchesOnEnabledFields(pollingInterval time.Duration, logger *zap.Logger, deviceGroup dcgm.GroupHandle, enabledFieldIDs []dcgm.Short) (dcgm.FieldHandle, error) {
return setWatchesOnFields(logger, deviceGroup, enabledFieldIDs, dcgmWatchParams{
// Note: Add random suffix to avoid conflict among any parallel collectors
fieldGroupName: fmt.Sprintf("google-cloud-ops-agent-metrics-%d", randSource.Intn(10000)),
// Note: DCGM retained samples = Max(maxKeepSamples, maxKeepTime/updateFreq)
updateFreqUs: int64(pollingInterval / time.Microsecond),
maxKeepTime: 600.0, /* 10 min */
maxKeepSamples: int32(15),
maxKeepSamples: maxKeepSamples,
})
}

// cleanup tears down what the client holds in DCGM: it destroys the enabled
// field group and the device group (explicitly discarding any error from
// either call), invokes the stored DCGM cleanup callback when one is set, and
// then logs that DCGM has been shut down.
func (client *dcgmClient) cleanup() {
	_ = dcgm.FieldGroupDestroy(client.enabledFieldGroup)
	_ = dcgm.DestroyGroup(client.deviceGroup)

	// handleCleanup may be nil, so only call it when present.
	if release := client.handleCleanup; release != nil {
		release()
	}

	client.logger.Info("Shutdown DCGM")
}

func (client *dcgmClient) getDeviceModelName(gpuIndex uint) string {
return client.devicesModelName[gpuIndex]
}

func (client *dcgmClient) getDeviceUUID(gpuIndex uint) string {
return client.devicesUUID[gpuIndex]
}

func (client *dcgmClient) collectDeviceMetrics() (map[uint][]dcgmMetric, error) {
var err scrapererror.ScrapeErrors
gpuMetrics := make(map[uint][]dcgmMetric)
for _, gpuIndex := range client.deviceIndices {
client.logger.Debugf("Polling DCGM daemon for GPU %d", gpuIndex)
retry := true
for i := 0; retry && i < client.maxRetries; i++ {
fieldValues, pollErr := dcgmGetLatestValuesForFields(gpuIndex, client.enabledFieldIDs)
client.logger.Debugf("Got %d field values", len(fieldValues))
if pollErr == nil {
gpuMetrics[gpuIndex], retry = client.appendMetrics(gpuMetrics[gpuIndex], gpuIndex, fieldValues)
if retry {
client.logger.Warnf("Retrying poll of DCGM daemon for GPU %d; attempt %d", gpuIndex, i+1)
time.Sleep(client.pollingInterval)
continue
}
client.logger.Debugf("Successful poll of DCGM daemon for GPU %d", gpuIndex)
} else {
msg := fmt.Sprintf("Unable to poll DCGM daemon for GPU %d on %s", gpuIndex, pollErr)
client.issueWarningForFailedQueryUptoThreshold(gpuIndex, "all-profiling-metrics", msg)
err.AddPartial(1, fmt.Errorf("%s", msg))
}
}
// collect will poll dcgm for any new metrics, updating client.devices as appropriate
// It returns the estimated polling interval.
func (client *dcgmClient) collect() (time.Duration, error) {
client.logger.Debugf("Polling DCGM daemon for field values")
if len(client.enabledFieldIDs) == 0 {
// Make sure we don't try to scrape without a device group (since we don't construct one when there are no enabled fields).
quentinmit marked this conversation as resolved.
Show resolved Hide resolved
return 0, nil
}

return gpuMetrics, err.Combine()
}

func (client *dcgmClient) appendMetrics(gpuMetrics []dcgmMetric, gpuIndex uint, fieldValues []dcgm.FieldValue_v1) (result []dcgmMetric, retry bool) {
retry = false
fieldValues, pollTime, err := dcgmGetValuesSince(client.deviceGroup, client.enabledFieldGroup, client.lastSuccessfulPoll)
if err != nil {
msg := fmt.Sprintf("Unable to poll DCGM daemon for on %s", err)
client.issueWarningForFailedQueryUptoThreshold("all-profiling-metrics", msg)
return 0, err
}
client.logger.Debugf("Got %d field values over %s", len(fieldValues), pollTime.Sub(client.lastSuccessfulPoll))
client.lastSuccessfulPoll = pollTime
oldestTs := int64(math.MaxInt64)
newestTs := int64(0)
for _, fieldValue := range fieldValues {
if fieldValue.EntityGroupId != dcgm.FE_GPU {
continue
}
gpuIndex := fieldValue.EntityId
if _, ok := client.devices[gpuIndex]; !ok {
device, err := newDeviceMetrics(client.logger, gpuIndex)
if err != nil {
continue
}
client.devices[gpuIndex] = device
quentinmit marked this conversation as resolved.
Show resolved Hide resolved
}
device := client.devices[gpuIndex]
quentinmit marked this conversation as resolved.
Show resolved Hide resolved
dcgmName := dcgmIDToName[dcgm.Short(fieldValue.FieldId)]
if err := isValidValue(fieldValue); err != nil {
msg := fmt.Sprintf("Received invalid value (ts %d gpu %d) %s: %v", fieldValue.Ts, gpuIndex, dcgmName, err)
client.issueWarningForFailedQueryUptoThreshold(gpuIndex, dcgmName, msg)
if client.retryBlankValues && errors.Is(err, errBlankValue) {
retry = true
}
client.issueWarningForFailedQueryUptoThreshold(fmt.Sprintf("device%d.%s", gpuIndex, dcgmName), msg)
igorpeshansky marked this conversation as resolved.
Show resolved Hide resolved
continue
}

var metricValue interface{}
switch fieldValue.FieldType {
case dcgm.DCGM_FT_DOUBLE:
value := fieldValue.Float64()
client.logger.Debugf("Discovered (ts %d gpu %d) %s = %.3f (f64)", fieldValue.Ts, gpuIndex, dcgmName, value)
metricValue = value
case dcgm.DCGM_FT_INT64:
value := fieldValue.Int64()
client.logger.Debugf("Discovered (ts %d gpu %d) %s = %d (i64)", fieldValue.Ts, gpuIndex, dcgmName, value)
metricValue = value
default:
metricValue = fieldValue.Value
if fieldValue.Ts < oldestTs {
oldestTs = fieldValue.Ts
}
if fieldValue.Ts > newestTs {
newestTs = fieldValue.Ts
}
gpuMetrics = append(gpuMetrics, dcgmMetric{fieldValue.Ts, dcgmName, metricValue})
if _, ok := device.Metrics[dcgmName]; !ok {
device.Metrics[dcgmName] = &metricStats{}
}
device.Metrics[dcgmName].Update(fieldValue)
}
duration := time.Duration(newestTs-oldestTs) * time.Microsecond
client.logger.Debugf("Successful poll of DCGM daemon returned %v of data", duration)
// If we did a partial poll, there should be more room in the buffer.
duration = max(duration, client.pollingInterval*maxKeepSamples)
return duration, nil
}

return gpuMetrics, retry
// getDeviceMetrics returns a copy of client.devices. Each device in the
// result gets a newly built MetricsMap, and every entry in that map points at
// a by-value copy of the corresponding stats object, so neither the maps nor
// the stats pointers are shared with the client.
func (client *dcgmClient) getDeviceMetrics() map[uint]deviceMetrics {
	snapshot := map[uint]deviceMetrics{}
	for idx, dev := range client.devices {
		metricsCopy := MetricsMap{}
		for name, stats := range dev.Metrics {
			statsCopy := *stats
			metricsCopy[name] = &statsCopy
		}
		// dev is the loop's own copy of the struct, so swapping its map here
		// does not modify the entry stored in client.devices.
		dev.Metrics = metricsCopy
		snapshot[idx] = dev
	}
	return snapshot
}

func (client *dcgmClient) issueWarningForFailedQueryUptoThreshold(deviceIdx uint, dcgmName string, reason string) {
deviceMetric := fmt.Sprintf("device%d.%s", deviceIdx, dcgmName)
client.deviceMetricToFailedQueryCount[deviceMetric]++
func (client *dcgmClient) issueWarningForFailedQueryUptoThreshold(dcgmName string, reason string) {
client.deviceMetricToFailedQueryCount[dcgmName]++

failedCount := client.deviceMetricToFailedQueryCount[deviceMetric]
failedCount := client.deviceMetricToFailedQueryCount[dcgmName]
if failedCount <= maxWarningsForFailedDeviceMetricQuery {
client.logger.Warnf("Unable to query '%s' for Nvidia device %d on '%s'", dcgmName, deviceIdx, reason)
client.logger.Warnf("Unable to query '%s' on '%s'", dcgmName, reason)
if failedCount == maxWarningsForFailedDeviceMetricQuery {
client.logger.Warnf("Surpressing further device query warnings for '%s' for Nvidia device %d", dcgmName, deviceIdx)
client.logger.Warnf("Surpressing further device query warnings for '%s'", dcgmName)
}
}
}
Loading