
[processor/logstransformprocessor] Processor hangs waiting for logs that were filtered out #15378

Closed
antonblock opened this issue Oct 21, 2022 · 11 comments
Labels: bug (Something isn't working), priority:p2 (Medium), processor/logstransform (Logs Transform processor)

Comments

@antonblock (Contributor)

What happened?

Description

When using the logstransformprocessor to filter logs, the loop in processLogs eventually hangs waiting for logs that were filtered out. Since processLogs is blocking, the pipeline is blocked there.

Steps to Reproduce

Using the supplied config:

  1. Use some tool, e.g. flog, to write logs to /tmp/test.log at a constant rate:
     flog -f json -o /tmp/test.log -t log -w -l --delay=500ms
  2. Run the collector and observe that the exporter stops sending logs after several seconds.

Expected Result

Logs would continue to flow despite some records being dropped by the processor.

Actual Result

All pipelines using the filelog receiver stall once the processLogs method gets stuck waiting on the channel. The exporter doesn't send any logs despite new records being written constantly.

Collector version

c62e003

Environment information

Environment

OS: Ubuntu 20.04, macOS 12.6
Compiler (if manually compiled): go 1.18.7

OpenTelemetry Collector configuration

receivers:
  filelog:
    include:
      - /tmp/test.log

processors:
  logstransform:
    operators:
      - drop_ratio: 0.5
        expr: "true"
        type: filter

exporters:
  otlphttp:
    endpoint: http://localhost:9123

service:
  pipelines:
    logs:
      receivers: [filelog]
      processors: [logstransform]
      exporters: [otlphttp]

Log output

The processor forwards one log, then stalls:


2022-10-21T09:36:37.178-0400	info	pipelines/pipelines.go:106	Receiver started.	{"kind": "receiver", "name": "filelog", "pipeline": "logs"}
2022-10-21T09:36:37.178-0400	info	service/service.go:105	Everything is ready. Begin running and processing data.
2022-10-21T09:36:37.379-0400	debug	fileconsumer/file.go:124	Consuming files	{"kind": "receiver", "name": "filelog", "pipeline": "logs", "component": "fileconsumer"}
2022-10-21T09:36:37.379-0400	info	fileconsumer/file.go:161	Started watching file from end. To read preexisting logs, configure the argument 'start_at' to 'beginning'	{"kind": "receiver", "name": "filelog", "pipeline": "logs", "component": "fileconsumer", "path": "/tmp/test.log"}
2022-10-21T09:36:37.779-0400	debug	otlphttpexporter/otlp.go:127	Preparing to make HTTP request	{"kind": "exporter", "data_type": "logs", "name": "otlphttp", "url": "http://localhost:9123/v1/logs"}
2022-10-21T09:36:38.380-0400	debug	fileconsumer/file.go:124	Consuming files	{"kind": "receiver", "name": "filelog", "pipeline": "logs", "component": "fileconsumer"}
2022-10-21T09:36:39.579-0400	debug	fileconsumer/file.go:124	Consuming files	{"kind": "receiver", "name": "filelog", "pipeline": "logs", "component": "fileconsumer"}
2022-10-21T09:36:40.579-0400	debug	fileconsumer/file.go:124	Consuming files	{"kind": "receiver", "name": "filelog", "pipeline": "logs", "component": "fileconsumer"}
2022-10-21T09:36:41.779-0400	debug	fileconsumer/file.go:124	Consuming files	{"kind": "receiver", "name": "filelog", "pipeline": "logs", "component": "fileconsumer"}
2022-10-21T09:36:42.979-0400	debug	fileconsumer/file.go:124	Consuming files	{"kind": "receiver", "name": "filelog", "pipeline": "logs", "component": "fileconsumer"}
2022-10-21T09:36:43.979-0400	debug	fileconsumer/file.go:124	Consuming files	{"kind": "receiver", "name": "filelog", "pipeline": "logs", "component": "fileconsumer"}
2022-10-21T09:36:44.979-0400	debug	fileconsumer/file.go:124	Consuming files	{"kind": "receiver", "name": "filelog", "pipeline": "logs", "component": "fileconsumer"}
^C2022-10-21T09:36:45.208-0400	info	service/collector.go:193	Received signal from OS	{"signal": "interrupt"}
2022-10-21T09:36:45.208-0400	info	service/service.go:114	Starting shutdown...


Additional context

No response
antonblock added the bug and needs triage labels on Oct 21, 2022
evan-bradley added the priority:p2 and processor/logstransform labels and removed the needs triage label on Oct 24, 2022
@github-actions (Contributor)

Pinging code owners: @djaglowski @dehaansa. See Adding Labels via Comments if you do not have permissions to add labels yourself.

@djaglowski (Member)

@dehaansa, @cpheps, any ideas on this one?

@cpheps (Contributor) commented Oct 25, 2022

@djaglowski We've done a bit of digging, and I believe the culprit is the main select statement in the processLogs function.

select {
case <-doneChan:
	ltp.logger.Debug("loop stopped")
	return ld, errors.New("processor interrupted")
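// If every record in the batch is filtered out, nothing is ever sent on
// ltp.outputChannel, so this receive blocks forever.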
case output, ok := <-ltp.outputChannel:
	if !ok {
		return ld, errors.New("processor encountered an issue receiving logs from stanza operators pipeline")
	}
	if output.err != nil {
		return ld, output.err
	}

	return output.logs, nil
}

If everything is filtered out, the receive on ltp.outputChannel blocks forever because nothing is ever sent. I'm not sure how the core stanza loop handles this case; I would expect that if everything in a batch is filtered out, an empty batch should be returned.
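
Here is a minimal, self-contained sketch of that pattern (the names consume and output are made up for illustration; this is not the processor's actual code): a "filter" that drops every record never sends on the output channel, so the consumer's select can only return once the context expires.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// consume mirrors the shape of the select statement quoted above: it waits for
// either a cancelled context or a value on the output channel.
func consume(ctx context.Context, output <-chan string) (string, error) {
	select {
	case <-ctx.Done():
		return "", errors.New("processor interrupted")
	case out, ok := <-output:
		if !ok {
			return "", errors.New("output channel closed")
		}
		return out, nil
	}
}

func main() {
	output := make(chan string)

	// A "filter" that drops every record: it never sends on output, which is
	// what leaves the consumer waiting.
	go func() {
		for _, record := range []string{"a", "b", "c"} {
			dropped := true // stand-in for the drop_ratio decision
			if !dropped {
				output <- record
			}
		}
	}()

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	// Without the timeout on the context, this call would hang indefinitely.
	_, err := consume(ctx, output)
	fmt.Println("consume returned:", err)
}

Running this prints "consume returned: processor interrupted" after the two-second timeout, which is the standalone analogue of the stall seen in the collector logs above.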

@sumo-drosiek (Member)

@djaglowski @cpheps, I did my own investigation, and the flow through the processor is more or less the following:

  1. processor.processLogs
  2. fromPdataConverter.Batch
  3. fromPdataConverter.workerLoop
  4. fromPdataConverter.convertFromLogs
  5. processor.converterLoop
  6. filter.Process
  7. writer.Write
  8. adapter/emitter
  9. adapter/emitter.flush
  10. processor.emitterLoop
  11. converter.Batch
  12. converter.workerLoop
  13. converter.aggregationLoop
  14. converter.flushLoop
  15. converter.flush
  16. processor.consumerLoop
  17. processor.processLogs

The issue is that filter.Process doesn't call writer.Write when a log is filtered out. Because of that, processor.emitterLoop waits for data forever, and so does processLogs.
I'm not sure how many processLogs calls can run in parallel, but in the end it seems that all of them end up waiting.

I'm not too familiar with the design intentions, but based on what I see, I think that filter.Process and all other stanza processors should return some additional information indicating whether writer.Write has been run.
That information could then be handled by stanza or propagated through the channels, so the processLogs flow can be either finished or aborted correctly.
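
As a rough illustration of that idea (the types and names below are hypothetical simplifications, not the actual stanza API), a filter-style operator could report whether it forwarded the entry downstream:

package pipeline

import "context"

// Entry is a stand-in for stanza's entry type in this sketch.
type Entry struct {
	Body string
}

// Writer forwards entries to downstream operators.
type Writer interface {
	Write(ctx context.Context, e *Entry)
}

// FilterOperator is a hypothetical simplification of a filtering operator.
// Process reports whether the entry was written downstream, so callers can
// tell "dropped" apart from "still in flight".
type FilterOperator struct {
	Drop func(e *Entry) bool
	Next Writer
}

func (f *FilterOperator) Process(ctx context.Context, e *Entry) (written bool, err error) {
	if f.Drop(e) {
		// Entry intentionally filtered out: nothing is written, no error.
		return false, nil
	}
	f.Next.Write(ctx, e)
	return true, nil
}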

@djaglowski (Member)

Thanks for the detailed investigation, @sumo-drosiek.

> I think that filter.Process and all other stanza processors should return some additional information indicating whether writer.Write has been run.

I don't think we currently backpropagate anything between operators, but it makes sense to me that we should, with either a boolean or a checkable error. The other option would be to emit a []*entry.Entry to writer.Write and check for empty slices down the line.

@sumo-drosiek (Member)

@djaglowski I am making an attempt to fix it in #16452. Please give the approach a conceptual review.

I think a checkable error requires fewer changes and keeps things more readable for operators that are not writers, but on the other hand it overloads the error.

@djaglowski (Member)

> @djaglowski I am making an attempt to fix it in #16452. Please give the approach a conceptual review.
>
> I think a checkable error requires fewer changes and keeps things more readable for operators that are not writers, but on the other hand it overloads the error.

Your assessment and approach look reasonable to me.

The signature returning (bool, error) seems unusual and makes me wonder if we're missing something idiomatic here, but I can't immediately identify anything better.

Given the unusual signature, I think it will be especially important to include a decent comment to explain how to interpret each value. My understanding is:

  • ok is false when the entry was dropped (filtered out) before being passed along to the next operator, and true otherwise. There is no intention of indicating a problem.
  • err typically indicates a parsing error, but could in theory be an unexpected problem.

@sumo-drosiek (Member)

@djaglowski

> Given the unusual signature, I think it will be especially important to include a decent comment to explain how to interpret each value. My understanding is:
>
> * `ok` is false when the entry was dropped (filtered out) before being passed along to the next operator, and true otherwise. There is no intention of indicating a problem.
>
> * `err` typically indicates a parsing error, but could in theory be an unexpected problem.

I agree with that.

In addition, I'm going to document writer.Write. My assumption for now is that it should return false only if all outputs drop the entry. However, I'm not sure about that, and I'm afraid it could be error-prone.

@djaglowski (Member)

> In addition, I'm going to document writer.Write. My assumption for now is that it should return false only if all outputs drop the entry. However, I'm not sure about that, and I'm afraid it could be error-prone.

Perhaps we should return the number of logs emitted.

@sumo-drosiek (Member)

In the end I changed the signature to return the number of entries along with an error. The PR is ready for review.
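
As a sketch of what such a count-based contract could look like (hypothetical names only; see the PR for the real signatures), Write would report how many entries reached downstream outputs, so a fully dropped batch shows up as a count of zero rather than as silence on the channel:

package pipeline

import "context"

// Entry is a stand-in for stanza's entry type in this sketch.
type Entry struct {
	Body string
}

// Output is any downstream operator; Process reports how many entries it
// emitted further downstream.
type Output interface {
	Process(ctx context.Context, e *Entry) (int, error)
}

// Writer fans an entry out to all configured outputs.
type Writer struct {
	Outputs []Output
}

// Write returns the total number of entries emitted by downstream outputs.
// A count of 0 with a nil error means every output dropped the entry, which
// lets the processor stop waiting instead of blocking on its channel.
func (w *Writer) Write(ctx context.Context, e *Entry) (int, error) {
	total := 0
	for _, out := range w.Outputs {
		n, err := out.Process(ctx, e)
		if err != nil {
			return total, err
		}
		total += n
	}
	return total, nil
}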

@djaglowski (Member)

Resolved by #17079
