Skip to content

Commit

Permalink
Merged master
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski committed Jul 28, 2020
2 parents 402f72c + e137723 commit e77d3b8
Show file tree
Hide file tree
Showing 9 changed files with 117 additions and 81 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## Unreleased
### Added
- New parameter `start_at` for journald input
- Configurable `timeout` parameter for the `k8s_metadata_decorator`

## [0.9.4] - 2020-07-21
- Allow omitting `id`, defaulting to plugin type if unique within namespace
Expand Down
15 changes: 8 additions & 7 deletions docs/operators/k8s_metadata_decorator.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ The `k8s_metadata_decorator` operator adds labels and annotations to the entry u

### Configuration Fields

| Field | Default | Description |
| --- | --- | --- |
| `id` | `k8s_metadata_decorator` | A unique identifier for the operator |
| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries |
| `namespace_field` | `namespace` | A [field](/docs/types/field.md) that contains the k8s namespace associated with the log entry |
| `pod_name_field` | `pod_name` | A [field](/docs/types/field.md) that contains the k8s pod name associated with the log entry |
| `cache_ttl` | 10m | A [duration](/docs/types/duration.md) indicating the time it takes for a cached entry to expire |
| Field | Default | Description |
| --- | --- | --- |
| `id` | `k8s_metadata_decorator` | A unique identifier for the operator |
| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries |
| `namespace_field` | `namespace` | A [field](/docs/types/field.md) that contains the k8s namespace associated with the log entry |
| `pod_name_field` | `pod_name` | A [field](/docs/types/field.md) that contains the k8s pod name associated with the log entry |
| `cache_ttl` | 10m | A [duration](/docs/types/duration.md) indicating the time it takes for a cached entry to expire |
| `timeout` | 10s | A [duration](/docs/types/duration.md) indicating how long to wait for the API to respond before timing out |

### Example Configurations

