Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Filebeat] Add network.direction to netflow/log fileset #23052

Merged
merged 5 commits into from
Dec 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -757,6 +757,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Migrate microsoft/m365_defender to httpjson v2 config {pull}23018[23018]
- Add top_level_domain enrichment for suricata/eve fileset. {pull}23046[23046]
- Add top_level_domain enrichment for zeek/dns fileset. {pull}23046[23046]
- Add `network.direction` to netflow/log fileset. {pull}23052[23052]

*Heartbeat*

Expand Down
5 changes: 5 additions & 0 deletions x-pack/filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1370,6 +1370,11 @@ filebeat.modules:
var:
netflow_host: localhost
netflow_port: 2055
# internal_networks specifies which networks are considered internal or private
# you can specify either a CIDR block or any of the special named ranges listed
# at: https://www.elastic.co/guide/en/beats/filebeat/current/defining-processors.html#condition-network
internal_networks:
- private

#-------------------------- Arbor Peakflow SP Module --------------------------
- module: netscout
Expand Down
1 change: 1 addition & 0 deletions x-pack/filebeat/input/netflow/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
type config struct {
udp.Config `config:",inline"`
harvester.ForwarderConfig `config:",inline"`
InternalNetworks []string `config:"internal_networks"`
Protocols []string `config:"protocols"`
ExpirationTimeout time.Duration `config:"expiration_timeout"`
PacketQueueSize int `config:"queue_size"`
Expand Down
43 changes: 19 additions & 24 deletions x-pack/filebeat/input/netflow/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/flowhash"
"github.com/elastic/beats/v7/libbeat/conditions"
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/record"
)

Expand All @@ -35,10 +36,10 @@ var (
}
)

