-
Notifications
You must be signed in to change notification settings - Fork 2
/
perf.go
561 lines (469 loc) · 17.1 KB
/
perf.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
// Copyright (C) 2023 Intel Corporation
// SPDX-License-Identifier: Apache-2.0
//go:build linux && amd64
package powertelemetry
import (
"bytes"
"errors"
"fmt"
"math/big"
"os"
"strconv"
"strings"
"syscall"
ev "github.com/intel/iaevents"
"golang.org/x/sync/errgroup"
)
const (
	// fileMaxPath is the procfs entry that reports the system-wide maximum
	// number of file handles the Linux kernel can allocate.
	fileMaxPath = "/proc/sys/fs/file-max"
)
// fileInfoProvider reads contents of files and provides the maximum number of
// file descriptors that a process may allocate.
//
// getMaxFd and checkFileDescriptors depend only on this interface; fsHelper is
// the implementation wired in by newPerf.
// TODO: Consider to move rlimit into a new single method interface.
// TODO: Move this interface to a separate file.
type fileInfoProvider interface {
// readFile reads the contents of a file, returning an error if it cannot be read.
readFile(path string) ([]byte, error)
// rlimit returns the maximum number of file descriptors that a process may allocate.
rlimit() (uint64, error)
}
// fsHelper is the fileInfoProvider backed by the real file system and by the
// resource limits of the running process.
type fsHelper struct{}

// readFile returns the full content of the file at path, delegating to os.ReadFile.
func (*fsHelper) readFile(path string) ([]byte, error) {
	content, err := os.ReadFile(path)
	return content, err
}

// rlimit reports the current (soft) RLIMIT_NOFILE value of the calling process,
// obtained through the getrlimit syscall. The Cur field is returned even when
// the syscall fails, together with the error.
func (*fsHelper) rlimit() (uint64, error) {
	limits := syscall.Rlimit{}
	if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &limits); err != nil {
		return limits.Cur, err
	}
	return limits.Cur, nil
}
// getMaxFd reads fileMaxPath through fp and returns the system-wide maximum
// number of file handles the Linux kernel can allocate.
//
// Leading and trailing whitespace (procfs terminates the value with a newline)
// is ignored. An error is returned when the file cannot be read or when its
// content is not a base-10 unsigned 64-bit integer; the parse error quotes the
// offending content so it can be diagnosed.
func getMaxFd(fp fileInfoProvider) (uint64, error) {
	buf, err := fp.readFile(fileMaxPath)
	if err != nil {
		return 0, fmt.Errorf("could not read file %q: %w", fileMaxPath, err)
	}

	content := string(bytes.TrimSpace(buf))
	maxFd, err := strconv.ParseUint(content, 10, 64)
	if err != nil {
		// Report the content itself: maxFd is always 0 here and says nothing useful.
		return 0, fmt.Errorf("could not parse file content %q to uint64: %w", content, err)
	}
	return maxFd, nil
}
// checkFileDescriptors reports whether fd file descriptors can be opened.
//
// fd is compared against two ceilings, in this order:
//   - the kernel-wide file-handle limit returned by getMaxFd(reader);
//   - the per-process limit returned by reader.rlimit().
//
// It returns nil when fd fits within both. It returns a descriptive error when
// either limit is exceeded or cannot be retrieved.
// TODO: Add information about max number of file handles into README.md.
func checkFileDescriptors(fd uint64, reader fileInfoProvider) error {
	kernelMax, err := getMaxFd(reader)
	switch {
	case err != nil:
		return fmt.Errorf("error retrieving kernel max file descriptors: %w", err)
	case fd > kernelMax:
		return fmt.Errorf("required file descriptors %d, exceeds the maximum number of available file descriptors %d", fd, kernelMax)
	}

	processMax, err := reader.rlimit()
	switch {
	case err != nil:
		return fmt.Errorf("error retrieving process max file descriptors: %w", err)
	case fd > processMax:
		return fmt.Errorf("required file descriptors %d, exceeds the maximum number of available file descriptors that a process may allocate %d", fd, processMax)
	}
	return nil
}
// multiply returns the product a*b.
//
// The product of two uint64 values may need up to 128 bits, so it is computed
// with arbitrary precision. An error is returned when the result does not fit
// in a uint64.
func multiply(a, b uint64) (uint64, error) {
	product := new(big.Int).Mul(new(big.Int).SetUint64(a), new(big.Int).SetUint64(b))
	if product.IsUint64() {
		return product.Uint64(), nil
	}
	return 0, fmt.Errorf("value could not be represented as uint64: %v", product)
}
// c0StateType enumerates the core events used to compute C0 substate metrics.
type c0StateType int

// Supported c0StateType values, numbered consecutively from 0.
const (
	c01 c0StateType = iota
	c02
	c0Wait
	thread
)

// String returns the perf event name associated with t. Values outside the
// enumeration map to the empty string.
func (t c0StateType) String() string {
	eventNames := [...]string{
		c01:    "CPU_CLK_UNHALTED.C01",
		c02:    "CPU_CLK_UNHALTED.C02",
		c0Wait: "CPU_CLK_UNHALTED.C0_WAIT",
		thread: "CPU_CLK_UNHALTED.THREAD",
	}
	if t < 0 || int(t) >= len(eventNames) {
		return ""
	}
	return eventNames[t]
}
// coreMetric represents the values of a core event read at a specific time instant.
type coreMetric struct {
// name is the perf event name (eventsReaderImpl.readEvents fills it from PerfEvent.Name).
name string
// cpuID is the CPU the event is placed on, as reported by PMUPlacement.
cpuID int
// values holds the counter values returned by the read.
values ev.CounterValue
// scaled is Raw * Enabled / Running derived from values (see scaleMetricValues);
// it is only filled in by perfWithStorage.update.
scaled uint64
}
// eventsResolver resolves event names, from a core event group, to custom events which
// can be activated.
type eventsResolver interface {
// resolveEvents maps each event name to a custom event. eventsResolverImpl keeps the
// input order and rejects an empty group.
resolveEvents(events []string) ([]ev.CustomizableEvent, error)
}
// eventsResolverImpl implements eventsResolver interface.
type eventsResolverImpl struct {
// reader supplies event definitions; perf.initResolver builds it from a JSON file.
reader ev.Reader
// transformer converts matching definitions into perf events; resolveEvent
// returns an error when it is nil.
transformer ev.Transformer
}
// resolveEvents resolves every event name in events, in order, into a custom
// event that can be activated.
//
// It fails when events is empty. It also fails when any single name cannot be
// resolved; the error then identifies the offending name and no partial result
// is returned.
func (r *eventsResolverImpl) resolveEvents(events []string) ([]ev.CustomizableEvent, error) {
	if len(events) == 0 {
		return nil, errors.New("event group cannot be empty")
	}

	resolved := make([]ev.CustomizableEvent, 0, len(events))
	for _, name := range events {
		customEvent, err := r.resolveEvent(name)
		if err != nil {
			return nil, fmt.Errorf("error resolving event %q: %w", name, err)
		}
		resolved = append(resolved, customEvent)
	}
	return resolved, nil
}
// resolveEvent looks up name through the configured transformer and reader and
// wraps the first matching perf event in a CustomizableEvent.
//
// It fails when no transformer is configured, when the transformation itself
// fails, or when no perf event matches name.
func (r *eventsResolverImpl) resolveEvent(name string) (ev.CustomizableEvent, error) {
	var resolved ev.CustomizableEvent
	if r.transformer == nil {
		return resolved, errors.New("transformer is nil")
	}

	matches, err := r.transformer.Transform(r.reader, ev.NewNameMatcher(name))
	switch {
	case err != nil:
		return resolved, err
	case len(matches) == 0:
		return resolved, errors.New("event could not be resolved")
	}

	// NOTE(review): when several perf events match, only the first one is used.
	resolved.Event = matches[0]
	return resolved, nil
}
// placementMaker takes a slice of cores and an event, which is the leader of
// the event group, and returns core placements needed for activation of each
// event of the group.
type placementMaker interface {
// makeCorePlacement builds placements for cpuIDs; factory is the group leader
// (eventsActivatorImpl passes the first custom event's Event).
makeCorePlacement(cpuIDs []int, factory ev.PlacementFactory) ([]ev.PlacementProvider, error)
}
// placementMakerImpl implements placementMaker interface.
type placementMakerImpl struct{}

// makeCorePlacement creates core placements for the CPU IDs in cpuIDs using
// factory, delegating to ev.NewCorePlacements.
//
// An empty cpuIDs slice is rejected. The error message distinguishes the
// single-CPU and multi-CPU cases.
func (*placementMakerImpl) makeCorePlacement(cpuIDs []int, factory ev.PlacementFactory) ([]ev.PlacementProvider, error) {
	if len(cpuIDs) == 0 {
		return nil, errors.New("no CPU IDs were provided")
	}

	first, rest := cpuIDs[0], cpuIDs[1:]
	if len(rest) == 0 {
		placements, err := ev.NewCorePlacements(factory, first)
		if err != nil {
			return nil, fmt.Errorf("failed to create single core placement: %w", err)
		}
		return placements, nil
	}

	placements, err := ev.NewCorePlacements(factory, first, rest...)
	if err != nil {
		return nil, fmt.Errorf("failed to create multiple core placements: %w", err)
	}
	return placements, nil
}
// eventGroupActivator activates custom core events using the given core PlacementProvider.
type eventGroupActivator interface {
// activateEventsAsGroup activates events together as one group on placement p and
// returns the resulting active events.
activateEventsAsGroup(p ev.PlacementProvider, events []ev.CustomizableEvent) ([]*ev.ActiveEvent, error)
}
// eventGroupActivatorImpl implements eventGroupActivator interface.
type eventGroupActivatorImpl struct{}

// activateEventsAsGroup activates events as a single group on placement p,
// targeting ev.NewEventTargetProcess(-1, 0). It returns the group's active
// events, or the activation error unchanged.
func (*eventGroupActivatorImpl) activateEventsAsGroup(p ev.PlacementProvider, events []ev.CustomizableEvent) ([]*ev.ActiveEvent, error) {
	// NOTE(review): pid -1 presumably means "any process on the placement's CPU",
	// as in perf_event_open(2) — confirm against the iaevents documentation.
	target := ev.NewEventTargetProcess(-1, 0)
	group, err := ev.ActivateGroup(p, target, events)
	if err != nil {
		return nil, err
	}
	return group.Events(), nil
}
// eventsActivator activates a group of core events.
type eventsActivator interface {
// activateEvents activates customEvents as a group on every core in cores.
activateEvents(customEvents []ev.CustomizableEvent, cores []int) ([]*ev.ActiveEvent, error)
}
// eventsActivatorImpl implements eventsActivator interface.
type eventsActivatorImpl struct {
// placementMaker builds the per-core placements for the group leader.
placementMaker placementMaker
// perfActivator activates the event group on a single placement.
perfActivator eventGroupActivator
}
// activateEvents builds core placements for cores, using the first custom event
// as the group leader, and activates customEvents as a group on each placement.
//
// Active events from all placements are concatenated in placement order. If
// activation fails on some placement, the events already activated on the
// preceding placements are returned together with the error (presumably so the
// caller can still deactivate them).
func (a *eventsActivatorImpl) activateEvents(customEvents []ev.CustomizableEvent, cores []int) ([]*ev.ActiveEvent, error) {
	switch {
	case len(customEvents) == 0:
		return nil, errors.New("no custom events provided")
	case len(cores) == 0:
		return nil, errors.New("no cores provided")
	}

	leaderEvent := customEvents[0].Event
	placements, err := a.placementMaker.makeCorePlacement(cores, leaderEvent)
	if err != nil {
		return nil, fmt.Errorf("failed to make core placements: %w", err)
	}

	activated := make([]*ev.ActiveEvent, 0)
	for _, placement := range placements {
		groupEvents, activateErr := a.perfActivator.activateEventsAsGroup(placement, customEvents)
		if activateErr != nil {
			return activated, fmt.Errorf("failed to activate events as a group: %w", activateErr)
		}
		activated = append(activated, groupEvents...)
	}
	return activated, nil
}
// valuesReader reads values of an active core event.
type valuesReader interface {
// readValue returns the current counter values of event.
readValue(event *ev.ActiveEvent) (ev.CounterValue, error)
}
// valuesReaderImpl is the valuesReader used by newPerf; it reads the counters
// straight from the active event.
type valuesReaderImpl struct{}

// readValue returns the counter values of event by delegating to
// (*ev.ActiveEvent).ReadValue; the result and error are passed through as-is.
func (*valuesReaderImpl) readValue(event *ev.ActiveEvent) (ev.CounterValue, error) {
	counter, err := event.ReadValue()
	return counter, err
}
// eventsReader reads the values of a group of active core events.
type eventsReader interface {
// readEvents returns one coreMetric per active event, in input order.
readEvents(events []*ev.ActiveEvent) ([]coreMetric, error)
}
// eventsReaderImpl implements eventsReader interface.
type eventsReaderImpl struct {
// eventReader performs the read of a single active event.
eventReader valuesReader
}
// readEvents reads the counter values of every active event concurrently. It
// returns one coreMetric per event, in the same order as events.
//
// Each metric carries the event name, the CPU the event is placed on and the
// raw counter values. The scaled field is left unset.
//
// All events are validated before any read is started. This ensures that an
// invalid entry never leaves reader goroutines running after the function has
// returned. If any read fails, the first error reported by the group is
// returned and no metrics are returned.
// TODO: Rework implementation to accept context propagated from top of the call stack.
func (r *eventsReaderImpl) readEvents(events []*ev.ActiveEvent) ([]coreMetric, error) {
	if len(events) == 0 {
		return nil, errors.New("no active events provided")
	}

	// Validate up front: returning from inside the launch loop below would
	// abandon goroutines already started for earlier events.
	for _, event := range events {
		if event == nil || event.PerfEvent == nil {
			return nil, errors.New("invalid active event")
		}
	}

	metrics := make([]coreMetric, len(events))
	errGroup := errgroup.Group{}
	for i, event := range events {
		index := i
		activeEvent := event
		errGroup.Go(func() error {
			values, err := r.eventReader.readValue(activeEvent)
			if err != nil {
				return fmt.Errorf("failed to read values for event %q: %w", activeEvent, err)
			}
			// Only the CPU is needed; the second value of PMUPlacement is ignored.
			cpu, _ := activeEvent.PMUPlacement()
			// Each goroutine writes only its own slot, so no locking is required.
			metrics[index] = coreMetric{
				values: values,
				cpuID:  cpu,
				name:   activeEvent.PerfEvent.Name,
			}
			return nil
		})
	}

	if err := errGroup.Wait(); err != nil {
		return nil, err
	}
	return metrics, nil
}
// eventDeactivator deactivates an active core event.
type eventDeactivator interface {
// deactivateEvent deactivates event, returning an error on failure.
deactivateEvent(event *ev.ActiveEvent) error
}
// eventDeactivatorImpl is the eventDeactivator used by newPerf.
type eventDeactivatorImpl struct{}

// deactivateEvent deactivates event by delegating to (*ev.ActiveEvent).Deactivate.
// The error is returned unchanged; it is nil on success.
func (*eventDeactivatorImpl) deactivateEvent(event *ev.ActiveEvent) error {
	err := event.Deactivate()
	return err
}
// eventsDeactivator deactivates a group of active core events.
type eventsDeactivator interface {
// deactivateEvents returns the events that are still active afterwards, plus an
// error if any of them could not be deactivated.
deactivateEvents(events []*ev.ActiveEvent) ([]*ev.ActiveEvent, error)
}
// eventsDeactivatorImpl implements eventsDeactivator interface.
type eventsDeactivatorImpl struct {
// perfDeactivator deactivates a single active event.
perfDeactivator eventDeactivator
}
// deactivateEvents attempts to deactivate every event in events.
//
// Nil entries, and entries without a PerfEvent, are skipped and do not appear in
// the result. The returned slice (never nil) holds, in input order, the events
// whose deactivation failed. When at least one event failed, the returned error
// lists their names.
func (d *eventsDeactivatorImpl) deactivateEvents(events []*ev.ActiveEvent) ([]*ev.ActiveEvent, error) {
	stillActive := make([]*ev.ActiveEvent, 0)
	failedNames := make([]string, 0)
	for _, event := range events {
		if event == nil || event.PerfEvent == nil {
			continue
		}
		// NOTE(review): the cause returned by deactivateEvent is discarded; only
		// the event name reaches the caller.
		if d.perfDeactivator.deactivateEvent(event) == nil {
			continue
		}
		stillActive = append(stillActive, event)
		failedNames = append(failedNames, event.PerfEvent.Name)
	}

	if len(failedNames) == 0 {
		return stillActive, nil
	}
	return stillActive, fmt.Errorf("failed to deactivate events: %q", strings.Join(failedNames, ", "))
}
// perfReader activates, reads and deactivates groups of core events accessible via `perf_events`
// kernel interface.
type perfReader interface {
// initResolver configures event resolution from the given JSON event definitions.
initResolver(jsonFile string) error
// activate resolves and activates events on the given cores.
activate(events []string, cores []int) error
// read returns one metric per currently active event.
read() ([]coreMetric, error)
// deactivate deactivates all currently active events.
deactivate() error
}
// perf implements perfReader interface. It keeps track of the current active events.
type perf struct {
// resolver is left nil by newPerf and set by initResolver.
resolver eventsResolver
activator eventsActivator
deactivator eventsDeactivator
valuesReader eventsReader
fileInfoReader fileInfoProvider
// activeEvents holds the events returned by the last activation, minus those that
// have since been successfully deactivated.
activeEvents []*ev.ActiveEvent
}
// newPerf returns a perfReader wired with the default activator, deactivator,
// values reader and file-system helper.
//
// The event resolver is not set. initResolver must be called before activate
// can resolve any event names.
func newPerf() perfReader {
	activator := &eventsActivatorImpl{
		placementMaker: &placementMakerImpl{},
		perfActivator:  &eventGroupActivatorImpl{},
	}
	return &perf{
		activator:      activator,
		deactivator:    &eventsDeactivatorImpl{perfDeactivator: &eventDeactivatorImpl{}},
		valuesReader:   &eventsReaderImpl{eventReader: &valuesReaderImpl{}},
		fileInfoReader: &fsHelper{},
	}
}
// initResolver configures p to resolve event names against the event
// definitions in jsonFile, using an iaevents files reader and a perf transformer.
//
// On error, p.resolver is left unchanged.
func (p *perf) initResolver(jsonFile string) error {
	filesReader := ev.NewFilesReader()
	if err := filesReader.AddFiles(jsonFile); err != nil {
		return fmt.Errorf("error adding file to reader: %w", err)
	}

	resolver := &eventsResolverImpl{
		reader:      filesReader,
		transformer: ev.NewPerfTransformer(),
	}
	p.resolver = resolver
	return nil
}
// activate resolves the given core event names into perf events and activates
// them as a group on each of the given cores.
//
// Before anything is activated, the number of file descriptors required is
// estimated as (number of resolved events) * (number of cores). It is checked
// against both the kernel-wide and the per-process limits. An error is returned
// if either limit would be exceeded, or if the estimate overflows uint64.
//
// An error (instead of a nil-interface panic) is returned when the resolver
// has not been configured via initResolver.
//
// Whatever the activator returns is stored in p.activeEvents, even when it also
// returns an error. Events activated before a failure can therefore still be
// released with deactivate.
// TODO: Do not receive events from arguments.
func (p *perf) activate(events []string, cores []int) error {
	if p.resolver == nil {
		return errors.New("events resolver is not initialized: call initResolver first")
	}

	// resolve
	customEvents, err := p.resolver.resolveEvents(events)
	if err != nil {
		return fmt.Errorf("error resolving event: %w", err)
	}

	// calculate file descriptors needed to access all events
	numEvents := uint64(len(customEvents))
	numCores := uint64(len(cores))
	fd, err := multiply(numEvents, numCores)
	if err != nil {
		return fmt.Errorf("error calculating required file descriptors: %w", err)
	}

	// check maximum allowed number of file descriptors
	if err := checkFileDescriptors(fd, p.fileInfoReader); err != nil {
		return fmt.Errorf("error checking available file descriptors: %w", err)
	}

	// activate
	// NOTE(review): events still held in p.activeEvents from an earlier
	// activation are overwritten here without being deactivated.
	p.activeEvents, err = p.activator.activateEvents(customEvents, cores)
	if err != nil {
		return fmt.Errorf("error during event activation: %w", err)
	}
	return nil
}
// deactivate deactivates all currently active events.
//
// p.activeEvents is replaced with whatever the deactivator reports as still
// active. The deactivator's error, if any, is returned.
func (p *perf) deactivate() error {
	remaining, err := p.deactivator.deactivateEvents(p.activeEvents)
	p.activeEvents = remaining
	return err
}
// read performs a single read of all active events and returns the resulting
// metrics. Events must have been activated beforehand by calling activate.
// TODO: Rework implementation to accept context propagated from top of the call stack.
func (p *perf) read() ([]coreMetric, error) {
	metrics, err := p.valuesReader.readEvents(p.activeEvents)
	return metrics, err
}
// perfReaderWithStorage decorates perfReader with the ability to store core event read
// values and to retrieve all metrics that belong to a specific CPU ID.
type perfReaderWithStorage interface {
perfReader
// update reads all active events and stores the resulting metrics.
update() error
// getCoreMetrics returns the stored metrics whose cpuID matches.
getCoreMetrics(cpuID int) []coreMetric
}
// perfWithStorage implements perfReaderWithStorage interface. The metrics field holds
// the core event values stored by the most recent call to update.
type perfWithStorage struct {
// TODO: Evaluate implications of either embedding perf or perfReader
perfReader
metrics []coreMetric
}
// update reads all active core events and stores the result in p.metrics. It
// then fills in the scaled value of each metric using scaleMetricValues.
//
// If the read fails, p.metrics is set to whatever read returned and the error
// is returned. If scaling a metric fails, that error is returned immediately
// and the remaining metrics are not scaled.
func (p *perfWithStorage) update() error {
	metrics, err := p.read()
	p.metrics = metrics
	if err != nil {
		return err
	}

	for i := range p.metrics {
		scaled, scaleErr := scaleMetricValues(p.metrics[i].values)
		p.metrics[i].scaled = scaled
		if scaleErr != nil {
			return scaleErr
		}
	}
	return nil
}
// scaleMetricValues returns Raw * Enabled / Running for the given counter
// values. The product and quotient are computed with arbitrary precision;
// integer division truncates.
//
// When Running is zero, or equals Enabled, Raw is returned unchanged. An error
// is returned if the scaled value does not fit in a uint64.
func scaleMetricValues(values ev.CounterValue) (uint64, error) {
	if values.Running == 0 || values.Running == values.Enabled {
		return values.Raw, nil
	}

	scaled := new(big.Int).SetUint64(values.Raw)
	scaled.Mul(scaled, new(big.Int).SetUint64(values.Enabled))
	scaled.Div(scaled, new(big.Int).SetUint64(values.Running))
	if !scaled.IsUint64() {
		return 0, fmt.Errorf("scaled value could not be represented as uint64: %v", scaled)
	}
	return scaled.Uint64(), nil
}
// getCoreMetrics returns, in stored order, the metrics from the last update
// whose cpuID equals cpuID. The result is never nil; it is empty when no
// stored metric belongs to that CPU.
func (p *perfWithStorage) getCoreMetrics(cpuID int) []coreMetric {
	matching := make([]coreMetric, 0)
	for i := range p.metrics {
		if p.metrics[i].cpuID != cpuID {
			continue
		}
		matching = append(matching, p.metrics[i])
	}
	return matching
}