Skip to content

Commit

Permalink
Add UDP support to packetbeat's process monitor (elastic#541)
Browse files Browse the repository at this point in the history
This updates the process monitor with UDP support.

Until now, it would lookup all ports as TCP. This could result in the
wrong process being reported when two different processes used the same
port number, one for TCP and one for UDP.

The Linux and Windows implementations have been updated to fetch
information for UDP ports.

Closes elastic#541
  • Loading branch information
adriansr committed Jul 12, 2018
1 parent 947221e commit 7bf0943
Show file tree
Hide file tree
Showing 22 changed files with 220 additions and 58 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ https://github.com/elastic/beats/compare/v6.2.3...master[Check the HEAD diff]
- The process monitor now reports the command-line for all processes, under Linux and Windows. {pull}7135[7135]
- Updated the TLS protocol parser with new cipher suites added to TLS 1.3. {issue}7455[7455]
- Flows are enriched with process information using the process monitor. {pull}7507[7507]
- Added UDP support to process monitor. {pull}7571[7571]

*Winlogbeat*

Expand Down
11 changes: 8 additions & 3 deletions packetbeat/flows/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/packetbeat/procs"
"github.com/elastic/beats/packetbeat/protos/applayer"
)

type flowsProcessor struct {
Expand Down Expand Up @@ -216,6 +217,7 @@ func createEvent(
source := common.MapStr{}
dest := common.MapStr{}
tuple := common.IPPortTuple{}
var proto applayer.Transport

// add ethernet layer meta data
if src, dst, ok := f.id.EthAddr(); ok {
Expand Down Expand Up @@ -282,9 +284,11 @@ func createEvent(

// udp layer meta data
if src, dst, ok := f.id.UDPAddr(); ok {
source["port"] = binary.LittleEndian.Uint16(src)
dest["port"] = binary.LittleEndian.Uint16(dst)
tuple.SrcPort = binary.LittleEndian.Uint16(src)
tuple.DstPort = binary.LittleEndian.Uint16(dst)
source["port"], dest["port"] = tuple.SrcPort, tuple.DstPort
fields["transport"] = "udp"
proto = applayer.TransportUDP
}

// tcp layer meta data
Expand All @@ -293,6 +297,7 @@ func createEvent(
tuple.DstPort = binary.LittleEndian.Uint16(dst)
source["port"], dest["port"] = tuple.SrcPort, tuple.DstPort
fields["transport"] = "tcp"
proto = applayer.TransportTCP
}

if id := f.id.ConnectionID(); id != nil {
Expand All @@ -311,7 +316,7 @@ func createEvent(

// Set process information if it's available
if tuple.IPLength != 0 && tuple.SrcPort != 0 {
if cmdline := procs.ProcWatcher.FindProcessesTuple(&tuple); cmdline != nil {
if cmdline := procs.ProcWatcher.FindProcessesTuple(&tuple, proto); cmdline != nil {
src, dst := common.MakeEndpointPair(tuple.BaseTuple, cmdline)

for key, value := range map[string]string{
Expand Down
64 changes: 44 additions & 20 deletions packetbeat/procs/procs.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/packetbeat/protos/applayer"
"github.com/elastic/gosigar"
)

Expand All @@ -49,15 +50,15 @@ type process struct {
type processWatcherImpl interface {
// GetLocalPortToPIDMapping returns the list of local port numbers and the PID
// that owns them.
GetLocalPortToPIDMapping() (ports map[uint16]int, err error)
GetLocalPortToPIDMapping(transport applayer.Transport) (ports map[uint16]int, err error)
// GetProcessCommandLine returns the command line for a given process.
GetProcessCommandLine(pid int) string
// GetLocalIPs returns the list of local addresses.
GetLocalIPs() ([]net.IP, error)
}

type ProcessesWatcher struct {
portProcMap map[uint16]portProcMapping
portProcMap map[applayer.Transport]map[uint16]portProcMapping
localAddrs []net.IP
processCache map[int]*process

Expand All @@ -76,7 +77,11 @@ func (proc *ProcessesWatcher) Init(config ProcsConfig) error {

func (proc *ProcessesWatcher) initWithImpl(config ProcsConfig, impl processWatcherImpl) error {
proc.impl = impl
proc.portProcMap = make(map[uint16]portProcMapping)
proc.portProcMap = map[applayer.Transport]map[uint16]portProcMapping{
applayer.TransportUDP: make(map[uint16]portProcMapping),
applayer.TransportTCP: make(map[uint16]portProcMapping),
}

proc.processCache = make(map[int]*process)

proc.enabled = config.Enabled
Expand All @@ -99,72 +104,91 @@ func (proc *ProcessesWatcher) initWithImpl(config ProcsConfig, impl processWatch
return nil
}

func (proc *ProcessesWatcher) FindProcessesTuple(tuple *common.IPPortTuple) (procTuple *common.CmdlineTuple) {
// FindProcessesTupleTCP looks up local process information for the source and
// destination addresses of TCP tuple
func (proc *ProcessesWatcher) FindProcessesTupleTCP(tuple *common.IPPortTuple) (procTuple *common.CmdlineTuple) {
return proc.FindProcessesTuple(tuple, applayer.TransportTCP)
}

// FindProcessesTupleUDP looks up local process information for the source and
// destination addresses of UDP tuple
func (proc *ProcessesWatcher) FindProcessesTupleUDP(tuple *common.IPPortTuple) (procTuple *common.CmdlineTuple) {
return proc.FindProcessesTuple(tuple, applayer.TransportUDP)
}

// FindProcessesTuple looks up local process information for the source and
// destination addresses of a tuple for the given transport protocol
func (proc *ProcessesWatcher) FindProcessesTuple(tuple *common.IPPortTuple, transport applayer.Transport) (procTuple *common.CmdlineTuple) {
procTuple = &common.CmdlineTuple{}

if !proc.enabled {
return
}

if proc.isLocalIP(tuple.SrcIP) {
if p := proc.findProc(tuple.SrcPort); p != nil {
if p := proc.findProc(tuple.SrcPort, transport); p != nil {
procTuple.Src = []byte(p.name)
procTuple.SrcCommand = []byte(p.commandLine)
logp.Debug("procs", "Found process '%s' (%s) for port %d", p.commandLine, p.name, tuple.SrcPort)
logp.Debug("procs", "Found process '%s' (%s) for port %d/%s", p.commandLine, p.name, tuple.SrcPort, transport)
}
}

if proc.isLocalIP(tuple.DstIP) {
if p := proc.findProc(tuple.DstPort); p != nil {
if p := proc.findProc(tuple.DstPort, transport); p != nil {
procTuple.Dst = []byte(p.name)
procTuple.DstCommand = []byte(p.commandLine)
logp.Debug("procs", "Found process '%s' (%s) for port %d", p.commandLine, p.name, tuple.DstPort)
logp.Debug("procs", "Found process '%s' (%s) for port %d/%s", p.commandLine, p.name, tuple.DstPort, transport)
}
}

return
}

func (proc *ProcessesWatcher) findProc(port uint16) *process {
func (proc *ProcessesWatcher) findProc(port uint16, transport applayer.Transport) *process {
defer logp.Recover("FindProc exception")

p, exists := proc.portProcMap[port]
procMap, ok := proc.portProcMap[transport]
if !ok {
return nil
}

p, exists := procMap[port]
if exists {
return p.proc
}

proc.updateMap()
proc.updateMap(transport)

p, exists = proc.portProcMap[port]
p, exists = procMap[port]
if exists {
return p.proc
}

return nil
}

func (proc *ProcessesWatcher) updateMap() {
func (proc *ProcessesWatcher) updateMap(transport applayer.Transport) {
if logp.HasSelector("procsdetailed") {
start := time.Now()
defer func() {
logp.Debug("procsdetailed", "updateMap() took %v", time.Now().Sub(start))
}()
}

ports, err := proc.impl.GetLocalPortToPIDMapping()
ports, err := proc.impl.GetLocalPortToPIDMapping(transport)
if err != nil {
logp.Err("unable to list local ports: %v", err)
}

proc.expireProcessCache()

for port, pid := range ports {
proc.updateMappingEntry(port, pid)
proc.updateMappingEntry(transport, port, pid)
}
}

func (proc *ProcessesWatcher) updateMappingEntry(port uint16, pid int) {
prev, ok := proc.portProcMap[port]
func (proc *ProcessesWatcher) updateMappingEntry(transport applayer.Transport, port uint16, pid int) {
prev, ok := proc.portProcMap[transport][port]
if ok && prev.pid == pid {
// This port->pid mapping already exists
return
Expand All @@ -179,10 +203,10 @@ func (proc *ProcessesWatcher) updateMappingEntry(port uint16, pid int) {
// We never expire entries from this map. Since there are 65k possible
// ports, the size of the dict can be max 1.5 MB, which we consider
// reasonable.
proc.portProcMap[port] = portProcMapping{port: port, pid: pid, proc: p}
proc.portProcMap[transport][port] = portProcMapping{port: port, pid: pid, proc: p}

logp.Debug("procsdetailed", "updateMappingEntry(): port=%d pid=%d process='%s' name=%s",
port, pid, p.commandLine, p.name)
logp.Debug("procsdetailed", "updateMappingEntry(): port=%d/%s pid=%d process='%s' name=%s",
port, transport, pid, p.commandLine, p.name)
}

func (proc *ProcessesWatcher) isLocalIP(ip net.IP) bool {
Expand Down
30 changes: 21 additions & 9 deletions packetbeat/procs/procs_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"strings"

"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/packetbeat/protos/applayer"
"github.com/elastic/gosigar"
)

Expand All @@ -43,22 +44,33 @@ type socketInfo struct {
inode uint64
}

var procFiles = map[applayer.Transport]struct {
ipv4, ipv6 string
}{
applayer.TransportUDP: {"/proc/net/udp", "/proc/net/udp6"},
applayer.TransportTCP: {"/proc/net/tcp", "/proc/net/tcp6"},
}

// GetLocalPortToPIDMapping returns the list of local port numbers and the PID
// that owns them.
func (proc *ProcessesWatcher) GetLocalPortToPIDMapping() (ports map[uint16]int, err error) {
func (proc *ProcessesWatcher) GetLocalPortToPIDMapping(transport applayer.Transport) (ports map[uint16]int, err error) {
sourceFiles, ok := procFiles[transport]
if !ok {
return nil, fmt.Errorf("unsupported transport protocol id: %d", transport)
}
var pids gosigar.ProcList
if err = pids.Get(); err != nil {
return nil, err
}
logp.Debug("procs", "getLocalPortsToPIDs()")
ipv4socks, err := socketsFromProc("/proc/net/tcp", false)
ipv4socks, err := socketsFromProc(sourceFiles.ipv4, false)
if err != nil {
logp.Err("Parse_Proc_Net_Tcp: %s", err)
logp.Err("GetLocalPortToPIDMapping: parsing '%s': %s", sourceFiles.ipv4, err)
return nil, err
}
ipv6socks, err := socketsFromProc("/proc/net/tcp6", true)
ipv6socks, err := socketsFromProc(sourceFiles.ipv6, true)
if err != nil {
logp.Err("Parse_Proc_Net_Tcp ipv6: %s", err)
logp.Err("GetLocalPortToPIDMapping: parsing '%s': %s", sourceFiles.ipv6, err)
return nil, err
}
socksMap := map[uint64]*socketInfo{}
Expand Down Expand Up @@ -126,11 +138,11 @@ func socketsFromProc(filename string, ipv6 bool) ([]*socketInfo, error) {
return nil, err
}
defer file.Close()
return parseProcNetTCP(file, ipv6)
return parseProcNetProto(file, ipv6)
}

// Parses the /proc/net/tcp file
func parseProcNetTCP(input io.Reader, ipv6 bool) ([]*socketInfo, error) {
// Parses the /proc/net/(tcp|udp)6? file
func parseProcNetProto(input io.Reader, ipv6 bool) ([]*socketInfo, error) {
buf := bufio.NewReader(input)

sockets := []*socketInfo{}
Expand All @@ -139,7 +151,7 @@ func parseProcNetTCP(input io.Reader, ipv6 bool) ([]*socketInfo, error) {
for err != io.EOF {
line, err = buf.ReadBytes('\n')
if err != nil && err != io.EOF {
logp.Err("Error reading proc net tcp file: %s", err)
logp.Err("Error reading proc net file: %s", err)
return nil, err
}
words := bytes.Fields(line)
Expand Down
4 changes: 3 additions & 1 deletion packetbeat/procs/procs_other.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

package procs

import "github.com/elastic/beats/packetbeat/protos/applayer"

// GetLocalPortToPIDMapping returns the list of local port numbers and the PID
// that owns them.
func (proc *ProcessesWatcher) GetLocalPortToPIDMapping() (ports map[uint16]int, err error) {
func (proc *ProcessesWatcher) GetLocalPortToPIDMapping(transport applayer.Transport) (ports map[uint16]int, err error) {
return nil, nil
}
Loading

0 comments on commit 7bf0943

Please sign in to comment.