From af1468226de543fc8593ee8d160f7a9bbe3aea2b Mon Sep 17 00:00:00 2001 From: Peter Boothe Date: Tue, 19 Nov 2019 14:23:00 -0500 Subject: [PATCH] Don't buffer packets in channels Instead, buffer packet headers in in-memory files. Fixes #20 --- saver/tcp.go | 55 ++++++++++++++++++++++++----------------------- saver/tcp_test.go | 35 +++++++++++++++++++++++++----- 2 files changed, 58 insertions(+), 32 deletions(-) diff --git a/saver/tcp.go b/saver/tcp.go index 5dd931c..d372acf 100644 --- a/saver/tcp.go +++ b/saver/tcp.go @@ -132,35 +132,9 @@ func (t *TCP) start(ctx context.Context, duration time.Duration) { derivedCtx, derivedCancel := context.WithTimeout(ctx, duration) defer derivedCancel() - // First read the UUID - t.state.Set("uuidwait") - var uuidEvent UUIDEvent - var ok bool - select { - case uuidEvent, ok = <-t.uuidchanRead: - if !ok { - log.Println("UUID channel closed, PCAP capture cancelled with no UUID") - t.error("uuidchan") - return - } - case <-ctx.Done(): - log.Println("Context cancelled, PCAP capture cancelled with no UUID") - t.error("uuid") - return - } - - // Create a file and directory based on the UUID and the time. - t.state.Set("dircreation") - dir, fname := filename(t.dir, uuidEvent) - err := os.MkdirAll(dir, 0777) - if err != nil { - log.Println("Could not create directory", dir, err) - t.error("mkdir") - return - } buff := &bytes.Buffer{} - // Write PCAP data to the new file. + // Write PCAP data to the buffer. w := pcapgo.NewWriterNanos(buff) // Now save packets until the stream is done or the context is canceled. t.state.Set("readingpackets") @@ -194,6 +168,33 @@ func (t *TCP) start(ctx context.Context, duration time.Duration) { t.savePacket(w, p, headerLen) } + // Read the UUID to determine the filename + t.state.Set("uuidwait") + var uuidEvent UUIDEvent + select { + case uuidEvent, ok = <-t.uuidchanRead: + if !ok { + log.Println("UUID channel closed, PCAP capture cancelled with no UUID") + t.error("uuidchan") + return + } + case <-ctx.Done(): + log.Println("Context cancelled, PCAP capture cancelled with no UUID") + t.error("uuid") + return + } + // uuidEvent is now set to a good value. + + // Create a file and directory based on the UUID and the time. + t.state.Set("dircreation") + dir, fname := filename(t.dir, uuidEvent) + err := os.MkdirAll(dir, 0777) + if err != nil { + log.Println("Could not create directory", dir, err) + t.error("mkdir") + return + } + t.state.Set("savingfile") fullFilename := path.Join(dir, fname) err = ioutil.WriteFile(fullFilename, buff.Bytes(), 0664) diff --git a/saver/tcp_test.go b/saver/tcp_test.go index 068e890..929fe00 100644 --- a/saver/tcp_test.go +++ b/saver/tcp_test.go @@ -101,7 +101,7 @@ func TestSaverDryRun(t *testing.T) { s.start(ctx, 10*time.Second) // Give the disk IO 10 seconds to happen. expected := statusTracker{ status: "stopped", - past: []string{"notstarted", "uuidwait", "dircreation", "readingpackets", "nopacketserror"}, + past: []string{"notstarted", "readingpackets", "nopacketserror"}, } if !reflect.DeepEqual(&tracker, &expected) { t.Errorf("%+v != %+v", &tracker, &expected) @@ -120,12 +120,19 @@ func TestSaverNoUUID(t *testing.T) { tracker := statusTracker{status: s.state.Get()} s.state = &tracker + h, err := pcap.OpenOffline("../testdata/v4.pcap") + rtx.Must(err, "Could not open v4.pcap") + ps := gopacket.NewPacketSource(h, h.LinkType()) + for p := range ps.Packets() { + s.Pchan <- p + } + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) defer cancel() s.start(ctx, 10*time.Second) expected := statusTracker{ status: "stopped", - past: []string{"notstarted", "uuidwait", "uuiderror"}, + past: []string{"notstarted", "readingpackets", "uuidwait", "uuiderror"}, } if !reflect.DeepEqual(&tracker, &expected) { t.Errorf("%+v != %+v", &tracker, &expected) @@ -144,11 +151,19 @@ func TestSaverNoUUIDClosedUUIDChan(t *testing.T) { tracker := statusTracker{status: s.state.Get()} s.state = &tracker + h, err := pcap.OpenOffline("../testdata/v4.pcap") + rtx.Must(err, "Could not open v4.pcap") + ps := gopacket.NewPacketSource(h, h.LinkType()) + for p := range ps.Packets() { + s.Pchan <- p + } + close(s.Pchan) close(s.UUIDchan) + s.start(context.Background(), 10*time.Second) expected := statusTracker{ status: "stopped", - past: []string{"notstarted", "uuidwait", "uuidchanerror"}, + past: []string{"notstarted", "readingpackets", "uuidwait", "uuidchanerror"}, } if !reflect.DeepEqual(&tracker, &expected) { t.Errorf("%+v != %+v", &tracker, &expected) @@ -169,11 +184,20 @@ func TestSaverCantMkdir(t *testing.T) { s.state = &tracker s.UUIDchan <- UUIDEvent{"testUUID", time.Now()} + + h, err := pcap.OpenOffline("../testdata/v4.pcap") + rtx.Must(err, "Could not open v4.pcap") + ps := gopacket.NewPacketSource(h, h.LinkType()) + for p := range ps.Packets() { + s.Pchan <- p + } + close(s.Pchan) + s.start(context.Background(), 10*time.Second) expected := statusTracker{ status: "stopped", - past: []string{"notstarted", "uuidwait", "dircreation", "mkdirerror"}, + past: []string{"notstarted", "readingpackets", "uuidwait", "dircreation", "mkdirerror"}, } if !reflect.DeepEqual(&tracker, &expected) { t.Errorf("%+v != %+v", &tracker, &expected) @@ -203,6 +227,7 @@ func TestSaverCantCreate(t *testing.T) { for p := range ps.Packets() { s.Pchan <- p } + close(s.Pchan) ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() @@ -210,7 +235,7 @@ func TestSaverCantCreate(t *testing.T) { expected := statusTracker{ status: "stopped", - past: []string{"notstarted", "uuidwait", "dircreation", "readingpackets", "savingfile", "filewriteerror", "discardingpackets"}, + past: []string{"notstarted", "readingpackets", "uuidwait", "dircreation", "savingfile", "filewriteerror", "discardingpackets"}, } if !reflect.DeepEqual(&tracker, &expected) { t.Errorf("%+v != %+v", &tracker, &expected)