From a1da2329330bff9675ad9213828c95c50895f321 Mon Sep 17 00:00:00 2001 From: Andrew Stucki Date: Tue, 21 Apr 2020 08:10:52 -0400 Subject: [PATCH] Cherry-pick #17492 to 7.x: [Filebeat] Unix stream socket input source (#17851) * [Filebeat] Unix stream socket input source (#17492) * initial common refactor * Fix up unix and add license info * Fix inputs * Clean up handlers * Update changelog and docs * Fix added input * Fix tests * Add unix socket system tests * add systems tests for syslog unix input * pep autoformat * Disable unix tests for Windows since Python 3.8 doesn't support AF_UNIX * pep autoformat * Address feedback * fix test (cherry picked from commit 3b994389d6ce8968141a18e539efc6e5ba462f0d) * Fix changelog --- CHANGELOG.next.asciidoc | 1 + .../inputs/input-common-unix-options.asciidoc | 37 +++ filebeat/docs/inputs/input-syslog.asciidoc | 14 +- filebeat/include/list.go | 1 + filebeat/input/syslog/config.go | 37 ++- filebeat/input/syslog/input.go | 4 +- filebeat/input/tcp/input.go | 18 +- filebeat/input/unix/config.go | 45 +++ filebeat/input/unix/input.go | 137 +++++++++ .../inputsource/{tcp => common}/closeref.go | 10 +- filebeat/inputsource/common/config.go | 31 ++ filebeat/inputsource/{tcp => common}/conn.go | 2 +- .../inputsource/{tcp => common}/conn_test.go | 12 +- filebeat/inputsource/common/handler.go | 95 ++++++ filebeat/inputsource/common/listener.go | 174 +++++++++++ filebeat/inputsource/{tcp => common}/scan.go | 6 +- .../inputsource/{tcp => common}/scan_test.go | 4 +- filebeat/inputsource/tcp/config.go | 2 - filebeat/inputsource/tcp/handler.go | 95 +----- filebeat/inputsource/tcp/server.go | 127 +------- filebeat/inputsource/tcp/server_test.go | 34 ++- filebeat/inputsource/unix/config.go | 44 +++ filebeat/inputsource/unix/handler.go | 29 ++ filebeat/inputsource/unix/server.go | 67 +++++ filebeat/inputsource/unix/server_test.go | 278 ++++++++++++++++++ filebeat/tests/system/test_syslog.py | 101 ++++++- filebeat/tests/system/test_unix.py | 77 
+++++ 27 files changed, 1235 insertions(+), 247 deletions(-) create mode 100644 filebeat/docs/inputs/input-common-unix-options.asciidoc create mode 100644 filebeat/input/unix/config.go create mode 100644 filebeat/input/unix/input.go rename filebeat/inputsource/{tcp => common}/closeref.go (94%) create mode 100644 filebeat/inputsource/common/config.go rename filebeat/inputsource/{tcp => common}/conn.go (99%) rename filebeat/inputsource/{tcp => common}/conn_test.go (87%) create mode 100644 filebeat/inputsource/common/handler.go create mode 100644 filebeat/inputsource/common/listener.go rename filebeat/inputsource/{tcp => common}/scan.go (91%) rename filebeat/inputsource/{tcp => common}/scan_test.go (97%) create mode 100644 filebeat/inputsource/unix/config.go create mode 100644 filebeat/inputsource/unix/handler.go create mode 100644 filebeat/inputsource/unix/server.go create mode 100644 filebeat/inputsource/unix/server_test.go create mode 100644 filebeat/tests/system/test_unix.py diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index a28a27583f0..5b596de4720 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -331,6 +331,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Release Google Cloud module as GA. {pull}17511[17511] - Update filebeat httpjson input to support pagination via Header and Okta module. {pull}16354[16354] - Enhance `elasticsearch/server` fileset to handle ECS-compatible logs emitted by Elasticsearch. {issue}17715[17715] {pull}17714[17714] +- Added Unix stream socket support as an input source and a syslog input source. 
{pull}17492[17492] *Heartbeat* diff --git a/filebeat/docs/inputs/input-common-unix-options.asciidoc b/filebeat/docs/inputs/input-common-unix-options.asciidoc new file mode 100644 index 00000000000..443fe761274 --- /dev/null +++ b/filebeat/docs/inputs/input-common-unix-options.asciidoc @@ -0,0 +1,37 @@ +////////////////////////////////////////////////////////////////////////// +//// This content is shared by Filebeat inputs that use the Unix inputsource +//// If you add IDs to sections, make sure you use attributes to create +//// unique IDs for each input that includes this file. Use the format: +//// [id="{beatname_lc}-input-{type}-option-name"] +////////////////////////////////////////////////////////////////////////// +[float] +[id="{beatname_lc}-input-{type}-unix-max-message-size"] +==== `max_message_size` + +The maximum size of the message received over the socket. The default is `20MiB`. + +[float] +[id="{beatname_lc}-input-{type}-unix-path"] +==== `path` + +The path to the Unix socket that will receive event streams. + +[float] +[id="{beatname_lc}-input-{type}-unix-line-delimiter"] +==== `line_delimiter` + +Specify the characters used to split the incoming events. The default is '\n'. + +[float] +[id="{beatname_lc}-input-{type}-unix-max-connections"] +==== `max_connections` + +The maximum number of connections to accept at any given point in time. + +[float] +[id="{beatname_lc}-input-{type}-unix-timeout"] +==== `timeout` + +The number of seconds of inactivity before a connection is closed. The default is `300s`. + +See <> for more information. 
diff --git a/filebeat/docs/inputs/input-syslog.asciidoc b/filebeat/docs/inputs/input-syslog.asciidoc index c1d4cc66015..0c360a03f7f 100644 --- a/filebeat/docs/inputs/input-syslog.asciidoc +++ b/filebeat/docs/inputs/input-syslog.asciidoc @@ -7,7 +7,7 @@ Syslog ++++ -Use the `syslog` input to read events over TCP or UDP, this input will parse BSD (rfc3164) +Use the `syslog` input to read events over TCP, UDP, or a Unix stream socket, this input will parse BSD (rfc3164) event and some variant. Example configurations: @@ -28,6 +28,14 @@ Example configurations: host: "localhost:9000" ---- +["source","yaml",subs="attributes"] +---- +{beatname_lc}.inputs: +- type: syslog + protocol.unix: + path: "/path/to/syslog.sock" +---- + ==== Configuration options The `syslog` input supports protocol specific configuration options plus the @@ -41,6 +49,10 @@ include::../inputs/input-common-udp-options.asciidoc[] include::../inputs/input-common-tcp-options.asciidoc[] +===== Protocol `unix`: + +include::../inputs/input-common-unix-options.asciidoc[] + [id="{beatname_lc}-input-{type}-common-options"] include::../inputs/input-common-options.asciidoc[] diff --git a/filebeat/include/list.go b/filebeat/include/list.go index 519d0e71581..7cc66b3894b 100644 --- a/filebeat/include/list.go +++ b/filebeat/include/list.go @@ -31,6 +31,7 @@ import ( _ "github.com/elastic/beats/v7/filebeat/input/syslog" _ "github.com/elastic/beats/v7/filebeat/input/tcp" _ "github.com/elastic/beats/v7/filebeat/input/udp" + _ "github.com/elastic/beats/v7/filebeat/input/unix" _ "github.com/elastic/beats/v7/filebeat/module/apache" _ "github.com/elastic/beats/v7/filebeat/module/auditd" _ "github.com/elastic/beats/v7/filebeat/module/elasticsearch" diff --git a/filebeat/input/syslog/config.go b/filebeat/input/syslog/config.go index 28590b3ced3..5b6ac1452b4 100644 --- a/filebeat/input/syslog/config.go +++ b/filebeat/input/syslog/config.go @@ -25,9 +25,12 @@ import ( "github.com/elastic/beats/v7/filebeat/harvester" 
"github.com/elastic/beats/v7/filebeat/inputsource" + netcommon "github.com/elastic/beats/v7/filebeat/inputsource/common" "github.com/elastic/beats/v7/filebeat/inputsource/tcp" "github.com/elastic/beats/v7/filebeat/inputsource/udp" + "github.com/elastic/beats/v7/filebeat/inputsource/unix" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" ) type config struct { @@ -54,6 +57,19 @@ var defaultTCP = syslogTCP{ LineDelimiter: "\n", } +type syslogUnix struct { + unix.Config `config:",inline"` + LineDelimiter string `config:"line_delimiter" validate:"nonzero"` +} + +var defaultUnix = syslogUnix{ + Config: unix.Config{ + Timeout: time.Minute * 5, + MaxMessageSize: 20 * humanize.MiByte, + }, + LineDelimiter: "\n", +} + var defaultUDP = udp.Config{ MaxMessageSize: 10 * humanize.KiByte, Timeout: time.Minute * 5, @@ -72,14 +88,31 @@ func factory( return nil, err } - splitFunc := tcp.SplitFunc([]byte(config.LineDelimiter)) + splitFunc := netcommon.SplitFunc([]byte(config.LineDelimiter)) if splitFunc == nil { return nil, fmt.Errorf("error creating splitFunc from delimiter %s", config.LineDelimiter) } - factory := tcp.SplitHandlerFactory(nf, splitFunc) + logger := logp.NewLogger("input.syslog.tcp").With("address", config.Config.Host) + factory := netcommon.SplitHandlerFactory(netcommon.FamilyTCP, logger, tcp.MetadataCallback, nf, splitFunc) return tcp.New(&config.Config, factory) + case unix.Name: + config := defaultUnix + if err := cfg.Unpack(&config); err != nil { + return nil, err + } + + splitFunc := netcommon.SplitFunc([]byte(config.LineDelimiter)) + if splitFunc == nil { + return nil, fmt.Errorf("error creating splitFunc from delimiter %s", config.LineDelimiter) + } + + logger := logp.NewLogger("input.syslog.unix").With("path", config.Config.Path) + factory := netcommon.SplitHandlerFactory(netcommon.FamilyUnix, logger, unix.MetadataCallback, nf, splitFunc) + + return unix.New(&config.Config, factory) + case udp.Name: config := 
defaultUDP if err := cfg.Unpack(&config); err != nil { diff --git a/filebeat/input/syslog/input.go b/filebeat/input/syslog/input.go index 24271c6fe4b..ecbc8db9cd8 100644 --- a/filebeat/input/syslog/input.go +++ b/filebeat/input/syslog/input.go @@ -256,7 +256,9 @@ func newBeatEvent(timestamp time.Time, metadata inputsource.NetworkMetadata, fie }, Fields: fields, } - event.Fields.Put("log.source.address", metadata.RemoteAddr.String()) + if metadata.RemoteAddr != nil { + event.Fields.Put("log.source.address", metadata.RemoteAddr.String()) + } return event } diff --git a/filebeat/input/tcp/input.go b/filebeat/input/tcp/input.go index 9c0b7476f90..5b3b8dc8394 100644 --- a/filebeat/input/tcp/input.go +++ b/filebeat/input/tcp/input.go @@ -26,6 +26,7 @@ import ( "github.com/elastic/beats/v7/filebeat/harvester" "github.com/elastic/beats/v7/filebeat/input" "github.com/elastic/beats/v7/filebeat/inputsource" + netcommon "github.com/elastic/beats/v7/filebeat/inputsource/common" "github.com/elastic/beats/v7/filebeat/inputsource/tcp" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" @@ -41,7 +42,7 @@ func init() { // Input for TCP connection type Input struct { - sync.Mutex + mutex sync.Mutex server *tcp.Server started bool outlet channel.Outleter @@ -78,12 +79,13 @@ func NewInput( forwarder.Send(event) } - splitFunc := tcp.SplitFunc([]byte(config.LineDelimiter)) + splitFunc := netcommon.SplitFunc([]byte(config.LineDelimiter)) if splitFunc == nil { return nil, fmt.Errorf("unable to create splitFunc for delimiter %s", config.LineDelimiter) } - factory := tcp.SplitHandlerFactory(cb, splitFunc) + logger := logp.NewLogger("input.tcp").With("address", config.Config.Host) + factory := netcommon.SplitHandlerFactory(netcommon.FamilyTCP, logger, tcp.MetadataCallback, cb, splitFunc) server, err := tcp.New(&config.Config, factory) if err != nil { @@ -95,14 +97,14 @@ func NewInput( started: false, outlet: out, config: &config, - log: logp.NewLogger("tcp 
input").With("address", config.Config.Host), + log: logger, }, nil } // Run start a TCP input func (p *Input) Run() { - p.Lock() - defer p.Unlock() + p.mutex.Lock() + defer p.mutex.Unlock() if !p.started { p.log.Info("Starting TCP input") @@ -117,8 +119,8 @@ func (p *Input) Run() { // Stop stops TCP server func (p *Input) Stop() { defer p.outlet.Close() - p.Lock() - defer p.Unlock() + p.mutex.Lock() + defer p.mutex.Unlock() p.log.Info("Stopping TCP input") p.server.Stop() diff --git a/filebeat/input/unix/config.go b/filebeat/input/unix/config.go new file mode 100644 index 00000000000..5f65173bc46 --- /dev/null +++ b/filebeat/input/unix/config.go @@ -0,0 +1,45 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package unix + +import ( + "time" + + "github.com/dustin/go-humanize" + + "github.com/elastic/beats/v7/filebeat/harvester" + "github.com/elastic/beats/v7/filebeat/inputsource/unix" +) + +type config struct { + unix.Config `config:",inline"` + harvester.ForwarderConfig `config:",inline"` + + LineDelimiter string `config:"line_delimiter" validate:"nonzero"` +} + +var defaultConfig = config{ + ForwarderConfig: harvester.ForwarderConfig{ + Type: "unix", + }, + Config: unix.Config{ + Timeout: time.Minute * 5, + MaxMessageSize: 20 * humanize.MiByte, + }, + LineDelimiter: "\n", +} diff --git a/filebeat/input/unix/input.go b/filebeat/input/unix/input.go new file mode 100644 index 00000000000..12c091f00da --- /dev/null +++ b/filebeat/input/unix/input.go @@ -0,0 +1,137 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package unix + +import ( + "fmt" + "sync" + "time" + + "github.com/elastic/beats/v7/filebeat/channel" + "github.com/elastic/beats/v7/filebeat/harvester" + "github.com/elastic/beats/v7/filebeat/input" + "github.com/elastic/beats/v7/filebeat/inputsource" + netcommon "github.com/elastic/beats/v7/filebeat/inputsource/common" + "github.com/elastic/beats/v7/filebeat/inputsource/unix" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" +) + +func init() { + err := input.Register("unix", NewInput) + if err != nil { + panic(err) + } +} + +// Input for Unix socket connection +type Input struct { + mutex sync.Mutex + server *unix.Server + started bool + outlet channel.Outleter + config *config + log *logp.Logger +} + +// NewInput creates a new Unix socket input +func NewInput( + cfg *common.Config, + connector channel.Connector, + context input.Context, +) (input.Input, error) { + + out, err := connector.ConnectWith(cfg, beat.ClientConfig{ + Processing: beat.ProcessingConfig{ + DynamicFields: context.DynamicFields, + }, + }) + if err != nil { + return nil, err + } + + forwarder := harvester.NewForwarder(out) + + config := defaultConfig + err = cfg.Unpack(&config) + if err != nil { + return nil, err + } + + cb := func(data []byte, metadata inputsource.NetworkMetadata) { + forwarder.Send(beat.Event{ + Timestamp: time.Now(), + Fields: common.MapStr{ + "message": string(data), + }, + }) + } + + splitFunc := netcommon.SplitFunc([]byte(config.LineDelimiter)) + if splitFunc == nil { + return nil, fmt.Errorf("unable to create splitFunc for delimiter %s", config.LineDelimiter) + } + + logger := logp.NewLogger("input.unix").With("path", config.Config.Path) + factory := netcommon.SplitHandlerFactory(netcommon.FamilyUnix, logger, unix.MetadataCallback, cb, splitFunc) + + server, err := unix.New(&config.Config, factory) + if err != nil { + return nil, err + } + + return &Input{ + server: server, + 
started: false, + outlet: out, + config: &config, + log: logger, + }, nil +} + +// Run start a Unix socket input +func (p *Input) Run() { + p.mutex.Lock() + defer p.mutex.Unlock() + + if !p.started { + p.log.Info("Starting Unix socket input") + err := p.server.Start() + if err != nil { + p.log.Errorw("Error starting the Unix socket server", "error", err) + } + p.started = true + } +} + +// Stop stops Unix socket server +func (p *Input) Stop() { + defer p.outlet.Close() + p.mutex.Lock() + defer p.mutex.Unlock() + + p.log.Info("Stopping Unix socket input") + p.server.Stop() + p.started = false +} + +// Wait stop the current server +func (p *Input) Wait() { + p.Stop() +} diff --git a/filebeat/inputsource/tcp/closeref.go b/filebeat/inputsource/common/closeref.go similarity index 94% rename from filebeat/inputsource/tcp/closeref.go rename to filebeat/inputsource/common/closeref.go index d718df42343..04ca60bbda3 100644 --- a/filebeat/inputsource/tcp/closeref.go +++ b/filebeat/inputsource/common/closeref.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package tcp +package common import ( "sync" @@ -51,7 +51,7 @@ type Closer struct { } // Close closes the closes and propagates the close to any child, on close the close callback will -// be called, this can be used for custom cleanup like closing a TCP socket. +// be called, this can be used for custom cleanup like closing a socket. func (c *Closer) Close() { c.mu.Lock() if c.err != nil { @@ -87,6 +87,12 @@ func (c *Closer) Done() <-chan struct{} { return c.done } +// SetCallback sets the underlying callback function invoked +// when the Closer is Closed. +func (c *Closer) SetCallback(callback CloserFunc) { + c.callback = callback +} + // Err returns an error if the Closer was already closed. 
func (c *Closer) Err() error { c.mu.Lock() diff --git a/filebeat/inputsource/common/config.go b/filebeat/inputsource/common/config.go new file mode 100644 index 00000000000..2ae5bf52e35 --- /dev/null +++ b/filebeat/inputsource/common/config.go @@ -0,0 +1,31 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package common + +import ( + "time" + + "github.com/elastic/beats/v7/libbeat/common/cfgtype" +) + +// ListenerConfig exposes the shared listener configuration. +type ListenerConfig struct { + Timeout time.Duration + MaxMessageSize cfgtype.ByteSize + MaxConnections int +} diff --git a/filebeat/inputsource/tcp/conn.go b/filebeat/inputsource/common/conn.go similarity index 99% rename from filebeat/inputsource/tcp/conn.go rename to filebeat/inputsource/common/conn.go index 3d08e47fe24..c6cf86d2292 100644 --- a/filebeat/inputsource/tcp/conn.go +++ b/filebeat/inputsource/common/conn.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. 
-package tcp +package common import ( "io" diff --git a/filebeat/inputsource/tcp/conn_test.go b/filebeat/inputsource/common/conn_test.go similarity index 87% rename from filebeat/inputsource/tcp/conn_test.go rename to filebeat/inputsource/common/conn_test.go index 8f687613b14..f5e41a58c63 100644 --- a/filebeat/inputsource/tcp/conn_test.go +++ b/filebeat/inputsource/common/conn_test.go @@ -15,9 +15,10 @@ // specific language governing permissions and limitations // under the License. -package tcp +package common import ( + "math/rand" "strings" "testing" @@ -58,3 +59,12 @@ func TestResetableLimitedReader(t *testing.T) { assert.NoError(t, err) }) } + +func randomString(l int) string { + charsets := []byte("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789") + message := make([]byte, l) + for i := range message { + message[i] = charsets[rand.Intn(len(charsets))] + } + return string(message) +} diff --git a/filebeat/inputsource/common/handler.go b/filebeat/inputsource/common/handler.go new file mode 100644 index 00000000000..84786086f4e --- /dev/null +++ b/filebeat/inputsource/common/handler.go @@ -0,0 +1,95 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package common + +import ( + "bufio" + "net" + + "github.com/pkg/errors" + + "github.com/elastic/beats/v7/filebeat/inputsource" + "github.com/elastic/beats/v7/libbeat/logp" +) + +// HandlerFactory returns a ConnectionHandler func +type HandlerFactory func(config ListenerConfig) ConnectionHandler + +// ConnectionHandler interface provides mechanisms for handling of incoming connections +type ConnectionHandler func(CloseRef, net.Conn) error + +// MetadataFunc defines callback executed when a line is read from the split handler. +type MetadataFunc func(net.Conn) inputsource.NetworkMetadata + +// SplitHandlerFactory allows creation of a handler that has splitting capabilities. +func SplitHandlerFactory(family Family, logger *logp.Logger, metadataCallback MetadataFunc, callback inputsource.NetworkFunc, splitFunc bufio.SplitFunc) HandlerFactory { + return func(config ListenerConfig) ConnectionHandler { + return ConnectionHandler(func(closer CloseRef, conn net.Conn) error { + metadata := metadataCallback(conn) + maxMessageSize := uint64(config.MaxMessageSize) + + var log *logp.Logger + if family == FamilyUnix { + // unix sockets have an empty `RemoteAddr` value, so no need to capture it + log = logger.With("handler", "split_client") + } else { + log = logger.With("handler", "split_client", "remote_addr", conn.RemoteAddr().String()) + } + + r := NewResetableLimitedReader(NewDeadlineReader(conn, config.Timeout), maxMessageSize) + buf := bufio.NewReader(r) + scanner := bufio.NewScanner(buf) + scanner.Split(splitFunc) + //16 is ratio of MaxScanTokenSize/startBufSize + buffer := make([]byte, maxMessageSize/16) + scanner.Buffer(buffer, int(maxMessageSize)) + for { + select { + case <-closer.Done(): + break + default: + } + + // Ensure that if the Conn is already closed then dont attempt to scan again + if closer.Err() == ErrClosed { + break + } + + if !scanner.Scan() { + break + } + + err := scanner.Err() + if err != nil { + // This is a user defined limit and we should 
notify the user. + if IsMaxReadBufferErr(err) { + log.Errorw("split_client error", "error", err) + } + return errors.Wrap(err, string(family)+" split_client error") + } + r.Reset() + callback(scanner.Bytes(), metadata) + } + + // We are out of the scanner, either we reached EOF or another fatal error occurred. + // like we failed to complete the TLS handshake or we are missing the splitHandler certificate when + // mutual auth is on, which is the default. + return scanner.Err() + }) + } +} diff --git a/filebeat/inputsource/common/listener.go b/filebeat/inputsource/common/listener.go new file mode 100644 index 00000000000..9d686f922a6 --- /dev/null +++ b/filebeat/inputsource/common/listener.go @@ -0,0 +1,174 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package common + +import ( + "bufio" + "bytes" + "net" + "strings" + "sync" + + "github.com/elastic/beats/v7/libbeat/common/atomic" + "github.com/elastic/beats/v7/libbeat/logp" +) + +// Family represents the type of connection we're handling +type Family string + +const ( + // FamilyUnix represents a unix socket listener + FamilyUnix Family = "unix" + // FamilyTCP represents a tcp socket listener + FamilyTCP Family = "tcp" +) + +func (f Family) String() string { + return strings.ToUpper(string(f)) +} + +// ListenerFactory returns a net.Listener +type ListenerFactory func() (net.Listener, error) + +// Listener represent a generic connected server +type Listener struct { + Listener net.Listener + config *ListenerConfig + family Family + wg sync.WaitGroup + done chan struct{} + log *logp.Logger + closer *Closer + clientsCount atomic.Int + handlerFactory HandlerFactory + listenerFactory ListenerFactory +} + +// NewListener creates a new Listener +func NewListener(family Family, location string, handlerFactory HandlerFactory, listenerFactory ListenerFactory, config *ListenerConfig) *Listener { + return &Listener{ + config: config, + done: make(chan struct{}), + family: family, + log: logp.NewLogger(string(family)).With("address", location), + closer: NewCloser(nil), + handlerFactory: handlerFactory, + listenerFactory: listenerFactory, + } +} + +// Start listen to the socket. +func (l *Listener) Start() error { + var err error + l.Listener, err = l.listenerFactory() + if err != nil { + return err + } + + l.closer.SetCallback(func() { l.Listener.Close() }) + l.log.Info("Started listening for " + l.family.String() + " connection") + + l.wg.Add(1) + go func() { + defer l.wg.Done() + l.run() + }() + return nil +} + +// Run start and run a new TCP listener to receive new data. When a new connection is accepted, the factory is used +// to create a ConnectionHandler. 
The ConnectionHandler takes the connection as input and handles the data that is +// being received via the io.Reader. Most clients use the splitHandler which can take a bufio.SplitFunc and parse +// out each message into an appropriate event. The Close() of the ConnectionHandler can be used to clean up the +// connection either by client or server based on need. +func (l *Listener) run() { + for { + conn, err := l.Listener.Accept() + if err != nil { + select { + case <-l.closer.Done(): + return + default: + l.log.Debugw("Can not accept the connection", "error", err) + continue + } + } + + handler := l.handlerFactory(*l.config) + closer := WithCloser(l.closer, func() { conn.Close() }) + + l.wg.Add(1) + go func() { + defer logp.Recover("recovering from a " + l.family.String() + " client crash") + defer l.wg.Done() + defer closer.Close() + + l.registerHandler() + defer l.unregisterHandler() + + if l.family == FamilyUnix { + // unix sockets have an empty `RemoteAddr` value, so no need to capture it + l.log.Debugw("New client", "total", l.clientsCount.Load()) + } else { + l.log.Debugw("New client", "remote_address", conn.RemoteAddr(), "total", l.clientsCount.Load()) + } + + err := handler(closer, conn) + if err != nil { + l.log.Debugw("client error", "error", err) + } + + defer func() { + if l.family == FamilyUnix { + // unix sockets have an empty `RemoteAddr` value, so no need to capture it + l.log.Debugw("client disconnected", "total", l.clientsCount.Load()) + } else { + l.log.Debugw("client disconnected", "remote_address", conn.RemoteAddr(), "total", l.clientsCount.Load()) + } + }() + }() + } +} + +// Stop stops accepting new incoming connections and closes any active clients +func (l *Listener) Stop() { + l.log.Info("Stopping " + l.family.String() + " server") + l.closer.Close() + l.wg.Wait() + l.log.Info(l.family.String() + " server stopped") +} + +func (l *Listener) registerHandler() { + l.clientsCount.Inc() +} + +func (l *Listener) unregisterHandler() { + 
l.clientsCount.Dec() +} + +// SplitFunc allows to create a `bufio.SplitFunc` based on a delimiter provided. +func SplitFunc(lineDelimiter []byte) bufio.SplitFunc { + ld := []byte(lineDelimiter) + if bytes.Equal(ld, []byte("\n")) { + // This will work for most usecases and will also strip \r if present. + // CustomDelimiter, need to match completely and the delimiter will be completely removed from + // the returned byte slice + return bufio.ScanLines + } + return FactoryDelimiter(ld) +} diff --git a/filebeat/inputsource/tcp/scan.go b/filebeat/inputsource/common/scan.go similarity index 91% rename from filebeat/inputsource/tcp/scan.go rename to filebeat/inputsource/common/scan.go index 597deaae025..63653006238 100644 --- a/filebeat/inputsource/tcp/scan.go +++ b/filebeat/inputsource/common/scan.go @@ -15,16 +15,16 @@ // specific language governing permissions and limitations // under the License. -package tcp +package common import ( "bufio" "bytes" ) -// factoryDelimiter return a function to split line using a custom delimiter supporting multibytes +// FactoryDelimiter return a function to split line using a custom delimiter supporting multibytes // delimiter, the delimiter is stripped from the returned value. -func factoryDelimiter(delimiter []byte) bufio.SplitFunc { +func FactoryDelimiter(delimiter []byte) bufio.SplitFunc { return func(data []byte, eof bool) (int, []byte, error) { if eof && len(data) == 0 { return 0, nil, nil diff --git a/filebeat/inputsource/tcp/scan_test.go b/filebeat/inputsource/common/scan_test.go similarity index 97% rename from filebeat/inputsource/tcp/scan_test.go rename to filebeat/inputsource/common/scan_test.go index bb73739913b..5e266141193 100644 --- a/filebeat/inputsource/tcp/scan_test.go +++ b/filebeat/inputsource/common/scan_test.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. 
-package tcp +package common import ( "bufio" @@ -97,7 +97,7 @@ func TestCustomDelimiter(t *testing.T) { t.Run(test.name, func(t *testing.T) { buf := strings.NewReader(test.text) scanner := bufio.NewScanner(buf) - scanner.Split(factoryDelimiter(test.delimiter)) + scanner.Split(FactoryDelimiter(test.delimiter)) var elements []string for scanner.Scan() { elements = append(elements, scanner.Text()) diff --git a/filebeat/inputsource/tcp/config.go b/filebeat/inputsource/tcp/config.go index 3eb25c42d9a..02039a6c1a6 100644 --- a/filebeat/inputsource/tcp/config.go +++ b/filebeat/inputsource/tcp/config.go @@ -28,8 +28,6 @@ import ( // Name is the human readable name and identifier. const Name = "tcp" -type size uint64 - // Config exposes the tcp configuration. type Config struct { Host string `config:"host"` diff --git a/filebeat/inputsource/tcp/handler.go b/filebeat/inputsource/tcp/handler.go index 455cc76909f..838fe476a81 100644 --- a/filebeat/inputsource/tcp/handler.go +++ b/filebeat/inputsource/tcp/handler.go @@ -18,109 +18,20 @@ package tcp import ( - "bufio" "crypto/tls" "crypto/x509" "net" - "time" - - "github.com/pkg/errors" "github.com/elastic/beats/v7/filebeat/inputsource" "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" - "github.com/elastic/beats/v7/libbeat/logp" ) -// splitHandler is a TCP client that has splitting capabilities. -type splitHandler struct { - callback inputsource.NetworkFunc - done chan struct{} - metadata inputsource.NetworkMetadata - splitFunc bufio.SplitFunc - maxMessageSize uint64 - timeout time.Duration -} - -// HandlerFactory returns a ConnectionHandler func -type HandlerFactory func(config Config) ConnectionHandler - -// ConnectionHandler interface provides mechanisms for handling of incoming TCP connections -type ConnectionHandler interface { - Handle(CloseRef, net.Conn) error -} - -// SplitHandlerFactory allows creation of a ConnectionHandler that can do splitting of messages received on a TCP connection. 
-func SplitHandlerFactory(callback inputsource.NetworkFunc, splitFunc bufio.SplitFunc) HandlerFactory { - return func(config Config) ConnectionHandler { - return newSplitHandler( - callback, - splitFunc, - uint64(config.MaxMessageSize), - config.Timeout, - ) - } -} - -// newSplitHandler allows creation of a TCP client that has splitting capabilities. -func newSplitHandler( - callback inputsource.NetworkFunc, - splitFunc bufio.SplitFunc, - maxReadMessage uint64, - timeout time.Duration, -) ConnectionHandler { - client := &splitHandler{ - callback: callback, - done: make(chan struct{}), - splitFunc: splitFunc, - maxMessageSize: maxReadMessage, - timeout: timeout, - } - return client -} - -// Handle takes a connection as input and processes data received on it. -func (c *splitHandler) Handle(closer CloseRef, conn net.Conn) error { - c.metadata = inputsource.NetworkMetadata{ +// MetadataCallback returns common metadata about a tcp connection +func MetadataCallback(conn net.Conn) inputsource.NetworkMetadata { + return inputsource.NetworkMetadata{ RemoteAddr: conn.RemoteAddr(), TLS: extractSSLInformation(conn), } - - log := logp.NewLogger("split_client").With("remote_addr", conn.RemoteAddr().String()) - - r := NewResetableLimitedReader(NewDeadlineReader(conn, c.timeout), c.maxMessageSize) - buf := bufio.NewReader(r) - scanner := bufio.NewScanner(buf) - scanner.Split(c.splitFunc) - //16 is ratio of MaxScanTokenSize/startBufSize - buffer := make([]byte, c.maxMessageSize/16) - scanner.Buffer(buffer, int(c.maxMessageSize)) - for scanner.Scan() { - err := scanner.Err() - if err != nil { - // we are forcing a Close on the socket, lets ignore any error that could happen. - select { - case <-closer.Done(): - break - default: - } - // This is a user defined limit and we should notify the user. 
- if IsMaxReadBufferErr(err) { - log.Errorw("split_client error", "error", err) - } - return errors.Wrap(err, "tcp split_client error") - } - r.Reset() - c.callback(scanner.Bytes(), c.metadata) - } - - // We are out of the scanner, either we reached EOF or another fatal error occurred. - // like we failed to complete the TLS handshake or we are missing the splitHandler certificate when - // mutual auth is on, which is the default. - if err := scanner.Err(); err != nil { - return err - } - - return nil } func extractSSLInformation(c net.Conn) *inputsource.TLSMetadata { diff --git a/filebeat/inputsource/tcp/server.go b/filebeat/inputsource/tcp/server.go index 49827ec3c74..eaf83c8526b 100644 --- a/filebeat/inputsource/tcp/server.go +++ b/filebeat/inputsource/tcp/server.go @@ -18,37 +18,28 @@ package tcp import ( - "bufio" - "bytes" "crypto/tls" "fmt" "net" - "sync" "golang.org/x/net/netutil" - "github.com/elastic/beats/v7/libbeat/common/atomic" + "github.com/elastic/beats/v7/filebeat/inputsource/common" "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" - "github.com/elastic/beats/v7/libbeat/logp" ) // Server represent a TCP server type Server struct { - config *Config - Listener net.Listener - wg sync.WaitGroup - done chan struct{} - factory HandlerFactory - log *logp.Logger - tlsConfig *tlscommon.TLSConfig - closer *Closer - clientsCount atomic.Int + *common.Listener + + config *Config + tlsConfig *tlscommon.TLSConfig } // New creates a new tcp server func New( config *Config, - factory HandlerFactory, + factory common.HandlerFactory, ) (*Server, error) { tlsConfig, err := tlscommon.LoadTLSServerConfig(config.TLS) if err != nil { @@ -59,96 +50,17 @@ func New( return nil, fmt.Errorf("HandlerFactory can't be empty") } - return &Server{ + server := &Server{ config: config, - done: make(chan struct{}), - factory: factory, - log: logp.NewLogger("tcp").With("address", config.Host), tlsConfig: tlsConfig, - closer: NewCloser(nil), - }, nil -} - -// Start 
listen to the TCP socket. -func (s *Server) Start() error { - var err error - s.Listener, err = s.createServer() - if err != nil { - return err } + server.Listener = common.NewListener(common.FamilyTCP, config.Host, factory, server.createServer, &common.ListenerConfig{ + Timeout: config.Timeout, + MaxMessageSize: config.MaxMessageSize, + MaxConnections: config.MaxConnections, + }) - s.closer.callback = func() { s.Listener.Close() } - s.log.Info("Started listening for TCP connection") - - s.wg.Add(1) - go func() { - defer s.wg.Done() - s.run() - }() - return nil -} - -// Run start and run a new TCP listener to receive new data. When a new connection is accepted, the factory is used -// to create a ConnectionHandler. The ConnectionHandler takes the connection as input and handles the data that is -// being received via tha io.Reader. Most clients use the splitHandler which can take a bufio.SplitFunc and parse -// out each message into an appropriate event. The Close() of the ConnectionHandler can be used to clean up the -// connection either by client or server based on need. 
-func (s *Server) run() { - for { - conn, err := s.Listener.Accept() - if err != nil { - select { - case <-s.closer.Done(): - return - default: - s.log.Debugw("Can not accept the connection", "error", err) - continue - } - } - - handler := s.factory(*s.config) - closer := WithCloser(s.closer, func() { conn.Close() }) - - s.wg.Add(1) - go func() { - defer logp.Recover("recovering from a tcp client crash") - defer s.wg.Done() - defer closer.Close() - - s.registerHandler() - defer s.unregisterHandler() - s.log.Debugw("New client", "remote_address", conn.RemoteAddr(), "total", s.clientsCount.Load()) - - err := handler.Handle(closer, conn) - if err != nil { - s.log.Debugw("client error", "error", err) - } - - defer s.log.Debugw( - "client disconnected", - "remote_address", - conn.RemoteAddr(), - "total", - s.clientsCount.Load(), - ) - }() - } -} - -// Stop stops accepting new incoming TCP connection and Close any active clients -func (s *Server) Stop() { - s.log.Info("Stopping TCP server") - s.closer.Close() - s.wg.Wait() - s.log.Info("TCP server stopped") -} - -func (s *Server) registerHandler() { - s.clientsCount.Inc() -} - -func (s *Server) unregisterHandler() { - s.clientsCount.Dec() + return server, nil } func (s *Server) createServer() (net.Listener, error) { @@ -156,7 +68,6 @@ func (s *Server) createServer() (net.Listener, error) { var err error if s.tlsConfig != nil { t := s.tlsConfig.BuildModuleConfig(s.config.Host) - s.log.Info("Listening over TLS") l, err = tls.Listen("tcp", s.config.Host, t) if err != nil { return nil, err @@ -173,15 +84,3 @@ func (s *Server) createServer() (net.Listener, error) { } return l, nil } - -// SplitFunc allows to create a `bufio.SplitFunc` based on a delimiter provided. -func SplitFunc(lineDelimiter []byte) bufio.SplitFunc { - ld := []byte(lineDelimiter) - if bytes.Equal(ld, []byte("\n")) { - // This will work for most usecases and will also strip \r if present. 
- // CustomDelimiter, need to match completely and the delimiter will be completely removed from - // the returned byte slice - return bufio.ScanLines - } - return factoryDelimiter(ld) -} diff --git a/filebeat/inputsource/tcp/server_test.go b/filebeat/inputsource/tcp/server_test.go index 4e25ef74892..15831666206 100644 --- a/filebeat/inputsource/tcp/server_test.go +++ b/filebeat/inputsource/tcp/server_test.go @@ -31,7 +31,9 @@ import ( "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/filebeat/inputsource" + netcommon "github.com/elastic/beats/v7/filebeat/inputsource/common" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" ) var defaultConfig = Config{ @@ -67,76 +69,76 @@ func TestReceiveEventsAndMetadata(t *testing.T) { { name: "NewLine", cfg: map[string]interface{}{}, - splitFunc: SplitFunc([]byte("\n")), + splitFunc: netcommon.SplitFunc([]byte("\n")), expectedMessages: expectedMessages, messageSent: strings.Join(expectedMessages, "\n"), }, { name: "NewLineWithCR", cfg: map[string]interface{}{}, - splitFunc: SplitFunc([]byte("\r\n")), + splitFunc: netcommon.SplitFunc([]byte("\r\n")), expectedMessages: expectedMessages, messageSent: strings.Join(expectedMessages, "\r\n"), }, { name: "CustomDelimiter", cfg: map[string]interface{}{}, - splitFunc: SplitFunc([]byte(";")), + splitFunc: netcommon.SplitFunc([]byte(";")), expectedMessages: expectedMessages, messageSent: strings.Join(expectedMessages, ";"), }, { name: "MultipleCharsCustomDelimiter", cfg: map[string]interface{}{}, - splitFunc: SplitFunc([]byte("")), + splitFunc: netcommon.SplitFunc([]byte("")), expectedMessages: expectedMessages, messageSent: strings.Join(expectedMessages, ""), }, { name: "SingleCharCustomDelimiterMessageWithoutBoundaries", cfg: map[string]interface{}{}, - splitFunc: SplitFunc([]byte(";")), + splitFunc: netcommon.SplitFunc([]byte(";")), expectedMessages: []string{"hello"}, messageSent: "hello", }, { name: 
"MultipleCharCustomDelimiterMessageWithoutBoundaries", cfg: map[string]interface{}{}, - splitFunc: SplitFunc([]byte("")), + splitFunc: netcommon.SplitFunc([]byte("")), expectedMessages: []string{"hello"}, messageSent: "hello", }, { name: "NewLineMessageWithoutBoundaries", cfg: map[string]interface{}{}, - splitFunc: SplitFunc([]byte("\n")), + splitFunc: netcommon.SplitFunc([]byte("\n")), expectedMessages: []string{"hello"}, messageSent: "hello", }, { name: "NewLineLargeMessagePayload", cfg: map[string]interface{}{}, - splitFunc: SplitFunc([]byte("\n")), + splitFunc: netcommon.SplitFunc([]byte("\n")), expectedMessages: largeMessages, messageSent: strings.Join(largeMessages, "\n"), }, { name: "CustomLargeMessagePayload", cfg: map[string]interface{}{}, - splitFunc: SplitFunc([]byte(";")), + splitFunc: netcommon.SplitFunc([]byte(";")), expectedMessages: largeMessages, messageSent: strings.Join(largeMessages, ";"), }, { name: "ReadRandomLargePayload", cfg: map[string]interface{}{}, - splitFunc: SplitFunc([]byte("\n")), + splitFunc: netcommon.SplitFunc([]byte("\n")), expectedMessages: []string{randomGeneratedText}, messageSent: randomGeneratedText, }, { name: "MaxReadBufferReachedUserConfigured", - splitFunc: SplitFunc([]byte("\n")), + splitFunc: netcommon.SplitFunc([]byte("\n")), cfg: map[string]interface{}{ "max_message_size": 50000, }, @@ -145,7 +147,7 @@ func TestReceiveEventsAndMetadata(t *testing.T) { }, { name: "MaxBufferSizeSet", - splitFunc: SplitFunc([]byte("\n")), + splitFunc: netcommon.SplitFunc([]byte("\n")), cfg: map[string]interface{}{ "max_message_size": 66 * 1024, }, @@ -169,7 +171,7 @@ func TestReceiveEventsAndMetadata(t *testing.T) { return } - factory := SplitHandlerFactory(to, test.splitFunc) + factory := netcommon.SplitHandlerFactory(netcommon.FamilyTCP, logp.NewLogger("test"), MetadataCallback, to, test.splitFunc) server, err := New(&config, factory) if !assert.NoError(t, err) { return @@ -180,7 +182,7 @@ func TestReceiveEventsAndMetadata(t 
*testing.T) { } defer server.Stop() - conn, err := net.Dial("tcp", server.Listener.Addr().String()) + conn, err := net.Dial("tcp", server.Listener.Listener.Addr().String()) require.NoError(t, err) fmt.Fprint(conn, test.messageSent) conn.Close() @@ -221,7 +223,7 @@ func TestReceiveNewEventsConcurrently(t *testing.T) { return } - factory := SplitHandlerFactory(to, bufio.ScanLines) + factory := netcommon.SplitHandlerFactory(netcommon.FamilyTCP, logp.NewLogger("test"), MetadataCallback, to, bufio.ScanLines) server, err := New(&config, factory) if !assert.NoError(t, err) { @@ -236,7 +238,7 @@ func TestReceiveNewEventsConcurrently(t *testing.T) { samples := generateMessages(eventsCount, 1024) for w := 0; w < workers; w++ { go func() { - conn, err := net.Dial("tcp", server.Listener.Addr().String()) + conn, err := net.Dial("tcp", server.Listener.Listener.Addr().String()) defer conn.Close() assert.NoError(t, err) for _, sample := range samples { diff --git a/filebeat/inputsource/unix/config.go b/filebeat/inputsource/unix/config.go new file mode 100644 index 00000000000..79b2a43dd08 --- /dev/null +++ b/filebeat/inputsource/unix/config.go @@ -0,0 +1,44 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package unix + +import ( + "fmt" + "time" + + "github.com/elastic/beats/v7/libbeat/common/cfgtype" +) + +// Name is the human readable name and identifier. +const Name = "unix" + +// Config exposes the unix configuration. +type Config struct { + Path string `config:"path"` + Timeout time.Duration `config:"timeout" validate:"nonzero,positive"` + MaxMessageSize cfgtype.ByteSize `config:"max_message_size" validate:"nonzero,positive"` + MaxConnections int `config:"max_connections"` +} + +// Validate validates the Config option for the unix input. +func (c *Config) Validate() error { + if len(c.Path) == 0 { + return fmt.Errorf("need to specify the path to the unix socket") + } + return nil +} diff --git a/filebeat/inputsource/unix/handler.go b/filebeat/inputsource/unix/handler.go new file mode 100644 index 00000000000..70391e35fb3 --- /dev/null +++ b/filebeat/inputsource/unix/handler.go @@ -0,0 +1,29 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package unix + +import ( + "net" + + "github.com/elastic/beats/v7/filebeat/inputsource" +) + +// MetadataCallback returns common metadata about a unix connection +func MetadataCallback(conn net.Conn) inputsource.NetworkMetadata { + return inputsource.NetworkMetadata{} +} diff --git a/filebeat/inputsource/unix/server.go b/filebeat/inputsource/unix/server.go new file mode 100644 index 00000000000..965a3300282 --- /dev/null +++ b/filebeat/inputsource/unix/server.go @@ -0,0 +1,67 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package unix + +import ( + "fmt" + "net" + + "golang.org/x/net/netutil" + + "github.com/elastic/beats/v7/filebeat/inputsource/common" +) + +// Server represent a unix server +type Server struct { + *common.Listener + + config *Config +} + +// New creates a new unix server +func New( + config *Config, + factory common.HandlerFactory, +) (*Server, error) { + if factory == nil { + return nil, fmt.Errorf("HandlerFactory can't be empty") + } + + server := &Server{ + config: config, + } + server.Listener = common.NewListener(common.FamilyUnix, config.Path, factory, server.createServer, &common.ListenerConfig{ + Timeout: config.Timeout, + MaxMessageSize: config.MaxMessageSize, + MaxConnections: config.MaxConnections, + }) + + return server, nil +} + +func (s *Server) createServer() (net.Listener, error) { + l, err := net.Listen("unix", s.config.Path) + if err != nil { + return nil, err + } + + if s.config.MaxConnections > 0 { + return netutil.LimitListener(l, s.config.MaxConnections), nil + } + return l, nil +} diff --git a/filebeat/inputsource/unix/server_test.go b/filebeat/inputsource/unix/server_test.go new file mode 100644 index 00000000000..36e75c757e9 --- /dev/null +++ b/filebeat/inputsource/unix/server_test.go @@ -0,0 +1,278 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package unix + +import ( + "bufio" + "fmt" + "math/rand" + "net" + "os" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/dustin/go-humanize" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/filebeat/inputsource" + netcommon "github.com/elastic/beats/v7/filebeat/inputsource/common" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" +) + +var defaultConfig = Config{ + Timeout: time.Minute * 5, + MaxMessageSize: 20 * humanize.MiByte, +} + +type info struct { + message string + mt inputsource.NetworkMetadata +} + +func TestErrorOnEmptyLineDelimiter(t *testing.T) { + c := common.NewConfig() + config := defaultConfig + err := c.Unpack(&config) + assert.Error(t, err) +} + +func TestReceiveEventsAndMetadata(t *testing.T) { + expectedMessages := generateMessages(5, 100) + largeMessages := generateMessages(10, 4096) + extraLargeMessages := generateMessages(2, 65*1024) + randomGeneratedText := randomString(900000) + + tests := []struct { + name string + cfg map[string]interface{} + splitFunc bufio.SplitFunc + expectedMessages []string + messageSent string + }{ + { + name: "NewLine", + cfg: map[string]interface{}{}, + splitFunc: netcommon.SplitFunc([]byte("\n")), + expectedMessages: expectedMessages, + messageSent: strings.Join(expectedMessages, "\n"), + }, + { + name: "NewLineWithCR", + cfg: map[string]interface{}{}, + splitFunc: netcommon.SplitFunc([]byte("\r\n")), + expectedMessages: expectedMessages, + messageSent: strings.Join(expectedMessages, "\r\n"), + }, + { + name: "CustomDelimiter", + cfg: map[string]interface{}{}, + splitFunc: netcommon.SplitFunc([]byte(";")), + expectedMessages: expectedMessages, + messageSent: strings.Join(expectedMessages, ";"), + }, + { + name: "MultipleCharsCustomDelimiter", + cfg: map[string]interface{}{}, + 
splitFunc: netcommon.SplitFunc([]byte("")), + expectedMessages: expectedMessages, + messageSent: strings.Join(expectedMessages, ""), + }, + { + name: "SingleCharCustomDelimiterMessageWithoutBoundaries", + cfg: map[string]interface{}{}, + splitFunc: netcommon.SplitFunc([]byte(";")), + expectedMessages: []string{"hello"}, + messageSent: "hello", + }, + { + name: "MultipleCharCustomDelimiterMessageWithoutBoundaries", + cfg: map[string]interface{}{}, + splitFunc: netcommon.SplitFunc([]byte("")), + expectedMessages: []string{"hello"}, + messageSent: "hello", + }, + { + name: "NewLineMessageWithoutBoundaries", + cfg: map[string]interface{}{}, + splitFunc: netcommon.SplitFunc([]byte("\n")), + expectedMessages: []string{"hello"}, + messageSent: "hello", + }, + { + name: "NewLineLargeMessagePayload", + cfg: map[string]interface{}{}, + splitFunc: netcommon.SplitFunc([]byte("\n")), + expectedMessages: largeMessages, + messageSent: strings.Join(largeMessages, "\n"), + }, + { + name: "CustomLargeMessagePayload", + cfg: map[string]interface{}{}, + splitFunc: netcommon.SplitFunc([]byte(";")), + expectedMessages: largeMessages, + messageSent: strings.Join(largeMessages, ";"), + }, + { + name: "ReadRandomLargePayload", + cfg: map[string]interface{}{}, + splitFunc: netcommon.SplitFunc([]byte("\n")), + expectedMessages: []string{randomGeneratedText}, + messageSent: randomGeneratedText, + }, + { + name: "MaxReadBufferReachedUserConfigured", + splitFunc: netcommon.SplitFunc([]byte("\n")), + cfg: map[string]interface{}{ + "max_message_size": 50000, + }, + expectedMessages: []string{}, + messageSent: randomGeneratedText, + }, + { + name: "MaxBufferSizeSet", + splitFunc: netcommon.SplitFunc([]byte("\n")), + cfg: map[string]interface{}{ + "max_message_size": 66 * 1024, + }, + expectedMessages: extraLargeMessages, + messageSent: strings.Join(extraLargeMessages, "\n"), + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ch := make(chan *info, 
len(test.expectedMessages)) + defer close(ch) + to := func(message []byte, mt inputsource.NetworkMetadata) { + ch <- &info{message: string(message), mt: mt} + } + path := filepath.Join(os.TempDir(), "test.sock") + test.cfg["path"] = path + cfg, _ := common.NewConfigFrom(test.cfg) + config := defaultConfig + err := cfg.Unpack(&config) + if !assert.NoError(t, err) { + return + } + + factory := netcommon.SplitHandlerFactory(netcommon.FamilyUnix, logp.NewLogger("test"), MetadataCallback, to, test.splitFunc) + server, err := New(&config, factory) + if !assert.NoError(t, err) { + return + } + err = server.Start() + if !assert.NoError(t, err) { + return + } + defer server.Stop() + + conn, err := net.Dial("unix", path) + require.NoError(t, err) + fmt.Fprint(conn, test.messageSent) + conn.Close() + + var events []*info + + for len(events) < len(test.expectedMessages) { + select { + case event := <-ch: + events = append(events, event) + default: + } + } + + for idx, e := range events { + assert.Equal(t, test.expectedMessages[idx], e.message) + } + }) + } +} + +func TestReceiveNewEventsConcurrently(t *testing.T) { + workers := 4 + eventsCount := 100 + path := filepath.Join(os.TempDir(), "test.sock") + ch := make(chan *info, eventsCount*workers) + defer close(ch) + to := func(message []byte, mt inputsource.NetworkMetadata) { + ch <- &info{message: string(message), mt: mt} + } + cfg, err := common.NewConfigFrom(map[string]interface{}{"path": path}) + if !assert.NoError(t, err) { + return + } + config := defaultConfig + err = cfg.Unpack(&config) + if !assert.NoError(t, err) { + return + } + + factory := netcommon.SplitHandlerFactory(netcommon.FamilyUnix, logp.NewLogger("test"), MetadataCallback, to, bufio.ScanLines) + + server, err := New(&config, factory) + if !assert.NoError(t, err) { + return + } + err = server.Start() + if !assert.NoError(t, err) { + return + } + defer server.Stop() + + samples := generateMessages(eventsCount, 1024) + for w := 0; w < workers; w++ { + go 
func() { + conn, err := net.Dial("unix", path) + defer conn.Close() + assert.NoError(t, err) + for _, sample := range samples { + fmt.Fprintln(conn, sample) + } + }() + } + + var events []*info + for len(events) < eventsCount*workers { + select { + case event := <-ch: + events = append(events, event) + default: + } + } +} + +func randomString(l int) string { + charsets := []byte("abcdefghijklmnopqrstuvwzyzABCDEFGHIJKLMNOPQRSTUVWZYZ0123456789") + message := make([]byte, l) + for i := range message { + message[i] = charsets[rand.Intn(len(charsets))] + } + return string(message) +} + +func generateMessages(c int, l int) []string { + messages := make([]string, c) + for i := range messages { + messages[i] = randomString(l) + } + return messages +} diff --git a/filebeat/tests/system/test_syslog.py b/filebeat/tests/system/test_syslog.py index ec2d80a56c2..ed8f5a58f2f 100644 --- a/filebeat/tests/system/test_syslog.py +++ b/filebeat/tests/system/test_syslog.py @@ -1,5 +1,8 @@ from filebeat import BaseTest import socket +import os +import tempfile +import unittest class Test(BaseTest): @@ -129,7 +132,100 @@ def test_syslog_with_udp(self): assert len(output) == 2 self.assert_syslog(output[0]) - def assert_syslog(self, syslog): + # AF_UNIX support in python isn't available until + # Python 3.9, see https://bugs.python.org/issue33408 + @unittest.skipIf(not hasattr(socket, 'AF_UNIX'), "No Windows AF_UNIX support before Python 3.9") + def test_syslog_with_unix(self): + """ + Test syslog input with events from UNIX. + """ + # we create the socket in a temporary directory because + # go will fail to create a unix socket if the path length + # is longer than 108 characters. 
See https://github.com/golang/go/issues/6895 + with tempfile.TemporaryDirectory() as tempdir: + path = os.path.join(tempdir, "filebeat.sock") + input_raw = """ +- type: syslog + protocol: + unix: + path: {} +""" + + input_raw = input_raw.format(path) + self.render_config_template( + input_raw=input_raw, + inputs=False, + ) + + filebeat = self.start_beat() + + self.wait_until(lambda: self.log_contains("Started listening for UNIX connection")) + + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) # UNIX + + sock.connect(path) + + for n in range(0, 2): + m = "<13>Oct 11 22:14:15 wopr.mymachine.co postfix/smtpd[2000]:" \ + " 'su root' failed for lonvick on /dev/pts/8 {}\n" + m = m.format(n) + sock.send(m.encode("utf-8")) + + self.wait_until(lambda: self.output_count(lambda x: x >= 2)) + + filebeat.check_kill_and_wait() + + output = self.read_output() + + assert len(output) == 2 + self.assert_syslog(output[0], False) + + # AF_UNIX support in python isn't available until + # Python 3.9, see https://bugs.python.org/issue33408 + @unittest.skipIf(not hasattr(socket, 'AF_UNIX'), "No Windows AF_UNIX support before Python 3.9") + def test_syslog_with_unix_invalid_message(self): + """ + Test syslog input with invalid events from UNIX. + """ + # we create the socket in a temporary directory because + # go will fail to create a unix socket if the path length + # is longer than 108 characters. 
See https://github.com/golang/go/issues/6895 + with tempfile.TemporaryDirectory() as tempdir: + path = os.path.join(tempdir, "filebeat.sock") + input_raw = """ +- type: syslog + protocol: + unix: + path: {} +""" + + input_raw = input_raw.format(path) + self.render_config_template( + input_raw=input_raw, + inputs=False, + ) + + filebeat = self.start_beat() + + self.wait_until(lambda: self.log_contains("Started listening for UNIX connection")) + + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) # UNIX + + sock.connect(path) + for n in range(0, 2): + sock.send("invalid\n".encode("utf-8")) + + self.wait_until(lambda: self.output_count(lambda x: x >= 2)) + + filebeat.check_kill_and_wait() + + output = self.read_output() + + assert len(output) == 2 + assert output[0]["message"] == "invalid" + sock.close() + + def assert_syslog(self, syslog, has_address=True): assert syslog["event.severity"] == 5 assert syslog["hostname"] == "wopr.mymachine.co" assert syslog["input.type"] == "syslog" @@ -140,4 +236,5 @@ def assert_syslog(self, syslog): assert syslog["syslog.priority"] == 13 assert syslog["syslog.severity_label"] == "Notice" assert syslog["syslog.facility_label"] == "user-level" - assert len(syslog["log.source.address"]) > 0 + if has_address: + assert len(syslog["log.source.address"]) > 0 diff --git a/filebeat/tests/system/test_unix.py b/filebeat/tests/system/test_unix.py new file mode 100644 index 00000000000..66d261f3c91 --- /dev/null +++ b/filebeat/tests/system/test_unix.py @@ -0,0 +1,77 @@ +from filebeat import BaseTest +import os +import socket +import tempfile +import unittest + +# AF_UNIX support in python isn't available until +# Python 3.9, see https://bugs.python.org/issue33408 + + +@unittest.skipIf(not hasattr(socket, 'AF_UNIX'), "No Windows AF_UNIX support before Python 3.9") +class Test(BaseTest): + """ + Test filebeat UNIX input + """ + + def test_unix_with_newline_delimiter(self): + """ + Test UNIX input with a new line delimiter + """ + 
self.send_events_with_delimiter("\n") + + def test_unix_with_custom_char_delimiter(self): + """ + Test UNIX input with a custom single char delimiter + """ + self.send_events_with_delimiter(";") + + def test_unix_with_custom_word_delimiter(self): + """ + Test UNIX input with a custom single char delimiter + """ + self.send_events_with_delimiter("") + + def send_events_with_delimiter(self, delimiter): + # we create the socket in a temporary directory because + # go will fail to create a unix socket if the path length + # is longer than 108 characters. See https://github.com/golang/go/issues/6895 + with tempfile.TemporaryDirectory() as tempdir: + path = os.path.join(tempdir, "filebeat.sock") + input_raw = """ +- type: unix + path: {} + enabled: true +""" + + # Use default of \n and stripping \r + if delimiter != "": + input_raw += "\n line_delimiter: {}".format(delimiter) + + input_raw = input_raw.format(path) + + self.render_config_template( + input_raw=input_raw, + inputs=False, + ) + + filebeat = self.start_beat() + + self.wait_until(lambda: self.log_contains("Started listening for UNIX connection")) + + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) # UNIX + sock.connect(path) + + for n in range(0, 2): + sock.send(bytes("Hello World: " + str(n) + delimiter, "utf-8")) + + self.wait_until(lambda: self.output_count(lambda x: x >= 2)) + + filebeat.check_kill_and_wait() + + output = self.read_output() + + assert len(output) == 2 + assert output[0]["input.type"] == "unix" + + sock.close()