Add Trafficdump tool #1004

Merged
merged 17 commits into from
Feb 8, 2022
12 changes: 6 additions & 6 deletions .github/workflows/test-build-deploy.yml
@@ -19,7 +19,7 @@ jobs:
lint:
runs-on: ubuntu-20.04
container:
- image: us.gcr.io/kubernetes-dev/mimir-build-image:import-jsonnet-readme-2418fd778-WIP
+ image: us.gcr.io/kubernetes-dev/mimir-build-image:trafficdump-b1d000267
credentials:
username: _json_key
password: ${{ secrets.gcr_json_key }}
@@ -55,7 +55,7 @@ jobs:
lint-jsonnet:
runs-on: ubuntu-20.04
container:
- image: us.gcr.io/kubernetes-dev/mimir-build-image:import-jsonnet-readme-2418fd778-WIP
+ image: us.gcr.io/kubernetes-dev/mimir-build-image:trafficdump-b1d000267
credentials:
username: _json_key
password: ${{ secrets.gcr_json_key }}
@@ -85,7 +85,7 @@ jobs:
test:
runs-on: ubuntu-20.04
container:
- image: us.gcr.io/kubernetes-dev/mimir-build-image:import-jsonnet-readme-2418fd778-WIP
+ image: us.gcr.io/kubernetes-dev/mimir-build-image:trafficdump-b1d000267
credentials:
username: _json_key
password: ${{ secrets.gcr_json_key }}
@@ -112,7 +112,7 @@ jobs:
build-mimir:
runs-on: ubuntu-20.04
container:
- image: us.gcr.io/kubernetes-dev/mimir-build-image:import-jsonnet-readme-2418fd778-WIP
+ image: us.gcr.io/kubernetes-dev/mimir-build-image:trafficdump-b1d000267
credentials:
username: _json_key
password: ${{ secrets.gcr_json_key }}
@@ -144,7 +144,7 @@ jobs:
build-tools:
runs-on: ubuntu-20.04
container:
- image: us.gcr.io/kubernetes-dev/mimir-build-image:import-jsonnet-readme-2418fd778-WIP
+ image: us.gcr.io/kubernetes-dev/mimir-build-image:trafficdump-b1d000267
credentials:
username: _json_key
password: ${{ secrets.gcr_json_key }}
@@ -227,7 +227,7 @@ jobs:
if: (startsWith(github.ref, 'refs/tags/') || startsWith(github.ref, 'refs/heads/r') ) && github.event_name == 'push' && github.repository == 'grafana/mimir'
runs-on: ubuntu-20.04
container:
- image: us.gcr.io/kubernetes-dev/mimir-build-image:import-jsonnet-readme-2418fd778-WIP
+ image: us.gcr.io/kubernetes-dev/mimir-build-image:trafficdump-b1d000267
credentials:
username: _json_key
password: ${{ secrets.gcr_json_key }}
1 change: 1 addition & 0 deletions .golangci.yml
@@ -24,3 +24,4 @@ run:
build-tags:
- netgo
- requires_docker
- requires_libpcap
2 changes: 1 addition & 1 deletion Makefile
@@ -172,7 +172,7 @@ mimir-build-image/$(UPTODATE): mimir-build-image/*
# All the boiler plate for building golang follows:
SUDO := $(shell docker info >/dev/null 2>&1 || echo "sudo -E")
BUILD_IN_CONTAINER := true
- LATEST_BUILD_IMAGE_TAG ?= import-jsonnet-readme-2418fd778-WIP
+ LATEST_BUILD_IMAGE_TAG ?= trafficdump-b1d000267

# TTY is parameterized to allow Google Cloud Builder to run builds,
# as it currently disallows TTY devices. This value needs to be overridden
26 changes: 26 additions & 0 deletions docs/sources/tools/trafficdump.md
@@ -0,0 +1,26 @@
---
title: "Trafficdump"
description: ""
weight: 100
---

# Trafficdump

The Trafficdump tool reads packets from captured tcpdump output, reassembles them into TCP streams,
and parses HTTP requests and responses. It then prints requests and responses as JSON (one request/response per line)
for further processing. Trafficdump can only parse "raw" HTTP requests and responses, not requests and responses
wrapped in gRPC, as used by Mimir between some components. The best place to capture such traffic is therefore at the
entrypoint to Mimir (e.g. an authentication gateway or proxy).

It has some Mimir-specific and generic HTTP features:

- filter requests based on tenant (from the Basic or `X-Scope-OrgID` header)
- filter requests based on URL path
- filter requests based on status code of the response
- decode Mimir push requests
- filter requests based on matching series in push requests

Trafficdump can be used to inspect both remote-write requests and queries.

Note that Trafficdump currently cannot decode the `LINUX_SLL2` link type, which is used when capturing with `tcpdump -i any` on Linux.
Capturing traffic with `tcpdump -i eth0` (link type ETHERNET / EN10MB) works fine.
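A possible capture-and-analyze workflow looks like this (the interface name, file paths, and output filename are illustrative; the `-r`, `-f`, and `-p` flags and the `requires_libpcap` build tag come from the tool's `main.go` in this PR):

```shell
# Capture on a concrete interface rather than -i any (see the link-type note above).
sudo tcpdump -i eth0 -w /tmp/mimir.pcap 'tcp and port 80'

# Parse the dump; the build tag is required because the tool links against libpcap.
go run -tags requires_libpcap ./tools/trafficdump \
  -r /tmp/mimir.pcap -f 'tcp and port 80' -p 80 > requests.jsonl
```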
1 change: 1 addition & 0 deletions go.mod
@@ -16,6 +16,7 @@ require (
github.com/gogo/status v1.1.0
github.com/golang/protobuf v1.5.2
github.com/golang/snappy v0.0.4
github.com/google/gopacket v1.1.19
github.com/gorilla/mux v1.8.0
github.com/grafana/dskit v0.0.0-20220131162925-09047fe4cc11
github.com/grafana/e2e v0.1.0
2 changes: 2 additions & 0 deletions go.sum
@@ -903,6 +903,8 @@ github.com/google/gofuzz v0.0.0-20161122191042-44d81051d367/go.mod h1:HP5RmnzzSN
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/gofuzz v1.1.0 h1:Hsa8mG0dQ46ij8Sl2AYJDUv1oA9/d6Vk+3LG99Oe02g=
github.com/google/gofuzz v1.1.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF8=
github.com/google/gopacket v1.1.19/go.mod h1:iJ8V8n6KS+z2U1A8pUwu8bW5SyEMkXJB8Yo/Vo+TKTo=
github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
2 changes: 1 addition & 1 deletion mimir-build-image/Dockerfile
@@ -6,7 +6,7 @@
FROM golang:1.17.3-buster
ARG goproxyValue
ENV GOPROXY=${goproxyValue}
- RUN apt-get update && apt-get install -y curl python-requests python-yaml file jq zip unzip protobuf-compiler libprotobuf-dev shellcheck && \
+ RUN apt-get update && apt-get install -y curl python-requests python-yaml file jq zip unzip protobuf-compiler libprotobuf-dev shellcheck libpcap-dev && \
rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
RUN go install golang.org/x/tools/cmd/goimports@3fce476f0a782aeb5034d592c189e63be4ba6c9e
RUN curl -sL https://deb.nodesource.com/setup_14.x | bash -
227 changes: 227 additions & 0 deletions tools/trafficdump/main.go
@@ -0,0 +1,227 @@
// SPDX-License-Identifier: AGPL-3.0-only
//
// This tool uses the requires_libpcap build tag to avoid compilation problems on machines that
// don't have libpcap installed (e.g. when running "go test ./..." from the Mimir root).
//
//go:build requires_libpcap
// +build requires_libpcap

package main

import (
"context"
"flag"
"fmt"
"log"
"net/http"
"os"
"os/signal"
"sync"
"syscall"
"time"

"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/google/gopacket/pcap"
"github.com/google/gopacket/tcpassembly"
"github.com/google/gopacket/tcpassembly/tcpreader"
"go.uber.org/atomic"
)

func main() {
parser := &parser{}

parser.RegisterFlags(flag.CommandLine)

fname := flag.String("r", "", "Filename to read traffic dump from")
filter := flag.String("f", "tcp and port 80", "BPF filter for pcap")
filterEndpointPort := flag.Int("p", 80, "Only process packets with one of the endpoint ports equal to this value. This should match BPF filter (-f option), if used. 0 to disable.")
assemblersCount := flag.Uint("assembler.concurrency", 16, "How many TCP Assemblers to run concurrently")
assemblersMaxPagesPerConnection := flag.Int("assembler.max-pages-per-connection", 0, "Upper limit on the number of pages buffered for a single connection. If this limit is reached for a connection, the smallest sequence number will be flushed, along with any contiguous data. If <= 0, this is ignored.")
httpServer := flag.String("http-listen", ":18080", "Listen address for HTTP server (useful for profiling of this tool)")

flag.Parse()

if *httpServer != "" {
go func() {
log.Println("HTTP server running on", *httpServer)
log.Println(http.ListenAndServe(*httpServer, nil))
}()
}

var handle *pcap.Handle
var err error

// Set up pcap packet capture
if *fname != "" {
log.Printf("Reading from pcap dump %q", *fname)
handle, err = pcap.OpenOffline(*fname)
} else {
log.Println("No dump file specified")
os.Exit(1)
}
if err != nil {
log.Fatal(err)
}

if err := handle.SetBPFFilter(*filter); err != nil {
log.Fatal(err)
}

parser.prepare()

// Set up assembly
streamFactory := &httpStreamFactory{
parser: parser,
processorMap: map[processorKey]*processor{},
}

// Wait until all processors finish before exiting.
defer streamFactory.processorWG.Wait()

streamPool := tcpassembly.NewStreamPool(streamFactory)

assemblers := make([]*tcpassembly.Assembler, *assemblersCount)
for ix := uint(0); ix < *assemblersCount; ix++ {
assemblers[ix] = tcpassembly.NewAssembler(streamPool)
assemblers[ix].AssemblerOptions.MaxBufferedPagesPerConnection = *assemblersMaxPagesPerConnection
}

log.Println("Reading packets")

// Read in packets, pass to assembler.
packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
packets := packetSource.Packets()
ticker := time.Tick(time.Minute)

ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT)
defer cancel()

var packetsCount atomic.Int64

go func() {
for {
time.Sleep(1 * time.Second)
log.Println("Processed", packetsCount.Load(), "packets, tracking", streamFactory.runningProcessors.Load(), "TCP connections")
}
}()

filterPort := layers.TCPPort(*filterEndpointPort)

stop:
for {
select {
case packet := <-packets:
packetsCount.Inc()
// A nil packet indicates the end of a pcap file.
if packet == nil {
break stop
}

if packet.NetworkLayer() == nil || packet.TransportLayer() == nil || packet.TransportLayer().LayerType() != layers.LayerTypeTCP {
log.Println("Unusable packet")
continue
}

tcp := packet.TransportLayer().(*layers.TCP)
if filterPort != 0 && tcp.SrcPort != filterPort && tcp.DstPort != filterPort {
// Ignored packet.
continue
}

netFlow := packet.NetworkLayer().NetworkFlow()
transportFlow := tcp.TransportFlow()

// Use netFlow (IP address) and transportFlow (TCP ports) to find the shard.
// FastHash guarantees that flow and its reverse (src->dest, dest->src) have the same hash.
shard := (netFlow.FastHash() ^ transportFlow.FastHash()) % uint64(*assemblersCount)
assemblers[shard].AssembleWithTimestamp(netFlow, tcp, packet.Metadata().Timestamp)

case <-ticker:
// Every minute, flush connections that haven't seen activity in the past 2 minutes.
flushed, closed := 0, 0
for i := uint(0); i < *assemblersCount; i++ {
f, c := assemblers[i].FlushOlderThan(time.Now().Add(time.Minute * -2))
flushed += f
closed += c
}
log.Println("Flushed", flushed, "and closed", closed, "connections")

case <-ctx.Done():
log.Println("CTRL-C, exiting")
os.Exit(1)
}
}

log.Println("Read", packetsCount.Load(), "packets, closing remaining connections")

closeCh := make(chan int)
for i := 0; uint(i) < *assemblersCount; i++ {
go func(ix int) {
closeCh <- assemblers[ix].FlushAll()
}(i)
}

closed := 0
for i := 0; uint(i) < *assemblersCount; i++ {
closed += <-closeCh
}
log.Println("Closed", closed, "connections")
}

// processorKey is used to map bidirectional streams to each other.
type processorKey struct {
net, transport gopacket.Flow
}

// String prints out the key in a human-readable fashion.
func (k processorKey) String() string {
return fmt.Sprintf("%v:%v", k.net, k.transport)
}

// httpStreamFactory implements tcpassembly.StreamFactory
type httpStreamFactory struct {
parser *parser

// processorMap maps keys to bidirectional stream pairs. Only used when matching request and response streams,
// key is deleted after matching succeeds.
processorMap map[processorKey]*processor

processorWG sync.WaitGroup
runningProcessors atomic.Int64
}

func (h *httpStreamFactory) New(net, transport gopacket.Flow) tcpassembly.Stream {
readerStream := tcpreader.NewReaderStream()

// Find if this stream will be request or response stream, and find existing processor, if it exists.
// If not, start a new one.

k := processorKey{net, transport}
p := h.processorMap[k]
if p == nil {
// Assume that the first stream is from client to server (i.e. the request stream).
// TODO: decide whether this is client->server or vice versa based on port, and perhaps configurable IP.

p = newProcessor(&h.parser.processorConfig, net, transport)
p.req = newRequestStream(&readerStream, h.parser)

h.processorMap[processorKey{net.Reverse(), transport.Reverse()}] = p
} else { // processor already exists, fill in the second stream.
delete(h.processorMap, k)

p.resp = newResponseStream(&readerStream, h.parser)

// We have both directions now, start the request/response processor. It can rely on having both streams set.
h.processorWG.Add(1)
h.runningProcessors.Inc()
go func() {
defer h.processorWG.Done()
defer h.runningProcessors.Dec()

p.run()
}()
}

return &readerStream
}
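The shard selection in `main.go` relies on `FastHash` being direction-independent, so both halves of a connection reach the same assembler. A stdlib-only sketch of the same idea, using a made-up `flowShard` helper rather than gopacket's implementation: XOR-combining per-endpoint hashes is commutative, so a flow and its reverse land on the same shard.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// endpointHash hashes a single "ip:port" endpoint with FNV-1a.
func endpointHash(ep string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(ep))
	return h.Sum64()
}

// flowShard combines both endpoints with XOR, so a flow and its reverse
// (src and dst swapped) always map to the same shard.
func flowShard(src, dst string, shards uint64) uint64 {
	return (endpointHash(src) ^ endpointHash(dst)) % shards
}

func main() {
	const shards = 16
	a := flowShard("10.0.0.1:34567", "10.0.0.2:80", shards)
	b := flowShard("10.0.0.2:80", "10.0.0.1:34567", shards)
	fmt.Println(a == b) // prints: true
}
```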