Skip to content

Commit

Permalink
Syslog input as wrapper (#376)
Browse files Browse the repository at this point in the history
* Refactor syslog_input into a single operator

The syslog_input operator was previously implemented
as a builder that built two operators. While this is
a perfectly fine solution, it happens to be the only
place in this codebase where multiple operators are
returned from a builder, excepting plugins.

This new implementation builds the same two operators,
but then wraps them together within a single operator.
This will allow for the eventual simplification of
other code, once plugins are also removed from this
codebase.
  • Loading branch information
djaglowski authored Feb 15, 2022
1 parent 3222787 commit 8e7555b
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 117 deletions.
114 changes: 71 additions & 43 deletions operator/builtin/input/syslog/syslog.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,76 +33,104 @@ func NewSyslogInputConfig(operatorID string) *SyslogInputConfig {
}
}

type BaseSyslogInputConfig struct {
helper.InputConfig `yaml:",inline"`
Tcp *tcp.TCPInputConfig `json:"tcp" yaml:"tcp"`
Udp *udp.UDPInputConfig `json:"udp" yaml:"udp"`
}

type SyslogInputConfig struct {
syslog.SyslogParserConfig `yaml:"-"`
helper.InputConfig `yaml:",inline"`
Tcp *tcp.TCPInputConfig `json:"tcp" yaml:"tcp"`
Udp *udp.UDPInputConfig `json:"udp" yaml:"udp"`
helper.InputConfig `yaml:",inline"`
syslog.SyslogBaseConfig `yaml:",inline"`
Tcp *tcp.TCPBaseConfig `json:"tcp" yaml:"tcp"`
Udp *udp.UDPBaseConfig `json:"udp" yaml:"udp"`
}

