Skip to content

Commit

Permalink
Add a configurable limit to the number of current flows (#58)
Browse files Browse the repository at this point in the history
* Route packets to a fake saver when currentFlows > 100

* Fix tests

* Add -maxflows flag to configure flow limit.

* Remove test case
  • Loading branch information
robertodauria authored Nov 18, 2024
1 parent 70d4ff1 commit c7d1ad6
Show file tree
Hide file tree
Showing 5 changed files with 151 additions and 33 deletions.
101 changes: 88 additions & 13 deletions demuxer/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,65 @@ import (
"github.com/prometheus/client_golang/prometheus"
)

// Saver allows to save data coming from this demuxer.
//
// It is satisfied both by the real *saver.TCP (which records flows) and by
// the internal drain type (which discards everything it receives).
type Saver interface {
	// PChan returns a send-only channel on which the flow's packets are written.
	PChan() chan<- gopacket.Packet
	// UUIDChan returns a send-only channel on which the flow's UUID event is written.
	UUIDChan() chan<- saver.UUIDEvent
	// State reports the saver's current state; it is used as a metrics label
	// when non-blocking channel writes fail.
	State() string
}

// A saver implementation that only drains the packet and UUID channels, to
// effectively ignore all packets and UUIDs for specific flows.
//
// It is used as a stand-in for a real saver when the heap is above the
// configured threshold or when the configured maximum number of flows has
// been reached.
type drain struct {
	pChan    chan gopacket.Packet // packets are read and discarded
	uuidChan chan saver.UUIDEvent // UUID events are read and discarded
}

// newDrain returns a saver that discards its input. The caller is
// responsible for running start in its own goroutine so that the packet and
// UUID channels are continuously drained until they are closed or the
// context is canceled.
func newDrain() *drain {
	// The buffer sizes mirror those of the real saver: writes to these
	// channels are non-blocking, so undersized buffers would inflate the
	// Prometheus counters for failed writes whenever draining falls behind.
	d := &drain{}
	d.pChan = make(chan gopacket.Packet, 8192)
	d.uuidChan = make(chan saver.UUIDEvent, 1)
	return d
}

// start reads and discards values from d.pChan and d.uuidChan until the
// context is canceled or either channel is closed. It is intended to be run
// in its own goroutine.
func (d *drain) start(ctx context.Context) {
	done := ctx.Done()
	for {
		select {
		case <-done:
			return
		case _, open := <-d.pChan:
			if !open {
				return
			}
		case _, open := <-d.uuidChan:
			if !open {
				return
			}
		}
	}
}

// PChan implements Saver by exposing the drain's packet channel as
// send-only.
func (d *drain) PChan() chan<- gopacket.Packet {
	return d.pChan
}

// UUIDChan implements Saver by exposing the drain's UUID-event channel as
// send-only.
func (d *drain) UUIDChan() chan<- saver.UUIDEvent {
	return d.uuidChan
}

// State implements Saver; a drain is permanently in the "draining" state.
func (d *drain) State() string {
	const state = "draining"
	return state
}

// UUIDEvent is the datatype sent to a demuxer's UUIDChan to notify it about the
// UUID of new flows.
type UUIDEvent struct {
Expand All @@ -36,9 +95,11 @@ type TCP struct {
// collect all savers in oldFlows and make all the currentFlows into
// oldFlows. It is only through this garbage collection process that
// saver.TCP objects are finalized.
currentFlows map[FlowKey]*saver.TCP
oldFlows map[FlowKey]*saver.TCP
currentFlows map[FlowKey]Saver
oldFlows map[FlowKey]Saver
drainSaver *drain
maxIdleRAM bytecount.ByteCount
maxFlows int
status status

// Variables required for the construction of new Savers
Expand Down Expand Up @@ -68,7 +129,7 @@ func currentHeapAboveThreshold(maxHeap uint64) bool {
}

// GetSaver returns a saver with channels for packets and a uuid.
func (d *TCP) getSaver(ctx context.Context, flow FlowKey) *saver.TCP {
func (d *TCP) getSaver(ctx context.Context, flow FlowKey) Saver {
// Read the flow from the flows map, the oldFlows map, or create it.
t, ok := d.currentFlows[flow]
if !ok {
Expand All @@ -91,8 +152,17 @@ func (d *TCP) getSaver(ctx context.Context, flow FlowKey) *saver.TCP {
// packet-headers spilling over into other parts of the system.
if currentHeapAboveThreshold(d.maxHeap) {
metrics.SaversSkipped.Inc()
return nil
return d.drainSaver
}

// If there is a maximum number of flows configured and the number
// of current flows exceeds it, do not create a new Saver and
// return drainSaver instead.
if d.maxFlows != 0 && len(d.currentFlows) > d.maxFlows {
metrics.DemuxerIgnoredCount.Inc()
return d.drainSaver
}

t = saver.StartNew(ctx, d.anon, d.dataDir, d.uuidWaitDuration, d.maxDuration, flow.Format(d.anon), d.stream)
}
d.currentFlows[flow] = t
Expand All @@ -117,7 +187,7 @@ func (d *TCP) savePacket(ctx context.Context, packet gopacket.Packet) {

// Don't block on channel write to the saver, but do note when it fails.
select {
case s.Pchan <- packet:
case s.PChan() <- packet:
default:
metrics.MissedPackets.WithLabelValues(s.State()).Inc()
}
Expand All @@ -131,7 +201,7 @@ func (d *TCP) assignUUID(ctx context.Context, ev UUIDEvent) {
return
}
select {
case s.UUIDchan <- ev.UUIDEvent:
case s.UUIDChan() <- ev.UUIDEvent:
default:
metrics.MissedUUIDs.WithLabelValues(s.State()).Inc()
}
Expand All @@ -142,10 +212,10 @@ func (d *TCP) collectGarbage() {
defer timer.ObserveDuration()

// Collect garbage in a separate goroutine.
go func(toBeDeleted map[FlowKey]*saver.TCP) {
go func(toBeDeleted map[FlowKey]Saver) {
for _, s := range toBeDeleted {
close(s.UUIDchan)
close(s.Pchan)
close(s.UUIDChan())
close(s.PChan())
}
// Tell the VM to try and return RAM to the OS.
ms := runtime.MemStats{}
Expand All @@ -158,7 +228,7 @@ func (d *TCP) collectGarbage() {
d.status.GC(len(d.currentFlows), len(d.oldFlows))
// Advance the generation.
d.oldFlows = d.currentFlows
d.currentFlows = make(map[FlowKey]*saver.TCP)
d.currentFlows = make(map[FlowKey]Saver)
}

// CapturePackets captures the packets from the channel `packets` and hands them
Expand All @@ -173,6 +243,10 @@ func (d *TCP) collectGarbage() {
// closing both the passed-in packet channel and the UUIDChan to indicate that
// no future input is possible.
func (d *TCP) CapturePackets(ctx context.Context, packets <-chan gopacket.Packet, gcTicker <-chan time.Time) {
// Start the drain goroutine.
d.drainSaver = newDrain()
go d.drainSaver.start(ctx)

// This is the loop that has to run at high speed. All processing that can
// happen outside this loop should happen outside this loop. No function
// called from this loop should ever block.
Expand All @@ -197,14 +271,14 @@ func (d *TCP) CapturePackets(ctx context.Context, packets <-chan gopacket.Packet

// NewTCP creates a demuxer.TCP, which is the system which chooses which channel
// to send TCP/IP packets for subsequent saving to a file.
func NewTCP(anon anonymize.IPAnonymizer, dataDir string, uuidWaitDuration, maxFlowDuration time.Duration, maxIdleRAM bytecount.ByteCount, stream bool, maxHeap uint64) *TCP {
func NewTCP(anon anonymize.IPAnonymizer, dataDir string, uuidWaitDuration, maxFlowDuration time.Duration, maxIdleRAM bytecount.ByteCount, stream bool, maxHeap uint64, maxFlows int) *TCP {
uuidc := make(chan UUIDEvent, 100)
return &TCP{
UUIDChan: uuidc,
uuidReadChan: uuidc,

currentFlows: make(map[FlowKey]*saver.TCP),
oldFlows: make(map[FlowKey]*saver.TCP),
currentFlows: make(map[FlowKey]Saver),
oldFlows: make(map[FlowKey]Saver),
maxIdleRAM: maxIdleRAM,
status: promStatus{},

Expand All @@ -214,5 +288,6 @@ func NewTCP(anon anonymize.IPAnonymizer, dataDir string, uuidWaitDuration, maxFl
uuidWaitDuration: uuidWaitDuration,
stream: stream,
maxHeap: maxHeap,
maxFlows: maxFlows,
}
}
59 changes: 42 additions & 17 deletions demuxer/tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,10 @@ import (
"testing"
"time"

"github.com/m-lab/go/bytecount"

"github.com/google/gopacket"
"github.com/google/gopacket/pcap"

"github.com/m-lab/go/anonymize"
"github.com/m-lab/go/bytecount"
"github.com/m-lab/go/rtx"
"github.com/m-lab/packet-headers/saver"
)
Expand Down Expand Up @@ -44,7 +42,7 @@ func TestTCPDryRun(t *testing.T) {
rtx.Must(err, "Could not create directory")
defer os.RemoveAll(dir)

tcpdm := NewTCP(anonymize.New(anonymize.None), dir, 500*time.Millisecond, time.Second, 1000000000, true, uint64(2*bytecount.Gigabyte))
tcpdm := NewTCP(anonymize.New(anonymize.None), dir, 500*time.Millisecond, time.Second, 1000000000, true, uint64(2*bytecount.Gigabyte), 0)

// While we have a demuxer created, make sure that the processing path for
// packets does not crash when given a nil packet.
Expand Down Expand Up @@ -87,7 +85,7 @@ func TestTCPWithRealPcaps(t *testing.T) {
rtx.Must(err, "Could not create directory")
defer os.RemoveAll(dir)

tcpdm := NewTCP(anonymize.New(anonymize.None), dir, 500*time.Millisecond, time.Second, 1000000000, true, uint64(2*bytecount.Gigabyte))
tcpdm := NewTCP(anonymize.New(anonymize.None), dir, 500*time.Millisecond, time.Second, 1000000000, true, uint64(2*bytecount.Gigabyte), 0)
st := &statusTracker{}
tcpdm.status = st
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
Expand Down Expand Up @@ -225,17 +223,6 @@ func TestTCPWithRealPcaps(t *testing.T) {
if len(v6) != 8 {
t.Errorf("%+v should have length 8 not %d", v6, len(v6))
}

// After all that, also check that writes to an out-of-capacity Pchan will
// not block.
sav := tcpdm.getSaver(ctx, flow1)
close(sav.Pchan)
close(sav.UUIDchan)
// This new channel assigned to sav.Pchan will never be read, so if a blocking
// write is performed then this goroutine will block.
sav.Pchan = make(chan gopacket.Packet)
tcpdm.savePacket(ctx, flow1packets[0])
// If this doesn't block, then success!
}

func TestUUIDWontBlock(t *testing.T) {
Expand All @@ -255,7 +242,7 @@ func TestUUIDWontBlock(t *testing.T) {
rtx.Must(err, "Could not create directory")
defer os.RemoveAll(dir)

tcpdm := NewTCP(anonymize.New(anonymize.None), dir, 15*time.Second, 30*time.Second, 1, true, uint64(2*bytecount.Gigabyte))
tcpdm := NewTCP(anonymize.New(anonymize.None), dir, 15*time.Second, 30*time.Second, 1, true, uint64(2*bytecount.Gigabyte), 0)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)

var wg sync.WaitGroup
Expand All @@ -281,3 +268,41 @@ func TestUUIDWontBlock(t *testing.T) {
wg.Wait()
// No freeze == success!
}

func Test_drain(t *testing.T) {
	// A running drain must return once its context expires.
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()
	d := newDrain()
	d.start(ctx)

	// This should immediately return if the channel is closed.
	d = newDrain()
	close(d.pChan)
	d.start(context.Background())

	// This should immediately return if the channel is closed.
	d = newDrain()
	close(d.uuidChan)
	d.start(context.Background())

	// No freeze = success.

	// Test the getters.
	if d.PChan() != d.pChan {
		t.Error("PChan() should return pChan")
	}
	if d.UUIDChan() != d.uuidChan {
		t.Error("UUIDChan() should return uuidChan")
	}

	// Test that the state is always "draining".
	if d.State() != "draining" {
		t.Error("State() should return 'draining'")
	}
}
8 changes: 5 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@ var (
maxHeaderSize = flag.Int("maxheadersize", 256, "The maximum size of packet headers allowed. A lower value allows the pcap process to be less wasteful but risks more esoteric IPv6 headers (which can theoretically be up to the full size of the packet but in practice seem to be under 128) getting truncated.")
sigtermWaitTime = flag.Duration("sigtermwait", 1*time.Second, "How long should the daemon hang around before exiting after receiving a SIGTERM.")
streamToDisk = flag.Bool("stream", false, "Stream results to disk instead of buffering them in RAM.")
maxIdleRAM = 3 * bytecount.Gigabyte
maxHeap = 8 * bytecount.Gigabyte
maxFlows = flag.Int("maxflows", 0, "The maximum number of concurrent flows allowed. When this threshold is reached, new flows will be ignored.")

maxIdleRAM = 3 * bytecount.Gigabyte
maxHeap = 8 * bytecount.Gigabyte

interfaces flagx.StringArray

Expand Down Expand Up @@ -148,7 +150,7 @@ func main() {
// Get ready to save the incoming packets to files.
tcpdm := demuxer.NewTCP(
anonymize.New(anonymize.IPAnonymizationFlag), *dir, *uuidWaitDuration,
*captureDuration, maxIdleRAM, *streamToDisk, uint64(maxHeap))
*captureDuration, maxIdleRAM, *streamToDisk, uint64(maxHeap), *maxFlows)

// Inform the demuxer of new UUIDs
h := tcpinfohandler.New(mainCtx, tcpdm.UUIDChan)
Expand Down
6 changes: 6 additions & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ var (
Help: "How many savers were still active after the most recent garbage collection round",
},
)
DemuxerIgnoredCount = promauto.NewCounter(
prometheus.CounterOpts{
Name: "pcap_demuxer_ignored_total",
Help: "How many packets or UUIDs were ignored due to the configured flow limit.",
},
)

BadEventsFromTCPInfo = promauto.NewCounterVec(
prometheus.CounterOpts{
Expand Down
10 changes: 10 additions & 0 deletions saver/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,16 @@ type TCP struct {
stopOnce sync.Once
}

// PChan returns the Pchan field as a send-only channel, satisfying the
// demuxer's saver interface.
func (t *TCP) PChan() chan<- gopacket.Packet {
	return t.Pchan
}

// UUIDChan returns the UUIDchan field as a send-only channel, satisfying
// the demuxer's saver interface.
func (t *TCP) UUIDChan() chan<- UUIDEvent {
	return t.UUIDchan
}

// Increment the error counter when errors are encountered.
func (t *TCP) error(cause string) {
t.state.Set(cause + "error")
Expand Down

0 comments on commit c7d1ad6

Please sign in to comment.