-
Notifications
You must be signed in to change notification settings - Fork 176
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add Decouple and Batch Processors to Collector (#959)
* Add Decouple and Batch Processors Add a new processor that decouples the receiver and exporter sides of the pipeline and is aware of lambda lifecycle events. Also add the Batch processor to the list of available processor to reduce the cost of lambda function invocation at the expense of data being delayed. * Fix missing go.sum entries * Add link to lambda lifecycle * Add additional comments to clarify lifecycle * Update README.md * Add Makefile to lambdalifecycle * Fix race detector error * Update dependencies
- Loading branch information
Showing
22 changed files
with
1,356 additions
and
5 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
include ../Makefile.Common |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
module github.com/open-telemetry/opentelemetry-lambda/collector/lambdalifecycle | ||
|
||
go 1.20 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package lambdalifecycle | ||
|
||
// Listener interface used to notify objects of Lambda lifecycle events. | ||
type Listener interface { | ||
// FunctionInvoked is called after the extension receives a "Next" notification. | ||
FunctionInvoked() | ||
// FunctionFinished is called after the extension is notified that the function has completed, but before the environment is frozen. | ||
// The environment is only frozen once all listeners have returned. | ||
FunctionFinished() | ||
// EnvironmentShutdown is called when the extension is notified that the environment is about to shut down. | ||
// Shutting down of the collector components only happens after all listeners have returned. | ||
EnvironmentShutdown() | ||
} | ||
|
||
type Notifier interface { | ||
AddListener(listener Listener) | ||
} | ||
|
||
var ( | ||
notifier Notifier | ||
) | ||
|
||
func SetNotifier(n Notifier) { | ||
notifier = n | ||
} | ||
|
||
func GetNotifier() Notifier { | ||
return notifier | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
include ../../Makefile.Common |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
# Decouple Processor | ||
|
||
| Status | | | ||
| ------------------------ |-----------------------| | ||
| Stability | [in development] | | ||
| Supported pipeline types | traces, metrics, logs | | ||
| Distributions | [extension] | | ||
|
||
This processor decouples the receiver and exporter ends of the pipeline. This allows the lambda function to finish before traces/metrics/logs have been exported by the collector. The processor is aware of the Lambda [lifecycle] and will prevent the environment from being frozen or shutdown until any pending traces/metrics/logs have been exported. | ||
In this way the response times of the Lambda function is not impacted by the need to export data, however the billed duration will include the time taken to export data as well as runtime of the lambda function. | ||
|
||
The decouple processor should always be the last processor in the list to ensure that there are no issues with data being sent while the environment is about to be frozen, which could result in lost data. | ||
|
||
When combined with the batch processor, the number of exports required can be significantly reduced and therefore the cost of running the lambda. This is with the trade-off that the data will not be available at your chosen endpoint until some time after the invocation, up to a maximum of 5 minutes (the timeout that the environment is shutdown when no further invocations are received). | ||
|
||
## Processor Configuration | ||
|
||
```yaml | ||
processors: | ||
decouple: | ||
# max_queue_size allows you to control how many spans etc. are accepted before the pipeline blocks | ||
# until an export has been completed. Default value is 200. | ||
max_queue_size: 20 | ||
``` | ||
[in development]: https://github.com/open-telemetry/opentelemetry-collector#development | ||
[extension]: https://github.com/open-telemetry/opentelemetry-lambda/collector | ||
[lifecycle]: https://docs.aws.amazon.com/lambda/latest/dg/runtimes-extensions-api.html#runtimes-extensions-api-lifecycle |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package decoupleprocessor // import "github.com/open-telemetry/opentelemetry-lambda/collector/processor/decoupleprocessor" | ||
import "errors" | ||
|
||
// Config defines the configuration for the various elements of the processor. | ||
type Config struct { | ||
MaxQueueSize uint32 `mapstructure:"max_queue_size"` | ||
} | ||
|
||
var invalidMaxQueueSizeError = errors.New("max_queue_size must be greater than 0") | ||
|
||
// Validate validates the configuration by checking for missing or invalid fields | ||
func (cfg *Config) Validate() error { | ||
if cfg.MaxQueueSize == 0 { | ||
return invalidMaxQueueSizeError | ||
} | ||
return nil | ||
} |
Oops, something went wrong.