func (c SyslogInputConfig) Build(context operator.BuildContext) ([]operator.Operator, error) {
if c.Tcp == nil && c.Udp == nil {
return nil, fmt.Errorf("need tcp config or udp config")
}
parentID := c.InputConfig.ID()
if parentID == "" {
parentID = c.InputConfig.Type()
}
subContext := context.WithSubNamespace(parentID)
if c.Tcp == nil && c.Udp == nil {
return nil, fmt.Errorf("need tcp config or udp config")
inputBase, err := c.InputConfig.Build(context)
if err != nil {
return nil, err
}

c.SyslogParserConfig.OutputIDs = c.OutputIDs
ops, err := c.SyslogParserConfig.Build(subContext)
syslogParserCfg := syslog.NewSyslogParserConfig(inputBase.ID() + "_internal_tcp")
syslogParserCfg.SyslogBaseConfig = c.SyslogBaseConfig
syslogParserCfg.SetID(inputBase.ID() + "_internal_parser")
syslogParserCfg.OutputIDs = c.OutputIDs
parserOps, err := syslogParserCfg.Build(context)
if err != nil {
return nil, fmt.Errorf("failed to resolve syslog config: %s", err)
}
syslogParser := parserOps[0]

if c.Tcp != nil {
c.Tcp.OutputIDs = []string{ops[0].ID()}
inputOps, err := c.Tcp.Build(subContext)
tcpInputCfg := tcp.NewTCPInputConfig(inputBase.ID() + "_internal_tcp")
tcpInputCfg.TCPBaseConfig = *c.Tcp

inputOps, err := tcpInputCfg.Build(context)
if err != nil {
return nil, fmt.Errorf("failed to resolve tcp config: %s", err)
}
ops = append(inputOps, ops...)
inputOp := inputOps[0]

inputOp.SetOutputIDs([]string{syslogParser.ID()})
if err := inputOp.SetOutputs([]operator.Operator{syslogParser}); err != nil {
return nil, fmt.Errorf("failed to set outputs")
}

syslogInput := &SyslogInput{
InputOperator: inputBase,
tcp: inputOp.(*tcp.TCPInput),
parser: syslogParser.(*syslog.SyslogParser),
}
return []operator.Operator{syslogInput}, nil
}

if c.Udp != nil {
c.Udp.OutputIDs = []string{ops[0].ID()}
inputOps, err := c.Udp.Build(context)
udpInputCfg := udp.NewUDPInputConfig(inputBase.ID() + "_internal_udp")
udpInputCfg.UDPBaseConfig = *c.Udp

inputOps, err := udpInputCfg.Build(context)
if err != nil {
return nil, fmt.Errorf("failed to resolve upd config: %s", err)
}
ops = append(inputOps, ops...)
inputOp := inputOps[0]

inputOp.SetOutputIDs([]string{syslogParser.ID()})
if err := inputOp.SetOutputs([]operator.Operator{syslogParser}); err != nil {
return nil, fmt.Errorf("failed to set outputs")
}

syslogInput := &SyslogInput{
InputOperator: inputBase,
udp: inputOp.(*udp.UDPInput),
parser: syslogParser.(*syslog.SyslogParser),
}
return []operator.Operator{syslogInput}, nil
}

return ops, nil
return nil, fmt.Errorf("need tcp config or udp config")
}

func (c *SyslogInputConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
parserCfg := syslog.NewSyslogParserConfig("syslog_parser")
// SyslogInput is an operator that listens for log entries over tcp.
type SyslogInput struct {
helper.InputOperator
tcp *tcp.TCPInput
udp *udp.UDPInput
parser *syslog.SyslogParser
}

err := unmarshal(parserCfg)
if err != nil {
return err
// Start will start listening for log entries over tcp or udp.
func (t *SyslogInput) Start(p operator.Persister) error {
if t.tcp != nil {
return t.tcp.Start(p)
}
c.SyslogParserConfig = *parserCfg
return t.udp.Start(p)
}

base := &BaseSyslogInputConfig{}
err = unmarshal(base)
if err != nil {
return err
// Stop will stop listening for messages.
func (t *SyslogInput) Stop() error {
if t.tcp != nil {
return t.tcp.Stop()
}
return t.udp.Stop()
}

c.InputConfig = base.InputConfig
c.Tcp = base.Tcp
c.Udp = base.Udp
return nil
// SetOutputs will set the outputs of the internal syslog parser.
func (t *SyslogInput) SetOutputs(operators []operator.Operator) error {
t.parser.SetOutputIDs(t.GetOutputIDs())
return t.parser.SetOutputs(operators)
}
101 changes: 45 additions & 56 deletions operator/builtin/input/syslog/syslog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ func TestSyslogInput(t *testing.T) {

for _, tc := range cases {
t.Run(fmt.Sprintf("TCP-%s", tc.Name), func(t *testing.T) {
SyslogInputTest(t, NewSyslogInputConfigWithTcp(tc.Config), tc)
SyslogInputTest(t, NewSyslogInputConfigWithTcp(&tc.Config.SyslogBaseConfig), tc)
})
t.Run(fmt.Sprintf("UDP-%s", tc.Name), func(t *testing.T) {
SyslogInputTest(t, NewSyslogInputConfigWithUdp(tc.Config), tc)
SyslogInputTest(t, NewSyslogInputConfigWithUdp(&tc.Config.SyslogBaseConfig), tc)
})
}
}
Expand Down Expand Up @@ -94,75 +94,57 @@ func SyslogInputTest(t *testing.T, cfg *SyslogInputConfig, tc syslog.Case) {
}

func TestSyslogIDs(t *testing.T) {
basicConfig := func() *syslog.SyslogParserConfig {
basicConfig := func() *syslog.SyslogBaseConfig {
cfg := syslog.NewSyslogParserConfig("test_syslog_parser")
return cfg
}

cases := []struct {
Name string
Cfg *syslog.SyslogParserConfig
ExpectedOpIDs []string
UDPorTCP string
}{
{
Name: "default",
Cfg: func() *syslog.SyslogParserConfig {
sysCfg := basicConfig()
sysCfg.Protocol = "RFC3164"
return sysCfg
}(),
UDPorTCP: "UDP",
ExpectedOpIDs: []string{
"$.test_syslog.test_syslog_parser",
"$.fake",
},
},
cfg.Protocol = "RFC3164"
return &cfg.SyslogBaseConfig
}

for _, tc := range cases {
t.Run(fmt.Sprintf("TCP-%s", tc.Name), func(t *testing.T) {
cfg := NewSyslogInputConfigWithTcp(tc.Cfg)
bc := testutil.NewBuildContext(t)
ops, err := cfg.Build(bc)
require.NoError(t, err)
for i, op := range ops {
out := op.GetOutputIDs()
require.Equal(t, tc.ExpectedOpIDs[i], out[0])
}
})
t.Run(fmt.Sprintf("UDP-%s", tc.Name), func(t *testing.T) {
cfg := NewSyslogInputConfigWithUdp(tc.Cfg)
bc := testutil.NewBuildContext(t)
ops, err := cfg.Build(bc)
require.NoError(t, err)
for i, op := range ops {
out := op.GetOutputIDs()
require.Equal(t, tc.ExpectedOpIDs[i], out[0])
}
})
}
t.Run("TCP", func(t *testing.T) {
cfg := NewSyslogInputConfigWithTcp(basicConfig())
bc := testutil.NewBuildContext(t)
ops, err := cfg.Build(bc)
require.NoError(t, err)
syslogInputOp := ops[0].(*SyslogInput)
require.Equal(t, "$.test_syslog_internal_tcp", syslogInputOp.tcp.ID())
require.Equal(t, "$.test_syslog_internal_parser", syslogInputOp.parser.ID())
require.Equal(t, []string{syslogInputOp.parser.ID()}, syslogInputOp.tcp.GetOutputIDs())
require.Equal(t, []string{"$.fake"}, syslogInputOp.parser.GetOutputIDs())
require.Equal(t, []string{"$.fake"}, syslogInputOp.GetOutputIDs())
})
t.Run("UDP", func(t *testing.T) {
cfg := NewSyslogInputConfigWithUdp(basicConfig())
bc := testutil.NewBuildContext(t)
ops, err := cfg.Build(bc)
require.NoError(t, err)
syslogInputOp := ops[0].(*SyslogInput)
require.Equal(t, "$.test_syslog_internal_udp", syslogInputOp.udp.ID())
require.Equal(t, "$.test_syslog_internal_parser", syslogInputOp.parser.ID())
require.Equal(t, []string{syslogInputOp.parser.ID()}, syslogInputOp.udp.GetOutputIDs())
require.Equal(t, []string{"$.fake"}, syslogInputOp.parser.GetOutputIDs())
require.Equal(t, []string{"$.fake"}, syslogInputOp.GetOutputIDs())
})
}

func NewSyslogInputConfigWithTcp(syslogCfg *syslog.SyslogParserConfig) *SyslogInputConfig {
func NewSyslogInputConfigWithTcp(syslogCfg *syslog.SyslogBaseConfig) *SyslogInputConfig {
cfg := NewSyslogInputConfig("test_syslog")
cfg.SyslogParserConfig = *syslogCfg
cfg.Tcp = tcp.NewTCPInputConfig("test_syslog_tcp")
cfg.SyslogBaseConfig = *syslogCfg
cfg.Tcp = &tcp.NewTCPInputConfig("test_syslog_tcp").TCPBaseConfig
cfg.Tcp.ListenAddress = ":14201"
cfg.OutputIDs = []string{"$.fake"}
return cfg
}

func NewSyslogInputConfigWithUdp(syslogCfg *syslog.SyslogParserConfig) *SyslogInputConfig {
func NewSyslogInputConfigWithUdp(syslogCfg *syslog.SyslogBaseConfig) *SyslogInputConfig {
cfg := NewSyslogInputConfig("test_syslog")
cfg.SyslogParserConfig = *syslogCfg
cfg.Udp = udp.NewUDPInputConfig("test_syslog_udp")
cfg.SyslogBaseConfig = *syslogCfg
cfg.Udp = &udp.NewUDPInputConfig("test_syslog_udp").UDPBaseConfig
cfg.Udp.ListenAddress = ":12032"
cfg.OutputIDs = []string{"$.fake"}
return cfg
}

func TestConfigYamlUnmarshal(t *testing.T) {
func TestConfigYamlUnmarshalUDP(t *testing.T) {
base := `type: syslog_input
protocol: rfc5424
udp:
Expand All @@ -172,18 +154,25 @@ udp:
err := yaml.Unmarshal([]byte(base), &cfg)
require.NoError(t, err)
require.Equal(t, syslog.RFC5424, cfg.Protocol)
require.Nil(t, cfg.Tcp)
require.NotNil(t, cfg.Udp)
require.Equal(t, "localhost:1234", cfg.Udp.ListenAddress)
}

base = `type: syslog_input
func TestConfigYamlUnmarshalTCP(t *testing.T) {
base := `type: syslog_input
protocol: rfc5424
tcp:
listen_address: localhost:1234
tls:
ca_file: /tmp/test.ca
`
err = yaml.Unmarshal([]byte(base), &cfg)
var cfg SyslogInputConfig
err := yaml.Unmarshal([]byte(base), &cfg)
require.NoError(t, err)
require.Equal(t, syslog.RFC5424, cfg.Protocol)
require.Nil(t, cfg.Udp)
require.NotNil(t, cfg.Tcp)
require.Equal(t, "localhost:1234", cfg.Tcp.ListenAddress)
require.Equal(t, "/tmp/test.ca", cfg.Tcp.TLS.CAFile)
}
10 changes: 8 additions & 2 deletions operator/builtin/input/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,21 @@ func init() {
func NewTCPInputConfig(operatorID string) *TCPInputConfig {
return &TCPInputConfig{
InputConfig: helper.NewInputConfig(operatorID, "tcp_input"),
Multiline: helper.NewMultilineConfig(),
Encoding: helper.NewEncodingConfig(),
TCPBaseConfig: TCPBaseConfig{
Multiline: helper.NewMultilineConfig(),
Encoding: helper.NewEncodingConfig(),
},
}
}

// TCPInputConfig is the configuration of a tcp input operator.
type TCPInputConfig struct {
helper.InputConfig `yaml:",inline"`
TCPBaseConfig `yaml:",inline"`
}

// TCPBaseConfig is the detailed configuration of a tcp input operator.
type TCPBaseConfig struct {
MaxLogSize helper.ByteSize `mapstructure:"max_log_size,omitempty" json:"max_log_size,omitempty" yaml:"max_log_size,omitempty"`
ListenAddress string `mapstructure:"listen_address,omitempty" json:"listen_address,omitempty" yaml:"listen_address,omitempty"`
TLS *helper.TLSServerConfig `mapstructure:"tls,omitempty" json:"tls,omitempty" yaml:"tls,omitempty"`
Expand Down
Loading

0 comments on commit 8e7555b

Please sign in to comment.