logstransformprocessor deadlocks under load #16604

Closed
chrislbs opened this issue Dec 4, 2022 · 4 comments
Labels: bug, processor/logstransform

chrislbs (Contributor) commented Dec 4, 2022

Component(s)

processor/logstransform

What happened?

Description

There is a deadlock scenario in the logs transform processor under load.

The primary issue is that every invocation of logstransformprocessor.processLogs shares the same stanza pipeline and a common instance of the emitter.

Each invocation pushes its log messages into the pipeline's processing channel (a blocking send):

// Add the logs to the chain
err := ltp.fromConverter.Batch(ld)
if err != nil {
   return ld, err
}

func (c *FromPdataConverter) Batch(pLogs plog.Logs) error {
	for i := 0; i < pLogs.ResourceLogs().Len(); i++ {
		rls := pLogs.ResourceLogs().At(i)
		for j := 0; j < rls.ScopeLogs().Len(); j++ {
			scope := rls.ScopeLogs().At(j)
			item := fromConverterWorkerItem{
				Resource:       rls.Resource(),
				Scope:          scope,
				LogRecordSlice: scope.LogRecords(),
			}
			select {
			case c.workerChan <- item:
				continue
			case <-c.stopChan:
				return nil
			}
		}
	}

	return nil
}

It then waits on an output channel at the tail end of that pipeline:

for {
   select {
   case <-doneChan:
   	ltp.logger.Debug("loop stopped")
   	return ld, errors.New("processor interrupted")
   case output, ok := <-ltp.outputChannel:
   	if !ok {
   		return ld, errors.New("processor encountered an issue receiving logs from stanza operators pipeline")
   	}
   	if output.err != nil {
   		return ld, err
   	}
   	
   	return output.logs, nil
   }
}

However, because the emitter is shared, an invocation of processLogs isn't necessarily going to receive the logs that came through its own pipeline. As each log message sent through the stanza pipeline is processed, it is appended to a batch in LogEmitter, and a batch is only pushed to the outputChannel once at least 100 messages (the max batch size) have accumulated or the flush timeout fires.

// emitter.go
// flusher flushes the current batch every flush interval. Intended to be run as a goroutine
func (e *LogEmitter) flusher(ctx context.Context) {
	defer e.wg.Done()

	ticker := time.NewTicker(e.flushInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			if oldBatch := e.makeNewBatch(); len(oldBatch) > 0 {
				e.flush(ctx, oldBatch)
			}
		case <-ctx.Done():
			return
		}
	}
}

This can cause a situation where multiple concurrent invocations push messages to be processed, but only one invocation receives a batch. Every other invocation blocks waiting to receive from an outputChannel that has nothing in it.

// All concurrent
processLogs pushes 25 messages to stanza pipeline -> emitter puts into batch -> batch has 100 items -> ltp selects from output and returns
processLogs pushes 25 messages to stanza pipeline -> emitter puts into batch -> batch has 0 items -> no op -> ltp blocks on select from output
processLogs pushes 25 messages to stanza pipeline -> emitter puts into batch -> batch has 0 items -> no op -> ltp blocks on select from output
processLogs pushes 25 messages to stanza pipeline -> emitter puts into batch -> batch has 0 items -> no op -> ltp blocks on select from output

The bottom three invocations are blocked because no more logs are being emitted. If a new invocation comes in, only one of the blocked routines becomes unblocked. If the upstream receivers of those pipelines do not time out and cancel their context, this is effectively a deadlock. The standalone sketch after the diagram below illustrates the pattern.

┌───────────────────────────────────────────────────┐
│ processLogs()                                     ├───────┐
│ ┌─────────────────────────────────────────────────┴─┐     │
│ │ processLogs()                                     ├─────┐
│ │ ┌─────────────────────────────────────────────────┴─┐   │
└─│ │  processLogs()                                    │───┐
  └─┤    Pushes messages to be processed on a stanza    │   │
    │    pipeline where emitter is the last stanza in   │   │
    │    the pipeline. Then waits to receive batched    │   │
    │    logs on a output chan that receives the        │   │
    │    batches from the emitter.                      │   │
    └───────────────────────────────────────────────────┘   │
          ┌─────────────────────────────────┐               │
          │ emitter.flush()                 ◄───────────────┘
          │  emits at most 100 messages per │
          │  process logs invocation.       │
          └─────────────────────────────────┘
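
To see the blocking pattern in isolation, here is a minimal, standalone Go sketch. It is not the processor's code; the channel names, the 100-item batch size, and the four callers are illustrative assumptions that mirror the shared-emitter behaviour described above. The interval-based flush is omitted because, after the single 100-item flush, the emitter's batch is empty and there is nothing left for it to flush.

package main

import (
	"fmt"
	"time"
)

const maxBatch = 100

func main() {
	work := make(chan int)     // stand-in for the converter's worker channel
	output := make(chan []int) // stand-in for the shared outputChannel

	// Shared "emitter": only flushes once a full batch of maxBatch items has accumulated.
	go func() {
		batch := make([]int, 0, maxBatch)
		for item := range work {
			batch = append(batch, item)
			if len(batch) >= maxBatch {
				output <- batch
				batch = make([]int, 0, maxBatch)
			}
		}
	}()

	// Four concurrent "processLogs" callers, each pushing 25 items and then
	// waiting for a batch. Only one caller ever receives the single 100-item
	// batch; the other three block on <-output indefinitely.
	for i := 0; i < 4; i++ {
		go func(id int) {
			for j := 0; j < 25; j++ {
				work <- j
			}
			fmt.Printf("caller %d received %d items\n", id, len(<-output))
		}(i)
	}

	time.Sleep(2 * time.Second) // the three blocked callers never return
}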

Steps to Reproduce

I have example test cases in commit 7fb27bb where you can see the effect of this.
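
For illustration only (the committed tests are the actual reproduction; the processor construction is omitted here and the helper names are made up), a harness along these lines fires several small concurrent ConsumeLogs calls, none of which fills a 100-record batch, and fails if any of them hangs:

package repro

import (
	"context"
	"sync"
	"testing"
	"time"

	"go.opentelemetry.io/collector/consumer"
	"go.opentelemetry.io/collector/pdata/plog"
)

// makeLogs builds a payload with n log records in a single resource/scope.
func makeLogs(n int) plog.Logs {
	ld := plog.NewLogs()
	records := ld.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords()
	for i := 0; i < n; i++ {
		records.AppendEmpty().Body().SetStr("test message")
	}
	return ld
}

// expectNoDeadlock runs concurrent consumers against proc (the logstransform
// processor under test) and fails if they do not all return within a deadline.
func expectNoDeadlock(t *testing.T, proc consumer.Logs) {
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = proc.ConsumeLogs(context.Background(), makeLogs(25))
		}()
	}

	done := make(chan struct{})
	go func() { wg.Wait(); close(done) }()
	select {
	case <-done:
	case <-time.After(10 * time.Second):
		t.Fatal("ConsumeLogs calls did not return; processor appears deadlocked")
	}
}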

Expected Result

Each batch of log messages pushed into logstransform should be fully processed by processLogs.

Actual Result

The logs output by each call to processLogs are somewhat arbitrary because emitted batches could contain a combination of logs from other asynchronous invocations.

Collector version

v0.66.0

Environment information

Environment

OS: (e.g., "Ubuntu 20.04")
Compiler (if manually compiled): (e.g., "go 14.2")

OpenTelemetry Collector configuration

No response

Log output

3 @ 0x43c9b6 0x44c83c 0xc8a5d4 0xc42df8 0x9f0139 0xc8e504 0x46e021
#	0xc8a5d3	github.com/open-telemetry/opentelemetry-collector-contrib/processor/logstransformprocessor.(*logsTransformProcessor).processLogs+0xd3		/obfuscated/opentelemetry-collector-contrib/processor/logstransformprocessor/processor.go:129
#	0xc42df7	go.opentelemetry.io/collector/processor/processorhelper.NewLogsProcessor.func1+0xf7								/obfuscated/go/pkg/mod/go.opentelemetry.io/[email protected]/processor/processorhelper/logs.go:62
#	0x9f0138	go.opentelemetry.io/collector/consumer.ConsumeLogsFunc.ConsumeLogs+0x38										/obfuscated/go/pkg/mod/go.opentelemetry.io/collector/[email protected]/logs.go:36
#	0xc8e503	github.com/open-telemetry/opentelemetry-collector-contrib/processor/logstransformprocessor.TestLogsTransformProcessor_Deadlock.func1+0x83	/obfuscated/source/oss/opentelemetry-collector-contrib/processor/logstransformprocessor/processor_test.go:185

Additional context

No response

chrislbs added the bug and needs triage labels on Dec 4, 2022
chrislbs (Contributor, Author) commented Dec 4, 2022

This is a similar problem to the one pointed out in #15378. I believe the right fix is for this processor to create a completely new stanza pipeline for each invocation and then, leveraging the back-propagation work @sumo-drosiek has in PR #16452, loop over and receive from the channel until all messages are processed.

In my case, I want to leverage the "move" transformer because it appears to be the only transform option that supports arbitrary path walking on an object (e.g. body.nestedObject.subNestedObject.field).

As a temporary workaround, I've invoked the fromConverter's Batch call in a goroutine and put a timeout on the output select. While that does help prevent the deadlock, goroutines will build up over time because the number of routines selecting from the channel no longer matches the number of messages being pushed onto it.
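
Roughly, the workaround looks like this (a sketch along the lines of the snippets quoted above, not the exact change; the 5s timeout is arbitrary):

// Inside processLogs (sketch of the workaround, not the exact change).
// Run the converter's Batch call in its own goroutine so this invocation
// never blocks on a full worker channel.
go func() {
	if err := ltp.fromConverter.Batch(ld); err != nil {
		ltp.logger.Error("failed to push logs into the stanza pipeline", zap.Error(err))
	}
}()

// Bound the wait on the shared output channel; the 5s timeout is arbitrary.
timeout := time.After(5 * time.Second)
for {
	select {
	case <-doneChan:
		ltp.logger.Debug("loop stopped")
		return ld, errors.New("processor interrupted")
	case <-timeout:
		// Give up instead of blocking forever; the records may still be
		// emitted later as part of another invocation's batch.
		return ld, errors.New("timed out waiting for logs from the stanza pipeline")
	case output, ok := <-ltp.outputChannel:
		if !ok {
			return ld, errors.New("processor encountered an issue receiving logs from stanza operators pipeline")
		}
		if output.err != nil {
			return ld, output.err
		}
		return output.logs, nil
	}
}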

chrislbs (Contributor, Author) commented Dec 5, 2022

Here is a commit that works for my particular case by ensuring processLogs handles the logs pushed to it synchronously. However, it might have issues if any of the stanza operators are filters, as in the other issue linked above. I'll wait for some feedback before I move forward on any of these fixes.
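
Sketching the general idea (not necessarily exactly what the commit does): count how many records went in and keep draining the shared output channel until at least that many have come back out, so each invocation returns its own logs. Filter operators break this, since dropped records mean the expected count is never reached.

// Inside processLogs (sketch only). Assumes invocations are serialized, or
// that each invocation has its own pipeline; otherwise batches can still mix
// records from concurrent calls.
expected := ld.LogRecordCount()
received := 0
result := plog.NewLogs()

// Add the logs to the chain.
if err := ltp.fromConverter.Batch(ld); err != nil {
	return ld, err
}

for received < expected {
	select {
	case <-doneChan:
		return ld, errors.New("processor interrupted")
	case output, ok := <-ltp.outputChannel:
		if !ok {
			return ld, errors.New("processor encountered an issue receiving logs from stanza operators pipeline")
		}
		if output.err != nil {
			return ld, output.err
		}
		received += output.logs.LogRecordCount()
		// Accumulate every emitted batch into this invocation's result.
		output.logs.ResourceLogs().MoveAndAppendTo(result.ResourceLogs())
	}
}
return result, nil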

fatsheep9146 added the processor/logstransform label on Dec 5, 2022
github-actions bot commented Dec 5, 2022

Pinging code owners for processor/logstransform: @djaglowski @dehaansa. See Adding Labels via Comments if you do not have permissions to add labels yourself.

djaglowski (Member) commented:

Resolved by #17079
