From a298794aa3646d5b641443a69091509e62589f26 Mon Sep 17 00:00:00 2001 From: Joseph Sirianni Date: Mon, 1 Mar 2021 15:40:47 -0500 Subject: [PATCH 1/2] Added optional parameter to operator --- CHANGELOG.md | 1 + docs/operators/tcp_input.md | 1 + operator/builtin/input/tcp/tcp.go | 56 +++++++++++---- operator/builtin/input/tcp/tcp_test.go | 95 ++++++++++++++++++++++++++ 4 files changed, 138 insertions(+), 15 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 605d44c3..35b8f95c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed - Added TLS support to `tcp_input` operator +- Added optional `max_buffer_size` parameter to `tcp_input` operator ## [0.15.0] - 2020-02-25 diff --git a/docs/operators/tcp_input.md b/docs/operators/tcp_input.md index 07a62800..aba939c2 100644 --- a/docs/operators/tcp_input.md +++ b/docs/operators/tcp_input.md @@ -8,6 +8,7 @@ The `tcp_input` operator listens for logs on one or more TCP connections. The op | --- | --- | --- | | `id` | `tcp_input` | A unique identifier for the operator | | `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries | +| `max_buffer_size` | `1024kib` | Maximum size of buffer that may be allocated while reading TCP input | | `listen_address` | required | A listen address of the form `:` | | `tls` | | An optional `TLS` configuration (see the TLS configuration section) | | `write_to` | $ | The record [field](/docs/types/field.md) written to when creating a new log entry | diff --git a/operator/builtin/input/tcp/tcp.go b/operator/builtin/input/tcp/tcp.go index 25f2a367..499a6356 100644 --- a/operator/builtin/input/tcp/tcp.go +++ b/operator/builtin/input/tcp/tcp.go @@ -29,6 +29,16 @@ import ( "go.uber.org/zap" ) +const ( + // minBufferSize is the initial size used for buffering + // TCP input + minBufferSize = 64*1024 + + // DefaultMaxBufferSize is the max buffer sized used + // if MaxBufferSize is not set + DefaultMaxBufferSize = 1024*1024 +) + func init() { operator.Register("tcp_input", func() operator.Builder { return NewTCPInputConfig("") }) } @@ -44,6 +54,7 @@ func NewTCPInputConfig(operatorID string) *TCPInputConfig { type TCPInputConfig struct { helper.InputConfig `yaml:",inline"` + MaxBufferSize helper.ByteSize `json:"max_buffer_size,omitempty" yaml:"max_buffer_size,omitempty"` ListenAddress string `json:"listen_address,omitempty" yaml:"listen_address,omitempty"` TLS TLSConfig `json:"tls,omitempty" yaml:"tls,omitempty"` } @@ -67,6 +78,16 @@ func (c TCPInputConfig) Build(context operator.BuildContext) ([]operator.Operato return nil, err } + // If MaxBufferSize not set, set sane default in order to remain + // backwards compatible with existing plugins and configurations + if c.MaxBufferSize == 0 { + c.MaxBufferSize = DefaultMaxBufferSize + } + + if c.MaxBufferSize < minBufferSize { + return nil, fmt.Errorf("invalid value for parameter 'max_buffer_size', must be equal to or greater than %d bytes", minBufferSize) + } + if c.ListenAddress == "" { return nil, fmt.Errorf("missing required parameter 'listen_address'") } @@ -96,6 +117,7 @@ func (c TCPInputConfig) Build(context operator.BuildContext) ([]operator.Operato tcpInput := &TCPInput{ InputOperator: inputOperator, address: c.ListenAddress, + maxBufferSize: int(c.MaxBufferSize), tlsEnable: c.TLS.Enable, tlsKeyPair: cert, } @@ -105,9 +127,10 @@ func (c TCPInputConfig) Build(context operator.BuildContext) ([]operator.Operato // TCPInput is an operator that listens for log entries over tcp. type TCPInput struct { helper.InputOperator - address string - tlsEnable bool - tlsKeyPair tls.Certificate + address string + maxBufferSize int + tlsEnable bool + tlsKeyPair tls.Certificate listener net.Listener cancel context.CancelFunc @@ -197,18 +220,21 @@ func (t *TCPInput) goHandleMessages(ctx context.Context, conn net.Conn, cancel c defer t.wg.Done() defer cancel() - scanner := bufio.NewScanner(conn) - for scanner.Scan() { - entry, err := t.NewEntry(scanner.Text()) - if err != nil { - t.Errorw("Failed to create entry", zap.Error(err)) - continue - } - t.Write(ctx, entry) - } - if err := scanner.Err(); err != nil { - t.Errorw("Scanner error", zap.Error(err)) - } + // Initial buffer size is 64k + buf := make([]byte, 0, 64 * 1024) + scanner := bufio.NewScanner(conn) + scanner.Buffer(buf, t.maxBufferSize * 1024) + for scanner.Scan() { + entry, err := t.NewEntry(scanner.Text()) + if err != nil { + t.Errorw("Failed to create entry", zap.Error(err)) + continue + } + t.Write(ctx, entry) + } + if err := scanner.Err(); err != nil { + t.Errorw("Scanner error", zap.Error(err)) + } }() } diff --git a/operator/builtin/input/tcp/tcp_test.go b/operator/builtin/input/tcp/tcp_test.go index 6b762d08..96dcdabc 100644 --- a/operator/builtin/input/tcp/tcp_test.go +++ b/operator/builtin/input/tcp/tcp_test.go @@ -194,6 +194,101 @@ func tlsTCPInputTest(input []byte, expected []string) func(t *testing.T) { } } +func TestBuild(t *testing.T) { + cases := []struct { + name string + inputRecord TCPInputConfig + expectErr bool + }{ + { + "default-auto-address", + TCPInputConfig{ + ListenAddress: ":0", + }, + false, + }, + { + "default-fixed-address", + TCPInputConfig{ + ListenAddress: "10.0.0.1:0", + }, + false, + }, + { + "default-fixed-address-port", + TCPInputConfig{ + ListenAddress: "10.0.0.1:9000", + }, + false, + }, + { + "buffer-size-valid-default", + TCPInputConfig{ + MaxBufferSize: 0, + ListenAddress: "10.0.0.1:9000", + }, + false, + }, + { + "buffer-size-valid-min", + TCPInputConfig{ + MaxBufferSize: 65536, + ListenAddress: "10.0.0.1:9000", + }, + false, + }, + { + "buffer-size-negative", + TCPInputConfig{ + MaxBufferSize: -1, + ListenAddress: "10.0.0.1:9000", + }, + true, + }, + { + "tls-disabled-with-keypair-set", + TCPInputConfig{ + MaxBufferSize: 65536, + ListenAddress: "10.0.0.1:9000", + TLS: TLSConfig{ + Enable: false, + Certificate: "/tmp/cert", + PrivateKey: "/tmp/key", + }, + }, + false, + }, + { + "tls-enabled-with-no-such-file-error", + TCPInputConfig{ + MaxBufferSize: 65536, + ListenAddress: "10.0.0.1:9000", + TLS: TLSConfig{ + Enable: true, + Certificate: "/tmp/cert/missing", + PrivateKey: "/tmp/key/missing", + }, + }, + true, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + cfg := NewTCPInputConfig("test_id") + cfg.ListenAddress = tc.inputRecord.ListenAddress + cfg.MaxBufferSize = tc.inputRecord.MaxBufferSize + cfg.TLS = tc.inputRecord.TLS + _, err := cfg.Build(testutil.NewBuildContext(t)) + if tc.expectErr { + require.Error(t, err) + return + } + require.NoError(t, err) + }) + } +} + func TestTcpInput(t *testing.T) { t.Run("Simple", tcpInputTest([]byte("message\n"), []string{"message"})) t.Run("CarriageReturn", tcpInputTest([]byte("message\r\n"), []string{"message"})) From fe1d7818f1f8c371c3a3408ee772b577965c0dd3 Mon Sep 17 00:00:00 2001 From: Joseph Sirianni Date: Mon, 1 Mar 2021 15:40:47 -0500 Subject: [PATCH 2/2] Added optional max_buffer_size parameter to tcp_input operator --- CHANGELOG.md | 1 + docs/operators/tcp_input.md | 1 + operator/builtin/input/tcp/tcp.go | 56 +++++++++++---- operator/builtin/input/tcp/tcp_test.go | 95 ++++++++++++++++++++++++++ 4 files changed, 138 insertions(+), 15 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 605d44c3..35b8f95c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed - Added TLS support to `tcp_input` operator +- Added optional `max_buffer_size` parameter to `tcp_input` operator ## [0.15.0] - 2020-02-25 diff --git a/docs/operators/tcp_input.md b/docs/operators/tcp_input.md index 07a62800..aba939c2 100644 --- a/docs/operators/tcp_input.md +++ b/docs/operators/tcp_input.md @@ -8,6 +8,7 @@ The `tcp_input` operator listens for logs on one or more TCP connections. The op | --- | --- | --- | | `id` | `tcp_input` | A unique identifier for the operator | | `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries | +| `max_buffer_size` | `1024kib` | Maximum size of buffer that may be allocated while reading TCP input | | `listen_address` | required | A listen address of the form `:` | | `tls` | | An optional `TLS` configuration (see the TLS configuration section) | | `write_to` | $ | The record [field](/docs/types/field.md) written to when creating a new log entry | diff --git a/operator/builtin/input/tcp/tcp.go b/operator/builtin/input/tcp/tcp.go index 25f2a367..499a6356 100644 --- a/operator/builtin/input/tcp/tcp.go +++ b/operator/builtin/input/tcp/tcp.go @@ -29,6 +29,16 @@ import ( "go.uber.org/zap" ) +const ( + // minBufferSize is the initial size used for buffering + // TCP input + minBufferSize = 64*1024 + + // DefaultMaxBufferSize is the max buffer sized used + // if MaxBufferSize is not set + DefaultMaxBufferSize = 1024*1024 +) + func init() { operator.Register("tcp_input", func() operator.Builder { return NewTCPInputConfig("") }) } @@ -44,6 +54,7 @@ func NewTCPInputConfig(operatorID string) *TCPInputConfig { type TCPInputConfig struct { helper.InputConfig `yaml:",inline"` + MaxBufferSize helper.ByteSize `json:"max_buffer_size,omitempty" yaml:"max_buffer_size,omitempty"` ListenAddress string `json:"listen_address,omitempty" yaml:"listen_address,omitempty"` TLS TLSConfig `json:"tls,omitempty" yaml:"tls,omitempty"` } @@ -67,6 +78,16 @@ func (c TCPInputConfig) Build(context operator.BuildContext) ([]operator.Operato return nil, err } + // If MaxBufferSize not set, set sane default in order to remain + // backwards compatible with existing plugins and configurations + if c.MaxBufferSize == 0 { + c.MaxBufferSize = DefaultMaxBufferSize + } + + if c.MaxBufferSize < minBufferSize { + return nil, fmt.Errorf("invalid value for parameter 'max_buffer_size', must be equal to or greater than %d bytes", minBufferSize) + } + if c.ListenAddress == "" { return nil, fmt.Errorf("missing required parameter 'listen_address'") } @@ -96,6 +117,7 @@ func (c TCPInputConfig) Build(context operator.BuildContext) ([]operator.Operato tcpInput := &TCPInput{ InputOperator: inputOperator, address: c.ListenAddress, + maxBufferSize: int(c.MaxBufferSize), tlsEnable: c.TLS.Enable, tlsKeyPair: cert, } @@ -105,9 +127,10 @@ func (c TCPInputConfig) Build(context operator.BuildContext) ([]operator.Operato // TCPInput is an operator that listens for log entries over tcp. type TCPInput struct { helper.InputOperator - address string - tlsEnable bool - tlsKeyPair tls.Certificate + address string + maxBufferSize int + tlsEnable bool + tlsKeyPair tls.Certificate listener net.Listener cancel context.CancelFunc @@ -197,18 +220,21 @@ func (t *TCPInput) goHandleMessages(ctx context.Context, conn net.Conn, cancel c defer t.wg.Done() defer cancel() - scanner := bufio.NewScanner(conn) - for scanner.Scan() { - entry, err := t.NewEntry(scanner.Text()) - if err != nil { - t.Errorw("Failed to create entry", zap.Error(err)) - continue - } - t.Write(ctx, entry) - } - if err := scanner.Err(); err != nil { - t.Errorw("Scanner error", zap.Error(err)) - } + // Initial buffer size is 64k + buf := make([]byte, 0, 64 * 1024) + scanner := bufio.NewScanner(conn) + scanner.Buffer(buf, t.maxBufferSize * 1024) + for scanner.Scan() { + entry, err := t.NewEntry(scanner.Text()) + if err != nil { + t.Errorw("Failed to create entry", zap.Error(err)) + continue + } + t.Write(ctx, entry) + } + if err := scanner.Err(); err != nil { + t.Errorw("Scanner error", zap.Error(err)) + } }() } diff --git a/operator/builtin/input/tcp/tcp_test.go b/operator/builtin/input/tcp/tcp_test.go index 6b762d08..96dcdabc 100644 --- a/operator/builtin/input/tcp/tcp_test.go +++ b/operator/builtin/input/tcp/tcp_test.go @@ -194,6 +194,101 @@ func tlsTCPInputTest(input []byte, expected []string) func(t *testing.T) { } } +func TestBuild(t *testing.T) { + cases := []struct { + name string + inputRecord TCPInputConfig + expectErr bool + }{ + { + "default-auto-address", + TCPInputConfig{ + ListenAddress: ":0", + }, + false, + }, + { + "default-fixed-address", + TCPInputConfig{ + ListenAddress: "10.0.0.1:0", + }, + false, + }, + { + "default-fixed-address-port", + TCPInputConfig{ + ListenAddress: "10.0.0.1:9000", + }, + false, + }, + { + "buffer-size-valid-default", + TCPInputConfig{ + MaxBufferSize: 0, + ListenAddress: "10.0.0.1:9000", + }, + false, + }, + { + "buffer-size-valid-min", + TCPInputConfig{ + MaxBufferSize: 65536, + ListenAddress: "10.0.0.1:9000", + }, + false, + }, + { + "buffer-size-negative", + TCPInputConfig{ + MaxBufferSize: -1, + ListenAddress: "10.0.0.1:9000", + }, + true, + }, + { + "tls-disabled-with-keypair-set", + TCPInputConfig{ + MaxBufferSize: 65536, + ListenAddress: "10.0.0.1:9000", + TLS: TLSConfig{ + Enable: false, + Certificate: "/tmp/cert", + PrivateKey: "/tmp/key", + }, + }, + false, + }, + { + "tls-enabled-with-no-such-file-error", + TCPInputConfig{ + MaxBufferSize: 65536, + ListenAddress: "10.0.0.1:9000", + TLS: TLSConfig{ + Enable: true, + Certificate: "/tmp/cert/missing", + PrivateKey: "/tmp/key/missing", + }, + }, + true, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + cfg := NewTCPInputConfig("test_id") + cfg.ListenAddress = tc.inputRecord.ListenAddress + cfg.MaxBufferSize = tc.inputRecord.MaxBufferSize + cfg.TLS = tc.inputRecord.TLS + _, err := cfg.Build(testutil.NewBuildContext(t)) + if tc.expectErr { + require.Error(t, err) + return + } + require.NoError(t, err) + }) + } +} + func TestTcpInput(t *testing.T) { t.Run("Simple", tcpInputTest([]byte("message\n"), []string{"message"})) t.Run("CarriageReturn", tcpInputTest([]byte("message\r\n"), []string{"message"}))