Skip to content
This repository has been archived by the owner on May 25, 2022. It is now read-only.

Operator tcp input scanner buffer #35

Merged
merged 3 commits into from
Mar 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changed
- Added TLS support to `tcp_input` operator
- Added optional `max_buffer_size` parameter to `tcp_input` operator

## [0.15.0] - 2020-02-25

Expand Down
1 change: 1 addition & 0 deletions docs/operators/tcp_input.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ The `tcp_input` operator listens for logs on one or more TCP connections. The op
| --- | --- | --- |
| `id` | `tcp_input` | A unique identifier for the operator |
| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries |
| `max_buffer_size` | `1024kib` | Maximum size of buffer that may be allocated while reading TCP input |
| `listen_address` | required | A listen address of the form `<ip>:<port>` |
| `tls` | | An optional `TLS` configuration (see the TLS configuration section) |
| `write_to` | $ | The record [field](/docs/types/field.md) written to when creating a new log entry |
Expand Down
56 changes: 41 additions & 15 deletions operator/builtin/input/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,16 @@ import (
"go.uber.org/zap"
)

const (
// minBufferSize is the initial size used for buffering
// TCP input
minBufferSize = 64*1024

// DefaultMaxBufferSize is the max buffer sized used
// if MaxBufferSize is not set
DefaultMaxBufferSize = 1024*1024
)

func init() {
operator.Register("tcp_input", func() operator.Builder { return NewTCPInputConfig("") })
}
Expand All @@ -44,6 +54,7 @@ func NewTCPInputConfig(operatorID string) *TCPInputConfig {
type TCPInputConfig struct {
helper.InputConfig `yaml:",inline"`

MaxBufferSize helper.ByteSize `json:"max_buffer_size,omitempty" yaml:"max_buffer_size,omitempty"`
ListenAddress string `json:"listen_address,omitempty" yaml:"listen_address,omitempty"`
TLS TLSConfig `json:"tls,omitempty" yaml:"tls,omitempty"`
}
Expand All @@ -67,6 +78,16 @@ func (c TCPInputConfig) Build(context operator.BuildContext) ([]operator.Operato
return nil, err
}

// If MaxBufferSize not set, set sane default in order to remain
// backwards compatible with existing plugins and configurations
if c.MaxBufferSize == 0 {
c.MaxBufferSize = DefaultMaxBufferSize
}

if c.MaxBufferSize < minBufferSize {
return nil, fmt.Errorf("invalid value for parameter 'max_buffer_size', must be equal to or greater than %d bytes", minBufferSize)
}

if c.ListenAddress == "" {
return nil, fmt.Errorf("missing required parameter 'listen_address'")
}
Expand Down Expand Up @@ -96,6 +117,7 @@ func (c TCPInputConfig) Build(context operator.BuildContext) ([]operator.Operato
tcpInput := &TCPInput{
InputOperator: inputOperator,
address: c.ListenAddress,
maxBufferSize: int(c.MaxBufferSize),
tlsEnable: c.TLS.Enable,
tlsKeyPair: cert,
}
Expand All @@ -105,9 +127,10 @@ func (c TCPInputConfig) Build(context operator.BuildContext) ([]operator.Operato
// TCPInput is an operator that listens for log entries over tcp.
type TCPInput struct {
helper.InputOperator
address string
tlsEnable bool
tlsKeyPair tls.Certificate
address string
maxBufferSize int
tlsEnable bool
tlsKeyPair tls.Certificate

listener net.Listener
cancel context.CancelFunc
Expand Down Expand Up @@ -197,18 +220,21 @@ func (t *TCPInput) goHandleMessages(ctx context.Context, conn net.Conn, cancel c
defer t.wg.Done()
defer cancel()

scanner := bufio.NewScanner(conn)
for scanner.Scan() {
entry, err := t.NewEntry(scanner.Text())
if err != nil {
t.Errorw("Failed to create entry", zap.Error(err))
continue
}
t.Write(ctx, entry)
}
if err := scanner.Err(); err != nil {
t.Errorw("Scanner error", zap.Error(err))
}
// Initial buffer size is 64k
buf := make([]byte, 0, 64 * 1024)
scanner := bufio.NewScanner(conn)
scanner.Buffer(buf, t.maxBufferSize * 1024)
for scanner.Scan() {
entry, err := t.NewEntry(scanner.Text())
if err != nil {
t.Errorw("Failed to create entry", zap.Error(err))
continue
}
t.Write(ctx, entry)
}
if err := scanner.Err(); err != nil {
t.Errorw("Scanner error", zap.Error(err))
}
}()
}

Expand Down
95 changes: 95 additions & 0 deletions operator/builtin/input/tcp/tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,101 @@ func tlsTCPInputTest(input []byte, expected []string) func(t *testing.T) {
}
}

func TestBuild(t *testing.T) {
cases := []struct {
name string
inputRecord TCPInputConfig
expectErr bool
}{
{
"default-auto-address",
TCPInputConfig{
ListenAddress: ":0",
},
false,
},
{
"default-fixed-address",
TCPInputConfig{
ListenAddress: "10.0.0.1:0",
},
false,
},
{
"default-fixed-address-port",
TCPInputConfig{
ListenAddress: "10.0.0.1:9000",
},
false,
},
{
"buffer-size-valid-default",
TCPInputConfig{
MaxBufferSize: 0,
ListenAddress: "10.0.0.1:9000",
},
false,
},
{
"buffer-size-valid-min",
TCPInputConfig{
MaxBufferSize: 65536,
ListenAddress: "10.0.0.1:9000",
},
false,
},
{
"buffer-size-negative",
TCPInputConfig{
MaxBufferSize: -1,
ListenAddress: "10.0.0.1:9000",
},
true,
},
{
"tls-disabled-with-keypair-set",
TCPInputConfig{
MaxBufferSize: 65536,
ListenAddress: "10.0.0.1:9000",
TLS: TLSConfig{
Enable: false,
Certificate: "/tmp/cert",
PrivateKey: "/tmp/key",
},
},
false,
},
{
"tls-enabled-with-no-such-file-error",
TCPInputConfig{
MaxBufferSize: 65536,
ListenAddress: "10.0.0.1:9000",
TLS: TLSConfig{
Enable: true,
Certificate: "/tmp/cert/missing",
PrivateKey: "/tmp/key/missing",
},
},
true,
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
cfg := NewTCPInputConfig("test_id")
cfg.ListenAddress = tc.inputRecord.ListenAddress
cfg.MaxBufferSize = tc.inputRecord.MaxBufferSize
cfg.TLS = tc.inputRecord.TLS
_, err := cfg.Build(testutil.NewBuildContext(t))
if tc.expectErr {
require.Error(t, err)
return
}
require.NoError(t, err)
})
}
}

func TestTcpInput(t *testing.T) {
t.Run("Simple", tcpInputTest([]byte("message\n"), []string{"message"}))
t.Run("CarriageReturn", tcpInputTest([]byte("message\r\n"), []string{"message"}))
Expand Down