Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Decouple and Batch Processors to Collector #959

Merged
merged 9 commits into from
Nov 2, 2023

Conversation

adcharre
Copy link
Contributor

Problem

The introduction of force flush before a Lambda function finishes causes an increase in responses times due to the need to wait for the collector pipelines to complete before the lambda function returns.

Solution

By adding a new processor that decouples the receiver and exporter sides of the pipeline and is aware of lambda lifecycle events, the response time of the function is unaffected by the time it takes to export opentelemetry data.

In addition, adding the Batch processor to the list of available processors can reduce the cost of lambda function invocation by not needing to send opentelemetry data on every run. This is at the expense of data being delayed.

Add a new processor that decouples the receiver and exporter sides of the pipeline and is aware of lambda lifecycle events.

Also add the Batch processor to the list of available processor to reduce the cost of lambda function invocation at the expense of data being delayed.
@adcharre adcharre requested a review from a team October 20, 2023 11:05
@linux-foundation-easycla
Copy link

linux-foundation-easycla bot commented Oct 20, 2023

CLA Signed

The committers listed above are authorized under a signed CLA.

@tylerbenson
Copy link
Member

@adcharre please go through the CLA process.

@adcharre
Copy link
Contributor Author

@adcharre please go through the CLA process.

Trying to organise that at the moment - unfortunately my company hasn't yet signed the CLA so it's taking a bit of probing internally to get this done.

Copy link
Contributor

@codeboten codeboten left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR @adcharre! I'm curious if the batch processor alone would work if the collector is able to respond to shutdown events from the environment.

I remember a few years ago, the batch processor was not useful, since batches would get stuck and never sent when lambdas were frozen.

This processor decouples the receiver and exporter ends of the pipeline allowing the lambda function to finish before traces/metrics/logs are exported by the collector. The processor is aware of the Lambda lifecycle and will prevent the environment from being frozen or shutdown until any pending traces/metrics/logs have been exported.
In this way the response times of the Lambda function is not impacted by the need to export data, however the billed duration will include the time taken to export data as well as runtime of the lambda function.

When combined with the batch processor, the number of exports required can be significantly reduced and therefore the cost of running the lambda. This is with the trade-off that the data will not be available at your chosen endpoint until some time after the invocation, up to a maximum of 5 minutes (the timeout that the environment is shutdown when no further invocations are received).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if the decouple processor would be better written as a lifecycle-aware batch processor. I worry that without this, data would likely end up sitting in the existing batch processor whether it was placed before or after the decouple processor.

Another question that comes to mind is whether the batch processor alone would work here since it would in theory be able to send all the data left in any existing batches through the normal shutdown process.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if the decouple processor would be better written as a lifecycle-aware batch processor. I worry that without this, data would likely end up sitting in the existing batch processor whether it was placed before or after the decouple processor.

Actually I was going to update the README to indicate that the batch processor must come before the decouple processor otherwise you run into problems (as mentioned below). It would be possible to rewrite this as you say, however the simplest approach was to do one thing and one thing well and re-use the existing batch processor in front of the decouple processor.
Also it's reasonable to just use the decouple processor on it's own to reduce the response time of the lambda while not delaying the delivery of otel data.

Another question that comes to mind is whether the batch processor alone would work here since it would in theory be able to send all the data left in any existing batches through the normal shutdown process.

Not on it's on no, I did also consider it. The problem is that the collector is only shutdown when the environment is about to be destroyed. However it's frozen after each function invocation.
Without the ability to prevent the environment being frozen until the exports have successfully completed you will end up with data occasionally being lost due network interruptions.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@adcharre I'm not very familiar with extensions (or the collector)... could you highlight for me how the code you added handles the frozen scenario? (How does it prevent from being frozen.) Perhaps some additional comments in your code would be helpful here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

err = lm.listener.Wait(ctx, res.RequestID)
if err != nil {
lm.logger.Error("problem waiting for platform.runtimeDone event", zap.Error(err), zap.String("requestID", res.RequestID))
}

// Check other components are ready before allowing the freezing of the environment.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tylerbenson - here's a comment to explain that we notifier other components that the function has finished and therefore need to be ready to freeze.

The new code I've added hooks into the existing lifecycle code,

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have a good understanding of the extension lifecycle. If the lambda function returns and starts waiting for the next event, are you sure it waits for events to finish processing before freezing? Is this documented somewhere? How did you verify this behavior?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lifecycle docs say it waits until the extension also calls Next.

But does the lambda function get the next event only after all extensions have called Next? If that were the case then wouldn't it still be waiting until the extension finishes, unless the extension calls Next before doing the export, but then it might be frozen before actually exporting.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tylerbenson according to the lifecycle documentation (https://docs.aws.amazon.com/lambda/latest/dg/runtimes-extensions-api.html#runtimes-extensions-api-lifecycle)

Each phase starts with an event from Lambda to the runtime and to all registered extensions. The runtime and each extension signal completion by sending a Next API request. Lambda freezes the execution environment when each process has completed and there are no pending events.

@tsloughter the lambda runtime will only get the next invocation once all extensions have also signalled they are ready.
The sequence of events is:

  1. Lambda and Extensions receive Next event
  2. Lambda function completes and signals it's ready for the next event.
  3. Collector receives notification from Telemetry API that the function has finished.
  4. Decouple processor finishes forwarding data.
  5. Collector signals it's ready for the next event.
  6. Back to 1.

Also see the "Invoke phase" in the same link:

