Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NH-76996 Otelcol: collecting Lambda function logs (code refactoring) #21

Merged
merged 11 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion collector/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ service:
processors: [resource,resourcedetection,decouple]
exporters: [otlp,logging]
logs:
receivers: [telemetryapi]
receivers: [otlp,telemetryapi]
processors: [resource,resourcedetection,decouple]
exporters: [otlp,logging]
telemetry:
Expand Down
2 changes: 0 additions & 2 deletions collector/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -877,8 +877,6 @@ go.opentelemetry.io/collector/receiver v0.99.0 h1:NdYShaEaabxVBRQaxK/HcKqRGl1eUF
go.opentelemetry.io/collector/receiver v0.99.0/go.mod h1:aU9ftU4FhdEY9/eREf86FWHmZHz8kufXchfpHrTTrn0=
go.opentelemetry.io/collector/receiver/otlpreceiver v0.99.0 h1:bAICAUW2K67kL9NvN+liAaXt9wXINFqgcZr1H8vUrow=
go.opentelemetry.io/collector/receiver/otlpreceiver v0.99.0/go.mod h1:0IUYqbqx2ZxxW3iYVHjW0ulA4bPkUJSLVCFJq4MR8FI=
go.opentelemetry.io/collector/semconv v0.99.0 h1:6xCezUbjdeMdrP2HtoEJQue99dgrZhqHCgjYRcuEGBg=
go.opentelemetry.io/collector/semconv v0.99.0/go.mod h1:8ElcRZ8Cdw5JnvhTOQOdYizkJaQ10Z2fS+R6djOnj6A=
go.opentelemetry.io/collector/semconv v0.100.0 h1:QArUvWcbmsMjM4PV0zngUHRizZeUXibsPBWjDuNJXAs=
go.opentelemetry.io/collector/semconv v0.100.0/go.mod h1:8ElcRZ8Cdw5JnvhTOQOdYizkJaQ10Z2fS+R6djOnj6A=
go.opentelemetry.io/collector/service v0.99.0 h1:Ods9uVHAZb1PW1nTC5XOzC+lC1qrA+EVyt1NNzT8Uqk=
Expand Down
2 changes: 1 addition & 1 deletion collector/internal/lifecycle/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func NewManager(ctx context.Context, logger *zap.Logger, version string) (contex
}

telemetryClient := telemetryapi.NewClient(logger)
_, err = telemetryClient.Subscribe(ctx, res.ExtensionID, addr)
_, err = telemetryClient.Subscribe(ctx, []telemetryapi.EventType{telemetryapi.Platform}, res.ExtensionID, addr)
if err != nil {
logger.Fatal("Cannot register Telemetry API client", zap.Error(err))
}
Expand Down
11 changes: 1 addition & 10 deletions collector/internal/telemetryapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func NewClient(logger *zap.Logger) *Client {
}
}

func (c *Client) SubscribeEvents(ctx context.Context, eventTypes []EventType, extensionID string, listenerURI string) (string, error) {
func (c *Client) Subscribe(ctx context.Context, eventTypes []EventType, extensionID string, listenerURI string) (string, error) {
bufferingConfig := BufferingCfg{
MaxItems: 1000,
MaxBytes: 256 * 1024,
Expand Down Expand Up @@ -104,15 +104,6 @@ func (c *Client) SubscribeEvents(ctx context.Context, eventTypes []EventType, ex
return string(body), nil
}

func (c *Client) Subscribe(ctx context.Context, extensionID string, listenerURI string) (string, error) {
eventTypes := []EventType{
Platform,
// Function,
// Extension,
}
return c.SubscribeEvents(ctx, eventTypes, extensionID, listenerURI)
}

func httpPutWithHeaders(ctx context.Context, client *http.Client, url string, data []byte, headers map[string]string) (*http.Response, error) {
req, err := http.NewRequestWithContext(ctx, "PUT", url, bytes.NewBuffer(data))
if err != nil {
Expand Down
6 changes: 4 additions & 2 deletions collector/internal/telemetryapi/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,10 @@ func (s *Listener) Wait(ctx context.Context, reqID string) error {
continue
}

if i.Record["requestId"] == reqID {
return nil
if record, ok := i.Record.(map[string]any); ok {
if record["requestId"] == reqID {
return nil
}
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions collector/internal/telemetryapi/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ type SubscribeRequest struct {
}

type Event struct {
Time string `json:"time"`
Type string `json:"type"`
Record map[string]any `json:"record"`
Time string `json:"time"`
Type string `json:"type"`
Record any `json:"record"`

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like we change the type because it can be something other than a map?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes.

}
10 changes: 5 additions & 5 deletions collector/receiver/telemetryapireceiver/README.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
# Telemetry API Receiver

| Status | |
| ------------------------ |-----------------|
| Stability | [in development]|
| Supported pipeline types | traces |
| Distributions | [extension] |
| Status | |
| ------------------------ |-----------------------|
| Stability | [in development] |
| Supported pipeline types | traces, metrics, logs |
| Distributions | [extension] |

This receiver generates telemetry in response to events from the [Telemetry API](https://docs.aws.amazon.com/lambda/latest/dg/telemetry-api.html). It does this by setting up an endpoint and registering itself with the Telemetry API on startup.

Expand Down
28 changes: 25 additions & 3 deletions collector/receiver/telemetryapireceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"errors"

"github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver/internal/sharedcomponent"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"
Expand All @@ -40,6 +41,7 @@ func NewFactory(extensionID string) receiver.Factory {
}
},
receiver.WithTraces(createTracesReceiver, stability),
receiver.WithMetrics(createMetricsReceiver, stability),
receiver.WithLogs(createLogsReceiver, stability))
}

Expand All @@ -48,15 +50,35 @@ func createTracesReceiver(ctx context.Context, params receiver.CreateSettings, r
if !ok {
return nil, errConfigNotTelemetryAPI
}
r := receivers.GetOrAdd(cfg, func() component.Component {
return newTelemetryAPIReceiver(cfg, params)
})
r.Unwrap().(*telemetryAPIReceiver).registerTracesConsumer(next)
return r, nil
}

return newTelemetryAPIReceiver(cfg, next, params)
func createMetricsReceiver(ctx context.Context, params receiver.CreateSettings, rConf component.Config, next consumer.Metrics) (receiver.Metrics, error) {
cfg, ok := rConf.(*Config)
if !ok {
return nil, errConfigNotTelemetryAPI
}
r := receivers.GetOrAdd(cfg, func() component.Component {
return newTelemetryAPIReceiver(cfg, params)
})
r.Unwrap().(*telemetryAPIReceiver).registerMetricsConsumer(next)
return r, nil
}

func createLogsReceiver(ctx context.Context, params receiver.CreateSettings, rConf component.Config, next consumer.Logs) (receiver.Logs, error) {
cfg, ok := rConf.(*Config)
if !ok {
return nil, errConfigNotTelemetryAPI
}

return newTelemetryAPILogsReceiver(cfg, next, params)
r := receivers.GetOrAdd(cfg, func() component.Component {
return newTelemetryAPIReceiver(cfg, params)
})
r.Unwrap().(*telemetryAPIReceiver).registerLogsConsumer(next)
return r, nil
}

var receivers = sharedcomponent.NewSharedComponents()
3 changes: 0 additions & 3 deletions collector/receiver/telemetryapireceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,8 @@ go 1.21.0

toolchain go1.22.2

replace github.com/open-telemetry/opentelemetry-lambda/collector => ../../

require (
github.com/golang-collections/go-datastructures v0.0.0-20150211160725-59788d5eb259
github.com/open-telemetry/opentelemetry-lambda/collector v0.98.0
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector/component v0.99.0
go.opentelemetry.io/collector/consumer v0.99.0
Expand Down
2 changes: 0 additions & 2 deletions collector/receiver/telemetryapireceiver/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,6 @@ go.opentelemetry.io/collector/pdata/testdata v0.99.0 h1:/cEg4jdR3ntR3kZ0XjSelaBn
go.opentelemetry.io/collector/pdata/testdata v0.99.0/go.mod h1:YzEkHFLPsxeNI2gv6UQvvn73nsgRNxMRnBpY63qvdsg=
go.opentelemetry.io/collector/receiver v0.99.0 h1:NdYShaEaabxVBRQaxK/HcKqRGl1eUFaipKmjZlQb5FA=
go.opentelemetry.io/collector/receiver v0.99.0/go.mod h1:aU9ftU4FhdEY9/eREf86FWHmZHz8kufXchfpHrTTrn0=
go.opentelemetry.io/collector/semconv v0.99.0 h1:6xCezUbjdeMdrP2HtoEJQue99dgrZhqHCgjYRcuEGBg=
go.opentelemetry.io/collector/semconv v0.99.0/go.mod h1:8ElcRZ8Cdw5JnvhTOQOdYizkJaQ10Z2fS+R6djOnj6A=
go.opentelemetry.io/collector/semconv v0.100.0 h1:QArUvWcbmsMjM4PV0zngUHRizZeUXibsPBWjDuNJXAs=
go.opentelemetry.io/collector/semconv v0.100.0/go.mod h1:8ElcRZ8Cdw5JnvhTOQOdYizkJaQ10Z2fS+R6djOnj6A=
go.opentelemetry.io/otel v1.25.0 h1:gldB5FfhRl7OJQbUHt/8s0a7cE8fbsPAtdpRaApKy4k=
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// Package sharedcomponent exposes util functionality for receivers and exporters
// that need to share state between different signal types instances such as net.Listener or os.File.
package sharedcomponent // import "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver/internal/sharedcomponent"

import (
"context"
"sync"

"go.opentelemetry.io/collector/component"
)

// SharedComponents a map that keeps reference of all created instances for a given configuration,
// and ensures that the shared state is started and stopped only once.
type SharedComponents struct {
comps map[any]*SharedComponent
}

// NewSharedComponents returns a new empty SharedComponents.
func NewSharedComponents() *SharedComponents {
return &SharedComponents{
comps: make(map[any]*SharedComponent),
}
}

// GetOrAdd returns the already created instance if exists, otherwise creates a new instance
// and adds it to the map of references.
func (scs *SharedComponents) GetOrAdd(key any, create func() component.Component) *SharedComponent {
if c, ok := scs.comps[key]; ok {
return c
}
newComp := &SharedComponent{
Component: create(),
removeFunc: func() {
delete(scs.comps, key)
},
}
scs.comps[key] = newComp
return newComp
}

// SharedComponent ensures that the wrapped component is started and stopped only once.
// When stopped it is removed from the SharedComponents map.
type SharedComponent struct {
component.Component

startOnce sync.Once
stopOnce sync.Once
removeFunc func()
}

// Unwrap returns the original component.
func (r *SharedComponent) Unwrap() component.Component {
return r.Component
}

// Start implements component.Component.
func (r *SharedComponent) Start(ctx context.Context, host component.Host) error {
var err error
r.startOnce.Do(func() {
err = r.Component.Start(ctx, host)
})
return err
}

// Shutdown implements component.Component.
func (r *SharedComponent) Shutdown(ctx context.Context) error {
var err error
r.stopOnce.Do(func() {
err = r.Component.Shutdown(ctx)
r.removeFunc()
})
return err
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package sharedcomponent

import (
"context"
"errors"
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
)

var id = component.MustNewID("test")

func TestNewSharedComponents(t *testing.T) {
comps := NewSharedComponents()
assert.Len(t, comps.comps, 0)
}

type mockComponent struct {
component.StartFunc
component.ShutdownFunc
}

func TestSharedComponents_GetOrAdd(t *testing.T) {
nop := &mockComponent{}
createNop := func() component.Component { return nop }

comps := NewSharedComponents()
got := comps.GetOrAdd(id, createNop)
assert.Len(t, comps.comps, 1)
assert.Same(t, nop, got.Unwrap())
assert.Same(t, got, comps.GetOrAdd(id, createNop))

// Shutdown nop will remove
assert.NoError(t, got.Shutdown(context.Background()))
assert.Len(t, comps.comps, 0)
assert.NotSame(t, got, comps.GetOrAdd(id, createNop))
}

func TestSharedComponent(t *testing.T) {
wantErr := errors.New("my error")
calledStart := 0
calledStop := 0
comp := &mockComponent{
StartFunc: func(_ context.Context, _ component.Host) error {
calledStart++
return wantErr
},
ShutdownFunc: func(_ context.Context) error {
calledStop++
return wantErr
},
}
createComp := func() component.Component { return comp }

comps := NewSharedComponents()
got := comps.GetOrAdd(id, createComp)
assert.Equal(t, wantErr, got.Start(context.Background(), componenttest.NewNopHost()))
assert.Equal(t, 1, calledStart)
// Second time is not called anymore.
assert.NoError(t, got.Start(context.Background(), componenttest.NewNopHost()))
assert.Equal(t, 1, calledStart)
assert.Equal(t, wantErr, got.Shutdown(context.Background()))
assert.Equal(t, 1, calledStop)
// Second time is not called anymore.
assert.NoError(t, got.Shutdown(context.Background()))
assert.Equal(t, 1, calledStop)
}
Loading