Skip to content

Commit

Permalink
fix: reduce number of anypb.New invoke when enriching flows (#270)
Browse files Browse the repository at this point in the history
# Description

Reducing the number of `anypb.New` calls by making all the flow
enrichment functions that is invoking `anypb.New` such as
`AddPacketSize` and `AddTCPFlags`, which we are calling separately, into
functions that accept `RetinaMetadata` struct as an additional param,
therefore allowing us to create a single flow's metadata struct, adding
data into it, and then calling `anypb.New` once via `AddRetinaMetadata`
at the end when adding the metadata struct to its flow.
## Related Issue

If this pull request is related to any issue, please mention it here.
Additionally, make sure that the issue is assigned to you before
submitting this pull request.

## Checklist

- [x] I have read the [contributing
documentation](https://retina.sh/docs/contributing).
- [x] I signed and signed-off the commits (`git commit -S -s ...`). See
[this
documentation](https://docs.github.com/en/authentication/managing-commit-signature-verification/about-commit-signature-verification)
on signing commits.
- [x] I have correctly attributed the author(s) of the code.
- [x] I have tested the changes locally.
- [x] I have followed the project's style guidelines.
- [x] I have updated the documentation, if necessary.
- [x] I have added tests, if applicable.

## Screenshots (if applicable) or Testing Completed

Please add any relevant screenshots or GIFs to showcase the changes
made.

## Additional Notes

Add any additional notes or context about the pull request here.

---

Please refer to the [CONTRIBUTING.md](../CONTRIBUTING.md) file for more
information on how to contribute to this project.
  • Loading branch information
nddq committed May 6, 2024
1 parent 694acbb commit cf2e6d0
Show file tree
Hide file tree
Showing 12 changed files with 159 additions and 99 deletions.
12 changes: 8 additions & 4 deletions pkg/module/metrics/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package metrics

import (
"fmt"
"strconv"
"strings"

v1 "github.com/cilium/cilium/api/v1/flow"
Expand Down Expand Up @@ -83,8 +84,8 @@ func (d *DNSMetrics) getLabels() []string {
}

func (d *DNSMetrics) values(flow *v1.Flow) []string {
flowDns, dnsType, numResponses := utils.GetDns(flow)
if flowDns == nil {
flowDNS, dnsType, numResponses := utils.GetDNS(flow)
if flowDNS == nil {
return nil
}
if dnsType == utils.DNSType_UNKNOWN ||
Expand All @@ -98,8 +99,11 @@ func (d *DNSMetrics) values(flow *v1.Flow) []string {
// https://github.com/inspektor-gadget/inspektor-gadget/issues/2008 .
// Also ref: https://github.com/inspektor-gadget/inspektor-gadget/blob/main/docs/gadgets/trace/dns.md#limitations .
labels := []string{
utils.DnsRcodeToString(flow),
strings.Join(flowDns.Qtypes, ","), flowDns.Query, strings.Join(flowDns.Ips, ","), fmt.Sprintf("%d", numResponses),
utils.DNSRcodeToString(flow),
strings.Join(flowDNS.GetQtypes(), ","),
flowDNS.GetQuery(),
strings.Join(flowDNS.GetIps(), ","),
strconv.FormatUint(uint64(numResponses), 10),
}
return labels
}
Expand Down
24 changes: 18 additions & 6 deletions pkg/module/metrics/dns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,19 @@ func TestGetLabels(t *testing.T) {

func TestValues(t *testing.T) {
testR := &flow.Flow{}
utils.AddDnsInfo(testR, "R", 0, "bing.com", []string{"A"}, 1, []string{"1.1.1.1"})
metaR := &utils.RetinaMetadata{}
utils.AddDNSInfo(testR, metaR, "R", 0, "bing.com", []string{"A"}, 1, []string{"1.1.1.1"})
utils.AddRetinaMetadata(testR, metaR)

testQ := &flow.Flow{}
utils.AddDnsInfo(testQ, "Q", 0, "bing.com", []string{"A"}, 0, []string{})
metaQ := &utils.RetinaMetadata{}
utils.AddDNSInfo(testQ, metaQ, "Q", 0, "bing.com", []string{"A"}, 0, []string{})
utils.AddRetinaMetadata(testQ, metaQ)

testU := &flow.Flow{}
utils.AddDnsInfo(testU, "U", 0, "bing.com", []string{"A"}, 0, []string{})
metaU := &utils.RetinaMetadata{}
utils.AddDNSInfo(testU, metaU, "U", 0, "bing.com", []string{"A"}, 0, []string{})
utils.AddRetinaMetadata(testU, metaU)

tests := []struct {
name string
Expand Down Expand Up @@ -155,13 +161,19 @@ func TestProcessLocalCtx(t *testing.T) {
defer ctrl.Finish()

testR := &flow.Flow{}
utils.AddDnsInfo(testR, "R", 0, "bing.com", []string{"A"}, 1, []string{"1.1.1.1"})
metaR := &utils.RetinaMetadata{}
utils.AddDNSInfo(testR, metaR, "R", 0, "bing.com", []string{"A"}, 1, []string{"1.1.1.1"})
utils.AddRetinaMetadata(testR, metaR)

testIngress := &flow.Flow{TrafficDirection: flow.TrafficDirection_INGRESS}
utils.AddDnsInfo(testIngress, "R", 0, "bing.com", []string{"A"}, 1, []string{"1.1.1.1"})
metaIngress := &utils.RetinaMetadata{}
utils.AddDNSInfo(testIngress, metaIngress, "R", 0, "bing.com", []string{"A"}, 1, []string{"1.1.1.1"})
utils.AddRetinaMetadata(testIngress, metaIngress)

testEgress := &flow.Flow{TrafficDirection: flow.TrafficDirection_EGRESS}
utils.AddDnsInfo(testEgress, "R", 0, "bing.com", []string{"A"}, 1, []string{"1.1.1.1"})
metaEgress := &utils.RetinaMetadata{}
utils.AddDNSInfo(testEgress, metaEgress, "R", 0, "bing.com", []string{"A"}, 1, []string{"1.1.1.1"})
utils.AddRetinaMetadata(testEgress, metaEgress)

tests := []struct {
name string
Expand Down
16 changes: 8 additions & 8 deletions pkg/module/metrics/latency.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (lm *LatencyMetrics) Clean() {
}

func (lm *LatencyMetrics) ProcessFlow(f *v1.Flow) {
if f == nil || f.L4 == nil || f.L4.GetTCP() == nil || utils.GetTcpID(f) == 0 || f.IP == nil {
if f == nil || f.GetL4() == nil || f.GetL4().GetTCP() == nil || utils.GetTCPID(f) == 0 || f.GetIP() == nil {
return
}

Expand Down Expand Up @@ -264,24 +264,24 @@ func (lm *LatencyMetrics) calculateLatency(f *v1.Flow) {
k := key{
srcIP: f.IP.Source,
dstIP: f.IP.Destination,
srcP: f.L4.GetTCP().SourcePort,
dstP: f.L4.GetTCP().DestinationPort,
id: utils.GetTcpID(f),
srcP: f.GetL4().GetTCP().GetSourcePort(),
dstP: f.GetL4().GetTCP().GetDestinationPort(),
id: utils.GetTCPID(f),
}
// There will be multiple identical packets with same ID. Store only the first one.
if item := lm.cache.Get(k); item == nil {
lm.cache.Set(k, &val{
t: f.Time.Nanos,
flags: f.L4.GetTCP().Flags,
flags: f.GetL4().GetTCP().GetFlags(),
}, TTL)
}
} else if f.TraceObservationPoint == v1.TraceObservationPoint_FROM_NETWORK {
k := key{
srcIP: f.IP.Destination,
dstIP: f.IP.Source,
srcP: f.L4.GetTCP().DestinationPort,
dstP: f.L4.GetTCP().SourcePort,
id: utils.GetTcpID(f),
srcP: f.GetL4().GetTCP().GetDestinationPort(),
dstP: f.GetL4().GetTCP().GetSourcePort(),
id: utils.GetTCPID(f),
}
if item := lm.cache.Get(k); item != nil {
// Calculate latency in milliseconds.
Expand Down
17 changes: 11 additions & 6 deletions pkg/module/metrics/latency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,15 +122,20 @@ func TestProcessFlow(t *testing.T) {
*/
// Node -> Api server.
f1 := utils.ToFlow(t1, apiSeverIp, nodeIp, 80, 443, 6, 3, 0)
utils.AddTcpID(f1, 1234)
utils.AddTcpFlags(f1, 1, 0, 0, 0, 0, 0)
metaf1 := &utils.RetinaMetadata{}
utils.AddTCPID(metaf1, 1234)
utils.AddTCPFlags(f1, 1, 0, 0, 0, 0, 0)
utils.AddRetinaMetadata(f1, metaf1)
f1.Destination = &flow.Endpoint{
PodName: "kubernetes-apiserver",
}

// Api server -> Node.
f2 := utils.ToFlow(t2, nodeIp, apiSeverIp, 443, 80, 6, 2, 0)
utils.AddTcpID(f2, 1234)
utils.AddTcpFlags(f2, 1, 1, 0, 0, 0, 0)
metaf2 := &utils.RetinaMetadata{}
utils.AddTCPID(metaf2, 1234)
utils.AddTCPFlags(f2, 1, 1, 0, 0, 0, 0)
utils.AddRetinaMetadata(f2, metaf2)
f2.Source = &flow.Endpoint{
PodName: "kubernetes-apiserver",
}
Expand All @@ -142,9 +147,9 @@ func TestProcessFlow(t *testing.T) {
* Test case 2: Existing TCP connection.
*/
// Node -> Api server.
utils.AddTcpFlags(f1, 1, 0, 0, 0, 0, 0)
utils.AddTCPFlags(f1, 1, 0, 0, 0, 0, 0)
// Api server -> Node.
utils.AddTcpFlags(f2, 0, 1, 0, 0, 0, 0)
utils.AddTCPFlags(f2, 0, 1, 0, 0, 0, 0)
// Process flow.
lm.ProcessFlow(f1)
lm.ProcessFlow(f2)
Expand Down
25 changes: 18 additions & 7 deletions pkg/plugin/dns/dns_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,16 +128,27 @@ func (d *dns) eventHandler(event *types.Event) {
}

// Update advanced metrics.
f := utils.ToFlow(int64(event.Timestamp), net.ParseIP(event.SrcIP),
net.ParseIP(event.DstIP), uint32(event.SrcPort), uint32(event.DstPort),
fl := utils.ToFlow(
int64(event.Timestamp),
net.ParseIP(event.SrcIP),
net.ParseIP(event.DstIP),
uint32(event.SrcPort),
uint32(event.DstPort),
uint8(common.ProtocolToFlow(event.Protocol)),
dir, utils.Verdict_DNS)
utils.AddDnsInfo(f, string(event.Qr), common.RCodeToFlow(event.Rcode), event.DNSName, []string{event.QType}, event.NumAnswers, event.Addresses)
// d.l.Debug("DNS Flow", zap.Any("flow", f))
dir,
utils.Verdict_DNS,
)

meta := &utils.RetinaMetadata{}

utils.AddDNSInfo(fl, meta, string(event.Qr), common.RCodeToFlow(event.Rcode), event.DNSName, []string{event.QType}, event.NumAnswers, event.Addresses)

// Add metadata to the flow.
utils.AddRetinaMetadata(fl, meta)

ev := (&v1.Event{
Event: f,
Timestamp: f.Time,
Event: fl,
Timestamp: fl.GetTime(),
})
if d.enricher != nil {
d.enricher.Write(ev)
Expand Down
14 changes: 7 additions & 7 deletions pkg/plugin/dns/dns_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,13 +227,13 @@ type EventMatcher struct {

func (m *EventMatcher) Matches(x interface{}) bool {
inputFlow := x.(*v1.Event).Event.(*flow.Flow)
expectedDns, expectedDnsType, expectedNumResponses := utils.GetDns(inputFlow)
return expectedDns != nil &&
expectedDns.Rcode == m.rCode &&
expectedDns.Query == m.query &&
reflect.DeepEqual(expectedDns.Ips, m.ips) &&
reflect.DeepEqual(expectedDns.Qtypes, m.qTypes) &&
expectedDnsType == m.qType &&
expectedDNS, expectedDNSType, expectedNumResponses := utils.GetDNS(inputFlow)
return expectedDNS != nil &&
expectedDNS.GetRcode() == m.rCode &&
expectedDNS.GetQuery() == m.query &&
reflect.DeepEqual(expectedDNS.GetIps(), m.ips) &&
reflect.DeepEqual(expectedDNS.GetQtypes(), m.qTypes) &&
expectedDNSType == m.qType &&
expectedNumResponses == m.numAnswers
}

Expand Down
15 changes: 10 additions & 5 deletions pkg/plugin/dropreason/dropreason_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,11 +369,16 @@ func (dr *dropReason) processRecord(ctx context.Context, id int) {
continue
}

// Add drop reason to the flow.
utils.AddDropReason(fl, dropKey.DropType)
meta := &utils.RetinaMetadata{}

// Add packet size to the flow.
utils.AddPacketSize(fl, bpfEvent.SkbLen)
// Add drop reason to the flow's metadata.
utils.AddDropReason(fl, meta, dropKey.DropType)

// Add packet size to the flow's metadata.
utils.AddPacketSize(meta, bpfEvent.SkbLen)

// Add metadata to the flow.
utils.AddRetinaMetadata(fl, meta)

// This is only for development purposes.
// Removing this makes logs way too chatter-y.
Expand All @@ -382,7 +387,7 @@ func (dr *dropReason) processRecord(ctx context.Context, id int) {
// Write the event to the enricher.
ev := &hubblev1.Event{
Event: fl,
Timestamp: fl.Time,
Timestamp: fl.GetTime(),
}
if dr.enricher != nil {
dr.enricher.Write(ev)
Expand Down
Binary file removed pkg/plugin/packetparser/packetparser_bpf.o
Binary file not shown.
18 changes: 13 additions & 5 deletions pkg/plugin/packetparser/packetparser_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,25 +547,33 @@ func (p *packetParser) processRecord(ctx context.Context, id int) {
p.l.Warn("Could not convert bpfEvent to flow", zap.Any("bpfEvent", bpfEvent))
continue
}
utils.AddPacketSize(fl, bpfEvent.Bytes)

meta := &utils.RetinaMetadata{}

// Add packet size to the flow's metadata.
utils.AddPacketSize(meta, bpfEvent.Bytes)

// Add the TCP metadata to the flow.
tcpMetadata := bpfEvent.TcpMetadata
utils.AddTcpFlags(fl, tcpMetadata.Syn, tcpMetadata.Ack, tcpMetadata.Fin, tcpMetadata.Rst, tcpMetadata.Psh, tcpMetadata.Urg)
utils.AddTCPFlags(fl, tcpMetadata.Syn, tcpMetadata.Ack, tcpMetadata.Fin, tcpMetadata.Rst, tcpMetadata.Psh, tcpMetadata.Urg)

// For packets originating from node, we use tsval as the tcpID.
// Packets coming back has the tsval echoed in tsecr.
if fl.TraceObservationPoint == flow.TraceObservationPoint_TO_NETWORK {
utils.AddTcpID(fl, uint64(tcpMetadata.Tsval))
utils.AddTCPID(meta, uint64(tcpMetadata.Tsval))
} else if fl.TraceObservationPoint == flow.TraceObservationPoint_FROM_NETWORK {
utils.AddTcpID(fl, uint64(tcpMetadata.Tsecr))
utils.AddTCPID(meta, uint64(tcpMetadata.Tsecr))
}

// Add metadata to the flow.
utils.AddRetinaMetadata(fl, meta)

p.l.Debug("Received packet", zap.Any("flow", fl))

// Write the event to the enricher.
ev := &v1.Event{
Event: fl,
Timestamp: fl.Time,
Timestamp: fl.GetTime(),
}
if p.enricher != nil {
p.enricher.Write(ev)
Expand Down
2 changes: 1 addition & 1 deletion pkg/plugin/tcpretrans/tcpretrans_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (t *tcpretrans) eventHandler(event *types.Event) {
return
}
syn, ack, fin, rst, psh, urg := getTcpFlags(event.Tcpflags)
utils.AddTcpFlags(fl, syn, ack, fin, rst, psh, urg)
utils.AddTCPFlags(fl, syn, ack, fin, rst, psh, urg)

// This is only for development purposes.
// Removing this makes logs way too chatter-y.
Expand Down
Loading

0 comments on commit cf2e6d0

Please sign in to comment.