diff --git a/CHANGELOG.md b/CHANGELOG.md index 11fbf28d1..512f1f84b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased ### Added - New parameter `start_at` for journald input +- Configurable `timeout` parameter for the `k8s_metadata_decorator` ## [0.9.4] - 2020-07-21 - Allow omitting `id`, defaulting to plugin type if unique within namespace diff --git a/docs/operators/k8s_metadata_decorator.md b/docs/operators/k8s_metadata_decorator.md index da562a2ca..95f006c0b 100644 --- a/docs/operators/k8s_metadata_decorator.md +++ b/docs/operators/k8s_metadata_decorator.md @@ -4,13 +4,14 @@ The `k8s_metadata_decorator` operator adds labels and annotations to the entry u ### Configuration Fields -| Field | Default | Description | -| --- | --- | --- | -| `id` | `k8s_metadata_decorator` | A unique identifier for the operator | -| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries | -| `namespace_field` | `namespace` | A [field](/docs/types/field.md) that contains the k8s namespace associated with the log entry | -| `pod_name_field` | `pod_name` | A [field](/docs/types/field.md) that contains the k8s pod name associated with the log entry | -| `cache_ttl` | 10m | A [duration](/docs/types/duration.md) indicating the time it takes for a cached entry to expire | +| Field | Default | Description | +| --- | --- | --- | +| `id` | `k8s_metadata_decorator` | A unique identifier for the operator | +| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries | +| `namespace_field` | `namespace` | A [field](/docs/types/field.md) that contains the k8s namespace associated with the log entry | +| `pod_name_field` | `pod_name` | A [field](/docs/types/field.md) that contains the k8s pod name associated with the log entry | +| `cache_ttl` | 10m | A [duration](/docs/types/duration.md) indicating the time it takes for a cached entry to expire | +| `timeout` | 10s | A [duration](/docs/types/duration.md) indicating how long to wait for the API to respond before timing out | ### Example Configurations diff --git a/operator/builtin/input/file/file.go b/operator/builtin/input/file/file.go index d04c42780..90390e254 100644 --- a/operator/builtin/input/file/file.go +++ b/operator/builtin/input/file/file.go @@ -230,6 +230,8 @@ func (f *InputOperator) Start() error { f.wg.Add(1) go func() { defer f.wg.Done() + defer f.syncKnownFiles() + defer f.drainMessages() globTicker := time.NewTicker(f.PollInterval) defer globTicker.Stop() @@ -242,9 +244,6 @@ func (f *InputOperator) Start() error { for { select { case <-ctx.Done(): - f.drainMessages() - f.readerWg.Wait() - f.syncKnownFiles() return case <-globTicker.C: matches := getMatches(f.Include, f.Exclude) @@ -256,8 +255,10 @@ func (f *InputOperator) Start() error { } f.syncKnownFiles() firstCheck = false - case message := <-f.fileUpdateChan: - f.updateFile(message) + case message, ok := <-f.fileUpdateChan: + if ok { + f.updateFile(message) + } } } }() @@ -269,7 +270,7 @@ func (f *InputOperator) Start() error { func (f *InputOperator) Stop() error { f.cancel() f.wg.Wait() - f.syncKnownFiles() + f.fileUpdateChan = make(chan fileUpdateMessage) f.knownFiles = nil return nil } @@ -314,6 +315,7 @@ func (f *InputOperator) checkFile(ctx context.Context, path string, firstCheck b go func(ctx context.Context, path string, offset, lastSeenSize int64) { defer f.readerWg.Done() messenger := f.newFileUpdateMessenger(path) + defer messenger.FinishedReading() err := ReadToEnd(ctx, path, offset, lastSeenSize, messenger, f.SplitFunc, f.FilePathField, f.FileNameField, f.InputOperator, f.MaxLogSize, f.encoding) if err != nil { f.Warnw("Failed to read log file", zap.Error(err)) @@ -386,19 +388,17 @@ func (f *InputOperator) updateFile(message fileUpdateMessage) { } func (f *InputOperator) drainMessages() { - done := make(chan struct{}) go func() { f.readerWg.Wait() - close(done) + close(f.fileUpdateChan) }() for { - select { - case <-done: + message, ok := <-f.fileUpdateChan + if !ok { return - case message := <-f.fileUpdateChan: - f.updateFile(message) } + f.updateFile(message) } } diff --git a/operator/builtin/input/file/positional_scanner.go b/operator/builtin/input/file/positional_scanner.go new file mode 100644 index 000000000..0e30144ad --- /dev/null +++ b/operator/builtin/input/file/positional_scanner.go @@ -0,0 +1,33 @@ +package file + +import ( + "bufio" + "io" +) + +type PositionalScanner struct { + pos int64 + *bufio.Scanner +} + +func NewPositionalScanner(r io.Reader, maxLogSize int, startOffset int64, splitFunc bufio.SplitFunc) *PositionalScanner { + ps := &PositionalScanner{ + pos: startOffset, + Scanner: bufio.NewScanner(r), + } + + buf := make([]byte, 0, 16384) + ps.Scanner.Buffer(buf, maxLogSize) + + scanFunc := func(data []byte, atEOF bool) (advance int, token []byte, err error) { + advance, token, err = splitFunc(data, atEOF) + ps.pos += int64(advance) + return + } + ps.Scanner.Split(scanFunc) + return ps +} + +func (ps *PositionalScanner) Pos() int64 { + return ps.pos +} diff --git a/operator/builtin/input/file/read_to_end.go b/operator/builtin/input/file/read_to_end.go index 0f2bf76f9..7b8a8f824 100644 --- a/operator/builtin/input/file/read_to_end.go +++ b/operator/builtin/input/file/read_to_end.go @@ -12,6 +12,7 @@ import ( "github.com/observiq/carbon/operator/helper" "go.uber.org/zap" "golang.org/x/text/encoding" + "golang.org/x/text/transform" ) // ReadToEnd will read entries from a file and send them to the outputs of an input operator @@ -28,8 +29,6 @@ func ReadToEnd( maxLogSize int, encoding encoding.Encoding, ) error { - defer messenger.FinishedReading() - select { case <-ctx.Done(): return nil @@ -59,30 +58,34 @@ func ReadToEnd( return fmt.Errorf("seek file: %s", err) } - scanner := bufio.NewScanner(file) - buf := make([]byte, 0, 16384) - scanner.Buffer(buf, maxLogSize) - pos := startOffset - scanFunc := func(data []byte, atEOF bool) (advance int, token []byte, err error) { - advance, token, err = splitFunc(data, atEOF) - pos += int64(advance) - return - } - scanner.Split(scanFunc) - - decoder := encoding.NewDecoder() + scanner := NewPositionalScanner(file, maxLogSize, startOffset, splitFunc) // Make a large, reusable buffer for transforming + decoder := encoding.NewDecoder() decodeBuffer := make([]byte, 16384) - // If we're not at the end of the file, and we haven't - // advanced since last cycle, read the rest of the file as an entry - defer func() { - if pos < stat.Size() && pos == startOffset && lastSeenFileSize == stat.Size() { - readRemaining(ctx, file, pos, stat.Size(), messenger, inputOperator, filePathField, fileNameField, decoder, decodeBuffer) + emit := func(msgBuf []byte) { + decoder.Reset() + var nDst int + for { + nDst, _, err = decoder.Transform(decodeBuffer, msgBuf, true) + if err != nil && err == transform.ErrShortDst { + decodeBuffer = make([]byte, len(decodeBuffer)*2) + continue + } else if err != nil { + inputOperator.Errorw("failed to transform encoding", zap.Error(err)) + return + } + break } - }() + e := inputOperator.NewEntry(string(decodeBuffer[:nDst])) + e.Set(filePathField, path) + e.Set(fileNameField, filepath.Base(file.Name())) + inputOperator.Write(ctx, e) + } + + // Iterate over the tokenized file, emitting entries as we go for { select { case <-ctx.Done(): @@ -94,47 +97,32 @@ func ReadToEnd( if !ok { if err := scanner.Err(); err == bufio.ErrTooLong { return errors.NewError("log entry too large", "increase max_log_size or ensure that multiline regex patterns terminate") + } else if err != nil { + return errors.Wrap(err, "scanner error") } - return scanner.Err() + break } - decoder.Reset() - nDst, _, err := decoder.Transform(decodeBuffer, scanner.Bytes(), true) - if err != nil { - return err - } - - e := inputOperator.NewEntry(string(decodeBuffer[:nDst])) - e.Set(filePathField, path) - e.Set(fileNameField, filepath.Base(file.Name())) - inputOperator.Write(ctx, e) - messenger.SetOffset(pos) + emit(scanner.Bytes()) + messenger.SetOffset(scanner.Pos()) } -} -// readRemaining will read the remaining characters in a file as a log entry. -func readRemaining(ctx context.Context, file *os.File, filePos int64, fileSize int64, messenger fileUpdateMessenger, inputOperator helper.InputOperator, filePathField, fileNameField entry.Field, encoder *encoding.Decoder, decodeBuffer []byte) { - _, err := file.Seek(filePos, 0) - if err != nil { - inputOperator.Errorf("failed to seek to read last log entry") - return - } + // If we're not at the end of the file, and we haven't + // advanced since last cycle, read the rest of the file as an entry + if scanner.Pos() < stat.Size() && scanner.Pos() == startOffset && lastSeenFileSize == stat.Size() { + _, err := file.Seek(scanner.Pos(), 0) + if err != nil { + return errors.Wrap(err, "seeking for trailing entry") + } - msgBuf := make([]byte, fileSize-filePos) - n, err := file.Read(msgBuf) - if err != nil { - inputOperator.Errorf("failed to read trailing log") - return - } - encoder.Reset() - nDst, _, err := encoder.Transform(decodeBuffer, msgBuf, true) - if err != nil { - inputOperator.Errorw("failed to decode trailing log", zap.Error(err)) + msgBuf := make([]byte, stat.Size()-scanner.Pos()) + n, err := file.Read(msgBuf) + if err != nil { + return errors.Wrap(err, "reading trailing entry") + } + emit(msgBuf[:n]) + messenger.SetOffset(scanner.Pos() + int64(n)) } - e := inputOperator.NewEntry(string(decodeBuffer[:nDst])) - e.Set(filePathField, file.Name()) - e.Set(fileNameField, filepath.Base(file.Name())) - inputOperator.Write(ctx, e) - messenger.SetOffset(filePos + int64(n)) + return nil } diff --git a/operator/builtin/input/tcp_test.go b/operator/builtin/input/tcp_test.go index cd0cb0f08..697f9fb7c 100644 --- a/operator/builtin/input/tcp_test.go +++ b/operator/builtin/input/tcp_test.go @@ -1,6 +1,8 @@ package input import ( + "fmt" + "math/rand" "net" "testing" "time" @@ -30,8 +32,10 @@ func TestTCPInput(t *testing.T) { } t.Run("Simple", func(t *testing.T) { + port := rand.Int()%16000 + 49152 + address := fmt.Sprintf("127.0.0.1:%d", port) cfg := basicTCPInputConfig() - cfg.ListenAddress = "127.0.0.1:64001" + cfg.ListenAddress = address buildContext := testutil.NewBuildContext(t) newOperator, err := cfg.Build(buildContext) @@ -50,7 +54,7 @@ func TestTCPInput(t *testing.T) { require.NoError(t, err) defer tcpInput.Stop() - conn, err := net.Dial("tcp", "127.0.0.1:64001") + conn, err := net.Dial("tcp", address) require.NoError(t, err) defer conn.Close() diff --git a/operator/builtin/input/udp_test.go b/operator/builtin/input/udp_test.go index cf2af503e..affd226cc 100644 --- a/operator/builtin/input/udp_test.go +++ b/operator/builtin/input/udp_test.go @@ -1,6 +1,8 @@ package input import ( + "fmt" + "math/rand" "net" "testing" "time" @@ -30,8 +32,10 @@ func TestUDPInput(t *testing.T) { } t.Run("Simple", func(t *testing.T) { + port := rand.Int()%16000 + 49152 + address := fmt.Sprintf("127.0.0.1:%d", port) cfg := basicUDPInputConfig() - cfg.ListenAddress = "127.0.0.1:63001" + cfg.ListenAddress = address buildContext := testutil.NewBuildContext(t) newOperator, err := cfg.Build(buildContext) @@ -52,7 +56,7 @@ func TestUDPInput(t *testing.T) { require.NoError(t, err) defer udpInput.Stop() - conn, err := net.Dial("udp", "127.0.0.1:63001") + conn, err := net.Dial("udp", address) require.NoError(t, err) defer conn.Close() diff --git a/operator/builtin/transformer/k8s_metadata_decorator.go b/operator/builtin/transformer/k8s_metadata_decorator.go index 5a7fdf9aa..b65ee3a2c 100644 --- a/operator/builtin/transformer/k8s_metadata_decorator.go +++ b/operator/builtin/transformer/k8s_metadata_decorator.go @@ -24,6 +24,7 @@ func NewK8smetadataDecoratorConfig(operatorID string) *K8sMetadataDecoratorConfi PodNameField: entry.NewRecordField("pod_name"), NamespaceField: entry.NewRecordField("namespace"), CacheTTL: operator.Duration{Duration: 10 * time.Minute}, + Timeout: operator.Duration{Duration: 10 * time.Second}, } } @@ -33,6 +34,7 @@ type K8sMetadataDecoratorConfig struct { PodNameField entry.Field `json:"pod_name_field,omitempty" yaml:"pod_name_field,omitempty"` NamespaceField entry.Field `json:"namespace_field,omitempty" yaml:"namespace_field,omitempty"` CacheTTL operator.Duration `json:"cache_ttl,omitempty" yaml:"cache_ttl,omitempty"` + Timeout operator.Duration `json:"timeout,omitempty" yaml:"timeout,omitempty"` } // Build will build a k8s_metadata_decorator operator from the supplied configuration @@ -47,6 +49,7 @@ func (c K8sMetadataDecoratorConfig) Build(context operator.BuildContext) (operat podNameField: c.PodNameField, namespaceField: c.NamespaceField, cacheTTL: c.CacheTTL.Raw(), + timeout: c.Timeout.Raw(), }, nil } @@ -61,6 +64,7 @@ type K8sMetadataDecorator struct { namespaceCache MetadataCache podCache MetadataCache cacheTTL time.Duration + timeout time.Duration } // MetadataCacheEntry is an entry in the metadata cache @@ -105,7 +109,7 @@ func (k *K8sMetadataDecorator) Start() error { } // Test connection - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), k.timeout) defer cancel() namespaceList, err := k.client.Namespaces().List(ctx, metav1.ListOptions{}) if err != nil { @@ -180,7 +184,7 @@ func (k *K8sMetadataDecorator) getPodMetadata(ctx context.Context, namespace, po } func (k *K8sMetadataDecorator) refreshNamespaceMetadata(ctx context.Context, namespace string) (MetadataCacheEntry, error) { - ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + ctx, cancel := context.WithTimeout(ctx, k.timeout) defer cancel() // Query the API @@ -206,7 +210,7 @@ func (k *K8sMetadataDecorator) refreshNamespaceMetadata(ctx context.Context, nam func (k *K8sMetadataDecorator) refreshPodMetadata(ctx context.Context, namespace, podName string) (MetadataCacheEntry, error) { key := namespace + ":" + podName - ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + ctx, cancel := context.WithTimeout(ctx, k.timeout) defer cancel() // Query the API diff --git a/operator/builtin/transformer/k8s_metadata_decorator_test.go b/operator/builtin/transformer/k8s_metadata_decorator_test.go index 076720ced..1cae95ca8 100644 --- a/operator/builtin/transformer/k8s_metadata_decorator_test.go +++ b/operator/builtin/transformer/k8s_metadata_decorator_test.go @@ -53,6 +53,7 @@ func TestK8sMetadataDecoratorBuildDefault(t *testing.T) { podNameField: entry.NewRecordField("pod_name"), namespaceField: entry.NewRecordField("namespace"), cacheTTL: 10 * time.Minute, + timeout: 10 * time.Second, } operator, err := cfg.Build(testutil.NewBuildContext(t))