Skip to content

Commit

Permalink
NETOBSERV-617: split big payloads in GRPC exporter (netobserv#81)
Browse files Browse the repository at this point in the history
  • Loading branch information
Mario Macias authored and shach33 committed Jan 10, 2023
1 parent 99b7925 commit ce71544
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 25 deletions.
2 changes: 2 additions & 0 deletions docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ The following environment variables are available to configure the NetObserv eBF
* `EXPORT` (default: `grpc`). Flows' exporter protocol. Accepted values are: `grpc` or `kafka` or `ipfix+tcp` or `ipfix+udp`.
* `FLOWS_TARGET_HOST` (required if `EXPORT` is `grpc` or `ipfix+[tcp/udp]`). Host name or IP of the target Flow collector.
* `FLOWS_TARGET_PORT` (required if `EXPORT` is `grpc` or `ipfix+[tcp/udp]`). Port of the target flow collector.
* `GRPC_MESSAGE_MAX_FLOWS` (default: `10000`). Specifies the limit, in number of flows, of each GRPC
message. Messages larger than that number will be split and submitted sequentially.
* `AGENT_IP` (optional). Allows overriding the reported Agent IP address on each flow.
* `AGENT_IP_IFACE` (default: `external`). Specifies which interface should the agent pick the IP
address from in order to report it in the AgentIP field on each flow. Accepted values are:
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func buildFlowExporter(cfg *Config) (node.TerminalFunc[[]*flow.Record], error) {
cfg.TargetHost, cfg.TargetPort)
}
target := fmt.Sprintf("%s:%d", cfg.TargetHost, cfg.TargetPort)
grpcExporter, err := exporter.StartGRPCProto(target)
grpcExporter, err := exporter.StartGRPCProto(target, cfg.GRPCMessageMaxFlows)
if err != nil {
return nil, err
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ type Config struct {
TargetHost string `env:"FLOWS_TARGET_HOST"`
// TargetPort is the port the target Flow collector, when the EXPORT variable is set to "grpc"
TargetPort int `env:"FLOWS_TARGET_PORT"`
// GRPCMessageMaxFlows specifies the limit, in number of flows, of each GRPC message. Messages
// larger than that number will be split and submitted sequentially.
GRPCMessageMaxFlows int `env:"GRPC_MESSAGE_MAX_FLOWS" envDefault:"10000"`
// Interfaces contains the interface names from where flows will be collected. If empty, the agent
// will fetch all the interfaces in the system, excepting the ones listed in ExcludeInterfaces.
// If an entry is enclosed by slashes (e.g. `/br-/`), it will match as regular expression,
Expand Down
20 changes: 13 additions & 7 deletions pkg/exporter/grpc_proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,21 @@ var glog = logrus.WithField("component", "exporter/GRPCProto")
// GRPCProto wraps a GRPC client connection and exports flow records to a
// remote collector as protobuf-encoded messages.
type GRPCProto struct {
// hostPort is the "host:port" address of the target flow collector.
hostPort string
// clientConn is the underlying GRPC client connection to the collector.
clientConn *grpc.ClientConnection
// maxFlowsPerMessage limits the maximum number of flows per GRPC message.
// If a message contains more flows than this number, the GRPC message will be split into
// multiple messages.
maxFlowsPerMessage int
}

func StartGRPCProto(hostPort string) (*GRPCProto, error) {
func StartGRPCProto(hostPort string, maxFlowsPerMessage int) (*GRPCProto, error) {
clientConn, err := grpc.ConnectClient(hostPort)
if err != nil {
return nil, err
}
return &GRPCProto{
hostPort: hostPort,
clientConn: clientConn,
hostPort: hostPort,
clientConn: clientConn,
maxFlowsPerMessage: maxFlowsPerMessage,
}, nil
}

Expand All @@ -34,10 +39,11 @@ func StartGRPCProto(hostPort string) (*GRPCProto, error) {
func (g *GRPCProto) ExportFlows(input <-chan []*flow.Record) {
log := glog.WithField("collector", g.hostPort)
for inputRecords := range input {
pbRecords := flowsToPB(inputRecords)
log.Debugf("sending %d records", len(pbRecords.Entries))
if _, err := g.clientConn.Client().Send(context.TODO(), pbRecords); err != nil {
log.WithError(err).Error("couldn't send flow records to collector")
for _, pbRecords := range flowsToPB(inputRecords, g.maxFlowsPerMessage) {
log.Debugf("sending %d records", len(pbRecords.Entries))
if _, err := g.clientConn.Client().Send(context.TODO(), pbRecords); err != nil {
log.WithError(err).Error("couldn't send flow records to collector")
}
}
}
if err := g.clientConn.Close(); err != nil {
Expand Down
64 changes: 50 additions & 14 deletions pkg/exporter/grpc_proto_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"testing"
"time"

test2 "github.com/netobserv/netobserv-ebpf-agent/pkg/test"

"github.com/mariomac/guara/pkg/test"
"github.com/netobserv/netobserv-ebpf-agent/pkg/flow"
"github.com/netobserv/netobserv-ebpf-agent/pkg/grpc"
Expand All @@ -21,11 +23,12 @@ func TestGRPCProto_ExportFlows_AgentIP(t *testing.T) {
port, err := test.FreeTCPPort()
require.NoError(t, err)
serverOut := make(chan *pbflow.Records)
_, err = grpc.StartCollector(port, serverOut)
coll, err := grpc.StartCollector(port, serverOut)
require.NoError(t, err)
defer coll.Close()

// Start GRPCProto exporter stage
exporter, err := StartGRPCProto(fmt.Sprintf("127.0.0.1:%d", port))
exporter, err := StartGRPCProto(fmt.Sprintf("127.0.0.1:%d", port), 1000)
require.NoError(t, err)

// Send some flows to the input of the exporter stage
Expand All @@ -37,23 +40,14 @@ func TestGRPCProto_ExportFlows_AgentIP(t *testing.T) {
{RawRecord: flow.RawRecord{RecordKey: flow.RecordKey{EthProtocol: flow.IPv6Type}},
AgentIP: net.ParseIP("8888::1111")},
}
close(flows)
go exporter.ExportFlows(flows)

var rs *pbflow.Records
select {
case rs = <-serverOut:
case <-time.After(timeout):
require.Fail(t, "timeout waiting for flows")
}
rs := test2.ReceiveTimeout(t, serverOut, timeout)
assert.Len(t, rs.Entries, 1)
r := rs.Entries[0]
assert.EqualValues(t, 0x0a090807, r.GetAgentIp().GetIpv4())
select {
case rs = <-serverOut:
case <-time.After(timeout):
require.Fail(t, "timeout waiting for flows")
}

rs = test2.ReceiveTimeout(t, serverOut, timeout)
assert.Len(t, rs.Entries, 1)
r = rs.Entries[0]
assert.EqualValues(t, net.ParseIP("8888::1111"), r.GetAgentIp().GetIpv6())
Expand All @@ -65,3 +59,45 @@ func TestGRPCProto_ExportFlows_AgentIP(t *testing.T) {
//ok!
}
}

func TestGRPCProto_SplitLargeMessages(t *testing.T) {
	// spin up a local collector that will receive the exported messages
	port, err := test.FreeTCPPort()
	require.NoError(t, err)
	serverOut := make(chan *pbflow.Records)
	coll, err := grpc.StartCollector(port, serverOut)
	require.NoError(t, err)
	defer coll.Close()

	const msgMaxLen = 10000
	// Start GRPCProto exporter stage with the above per-message flow limit
	exporter, err := StartGRPCProto(fmt.Sprintf("127.0.0.1:%d", port), msgMaxLen)
	require.NoError(t, err)

	// Build a single batch holding 2.5x the per-message flow limit
	const totalFlows = 25000
	batch := make([]*flow.Record, 0, totalFlows)
	for i := 0; i < totalFlows; i++ {
		batch = append(batch, &flow.Record{RawRecord: flow.RawRecord{RecordKey: flow.RecordKey{
			EthProtocol: flow.IPv6Type,
		}}, AgentIP: net.ParseIP("1111::1111"), Interface: "12345678"})
	}
	flows := make(chan []*flow.Record, 10)
	flows <- batch
	go exporter.ExportFlows(flows)

	// the oversized batch must arrive as chunks no longer than msgMaxLen flows
	for _, wantLen := range []int{msgMaxLen, msgMaxLen, totalFlows - 2*msgMaxLen} {
		rs := test2.ReceiveTimeout(t, serverOut, timeout)
		assert.Len(t, rs.Entries, wantLen)
	}

	// after all the operation, no more flows are sent
	select {
	case rs := <-serverOut:
		assert.Failf(t, "shouldn't have received any flow", "Got: %#v", rs)
	default:
		//ok!
	}
}
13 changes: 10 additions & 3 deletions pkg/exporter/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,21 @@ import (

// flowsToPB is an auxiliary function to convert flow records, as returned by the eBPF agent,
// into protobuf-encoded messages ready to be sent to the collector via GRPC
func flowsToPB(inputRecords []*flow.Record) *pbflow.Records {
func flowsToPB(inputRecords []*flow.Record, maxLen int) []*pbflow.Records {
entries := make([]*pbflow.Record, 0, len(inputRecords))
for _, record := range inputRecords {
entries = append(entries, flowToPB(record))
}
return &pbflow.Records{
Entries: entries,
var records []*pbflow.Records
for len(entries) > 0 {
end := len(entries)
if end > maxLen {
end = maxLen
}
records = append(records, &pbflow.Records{Entries: entries[:end]})
entries = entries[end:]
}
return records
}

// flowToPB is an auxiliary function to convert a single flow record, as returned by the eBPF agent,
Expand Down
20 changes: 20 additions & 0 deletions pkg/test/channels.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package test

import (
"testing"
"time"
)

// ReceiveTimeout returns the first received element or fails the test if nothing is received
// before the given timeout
// ReceiveTimeout returns the first received element or fails the test if nothing is received
// before the given timeout.
func ReceiveTimeout[T any](t *testing.T, ch <-chan T, timeout time.Duration) T {
	t.Helper()
	expired := time.After(timeout)
	select {
	case elem := <-ch:
		return elem
	case <-expired:
		var zero T
		t.Fatalf("timeout while waiting %s for a %T element in channel", timeout, zero)
		return zero
	}
}

0 comments on commit ce71544

Please sign in to comment.