Expand Down
24 changes: 12 additions & 12 deletions operator/builtin/input/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,8 @@ func (f *InputOperator) Start() error {
f.wg.Add(1)
go func() {
defer f.wg.Done()
defer f.syncKnownFiles()
defer f.drainMessages()

globTicker := time.NewTicker(f.PollInterval)
defer globTicker.Stop()
Expand All @@ -242,9 +244,6 @@ func (f *InputOperator) Start() error {
for {
select {
case <-ctx.Done():
f.drainMessages()
f.readerWg.Wait()
f.syncKnownFiles()
return
case <-globTicker.C:
matches := getMatches(f.Include, f.Exclude)
Expand All @@ -256,8 +255,10 @@ func (f *InputOperator) Start() error {
}
f.syncKnownFiles()
firstCheck = false
case message := <-f.fileUpdateChan:
f.updateFile(message)
case message, ok := <-f.fileUpdateChan:
if ok {
f.updateFile(message)
}
}
}
}()
Expand All @@ -269,7 +270,7 @@ func (f *InputOperator) Start() error {
func (f *InputOperator) Stop() error {
f.cancel()
f.wg.Wait()
f.syncKnownFiles()
f.fileUpdateChan = make(chan fileUpdateMessage)
f.knownFiles = nil
return nil
}
Expand Down Expand Up @@ -314,6 +315,7 @@ func (f *InputOperator) checkFile(ctx context.Context, path string, firstCheck b
go func(ctx context.Context, path string, offset, lastSeenSize int64) {
defer f.readerWg.Done()
messenger := f.newFileUpdateMessenger(path)
defer messenger.FinishedReading()
err := ReadToEnd(ctx, path, offset, lastSeenSize, messenger, f.SplitFunc, f.FilePathField, f.FileNameField, f.InputOperator, f.MaxLogSize, f.encoding)
if err != nil {
f.Warnw("Failed to read log file", zap.Error(err))
Expand Down Expand Up @@ -386,19 +388,17 @@ func (f *InputOperator) updateFile(message fileUpdateMessage) {
}

func (f *InputOperator) drainMessages() {
done := make(chan struct{})
go func() {
f.readerWg.Wait()
close(done)
close(f.fileUpdateChan)
}()

for {
select {
case <-done:
message, ok := <-f.fileUpdateChan
if !ok {
return
case message := <-f.fileUpdateChan:
f.updateFile(message)
}
f.updateFile(message)
}
}

Expand Down
33 changes: 33 additions & 0 deletions operator/builtin/input/file/positional_scanner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package file

import (
"bufio"
"io"
)

// PositionalScanner is a bufio.Scanner that additionally tracks how many
// bytes its split function has consumed, so that callers can learn the
// offset reached in the underlying reader.
type PositionalScanner struct {
	pos int64
	*bufio.Scanner
}

// NewPositionalScanner creates a PositionalScanner that reads from r and
// tokenizes with splitFunc.
//
// startOffset is the position in the underlying source at which r begins;
// Pos reports startOffset plus the number of bytes consumed so far.
//
// maxLogSize is the maximum buffer size the scanner may use for one token.
// A non-positive maxLogSize leaves the initial 16 KiB buffer as the limit.
//
// A nil splitFunc falls back to bufio.ScanLines, the bufio.Scanner default.
func NewPositionalScanner(r io.Reader, maxLogSize int, startOffset int64, splitFunc bufio.SplitFunc) *PositionalScanner {
	if splitFunc == nil {
		splitFunc = bufio.ScanLines
	}

	ps := &PositionalScanner{
		pos:     startOffset,
		Scanner: bufio.NewScanner(r),
	}

	// bufio.Scanner enforces the larger of max and cap(buf), so the initial
	// buffer must not exceed maxLogSize or smaller limits would be ignored.
	const initialBufferSize = 16384
	bufSize := initialBufferSize
	if maxLogSize > 0 && maxLogSize < bufSize {
		bufSize = maxLogSize
	}
	ps.Scanner.Buffer(make([]byte, 0, bufSize), maxLogSize)

	// Wrap the split function so every advance it reports is accumulated
	// into the running position.
	ps.Scanner.Split(func(data []byte, atEOF bool) (advance int, token []byte, err error) {
		advance, token, err = splitFunc(data, atEOF)
		ps.pos += int64(advance)
		return advance, token, err
	})
	return ps
}

// Pos returns the start offset plus the total number of bytes the split
// function has advanced over so far.
func (ps *PositionalScanner) Pos() int64 {
	return ps.pos
}
98 changes: 43 additions & 55 deletions operator/builtin/input/file/read_to_end.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/observiq/carbon/operator/helper"
"go.uber.org/zap"
"golang.org/x/text/encoding"
"golang.org/x/text/transform"
)

// ReadToEnd will read entries from a file and send them to the outputs of an input operator
Expand All @@ -28,8 +29,6 @@ func ReadToEnd(
maxLogSize int,
encoding encoding.Encoding,
) error {
defer messenger.FinishedReading()

select {
case <-ctx.Done():
return nil
Expand Down Expand Up @@ -59,30 +58,34 @@ func ReadToEnd(
return fmt.Errorf("seek file: %s", err)
}

scanner := bufio.NewScanner(file)
buf := make([]byte, 0, 16384)
scanner.Buffer(buf, maxLogSize)
pos := startOffset
scanFunc := func(data []byte, atEOF bool) (advance int, token []byte, err error) {
advance, token, err = splitFunc(data, atEOF)
pos += int64(advance)
return
}
scanner.Split(scanFunc)

decoder := encoding.NewDecoder()
scanner := NewPositionalScanner(file, maxLogSize, startOffset, splitFunc)

// Make a large, reusable buffer for transforming
decoder := encoding.NewDecoder()
decodeBuffer := make([]byte, 16384)

// If we're not at the end of the file, and we haven't
// advanced since last cycle, read the rest of the file as an entry
defer func() {
if pos < stat.Size() && pos == startOffset && lastSeenFileSize == stat.Size() {
readRemaining(ctx, file, pos, stat.Size(), messenger, inputOperator, filePathField, fileNameField, decoder, decodeBuffer)
emit := func(msgBuf []byte) {
decoder.Reset()
var nDst int
for {
nDst, _, err = decoder.Transform(decodeBuffer, msgBuf, true)
if err != nil && err == transform.ErrShortDst {
decodeBuffer = make([]byte, len(decodeBuffer)*2)
continue
} else if err != nil {
inputOperator.Errorw("failed to transform encoding", zap.Error(err))
return
}
break
}
}()

e := inputOperator.NewEntry(string(decodeBuffer[:nDst]))
e.Set(filePathField, path)
e.Set(fileNameField, filepath.Base(file.Name()))
inputOperator.Write(ctx, e)
}

// Iterate over the tokenized file, emitting entries as we go
for {
select {
case <-ctx.Done():
Expand All @@ -94,47 +97,32 @@ func ReadToEnd(
if !ok {
if err := scanner.Err(); err == bufio.ErrTooLong {
return errors.NewError("log entry too large", "increase max_log_size or ensure that multiline regex patterns terminate")
} else if err != nil {
return errors.Wrap(err, "scanner error")
}
return scanner.Err()
break
}

decoder.Reset()
nDst, _, err := decoder.Transform(decodeBuffer, scanner.Bytes(), true)
if err != nil {
return err
}

e := inputOperator.NewEntry(string(decodeBuffer[:nDst]))
e.Set(filePathField, path)
e.Set(fileNameField, filepath.Base(file.Name()))
inputOperator.Write(ctx, e)
messenger.SetOffset(pos)
emit(scanner.Bytes())
messenger.SetOffset(scanner.Pos())
}
}

// readRemaining will read the remaining characters in a file as a log entry.
func readRemaining(ctx context.Context, file *os.File, filePos int64, fileSize int64, messenger fileUpdateMessenger, inputOperator helper.InputOperator, filePathField, fileNameField entry.Field, encoder *encoding.Decoder, decodeBuffer []byte) {
_, err := file.Seek(filePos, 0)
if err != nil {
inputOperator.Errorf("failed to seek to read last log entry")
return
}
// If we're not at the end of the file, and we haven't
// advanced since last cycle, read the rest of the file as an entry
if scanner.Pos() < stat.Size() && scanner.Pos() == startOffset && lastSeenFileSize == stat.Size() {
_, err := file.Seek(scanner.Pos(), 0)
if err != nil {
return errors.Wrap(err, "seeking for trailing entry")
}

msgBuf := make([]byte, fileSize-filePos)
n, err := file.Read(msgBuf)
if err != nil {
inputOperator.Errorf("failed to read trailing log")
return
}
encoder.Reset()
nDst, _, err := encoder.Transform(decodeBuffer, msgBuf, true)
if err != nil {
inputOperator.Errorw("failed to decode trailing log", zap.Error(err))
msgBuf := make([]byte, stat.Size()-scanner.Pos())
n, err := file.Read(msgBuf)
if err != nil {
return errors.Wrap(err, "reading trailing entry")
}
emit(msgBuf[:n])
messenger.SetOffset(scanner.Pos() + int64(n))
}

e := inputOperator.NewEntry(string(decodeBuffer[:nDst]))
e.Set(filePathField, file.Name())
e.Set(fileNameField, filepath.Base(file.Name()))
inputOperator.Write(ctx, e)
messenger.SetOffset(filePos + int64(n))
return nil
}
8 changes: 6 additions & 2 deletions operator/builtin/input/tcp_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package input

import (
"fmt"
"math/rand"
"net"
"testing"
"time"
Expand Down Expand Up @@ -30,8 +32,10 @@ func TestTCPInput(t *testing.T) {
}

t.Run("Simple", func(t *testing.T) {
port := rand.Int()%16000 + 49152
address := fmt.Sprintf("127.0.0.1:%d", port)
cfg := basicTCPInputConfig()
cfg.ListenAddress = "127.0.0.1:64001"
cfg.ListenAddress = address

buildContext := testutil.NewBuildContext(t)
newOperator, err := cfg.Build(buildContext)
Expand All @@ -50,7 +54,7 @@ func TestTCPInput(t *testing.T) {
require.NoError(t, err)
defer tcpInput.Stop()

conn, err := net.Dial("tcp", "127.0.0.1:64001")
conn, err := net.Dial("tcp", address)
require.NoError(t, err)
defer conn.Close()

Expand Down
8 changes: 6 additions & 2 deletions operator/builtin/input/udp_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package input

import (
"fmt"
"math/rand"
"net"
"testing"
"time"
Expand Down Expand Up @@ -30,8 +32,10 @@ func TestUDPInput(t *testing.T) {
}

t.Run("Simple", func(t *testing.T) {
port := rand.Int()%16000 + 49152
address := fmt.Sprintf("127.0.0.1:%d", port)
cfg := basicUDPInputConfig()
cfg.ListenAddress = "127.0.0.1:63001"
cfg.ListenAddress = address

buildContext := testutil.NewBuildContext(t)
newOperator, err := cfg.Build(buildContext)
Expand All @@ -52,7 +56,7 @@ func TestUDPInput(t *testing.T) {
require.NoError(t, err)
defer udpInput.Stop()

conn, err := net.Dial("udp", "127.0.0.1:63001")
conn, err := net.Dial("udp", address)
require.NoError(t, err)
defer conn.Close()

Expand Down
10 changes: 7 additions & 3 deletions operator/builtin/transformer/k8s_metadata_decorator.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func NewK8smetadataDecoratorConfig(operatorID string) *K8sMetadataDecoratorConfi
PodNameField: entry.NewRecordField("pod_name"),
NamespaceField: entry.NewRecordField("namespace"),
CacheTTL: operator.Duration{Duration: 10 * time.Minute},
Timeout: operator.Duration{Duration: 10 * time.Second},
}
}

Expand All @@ -33,6 +34,7 @@ type K8sMetadataDecoratorConfig struct {
PodNameField entry.Field `json:"pod_name_field,omitempty" yaml:"pod_name_field,omitempty"`
NamespaceField entry.Field `json:"namespace_field,omitempty" yaml:"namespace_field,omitempty"`
CacheTTL operator.Duration `json:"cache_ttl,omitempty" yaml:"cache_ttl,omitempty"`
Timeout operator.Duration `json:"timeout,omitempty" yaml:"timeout,omitempty"`
}

// Build will build a k8s_metadata_decorator operator from the supplied configuration
Expand All @@ -47,6 +49,7 @@ func (c K8sMetadataDecoratorConfig) Build(context operator.BuildContext) (operat
podNameField: c.PodNameField,
namespaceField: c.NamespaceField,
cacheTTL: c.CacheTTL.Raw(),
timeout: c.Timeout.Raw(),
}, nil
}

Expand All @@ -61,6 +64,7 @@ type K8sMetadataDecorator struct {
namespaceCache MetadataCache
podCache MetadataCache
cacheTTL time.Duration
timeout time.Duration
}

// MetadataCacheEntry is an entry in the metadata cache
Expand Down Expand Up @@ -105,7 +109,7 @@ func (k *K8sMetadataDecorator) Start() error {
}

// Test connection
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), k.timeout)
defer cancel()
namespaceList, err := k.client.Namespaces().List(ctx, metav1.ListOptions{})
if err != nil {
Expand Down Expand Up @@ -180,7 +184,7 @@ func (k *K8sMetadataDecorator) getPodMetadata(ctx context.Context, namespace, po
}

func (k *K8sMetadataDecorator) refreshNamespaceMetadata(ctx context.Context, namespace string) (MetadataCacheEntry, error) {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
ctx, cancel := context.WithTimeout(ctx, k.timeout)
defer cancel()

// Query the API
Expand All @@ -206,7 +210,7 @@ func (k *K8sMetadataDecorator) refreshNamespaceMetadata(ctx context.Context, nam
func (k *K8sMetadataDecorator) refreshPodMetadata(ctx context.Context, namespace, podName string) (MetadataCacheEntry, error) {
key := namespace + ":" + podName

ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
ctx, cancel := context.WithTimeout(ctx, k.timeout)
defer cancel()

// Query the API
Expand Down
Loading

0 comments on commit e77d3b8

Please sign in to comment.