Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Decouple and Batch Processors to Collector #959

Merged
merged 9 commits into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions collector/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@ go 1.20

replace github.com/open-telemetry/opentelemetry-lambda/collector/lambdacomponents => ./lambdacomponents

replace github.com/open-telemetry/opentelemetry-lambda/collector/lambdalifecycle => ./lambdalifecycle

replace github.com/open-telemetry/opentelemetry-lambda/collector/processor/coldstartprocessor => ./processor/coldstartprocessor

replace github.com/open-telemetry/opentelemetry-lambda/collector/processor/decoupleprocessor => ./processor/decoupleprocessor

replace github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver => ./receiver/telemetryapireceiver

// fixes ambiguous import error: found package cloud.google.com/go/compute/metadata in multiple modules:
Expand All @@ -18,6 +22,7 @@ require (
github.com/golang-collections/go-datastructures v0.0.0-20150211160725-59788d5eb259
github.com/open-telemetry/opentelemetry-collector-contrib/confmap/provider/s3provider v0.87.0
github.com/open-telemetry/opentelemetry-lambda/collector/lambdacomponents v0.84.0
github.com/open-telemetry/opentelemetry-lambda/collector/lambdalifecycle v0.0.0-00010101000000-000000000000
github.com/stretchr/testify v1.8.4
go.opentelemetry.io/collector/component v0.87.0
go.opentelemetry.io/collector/confmap v0.87.0
Expand Down Expand Up @@ -94,6 +99,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourceprocessor v0.87.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/processor/spanprocessor v0.87.0 // indirect
github.com/open-telemetry/opentelemetry-lambda/collector/processor/coldstartprocessor v0.84.0 // indirect
github.com/open-telemetry/opentelemetry-lambda/collector/processor/decoupleprocessor v0.0.0-00010101000000-000000000000 // indirect
github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver v0.84.0 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
Expand Down Expand Up @@ -138,6 +144,7 @@ require (
go.opentelemetry.io/collector/featuregate v1.0.0-rcv0016 // indirect
go.opentelemetry.io/collector/pdata v1.0.0-rcv0016 // indirect
go.opentelemetry.io/collector/processor v0.87.0 // indirect
go.opentelemetry.io/collector/processor/batchprocessor v0.87.0 // indirect
go.opentelemetry.io/collector/processor/memorylimiterprocessor v0.87.0 // indirect
go.opentelemetry.io/collector/receiver v0.87.0 // indirect
go.opentelemetry.io/collector/receiver/otlpreceiver v0.87.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions collector/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,8 @@ go.opentelemetry.io/collector/pdata v1.0.0-rcv0016 h1:qCPXSQCoD3qeWFb1RuIks8fw9A
go.opentelemetry.io/collector/pdata v1.0.0-rcv0016/go.mod h1:OdN0alYOlYhHXu6BDlGehrZWgtBuiDsz/rlNeJeXiNg=
go.opentelemetry.io/collector/processor v0.87.0 h1:aUGtRyeQk0WgQwp2rZBvJ1j+6+WJO8XMb1kjtanIWo8=
go.opentelemetry.io/collector/processor v0.87.0/go.mod h1:FHqpqdm/uyjjhNQxXJBhvQDIwjnP01EW9M6t0xVaRR4=
go.opentelemetry.io/collector/processor/batchprocessor v0.87.0 h1:/a2yjC8XMg1j/9hnpDbxTKbG/AyWac2xsQSx0PmFz1M=
go.opentelemetry.io/collector/processor/batchprocessor v0.87.0/go.mod h1:uY8Lu7zFtNZC39ylu8bphgqO0c3VIqVdegKxXlHo9Po=
go.opentelemetry.io/collector/processor/memorylimiterprocessor v0.87.0 h1:pWR4fPyKOBo0YWi745pai6ae7jFdlRvRiEg7VmtpGNw=
go.opentelemetry.io/collector/processor/memorylimiterprocessor v0.87.0/go.mod h1:Id8f4KVl5p5Uzn7RlfFwufdaiINQTKILcTCLQFsSH6c=
go.opentelemetry.io/collector/receiver v0.87.0 h1:4HpA5Rxb1jcMywCB8y5aNTXiqSt3n7oaFLfQbAkSaWM=
Expand Down
40 changes: 35 additions & 5 deletions collector/internal/lifecycle/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package lifecycle
import (
"context"
"fmt"
"github.com/open-telemetry/opentelemetry-lambda/collector/lambdalifecycle"
"os"
"os/signal"
"path/filepath"
Expand All @@ -42,11 +43,12 @@ type collectorWrapper interface {
}

type manager struct {
logger *zap.Logger
collector collectorWrapper
extensionClient *extensionapi.Client
listener *telemetryapi.Listener
wg sync.WaitGroup
logger *zap.Logger
collector collectorWrapper
extensionClient *extensionapi.Client
listener *telemetryapi.Listener
wg sync.WaitGroup
lifecycleListeners []lambdalifecycle.Listener
}

func NewManager(ctx context.Context, logger *zap.Logger, version string) (context.Context, *manager) {
Expand Down Expand Up @@ -132,6 +134,7 @@ func (lm *manager) processEvents(ctx context.Context) error {
// Exit if we receive a SHUTDOWN event
if res.EventType == extensionapi.Shutdown {
lm.logger.Info("Received SHUTDOWN event")
lm.notifyEnvironmentShutdown()
lm.listener.Shutdown()
err = lm.collector.Stop()
if err != nil {
Expand All @@ -142,10 +145,37 @@ func (lm *manager) processEvents(ctx context.Context) error {
return err
}

lm.notifyFunctionInvoked()

err = lm.listener.Wait(ctx, res.RequestID)
if err != nil {
lm.logger.Error("problem waiting for platform.runtimeDone event", zap.Error(err), zap.String("requestID", res.RequestID))
}

// Check other components are ready before allowing the freezing of the environment.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tylerbenson - here's a comment to explain that we notifier other components that the function has finished and therefore need to be ready to freeze.

The new code I've added hooks into the existing lifecycle code,

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have a good understanding of the extension lifecycle. If the lambda function returns and starts waiting for the next event, are you sure it waits for events to finish processing before freezing? Is this documented somewhere? How did you verify this behavior?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lifecycle docs say it waits until the extension also calls Next.

But does the lambda function get the next event only after all extensions have called Next? If that were the case then wouldn't it still be waiting until the extension finishes, unless the extension calls Next before doing the export, but then it might be frozen before actually exporting.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tylerbenson according to the lifecycle documentation (https://docs.aws.amazon.com/lambda/latest/dg/runtimes-extensions-api.html#runtimes-extensions-api-lifecycle)

Each phase starts with an event from Lambda to the runtime and to all registered extensions. The runtime and each extension signal completion by sending a Next API request. Lambda freezes the execution environment when each process has completed and there are no pending events.

@tsloughter the lambda runtime will only get the next invocation once all extensions have also signalled they are ready.
The sequence of events is:

  1. Lambda and Extensions receive Next event
  2. Lambda function completes and signals it's ready for the next event.
  3. Collector receives notification from Telemetry API that the function has finished.
  4. Decouple processor finishes forwarding data.
  5. Collector signals it's ready for the next event.
  6. Back to 1.

Also see the "Invoke phase" in the same link:

After receiving the function response from the runtime, Lambda returns the response to the client, even if extensions are still running.

and

The Invoke phase ends after the runtime and all extensions signal that they are done by sending a Next API request.

As to the comment about verifying this behaviour - from Cloudwatch logs to confirm what's documented.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the detailed response.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, cool, this is great.

lm.notifyFunctionFinished()
}
}
}

func (lm *manager) notifyFunctionInvoked() {
for _, listener := range lm.lifecycleListeners {
listener.FunctionInvoked()
}
}

func (lm *manager) notifyFunctionFinished() {
for _, listener := range lm.lifecycleListeners {
listener.FunctionFinished()
}
}

func (lm *manager) notifyEnvironmentShutdown() {
for _, listener := range lm.lifecycleListeners {
listener.EnvironmentShutdown()
}
}

func (lm *manager) AddListener(listener lambdalifecycle.Listener) {
lm.lifecycleListeners = append(lm.lifecycleListeners, listener)
}
4 changes: 4 additions & 0 deletions collector/lambdacomponents/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/probabilisticsamplerprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourceprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/spanprocessor"
"github.com/open-telemetry/opentelemetry-lambda/collector/processor/decoupleprocessor"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/loggingexporter"
"go.opentelemetry.io/collector/exporter/otlpexporter"
"go.opentelemetry.io/collector/exporter/otlphttpexporter"
"go.opentelemetry.io/collector/extension"
"go.opentelemetry.io/collector/otelcol"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/processor/batchprocessor"
"go.opentelemetry.io/collector/processor/memorylimiterprocessor"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/otlpreceiver"
Expand Down Expand Up @@ -67,6 +69,8 @@ func Components(extensionID string) (otelcol.Factories, error) {
resourceprocessor.NewFactory(),
spanprocessor.NewFactory(),
coldstartprocessor.NewFactory(),
decoupleprocessor.NewFactory(),
batchprocessor.NewFactory(),
)
if err != nil {
errs = append(errs, err)
Expand Down
7 changes: 7 additions & 0 deletions collector/lambdacomponents/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourceprocessor v0.87.0
github.com/open-telemetry/opentelemetry-collector-contrib/processor/spanprocessor v0.87.0
github.com/open-telemetry/opentelemetry-lambda/collector/processor/coldstartprocessor v0.84.0
github.com/open-telemetry/opentelemetry-lambda/collector/processor/decoupleprocessor v0.0.0-00010101000000-000000000000
github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver v0.84.0
go.opentelemetry.io/collector/exporter v0.87.0
go.opentelemetry.io/collector/exporter/loggingexporter v0.87.0
Expand All @@ -19,6 +20,7 @@ require (
go.opentelemetry.io/collector/extension v0.87.0
go.opentelemetry.io/collector/otelcol v0.87.0
go.opentelemetry.io/collector/processor v0.87.0
go.opentelemetry.io/collector/processor/batchprocessor v0.87.0
go.opentelemetry.io/collector/processor/memorylimiterprocessor v0.87.0
go.opentelemetry.io/collector/receiver v0.87.0
go.opentelemetry.io/collector/receiver/otlpreceiver v0.87.0
Expand Down Expand Up @@ -82,6 +84,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus v0.87.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheusremotewrite v0.87.0 // indirect
github.com/open-telemetry/opentelemetry-lambda/collector v0.81.0 // indirect
github.com/open-telemetry/opentelemetry-lambda/collector/lambdalifecycle v0.0.0-00010101000000-000000000000 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/prometheus/client_golang v1.17.0 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
Expand Down Expand Up @@ -163,6 +166,10 @@ replace cloud.google.com/go => cloud.google.com/go v0.107.0

replace github.com/open-telemetry/opentelemetry-lambda/collector => ../

replace github.com/open-telemetry/opentelemetry-lambda/collector/lambdalifecycle => ../lambdalifecycle

replace github.com/open-telemetry/opentelemetry-lambda/collector/processor/coldstartprocessor => ../processor/coldstartprocessor

replace github.com/open-telemetry/opentelemetry-lambda/collector/processor/decoupleprocessor => ../processor/decoupleprocessor

replace github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver => ../receiver/telemetryapireceiver
2 changes: 2 additions & 0 deletions collector/lambdacomponents/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,8 @@ go.opentelemetry.io/collector/pdata v1.0.0-rcv0016 h1:qCPXSQCoD3qeWFb1RuIks8fw9A
go.opentelemetry.io/collector/pdata v1.0.0-rcv0016/go.mod h1:OdN0alYOlYhHXu6BDlGehrZWgtBuiDsz/rlNeJeXiNg=
go.opentelemetry.io/collector/processor v0.87.0 h1:aUGtRyeQk0WgQwp2rZBvJ1j+6+WJO8XMb1kjtanIWo8=
go.opentelemetry.io/collector/processor v0.87.0/go.mod h1:FHqpqdm/uyjjhNQxXJBhvQDIwjnP01EW9M6t0xVaRR4=
go.opentelemetry.io/collector/processor/batchprocessor v0.87.0 h1:/a2yjC8XMg1j/9hnpDbxTKbG/AyWac2xsQSx0PmFz1M=
go.opentelemetry.io/collector/processor/batchprocessor v0.87.0/go.mod h1:uY8Lu7zFtNZC39ylu8bphgqO0c3VIqVdegKxXlHo9Po=
go.opentelemetry.io/collector/processor/memorylimiterprocessor v0.87.0 h1:pWR4fPyKOBo0YWi745pai6ae7jFdlRvRiEg7VmtpGNw=
go.opentelemetry.io/collector/processor/memorylimiterprocessor v0.87.0/go.mod h1:Id8f4KVl5p5Uzn7RlfFwufdaiINQTKILcTCLQFsSH6c=
go.opentelemetry.io/collector/receiver v0.87.0 h1:4HpA5Rxb1jcMywCB8y5aNTXiqSt3n7oaFLfQbAkSaWM=
Expand Down
1 change: 1 addition & 0 deletions collector/lambdalifecycle/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../Makefile.Common
3 changes: 3 additions & 0 deletions collector/lambdalifecycle/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
module github.com/open-telemetry/opentelemetry-lambda/collector/lambdalifecycle

go 1.20
43 changes: 43 additions & 0 deletions collector/lambdalifecycle/notifier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package lambdalifecycle

// Listener interface used to notify objects of Lambda lifecycle events.
type Listener interface {
// FunctionInvoked is called after the extension receives a "Next" notification.
FunctionInvoked()
// FunctionFinished is called after the extension is notified that the function has completed, but before the environment is frozen.
// The environment is only frozen once all listeners have returned.
FunctionFinished()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably this is where some additional comments are required, I'll look at adding them tomorrow.

// EnvironmentShutdown is called when the extension is notified that the environment is about to shut down.
// Shutting down of the collector components only happens after all listeners have returned.
EnvironmentShutdown()
}

type Notifier interface {
AddListener(listener Listener)
}

var (
notifier Notifier
)

func SetNotifier(n Notifier) {
notifier = n
}

func GetNotifier() Notifier {
return notifier
}
4 changes: 4 additions & 0 deletions collector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"flag"
"fmt"
"github.com/open-telemetry/opentelemetry-lambda/collector/lambdalifecycle"
"os"

"go.uber.org/zap"
Expand Down Expand Up @@ -47,6 +48,9 @@ func main() {

ctx, lm := lifecycle.NewManager(context.Background(), logger, Version)

// Set the new lifecycle manager as the lifecycle notifier for all other components.
lambdalifecycle.SetNotifier(lm)

// Will block until shutdown event is received or cancelled via the context.
logger.Info("done", zap.Error(lm.Run(ctx)))
}
Expand Down
1 change: 1 addition & 0 deletions collector/processor/decoupleprocessor/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
28 changes: 28 additions & 0 deletions collector/processor/decoupleprocessor/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Decouple Processor

| Status | |
| ------------------------ |-----------------------|
| Stability | [in development] |
| Supported pipeline types | traces, metrics, logs |
| Distributions | [extension] |

This processor decouples the receiver and exporter ends of the pipeline. This allows the lambda function to finish before traces/metrics/logs have been exported by the collector. The processor is aware of the Lambda [lifecycle] and will prevent the environment from being frozen or shutdown until any pending traces/metrics/logs have been exported.
In this way the response times of the Lambda function is not impacted by the need to export data, however the billed duration will include the time taken to export data as well as runtime of the lambda function.

The decouple processor should always be the last processor in the list to ensure that there are no issues with data being sent while the environment is about to be frozen, which could result in lost data.

When combined with the batch processor, the number of exports required can be significantly reduced and therefore the cost of running the lambda. This is with the trade-off that the data will not be available at your chosen endpoint until some time after the invocation, up to a maximum of 5 minutes (the timeout that the environment is shutdown when no further invocations are received).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if the decouple processor would be better written as a lifecycle-aware batch processor. I worry that without this, data would likely end up sitting in the existing batch processor whether it was placed before or after the decouple processor.

Another question that comes to mind is whether the batch processor alone would work here since it would in theory be able to send all the data left in any existing batches through the normal shutdown process.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if the decouple processor would be better written as a lifecycle-aware batch processor. I worry that without this, data would likely end up sitting in the existing batch processor whether it was placed before or after the decouple processor.

Actually I was going to update the README to indicate that the batch processor must come before the decouple processor otherwise you run into problems (as mentioned below). It would be possible to rewrite this as you say, however the simplest approach was to do one thing and one thing well and re-use the existing batch processor in front of the decouple processor.
Also it's reasonable to just use the decouple processor on it's own to reduce the response time of the lambda while not delaying the delivery of otel data.

Another question that comes to mind is whether the batch processor alone would work here since it would in theory be able to send all the data left in any existing batches through the normal shutdown process.

Not on it's on no, I did also consider it. The problem is that the collector is only shutdown when the environment is about to be destroyed. However it's frozen after each function invocation.
Without the ability to prevent the environment being frozen until the exports have successfully completed you will end up with data occasionally being lost due network interruptions.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@adcharre I'm not very familiar with extensions (or the collector)... could you highlight for me how the code you added handles the frozen scenario? (How does it prevent from being frozen.) Perhaps some additional comments in your code would be helpful here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


## Processor Configuration

```yaml
processors:
decouple:
# max_queue_size allows you to control how many spans etc. are accepted before the pipeline blocks
# until an export has been completed. Default value is 200.
max_queue_size: 20
```

[in development]: https://github.com/open-telemetry/opentelemetry-collector#development
[extension]: https://github.com/open-telemetry/opentelemetry-lambda/collector
[lifecycle]: https://docs.aws.amazon.com/lambda/latest/dg/runtimes-extensions-api.html#runtimes-extensions-api-lifecycle
31 changes: 31 additions & 0 deletions collector/processor/decoupleprocessor/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package decoupleprocessor // import "github.com/open-telemetry/opentelemetry-lambda/collector/processor/decoupleprocessor"
import "errors"

// Config defines the configuration for the various elements of the processor.
type Config struct {
MaxQueueSize uint32 `mapstructure:"max_queue_size"`
}

var invalidMaxQueueSizeError = errors.New("max_queue_size must be greater than 0")

// Validate validates the configuration by checking for missing or invalid fields
func (cfg *Config) Validate() error {
if cfg.MaxQueueSize == 0 {
return invalidMaxQueueSizeError
}
return nil
}
Loading
Loading