Skip to content

Commit

Permalink
Decouple event parsing from config manager
Browse files Browse the repository at this point in the history
Signed-off-by: grantseltzer <[email protected]>
  • Loading branch information
grantseltzer committed Dec 20, 2024
1 parent 1f64aff commit 44dd831
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 43 deletions.
2 changes: 1 addition & 1 deletion pkg/dynamicinstrumentation/diconfig/config_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (cm *RCConfigManager) readConfigs(r *ringbuf.Reader, procInfo *ditypes.Proc
continue
}

configEvent, err := eventparser.ParseEvent(cm.diProcs, record.RawSample, configRateLimiter)
configEvent, err := eventparser.ParseEvent(record.RawSample, configRateLimiter)
if err != nil {
log.Errorf("error parsing configuration for PID %d: %v", procInfo.PID, err)
continue
Expand Down
23 changes: 7 additions & 16 deletions pkg/dynamicinstrumentation/eventparser/event_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ var (

// ParseEvent takes the raw buffer from bpf and parses it into an event. It also potentially
// applies a rate limit
func ParseEvent(procs ditypes.DIProcs, record []byte, ratelimiters *ratelimiter.MultiProbeRateLimiter) (*ditypes.DIEvent, error) {
func ParseEvent(record []byte, ratelimiters *ratelimiter.MultiProbeRateLimiter) (*ditypes.DIEvent, error) {
event := ditypes.DIEvent{}

if len(record) < ditypes.SizeofBaseEvent {
Expand All @@ -46,30 +46,21 @@ func ParseEvent(procs ditypes.DIProcs, record []byte, ratelimiters *ratelimiter.
event.PID = baseEvent.Pid
event.UID = baseEvent.Uid
event.StackPCs = baseEvent.Program_counters[:]

probe := procs.GetProbe(event.PID, event.ProbeID)
if probe == nil {
return nil, fmt.Errorf("received event unassociated with probe. Probe ID: %s PID: %d", event.ProbeID, event.PID)
}

event.Argdata = readParamsForProbe(probe, record[ditypes.SizeofBaseEvent:])
event.Argdata = readParams(record[ditypes.SizeofBaseEvent:])
return &event, nil
}

func readParamsForProbe(probe *ditypes.Probe, values []byte) []*ditypes.Param {
func readParams(values []byte) []*ditypes.Param {
log.Tracef("DI event bytes (0:100): %v", values[0:100])
outputParams := []*ditypes.Param{}
for i := 0; i < probe.InstrumentationInfo.InstrumentationOptions.ArgumentsMaxSize; {
if i+3 >= len(values) {
break
}
for i := 0; i+3 < len(values); {
paramTypeDefinition := parseTypeDefinition(values[i:])
if paramTypeDefinition == nil {
break
}
sizeOfTypeDefinition := countBufferUsedByTypeDefinition(paramTypeDefinition)
i += sizeOfTypeDefinition
val, numBytesRead := parseParamValueForProbe(probe, paramTypeDefinition, values[i:])
val, numBytesRead := parseParamValue(paramTypeDefinition, values[i:])
if val == nil {
return outputParams
}
Expand All @@ -79,11 +70,11 @@ func readParamsForProbe(probe *ditypes.Probe, values []byte) []*ditypes.Param {
return outputParams
}

// parseParamValueForProbe takes the representation of the param type's definition and the
// parseParamValue takes the representation of the param type's definition and the
// actual values in the buffer and populates the definition with the value parsed
// from the byte buffer. It returns the resulting parameter and an indication of
// how many bytes were read from the buffer
func parseParamValueForProbe(probe *ditypes.Probe, definition *ditypes.Param, buffer []byte) (*ditypes.Param, int) {
func parseParamValue(definition *ditypes.Param, buffer []byte) (*ditypes.Param, int) {
var bufferIndex int = 0
// Start by creating a stack with each layer of the definition
// which will correspond with the layers of the values read from buffer.
Expand Down
29 changes: 4 additions & 25 deletions pkg/dynamicinstrumentation/eventparser/event_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,16 +112,10 @@ func TestParseParamValue(t *testing.T) {
},
},
}
probe := &ditypes.Probe{
InstrumentationInfo: &ditypes.InstrumentationInfo{
InstrumentationOptions: &ditypes.InstrumentationOptions{
ArgumentsMaxSize: 300,
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
val, _ := parseParamValueForProbe(probe, tt.inputDefinition, tt.inputBuffer)
val, _ := parseParamValue(tt.inputDefinition, tt.inputBuffer)
if !reflect.DeepEqual(val, tt.expectedValue) {
t.Errorf("Parsed incorrectly! Got %+v, expected %+v", val, tt.expectedValue)
}
Expand Down Expand Up @@ -172,16 +166,9 @@ func TestReadParams(t *testing.T) {
},
}

probe := &ditypes.Probe{
InstrumentationInfo: &ditypes.InstrumentationInfo{
InstrumentationOptions: &ditypes.InstrumentationOptions{
ArgumentsMaxSize: 300,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
output := readParamsForProbe(probe, tt.inputBuffer)
output := readParams(tt.inputBuffer)
if !reflect.DeepEqual(output, tt.expectedResult) {
fmt.Printf("Got: %v\n", output)
fmt.Printf("Expected: %v\n", tt.expectedResult)
Expand Down Expand Up @@ -448,17 +435,9 @@ func TestParseParams(t *testing.T) {
},
}

probe := &ditypes.Probe{
InstrumentationInfo: &ditypes.InstrumentationInfo{
InstrumentationOptions: &ditypes.InstrumentationOptions{
ArgumentsMaxSize: 1000,
},
},
}

for i := range testCases {
t.Run(testCases[i].Name, func(t *testing.T) {
result := readParamsForProbe(probe, testCases[i].Buffer)
result := readParams(testCases[i].Buffer)
assert.Equal(t, testCases[i].ExpectedOutput, result)
})
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/dynamicinstrumentation/ringbufconsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (goDI *GoDI) startRingbufferConsumer(rate float64) (func(), error) {
continue
}

event, err := eventparser.ParseEvent(goDI.ConfigManager.GetProcInfos(), record.RawSample, rateLimiters)
event, err := eventparser.ParseEvent(record.RawSample, rateLimiters)
if err != nil {
log.Trace(err)
continue
Expand Down

0 comments on commit 44dd831

Please sign in to comment.