Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flag -maxheap #46

Merged
merged 12 commits into from
Nov 29, 2022
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
language: go
go:
- 1.18
- 1.19

# From https://github.com/travis-ci/travis-ci/issues/8891#issuecomment-353403729
before_install:
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Build packet-headers
FROM golang:1.18-alpine3.16 as build
FROM golang:1.19-alpine3.16 as build
RUN apk --no-cache add libpcap-dev git gcc libc-dev
COPY . /go/src/github.com/m-lab/packet-headers
WORKDIR /go/src/github.com/m-lab/packet-headers
Expand Down
33 changes: 32 additions & 1 deletion demuxer/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type TCP struct {
anon anonymize.IPAnonymizer
dataDir string
stream bool
maxHeap uint64
}

type status interface {
Expand All @@ -60,6 +61,12 @@ func (promStatus) GC(stillPresent, discarded int) {
metrics.DemuxerSaverCount.Set(float64(stillPresent))
}

func currentHeapAboveThreshold(maxHeap uint64) bool {
ms := runtime.MemStats{}
runtime.ReadMemStats(&ms)
return (ms.HeapAlloc > maxHeap)
}

// GetSaver returns a saver with channels for packets and a uuid.
func (d *TCP) getSaver(ctx context.Context, flow FlowKey) *saver.TCP {
// Read the flow from the flows map, the oldFlows map, or create it.
Expand All @@ -71,6 +78,21 @@ func (d *TCP) getSaver(ctx context.Context, flow FlowKey) *saver.TCP {
if ok {
delete(d.oldFlows, flow)
} else {

// Only create a new Saver if we are below the configured maxHeap
// threshold. This condition is required because a SYN flood can
// cause packet-headers to allocate memory faster than partial
// connection timeouts allow the garbage collector to recover used
// memory. The result in that case would be system RAM exhaustion.
//
// NOTE: When we are above the maxHeap threshold, we may lose some
// legitimate measurements. This check is a load shedding strategy
// to keep the process running and prevent resource usage from
// packet-headers spilling over into other parts of the system.
if currentHeapAboveThreshold(d.maxHeap) {
metrics.SaversSkipped.Inc()
return nil
}
t = saver.StartNew(ctx, d.anon, d.dataDir, d.uuidWaitDuration, d.maxDuration, flow.Format(d.anon), d.stream)
}
d.currentFlows[flow] = t
Expand All @@ -88,6 +110,10 @@ func (d *TCP) savePacket(ctx context.Context, packet gopacket.Packet) {
}
// Send the packet to the saver.
s := d.getSaver(ctx, fromPacket(packet))
if s == nil {
metrics.MissedPackets.WithLabelValues("skipped").Inc()
return
}

// Don't block on channel write to the saver, but do note when it fails.
select {
Expand All @@ -100,6 +126,10 @@ func (d *TCP) savePacket(ctx context.Context, packet gopacket.Packet) {
func (d *TCP) assignUUID(ctx context.Context, ev UUIDEvent) {
metrics.DemuxerUUIDCount.Inc()
s := d.getSaver(ctx, ev.Flow)
if s == nil {
metrics.MissedUUIDs.WithLabelValues("skipped").Inc()
return
}
select {
case s.UUIDchan <- ev.UUIDEvent:
default:
Expand Down Expand Up @@ -167,7 +197,7 @@ func (d *TCP) CapturePackets(ctx context.Context, packets <-chan gopacket.Packet

// NewTCP creates a demuxer.TCP, which is the system which chooses which channel
// to send TCP/IP packets for subsequent saving to a file.
func NewTCP(anon anonymize.IPAnonymizer, dataDir string, uuidWaitDuration, maxFlowDuration time.Duration, maxIdleRAM bytecount.ByteCount, stream bool) *TCP {
func NewTCP(anon anonymize.IPAnonymizer, dataDir string, uuidWaitDuration, maxFlowDuration time.Duration, maxIdleRAM bytecount.ByteCount, stream bool, maxHeap uint64) *TCP {
uuidc := make(chan UUIDEvent, 100)
return &TCP{
UUIDChan: uuidc,
Expand All @@ -183,5 +213,6 @@ func NewTCP(anon anonymize.IPAnonymizer, dataDir string, uuidWaitDuration, maxFl
maxDuration: maxFlowDuration,
uuidWaitDuration: uuidWaitDuration,
stream: stream,
maxHeap: maxHeap,
}
}
8 changes: 5 additions & 3 deletions demuxer/tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"testing"
"time"

"github.com/m-lab/go/bytecount"

"github.com/google/gopacket"
"github.com/google/gopacket/pcap"

Expand Down Expand Up @@ -42,7 +44,7 @@ func TestTCPDryRun(t *testing.T) {
rtx.Must(err, "Could not create directory")
defer os.RemoveAll(dir)

tcpdm := NewTCP(anonymize.New(anonymize.None), dir, 500*time.Millisecond, time.Second, 1000000000, true)
tcpdm := NewTCP(anonymize.New(anonymize.None), dir, 500*time.Millisecond, time.Second, 1000000000, true, uint64(2*bytecount.Gigabyte))

// While we have a demuxer created, make sure that the processing path for
// packets does not crash when given a nil packet.
Expand Down Expand Up @@ -85,7 +87,7 @@ func TestTCPWithRealPcaps(t *testing.T) {
rtx.Must(err, "Could not create directory")
defer os.RemoveAll(dir)

tcpdm := NewTCP(anonymize.New(anonymize.None), dir, 500*time.Millisecond, time.Second, 1000000000, true)
tcpdm := NewTCP(anonymize.New(anonymize.None), dir, 500*time.Millisecond, time.Second, 1000000000, true, uint64(2*bytecount.Gigabyte))
st := &statusTracker{}
tcpdm.status = st
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
Expand Down Expand Up @@ -253,7 +255,7 @@ func TestUUIDWontBlock(t *testing.T) {
rtx.Must(err, "Could not create directory")
defer os.RemoveAll(dir)

tcpdm := NewTCP(anonymize.New(anonymize.None), dir, 15*time.Second, 30*time.Second, 1, true)
tcpdm := NewTCP(anonymize.New(anonymize.None), dir, 15*time.Second, 30*time.Second, 1, true, uint64(2*bytecount.Gigabyte))
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)

var wg sync.WaitGroup
Expand Down
21 changes: 15 additions & 6 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,16 @@ import (
"net"
"os"
"os/signal"
"runtime/debug"
"sync"
"syscall"
"time"

"github.com/m-lab/go/bytecount"

"github.com/m-lab/go/anonymize"

"github.com/google/gopacket"
"github.com/google/gopacket/pcap"

"github.com/m-lab/go/anonymize"
"github.com/m-lab/go/bytecount"
"github.com/m-lab/go/flagx"
"github.com/m-lab/go/prometheusx"
"github.com/m-lab/go/rtx"
Expand All @@ -38,6 +37,7 @@ var (
sigtermWaitTime = flag.Duration("sigtermwait", 1*time.Second, "How long should the daemon hang around before exiting after receiving a SIGTERM.")
streamToDisk = flag.Bool("stream", false, "Stream results to disk instead of buffering them in RAM.")
maxIdleRAM = 3 * bytecount.Gigabyte
maxHeap = 8 * bytecount.Gigabyte

interfaces flagx.StringArray

Expand All @@ -49,6 +49,7 @@ var (
func init() {
flag.Var(&interfaces, "interface", "The interface on which to capture traffic. May be repeated. If unset, will capture on all available interfaces.")
flag.Var(&maxIdleRAM, "maxidleram", "How much idle RAM we should tolerate before we try and forcibly return it to the OS.")
flag.Var(&maxHeap, "maxheap", "Stop collecting traces once process uses more than this amount of RAM.")
log.SetFlags(log.Lshortfile | log.LstdFlags)
}

Expand All @@ -73,7 +74,7 @@ var netInterfaces = net.Interfaces
func processFlags() ([]net.Interface, error) {
// Verify that capture duration is always longer than uuid wait duration.
if *uuidWaitDuration > *captureDuration {
return nil, fmt.Errorf("Capture duration must be greater than UUID wait duration: %s vs %s",
return nil, fmt.Errorf("capture duration must be greater than UUID wait duration: %s vs %s",
*captureDuration, *uuidWaitDuration)
}

Expand Down Expand Up @@ -111,6 +112,12 @@ func main() {

rtx.Must(os.Chdir(*dir), "Could not cd to directory %q", *dir)

// Set a memory limit for the GC to keep RAM used below maxHeap bytes. This
// is a soft limit that will alter how the GC behaves (e.g. reclaiming more
// frequently, releasing RAM back to the OS) but will not stop RAM usage by
// the process.
debug.SetMemoryLimit(int64(maxHeap))

// A waitgroup to make sure main() doesn't return before all its components
// get cleaned up.
cleanupWG := sync.WaitGroup{}
Expand All @@ -123,7 +130,9 @@ func main() {
}()

// Get ready to save the incoming packets to files.
tcpdm := demuxer.NewTCP(anonymize.New(anonymize.IPAnonymizationFlag), *dir, *uuidWaitDuration, *captureDuration, maxIdleRAM, *streamToDisk)
tcpdm := demuxer.NewTCP(
anonymize.New(anonymize.IPAnonymizationFlag), *dir, *uuidWaitDuration,
*captureDuration, maxIdleRAM, *streamToDisk, uint64(maxHeap))

// Inform the demuxer of new UUIDs
h := tcpinfohandler.New(mainCtx, tcpdm.UUIDChan)
Expand Down
6 changes: 6 additions & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ var (
},
[]string{"state"},
)
SaversSkipped = promauto.NewCounter(
prometheus.CounterOpts{
Name: "pcap_saver_skipped_total",
Help: "How many flows were never saved due to configured memory limits.",
},
)

// Demuxer metrics keep track of the state of the system that sends packets
// to a particular saver.
Expand Down