Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pkg/stanza: add backpropagation of number of processed entries #16452

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .chloggen/drosiek-investigation.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pkg/stanza

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: add backpropagation of number of emitted entries, so it can be handled properly in processors

# One or more tracking issues related to the change
issues: [15378]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
5 changes: 3 additions & 2 deletions pkg/stanza/adapter/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,13 @@ func (e *LogEmitter) OutChannel() <-chan []*entry.Entry {
}

// Process will emit an entry to the output channel
func (e *LogEmitter) Process(ctx context.Context, ent *entry.Entry) error {
func (e *LogEmitter) Process(ctx context.Context, ent *entry.Entry) (int, error) {
if oldBatch := e.appendEntry(ent); len(oldBatch) > 0 {
e.flush(ctx, oldBatch)
}

return nil
// always returns 1 as the entry is going to be emitted now or later
return 1, nil
}

// appendEntry appends the entry to the current batch. If maxBatchSize is reached, a new batch will be made, and the old batch
Expand Down
11 changes: 7 additions & 4 deletions pkg/stanza/adapter/emitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ func TestLogEmitter(t *testing.T) {
in := entry.New()

go func() {
require.NoError(t, emitter.Process(context.Background(), in))
_, err := emitter.Process(context.Background(), in)
require.NoError(t, err)
}()

select {
Expand Down Expand Up @@ -65,7 +66,8 @@ func TestLogEmitterEmitsOnMaxBatchSize(t *testing.T) {
go func() {
ctx := context.Background()
for _, e := range entries {
require.NoError(t, emitter.Process(ctx, e))
_, err := emitter.Process(ctx, e)
require.NoError(t, err)
}
}()

Expand Down Expand Up @@ -94,8 +96,9 @@ func TestLogEmitterEmitsOnFlushInterval(t *testing.T) {
entry := complexEntry()

go func() {
ctx := context.Background()
require.NoError(t, emitter.Process(ctx, entry))
processed, err := emitter.Process(context.Background(), entry)
require.NoError(t, err)
require.Equal(t, 1, processed)
}()

timeoutChan := time.After(timeout)
Expand Down
6 changes: 4 additions & 2 deletions pkg/stanza/adapter/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func BenchmarkEmitterToConsumer(b *testing.B) {
go func() {
ctx := context.Background()
for _, e := range entries {
_ = logsReceiver.emitter.Process(ctx, e)
_, _ = logsReceiver.emitter.Process(ctx, e)
}
}()

Expand Down Expand Up @@ -125,7 +125,9 @@ func TestEmitterToConsumer(t *testing.T) {
go func() {
ctx := context.Background()
for _, e := range entries {
require.NoError(t, logsReceiver.emitter.Process(ctx, e))
processed, err := logsReceiver.emitter.Process(ctx, e)
require.NoError(t, err)
require.Equal(t, 1, processed)
}
}()

Expand Down
4 changes: 2 additions & 2 deletions pkg/stanza/adapter/mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ func (o *UnstartableOperator) Start(_ operator.Persister) error {
}

// Process will return 0 processed entries and a nil error
func (o *UnstartableOperator) Process(ctx context.Context, entry *entry.Entry) error {
return nil
func (o *UnstartableOperator) Process(ctx context.Context, entry *entry.Entry) (int, error) {
return 0, nil
}

type mockLogsRejecter struct {
Expand Down
2 changes: 1 addition & 1 deletion pkg/stanza/docs/operators/key_value_parser.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ The `key_value_parser` operator parses the string-type field selected by `parse_
| --- | --- | --- |
| `id` | `key_value_parser` | A unique identifier for the operator. |
| `delimiter` | `=` | The delimiter used for splitting a value into a key value pair. |
| `pair_delimiter` | | The delimiter used for seperating key value pairs, defaults to whitespace. |
| `pair_delimiter` | | The delimiter used for separating key value pairs, defaults to whitespace. |
| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries. |
| `parse_from` | `body` | A [field](../types/field.md) that indicates the field to be parsed into key value pairs. |
| `parse_to` | `attributes` | A [field](../types/field.md) that indicates the field to which the parsed key value pairs will be written. |
Expand Down
2 changes: 1 addition & 1 deletion pkg/stanza/docs/operators/noop.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,4 @@ operators:

#### Why is this necessary?

The last operator is always responsible for emitting logs from the receiver. In non-linear pipelines, it is sometimes necessary to explictly direct logs to the final operator. In many such cases, the final operator performs some work. However, if no more work is required, the `noop` operator can serve as a final operator.
The last operator is always responsible for emitting logs from the receiver. In non-linear pipelines, it is sometimes necessary to explicitly direct logs to the final operator. In many such cases, the final operator performs some work. However, if no more work is required, the `noop` operator can serve as a final operator.
4 changes: 2 additions & 2 deletions pkg/stanza/docs/operators/syslog_input.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ TCP Configuration:
```yaml
- type: syslog_input
tcp:
listen_adress: "0.0.0.0:54526"
listen_address: "0.0.0.0:54526"
syslog:
protocol: rfc5424
```
Expand All @@ -36,7 +36,7 @@ UDP Configuration:
```yaml
- type: syslog_input
udp:
listen_adress: "0.0.0.0:54526"
listen_address: "0.0.0.0:54526"
syslog:
protocol: rfc3164
location: UTC
Expand Down
2 changes: 1 addition & 1 deletion pkg/stanza/docs/operators/udp_input.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ Configuration:

```yaml
- type: udp_input
listen_adress: "0.0.0.0:54526"
listen_address: "0.0.0.0:54526"
```

Send a log:
Expand Down
2 changes: 1 addition & 1 deletion pkg/stanza/docs/types/entry.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,4 @@ Represented in `json` format, an entry may look like the following:
}
```

Throughout the documentation, `json` format is used to represent entries. Fields are typically ommitted unless relevant to the behavior being described.
Throughout the documentation, `json` format is used to represent entries. Fields are typically omitted unless relevant to the behavior being described.
4 changes: 2 additions & 2 deletions pkg/stanza/operator/helper/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,9 @@ func (i *InputOperator) CanProcess() bool {
}

// Process will always return an error if called.
func (i *InputOperator) Process(ctx context.Context, entry *entry.Entry) error {
func (i *InputOperator) Process(ctx context.Context, entry *entry.Entry) (int, error) {
i.Errorw("Operator received an entry, but can not process", zap.Any("entry", entry))
return errors.NewError(
return 0, errors.NewError(
"Operator can not process logs.",
"Ensure that operator is not configured to receive logs from other operators",
)
Expand Down
3 changes: 2 additions & 1 deletion pkg/stanza/operator/helper/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,9 @@ func TestInputOperatorProcess(t *testing.T) {
}
entry := entry.New()
ctx := context.Background()
err := input.Process(ctx, entry)
processed, err := input.Process(ctx, entry)
require.Error(t, err)
require.Equal(t, 0, processed)
require.Equal(t, err.Error(), "Operator can not process logs.")
}

Expand Down
23 changes: 12 additions & 11 deletions pkg/stanza/operator/helper/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,37 +105,38 @@ type ParserOperator struct {
}

// ProcessWith will run ParseWith on the entry, then forward the entry on to the next operators.
func (p *ParserOperator) ProcessWith(ctx context.Context, entry *entry.Entry, parse ParseFunction) error {
// It returns the number of entries passed down the pipeline, and an error if one occurred.
func (p *ParserOperator) ProcessWith(ctx context.Context, entry *entry.Entry, parse ParseFunction) (int, error) {
return p.ProcessWithCallback(ctx, entry, parse, nil)
}

func (p *ParserOperator) ProcessWithCallback(ctx context.Context, entry *entry.Entry, parse ParseFunction, cb func(*entry.Entry) error) error {
func (p *ParserOperator) ProcessWithCallback(ctx context.Context, entry *entry.Entry, parse ParseFunction, cb func(*entry.Entry) error) (int, error) {
// Short circuit if the "if" condition does not match
skip, err := p.Skip(ctx, entry)
if err != nil {
return p.HandleEntryError(ctx, entry, err)
}
if skip {
p.Write(ctx, entry)
return nil
return p.Write(ctx, entry), nil
}

if err = p.ParseWith(ctx, entry, parse); err != nil {
return err
var processed int
if processed, err = p.ParseWith(ctx, entry, parse); err != nil {
return processed, err
}
if cb != nil {
err = cb(entry)
if err != nil {
return err
return 0, err
}
}

p.Write(ctx, entry)
return nil
return p.Write(ctx, entry), nil
}

// ParseWith will process an entry's field with a parser function.
func (p *ParserOperator) ParseWith(ctx context.Context, entry *entry.Entry, parse ParseFunction) error {
// It returns the number of entries passed down the pipeline, and an error if one occurred.
func (p *ParserOperator) ParseWith(ctx context.Context, entry *entry.Entry, parse ParseFunction) (int, error) {
value, ok := entry.Get(p.ParseFrom)
if !ok {
err := errors.NewError(
Expand Down Expand Up @@ -194,7 +195,7 @@ func (p *ParserOperator) ParseWith(ctx context.Context, entry *entry.Entry, pars
if scopeNameParserErr != nil {
return p.HandleEntryError(ctx, entry, errors.Wrap(scopeNameParserErr, "scope_name parser"))
}
return nil
return 0, nil
}

// ParseFunction is function that parses a raw value.
Expand Down
32 changes: 21 additions & 11 deletions pkg/stanza/operator/helper/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,10 @@ func TestParserMissingField(t *testing.T) {
}
ctx := context.Background()
testEntry := entry.New()
err := parser.ProcessWith(ctx, testEntry, parse)
processed, err := parser.ProcessWith(ctx, testEntry, parse)
require.Error(t, err)
require.Contains(t, err.Error(), "Entry is missing the expected parse_from field.")
require.Equal(t, 0, processed)
}

func TestParserInvalidParseDrop(t *testing.T) {
Expand All @@ -145,9 +146,10 @@ func TestParserInvalidParseDrop(t *testing.T) {
}
ctx := context.Background()
testEntry := entry.New()
err := parser.ProcessWith(ctx, testEntry, parse)
processed, err := parser.ProcessWith(ctx, testEntry, parse)
require.Error(t, err)
require.Contains(t, err.Error(), "parse failure")
require.Equal(t, 0, processed)
fakeOut.ExpectNoEntry(t, 100*time.Millisecond)
}

Expand All @@ -165,10 +167,11 @@ func TestParserInvalidParseSend(t *testing.T) {
}
ctx := context.Background()
testEntry := entry.New()
err := parser.ProcessWith(ctx, testEntry, parse)
processed, err := parser.ProcessWith(ctx, testEntry, parse)
require.Error(t, err)
require.Contains(t, err.Error(), "parse failure")
fakeOut.ExpectEntry(t, testEntry)
require.Equal(t, 1, processed)
fakeOut.ExpectNoEntry(t, 100*time.Millisecond)
}

Expand All @@ -193,9 +196,10 @@ func TestParserInvalidTimeParseDrop(t *testing.T) {
}
ctx := context.Background()
testEntry := entry.New()
err := parser.ProcessWith(ctx, testEntry, parse)
processed, err := parser.ProcessWith(ctx, testEntry, parse)
require.Error(t, err)
require.Contains(t, err.Error(), "time parser: log entry does not have the expected parse_from field")
require.Equal(t, 0, processed)
fakeOut.ExpectNoEntry(t, 100*time.Millisecond)
}

Expand All @@ -220,9 +224,10 @@ func TestParserInvalidTimeParseSend(t *testing.T) {
}
ctx := context.Background()
testEntry := entry.New()
err := parser.ProcessWith(ctx, testEntry, parse)
processed, err := parser.ProcessWith(ctx, testEntry, parse)
require.Error(t, err)
require.Contains(t, err.Error(), "time parser: log entry does not have the expected parse_from field")
require.Equal(t, 1, processed)
fakeOut.ExpectEntry(t, testEntry)
fakeOut.ExpectNoEntry(t, 100*time.Millisecond)
}
Expand All @@ -244,9 +249,10 @@ func TestParserInvalidSeverityParseDrop(t *testing.T) {
}
ctx := context.Background()
testEntry := entry.New()
err := parser.ProcessWith(ctx, testEntry, parse)
processed, err := parser.ProcessWith(ctx, testEntry, parse)
require.Error(t, err)
require.Contains(t, err.Error(), "severity parser: log entry does not have the expected parse_from field")
require.Equal(t, 0, processed)
fakeOut.ExpectNoEntry(t, 100*time.Millisecond)
}

Expand Down Expand Up @@ -285,9 +291,10 @@ func TestParserInvalidTimeValidSeverityParse(t *testing.T) {
err := testEntry.Set(entry.NewBodyField("severity"), "info")
require.NoError(t, err)

err = parser.ProcessWith(ctx, testEntry, parse)
processed, err := parser.ProcessWith(ctx, testEntry, parse)
require.Error(t, err)
require.Contains(t, err.Error(), "time parser: log entry does not have the expected parse_from field")
require.Equal(t, 0, processed)

// But, this should have been set anyways
require.Equal(t, entry.Info, testEntry.Severity)
Expand Down Expand Up @@ -337,17 +344,18 @@ func TestParserValidTimeInvalidSeverityParse(t *testing.T) {
err = testEntry.Set(entry.NewBodyField("timestamp"), sample)
require.NoError(t, err)

err = parser.ProcessWith(ctx, testEntry, parse)
processed, err := parser.ProcessWith(ctx, testEntry, parse)
require.Error(t, err)
require.Contains(t, err.Error(), "severity parser: log entry does not have the expected parse_from field")
require.Equal(t, 0, processed)

require.Equal(t, expected, testEntry.Timestamp)
}

func TestParserOutput(t *testing.T) {
output := &testutil.Operator{}
output.On("ID").Return("test-output")
output.On("Process", mock.Anything, mock.Anything).Return(nil)
output.On("Process", mock.Anything, mock.Anything).Return(1, nil)

parser := ParserOperator{
TransformerOperator: TransformerOperator{
Expand All @@ -369,9 +377,10 @@ func TestParserOutput(t *testing.T) {
}
ctx := context.Background()
testEntry := entry.New()
err := parser.ProcessWith(ctx, testEntry, parse)
processed, err := parser.ProcessWith(ctx, testEntry, parse)
require.NoError(t, err)
output.AssertCalled(t, "Process", mock.Anything, mock.Anything)
require.Equal(t, 1, processed)
}

func TestParserFields(t *testing.T) {
Expand Down Expand Up @@ -642,10 +651,11 @@ func TestParserFields(t *testing.T) {
require.NoError(t, err)

e := tc.input()
err = parser.ProcessWith(context.Background(), e, parse)
processed, err := parser.ProcessWith(context.Background(), e, parse)

require.NoError(t, err)
require.Equal(t, tc.output(), e)
require.Equal(t, 0, processed)
})
}
}
Expand Down
15 changes: 7 additions & 8 deletions pkg/stanza/operator/helper/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,31 +87,30 @@ func (t *TransformerOperator) CanProcess() bool {
}

// ProcessWith will process an entry with a transform function.
func (t *TransformerOperator) ProcessWith(ctx context.Context, entry *entry.Entry, transform TransformFunction) error {
// It returns the number of entries passed down the pipeline, and an error if one occurred.
func (t *TransformerOperator) ProcessWith(ctx context.Context, entry *entry.Entry, transform TransformFunction) (int, error) {
// Short circuit if the "if" condition does not match
skip, err := t.Skip(ctx, entry)
if err != nil {
return t.HandleEntryError(ctx, entry, err)
}
if skip {
t.Write(ctx, entry)
return nil
return t.Write(ctx, entry), nil
}

if err := transform(entry); err != nil {
return t.HandleEntryError(ctx, entry, err)
}
t.Write(ctx, entry)
return nil
return t.Write(ctx, entry), nil
}

// HandleEntryError will handle an entry error using the on_error strategy.
func (t *TransformerOperator) HandleEntryError(ctx context.Context, entry *entry.Entry, err error) error {
func (t *TransformerOperator) HandleEntryError(ctx context.Context, entry *entry.Entry, err error) (int, error) {
t.Errorw("Failed to process entry", zap.Any("error", err), zap.Any("action", t.OnError), zap.Any("entry", entry))
if t.OnError == SendOnError {
t.Write(ctx, entry)
return t.Write(ctx, entry), err
}
return err
return 0, err
}

func (t *TransformerOperator) Skip(ctx context.Context, entry *entry.Entry) (bool, error) {
Expand Down
Loading