Skip to content

Commit

Permalink
Add UDP prospector type with plain harvester
Browse files Browse the repository at this point in the history
event looks as following

```
{
  "@timestamp": "2017-05-26T11:57:17.68711727+02:00",
  "beat": {
    "hostname": "ruflin",
    "name": "ruflin",
    "version": "6.0.0-alpha2"
  },
  "message": "Hello, World!99",
  "prospector": {
    "harvester": "plain",
    "type": "udp"
  }
}
```
  • Loading branch information
ruflin authored and exekias committed Jun 23, 2017
1 parent 1213483 commit 51d8637
Show file tree
Hide file tree
Showing 9 changed files with 194 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ https://github.com/elastic/beats/compare/v6.0.0-alpha1...master[Check the HEAD d
- Nginx module: use the first not-private IP address as the remote_ip. {pull}4417[4417]
- Load Ingest Node pipelines when the Elasticsearch connection is established, instead of only once at startup. {pull}4479[4479]
- Add support for loading Xpack Machine Learning configurations from the modules, and added sample configurations for the Nginx module. {pull}4506[4506]
- Add udp prospector type. {pull}4452[4452]

*Heartbeat*

Expand Down
4 changes: 4 additions & 0 deletions filebeat/_meta/common.full.p2.yml
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,10 @@ filebeat.prospectors:
# Redis AUTH password. Empty by default.
#password: foobared

#- type: udp

# Maximum size of the message received over UDP
#max_message_size: 10240
#========================= Filebeat global options ============================

# Event count spool threshold - forces network flush if exceeded
Expand Down
4 changes: 4 additions & 0 deletions filebeat/filebeat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,10 @@ filebeat.prospectors:
# Redis AUTH password. Empty by default.
#password: foobared

#- type: udp

# Maximum size of the message received over UDP
#max_message_size: 10240
#========================= Filebeat global options ============================

# Event count spool threshold - forces network flush if exceeded
Expand Down
2 changes: 2 additions & 0 deletions filebeat/harvester/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ const (
LogType = "log"
StdinType = "stdin"
RedisType = "redis"
UdpType = "udp"
)

// ValidType of valid input types
var ValidType = map[string]struct{}{
StdinType: {},
LogType: {},
RedisType: {},
UdpType: {},
}

// MatchAny checks if the text matches any of the regular expressions
Expand Down
3 changes: 3 additions & 0 deletions filebeat/prospector/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/elastic/beats/filebeat/prospector/log"
"github.com/elastic/beats/filebeat/prospector/redis"
"github.com/elastic/beats/filebeat/prospector/stdin"
"github.com/elastic/beats/filebeat/prospector/udp"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
)
Expand Down Expand Up @@ -77,6 +78,8 @@ func (p *Prospector) initProspectorer(outlet channel.Outleter, states []file.Sta
prospectorer, err = redis.NewProspector(config, outlet)
case harvester.LogType:
prospectorer, err = log.NewProspector(config, states, outlet, p.done)
case harvester.UdpType:
prospectorer, err = udp.NewProspector(config, outlet)
default:
return fmt.Errorf("invalid prospector type: %v. Change type", p.config.Type)
}
Expand Down
20 changes: 20 additions & 0 deletions filebeat/prospector/udp/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package udp

import (
"github.com/elastic/beats/filebeat/harvester"
)

var defaultConfig = config{
ForwarderConfig: harvester.ForwarderConfig{
Type: "udp",
},
MaxMessageSize: 10240,
// TODO: What should be default port?
Host: "localhost:8080",
}

type config struct {
harvester.ForwarderConfig `config:",inline"`
Host string `config:"host"`
MaxMessageSize int `config:"max_message_size"`
}
72 changes: 72 additions & 0 deletions filebeat/prospector/udp/harvester.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package udp

import (
"net"
"time"

"github.com/elastic/beats/filebeat/harvester"
"github.com/elastic/beats/filebeat/util"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
)

type Harvester struct {
forwarder *harvester.Forwarder
done chan struct{}
cfg *common.Config
listener net.PacketConn
}

func NewHarvester(forwarder *harvester.Forwarder, cfg *common.Config) *Harvester {
return &Harvester{
done: make(chan struct{}),
cfg: cfg,
forwarder: forwarder,
}
}

func (h *Harvester) Run() error {

config := defaultConfig
err := h.cfg.Unpack(&config)
if err != nil {
return err
}

h.listener, err = net.ListenPacket("udp", config.Host)
if err != nil {
return err
}
defer h.listener.Close()

logp.Info("Started listening for udp on: %s", config.Host)

buffer := make([]byte, config.MaxMessageSize)

for {
select {
case <-h.done:
return nil
default:
}

length, _, err := h.listener.ReadFrom(buffer)
if err != nil {
logp.Err("Error reading from buffer: %v", err.Error())
continue
}
data := util.NewData()
event := common.MapStr{
"message": string(buffer[:length]),
"@timestamp": time.Now(),
}
data.Event = event
h.forwarder.Send(data)
}
}

func (h *Harvester) Stop() {
logp.Info("Stopping udp harvester")
close(h.done)
h.listener.Close()
}
47 changes: 47 additions & 0 deletions filebeat/prospector/udp/prospector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package udp

import (
"github.com/elastic/beats/filebeat/channel"
"github.com/elastic/beats/filebeat/harvester"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
)

type Prospector struct {
harvester *Harvester
started bool
}

func NewProspector(cfg *common.Config, outlet channel.Outleter) (*Prospector, error) {
forwarder, err := harvester.NewForwarder(cfg, outlet)
if err != nil {
return nil, err
}
return &Prospector{
harvester: NewHarvester(forwarder, cfg),
started: false,
}, nil
}

func (p *Prospector) Run() {
logp.Info("Starting udp prospector")

if !p.started {
p.started = true
go func() {
err := p.harvester.Run()
if err != nil {
logp.Err("Error running harvester:: %v", err)
}
}()
}
}

func (p *Prospector) Stop() {
logp.Info("Stopping udp prospector")
p.harvester.Stop()
}

func (p *Prospector) Wait() {
p.Stop()
}
41 changes: 41 additions & 0 deletions filebeat/tests/system/test_udp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from filebeat import BaseTest
import socket
import time


class Test(BaseTest):

def test_udp(self):

host = "127.0.0.1"
port = 8080
prospector_raw = """
- type: udp
host: "{}:{}"
enabled: true
"""

prospector_raw = prospector_raw.format(host, port)

self.render_config_template(
prospector_raw=prospector_raw,
prospectors=False,
)

filebeat = self.start_beat()

self.wait_until(lambda: self.log_contains("Started listening for udp"))

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP

for n in range(0, 2):
sock.sendto("Hello World: " + str(n), (host, port))

self.wait_until(lambda: self.output_count(lambda x: x >= 2))

filebeat.check_kill_and_wait()

output = self.read_output()

assert len(output) == 2
assert output[0]["prospector.type"] == "udp"

0 comments on commit 51d8637

Please sign in to comment.