Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Filebeat] Update docs for unix sockets #18009

Merged
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions filebeat/docs/inputs/input-common-unix-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,22 @@ The maximum size of the message received over the socket. The default is `20MiB`

The path to the Unix socket that will receive event streams.

[float]
[id="{beatname_lc}-input-{type}-unix-group"]
==== `group`

The group ownership of the Unix socket that will be created by Filebeat.
The default is the primary group name for the user Filebeat is running as.
This option is ignored on Windows.

[float]
[id="{beatname_lc}-input-{type}-unix-mode"]
==== `mode`

The file mode of the Unix socket that will be created by Filebeat. This is
expected to be a file mode as an octal string. The default value is the system
default (generally `0755`).

[float]
[id="{beatname_lc}-input-{type}-unix-line-delimiter"]
==== `line_delimiter`
Expand Down
2 changes: 2 additions & 0 deletions filebeat/docs/inputs/input-syslog.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ include::../inputs/input-common-tcp-options.asciidoc[]

===== Protocol `unix`:

beta[]

include::../inputs/input-common-unix-options.asciidoc[]

[id="{beatname_lc}-input-{type}-common-options"]
Expand Down
35 changes: 35 additions & 0 deletions filebeat/docs/inputs/input-unix.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
:type: unix

[id="{beatname_lc}-input-{type}"]
=== Unix input

beta[]

++++
<titleabbrev>Unix</titleabbrev>
++++

Use the `unix` input to read events over a stream-oriented Unix domain socket.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good to document some of the behavior. Like must the file exist when the input starts? What are the file permission requirements?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, unix socket behavior is generally kind of tricky. If the file descriptor used for controlling the socket lifecycle is already on the filesystem, we'll actually get an error trying to bind. Additionally, go will actually unlink the fd automatically when the socket is closed. So WDYT about something like:

The file specified under the path argument, must not exist prior to using this input. If filebeat is not cleanly shut down and you see an error starting filebeat specifying "bind: address already in use", remove the file specified at path and restart filebeat.

Copy link
Member

@andrewkroh andrewkroh Apr 27, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So Filebeat controls the creation and destruction? Perhaps Filebeat should automatically unlink it to try to resolve there error without user intervention.

What will the file mode be of the created unix socket? What permissions will be needed by the writing app?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, ideally filebeat would fully control the lifecycle, but I hesitate to unlink any file that a user specifies in a configuration at startup time.

AFAIK unix sockets are always created with 755 by the underlying OS. So their permissions look like srwxr-xr-x with the s indicator saying that this is actually a reference to a socket fd.

If you feel strongly about the attempted os.Remove call at startup I can add that, but I think we'd want to put a bolded section in the docs saying, "make sure that no file exists with this path, otherwise it'll be removed"

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is some precedence for removing the socket if it exists in libbeat. But one thing I would add is a stat call to check if it is a socket before removing it.

I think you're right with 755 for when filebeat is running as root as a service since the default umask is 0022 for root and for systemd. So this means that only root users can pass logs to Filebeat in most deployments. I would explain this so users understand the permission requirements.

Users will probably ask for a way to configure the mode and/or the group of the socket. So they don't have to run their app as root.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 added the cleanup code with a refusal error if the file isn't a socket. Also added some permission modification code to address the permissions issue.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking this through a little more while reviewing your changes. Maybe a better route is to only allow chgrp and chmod changes. chgrp works for unprivileged users (with restrictions to groups the user is a member of) whereas chown is only available to root. This would leave the owner of the socket intact which I think is nice too.

As a user I would probably create a filebeat-writers group on the system and configure Filebeat to chgrp the file to filebeat-writers and configure it to chmod it to 770 (or whatever the least privs are). Ideally it create the socket with these attributes so there's no chance of a race condition where the socket exists but it not accessible to the app that needs to use it.

chgrp (and chown) is current prohibited in Beats by a seccomp policy. You'll have to whitelist chgrp in order to allow it in the Filebeat process.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, modified the implementation to only use chown with a gid set and chmod -- WRT creating the fd with the proper umask--from the looks of golang/go#11822 our only two options are doing a syscall.Umask prior to the call to Listen which is going to affect all files created by the process or changing it after the fact. So, I figure changing the file mode after the fact is probably the lesser of the two evils.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would say to use chgrp so we can be finer grained in our seccomp policy but it looks like chgrp isn't currently exposed in any readily available Go API. So LGTM.

