Skip to content

Commit

Permalink
Ensure all IPv4 addresses use only 4 bytes. (#30)
Browse files Browse the repository at this point in the history
* Ensure all IPv4 addresses use only 4 bytes.
* convert the eventsocket flag to the one provided in tcpinfo.eventsocket
  • Loading branch information
pboothe authored Dec 3, 2019
1 parent 432a143 commit 9f46cd3
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 6 deletions.
8 changes: 7 additions & 1 deletion demuxer/flowkey.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,13 @@ func fromPacket(p gopacket.Packet) FlowKey {
return FlowKeyFrom4Tuple(ip1, ip1P, ip2, ip2P)
}

// FlowKeyFrom4Tuple creates a FlowKey (suitable for use as a map key) from a TCP 4-tuple.
// FlowKeyFrom4Tuple creates a FlowKey (suitable for use as a map key) from a
// TCP 4-tuple. This function is called once per packet, by the demuxer, and
// once per flow, by the tcpeventsocket handler. Because it is called once per
// packet, it should be as efficient as possible.
//
// IPv4 addresses passed in must be 4 bytes, because we do byte-based
// comparisons with sub-slices of packets retrieved from the wire.
func FlowKeyFrom4Tuple(srcIP net.IP, srcPort uint16, dstIP net.IP, dstPort uint16) FlowKey {
srcIPS := string(srcIP)
dstIPS := string(dstIP)
Expand Down
4 changes: 2 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (

var (
dir = flag.String("datadir", ".", "The directory to which data is written")
eventSocket = flag.String("eventsocket", "", "The absolute pathname of the unix-domain socket to which events will be posted.")
captureDuration = flag.Duration("captureduration", 30*time.Second, "Only save the first captureduration of each flow, to prevent long-lived flows from spamming the hard drive.")
uuidWaitDuration = flag.Duration("uuidwaitduration", 5*time.Second, "Wait up to uuidwaitduration for each flow before either assigning a UUID or discarding all future packets. This prevents buffering unsaveable packets.")
flowTimeout = flag.Duration("flowtimeout", 30*time.Second, "Once there have been no packets for a flow for at least flowtimeout, the flow can be assumed to be closed.")
Expand All @@ -45,6 +44,7 @@ var (

func init() {
flag.Var(&interfaces, "interface", "The interface on which to capture traffic. May be repeated. If unset, will capture on all available interfaces.")
log.SetFlags(log.Lshortfile | log.LstdFlags)
}

func catch(sig os.Signal) {
Expand Down Expand Up @@ -124,7 +124,7 @@ func main() {
h := tcpinfohandler.New(mainCtx, tcpdm.UUIDChan)
cleanupWG.Add(1)
go func() {
eventsocket.MustRun(mainCtx, *eventSocket, h)
eventsocket.MustRun(mainCtx, *eventsocket.Filename, h)
mainCancel()
cleanupWG.Done()
}()
Expand Down
6 changes: 3 additions & 3 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,13 @@ func TestMainSmokeTest(t *testing.T) {
// server.
tcpiCtx, tcpiCancel := context.WithCancel(context.Background())
defer tcpiCancel()
*eventSocket = dir + "/tcpevents.sock"
tcpi := eventsocket.New(*eventSocket)
*eventsocket.Filename = dir + "/tcpevents.sock"
tcpi := eventsocket.New(*eventsocket.Filename)
tcpi.Listen()
go tcpi.Serve(tcpiCtx)

// Wait until the eventsocket appears.
for _, err := os.Stat(*eventSocket); err != nil; _, err = os.Stat(*eventSocket) {
for _, err := os.Stat(*eventsocket.Filename); err != nil; _, err = os.Stat(*eventsocket.Filename) {
}

// Tests are unlikely to have enough privileges to open packet captures, so
Expand Down
9 changes: 9 additions & 0 deletions saver/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ func (t *TCP) savePackets(ctx context.Context, uuidDelay, duration time.Duration
// Read the first packet to determine the TCP+IP header size (as IPv6 is variable in size)
p, ok := t.readPacket(derivedCtx)
if !ok {
// This error should never occur in production. It indicates a
// configuration error or a bug in packet-headers.
log.Println("PCAP capture cancelled with no packets for flow", t.id)
t.error("nopackets")
return
Expand Down Expand Up @@ -200,6 +202,13 @@ func (t *TCP) savePackets(ctx context.Context, uuidDelay, duration time.Duration
// Read the UUID to determine the filename
t.state.Set("uuidwait")
var uuidEvent UUIDEvent
// The error conditions below are expected to occur in production. In
// particular, every flow that existed prior to the start of the start of
// the packet-headers binary will cause this error at least once.
//
// This error will also occur for long-lived flows that send packets so
// infrequently that the flow gets garbage-collected between packet
// arrivals.
select {
case uuidEvent, ok = <-t.uuidchanRead:
if !ok {
Expand Down
8 changes: 8 additions & 0 deletions tcpinfohandler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ func (h *handler) Open(ctx context.Context, timestamp time.Time, uuid string, id
metrics.BadEventsFromTCPInfo.WithLabelValues("badip").Inc()
return
}
// Convert all IPv4 addresses to a 4-byte representation, as required by
// FlowKeyFrom4Tuple.
if srcIP.To4() != nil {
srcIP = srcIP.To4()
}
if dstIP.To4() != nil {
dstIP = dstIP.To4()
}
// Can't use a struct literal here due to embedding.
ev := demuxer.UUIDEvent{}
ev.Flow = demuxer.FlowKeyFrom4Tuple(srcIP, id.SPort, dstIP, id.DPort)
Expand Down

0 comments on commit 9f46cd3

Please sign in to comment.