From b7a489e161968f6632c1cc2e7b4c5cc7b3aa5c19 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Martins?= Date: Thu, 3 Oct 2024 18:00:15 +0200 Subject: [PATCH] hubble: add printer for lost events MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Currently hubble can't handle lost events which results on a large output on CI runs [1]. This commit implements this missing functionality while trying to maintain the same format for other types of messages. [1] ``` 2024-10-01T05:27:10.3601309Z unknown response type: &{LostEvents:source:HUBBLE_RING_BUFFER num_events_lost:1} 2024-10-01T05:27:10.3601823Z unknown response type: &{LostEvents:source:HUBBLE_RING_BUFFER num_events_lost:1} 2024-10-01T05:27:10.3602406Z unknown response type: &{LostEvents:source:HUBBLE_RING_BUFFER num_events_lost:1} ``` Signed-off-by: André Martins --- hubble/pkg/printer/printer.go | 88 ++++++++++++++++++++++++++++ hubble/pkg/printer/printer_test.go | 93 ++++++++++++++++++++++++++++++ 2 files changed, 181 insertions(+) diff --git a/hubble/pkg/printer/printer.go b/hubble/pkg/printer/printer.go index d1aab440246db..d08940b9e901c 100644 --- a/hubble/pkg/printer/printer.go +++ b/hubble/pkg/printer/printer.go @@ -800,6 +800,8 @@ func (p *Printer) WriteGetFlowsResponse(res *observerpb.GetFlowsResponse) error return p.WriteProtoFlow(res) case *observerpb.GetFlowsResponse_NodeStatus: return p.WriteProtoNodeStatusEvent(res) + case *observerpb.GetFlowsResponse_LostEvents: + return p.WriteLostEvent(res) case nil: return nil default: @@ -914,3 +916,89 @@ func (p *Printer) WriteServerStatusResponse(res *observerpb.ServerStatusResponse } return nil } + +// WriteLostEvent writes v1.Flow into the output writer. +func (p *Printer) WriteLostEvent(res *observerpb.GetFlowsResponse) error { + f := res.GetLostEvents() + + switch p.opts.output { + case TabOutput: + ew := &errWriter{w: p.tw} + src := f.GetSource() + numEventsLost := f.GetNumEventsLost() + cpu := f.GetCpu() + + if p.line == 0 { + ew.write("TIMESTAMP", tab) + if p.opts.nodeName { + ew.write("NODE", tab) + } + ew.write( + "SOURCE", tab, + "DESTINATION", tab, + "TYPE", tab, + "VERDICT", tab, + "SUMMARY", newline, + ) + } + ew.write("", tab) + if p.opts.nodeName { + ew.write("", tab) + } + ew.write( + src, tab, + "", tab, + "EVENTS LOST", tab, + "", tab, + fmt.Sprintf("CPU(%d) - %d", cpu.GetValue(), numEventsLost), newline, + ) + if ew.err != nil { + return fmt.Errorf("failed to write out packet: %w", ew.err) + } + case DictOutput: + ew := &errWriter{w: p.opts.w} + src := f.GetSource() + numEventsLost := f.GetNumEventsLost() + cpu := f.GetCpu() + if p.line != 0 { + // TODO: line length? + ew.write(dictSeparator, newline) + } + + // this is a little crude, but will do for now. should probably find the + // longest header and auto-format the keys + ew.write(" TIMESTAMP: ", "", newline) + if p.opts.nodeName { + ew.write(" NODE: ", "", newline) + } + ew.write( + " SOURCE: ", src, newline, + " TYPE: ", "EVENTS LOST", newline, + " VERDICT: ", "", newline, + " SUMMARY: ", fmt.Sprintf("CPU(%d) - %d", cpu.GetValue(), numEventsLost), newline, + ) + if ew.err != nil { + return fmt.Errorf("failed to write out packet: %w", ew.err) + } + case CompactOutput: + src := f.GetSource() + numEventsLost := f.GetNumEventsLost() + cpu := f.GetCpu() + + _, err := fmt.Fprintf(p.opts.w, + "EVENTS LOST: %s CPU(%d) %d\n", + src, + cpu.GetValue(), + numEventsLost, + ) + if err != nil { + return fmt.Errorf("failed to write out packet: %w", err) + } + case JSONLegacyOutput: + return p.jsonEncoder.Encode(f) + case JSONPBOutput: + return p.jsonEncoder.Encode(res) + } + p.line++ + return nil +} diff --git a/hubble/pkg/printer/printer_test.go b/hubble/pkg/printer/printer_test.go index 521ede7c8d2e2..6a374f9707d83 100644 --- a/hubble/pkg/printer/printer_test.go +++ b/hubble/pkg/printer/printer_test.go @@ -1407,3 +1407,96 @@ NUM CONNECTED NODES: N/A }) } } + +func TestPrinter_WriteLostEventsResponse(t *testing.T) { + buf := bytes.Buffer{} + gfr := &observerpb.GetFlowsResponse{ + ResponseTypes: &observerpb.GetFlowsResponse_LostEvents{ + LostEvents: &observerpb.LostEvent{ + Source: observerpb.LostEventSource_HUBBLE_RING_BUFFER, + NumEventsLost: 1, + Cpu: wrapperspb.Int32(5), + }, + }, + } + type args struct { + le *observerpb.GetFlowsResponse + } + tests := []struct { + name string + options []Option + args args + wantErr bool + expected string + }{ + { + name: "tabular", + options: []Option{ + WithColor("never"), + Writer(&buf), + }, + args: args{gfr}, + wantErr: false, + expected: ` +TIMESTAMP SOURCE DESTINATION TYPE VERDICT SUMMARY + HUBBLE_RING_BUFFER EVENTS LOST CPU(5) - 1`, + }, { + name: "compact", + options: []Option{ + Compact(), + WithColor("never"), + Writer(&buf), + }, + args: args{gfr}, + wantErr: false, + expected: ` +EVENTS LOST: HUBBLE_RING_BUFFER CPU(5) 1`, + }, { + name: "json", + options: []Option{ + JSONPB(), + WithColor("never"), + Writer(&buf), + }, + args: args{gfr}, + wantErr: false, + expected: `{"lost_events":{"source":"HUBBLE_RING_BUFFER","num_events_lost":"1","cpu":5}}`, + }, { + name: "jsonpb", + options: []Option{ + JSONPB(), + WithColor("never"), + Writer(&buf), + }, + args: args{gfr}, + wantErr: false, + expected: `{"lost_events":{"source":"HUBBLE_RING_BUFFER","num_events_lost":"1","cpu":5}}`, + }, { + name: "dict", + options: []Option{ + Dict(), + WithColor("never"), + Writer(&buf), + }, + args: args{gfr}, + wantErr: false, + expected: ` + TIMESTAMP: + SOURCE: HUBBLE_RING_BUFFER + TYPE: EVENTS LOST + VERDICT: + SUMMARY: CPU(5) - 1`, + }, + } + for _, tt := range tests { + buf.Reset() + t.Run(tt.name, func(t *testing.T) { + p := New(tt.options...) + if err := p.WriteLostEvent(tt.args.le); (err != nil) != tt.wantErr { + t.Errorf("WriteServerStatusResponse() error = %v, wantErr %v", err, tt.wantErr) + } + require.NoError(t, p.Close()) + require.Equal(t, strings.TrimSpace(tt.expected), strings.TrimSpace(buf.String())) + }) + } +}