Example configuration:

["source","yaml",subs="attributes"]
----
{beatname_lc}.inputs:
- type: unix
max_message_size: 10MiB
path: "/var/run/filebeat.sock"
----


==== Configuration options

The `unix` input supports the following configuration options plus the
<<{beatname_lc}-input-{type}-common-options>> described later.

include::../inputs/input-common-unix-options.asciidoc[]

[id="{beatname_lc}-input-{type}-common-options"]
include::../inputs/input-common-options.asciidoc[]

:type!:
3 changes: 3 additions & 0 deletions filebeat/input/syslog/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/elastic/beats/v7/filebeat/inputsource/udp"
"github.com/elastic/beats/v7/filebeat/inputsource/unix"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/cfgwarn"
"github.com/elastic/beats/v7/libbeat/logp"
)

Expand Down Expand Up @@ -98,6 +99,8 @@ func factory(

return tcp.New(&config.Config, factory)
case unix.Name:
cfgwarn.Beta("Syslog Unix socket support is beta.")

config := defaultUnix
if err := cfg.Unpack(&config); err != nil {
return nil, err
Expand Down
2 changes: 2 additions & 0 deletions filebeat/input/unix/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/elastic/beats/v7/filebeat/inputsource/unix"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/cfgwarn"
"github.com/elastic/beats/v7/libbeat/logp"
)

Expand All @@ -56,6 +57,7 @@ func NewInput(
connector channel.Connector,
context input.Context,
) (input.Input, error) {
cfgwarn.Beta("Unix socket support is beta.")

out, err := connector.ConnectWith(cfg, beat.ClientConfig{
Processing: beat.ProcessingConfig{
Expand Down
2 changes: 2 additions & 0 deletions filebeat/inputsource/unix/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ const Name = "unix"
// Config exposes the unix configuration.
type Config struct {
Path string `config:"path"`
Group *string `config:"group"`
Mode *string `config:"mode"`
Timeout time.Duration `config:"timeout" validate:"nonzero,positive"`
MaxMessageSize cfgtype.ByteSize `config:"max_message_size" validate:"nonzero,positive"`
MaxConnections int `config:"max_connections"`
Expand Down
81 changes: 81 additions & 0 deletions filebeat/inputsource/unix/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,16 @@ package unix
import (
"fmt"
"net"
"os"
"os/user"
"runtime"
"strconv"

"github.com/pkg/errors"
"golang.org/x/net/netutil"

"github.com/elastic/beats/v7/filebeat/inputsource/common"
"github.com/elastic/beats/v7/libbeat/logp"
)

// Server represent a unix server
Expand Down Expand Up @@ -55,13 +61,88 @@ func New(
}

func (s *Server) createServer() (net.Listener, error) {
if err := s.cleanupStaleSocket(); err != nil {
return nil, err
}

l, err := net.Listen("unix", s.config.Path)
if err != nil {
return nil, err
}

if err := s.setSocketOwnership(); err != nil {
return nil, err
}

if err := s.setSocketMode(); err != nil {
return nil, err
}

if s.config.MaxConnections > 0 {
return netutil.LimitListener(l, s.config.MaxConnections), nil
}
return l, nil
}

func (s *Server) cleanupStaleSocket() error {
path := s.config.Path
info, err := os.Lstat(path)
if err != nil {
// If the file does not exist, then the cleanup can be considered successful.
if os.IsNotExist(err) {
return nil
}
return errors.Wrapf(err, "cannot lstat unix socket file at location %s", path)
}

if info.Mode()&os.ModeSocket == 0 {
return fmt.Errorf("refusing to remove file at location %s, it is not a socket", path)
}

if err := os.Remove(path); err != nil {
return errors.Wrapf(err, "cannot remove existing unix socket file at location %s", path)
}

return nil
}

func (s *Server) setSocketOwnership() error {
if s.config.Group != nil {
if runtime.GOOS == "windows" {
logp.NewLogger("unix").Warn("windows does not support the 'group' configuration option, ignoring")
return nil
}
g, err := user.LookupGroup(*s.config.Group)
if err != nil {
return err
}
gid, err := strconv.Atoi(g.Gid)
if err != nil {
return err
}
return os.Chown(s.config.Path, -1, gid)
}
return nil
}

func (s *Server) setSocketMode() error {
if s.config.Mode != nil {
mode, err := parseFileMode(*s.config.Mode)
if err != nil {
return err
}
return os.Chmod(s.config.Path, mode)
}
return nil
}

func parseFileMode(mode string) (os.FileMode, error) {
parsed, err := strconv.ParseUint(mode, 8, 32)
if err != nil {
return 0, err
}
if parsed > 0777 {
return 0, errors.New("invalid file mode")
}
return os.FileMode(parsed), nil
}
86 changes: 86 additions & 0 deletions filebeat/inputsource/unix/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ import (
"math/rand"
"net"
"os"
"os/user"
"path/filepath"
"runtime"
"strconv"
"strings"
"testing"
"time"
Expand All @@ -35,6 +38,7 @@ import (
"github.com/elastic/beats/v7/filebeat/inputsource"
netcommon "github.com/elastic/beats/v7/filebeat/inputsource/common"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/file"
"github.com/elastic/beats/v7/libbeat/logp"
)

Expand Down Expand Up @@ -207,6 +211,88 @@ func TestReceiveEventsAndMetadata(t *testing.T) {
}
}

func TestSocketOwnershipAndMode(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("changing socket ownership is only supported on non-windows")
return
}

groups, err := os.Getgroups()
require.NoError(t, err)

if len(groups) <= 1 {
t.Skip("no group that we can change to")
return
}

group, err := user.LookupGroupId(strconv.Itoa(groups[1]))
require.NoError(t, err)

path := filepath.Join(os.TempDir(), "test.sock")
cfg, _ := common.NewConfigFrom(map[string]interface{}{
"path": path,
"group": group.Name,
"mode": "0740",
})
config := defaultConfig
err = cfg.Unpack(&config)
require.NoError(t, err)

factory := netcommon.SplitHandlerFactory(netcommon.FamilyUnix, logp.NewLogger("test"), MetadataCallback, nil, netcommon.SplitFunc([]byte("\n")))
server, err := New(&config, factory)
require.NoError(t, err)
err = server.Start()
require.NoError(t, err)
defer server.Stop()

info, err := file.Lstat(path)
require.NoError(t, err)
require.NotEqual(t, 0, info.Mode()&os.ModeSocket)
require.Equal(t, os.FileMode(0740), info.Mode().Perm())
gid, err := info.GID()
require.NoError(t, err)
require.Equal(t, group.Gid, strconv.Itoa(gid))
}

func TestSocketCleanup(t *testing.T) {
path := filepath.Join(os.TempDir(), "test.sock")
mockStaleSocket, err := net.Listen("unix", path)
require.NoError(t, err)
defer mockStaleSocket.Close()

cfg, _ := common.NewConfigFrom(map[string]interface{}{
"path": path,
})
config := defaultConfig
require.NoError(t, cfg.Unpack(&config))
factory := netcommon.SplitHandlerFactory(netcommon.FamilyUnix, logp.NewLogger("test"), MetadataCallback, nil, netcommon.SplitFunc([]byte("\n")))
server, err := New(&config, factory)
require.NoError(t, err)
err = server.Start()
require.NoError(t, err)
server.Stop()
}

func TestSocketCleanupRefusal(t *testing.T) {
path := filepath.Join(os.TempDir(), "test.sock")
f, err := os.Create(path)
require.NoError(t, err)
require.NoError(t, f.Close())
defer os.Remove(path)

cfg, _ := common.NewConfigFrom(map[string]interface{}{
"path": path,
})
config := defaultConfig
require.NoError(t, cfg.Unpack(&config))
factory := netcommon.SplitHandlerFactory(netcommon.FamilyUnix, logp.NewLogger("test"), MetadataCallback, nil, netcommon.SplitFunc([]byte("\n")))
server, err := New(&config, factory)
require.NoError(t, err)
err = server.Start()
require.Error(t, err)
require.Contains(t, err.Error(), "refusing to remove file at location")
}

func TestReceiveNewEventsConcurrently(t *testing.T) {
workers := 4
eventsCount := 100
Expand Down
1 change: 1 addition & 0 deletions libbeat/common/seccomp/policy_linux_386.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func init() {
"access",
"brk",
"chmod",
"chown",
"clock_gettime",
"clone",
"close",
Expand Down
1 change: 1 addition & 0 deletions libbeat/common/seccomp/policy_linux_amd64.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func init() {
"bind",
"brk",
"chmod",
"chown",
"clock_gettime",
"clone",
"close",
Expand Down