func toBeatEvent(flow record.Record) (event beat.Event) {
func toBeatEvent(flow record.Record, internalNetworks []string) (event beat.Event) {
switch flow.Type {
case record.Flow:
return flowToBeatEvent(flow)
return flowToBeatEvent(flow, internalNetworks)
case record.Options:
return optionsToBeatEvent(flow)
default:
Expand Down Expand Up @@ -113,7 +114,7 @@ func optionsToBeatEvent(flow record.Record) beat.Event {
return toBeatEventCommon(flow)
}

func flowToBeatEvent(flow record.Record) (event beat.Event) {
func flowToBeatEvent(flow record.Record, internalNetworks []string) (event beat.Event) {
event = toBeatEventCommon(flow)

ecsEvent, ok := event.Fields["event"].(common.MapStr)
Expand Down Expand Up @@ -179,12 +180,12 @@ func flowToBeatEvent(flow record.Record) (event beat.Event) {
}
if srcIP != nil {
ecsSource["ip"] = srcIP
ecsSource["locality"] = getIPLocality(srcIP).String()
ecsSource["locality"] = getIPLocality(internalNetworks, srcIP).String()
}
ecsSource["mac"] = srcMac
if dstIP != nil {
ecsDest["ip"] = dstIP
ecsDest["locality"] = getIPLocality(dstIP).String()
ecsDest["locality"] = getIPLocality(internalNetworks, dstIP).String()
}
ecsDest["mac"] = dstMac
}
Expand All @@ -194,7 +195,7 @@ func flowToBeatEvent(flow record.Record) (event beat.Event) {
if ip, found := getKeyIP(flow.Fields, "sourceIPv4Address"); found {
ecsSource["ip"] = ip
relatedIP = append(relatedIP, ip)
ecsSource["locality"] = getIPLocality(ip).String()
ecsSource["locality"] = getIPLocality(internalNetworks, ip).String()
}
if sourcePort, found := getKeyUint64(flow.Fields, "sourceTransportPort"); found {
ecsSource["port"] = sourcePort
Expand All @@ -207,7 +208,7 @@ func flowToBeatEvent(flow record.Record) (event beat.Event) {
if ip, found := getKeyIP(flow.Fields, "destinationIPv4Address"); found {
ecsDest["ip"] = ip
relatedIP = append(relatedIP, ip)
ecsDest["locality"] = getIPLocality(ip).String()
ecsDest["locality"] = getIPLocality(internalNetworks, ip).String()
}
if destPort, found := getKeyUint64(flow.Fields, "destinationTransportPort"); found {
ecsDest["port"] = destPort
Expand Down Expand Up @@ -243,7 +244,7 @@ func flowToBeatEvent(flow record.Record) (event beat.Event) {
dstIP = net.IPv4(0, 0, 0, 0).To4()
}
ecsFlow["id"] = flowID(srcIP, dstIP, srcPort, dstPort, uint8(protocol))
ecsFlow["locality"] = getIPLocality(srcIP, dstIP).String()
ecsFlow["locality"] = getIPLocality(internalNetworks, srcIP, dstIP).String()

// ECS Fields -- network
ecsNetwork := common.MapStr{}
Expand Down Expand Up @@ -394,29 +395,23 @@ func (l Locality) String() string {
return "unknown (" + strconv.Itoa(int(l)) + ")"
}

func isPrivateNetwork(ip net.IP) bool {
for _, net := range privateIPv4 {
if net.Contains(ip) {
return true
}
}

return privateIPv6.Contains(ip)
}

func isLocalOrPrivate(ip net.IP) bool {
return isPrivateNetwork(ip) ||
ip.IsLoopback() ||
func isLocal(ip net.IP) bool {
return ip.IsLoopback() ||
ip.IsUnspecified() ||
ip.Equal(net.IPv4bcast) ||
ip.IsLinkLocalUnicast() ||
ip.IsLinkLocalMulticast() ||
ip.IsInterfaceLocalMulticast()
}

func getIPLocality(ip ...net.IP) Locality {
for _, addr := range ip {
if !isLocalOrPrivate(addr) {
func getIPLocality(internalNetworks []string, ips ...net.IP) Locality {
for _, ip := range ips {
contains, err := conditions.NetworkContains(ip, internalNetworks...)
if err != nil {
return LocalityPublic
}
// always consider loopback/link-local private
if !contains && !isLocal(ip) {
return LocalityPublic
}
}
Expand Down
32 changes: 17 additions & 15 deletions x-pack/filebeat/input/netflow/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,16 @@ type packet struct {
}

type netflowInput struct {
mutex sync.Mutex
udp *udp.Server
decoder *decoder.Decoder
outlet channel.Outleter
forwarder *harvester.Forwarder
logger *logp.Logger
queueC chan packet
queueSize int
started bool
mutex sync.Mutex
udp *udp.Server
decoder *decoder.Decoder
outlet channel.Outleter
forwarder *harvester.Forwarder
internalNetworks []string
logger *logp.Logger
queueC chan packet
queueSize int
started bool
}

func init() {
Expand Down Expand Up @@ -122,11 +123,12 @@ func NewInput(
}

input := &netflowInput{
outlet: out,
forwarder: harvester.NewForwarder(out),
decoder: decoder,
logger: logger,
queueSize: config.PacketQueueSize,
outlet: out,
internalNetworks: config.InternalNetworks,
forwarder: harvester.NewForwarder(out),
decoder: decoder,
logger: logger,
queueSize: config.PacketQueueSize,
}

input.udp = udp.New(&config.Config, input.packetDispatch)
Expand Down Expand Up @@ -243,7 +245,7 @@ func (p *netflowInput) recvRoutine() {
evs := make([]beat.Event, n)
numFlows.Add(uint64(n))
for i, flow := range flows {
evs[i] = toBeatEvent(flow)
evs[i] = toBeatEvent(flow, p.internalNetworks)
}
p.Publish(evs)
}
Expand Down
6 changes: 3 additions & 3 deletions x-pack/filebeat/input/netflow/netflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func getFlowsFromDat(t testing.TB, name string, testCase TestCase) TestResult {
}
ev := make([]beat.Event, len(flows))
for i := range flows {
ev[i] = toBeatEvent(flows[i])
ev[i] = toBeatEvent(flows[i], []string{"private"})
}
//return TestResult{Name: name, Error: err.Error(), Events: flowsToEvents(flows)}
events = append(events, ev...)
Expand Down Expand Up @@ -242,7 +242,7 @@ func getFlowsFromPCAP(t testing.TB, name, pcapFile string) TestResult {
}
ev := make([]beat.Event, len(flows))
for i := range flows {
ev[i] = toBeatEvent(flows[i])
ev[i] = toBeatEvent(flows[i], []string{"private"})
}
events = append(events, ev...)
}
Expand Down Expand Up @@ -341,7 +341,7 @@ func TestReverseFlows(t *testing.T) {

var evs []beat.Event
for _, f := range flows {
evs = append(evs, toBeatEvent(f))
evs = append(evs, toBeatEvent(f, []string{"private"}))
}
if !assert.Len(t, evs, 2) {
t.Fatal()
Expand Down
5 changes: 5 additions & 0 deletions x-pack/filebeat/module/netflow/_meta/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,8 @@
var:
netflow_host: localhost
netflow_port: 2055
# internal_networks specifies which networks are considered internal or private
# you can specify either a CIDR block or any of the special named ranges listed
# at: https://www.elastic.co/guide/en/beats/filebeat/current/defining-processors.html#condition-network
internal_networks:
- private
7 changes: 7 additions & 0 deletions x-pack/filebeat/module/netflow/log/config/netflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@ max_message_size: '{{.max_message_size}}'
expiration_timeout: '{{.expiration_timeout}}'
queue_size: {{.queue_size}}

{{if .internal_networks}}
internal_hosts:
{{range .internal_networks}}
- '{{ . }}'
{{end}}
{{end}}

{{if .timeout}}
timeout: '{{.timeout}}'
{{end}}
Expand Down
20 changes: 20 additions & 0 deletions x-pack/filebeat/module/netflow/log/ingest/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,26 @@ processors:
field: destination.as.organization_name
target_field: destination.as.organization.name
ignore_missing: true
- set:
field: network.direction
value: inbound
if: 'ctx?.source?.locality == "external" && ctx?.destination?.locality == "internal"'
- set:
field: network.direction
value: outbound
if: 'ctx?.source?.locality == "internal" && ctx?.destination?.locality == "external"'
- set:
field: network.direction
value: internal
if: 'ctx?.source?.locality == "internal" && ctx?.destination?.locality == "internal"'
- set:
field: network.direction
value: external
if: 'ctx?.source?.locality == "external" && ctx?.destination?.locality == "external"'
- set:
field: network.direction
value: unknown
if: 'ctx?.network?.direction == null'

on_failure:
- set:
Expand Down
5 changes: 5 additions & 0 deletions x-pack/filebeat/modules.d/netflow.yml.disabled
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,8 @@
var:
netflow_host: localhost
netflow_port: 2055
# internal_networks specifies which networks are considered internal or private
# you can specify either a CIDR block or any of the special named ranges listed
# at: https://www.elastic.co/guide/en/beats/filebeat/current/defining-processors.html#condition-network
internal_networks:
- private