Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick #19716 to 7.x: Move unix socket to v2 input API #19778

Merged
merged 1 commit into from
Jul 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion filebeat/include/list.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion filebeat/input/default-inputs/inputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package inputs

import (
"github.com/elastic/beats/v7/filebeat/beater"
"github.com/elastic/beats/v7/filebeat/input/unix"
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/logp"
Expand All @@ -32,5 +33,7 @@ func Init(info beat.Info, log *logp.Logger, components beater.StateStore) []v2.P
}

func genericInputs() []v2.Plugin {
return []v2.Plugin{}
return []v2.Plugin{
unix.Plugin(),
}
}
2 changes: 1 addition & 1 deletion filebeat/input/default-inputs/inputs_other.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@ import (
type osComponents interface{}

func osInputs(info beat.Info, log *logp.Logger, components osComponents) []v2.Plugin {
return nil
return []v2.Plugin{}
}
22 changes: 9 additions & 13 deletions filebeat/input/unix/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,20 @@ import (

"github.com/dustin/go-humanize"

"github.com/elastic/beats/v7/filebeat/harvester"
"github.com/elastic/beats/v7/filebeat/inputsource/unix"
)

type config struct {
unix.Config `config:",inline"`
harvester.ForwarderConfig `config:",inline"`

unix.Config `config:",inline"`
LineDelimiter string `config:"line_delimiter" validate:"nonzero"`
}

var defaultConfig = config{
ForwarderConfig: harvester.ForwarderConfig{
Type: "unix",
},
Config: unix.Config{
Timeout: time.Minute * 5,
MaxMessageSize: 20 * humanize.MiByte,
},
LineDelimiter: "\n",
func defaultConfig() config {
return config{
Config: unix.Config{
Timeout: time.Minute * 5,
MaxMessageSize: 20 * humanize.MiByte,
},
LineDelimiter: "\n",
}
}
141 changes: 60 additions & 81 deletions filebeat/input/unix/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,118 +18,97 @@
package unix

import (
"bufio"
"fmt"
"sync"
"net"
"time"

"github.com/elastic/beats/v7/filebeat/channel"
"github.com/elastic/beats/v7/filebeat/harvester"
"github.com/elastic/beats/v7/filebeat/input"
input "github.com/elastic/beats/v7/filebeat/input/v2"
stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless"
"github.com/elastic/beats/v7/filebeat/inputsource"
netcommon "github.com/elastic/beats/v7/filebeat/inputsource/common"
"github.com/elastic/beats/v7/filebeat/inputsource/unix"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/cfgwarn"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/feature"
"github.com/elastic/go-concert/ctxtool"
)

func init() {
err := input.Register("unix", NewInput)
if err != nil {
panic(err)
}
}

// Input for Unix socket connection
type Input struct {
mutex sync.Mutex
server *unix.Server
started bool
outlet channel.Outleter
config *config
log *logp.Logger
type server struct {
config
splitFunc bufio.SplitFunc
}

// NewInput creates a new Unix socket input
func NewInput(
cfg *common.Config,
connector channel.Connector,
context input.Context,
) (input.Input, error) {
cfgwarn.Beta("Unix socket support is beta.")

out, err := connector.Connect(cfg)
if err != nil {
return nil, err
func Plugin() input.Plugin {
return input.Plugin{
Name: "unix",
Stability: feature.Beta,
Deprecated: false,
Info: "unix socket server",
Manager: stateless.NewInputManager(configure),
}
}

forwarder := harvester.NewForwarder(out)

config := defaultConfig
err = cfg.Unpack(&config)
if err != nil {
func configure(cfg *common.Config) (stateless.Input, error) {
config := defaultConfig()
if err := cfg.Unpack(&config); err != nil {
return nil, err
}

cb := func(data []byte, metadata inputsource.NetworkMetadata) {
forwarder.Send(beat.Event{
Timestamp: time.Now(),
Fields: common.MapStr{
"message": string(data),
},
})
}
return newServer(config)
}

func newServer(config config) (*server, error) {
splitFunc := netcommon.SplitFunc([]byte(config.LineDelimiter))
if splitFunc == nil {
return nil, fmt.Errorf("unable to create splitFunc for delimiter %s", config.LineDelimiter)
}

logger := logp.NewLogger("input.unix").With("path", config.Config.Path)
factory := netcommon.SplitHandlerFactory(netcommon.FamilyUnix, logger, unix.MetadataCallback, cb, splitFunc)
return &server{config: config, splitFunc: splitFunc}, nil
}

func (s *server) Name() string { return "unix" }

server, err := unix.New(&config.Config, factory)
func (s *server) Test(_ input.TestContext) error {
l, err := net.Listen("unix", s.config.Path)
if err != nil {
return nil, err
return err
}

return &Input{
server: server,
started: false,
outlet: out,
config: &config,
log: logger,
}, nil
return l.Close()
}

// Run start a Unix socket input
func (p *Input) Run() {
p.mutex.Lock()
defer p.mutex.Unlock()

if !p.started {
p.log.Info("Starting Unix socket input")
err := p.server.Start()
if err != nil {
p.log.Errorw("Error starting the Unix socket server", "error", err)
}
p.started = true
func (s *server) Run(ctx input.Context, publisher stateless.Publisher) error {
log := ctx.Logger.Named("input.unix").With("path", s.config.Config.Path)

log.Info("Starting Unix socket input")
defer log.Info("Unix socket input stopped")

cb := func(data []byte, metadata inputsource.NetworkMetadata) {
event := createEvent(data, metadata)
publisher.Publish(event)
}
factory := netcommon.SplitHandlerFactory(netcommon.FamilyUnix, log, unix.MetadataCallback, cb, s.splitFunc)
server, err := unix.New(&s.config.Config, factory)
if err != nil {
return err
}
}

// Stop stops Unix socket server
func (p *Input) Stop() {
defer p.outlet.Close()
p.mutex.Lock()
defer p.mutex.Unlock()
log.Debugf("TCP Input '%v' initialized", ctx.ID)

p.log.Info("Stopping Unix socket input")
p.server.Stop()
p.started = false
err = server.Run(ctxtool.FromCanceller(ctx.Cancelation))

// ignore error from 'Run' in case shutdown was signaled.
if ctxerr := ctx.Cancelation.Err(); ctxerr != nil {
err = ctxerr
}
return err
}

// Wait stop the current server
func (p *Input) Wait() {
p.Stop()
func createEvent(raw []byte, metadata inputsource.NetworkMetadata) beat.Event {
return beat.Event{
Timestamp: time.Now(),
Fields: common.MapStr{
"message": string(raw),
},
}
}