Skip to content

Commit

Permalink
hubble: add printer for lost events
Browse files Browse the repository at this point in the history
Currently hubble can't handle lost events which results on a large
output on CI runs [1]. This commit implements this missing functionality
while trying to maintain the same format for other types of messages.

[1]
```
2024-10-01T05:27:10.3601309Z unknown response type: &{LostEvents:source:HUBBLE_RING_BUFFER  num_events_lost:1}
2024-10-01T05:27:10.3601823Z unknown response type: &{LostEvents:source:HUBBLE_RING_BUFFER  num_events_lost:1}
2024-10-01T05:27:10.3602406Z unknown response type: &{LostEvents:source:HUBBLE_RING_BUFFER  num_events_lost:1}
```

Signed-off-by: André Martins <[email protected]>
  • Loading branch information
aanm committed Oct 4, 2024
1 parent c3c989b commit b7a489e
Show file tree
Hide file tree
Showing 2 changed files with 181 additions and 0 deletions.
88 changes: 88 additions & 0 deletions hubble/pkg/printer/printer.go
Original file line number Diff line number Diff line change
Expand Up @@ -800,6 +800,8 @@ func (p *Printer) WriteGetFlowsResponse(res *observerpb.GetFlowsResponse) error
return p.WriteProtoFlow(res)
case *observerpb.GetFlowsResponse_NodeStatus:
return p.WriteProtoNodeStatusEvent(res)
case *observerpb.GetFlowsResponse_LostEvents:
return p.WriteLostEvent(res)
case nil:
return nil
default:
Expand Down Expand Up @@ -914,3 +916,89 @@ func (p *Printer) WriteServerStatusResponse(res *observerpb.ServerStatusResponse
}
return nil
}

// WriteLostEvent writes v1.Flow into the output writer.
func (p *Printer) WriteLostEvent(res *observerpb.GetFlowsResponse) error {
f := res.GetLostEvents()

switch p.opts.output {
case TabOutput:
ew := &errWriter{w: p.tw}
src := f.GetSource()
numEventsLost := f.GetNumEventsLost()
cpu := f.GetCpu()

if p.line == 0 {
ew.write("TIMESTAMP", tab)
if p.opts.nodeName {
ew.write("NODE", tab)
}
ew.write(
"SOURCE", tab,
"DESTINATION", tab,
"TYPE", tab,
"VERDICT", tab,
"SUMMARY", newline,
)
}
ew.write("", tab)
if p.opts.nodeName {
ew.write("", tab)
}
ew.write(
src, tab,
"", tab,
"EVENTS LOST", tab,
"", tab,
fmt.Sprintf("CPU(%d) - %d", cpu.GetValue(), numEventsLost), newline,
)
if ew.err != nil {
return fmt.Errorf("failed to write out packet: %w", ew.err)
}
case DictOutput:
ew := &errWriter{w: p.opts.w}
src := f.GetSource()
numEventsLost := f.GetNumEventsLost()
cpu := f.GetCpu()
if p.line != 0 {
// TODO: line length?
ew.write(dictSeparator, newline)
}

// this is a little crude, but will do for now. should probably find the
// longest header and auto-format the keys
ew.write(" TIMESTAMP: ", "", newline)
if p.opts.nodeName {
ew.write(" NODE: ", "", newline)
}
ew.write(
" SOURCE: ", src, newline,
" TYPE: ", "EVENTS LOST", newline,
" VERDICT: ", "", newline,
" SUMMARY: ", fmt.Sprintf("CPU(%d) - %d", cpu.GetValue(), numEventsLost), newline,
)
if ew.err != nil {
return fmt.Errorf("failed to write out packet: %w", ew.err)
}
case CompactOutput:
src := f.GetSource()
numEventsLost := f.GetNumEventsLost()
cpu := f.GetCpu()

_, err := fmt.Fprintf(p.opts.w,
"EVENTS LOST: %s CPU(%d) %d\n",
src,
cpu.GetValue(),
numEventsLost,
)
if err != nil {
return fmt.Errorf("failed to write out packet: %w", err)
}
case JSONLegacyOutput:
return p.jsonEncoder.Encode(f)
case JSONPBOutput:
return p.jsonEncoder.Encode(res)
}
p.line++
return nil
}
93 changes: 93 additions & 0 deletions hubble/pkg/printer/printer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1407,3 +1407,96 @@ NUM CONNECTED NODES: N/A
})
}
}

func TestPrinter_WriteLostEventsResponse(t *testing.T) {
buf := bytes.Buffer{}
gfr := &observerpb.GetFlowsResponse{
ResponseTypes: &observerpb.GetFlowsResponse_LostEvents{
LostEvents: &observerpb.LostEvent{
Source: observerpb.LostEventSource_HUBBLE_RING_BUFFER,
NumEventsLost: 1,
Cpu: wrapperspb.Int32(5),
},
},
}
type args struct {
le *observerpb.GetFlowsResponse
}
tests := []struct {
name string
options []Option
args args
wantErr bool
expected string
}{
{
name: "tabular",
options: []Option{
WithColor("never"),
Writer(&buf),
},
args: args{gfr},
wantErr: false,
expected: `
TIMESTAMP SOURCE DESTINATION TYPE VERDICT SUMMARY
HUBBLE_RING_BUFFER EVENTS LOST CPU(5) - 1`,
}, {
name: "compact",
options: []Option{
Compact(),
WithColor("never"),
Writer(&buf),
},
args: args{gfr},
wantErr: false,
expected: `
EVENTS LOST: HUBBLE_RING_BUFFER CPU(5) 1`,
}, {
name: "json",
options: []Option{
JSONPB(),
WithColor("never"),
Writer(&buf),
},
args: args{gfr},
wantErr: false,
expected: `{"lost_events":{"source":"HUBBLE_RING_BUFFER","num_events_lost":"1","cpu":5}}`,
}, {
name: "jsonpb",
options: []Option{
JSONPB(),
WithColor("never"),
Writer(&buf),
},
args: args{gfr},
wantErr: false,
expected: `{"lost_events":{"source":"HUBBLE_RING_BUFFER","num_events_lost":"1","cpu":5}}`,
}, {
name: "dict",
options: []Option{
Dict(),
WithColor("never"),
Writer(&buf),
},
args: args{gfr},
wantErr: false,
expected: `
TIMESTAMP:
SOURCE: HUBBLE_RING_BUFFER
TYPE: EVENTS LOST
VERDICT:
SUMMARY: CPU(5) - 1`,
},
}
for _, tt := range tests {
buf.Reset()
t.Run(tt.name, func(t *testing.T) {
p := New(tt.options...)
if err := p.WriteLostEvent(tt.args.le); (err != nil) != tt.wantErr {
t.Errorf("WriteServerStatusResponse() error = %v, wantErr %v", err, tt.wantErr)
}
require.NoError(t, p.Close())
require.Equal(t, strings.TrimSpace(tt.expected), strings.TrimSpace(buf.String()))
})
}
}

0 comments on commit b7a489e

Please sign in to comment.