diff --git a/CHANGELOG.md b/CHANGELOG.md index 924972a6..798617ac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## Unreleased + +### Fixed +- TCP Input Operator panic ([Issue](https://github.com/open-telemetry/opentelemetry-log-collection/issues/129)) + ## [0.17.0] - 2020-04-07 ### Added diff --git a/go.mod b/go.mod index 03883b18..f48045b4 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.15 require ( github.com/antonmedv/expr v1.8.9 github.com/bmatcuk/doublestar/v3 v3.0.0 + github.com/jpillora/backoff v1.0.0 github.com/json-iterator/go v1.1.11 github.com/mitchellh/mapstructure v1.4.1 github.com/observiq/ctimefmt v1.0.0 diff --git a/go.sum b/go.sum index d40f4254..085fbf65 100644 --- a/go.sum +++ b/go.sum @@ -542,6 +542,7 @@ github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfC github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqxOKXbg= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= +github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= diff --git a/operator/builtin/input/tcp/tcp.go b/operator/builtin/input/tcp/tcp.go index 077795ca..1c37da20 100644 --- a/operator/builtin/input/tcp/tcp.go +++ b/operator/builtin/input/tcp/tcp.go @@ -25,6 +25,7 @@ import ( "sync" "time" + "github.com/jpillora/backoff" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-log-collection/operator" @@ -109,6 +110,9 @@ func (c TCPInputConfig) Build(context operator.BuildContext) ([]operator.Operato addAttributes: c.AddAttributes, encoding: encoding, splitFunc: splitFunc, + backoff: backoff.Backoff{ + Max: 3 * time.Second, + }, } if c.TLS != nil { @@ -132,6 +136,7 @@ type TCPInput struct { cancel context.CancelFunc wg sync.WaitGroup tls *tls.Config + backoff backoff.Backoff encoding helper.Encoding splitFunc bufio.SplitFunc @@ -186,8 +191,11 @@ func (t *TCPInput) goListen(ctx context.Context) { return default: t.Debugw("Listener accept error", zap.Error(err)) + time.Sleep(t.backoff.Duration()) + continue } } + t.backoff.Reset() t.Debugf("Received connection: %s", conn.RemoteAddr().String()) subctx, cancel := context.WithCancel(ctx)