Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pcapng): Read interface name from pcapng file and optimize interface handling when writing #165

Merged
merged 4 commits into from
Oct 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions bpf/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,13 +217,13 @@ func (b *BPF) PullNewNetDeviceEvents(ctx context.Context, chanSize int) (<-chan
go func() {
defer close(ch)
defer reader.Close()
b.handleNetNetDeviceEvents(ctx, reader, ch)
b.handleNewNetDeviceEvents(ctx, reader, ch)
}()

return ch, nil
}

func (b *BPF) handleNetNetDeviceEvents(ctx context.Context, reader *perf.Reader, ch chan<- BpfNewNetdeviceEventT) {
func (b *BPF) handleNewNetDeviceEvents(ctx context.Context, reader *perf.Reader, ch chan<- BpfNewNetdeviceEventT) {
for {
select {
case <-ctx.Done():
Expand All @@ -240,12 +240,12 @@ func (b *BPF) handleNetNetDeviceEvents(ctx context.Context, reader *perf.Reader,
log.Infof("got EOF error: %s", err)
continue
}
log.Errorf("read go tls keylog event failed: %s", err)
log.Errorf("read new net device event failed: %s", err)
continue
}
event, err := parseNewNetDeviceEvent(record.RawSample)
if err != nil {
log.Errorf("parse go tls keylog event failed: %s", err)
log.Errorf("parse new net device event failed: %s", err)
} else {
ch <- *event
dev := event.Dev
Expand Down Expand Up @@ -307,12 +307,12 @@ func (b *BPF) handleNetDeviceChangeEvents(ctx context.Context, reader *perf.Read
log.Infof("got EOF error: %s", err)
continue
}
log.Errorf("read go tls keylog event failed: %s", err)
log.Errorf("read net device change event failed: %s", err)
continue
}
event, err := parseNetDeviceChangeEvent(record.RawSample)
if err != nil {
log.Errorf("parse go tls keylog event failed: %s", err)
log.Errorf("parse net device change event failed: %s", err)
} else {
ch <- *event
oldDev := event.OldDevice
Expand Down Expand Up @@ -376,12 +376,12 @@ func (b *BPF) handleMountEvents(ctx context.Context, reader *perf.Reader, ch cha
log.Infof("got EOF error: %s", err)
continue
}
log.Errorf("read go tls keylog event failed: %s", err)
log.Errorf("read mount event failed: %s", err)
continue
}
event, err := parseMountEvent(record.RawSample)
if err != nil {
log.Errorf("parse go tls keylog event failed: %s", err)
log.Errorf("parse mount event failed: %s", err)
} else {
ch <- *event
log.Infof("new BpfMountEventT: (source %s, dest %s, fstype, %s)",
Expand Down
24 changes: 8 additions & 16 deletions cmd/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,21 +76,13 @@ func newPcapNgWriter(w io.Writer, pcache *metadata.ProcessCache, opts *Options)
return nil, fmt.Errorf(": %w", err)
}

// to avoid "Interface id 9 not present in section (have only 7 interfaces)"
maxIndex := 0
for _, dev := range devices.Devs() {
if dev.Ifindex > maxIndex {
maxIndex = dev.Ifindex
}
}
interfaces := make([]pcapgo.NgInterface, maxIndex+1)
for _, dev := range devices.Devs() {
interfaces[dev.Ifindex] = metadata.NewNgInterface(dev, opts.pcapFilter)
}
for i, iface := range interfaces {
if iface.Index == 0 {
interfaces[i] = metadata.NewDummyNgInterface(i)
}
interfaceIds := map[string]int{}
interfaces := []pcapgo.NgInterface{metadata.NewDummyNgInterface(0, opts.pcapFilter)}
for i, dev := range devices.Devs() {
index := i + 1
intf := metadata.NewNgInterface(dev, opts.pcapFilter, index)
interfaces = append(interfaces, intf)
interfaceIds[dev.Key()] = index
}

pcapNgWriter, err := pcapgo.NewNgWriterInterface(w, interfaces[0], pcapgo.NgWriterOptions{
Expand All @@ -115,7 +107,7 @@ func newPcapNgWriter(w io.Writer, pcache *metadata.ProcessCache, opts *Options)
return nil, fmt.Errorf("writing pcapNg header: %w", err)
}

wt := writer.NewPcapNGWriter(pcapNgWriter, pcache, interfaces).WithPcapFilter(opts.pcapFilter)
wt := writer.NewPcapNGWriter(pcapNgWriter, pcache, interfaceIds).WithPcapFilter(opts.pcapFilter)
return wt, nil
}

Expand Down
44 changes: 27 additions & 17 deletions internal/metadata/device.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,21 +112,29 @@ func (d *DeviceCache) GetByIfindex(ifindex int, netNsInode uint32) (types.Device
ns = types.NewNetNsWithInode(netNsInode)
}

devs, ok := d.allLinks[netNsInode]
if !ok {
for _, links := range d.allLinks {
for _, dev := range links {
devs = append(devs, dev)
for inode, links := range d.allLinks {
if netNsInode != inode {
continue
}
for _, dev := range links {
if dev.Index == ifindex {
return types.Device{
Name: dev.Name,
Ifindex: ifindex,
NetNs: ns,
}, true
}
}
}
for _, dev := range devs {
if dev.Index == ifindex {
return types.Device{
Name: dev.Name,
Ifindex: ifindex,
NetNs: ns,
}, true
for _, links := range d.allLinks {
for _, dev := range links {
if dev.Index == ifindex {
return types.Device{
Name: dev.Name,
Ifindex: ifindex,
NetNs: ns,
}, true
}
}
}

Expand Down Expand Up @@ -193,13 +201,14 @@ func (d *DeviceCache) getAllLinks(inode uint32) ([]net.Interface, error) {
return d.allLinks[inode], nil
}

func NewNgInterface(dev types.Device, filter string) pcapgo.NgInterface {
comment := ""
func NewNgInterface(dev types.Device, filter string, index int) pcapgo.NgInterface {
comment := fmt.Sprintf("ifIndex: %d", dev.Ifindex)
if dev.NetNs != nil {
comment = fmt.Sprintf("netNsInode: %d, netNsPath: %s", dev.NetNs.Inode(), dev.NetNs.Path())
comment = fmt.Sprintf("%s, netNsInode: %d, netNsPath: %s",
comment, dev.NetNs.Inode(), dev.NetNs.Path())
}
return pcapgo.NgInterface{
Index: dev.Ifindex,
Index: index,
Name: dev.Name,
Comment: comment,
Filter: filter,
Expand All @@ -210,10 +219,11 @@ func NewNgInterface(dev types.Device, filter string) pcapgo.NgInterface {
}
}

func NewDummyNgInterface(index int) pcapgo.NgInterface {
func NewDummyNgInterface(index int, filter string) pcapgo.NgInterface {
return pcapgo.NgInterface{
Index: index,
Name: fmt.Sprintf("dummy-%d", index),
Filter: filter,
OS: runtime.GOOS,
LinkType: layers.LinkTypeEthernet,
SnapLength: uint32(math.MaxUint16),
Expand Down
13 changes: 13 additions & 0 deletions internal/parser/pcapng.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package parser

import (
"github.com/mozillazg/ptcpdump/internal/types"
"io"

"github.com/gopacket/gopacket/pcapgo"
Expand Down Expand Up @@ -29,12 +30,24 @@ func (p *PcapNGParser) Parse() (*event.Packet, error) {
if err != nil {
return nil, err
}

e, err := event.FromPacket(ci, data)
if err != nil {
return nil, err
}

interf, err := p.r.Interface(ci.InterfaceIndex)
if err == nil {
e.Device = types.Device{
Name: interf.Name,
Ifindex: 0,
NetNs: nil,
}
}

exec, ctx := event.FromPacketOptions(opts)
e.Pid = exec.Pid
p.pcache.AddItemWithContext(exec, ctx)

return e, nil
}
10 changes: 5 additions & 5 deletions internal/types/device.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func NewInterfaces() *Interfaces {
}

func (i *Interfaces) Add(dev Device) {
k := i.key(dev)
k := dev.Key()
i.devs[k] = dev
}

Expand All @@ -45,10 +45,6 @@ func (i *Interfaces) Devs() []Device {
return devs
}

func (i *Interfaces) key(dev Device) string {
return fmt.Sprintf("%d.%d", dev.NetNs.Inode(), dev.Ifindex)
}

func (i *Interfaces) GetByIfindex(index int) Device {
for _, v := range i.devs {
if v.Ifindex == index {
Expand All @@ -58,6 +54,10 @@ func (i *Interfaces) GetByIfindex(index int) Device {
return Device{}
}

func (d *Device) Key() string {
return fmt.Sprintf("%d.%d", d.NetNs.Inode(), d.Ifindex)
}

func (d *Device) String() string {
return fmt.Sprintf("{Device ifindex: %d, name: %s, ns: %s}", d.Ifindex, d.Name, d.NetNs)
}
3 changes: 3 additions & 0 deletions internal/types/netns.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,5 +66,8 @@ func (n *NetNs) Path() string {
}

func (n *NetNs) Inode() uint32 {
if n == nil {
return 0
}
return n.inode
}
59 changes: 38 additions & 21 deletions internal/writer/pcapng.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"fmt"
"github.com/mozillazg/ptcpdump/internal/types"
"strings"
"sync"

"github.com/gopacket/gopacket"
Expand All @@ -14,19 +15,19 @@ import (
)

type PcapNGWriter struct {
pw *pcapgo.NgWriter
pcache *metadata.ProcessCache
interfaces []pcapgo.NgInterface
pcapFilter string
pw *pcapgo.NgWriter
pcache *metadata.ProcessCache
interfaceIds map[string]int
pcapFilter string

noBuffer bool
lock sync.Mutex
lock sync.RWMutex
keylogs bytes.Buffer
}

func NewPcapNGWriter(pw *pcapgo.NgWriter, pcache *metadata.ProcessCache,
interfaces []pcapgo.NgInterface) *PcapNGWriter {
return &PcapNGWriter{pw: pw, pcache: pcache, interfaces: interfaces, lock: sync.Mutex{}}
interfaceIds map[string]int) *PcapNGWriter {
return &PcapNGWriter{pw: pw, pcache: pcache, interfaceIds: interfaceIds, lock: sync.RWMutex{}}
}

func (w *PcapNGWriter) Write(e *event.Packet) error {
Expand All @@ -35,7 +36,7 @@ func (w *PcapNGWriter) Write(e *event.Packet) error {
Timestamp: e.Time.Local(),
CaptureLength: payloadLen,
Length: e.Len,
InterfaceIndex: e.Device.Ifindex,
InterfaceIndex: w.getInterfaceIndex(e.Device),
}
p := w.pcache.Get(e.Pid, e.MntNs, e.NetNs, e.CgroupName)

Expand Down Expand Up @@ -109,24 +110,40 @@ func (w *PcapNGWriter) AddDev(dev types.Device) {
w.lock.Lock()
defer w.lock.Unlock()

log.Infof("new dev: %+v, currLen: %d", dev, len(w.interfaces))
if len(w.interfaces) > dev.Ifindex {
log.Infof("new dev: %+v, currLen: %d", dev, len(w.interfaceIds))
key := dev.Key()
if w.interfaceIds[key] > 0 {
return
}

for i := len(w.interfaces); i <= dev.Ifindex; i++ {
var intf pcapgo.NgInterface
if i == dev.Ifindex {
intf = metadata.NewNgInterface(dev, w.pcapFilter)
} else {
intf = metadata.NewDummyNgInterface(i)
}
log.Debugf("add interface: %+v", intf)
if _, err := w.pw.AddInterface(intf); err != nil {
log.Errorf("error adding interface %s: %+v", intf.Name, err)
index := len(w.interfaceIds) + 1
intf := metadata.NewNgInterface(dev, w.pcapFilter, index)
log.Infof("add interface: %+v", intf)

if _, err := w.pw.AddInterface(intf); err != nil {
log.Errorf("error adding interface %s: %+v", intf.Name, err)
}

w.interfaceIds[key] = index
}

func (w *PcapNGWriter) getInterfaceIndex(dev types.Device) int {
w.lock.RLock()
defer w.lock.RUnlock()

log.Infof("interfaceIds: %+v, dev: %+v", w.interfaceIds, dev)

index := w.interfaceIds[dev.Key()]
if index > 0 {
return index
}
suffix := fmt.Sprintf(".%d", dev.Ifindex)
for k, index := range w.interfaceIds {
if strings.HasSuffix(k, suffix) {
return index
}
w.interfaces = append(w.interfaces, intf)
}
return 0
}

func (w *PcapNGWriter) WithPcapFilter(filter string) *PcapNGWriter {
Expand Down
2 changes: 1 addition & 1 deletion testdata/test_arp.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ function test_tcpdump_read() {

function test_ptcpdump_read() {
EXPECT_NAME="${LNAME}.read.expect"
sed 's/ [a-zA-Z0-9_-]\+ \(In\|Out\) / /g' "${LNAME}" > "${EXPECT_NAME}"
sed 's/ \(In\|Out\) / /g' "${LNAME}" > "${EXPECT_NAME}"
timeout 30s ${CMD} -v -r "${FNAME}" > "${RNAME}"
diff "${EXPECT_NAME}" "${RNAME}"
}
Expand Down
6 changes: 4 additions & 2 deletions testdata/test_base.sh
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ function test_tcpdump_read() {

function test_ptcpdump_read() {
EXPECT_NAME="${LNAME}.read.expect"
sed 's/ [a-zA-Z0-9_-]\+ \(In\|Out\) / /g' "${LNAME}" > "${EXPECT_NAME}"
timeout 30s ${CMD} -v -r "${FNAME}" > "${RNAME}"
sed 's/ \(In\|Out\) / /g' "${LNAME}" > "${EXPECT_NAME}"
timeout 30s ${CMD} -v -r "${FNAME}" |tee "${RNAME}"
diff "${EXPECT_NAME}" "${RNAME}"

${CMD} -r ${FNAME}
}

function main() {
Expand Down
2 changes: 1 addition & 1 deletion testdata/test_containerd.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ function test_ptcpdump() {

function test_ptcpdump_read() {
EXPECT_NAME="${LNAME}.read.expect"
sed 's/ [a-zA-Z0-9_-]\+ \(In\|Out\) / /g' "${LNAME}" > "${EXPECT_NAME}"
sed 's/ \(In\|Out\) / /g' "${LNAME}" > "${EXPECT_NAME}"
timeout 30s ${CMD} --oneline -v -r "${FNAME}" > "${RNAME}"
cat "${RNAME}" | grep "> 1.1.1.1.80: Flags .*, args wget -T 10 1.1.1.1.* $cid1"
cat "${RNAME}" | grep "> 1.1.1.1.80: Flags .*, args wget -T 5 1.1.1.1.* $cid2"
Expand Down
2 changes: 1 addition & 1 deletion testdata/test_containerd_container_id_filter.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ function test_ptcpdump() {

function test_ptcpdump_read() {
EXPECT_NAME="${LNAME}.read.expect"
sed 's/ [a-zA-Z0-9_-]\+ \(In\|Out\) / /g' "${LNAME}" > "${EXPECT_NAME}"
sed 's/ \(In\|Out\) / /g' "${LNAME}" > "${EXPECT_NAME}"
timeout 30s ${CMD} --oneline -v -r "${FNAME}" > "${RNAME}"
cat "${RNAME}" | grep "> 1.1.1.1.80: Flags .*, args wget -T 10 1.1.1.1.* $cid1"
}
Expand Down
2 changes: 1 addition & 1 deletion testdata/test_containerd_container_name_filter.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ function test_ptcpdump() {

function test_ptcpdump_read() {
EXPECT_NAME="${LNAME}.read.expect"
sed 's/ [a-zA-Z0-9_-]\+ \(In\|Out\) / /g' "${LNAME}" > "${EXPECT_NAME}"
sed 's/ \(In\|Out\) / /g' "${LNAME}" > "${EXPECT_NAME}"
timeout 30s ${CMD} --oneline -v -r "${FNAME}" > "${RNAME}"
cat "${RNAME}" | grep "> 1.1.1.1.80: Flags .*, args wget -T 10 1.1.1.1.* $cid1"
}
Expand Down
Loading
Loading