From 51d86375b2c034c407faaac458b351fc97c04282 Mon Sep 17 00:00:00 2001 From: Nicolas Ruflin Date: Fri, 23 Jun 2017 17:46:22 +0200 Subject: [PATCH] Add UDP prospector type with plain harvester event looks as following ``` { "@timestamp": "2017-05-26T11:57:17.68711727+02:00", "beat": { "hostname": "ruflin", "name": "ruflin", "version": "6.0.0-alpha2" }, "message": "Hello, World!99", "prospector": { "harvester": "plain", "type": "udp" } } ``` --- CHANGELOG.asciidoc | 1 + filebeat/_meta/common.full.p2.yml | 4 ++ filebeat/filebeat.full.yml | 4 ++ filebeat/harvester/util.go | 2 + filebeat/prospector/prospector.go | 3 ++ filebeat/prospector/udp/config.go | 20 ++++++++ filebeat/prospector/udp/harvester.go | 72 +++++++++++++++++++++++++++ filebeat/prospector/udp/prospector.go | 47 +++++++++++++++++ filebeat/tests/system/test_udp.py | 41 +++++++++++++++ 9 files changed, 194 insertions(+) create mode 100644 filebeat/prospector/udp/config.go create mode 100644 filebeat/prospector/udp/harvester.go create mode 100644 filebeat/prospector/udp/prospector.go create mode 100644 filebeat/tests/system/test_udp.py diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index ea94e03c7fa..19e81013564 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -61,6 +61,7 @@ https://github.com/elastic/beats/compare/v6.0.0-alpha1...master[Check the HEAD d - Nginx module: use the first not-private IP address as the remote_ip. {pull}4417[4417] - Load Ingest Node pipelines when the Elasticsearch connection is established, instead of only once at startup. {pull}4479[4479] - Add support for loading Xpack Machine Learning configurations from the modules, and added sample configurations for the Nginx module. {pull}4506[4506] +- Add udp prospector type. {pull}4452[4452] *Heartbeat* diff --git a/filebeat/_meta/common.full.p2.yml b/filebeat/_meta/common.full.p2.yml index 8dd083b5ec8..2339507e3fb 100644 --- a/filebeat/_meta/common.full.p2.yml +++ b/filebeat/_meta/common.full.p2.yml @@ -231,6 +231,10 @@ filebeat.prospectors: # Redis AUTH password. Empty by default. #password: foobared +#- type: udp + + # Maximum size of the message received over UDP + #max_message_size: 10240 #========================= Filebeat global options ============================ # Event count spool threshold - forces network flush if exceeded diff --git a/filebeat/filebeat.full.yml b/filebeat/filebeat.full.yml index 6f64d7aec55..d4bcf8a60a4 100644 --- a/filebeat/filebeat.full.yml +++ b/filebeat/filebeat.full.yml @@ -400,6 +400,10 @@ filebeat.prospectors: # Redis AUTH password. Empty by default. #password: foobared +#- type: udp + + # Maximum size of the message received over UDP + #max_message_size: 10240 #========================= Filebeat global options ============================ # Event count spool threshold - forces network flush if exceeded diff --git a/filebeat/harvester/util.go b/filebeat/harvester/util.go index 2817b13c85f..8b091d6dd7c 100644 --- a/filebeat/harvester/util.go +++ b/filebeat/harvester/util.go @@ -7,6 +7,7 @@ const ( LogType = "log" StdinType = "stdin" RedisType = "redis" + UdpType = "udp" ) // ValidType of valid input types @@ -14,6 +15,7 @@ var ValidType = map[string]struct{}{ StdinType: {}, LogType: {}, RedisType: {}, + UdpType: {}, } // MatchAny checks if the text matches any of the regular expressions diff --git a/filebeat/prospector/prospector.go b/filebeat/prospector/prospector.go index 223fb7907e5..7769aa4b04d 100644 --- a/filebeat/prospector/prospector.go +++ b/filebeat/prospector/prospector.go @@ -13,6 +13,7 @@ import ( "github.com/elastic/beats/filebeat/prospector/log" "github.com/elastic/beats/filebeat/prospector/redis" "github.com/elastic/beats/filebeat/prospector/stdin" + "github.com/elastic/beats/filebeat/prospector/udp" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" ) @@ -77,6 +78,8 @@ func (p *Prospector) initProspectorer(outlet channel.Outleter, states []file.Sta prospectorer, err = redis.NewProspector(config, outlet) case harvester.LogType: prospectorer, err = log.NewProspector(config, states, outlet, p.done) + case harvester.UdpType: + prospectorer, err = udp.NewProspector(config, outlet) default: return fmt.Errorf("invalid prospector type: %v. Change type", p.config.Type) } diff --git a/filebeat/prospector/udp/config.go b/filebeat/prospector/udp/config.go new file mode 100644 index 00000000000..f3f43feea27 --- /dev/null +++ b/filebeat/prospector/udp/config.go @@ -0,0 +1,20 @@ +package udp + +import ( + "github.com/elastic/beats/filebeat/harvester" +) + +var defaultConfig = config{ + ForwarderConfig: harvester.ForwarderConfig{ + Type: "udp", + }, + MaxMessageSize: 10240, + // TODO: What should be default port? + Host: "localhost:8080", +} + +type config struct { + harvester.ForwarderConfig `config:",inline"` + Host string `config:"host"` + MaxMessageSize int `config:"max_message_size"` +} diff --git a/filebeat/prospector/udp/harvester.go b/filebeat/prospector/udp/harvester.go new file mode 100644 index 00000000000..58b2e054968 --- /dev/null +++ b/filebeat/prospector/udp/harvester.go @@ -0,0 +1,72 @@ +package udp + +import ( + "net" + "time" + + "github.com/elastic/beats/filebeat/harvester" + "github.com/elastic/beats/filebeat/util" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" +) + +type Harvester struct { + forwarder *harvester.Forwarder + done chan struct{} + cfg *common.Config + listener net.PacketConn +} + +func NewHarvester(forwarder *harvester.Forwarder, cfg *common.Config) *Harvester { + return &Harvester{ + done: make(chan struct{}), + cfg: cfg, + forwarder: forwarder, + } +} + +func (h *Harvester) Run() error { + + config := defaultConfig + err := h.cfg.Unpack(&config) + if err != nil { + return err + } + + h.listener, err = net.ListenPacket("udp", config.Host) + if err != nil { + return err + } + defer h.listener.Close() + + logp.Info("Started listening for udp on: %s", config.Host) + + buffer := make([]byte, config.MaxMessageSize) + + for { + select { + case <-h.done: + return nil + default: + } + + length, _, err := h.listener.ReadFrom(buffer) + if err != nil { + logp.Err("Error reading from buffer: %v", err.Error()) + continue + } + data := util.NewData() + event := common.MapStr{ + "message": string(buffer[:length]), + "@timestamp": time.Now(), + } + data.Event = event + h.forwarder.Send(data) + } +} + +func (h *Harvester) Stop() { + logp.Info("Stopping udp harvester") + close(h.done) + h.listener.Close() +} diff --git a/filebeat/prospector/udp/prospector.go b/filebeat/prospector/udp/prospector.go new file mode 100644 index 00000000000..f4ce6fb5d23 --- /dev/null +++ b/filebeat/prospector/udp/prospector.go @@ -0,0 +1,47 @@ +package udp + +import ( + "github.com/elastic/beats/filebeat/channel" + "github.com/elastic/beats/filebeat/harvester" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" +) + +type Prospector struct { + harvester *Harvester + started bool +} + +func NewProspector(cfg *common.Config, outlet channel.Outleter) (*Prospector, error) { + forwarder, err := harvester.NewForwarder(cfg, outlet) + if err != nil { + return nil, err + } + return &Prospector{ + harvester: NewHarvester(forwarder, cfg), + started: false, + }, nil +} + +func (p *Prospector) Run() { + logp.Info("Starting udp prospector") + + if !p.started { + p.started = true + go func() { + err := p.harvester.Run() + if err != nil { + logp.Err("Error running harvester:: %v", err) + } + }() + } +} + +func (p *Prospector) Stop() { + logp.Info("Stopping udp prospector") + p.harvester.Stop() +} + +func (p *Prospector) Wait() { + p.Stop() +} diff --git a/filebeat/tests/system/test_udp.py b/filebeat/tests/system/test_udp.py new file mode 100644 index 00000000000..2efddfabd0b --- /dev/null +++ b/filebeat/tests/system/test_udp.py @@ -0,0 +1,41 @@ +from filebeat import BaseTest +import socket +import time + + +class Test(BaseTest): + + def test_udp(self): + + host = "127.0.0.1" + port = 8080 + prospector_raw = """ +- type: udp + host: "{}:{}" + enabled: true +""" + + prospector_raw = prospector_raw.format(host, port) + + self.render_config_template( + prospector_raw=prospector_raw, + prospectors=False, + ) + + filebeat = self.start_beat() + + self.wait_until(lambda: self.log_contains("Started listening for udp")) + + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP + + for n in range(0, 2): + sock.sendto("Hello World: " + str(n), (host, port)) + + self.wait_until(lambda: self.output_count(lambda x: x >= 2)) + + filebeat.check_kill_and_wait() + + output = self.read_output() + + assert len(output) == 2 + assert output[0]["prospector.type"] == "udp"