Skip to content

Commit

Permalink
Cherry-pick #17492 to 7.x: [Filebeat] Unix stream socket input source (
Browse files Browse the repository at this point in the history
…#17851)

* [Filebeat] Unix stream socket input source (#17492)

* initial common refactor

* Fix up unix and add license info

* Fix inputs

* Clean up handlers

* Update changelog and docs

* Fix added input

* Fix tests

* Add unix socket system tests

* add systems tests for syslog unix input

* pep autoformat

* Disable unix tests for Windows since Python 3.8 doesn't support AF_UNIX

* pep autoformat

* Address feedback

* fix test

(cherry picked from commit 3b99438)

* Fix changelog
  • Loading branch information
Andrew Stucki authored Apr 21, 2020
1 parent dd3fa32 commit a1da232
Show file tree
Hide file tree
Showing 27 changed files with 1,235 additions and 247 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Release Google Cloud module as GA. {pull}17511[17511]
- Update filebeat httpjson input to support pagination via Header and Okta module. {pull}16354[16354]
- Enhance `elasticsearch/server` fileset to handle ECS-compatible logs emitted by Elasticsearch. {issue}17715[17715] {pull}17714[17714]
- Added Unix stream socket support as an input source and a syslog input source. {pull}17492[17492]

*Heartbeat*

Expand Down
37 changes: 37 additions & 0 deletions filebeat/docs/inputs/input-common-unix-options.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
//////////////////////////////////////////////////////////////////////////
//// This content is shared by Filebeat inputs that use the Unix inputsource
//// If you add IDs to sections, make sure you use attributes to create
//// unique IDs for each input that includes this file. Use the format:
//// [id="{beatname_lc}-input-{type}-option-name"]
//////////////////////////////////////////////////////////////////////////
[float]
[id="{beatname_lc}-input-{type}-unix-max-message-size"]
==== `max_message_size`

The maximum size of the message received over the socket. The default is `20MiB`.

[float]
[id="{beatname_lc}-input-{type}-unix-path"]
==== `path`

The path to the Unix socket that will receive event streams.

[float]
[id="{beatname_lc}-input-{type}-unix-line-delimiter"]
==== `line_delimiter`

Specify the characters used to split the incoming events. The default is '\n'.

[float]
[id="{beatname_lc}-input-{type}-unix-max-connections"]
==== `max_connections`

The at most number of connections to accept at any given point in time.

[float]
[id="{beatname_lc}-input-{type}-unix-timeout"]
==== `timeout`

The number of seconds of inactivity before a connection is closed. The default is `300s`.

See <<configuration-ssl>> for more information.
14 changes: 13 additions & 1 deletion filebeat/docs/inputs/input-syslog.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<titleabbrev>Syslog</titleabbrev>
++++

Use the `syslog` input to read events over TCP or UDP, this input will parse BSD (rfc3164)
Use the `syslog` input to read events over TCP, UDP, or a Unix stream socket, this input will parse BSD (rfc3164)
event and some variant.

Example configurations:
Expand All @@ -28,6 +28,14 @@ Example configurations:
host: "localhost:9000"
----

["source","yaml",subs="attributes"]
----
{beatname_lc}.inputs:
- type: syslog
protocol.unix:
path: "/path/to/syslog.sock"
----

==== Configuration options

The `syslog` input supports protocol specific configuration options plus the
Expand All @@ -41,6 +49,10 @@ include::../inputs/input-common-udp-options.asciidoc[]

include::../inputs/input-common-tcp-options.asciidoc[]

===== Protocol `unix`:

include::../inputs/input-common-unix-options.asciidoc[]

[id="{beatname_lc}-input-{type}-common-options"]
include::../inputs/input-common-options.asciidoc[]

Expand Down
1 change: 1 addition & 0 deletions filebeat/include/list.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 35 additions & 2 deletions filebeat/input/syslog/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@ import (

"github.com/elastic/beats/v7/filebeat/harvester"
"github.com/elastic/beats/v7/filebeat/inputsource"
netcommon "github.com/elastic/beats/v7/filebeat/inputsource/common"
"github.com/elastic/beats/v7/filebeat/inputsource/tcp"
"github.com/elastic/beats/v7/filebeat/inputsource/udp"
"github.com/elastic/beats/v7/filebeat/inputsource/unix"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
)

type config struct {
Expand All @@ -54,6 +57,19 @@ var defaultTCP = syslogTCP{
LineDelimiter: "\n",
}

type syslogUnix struct {
unix.Config `config:",inline"`
LineDelimiter string `config:"line_delimiter" validate:"nonzero"`
}

var defaultUnix = syslogUnix{
Config: unix.Config{
Timeout: time.Minute * 5,
MaxMessageSize: 20 * humanize.MiByte,
},
LineDelimiter: "\n",
}

var defaultUDP = udp.Config{
MaxMessageSize: 10 * humanize.KiByte,
Timeout: time.Minute * 5,
Expand All @@ -72,14 +88,31 @@ func factory(
return nil, err
}

splitFunc := tcp.SplitFunc([]byte(config.LineDelimiter))
splitFunc := netcommon.SplitFunc([]byte(config.LineDelimiter))
if splitFunc == nil {
return nil, fmt.Errorf("error creating splitFunc from delimiter %s", config.LineDelimiter)
}

factory := tcp.SplitHandlerFactory(nf, splitFunc)
logger := logp.NewLogger("input.syslog.tcp").With("address", config.Config.Host)
factory := netcommon.SplitHandlerFactory(netcommon.FamilyTCP, logger, tcp.MetadataCallback, nf, splitFunc)

return tcp.New(&config.Config, factory)
case unix.Name:
config := defaultUnix
if err := cfg.Unpack(&config); err != nil {
return nil, err
}

splitFunc := netcommon.SplitFunc([]byte(config.LineDelimiter))
if splitFunc == nil {
return nil, fmt.Errorf("error creating splitFunc from delimiter %s", config.LineDelimiter)
}

logger := logp.NewLogger("input.syslog.unix").With("path", config.Config.Path)
factory := netcommon.SplitHandlerFactory(netcommon.FamilyUnix, logger, unix.MetadataCallback, nf, splitFunc)

return unix.New(&config.Config, factory)

case udp.Name:
config := defaultUDP
if err := cfg.Unpack(&config); err != nil {
Expand Down
4 changes: 3 additions & 1 deletion filebeat/input/syslog/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,9 @@ func newBeatEvent(timestamp time.Time, metadata inputsource.NetworkMetadata, fie
},
Fields: fields,
}
event.Fields.Put("log.source.address", metadata.RemoteAddr.String())
if metadata.RemoteAddr != nil {
event.Fields.Put("log.source.address", metadata.RemoteAddr.String())
}
return event
}

Expand Down
18 changes: 10 additions & 8 deletions filebeat/input/tcp/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/elastic/beats/v7/filebeat/harvester"
"github.com/elastic/beats/v7/filebeat/input"
"github.com/elastic/beats/v7/filebeat/inputsource"
netcommon "github.com/elastic/beats/v7/filebeat/inputsource/common"
"github.com/elastic/beats/v7/filebeat/inputsource/tcp"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
Expand All @@ -41,7 +42,7 @@ func init() {

// Input for TCP connection
type Input struct {
sync.Mutex
mutex sync.Mutex
server *tcp.Server
started bool
outlet channel.Outleter
Expand Down Expand Up @@ -78,12 +79,13 @@ func NewInput(
forwarder.Send(event)
}

splitFunc := tcp.SplitFunc([]byte(config.LineDelimiter))
splitFunc := netcommon.SplitFunc([]byte(config.LineDelimiter))
if splitFunc == nil {
return nil, fmt.Errorf("unable to create splitFunc for delimiter %s", config.LineDelimiter)
}

factory := tcp.SplitHandlerFactory(cb, splitFunc)
logger := logp.NewLogger("input.tcp").With("address", config.Config.Host)
factory := netcommon.SplitHandlerFactory(netcommon.FamilyTCP, logger, tcp.MetadataCallback, cb, splitFunc)

server, err := tcp.New(&config.Config, factory)
if err != nil {
Expand All @@ -95,14 +97,14 @@ func NewInput(
started: false,
outlet: out,
config: &config,
log: logp.NewLogger("tcp input").With("address", config.Config.Host),
log: logger,
}, nil
}

// Run start a TCP input
func (p *Input) Run() {
p.Lock()
defer p.Unlock()
p.mutex.Lock()
defer p.mutex.Unlock()

if !p.started {
p.log.Info("Starting TCP input")
Expand All @@ -117,8 +119,8 @@ func (p *Input) Run() {
// Stop stops TCP server
func (p *Input) Stop() {
defer p.outlet.Close()
p.Lock()
defer p.Unlock()
p.mutex.Lock()
defer p.mutex.Unlock()

p.log.Info("Stopping TCP input")
p.server.Stop()
Expand Down
45 changes: 45 additions & 0 deletions filebeat/input/unix/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package unix

import (
"time"

"github.com/dustin/go-humanize"

"github.com/elastic/beats/v7/filebeat/harvester"
"github.com/elastic/beats/v7/filebeat/inputsource/unix"
)

type config struct {
unix.Config `config:",inline"`
harvester.ForwarderConfig `config:",inline"`

LineDelimiter string `config:"line_delimiter" validate:"nonzero"`
}

var defaultConfig = config{
ForwarderConfig: harvester.ForwarderConfig{
Type: "unix",
},
Config: unix.Config{
Timeout: time.Minute * 5,
MaxMessageSize: 20 * humanize.MiByte,
},
LineDelimiter: "\n",
}
Loading

0 comments on commit a1da232

Please sign in to comment.