From c7d1ad6cc33dcb9eff7c5a6a7386524e9e5e592c Mon Sep 17 00:00:00 2001
From: Roberto D'Auria
Date: Mon, 18 Nov 2024 23:23:46 +0100
Subject: [PATCH] Add a configurable limit to the number of current flows (#58)

* Route packets to a fake saver when currentFlows > 100
* Fix tests
* Add -maxflows flag to configure flow limit.
* Remove test case
---
 demuxer/tcp.go      | 101 ++++++++++++++++++++++++++++++++++++++------
 demuxer/tcp_test.go |  59 ++++++++++++++++++--------
 main.go             |   8 ++--
 metrics/metrics.go  |   6 +++
 saver/tcp.go        |  10 +++++
 5 files changed, 151 insertions(+), 33 deletions(-)

diff --git a/demuxer/tcp.go b/demuxer/tcp.go
index ac05057..f732c57 100644
--- a/demuxer/tcp.go
+++ b/demuxer/tcp.go
@@ -16,6 +16,65 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 )
 
+// Saver allows saving data coming from this demuxer.
+type Saver interface {
+	PChan() chan<- gopacket.Packet
+	UUIDChan() chan<- saver.UUIDEvent
+	State() string
+}
+
+// A saver implementation that only drains the packet and UUID channels, to
+// effectively ignore all packets and UUIDs for specific flows.
+type drain struct {
+	pChan    chan gopacket.Packet
+	uuidChan chan saver.UUIDEvent
+}
+
+// newDrain returns a draining saver. Its start method constantly drains the
+// packet and the uuid channels until they are closed or the context is
+// canceled.
+func newDrain() *drain {
+	// Use the same buffer size as the real saver: even though all writes to
+	// these channels are non-blocking, we don't want to increase Prometheus
+	// counters for failed writes in case we can't read fast enough.
+	return &drain{
+		pChan:    make(chan gopacket.Packet, 8192),
+		uuidChan: make(chan saver.UUIDEvent, 1),
+	}
+}
+
+func (d *drain) start(ctx context.Context) {
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case _, ok := <-d.pChan:
+			if !ok {
+				return
+			}
+		case _, ok := <-d.uuidChan:
+			if !ok {
+				return
+			}
+		}
+	}
+}
+
+// PChan returns the pChan channel.
+func (d *drain) PChan() chan<- gopacket.Packet {
+	return d.pChan
+}
+
+// UUIDChan returns the uuidChan channel.
+func (d *drain) UUIDChan() chan<- saver.UUIDEvent {
+	return d.uuidChan
+}
+
+// State always returns "draining".
+func (d *drain) State() string {
+	return "draining"
+}
+
 // UUIDEvent is the datatype sent to a demuxer's UUIDChan to notify it about the
 // UUID of new flows.
 type UUIDEvent struct {
@@ -36,9 +95,11 @@ type TCP struct {
 	// collect all savers in oldFlows and make all the currentFlows into
 	// oldFlows. It is only through this garbage collection process that
 	// saver.TCP objects are finalized.
-	currentFlows map[FlowKey]*saver.TCP
-	oldFlows     map[FlowKey]*saver.TCP
+	currentFlows map[FlowKey]Saver
+	oldFlows     map[FlowKey]Saver
+	drainSaver   *drain
 	maxIdleRAM   bytecount.ByteCount
+	maxFlows     int
 	status       status
 
 	// Variables required for the construction of new Savers
@@ -68,7 +129,7 @@ func currentHeapAboveThreshold(maxHeap uint64) bool {
 }
 
 // GetSaver returns a saver with channels for packets and a uuid.
-func (d *TCP) getSaver(ctx context.Context, flow FlowKey) *saver.TCP {
+func (d *TCP) getSaver(ctx context.Context, flow FlowKey) Saver {
 	// Read the flow from the flows map, the oldFlows map, or create it.
 	t, ok := d.currentFlows[flow]
 	if !ok {
@@ -91,8 +152,17 @@ func (d *TCP) getSaver(ctx context.Context, flow FlowKey) *saver.TCP {
 		// packet-headers spilling over into other parts of the system.
 		if currentHeapAboveThreshold(d.maxHeap) {
 			metrics.SaversSkipped.Inc()
-			return nil
+			return d.drainSaver
+		}
+
+		// If there is a maximum number of flows configured and the number
+		// of current flows exceeds it, do not create a new Saver and
+		// return drainSaver instead.
+		if d.maxFlows != 0 && len(d.currentFlows) > d.maxFlows {
+			metrics.DemuxerIgnoredCount.Inc()
+			return d.drainSaver
 		}
+
 		t = saver.StartNew(ctx, d.anon, d.dataDir, d.uuidWaitDuration, d.maxDuration, flow.Format(d.anon), d.stream)
 	}
 	d.currentFlows[flow] = t
@@ -117,7 +187,7 @@ func (d *TCP) savePacket(ctx context.Context, packet gopacket.Packet) {
 
 	// Don't block on channel write to the saver, but do note when it fails.
 	select {
-	case s.Pchan <- packet:
+	case s.PChan() <- packet:
 	default:
 		metrics.MissedPackets.WithLabelValues(s.State()).Inc()
 	}
@@ -131,7 +201,7 @@ func (d *TCP) assignUUID(ctx context.Context, ev UUIDEvent) {
 		return
 	}
 	select {
-	case s.UUIDchan <- ev.UUIDEvent:
+	case s.UUIDChan() <- ev.UUIDEvent:
 	default:
 		metrics.MissedUUIDs.WithLabelValues(s.State()).Inc()
 	}
@@ -142,10 +212,10 @@ func (d *TCP) collectGarbage() {
 	defer timer.ObserveDuration()
 
 	// Collect garbage in a separate goroutine.
-	go func(toBeDeleted map[FlowKey]*saver.TCP) {
+	go func(toBeDeleted map[FlowKey]Saver) {
 		for _, s := range toBeDeleted {
-			close(s.UUIDchan)
-			close(s.Pchan)
+			close(s.UUIDChan())
+			close(s.PChan())
 		}
 		// Tell the VM to try and return RAM to the OS.
 		ms := runtime.MemStats{}
@@ -158,7 +228,7 @@ func (d *TCP) collectGarbage() {
 	d.status.GC(len(d.currentFlows), len(d.oldFlows))
 	// Advance the generation.
 	d.oldFlows = d.currentFlows
-	d.currentFlows = make(map[FlowKey]*saver.TCP)
+	d.currentFlows = make(map[FlowKey]Saver)
 }
 
 // CapturePackets captures the packets from the channel `packets` and hands them
@@ -173,6 +243,10 @@ func (d *TCP) collectGarbage() {
 // closing both the passed-in packet channel and the UUIDChan to indicate that
 // no future input is possible.
 func (d *TCP) CapturePackets(ctx context.Context, packets <-chan gopacket.Packet, gcTicker <-chan time.Time) {
+	// Start the drain goroutine.
+	d.drainSaver = newDrain()
+	go d.drainSaver.start(ctx)
+
 	// This is the loop that has to run at high speed. All processing that can
 	// happen outside this loop should happen outside this loop. No function
 	// called from this loop should ever block.
@@ -197,14 +271,14 @@ func (d *TCP) CapturePackets(ctx context.Context, packets <-chan gopacket.Packet
 
 // NewTCP creates a demuxer.TCP, which is the system which chooses which channel
 // to send TCP/IP packets for subsequent saving to a file.
-func NewTCP(anon anonymize.IPAnonymizer, dataDir string, uuidWaitDuration, maxFlowDuration time.Duration, maxIdleRAM bytecount.ByteCount, stream bool, maxHeap uint64) *TCP {
+func NewTCP(anon anonymize.IPAnonymizer, dataDir string, uuidWaitDuration, maxFlowDuration time.Duration, maxIdleRAM bytecount.ByteCount, stream bool, maxHeap uint64, maxFlows int) *TCP {
 	uuidc := make(chan UUIDEvent, 100)
 	return &TCP{
 		UUIDChan:     uuidc,
 		uuidReadChan: uuidc,
 
-		currentFlows: make(map[FlowKey]*saver.TCP),
-		oldFlows:     make(map[FlowKey]*saver.TCP),
+		currentFlows: make(map[FlowKey]Saver),
+		oldFlows:     make(map[FlowKey]Saver),
 		maxIdleRAM:   maxIdleRAM,
 		status:       promStatus{},
 
@@ -214,5 +288,6 @@ func NewTCP(anon anonymize.IPAnonymizer, dataDir string, uuidWaitDuration, maxFl
 		uuidWaitDuration: uuidWaitDuration,
 		stream:           stream,
 		maxHeap:          maxHeap,
+		maxFlows:         maxFlows,
 	}
 }
diff --git a/demuxer/tcp_test.go b/demuxer/tcp_test.go
index 1f2bb21..f68a9c5 100644
--- a/demuxer/tcp_test.go
+++ b/demuxer/tcp_test.go
@@ -11,12 +11,10 @@ import (
 	"testing"
 	"time"
 
-	"github.com/m-lab/go/bytecount"
-
 	"github.com/google/gopacket"
 	"github.com/google/gopacket/pcap"
-
 	"github.com/m-lab/go/anonymize"
+	"github.com/m-lab/go/bytecount"
 	"github.com/m-lab/go/rtx"
 	"github.com/m-lab/packet-headers/saver"
 )
@@ -44,7 +42,7 @@ func TestTCPDryRun(t *testing.T) {
 	rtx.Must(err, "Could not create directory")
 	defer os.RemoveAll(dir)
 
-	tcpdm := NewTCP(anonymize.New(anonymize.None), dir, 500*time.Millisecond, time.Second, 1000000000, true, uint64(2*bytecount.Gigabyte))
+	tcpdm := NewTCP(anonymize.New(anonymize.None), dir, 500*time.Millisecond, time.Second, 1000000000, true, uint64(2*bytecount.Gigabyte), 0)
 
 	// While we have a demuxer created, make sure that the processing path for
 	// packets does not crash when given a nil packet.
@@ -87,7 +85,7 @@ func TestTCPWithRealPcaps(t *testing.T) {
 	rtx.Must(err, "Could not create directory")
 	defer os.RemoveAll(dir)
 
-	tcpdm := NewTCP(anonymize.New(anonymize.None), dir, 500*time.Millisecond, time.Second, 1000000000, true, uint64(2*bytecount.Gigabyte))
+	tcpdm := NewTCP(anonymize.New(anonymize.None), dir, 500*time.Millisecond, time.Second, 1000000000, true, uint64(2*bytecount.Gigabyte), 0)
 	st := &statusTracker{}
 	tcpdm.status = st
 	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
@@ -225,17 +223,6 @@ func TestTCPWithRealPcaps(t *testing.T) {
 	if len(v6) != 8 {
 		t.Errorf("%+v should have length 8 not %d", v6, len(v6))
 	}
-
-	// After all that, also check that writes to an out-of-capacity Pchan will
-	// not block.
-	sav := tcpdm.getSaver(ctx, flow1)
-	close(sav.Pchan)
-	close(sav.UUIDchan)
-	// This new channel assigned to sav.Pchan will never be read, so if a blocking
-	// write is performed then this goroutine will block.
-	sav.Pchan = make(chan gopacket.Packet)
-	tcpdm.savePacket(ctx, flow1packets[0])
-	// If this doesn't block, then success!
 }
 
 func TestUUIDWontBlock(t *testing.T) {
@@ -255,7 +242,7 @@ func TestUUIDWontBlock(t *testing.T) {
 	rtx.Must(err, "Could not create directory")
 	defer os.RemoveAll(dir)
 
-	tcpdm := NewTCP(anonymize.New(anonymize.None), dir, 15*time.Second, 30*time.Second, 1, true, uint64(2*bytecount.Gigabyte))
+	tcpdm := NewTCP(anonymize.New(anonymize.None), dir, 15*time.Second, 30*time.Second, 1, true, uint64(2*bytecount.Gigabyte), 0)
 	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
 
 	var wg sync.WaitGroup
@@ -281,3 +268,41 @@ func TestUUIDWontBlock(t *testing.T) {
 	wg.Wait()
 	// No freeze == success!
 }
+
+func Test_drain(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	drain := newDrain()
+
+	go func() {
+		<-time.After(100 * time.Millisecond)
+		cancel()
+	}()
+
+	drain.start(ctx)
+
+	drain = newDrain()
+	close(drain.pChan)
+	// This should immediately return if the channel is closed.
+	drain.start(context.Background())
+
+	drain = newDrain()
+	close(drain.uuidChan)
+	// This should immediately return if the channel is closed.
+	drain.start(context.Background())
+
+	// No freeze = success.
+
+	// Test the getters.
+	if drain.PChan() != drain.pChan {
+		t.Error("PChan() should return pChan")
+	}
+
+	if drain.UUIDChan() != drain.uuidChan {
+		t.Error("UUIDChan() should return uuidChan")
+	}
+
+	// Test that the state is always "draining".
+	if drain.State() != "draining" {
+		t.Error("State() should return 'draining'")
+	}
+}
diff --git a/main.go b/main.go
index 7f3b128..85a0fd4 100644
--- a/main.go
+++ b/main.go
@@ -36,8 +36,10 @@ var (
 	maxHeaderSize   = flag.Int("maxheadersize", 256, "The maximum size of packet headers allowed. A lower value allows the pcap process to be less wasteful but risks more esoteric IPv6 headers (which can theoretically be up to the full size of the packet but in practice seem to be under 128) getting truncated.")
 	sigtermWaitTime = flag.Duration("sigtermwait", 1*time.Second, "How long should the daemon hang around before exiting after receiving a SIGTERM.")
 	streamToDisk    = flag.Bool("stream", false, "Stream results to disk instead of buffering them in RAM.")
-	maxIdleRAM      = 3 * bytecount.Gigabyte
-	maxHeap         = 8 * bytecount.Gigabyte
+	maxFlows        = flag.Int("maxflows", 0, "The maximum number of concurrent flows allowed. When this threshold is reached, new flows will be ignored.")
+
+	maxIdleRAM = 3 * bytecount.Gigabyte
+	maxHeap    = 8 * bytecount.Gigabyte
 
 	interfaces flagx.StringArray
 
@@ -148,7 +150,7 @@ func main() {
 	// Get ready to save the incoming packets to files.
 	tcpdm := demuxer.NewTCP(
 		anonymize.New(anonymize.IPAnonymizationFlag), *dir, *uuidWaitDuration,
-		*captureDuration, maxIdleRAM, *streamToDisk, uint64(maxHeap))
+		*captureDuration, maxIdleRAM, *streamToDisk, uint64(maxHeap), *maxFlows)
 
 	// Inform the demuxer of new UUIDs
 	h := tcpinfohandler.New(mainCtx, tcpdm.UUIDChan)
diff --git a/metrics/metrics.go b/metrics/metrics.go
index 8616b20..c4a0545 100644
--- a/metrics/metrics.go
+++ b/metrics/metrics.go
@@ -95,6 +95,12 @@ var (
 			Help: "How many savers were still active after the most recent garbage collection round",
 		},
 	)
+	DemuxerIgnoredCount = promauto.NewCounter(
+		prometheus.CounterOpts{
+			Name: "pcap_demuxer_ignored_total",
+			Help: "How many packets or UUIDs were ignored due to the configured flow limit.",
+		},
+	)
 
 	BadEventsFromTCPInfo = promauto.NewCounterVec(
 		prometheus.CounterOpts{
diff --git a/saver/tcp.go b/saver/tcp.go
index e9c4c5e..eca8ef8 100644
--- a/saver/tcp.go
+++ b/saver/tcp.go
@@ -169,6 +169,16 @@ type TCP struct {
 	stopOnce sync.Once
 }
 
+// PChan returns the Pchan field.
+func (t *TCP) PChan() chan<- gopacket.Packet {
+	return t.Pchan
+}
+
+// UUIDChan returns the UUIDchan field.
+func (t *TCP) UUIDChan() chan<- UUIDEvent {
+	return t.UUIDchan
+}
+
 // Increment the error counter when errors are encountered.
 func (t *TCP) error(cause string) {
 	t.state.Set(cause + "error")
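
A minimal usage sketch (not part of the patch): the snippet below shows how a caller might construct the demuxer with the new flow limit. Only the NewTCP signature and the maxFlows semantics (0 disables the limit; flows beyond the cap are routed to the internal drain saver and counted by pcap_demuxer_ignored_total) come from this change; the concrete values, the data directory path, and the surrounding main function are illustrative assumptions.

package main

import (
	"time"

	"github.com/m-lab/go/anonymize"
	"github.com/m-lab/go/bytecount"

	"github.com/m-lab/packet-headers/demuxer"
)

func main() {
	// maxFlows == 0 keeps the previous, unlimited behavior; any positive value
	// caps the number of concurrent flows. Flows beyond the cap are handed to
	// the drain saver and show up in the pcap_demuxer_ignored_total counter.
	maxFlows := 1000 // illustrative value; normally set via the -maxflows flag

	tcpdm := demuxer.NewTCP(
		anonymize.New(anonymize.None), // IP anonymizer
		"/tmp/packet-headers",         // data directory (placeholder)
		5*time.Second,                 // uuidWaitDuration
		30*time.Second,                // maxFlowDuration
		3*bytecount.Gigabyte,          // maxIdleRAM
		false,                         // stream packets to disk
		uint64(8*bytecount.Gigabyte),  // maxHeap
		maxFlows,
	)
	_ = tcpdm // a real caller would wire up CapturePackets and UUIDChan as main.go does
}

The equivalent daemon invocation would pass -maxflows=1000 on the command line, which main.go forwards to demuxer.NewTCP as shown in the hunk above.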