Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Operator tcp input adjustable buffer #256

Merged
merged 8 commits into from
Mar 1, 2021
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased
- Added optional `max_buffer_size` parameter to tcp input operator

## [0.13.15] - 2021-02-26
- Same as 0.13.14, but released with [plugins v0.0.48](https://github.com/observIQ/stanza-plugins/releases/tag/v0.0.48)
- Adds TLS support to `vmware_vcenter` and `vmware_esxi`

## [0.13.14] - 2021-02-25

### Changed
Expand Down
1 change: 1 addition & 0 deletions docs/operators/tcp_input.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ The `tcp_input` operator listens for logs on one or more TCP connections. The op
| --- | --- | --- |
| `id` | `tcp_input` | A unique identifier for the operator |
| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries |
| `max_buffer_size` | `1024kib` | Maximum size of buffer that may be allocated while reading TCP input |
| `listen_address` | required | A listen address of the form `<ip>:<port>` |
| `tls` | | An optional `TLS` configuration (see the TLS configuration section) |
| `write_to` | $ | The record [field](/docs/types/field.md) written to when creating a new log entry |
Expand Down
32 changes: 29 additions & 3 deletions operator/builtin/input/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,16 @@ import (
"go.uber.org/zap"
)

const (
// InitialBufferSize is the initial size used for buffering
// TCP input
InitialBufferSize = 64*1024
jsirianni marked this conversation as resolved.
Show resolved Hide resolved

// DefaultMaxBufferSize is the max buffer sized used
// if MaxBufferSize is not set
DefaultMaxBufferSize = 1024*1024
)

func init() {
operator.Register("tcp_input", func() operator.Builder { return NewTCPInputConfig("") })
}
Expand All @@ -30,6 +40,7 @@ func NewTCPInputConfig(operatorID string) *TCPInputConfig {
type TCPInputConfig struct {
helper.InputConfig `yaml:",inline"`

MaxBufferSize helper.ByteSize `json:"max_buffer_size,omitempty" yaml:"max_buffer_size,omitempty"`
ListenAddress string `json:"listen_address,omitempty" yaml:"listen_address,omitempty"`
TLS TLSConfig `json:"tls,omitempty" yaml:"tls,omitempty"`
}
Expand All @@ -53,6 +64,16 @@ func (c TCPInputConfig) Build(context operator.BuildContext) ([]operator.Operato
return nil, err
}

// If MaxBufferSize not set, set sane default in order to remain
// backwards compatible with existing plugins and configurations
if c.MaxBufferSize == 0 {
c.MaxBufferSize = DefaultMaxBufferSize
}

if c.MaxBufferSize < InitialBufferSize {
return nil, fmt.Errorf("invalid value for parameter 'max_buffer_size', must be equal to or greater than %d bytes", InitialBufferSize)
}

if c.ListenAddress == "" {
return nil, fmt.Errorf("missing required parameter 'listen_address'")
}
Expand Down Expand Up @@ -82,6 +103,7 @@ func (c TCPInputConfig) Build(context operator.BuildContext) ([]operator.Operato
tcpInput := &TCPInput{
InputOperator: inputOperator,
address: c.ListenAddress,
maxBufferSize: int(c.MaxBufferSize),
tlsEnable: c.TLS.Enable,
tlsKeyPair: cert,
}
Expand All @@ -91,9 +113,10 @@ func (c TCPInputConfig) Build(context operator.BuildContext) ([]operator.Operato
// TCPInput is an operator that listens for log entries over tcp.
type TCPInput struct {
helper.InputOperator
address string
tlsEnable bool
tlsKeyPair tls.Certificate
address string
maxBufferSize int
tlsEnable bool
tlsKeyPair tls.Certificate

listener net.Listener
cancel context.CancelFunc
Expand Down Expand Up @@ -183,7 +206,10 @@ func (t *TCPInput) goHandleMessages(ctx context.Context, conn net.Conn, cancel c
defer t.wg.Done()
defer cancel()

// Initial buffer size is 64k
buf := make([]byte, 0, 64 * 1024)
scanner := bufio.NewScanner(conn)
scanner.Buffer(buf, t.maxBufferSize * 1024)
for scanner.Scan() {
entry, err := t.NewEntry(scanner.Text())
if err != nil {
Expand Down
95 changes: 95 additions & 0 deletions operator/builtin/input/tcp/tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,101 @@ func tlsTCPInputTest(input []byte, expected []string) func(t *testing.T) {
}
}

func TestBuild(t *testing.T) {
cases := []struct {
name string
inputRecord TCPInputConfig
expectErr bool
}{
{
"default-auto-address",
TCPInputConfig{
ListenAddress: ":0",
},
false,
},
{
"default-fixed-address",
TCPInputConfig{
ListenAddress: "10.0.0.1:0",
},
false,
},
{
"default-fixed-address-port",
TCPInputConfig{
ListenAddress: "10.0.0.1:9000",
},
false,
},
{
"buffer-size-valid-default",
TCPInputConfig{
MaxBufferSize: 0,
ListenAddress: "10.0.0.1:9000",
},
false,
},
{
"buffer-size-valid-min",
TCPInputConfig{
MaxBufferSize: 65536,
ListenAddress: "10.0.0.1:9000",
},
false,
},
{
"buffer-size-negative",
TCPInputConfig{
MaxBufferSize: -1,
ListenAddress: "10.0.0.1:9000",
},
true,
},
{
"tls-disabled-with-keypair-set",
TCPInputConfig{
MaxBufferSize: 65536,
ListenAddress: "10.0.0.1:9000",
TLS: TLSConfig{
Enable: false,
Certificate: "/tmp/cert",
PrivateKey: "/tmp/key",
},
},
false,
},
{
"tls-enabled-with-no-such-file-error",
TCPInputConfig{
MaxBufferSize: 65536,
ListenAddress: "10.0.0.1:9000",
TLS: TLSConfig{
Enable: true,
Certificate: "/tmp/cert/missing",
PrivateKey: "/tmp/key/missing",
},
},
true,
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
cfg := NewTCPInputConfig("test_id")
cfg.ListenAddress = tc.inputRecord.ListenAddress
cfg.MaxBufferSize = tc.inputRecord.MaxBufferSize
cfg.TLS = tc.inputRecord.TLS
_, err := cfg.Build(testutil.NewBuildContext(t))
if tc.expectErr {
require.Error(t, err)
return
}
require.NoError(t, err)
})
}
}

func TestTcpInput(t *testing.T) {
t.Run("Simple", tcpInputTest([]byte("message\n"), []string{"message"}))
t.Run("CarriageReturn", tcpInputTest([]byte("message\r\n"), []string{"message"}))
Expand Down