Skip to content

Commit

Permalink
Move unix socket to v2 input API (elastic#19716)
Browse files Browse the repository at this point in the history
  • Loading branch information
Steffen Siering authored Jul 9, 2020
1 parent 2c88c84 commit 2320b3f
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 97 deletions.
1 change: 0 additions & 1 deletion filebeat/include/list.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion filebeat/input/default-inputs/inputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package inputs

import (
"github.com/elastic/beats/v7/filebeat/beater"
"github.com/elastic/beats/v7/filebeat/input/unix"
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/logp"
Expand All @@ -32,5 +33,7 @@ func Init(info beat.Info, log *logp.Logger, components beater.StateStore) []v2.P
}

func genericInputs() []v2.Plugin {
return []v2.Plugin{}
return []v2.Plugin{
unix.Plugin(),
}
}
2 changes: 1 addition & 1 deletion filebeat/input/default-inputs/inputs_other.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@ import (
type osComponents interface{}

func osInputs(info beat.Info, log *logp.Logger, components osComponents) []v2.Plugin {
return nil
return []v2.Plugin{}
}
22 changes: 9 additions & 13 deletions filebeat/input/unix/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,20 @@ import (

"github.com/dustin/go-humanize"

"github.com/elastic/beats/v7/filebeat/harvester"
"github.com/elastic/beats/v7/filebeat/inputsource/unix"
)

type config struct {
unix.Config `config:",inline"`
harvester.ForwarderConfig `config:",inline"`

unix.Config `config:",inline"`
LineDelimiter string `config:"line_delimiter" validate:"nonzero"`
}

var defaultConfig = config{
ForwarderConfig: harvester.ForwarderConfig{
Type: "unix",
},
Config: unix.Config{
Timeout: time.Minute * 5,
MaxMessageSize: 20 * humanize.MiByte,
},
LineDelimiter: "\n",
func defaultConfig() config {
return config{
Config: unix.Config{
Timeout: time.Minute * 5,
MaxMessageSize: 20 * humanize.MiByte,
},
LineDelimiter: "\n",
}
}
141 changes: 60 additions & 81 deletions filebeat/input/unix/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,118 +18,97 @@
package unix

import (
"bufio"
"fmt"
"sync"
"net"
"time"

"github.com/elastic/beats/v7/filebeat/channel"
"github.com/elastic/beats/v7/filebeat/harvester"
"github.com/elastic/beats/v7/filebeat/input"
input "github.com/elastic/beats/v7/filebeat/input/v2"
stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless"
"github.com/elastic/beats/v7/filebeat/inputsource"
netcommon "github.com/elastic/beats/v7/filebeat/inputsource/common"
"github.com/elastic/beats/v7/filebeat/inputsource/unix"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/cfgwarn"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/feature"
"github.com/elastic/go-concert/ctxtool"
)

func init() {
err := input.Register("unix", NewInput)
if err != nil {
panic(err)
}
}

// Input for Unix socket connection
type Input struct {
mutex sync.Mutex
server *unix.Server
started bool
outlet channel.Outleter
config *config
log *logp.Logger
type server struct {
config
splitFunc bufio.SplitFunc
}

// NewInput creates a new Unix socket input
func NewInput(
cfg *common.Config,
connector channel.Connector,
context input.Context,
) (input.Input, error) {
cfgwarn.Beta("Unix socket support is beta.")

out, err := connector.Connect(cfg)
if err != nil {
return nil, err
func Plugin() input.Plugin {
return input.Plugin{
Name: "unix",
Stability: feature.Beta,
Deprecated: false,
Info: "unix socket server",
Manager: stateless.NewInputManager(configure),
}
}

forwarder := harvester.NewForwarder(out)

config := defaultConfig
err = cfg.Unpack(&config)
if err != nil {
func configure(cfg *common.Config) (stateless.Input, error) {
config := defaultConfig()
if err := cfg.Unpack(&config); err != nil {
return nil, err
}

cb := func(data []byte, metadata inputsource.NetworkMetadata) {
forwarder.Send(beat.Event{
Timestamp: time.Now(),
Fields: common.MapStr{
"message": string(data),
},
})
}
return newServer(config)
}

func newServer(config config) (*server, error) {
splitFunc := netcommon.SplitFunc([]byte(config.LineDelimiter))
if splitFunc == nil {
return nil, fmt.Errorf("unable to create splitFunc for delimiter %s", config.LineDelimiter)
}

logger := logp.NewLogger("input.unix").With("path", config.Config.Path)
factory := netcommon.SplitHandlerFactory(netcommon.FamilyUnix, logger, unix.MetadataCallback, cb, splitFunc)
return &server{config: config, splitFunc: splitFunc}, nil
}

func (s *server) Name() string { return "unix" }

server, err := unix.New(&config.Config, factory)
func (s *server) Test(_ input.TestContext) error {
l, err := net.Listen("unix", s.config.Path)
if err != nil {
return nil, err
return err
}

return &Input{
server: server,
started: false,
outlet: out,
config: &config,
log: logger,
}, nil
return l.Close()
}

// Run start a Unix socket input
func (p *Input) Run() {
p.mutex.Lock()
defer p.mutex.Unlock()

if !p.started {
p.log.Info("Starting Unix socket input")
err := p.server.Start()
if err != nil {
p.log.Errorw("Error starting the Unix socket server", "error", err)
}
p.started = true
func (s *server) Run(ctx input.Context, publisher stateless.Publisher) error {
log := ctx.Logger.Named("input.unix").With("path", s.config.Config.Path)

log.Info("Starting Unix socket input")
defer log.Info("Unix socket input stopped")

cb := func(data []byte, metadata inputsource.NetworkMetadata) {
event := createEvent(data, metadata)
publisher.Publish(event)
}
factory := netcommon.SplitHandlerFactory(netcommon.FamilyUnix, log, unix.MetadataCallback, cb, s.splitFunc)
server, err := unix.New(&s.config.Config, factory)
if err != nil {
return err
}
}

// Stop stops Unix socket server
func (p *Input) Stop() {
defer p.outlet.Close()
p.mutex.Lock()
defer p.mutex.Unlock()
log.Debugf("TCP Input '%v' initialized", ctx.ID)

p.log.Info("Stopping Unix socket input")
p.server.Stop()
p.started = false
err = server.Run(ctxtool.FromCanceller(ctx.Cancelation))

// ignore error from 'Run' in case shutdown was signaled.
if ctxerr := ctx.Cancelation.Err(); ctxerr != nil {
err = ctxerr
}
return err
}

// Wait stop the current server
func (p *Input) Wait() {
p.Stop()
func createEvent(raw []byte, metadata inputsource.NetworkMetadata) beat.Event {
return beat.Event{
Timestamp: time.Now(),
Fields: common.MapStr{
"message": string(raw),
},
}
}

0 comments on commit 2320b3f

Please sign in to comment.