Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Generate function and extension logs via Telemetry API receiver #1347

Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
bd5dd05
Added WithLogs and its handling
jerrytfleung May 24, 2024
63558a8
nits
jerrytfleung May 25, 2024
1ae4fdd
Added extensions
jerrytfleung May 25, 2024
0f74cbe
Fixed unit tests
jerrytfleung May 28, 2024
0c54292
Added unit test cases
jerrytfleung May 28, 2024
dd2e317
Added config (#26)
jerrytfleung Jun 20, 2024
13a99af
Merge branch 'main' into feature/telemetryapireceiver-configuration
jerrytfleung Jun 20, 2024
4ff5876
Added severityTextToNumber function
jerrytfleung Jun 20, 2024
730cfc9
Corrected README.md
jerrytfleung Jun 27, 2024
999a7ce
Handled empty types array
jerrytfleung Jun 27, 2024
1190cb5
Added CRITICAL & ALL
jerrytfleung Jun 27, 2024
165cdda
Merge branch 'main' into feature/telemetryapireceiver-configuration
jerrytfleung Jul 15, 2024
26a3c68
Merge branch 'main' into feature/telemetryapireceiver-configuration
jerrytfleung Jul 17, 2024
16f92d1
Removed invalid test case
jerrytfleung Jul 17, 2024
2575871
Merge branch 'main' into feature/telemetryapireceiver-configuration
jerrytfleung Aug 22, 2024
a297035
Fixed code after rebase
jerrytfleung Aug 22, 2024
2d33076
Updated README.md
jerrytfleung Aug 22, 2024
cdea000
Used time.RFC3339 format
jerrytfleung Aug 22, 2024
166628a
Applied review comments
jerrytfleung Aug 27, 2024
2e03ffe
Merge branch 'main' into feature/telemetryapireceiver-configuration
jerrytfleung Aug 28, 2024
3b7cda5
Added WARNING, Updated test cases, Added String.ToUpper
jerrytfleung Aug 28, 2024
1bc05fb
Merge branch 'main' into feature/telemetryapireceiver-configuration
jerrytfleung Sep 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion collector/internal/lifecycle/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func NewManager(ctx context.Context, logger *zap.Logger, version string) (contex
}

telemetryClient := telemetryapi.NewClient(logger)
_, err = telemetryClient.Subscribe(ctx, res.ExtensionID, addr)
_, err = telemetryClient.Subscribe(ctx, []telemetryapi.EventType{telemetryapi.Platform}, res.ExtensionID, addr)
if err != nil {
logger.Fatal("Cannot register Telemetry API client", zap.Error(err))
}
Expand Down
8 changes: 1 addition & 7 deletions collector/internal/telemetryapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,7 @@ func NewClient(logger *zap.Logger) *Client {
}
}

func (c *Client) Subscribe(ctx context.Context, extensionID string, listenerURI string) (string, error) {
eventTypes := []EventType{
Platform,
// Function,
// Extension,
}

func (c *Client) Subscribe(ctx context.Context, eventTypes []EventType, extensionID string, listenerURI string) (string, error) {
bufferingConfig := BufferingCfg{
MaxItems: 1000,
MaxBytes: 256 * 1024,
Expand Down
26 changes: 20 additions & 6 deletions collector/receiver/telemetryapireceiver/README.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
# Telemetry API Receiver

| Status | |
| ------------------------ |-----------------|
| Stability | [alpha] |
| Supported pipeline types | traces |
| Distributions | [extension] |
| Status | |
| ------------------------ |--------------|
| Stability | [alpha] |
| Supported pipeline types | traces, logs |
| Distributions | [extension] |

This receiver generates telemetry in response to events from the [Telemetry API](https://docs.aws.amazon.com/lambda/latest/dg/telemetry-api.html). It does this by setting up an endpoint and registering itself with the Telemetry API on startup.

Expand All @@ -15,11 +15,25 @@ Supported events:

## Configuration

There are currently no configuration parameters available for this receiver. It can be enabled via the following configuration:
| Field | Default | Description |
|---------|---------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------|
| `port` | 4325 | HTTP server port to receive Telemetry API data. |
| `types` | ["platform", "function", "extension"] | [Types](https://docs.aws.amazon.com/lambda/latest/dg/telemetry-api-reference.html#telemetry-subscribe-api) of telemetry to subscribe to |


```yaml
receivers:
telemetryapi:
telemetryapi/1:
port: 4326
telemetryapi/2:
port: 4326
types:
- platform
- function
telemetryapi/3:
port: 4326
types: ["platform", "function"]
```

[alpha]: https://github.com/open-telemetry/opentelemetry-collector#alpha
Expand Down
11 changes: 11 additions & 0 deletions collector/receiver/telemetryapireceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,23 @@

package telemetryapireceiver // import "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver"

import (
"fmt"
)

// Config defines the configuration for the various elements of the receiver agent.
type Config struct {
extensionID string
Port int `mapstructure:"port"`
Types []string `mapstructure:"types"`
}

// Validate validates the configuration by checking for missing or invalid fields
func (cfg *Config) Validate() error {
for _, t := range cfg.Types {
if t != platform && t != function && t != extension {
return fmt.Errorf("unknown extension type: %s", t)
}
}
return nil
}
126 changes: 126 additions & 0 deletions collector/receiver/telemetryapireceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,130 @@
package telemetryapireceiver // import "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver"

import (
"fmt"
"path/filepath"
"testing"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap/confmaptest"
)

func TestLoadConfig(t *testing.T) {
t.Parallel()

tests := []struct {
id component.ID
expected component.Config
}{
{
id: component.NewID(component.MustNewType("telemetryapi")),
expected: NewFactory("extensionID").CreateDefaultConfig(),
},
{
id: component.NewIDWithName(component.MustNewType("telemetryapi"), "1"),
expected: &Config{
extensionID: "extensionID",
Port: 12345,
Types: []string{platform, function, extension},
},
},
{
id: component.NewIDWithName(component.MustNewType("telemetryapi"), "2"),
expected: &Config{
extensionID: "extensionID",
Port: 12345,
Types: []string{platform, function, extension},
},
},
{
id: component.NewIDWithName(component.MustNewType("telemetryapi"), "3"),
expected: &Config{
extensionID: "extensionID",
Port: 12345,
Types: []string{platform},
},
},
{
id: component.NewIDWithName(component.MustNewType("telemetryapi"), "4"),
expected: &Config{
extensionID: "extensionID",
Port: 12345,
Types: []string{function},
},
},
{
id: component.NewIDWithName(component.MustNewType("telemetryapi"), "5"),
expected: &Config{
extensionID: "extensionID",
Port: 12345,
Types: []string{extension},
},
},
{
id: component.NewIDWithName(component.MustNewType("telemetryapi"), "6"),
expected: &Config{
extensionID: "extensionID",
Port: 12345,
Types: []string{platform, function},
},
},
{
id: component.NewIDWithName(component.MustNewType("telemetryapi"), "7"),
expected: &Config{
extensionID: "extensionID",
Port: 12345,
Types: []string{platform, extension},
},
},
{
id: component.NewIDWithName(component.MustNewType("telemetryapi"), "8"),
expected: &Config{
extensionID: "extensionID",
Port: 12345,
Types: []string{function, extension},
},
},
{
id: component.NewIDWithName(component.MustNewType("telemetryapi"), "9"),
expected: &Config{
extensionID: "extensionID",
Port: 12345,
Types: []string{},
},
},
{
id: component.NewIDWithName(component.MustNewType("telemetryapi"), "10"),
expected: &Config{
extensionID: "extensionID",
Port: 12345,
Types: []string{function, extension},
},
},
{
id: component.NewIDWithName(component.MustNewType("telemetryapi"), "11"),
expected: &Config{
extensionID: "extensionID",
Port: 12345,
Types: []string{function, extension},
},
},
}
for _, tt := range tests {
t.Run(tt.id.String(), func(t *testing.T) {
cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml"))
require.NoError(t, err)
factory := NewFactory("extensionID")
cfg := factory.CreateDefaultConfig()
sub, err := cm.Sub(tt.id.String())
require.NoError(t, err)
require.NoError(t, sub.Unmarshal(cfg))
require.NoError(t, component.ValidateConfig(cfg))
require.Equal(t, tt.expected, cfg)
})
}
}

jerrytfleung marked this conversation as resolved.
Show resolved Hide resolved
func TestValidate(t *testing.T) {
testCases := []struct {
desc string
Expand All @@ -31,6 +150,13 @@ func TestValidate(t *testing.T) {
cfg: &Config{},
expectedErr: nil,
},
{
desc: "invalid config",
cfg: &Config{
Types: []string{"invalid"},
},
expectedErr: fmt.Errorf("unknown extension type: invalid"),
},
}

for _, tc := range testCases {
Expand Down
33 changes: 29 additions & 4 deletions collector/receiver/telemetryapireceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,19 @@ import (
"context"
"errors"

"github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver/internal/sharedcomponent"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"
)

const (
typeStr = "telemetryapi"
stability = component.StabilityLevelDevelopment
typeStr = "telemetryapi"
stability = component.StabilityLevelDevelopment
defaultPort = 4325
platform = "platform"
function = "function"
extension = "extension"
)

var errConfigNotTelemetryAPI = errors.New("config was not a Telemetry API receiver config")
Expand All @@ -37,16 +42,36 @@ func NewFactory(extensionID string) receiver.Factory {
func() component.Config {
return &Config{
extensionID: extensionID,
Port: defaultPort,
Types: []string{platform, function, extension},
}
},
receiver.WithTraces(createTracesReceiver, stability))
receiver.WithTraces(createTracesReceiver, stability),
receiver.WithLogs(createLogsReceiver, stability))
}

func createTracesReceiver(ctx context.Context, params receiver.CreateSettings, rConf component.Config, next consumer.Traces) (receiver.Traces, error) {
cfg, ok := rConf.(*Config)
if !ok {
return nil, errConfigNotTelemetryAPI
}
r := receivers.GetOrAdd(cfg, func() component.Component {
return newTelemetryAPIReceiver(cfg, params)
})
r.Unwrap().(*telemetryAPIReceiver).registerTracesConsumer(next)
Copy link
Contributor

@nslaughter nslaughter Jun 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would want more data protection in this file than I see in the top-level variable for receivers, but it looks like the code doesn't read from receivers. As I traced my steps through this code, I started thinking we wouldn't get a benefit from the call to GetOrAdd receivers. What if we changed that to the following?

Suggested change
r := receivers.GetOrAdd(cfg, func() component.Component {
return newTelemetryAPIReceiver(cfg, params)
})
r.Unwrap().(*telemetryAPIReceiver).registerTracesConsumer(next)
r := newTelemetryAPIReceiver(cfg, params)
r.registerTracesConsumer(next)

And if that works we don't have to rely on copying internal/sharedcomponent.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and actually this is a bit fragile. this means that you cannot use this receiver in lets say, more than one pipeline.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking to reuse the http server. internal/sharedcomponent can help to reuse the http server by looking up the receiver using cfg as a key.
I was assuming we could have configuration for telemetryapi

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think sharedcomponent could play a role so as to reuse http server and to support multiple instances of telemetry api receiver.

Can you take a look to the updated change and let me know your feedback? Thanks!

return r, nil
}

return newTelemetryAPIReceiver(cfg, next, params)
func createLogsReceiver(ctx context.Context, params receiver.CreateSettings, rConf component.Config, next consumer.Logs) (receiver.Logs, error) {
cfg, ok := rConf.(*Config)
if !ok {
return nil, errConfigNotTelemetryAPI
}
r := receivers.GetOrAdd(cfg, func() component.Component {
return newTelemetryAPIReceiver(cfg, params)
})
r.Unwrap().(*telemetryAPIReceiver).registerLogsConsumer(next)
return r, nil
}

var receivers = sharedcomponent.NewSharedComponents()
2 changes: 1 addition & 1 deletion collector/receiver/telemetryapireceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestNewFactory(t *testing.T) {
testFunc: func(t *testing.T) {
factory := NewFactory("test")

var expectedCfg component.Config = &Config{extensionID: "test"}
var expectedCfg component.Config = &Config{extensionID: "test", Port: defaultPort, Types: []string{platform, function, extension}}

require.Equal(t, expectedCfg, factory.CreateDefaultConfig())
},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// Package sharedcomponent exposes util functionality for receivers and exporters
// that need to share state between different signal types instances such as net.Listener or os.File.
package sharedcomponent // import "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver/internal/sharedcomponent"

import (
"context"
"sync"

"go.opentelemetry.io/collector/component"
)

// SharedComponents a map that keeps reference of all created instances for a given configuration,
// and ensures that the shared state is started and stopped only once.
type SharedComponents struct {
comps map[any]*SharedComponent
}

// NewSharedComponents returns a new empty SharedComponents.
func NewSharedComponents() *SharedComponents {
return &SharedComponents{
comps: make(map[any]*SharedComponent),
}
}

// GetOrAdd returns the already created instance if exists, otherwise creates a new instance
// and adds it to the map of references.
func (scs *SharedComponents) GetOrAdd(key any, create func() component.Component) *SharedComponent {
if c, ok := scs.comps[key]; ok {
return c
}
newComp := &SharedComponent{
Component: create(),
removeFunc: func() {
delete(scs.comps, key)
},
}
scs.comps[key] = newComp
return newComp
}

// SharedComponent ensures that the wrapped component is started and stopped only once.
// When stopped it is removed from the SharedComponents map.
type SharedComponent struct {
component.Component

startOnce sync.Once
stopOnce sync.Once
removeFunc func()
}

// Unwrap returns the original component.
func (r *SharedComponent) Unwrap() component.Component {
return r.Component
}

// Start implements component.Component.
func (r *SharedComponent) Start(ctx context.Context, host component.Host) error {
var err error
r.startOnce.Do(func() {
err = r.Component.Start(ctx, host)
})
return err
}

// Shutdown implements component.Component.
func (r *SharedComponent) Shutdown(ctx context.Context) error {
var err error
r.stopOnce.Do(func() {
err = r.Component.Shutdown(ctx)
r.removeFunc()
})
return err
}
Loading
Loading