diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index cd437f1f1135..d6875cc5c2ba 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -22,6 +22,7 @@ https://github.com/elastic/beats/compare/1.0.0...master[Check the HEAD diff] *Affecting all Beats* *Packetbeat* +- Fix setting direction to out and use its value to decide when dropping events if ignore_outgoing is enabled {pull}557[557] *Topbeat* diff --git a/libbeat/publisher/preprocess.go b/libbeat/publisher/preprocess.go index e2c59511116c..135b364b476e 100644 --- a/libbeat/publisher/preprocess.go +++ b/libbeat/publisher/preprocess.go @@ -140,6 +140,13 @@ func updateEventAddresses(publisher *PublisherType, event common.MapStr) bool { event["client_proc"] = src.Proc event["client_server"] = srcServer delete(event, "src") + + // check if it's outgoing transaction (as client) + if publisher.IsPublisherIP(src.Ip) { + //outgoing transaction + event["direction"] = "out" + } + } dst, ok := event["dst"].(*common.Endpoint) if ok { @@ -150,18 +157,15 @@ func updateEventAddresses(publisher *PublisherType, event common.MapStr) bool { event["server"] = dstServer delete(event, "dst") - //get the direction of the transaction: outgoing (as client)/incoming (as server) + //check if it's incoming transaction (as server) if publisher.IsPublisherIP(dst.Ip) { // incoming transaction event["direction"] = "in" - } else { - //outgoing transaction - event["direction"] = "out" } + } - if publisher.IgnoreOutgoing && dstServer != "" && - dstServer != publisher.name { + if publisher.IgnoreOutgoing && event["direction"] == "out" { // duplicated transaction -> ignore it debug("Ignore duplicated transaction on %s: %s -> %s", publisher.name, srcServer, dstServer) diff --git a/libbeat/publisher/preprocess_test.go b/libbeat/publisher/preprocess_test.go index 9a57dc5e4893..0f289a4c81c9 100644 --- a/libbeat/publisher/preprocess_test.go +++ b/libbeat/publisher/preprocess_test.go @@ -48,3 +48,85 @@ func TestFilterEvent(t *testing.T) { assert.Regexp(t, test.err, filterEvent(test.f())) } } + +func TestDirectionOut(t *testing.T) { + publisher := PublisherType{} + + publisher.ipaddrs = []string{"192.145.2.4"} + + event := common.MapStr{ + "src": &common.Endpoint{ + Ip: "192.145.2.4", + Port: 3267, + Name: "server1", + Cmdline: "proc1 start", + Proc: "proc1", + }, + "dst": &common.Endpoint{ + Ip: "192.145.2.5", + Port: 32232, + Name: "server2", + Cmdline: "proc2 start", + Proc: "proc2", + }, + } + + assert.True(t, updateEventAddresses(&publisher, event)) + assert.True(t, event["client_ip"] == "192.145.2.4") + assert.True(t, event["direction"] == "out") +} + +func TestDirectionIn(t *testing.T) { + publisher := PublisherType{} + + publisher.ipaddrs = []string{"192.145.2.5"} + + event := common.MapStr{ + "src": &common.Endpoint{ + Ip: "192.145.2.4", + Port: 3267, + Name: "server1", + Cmdline: "proc1 start", + Proc: "proc1", + }, + "dst": &common.Endpoint{ + Ip: "192.145.2.5", + Port: 32232, + Name: "server2", + Cmdline: "proc2 start", + Proc: "proc2", + }, + } + + assert.True(t, updateEventAddresses(&publisher, event)) + assert.True(t, event["client_ip"] == "192.145.2.4") + assert.True(t, event["direction"] == "in") +} + +func TestNoDirection(t *testing.T) { + publisher := PublisherType{} + + publisher.ipaddrs = []string{"192.145.2.6"} + + event := common.MapStr{ + "src": &common.Endpoint{ + Ip: "192.145.2.4", + Port: 3267, + Name: "server1", + Cmdline: "proc1 start", + Proc: "proc1", + }, + "dst": &common.Endpoint{ + Ip: "192.145.2.5", + Port: 32232, + Name: "server2", + Cmdline: "proc2 start", + Proc: "proc2", + }, + } + + assert.True(t, updateEventAddresses(&publisher, event)) + assert.True(t, event["client_ip"] == "192.145.2.4") + _, ok := event["direction"] + assert.False(t, ok) +}