After receiving the function response from the runtime, Lambda returns the response to the client, even if extensions are still running.

and

The Invoke phase ends after the runtime and all extensions signal that they are done by sending a Next API request.

As to the comment about verifying this behaviour - from Cloudwatch logs to confirm what's documented.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the detailed response.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, cool, this is great.


type Listener interface {
FunctionInvoked()
FunctionFinished()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably this is where some additional comments are required, I'll look at adding them tomorrow.

@adcharre
Copy link
Contributor Author

@codeboten & @tylerbenson - finally through the CLA and I've updated the readme and code with some more comments around lifecyle.

Copy link
Contributor

@codeboten codeboten left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the submission @adcharre, would you consider submitting the decouple processor to the collector contrib repo? minor change i would suggest is calling it the queueingprocessor

specifically i think it would be beneficial to have a processor that can decouple the export pipeline from the batch processor into queues. many collector exporters do this today via the queueing supported in the exporter helper. a processor would take that responsibility out of the exporter altogether

@codeboten
Copy link
Contributor

Please take a look at the failing test:

--- FAIL: TestLifecycle (0.40s)
    --- FAIL: TestLifecycle/full_lifecycle_with_data_from_function (0.20s)
        testing.go:1465: race detected during execution of test
    --- FAIL: TestLifecycle/full_lifecycle_with_data_before_shutdown (0.20s)
        testing.go:1465: race detected during execution of test
    testing.go:1465: race detected during execution of test

@adcharre
Copy link
Contributor Author

adcharre commented Nov 1, 2023

Please take a look at the failing test:

--- FAIL: TestLifecycle (0.40s)
    --- FAIL: TestLifecycle/full_lifecycle_with_data_from_function (0.20s)
        testing.go:1465: race detected during execution of test
    --- FAIL: TestLifecycle/full_lifecycle_with_data_before_shutdown (0.20s)
        testing.go:1465: race detected during execution of test
    testing.go:1465: race detected during execution of test

I've reworked the 2 tests and tested locally and no more errors from the race detector.

@adcharre
Copy link
Contributor Author

adcharre commented Nov 1, 2023

Thanks for the submission @adcharre, would you consider submitting the decouple processor to the collector contrib repo? minor change i would suggest is calling it the queueingprocessor

specifically i think it would be beneficial to have a processor that can decouple the export pipeline from the batch processor into queues. many collector exporters do this today via the queueing supported in the exporter helper. a processor would take that responsibility out of the exporter altogether

I'll have a go if you think it'll be useful. I had assumed there would be no need as the batch processor can do the breaking the receive -> export pipeline. It should just be a matter of removing some code and renaming a few things...

@tylerbenson
Copy link
Member

@adcharre If you'd like to apply @codeboten's suggested naming change of queueingprocessor to this PR, we can move forward with merging. This way in the future when that processor is in the collector repo we can migrate without breaking users.

Thanks for your effort here. This is great! I really appreciate the explanations you've provided.

@adcharre
Copy link
Contributor Author

adcharre commented Nov 2, 2023

@adcharre If you'd like to apply @codeboten's suggested naming change of queueingprocessor to this PR, we can move forward with merging. This way in the future when that processor is in the collector repo we can migrate without breaking users.

Thanks for your effort here. This is great! I really appreciate the explanations you've provided.

@tylerbenson I don't think that makes sense as there will need to be 2 separate plugins!
This plugin is lambda lifecycle aware and makes use of packages that are not available in the main collector. The main collector plugin would just be the simple queuing mechanism to break the link between receiver and exporter.

Stepping back a bit, there could be 2 separate implementations of the queueingprocessor one here with the lambda lifecycle awareness and a separate implementation in opentelemetry-collector-contrib which doesn't include the lambda stuff?

@codeboten
Copy link
Contributor

Right, the proposal to submit the queueing portion of the processor (queueingprocessor) as its own processor was specifically around giving the processing pipeline the ability to decouple the exporting from the batch processor (as I mentioned earlier this is largely accomplished via queueing in the exporter helper today)

If in the future there was a facility for registering the lambda lifecycle mechanism in the queueingprocessor (which could be doable if it were done generically enough, not sure if theres a use-case beyond lambda), then the decoupleprocessor could be deprecated in favour of this queueingprocessor.

In the short term though, i think it's fine for them to be separate processors.

@tylerbenson
Copy link
Member

Ok, sorry for the confusion. Please resolve the conflicts from main then I can merge.

@codeboten
Copy link
Contributor

@adcharre please ignore my comment suggesting opening an issue/pr to the main collector repository. I was thankfully reminded that this used to exist in the core repository as the queuedprocessor and was deprecated in November 2021 😄 open-telemetry/opentelemetry-collector@e820370

@adcharre
Copy link
Contributor Author

adcharre commented Nov 2, 2023

@tylerbenson conflicts should now be resolved and I've updated the decoupleprocessors dependencies to be inline with the other processors.

@tylerbenson tylerbenson merged commit 2027890 into open-telemetry:main Nov 2, 2023
11 checks passed
@tylerbenson
Copy link
Member

@adcharre one thing I forgot to add here... Please update the readme with updated guidelines for using this new feature.

@adcharre
Copy link
Contributor Author

adcharre commented Dec 7, 2023

@adcharre one thing I forgot to add here... Please update the readme with updated guidelines for using this new feature.

@tylerbenson - Will do! I'll try and get round to it next week.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants