Skip to content

Commit

Permalink
Don't buffer packets in channels
Browse files Browse the repository at this point in the history
Instead, buffer packet headers in in-memory files.

Fixes #20
  • Loading branch information
pboothe committed Nov 19, 2019
1 parent dfcd007 commit af14682
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 32 deletions.
55 changes: 28 additions & 27 deletions saver/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,35 +132,9 @@ func (t *TCP) start(ctx context.Context, duration time.Duration) {
derivedCtx, derivedCancel := context.WithTimeout(ctx, duration)
defer derivedCancel()

// First read the UUID
t.state.Set("uuidwait")
var uuidEvent UUIDEvent
var ok bool
select {
case uuidEvent, ok = <-t.uuidchanRead:
if !ok {
log.Println("UUID channel closed, PCAP capture cancelled with no UUID")
t.error("uuidchan")
return
}
case <-ctx.Done():
log.Println("Context cancelled, PCAP capture cancelled with no UUID")
t.error("uuid")
return
}

// Create a file and directory based on the UUID and the time.
t.state.Set("dircreation")
dir, fname := filename(t.dir, uuidEvent)
err := os.MkdirAll(dir, 0777)
if err != nil {
log.Println("Could not create directory", dir, err)
t.error("mkdir")
return
}
buff := &bytes.Buffer{}

// Write PCAP data to the new file.
// Write PCAP data to the buffer.
w := pcapgo.NewWriterNanos(buff)
// Now save packets until the stream is done or the context is canceled.
t.state.Set("readingpackets")
Expand Down Expand Up @@ -194,6 +168,33 @@ func (t *TCP) start(ctx context.Context, duration time.Duration) {
t.savePacket(w, p, headerLen)
}

// Read the UUID to determine the filename
t.state.Set("uuidwait")
var uuidEvent UUIDEvent
select {
case uuidEvent, ok = <-t.uuidchanRead:
if !ok {
log.Println("UUID channel closed, PCAP capture cancelled with no UUID")
t.error("uuidchan")
return
}
case <-ctx.Done():
log.Println("Context cancelled, PCAP capture cancelled with no UUID")
t.error("uuid")
return
}
// uuidEvent is now set to a good value.

// Create a file and directory based on the UUID and the time.
t.state.Set("dircreation")
dir, fname := filename(t.dir, uuidEvent)
err := os.MkdirAll(dir, 0777)
if err != nil {
log.Println("Could not create directory", dir, err)
t.error("mkdir")
return
}

t.state.Set("savingfile")
fullFilename := path.Join(dir, fname)
err = ioutil.WriteFile(fullFilename, buff.Bytes(), 0664)
Expand Down
35 changes: 30 additions & 5 deletions saver/tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func TestSaverDryRun(t *testing.T) {
s.start(ctx, 10*time.Second) // Give the disk IO 10 seconds to happen.
expected := statusTracker{
status: "stopped",
past: []string{"notstarted", "uuidwait", "dircreation", "readingpackets", "nopacketserror"},
past: []string{"notstarted", "readingpackets", "nopacketserror"},
}
if !reflect.DeepEqual(&tracker, &expected) {
t.Errorf("%+v != %+v", &tracker, &expected)
Expand All @@ -120,12 +120,19 @@ func TestSaverNoUUID(t *testing.T) {
tracker := statusTracker{status: s.state.Get()}
s.state = &tracker

h, err := pcap.OpenOffline("../testdata/v4.pcap")
rtx.Must(err, "Could not open v4.pcap")
ps := gopacket.NewPacketSource(h, h.LinkType())
for p := range ps.Packets() {
s.Pchan <- p
}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()
s.start(ctx, 10*time.Second)
expected := statusTracker{
status: "stopped",
past: []string{"notstarted", "uuidwait", "uuiderror"},
past: []string{"notstarted", "readingpackets", "uuidwait", "uuiderror"},
}
if !reflect.DeepEqual(&tracker, &expected) {
t.Errorf("%+v != %+v", &tracker, &expected)
Expand All @@ -144,11 +151,19 @@ func TestSaverNoUUIDClosedUUIDChan(t *testing.T) {
tracker := statusTracker{status: s.state.Get()}
s.state = &tracker

h, err := pcap.OpenOffline("../testdata/v4.pcap")
rtx.Must(err, "Could not open v4.pcap")
ps := gopacket.NewPacketSource(h, h.LinkType())
for p := range ps.Packets() {
s.Pchan <- p
}
close(s.Pchan)
close(s.UUIDchan)

s.start(context.Background(), 10*time.Second)
expected := statusTracker{
status: "stopped",
past: []string{"notstarted", "uuidwait", "uuidchanerror"},
past: []string{"notstarted", "readingpackets", "uuidwait", "uuidchanerror"},
}
if !reflect.DeepEqual(&tracker, &expected) {
t.Errorf("%+v != %+v", &tracker, &expected)
Expand All @@ -169,11 +184,20 @@ func TestSaverCantMkdir(t *testing.T) {
s.state = &tracker

s.UUIDchan <- UUIDEvent{"testUUID", time.Now()}

h, err := pcap.OpenOffline("../testdata/v4.pcap")
rtx.Must(err, "Could not open v4.pcap")
ps := gopacket.NewPacketSource(h, h.LinkType())
for p := range ps.Packets() {
s.Pchan <- p
}
close(s.Pchan)

s.start(context.Background(), 10*time.Second)

expected := statusTracker{
status: "stopped",
past: []string{"notstarted", "uuidwait", "dircreation", "mkdirerror"},
past: []string{"notstarted", "readingpackets", "uuidwait", "dircreation", "mkdirerror"},
}
if !reflect.DeepEqual(&tracker, &expected) {
t.Errorf("%+v != %+v", &tracker, &expected)
Expand Down Expand Up @@ -203,14 +227,15 @@ func TestSaverCantCreate(t *testing.T) {
for p := range ps.Packets() {
s.Pchan <- p
}
close(s.Pchan)

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
s.start(ctx, 100*time.Millisecond)

expected := statusTracker{
status: "stopped",
past: []string{"notstarted", "uuidwait", "dircreation", "readingpackets", "savingfile", "filewriteerror", "discardingpackets"},
past: []string{"notstarted", "readingpackets", "uuidwait", "dircreation", "savingfile", "filewriteerror", "discardingpackets"},
}
if !reflect.DeepEqual(&tracker, &expected) {
t.Errorf("%+v != %+v", &tracker, &expected)
Expand Down

0 comments on commit af14682

Please sign in to comment.