From dfcd0072394c596997690a1189793ba6e143dfd4 Mon Sep 17 00:00:00 2001 From: Peter Boothe Date: Tue, 19 Nov 2019 12:22:02 -0500 Subject: [PATCH 1/5] Log successes as well as failures --- main.go | 4 ++++ saver/tcp.go | 4 +++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/main.go b/main.go index 41cbb84..7cf7a93 100644 --- a/main.go +++ b/main.go @@ -4,6 +4,7 @@ import ( "context" "flag" "fmt" + "log" "net" "os" "os/signal" @@ -74,6 +75,7 @@ func main() { // interfaces with a running capture is tracked in the // pcap_muxer_interfaces_with_captures metric. if len(interfaces) == 0 { + log.Println("No interfaces specified, will listen for packets on all available interfaces.") ifaces, err := net.Interfaces() rtx.Must(err, "Could not list interfaces") for _, iface := range ifaces { @@ -105,6 +107,7 @@ func main() { cleanupWG.Add(1) go func() { eventsocket.MustRun(mainCtx, *eventSocket, h) + mainCancel() cleanupWG.Done() }() @@ -115,6 +118,7 @@ func main() { cleanupWG.Add(1) go func() { muxer.MustCaptureTCPOnInterfaces(mainCtx, interfaces, packets, pcapOpenLive, int32(*maxHeaderSize)) + mainCancel() cleanupWG.Done() }() diff --git a/saver/tcp.go b/saver/tcp.go index bccc06c..5dd931c 100644 --- a/saver/tcp.go +++ b/saver/tcp.go @@ -195,10 +195,12 @@ func (t *TCP) start(ctx context.Context, duration time.Duration) { } t.state.Set("savingfile") - err = ioutil.WriteFile(path.Join(dir, fname), buff.Bytes(), 0664) + fullFilename := path.Join(dir, fname) + err = ioutil.WriteFile(fullFilename, buff.Bytes(), 0664) if err != nil { t.error("filewrite") } + log.Println("Successfully wrote", fullFilename) t.state.Set("discardingpackets") // Now read until the channel is closed or the passed-in context is cancelled. From af1468226de543fc8593ee8d160f7a9bbe3aea2b Mon Sep 17 00:00:00 2001 From: Peter Boothe Date: Tue, 19 Nov 2019 14:23:00 -0500 Subject: [PATCH 2/5] Don't buffer packets in channels Instead, buffer packet headers in in-memory files. Fixes #20 --- saver/tcp.go | 55 ++++++++++++++++++++++++----------------------- saver/tcp_test.go | 35 +++++++++++++++++++++++++----- 2 files changed, 58 insertions(+), 32 deletions(-) diff --git a/saver/tcp.go b/saver/tcp.go index 5dd931c..d372acf 100644 --- a/saver/tcp.go +++ b/saver/tcp.go @@ -132,35 +132,9 @@ func (t *TCP) start(ctx context.Context, duration time.Duration) { derivedCtx, derivedCancel := context.WithTimeout(ctx, duration) defer derivedCancel() - // First read the UUID - t.state.Set("uuidwait") - var uuidEvent UUIDEvent - var ok bool - select { - case uuidEvent, ok = <-t.uuidchanRead: - if !ok { - log.Println("UUID channel closed, PCAP capture cancelled with no UUID") - t.error("uuidchan") - return - } - case <-ctx.Done(): - log.Println("Context cancelled, PCAP capture cancelled with no UUID") - t.error("uuid") - return - } - - // Create a file and directory based on the UUID and the time. - t.state.Set("dircreation") - dir, fname := filename(t.dir, uuidEvent) - err := os.MkdirAll(dir, 0777) - if err != nil { - log.Println("Could not create directory", dir, err) - t.error("mkdir") - return - } buff := &bytes.Buffer{} - // Write PCAP data to the new file. + // Write PCAP data to the buffer. w := pcapgo.NewWriterNanos(buff) // Now save packets until the stream is done or the context is canceled. t.state.Set("readingpackets") @@ -194,6 +168,33 @@ func (t *TCP) start(ctx context.Context, duration time.Duration) { t.savePacket(w, p, headerLen) } + // Read the UUID to determine the filename + t.state.Set("uuidwait") + var uuidEvent UUIDEvent + select { + case uuidEvent, ok = <-t.uuidchanRead: + if !ok { + log.Println("UUID channel closed, PCAP capture cancelled with no UUID") + t.error("uuidchan") + return + } + case <-ctx.Done(): + log.Println("Context cancelled, PCAP capture cancelled with no UUID") + t.error("uuid") + return + } + // uuidEvent is now set to a good value. + + // Create a file and directory based on the UUID and the time. + t.state.Set("dircreation") + dir, fname := filename(t.dir, uuidEvent) + err := os.MkdirAll(dir, 0777) + if err != nil { + log.Println("Could not create directory", dir, err) + t.error("mkdir") + return + } + t.state.Set("savingfile") fullFilename := path.Join(dir, fname) err = ioutil.WriteFile(fullFilename, buff.Bytes(), 0664) diff --git a/saver/tcp_test.go b/saver/tcp_test.go index 068e890..929fe00 100644 --- a/saver/tcp_test.go +++ b/saver/tcp_test.go @@ -101,7 +101,7 @@ func TestSaverDryRun(t *testing.T) { s.start(ctx, 10*time.Second) // Give the disk IO 10 seconds to happen. expected := statusTracker{ status: "stopped", - past: []string{"notstarted", "uuidwait", "dircreation", "readingpackets", "nopacketserror"}, + past: []string{"notstarted", "readingpackets", "nopacketserror"}, } if !reflect.DeepEqual(&tracker, &expected) { t.Errorf("%+v != %+v", &tracker, &expected) @@ -120,12 +120,19 @@ func TestSaverNoUUID(t *testing.T) { tracker := statusTracker{status: s.state.Get()} s.state = &tracker + h, err := pcap.OpenOffline("../testdata/v4.pcap") + rtx.Must(err, "Could not open v4.pcap") + ps := gopacket.NewPacketSource(h, h.LinkType()) + for p := range ps.Packets() { + s.Pchan <- p + } + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) defer cancel() s.start(ctx, 10*time.Second) expected := statusTracker{ status: "stopped", - past: []string{"notstarted", "uuidwait", "uuiderror"}, + past: []string{"notstarted", "readingpackets", "uuidwait", "uuiderror"}, } if !reflect.DeepEqual(&tracker, &expected) { t.Errorf("%+v != %+v", &tracker, &expected) @@ -144,11 +151,19 @@ func TestSaverNoUUIDClosedUUIDChan(t *testing.T) { tracker := statusTracker{status: s.state.Get()} s.state = &tracker + h, err := pcap.OpenOffline("../testdata/v4.pcap") + rtx.Must(err, "Could not open v4.pcap") + ps := gopacket.NewPacketSource(h, h.LinkType()) + for p := range ps.Packets() { + s.Pchan <- p + } + close(s.Pchan) close(s.UUIDchan) + s.start(context.Background(), 10*time.Second) expected := statusTracker{ status: "stopped", - past: []string{"notstarted", "uuidwait", "uuidchanerror"}, + past: []string{"notstarted", "readingpackets", "uuidwait", "uuidchanerror"}, } if !reflect.DeepEqual(&tracker, &expected) { t.Errorf("%+v != %+v", &tracker, &expected) @@ -169,11 +184,20 @@ func TestSaverCantMkdir(t *testing.T) { s.state = &tracker s.UUIDchan <- UUIDEvent{"testUUID", time.Now()} + + h, err := pcap.OpenOffline("../testdata/v4.pcap") + rtx.Must(err, "Could not open v4.pcap") + ps := gopacket.NewPacketSource(h, h.LinkType()) + for p := range ps.Packets() { + s.Pchan <- p + } + close(s.Pchan) + s.start(context.Background(), 10*time.Second) expected := statusTracker{ status: "stopped", - past: []string{"notstarted", "uuidwait", "dircreation", "mkdirerror"}, + past: []string{"notstarted", "readingpackets", "uuidwait", "dircreation", "mkdirerror"}, } if !reflect.DeepEqual(&tracker, &expected) { t.Errorf("%+v != %+v", &tracker, &expected) @@ -203,6 +227,7 @@ func TestSaverCantCreate(t *testing.T) { for p := range ps.Packets() { s.Pchan <- p } + close(s.Pchan) ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() @@ -210,7 +235,7 @@ func TestSaverCantCreate(t *testing.T) { expected := statusTracker{ status: "stopped", - past: []string{"notstarted", "uuidwait", "dircreation", "readingpackets", "savingfile", "filewriteerror", "discardingpackets"}, + past: []string{"notstarted", "readingpackets", "uuidwait", "dircreation", "savingfile", "filewriteerror", "discardingpackets"}, } if !reflect.DeepEqual(&tracker, &expected) { t.Errorf("%+v != %+v", &tracker, &expected) From f1d4ab1e0eda17c2be4d8ad5f5c25fccebe04802 Mon Sep 17 00:00:00 2001 From: Peter Boothe Date: Tue, 19 Nov 2019 14:33:41 -0500 Subject: [PATCH 3/5] Fix header size. Fixes #18 --- saver/tcp.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/saver/tcp.go b/saver/tcp.go index d372acf..ed8e294 100644 --- a/saver/tcp.go +++ b/saver/tcp.go @@ -146,16 +146,18 @@ func (t *TCP) start(ctx context.Context, duration time.Duration) { } headerLen := len(p.Data()) // Now we try to discover the correct header length for the flow by - // discovering the size of the application layer and then subtracting it - // from the overall size of the packet data. IPv6 supports variable-length - // headers (unlike IPv4, where the length of the IPv4 header is - // well-defined), so this is actually required. + // discovering the size of everything before the transport layer, then + // adding that size and 60 bytes for the TCP header + // (https://en.wikipedia.org/wiki/Transmission_Control_Protocol#TCP_segment_structure). + // IPv6 supports variable-length headers (unlike IPv4, where the length of + // the IPv4 header is well-defined), so this is actually required as opposed + // to just choosing the right value as a commandline parameter. // // This algorithm assumes that IPv6 header lengths are stable for a given // flow. tl := p.TransportLayer() if tl != nil { - headerLen -= len(tl.LayerPayload()) + headerLen = headerLen - len(tl.LayerContents()) - len(tl.LayerPayload()) + 60 } // Write out the header and the first packet. w.WriteFileHeader(uint32(headerLen), layers.LinkTypeEthernet) From 0d9dba0d5030d5e55abbbe1cb2930eac75bad092 Mon Sep 17 00:00:00 2001 From: Peter Boothe Date: Tue, 19 Nov 2019 15:20:58 -0500 Subject: [PATCH 4/5] Added a comment and clarified the code. --- saver/tcp.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/saver/tcp.go b/saver/tcp.go index ed8e294..7451196 100644 --- a/saver/tcp.go +++ b/saver/tcp.go @@ -157,7 +157,15 @@ func (t *TCP) start(ctx context.Context, duration time.Duration) { // flow. tl := p.TransportLayer() if tl != nil { - headerLen = headerLen - len(tl.LayerContents()) - len(tl.LayerPayload()) + 60 + // "LayerContents" == the header (I don't know why it's not "LayerHeader") + // "LayerPayload" == everything contained within this layer that is not + // the header (including all bytes for all subsequent layers) + // + // So, the data we want to save is: the complete packet before the TCP + // layer, plus the maximum size of a TCP header. We calculate this size + // by subtracting the actual TCP header and payload lengths from the + // overall packet size and then adding 60. + headerLen = len(p.Data()) - len(tl.LayerContents()) - len(tl.LayerPayload()) + 60 } // Write out the header and the first packet. w.WriteFileHeader(uint32(headerLen), layers.LinkTypeEthernet) From 89ed3c33c9c9dadf1158e00cba6f8d255c111837 Mon Sep 17 00:00:00 2001 From: Peter Boothe Date: Tue, 19 Nov 2019 15:42:10 -0500 Subject: [PATCH 5/5] Clarified comment --- saver/tcp.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/saver/tcp.go b/saver/tcp.go index 7451196..959e7c8 100644 --- a/saver/tcp.go +++ b/saver/tcp.go @@ -157,9 +157,11 @@ func (t *TCP) start(ctx context.Context, duration time.Duration) { // flow. tl := p.TransportLayer() if tl != nil { - // "LayerContents" == the header (I don't know why it's not "LayerHeader") - // "LayerPayload" == everything contained within this layer that is not - // the header (including all bytes for all subsequent layers) + // "LayerContents" == the TCP header + // (I don't know why it's not "LayerHeader") + // "LayerPayload" == everything contained within the transport (TCP) + // layer that is not the header (including all bytes for all subsequent + // layers) // // So, the data we want to save is: the complete packet before the TCP // layer, plus the maximum size of a TCP header. We calculate this size