Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix logs, memory usage, header size #21

Merged
merged 5 commits into from
Nov 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"flag"
"fmt"
"log"
"net"
"os"
"os/signal"
Expand Down Expand Up @@ -74,6 +75,7 @@ func main() {
// interfaces with a running capture is tracked in the
// pcap_muxer_interfaces_with_captures metric.
if len(interfaces) == 0 {
log.Println("No interfaces specified, will listen for packets on all available interfaces.")
ifaces, err := net.Interfaces()
rtx.Must(err, "Could not list interfaces")
for _, iface := range ifaces {
Expand Down Expand Up @@ -105,6 +107,7 @@ func main() {
cleanupWG.Add(1)
go func() {
eventsocket.MustRun(mainCtx, *eventSocket, h)
mainCancel()
cleanupWG.Done()
}()

Expand All @@ -115,6 +118,7 @@ func main() {
cleanupWG.Add(1)
go func() {
muxer.MustCaptureTCPOnInterfaces(mainCtx, interfaces, packets, pcapOpenLive, int32(*maxHeaderSize))
mainCancel()
cleanupWG.Done()
}()

Expand Down
81 changes: 48 additions & 33 deletions saver/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,35 +132,9 @@ func (t *TCP) start(ctx context.Context, duration time.Duration) {
derivedCtx, derivedCancel := context.WithTimeout(ctx, duration)
defer derivedCancel()

// First read the UUID
t.state.Set("uuidwait")
var uuidEvent UUIDEvent
var ok bool
select {
case uuidEvent, ok = <-t.uuidchanRead:
if !ok {
log.Println("UUID channel closed, PCAP capture cancelled with no UUID")
t.error("uuidchan")
return
}
case <-ctx.Done():
log.Println("Context cancelled, PCAP capture cancelled with no UUID")
t.error("uuid")
return
}

// Create a file and directory based on the UUID and the time.
t.state.Set("dircreation")
dir, fname := filename(t.dir, uuidEvent)
err := os.MkdirAll(dir, 0777)
if err != nil {
log.Println("Could not create directory", dir, err)
t.error("mkdir")
return
}
buff := &bytes.Buffer{}

// Write PCAP data to the new file.
// Write PCAP data to the buffer.
w := pcapgo.NewWriterNanos(buff)
// Now save packets until the stream is done or the context is canceled.
t.state.Set("readingpackets")
Expand All @@ -172,16 +146,28 @@ func (t *TCP) start(ctx context.Context, duration time.Duration) {
}
headerLen := len(p.Data())
// Now we try to discover the correct header length for the flow by
// discovering the size of the application layer and then subtracting it
// from the overall size of the packet data. IPv6 supports variable-length
// headers (unlike IPv4, where the length of the IPv4 header is
// well-defined), so this is actually required.
// discovering the size of everything before the transport layer, then
// adding that size and 60 bytes for the TCP header
// (https://en.wikipedia.org/wiki/Transmission_Control_Protocol#TCP_segment_structure).
// IPv6 supports variable-length headers (unlike IPv4, where the length of
// the IPv4 header is well-defined), so this is actually required as opposed
// to just choosing the right value as a commandline parameter.
//
// This algorithm assumes that IPv6 header lengths are stable for a given
// flow.
tl := p.TransportLayer()
if tl != nil {
headerLen -= len(tl.LayerPayload())
// "LayerContents" == the TCP header
// (I don't know why it's not "LayerHeader")
// "LayerPayload" == everything contained within the transport (TCP)
// layer that is not the header (including all bytes for all subsequent
// layers)
//
// So, the data we want to save is: the complete packet before the TCP
// layer, plus the maximum size of a TCP header. We calculate this size
// by subtracting the actual TCP header and payload lengths from the
// overall packet size and then adding 60.
headerLen = len(p.Data()) - len(tl.LayerContents()) - len(tl.LayerPayload()) + 60
}
// Write out the header and the first packet.
w.WriteFileHeader(uint32(headerLen), layers.LinkTypeEthernet)
Expand All @@ -194,11 +180,40 @@ func (t *TCP) start(ctx context.Context, duration time.Duration) {
t.savePacket(w, p, headerLen)
}

// Read the UUID to determine the filename
t.state.Set("uuidwait")
var uuidEvent UUIDEvent
select {
case uuidEvent, ok = <-t.uuidchanRead:
if !ok {
log.Println("UUID channel closed, PCAP capture cancelled with no UUID")
t.error("uuidchan")
return
}
case <-ctx.Done():
log.Println("Context cancelled, PCAP capture cancelled with no UUID")
t.error("uuid")
return
}
// uuidEvent is now set to a good value.

// Create a file and directory based on the UUID and the time.
t.state.Set("dircreation")
dir, fname := filename(t.dir, uuidEvent)
err := os.MkdirAll(dir, 0777)
if err != nil {
log.Println("Could not create directory", dir, err)
t.error("mkdir")
return
}

t.state.Set("savingfile")
err = ioutil.WriteFile(path.Join(dir, fname), buff.Bytes(), 0664)
fullFilename := path.Join(dir, fname)
err = ioutil.WriteFile(fullFilename, buff.Bytes(), 0664)
if err != nil {
t.error("filewrite")
}
log.Println("Successfully wrote", fullFilename)

t.state.Set("discardingpackets")
// Now read until the channel is closed or the passed-in context is cancelled.
Expand Down
35 changes: 30 additions & 5 deletions saver/tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func TestSaverDryRun(t *testing.T) {
s.start(ctx, 10*time.Second) // Give the disk IO 10 seconds to happen.
expected := statusTracker{
status: "stopped",
past: []string{"notstarted", "uuidwait", "dircreation", "readingpackets", "nopacketserror"},
past: []string{"notstarted", "readingpackets", "nopacketserror"},
}
if !reflect.DeepEqual(&tracker, &expected) {
t.Errorf("%+v != %+v", &tracker, &expected)
Expand All @@ -120,12 +120,19 @@ func TestSaverNoUUID(t *testing.T) {
tracker := statusTracker{status: s.state.Get()}
s.state = &tracker

h, err := pcap.OpenOffline("../testdata/v4.pcap")
rtx.Must(err, "Could not open v4.pcap")
ps := gopacket.NewPacketSource(h, h.LinkType())
for p := range ps.Packets() {
s.Pchan <- p
}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()
s.start(ctx, 10*time.Second)
expected := statusTracker{
status: "stopped",
past: []string{"notstarted", "uuidwait", "uuiderror"},
past: []string{"notstarted", "readingpackets", "uuidwait", "uuiderror"},
}
if !reflect.DeepEqual(&tracker, &expected) {
t.Errorf("%+v != %+v", &tracker, &expected)
Expand All @@ -144,11 +151,19 @@ func TestSaverNoUUIDClosedUUIDChan(t *testing.T) {
tracker := statusTracker{status: s.state.Get()}
s.state = &tracker

h, err := pcap.OpenOffline("../testdata/v4.pcap")
rtx.Must(err, "Could not open v4.pcap")
ps := gopacket.NewPacketSource(h, h.LinkType())
for p := range ps.Packets() {
s.Pchan <- p
}
close(s.Pchan)
close(s.UUIDchan)

s.start(context.Background(), 10*time.Second)
expected := statusTracker{
status: "stopped",
past: []string{"notstarted", "uuidwait", "uuidchanerror"},
past: []string{"notstarted", "readingpackets", "uuidwait", "uuidchanerror"},
}
if !reflect.DeepEqual(&tracker, &expected) {
t.Errorf("%+v != %+v", &tracker, &expected)
Expand All @@ -169,11 +184,20 @@ func TestSaverCantMkdir(t *testing.T) {
s.state = &tracker

s.UUIDchan <- UUIDEvent{"testUUID", time.Now()}

h, err := pcap.OpenOffline("../testdata/v4.pcap")
rtx.Must(err, "Could not open v4.pcap")
ps := gopacket.NewPacketSource(h, h.LinkType())
for p := range ps.Packets() {
s.Pchan <- p
}
close(s.Pchan)

s.start(context.Background(), 10*time.Second)

expected := statusTracker{
status: "stopped",
past: []string{"notstarted", "uuidwait", "dircreation", "mkdirerror"},
past: []string{"notstarted", "readingpackets", "uuidwait", "dircreation", "mkdirerror"},
}
if !reflect.DeepEqual(&tracker, &expected) {
t.Errorf("%+v != %+v", &tracker, &expected)
Expand Down Expand Up @@ -203,14 +227,15 @@ func TestSaverCantCreate(t *testing.T) {
for p := range ps.Packets() {
s.Pchan <- p
}
close(s.Pchan)

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
s.start(ctx, 100*time.Millisecond)

expected := statusTracker{
status: "stopped",
past: []string{"notstarted", "uuidwait", "dircreation", "readingpackets", "savingfile", "filewriteerror", "discardingpackets"},
past: []string{"notstarted", "readingpackets", "uuidwait", "dircreation", "savingfile", "filewriteerror", "discardingpackets"},
}
if !reflect.DeepEqual(&tracker, &expected) {
t.Errorf("%+v != %+v", &tracker, &expected)
Expand Down