diff --git a/collector/internal/lifecycle/manager.go b/collector/internal/lifecycle/manager.go index 3ee1db2826..052c45f671 100644 --- a/collector/internal/lifecycle/manager.go +++ b/collector/internal/lifecycle/manager.go @@ -75,7 +75,7 @@ func NewManager(ctx context.Context, logger *zap.Logger, version string) (contex } telemetryClient := telemetryapi.NewClient(logger) - _, err = telemetryClient.Subscribe(ctx, res.ExtensionID, addr) + _, err = telemetryClient.Subscribe(ctx, []telemetryapi.EventType{telemetryapi.Platform}, res.ExtensionID, addr) if err != nil { logger.Fatal("Cannot register Telemetry API client", zap.Error(err)) } diff --git a/collector/internal/telemetryapi/client.go b/collector/internal/telemetryapi/client.go index 2eb9fd5e4e..16883e1804 100644 --- a/collector/internal/telemetryapi/client.go +++ b/collector/internal/telemetryapi/client.go @@ -49,13 +49,7 @@ func NewClient(logger *zap.Logger) *Client { } } -func (c *Client) Subscribe(ctx context.Context, extensionID string, listenerURI string) (string, error) { - eventTypes := []EventType{ - Platform, - // Function, - // Extension, - } - +func (c *Client) Subscribe(ctx context.Context, eventTypes []EventType, extensionID string, listenerURI string) (string, error) { bufferingConfig := BufferingCfg{ MaxItems: 1000, MaxBytes: 256 * 1024, diff --git a/collector/receiver/telemetryapireceiver/README.md b/collector/receiver/telemetryapireceiver/README.md index fe2c2745f1..f7e7ebca61 100644 --- a/collector/receiver/telemetryapireceiver/README.md +++ b/collector/receiver/telemetryapireceiver/README.md @@ -1,10 +1,10 @@ # Telemetry API Receiver -| Status | | -| ------------------------ |-----------------| -| Stability | [alpha] | -| Supported pipeline types | traces | -| Distributions | [extension] | +| Status | | +| ------------------------ |--------------| +| Stability | [alpha] | +| Supported pipeline types | traces, logs | +| Distributions | [extension] | This receiver generates telemetry in response to events from the [Telemetry API](https://docs.aws.amazon.com/lambda/latest/dg/telemetry-api.html). It does this by setting up an endpoint and registering itself with the Telemetry API on startup. @@ -15,11 +15,25 @@ Supported events: ## Configuration -There are currently no configuration parameters available for this receiver. It can be enabled via the following configuration: +| Field | Default | Description | +|---------|---------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------| +| `port` | 4325 | HTTP server port to receive Telemetry API data. | +| `types` | ["platform", "function", "extension"] | [Types](https://docs.aws.amazon.com/lambda/latest/dg/telemetry-api-reference.html#telemetry-subscribe-api) of telemetry to subscribe to | + ```yaml receivers: telemetryapi: + telemetryapi/1: + port: 4326 + telemetryapi/2: + port: 4327 + types: + - platform + - function + telemetryapi/3: + port: 4328 + types: ["platform", "function"] ``` [alpha]: https://github.com/open-telemetry/opentelemetry-collector#alpha diff --git a/collector/receiver/telemetryapireceiver/config.go b/collector/receiver/telemetryapireceiver/config.go index 86b5250196..b51ef1ed57 100644 --- a/collector/receiver/telemetryapireceiver/config.go +++ b/collector/receiver/telemetryapireceiver/config.go @@ -14,12 +14,23 @@ package telemetryapireceiver // import "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver" +import ( + "fmt" +) + // Config defines the configuration for the various elements of the receiver agent. type Config struct { extensionID string + Port int `mapstructure:"port"` + Types []string `mapstructure:"types"` } // Validate validates the configuration by checking for missing or invalid fields func (cfg *Config) Validate() error { + for _, t := range cfg.Types { + if t != platform && t != function && t != extension { + return fmt.Errorf("unknown extension type: %s", t) + } + } return nil } diff --git a/collector/receiver/telemetryapireceiver/config_test.go b/collector/receiver/telemetryapireceiver/config_test.go index 7f3969dd7c..025846e8cd 100644 --- a/collector/receiver/telemetryapireceiver/config_test.go +++ b/collector/receiver/telemetryapireceiver/config_test.go @@ -15,11 +15,107 @@ package telemetryapireceiver // import "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver" import ( + "fmt" + "path/filepath" "testing" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/confmap/confmaptest" ) +func TestLoadConfig(t *testing.T) { + t.Parallel() + + // Helper function to create expected Config + createExpectedConfig := func(types []string) *Config { + return &Config{ + extensionID: "extensionID", + Port: 12345, + Types: types, + } + } + + tests := []struct { + name string + id component.ID + expected component.Config + }{ + { + name: "default", + id: component.NewID(component.MustNewType("telemetryapi")), + expected: NewFactory("extensionID").CreateDefaultConfig(), + }, + { + name: "all types", + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "1"), + expected: createExpectedConfig([]string{platform, function, extension}), + }, + { + name: "platform only", + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "2"), + expected: createExpectedConfig([]string{platform}), + }, + { + name: "function only", + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "3"), + expected: createExpectedConfig([]string{function}), + }, + { + name: "extension only", + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "4"), + expected: createExpectedConfig([]string{extension}), + }, + { + name: "platform and function", + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "5"), + expected: createExpectedConfig([]string{platform, function}), + }, + { + name: "platform and extension", + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "6"), + expected: createExpectedConfig([]string{platform, extension}), + }, + { + name: "function and extension", + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "7"), + expected: createExpectedConfig([]string{function, extension}), + }, + { + name: "empty types", + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "8"), + expected: createExpectedConfig([]string{}), + }, + { + name: "function and extension (alternative syntax)", + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "9"), + expected: createExpectedConfig([]string{function, extension}), + }, + { + name: "function and extension (another syntax)", + id: component.NewIDWithName(component.MustNewType("telemetryapi"), "10"), + expected: createExpectedConfig([]string{function, extension}), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml")) + require.NoError(t, err) + + factory := NewFactory("extensionID") + cfg := factory.CreateDefaultConfig() + + sub, err := cm.Sub(tt.id.String()) + require.NoError(t, err) + require.NoError(t, sub.Unmarshal(cfg)) + require.NoError(t, component.ValidateConfig(cfg)) + + require.Equal(t, tt.expected, cfg) + }) + } +} + func TestValidate(t *testing.T) { testCases := []struct { desc string @@ -31,6 +127,13 @@ func TestValidate(t *testing.T) { cfg: &Config{}, expectedErr: nil, }, + { + desc: "invalid config", + cfg: &Config{ + Types: []string{"invalid"}, + }, + expectedErr: fmt.Errorf("unknown extension type: invalid"), + }, } for _, tc := range testCases { diff --git a/collector/receiver/telemetryapireceiver/factory.go b/collector/receiver/telemetryapireceiver/factory.go index 88d23f8c81..83ab96f23b 100644 --- a/collector/receiver/telemetryapireceiver/factory.go +++ b/collector/receiver/telemetryapireceiver/factory.go @@ -18,14 +18,19 @@ import ( "context" "errors" + "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver/internal/sharedcomponent" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/receiver" ) const ( - typeStr = "telemetryapi" - stability = component.StabilityLevelDevelopment + typeStr = "telemetryapi" + stability = component.StabilityLevelDevelopment + defaultPort = 4325 + platform = "platform" + function = "function" + extension = "extension" ) var errConfigNotTelemetryAPI = errors.New("config was not a Telemetry API receiver config") @@ -37,9 +42,12 @@ func NewFactory(extensionID string) receiver.Factory { func() component.Config { return &Config{ extensionID: extensionID, + Port: defaultPort, + Types: []string{platform, function, extension}, } }, - receiver.WithTraces(createTracesReceiver, stability)) + receiver.WithTraces(createTracesReceiver, stability), + receiver.WithLogs(createLogsReceiver, stability)) } func createTracesReceiver(ctx context.Context, params receiver.Settings, rConf component.Config, next consumer.Traces) (receiver.Traces, error) { @@ -47,6 +55,25 @@ func createTracesReceiver(ctx context.Context, params receiver.Settings, rConf c if !ok { return nil, errConfigNotTelemetryAPI } + r := receivers.GetOrAdd(cfg, func() component.Component { + t, _ := newTelemetryAPIReceiver(cfg, params) + return t + }) + r.Unwrap().(*telemetryAPIReceiver).registerTracesConsumer(next) + return r, nil +} - return newTelemetryAPIReceiver(cfg, next, params) +func createLogsReceiver(ctx context.Context, params receiver.Settings, rConf component.Config, next consumer.Logs) (receiver.Logs, error) { + cfg, ok := rConf.(*Config) + if !ok { + return nil, errConfigNotTelemetryAPI + } + r := receivers.GetOrAdd(cfg, func() component.Component { + t, _ := newTelemetryAPIReceiver(cfg, params) + return t + }) + r.Unwrap().(*telemetryAPIReceiver).registerLogsConsumer(next) + return r, nil } + +var receivers = sharedcomponent.NewSharedComponents() diff --git a/collector/receiver/telemetryapireceiver/factory_test.go b/collector/receiver/telemetryapireceiver/factory_test.go index 0d60191d15..69247f5325 100644 --- a/collector/receiver/telemetryapireceiver/factory_test.go +++ b/collector/receiver/telemetryapireceiver/factory_test.go @@ -41,7 +41,7 @@ func TestNewFactory(t *testing.T) { testFunc: func(t *testing.T) { factory := NewFactory("test") - var expectedCfg component.Config = &Config{extensionID: "test"} + var expectedCfg component.Config = &Config{extensionID: "test", Port: defaultPort, Types: []string{platform, function, extension}} require.Equal(t, expectedCfg, factory.CreateDefaultConfig()) }, diff --git a/collector/receiver/telemetryapireceiver/go.mod b/collector/receiver/telemetryapireceiver/go.mod index 9dc718eeb1..fb66eefafc 100644 --- a/collector/receiver/telemetryapireceiver/go.mod +++ b/collector/receiver/telemetryapireceiver/go.mod @@ -11,11 +11,12 @@ require ( github.com/open-telemetry/opentelemetry-lambda/collector v0.98.0 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector/component v0.107.0 - go.opentelemetry.io/collector/consumer v0.106.1 - go.opentelemetry.io/collector/consumer/consumertest v0.106.1 + go.opentelemetry.io/collector/confmap v0.107.0 + go.opentelemetry.io/collector/consumer v0.107.0 + go.opentelemetry.io/collector/consumer/consumertest v0.107.0 go.opentelemetry.io/collector/pdata v1.13.0 - go.opentelemetry.io/collector/receiver v0.106.1 - go.opentelemetry.io/collector/semconv v0.106.1 + go.opentelemetry.io/collector/receiver v0.107.0 + go.opentelemetry.io/collector/semconv v0.107.0 go.uber.org/zap v1.27.0 ) @@ -25,9 +26,16 @@ require ( github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect + github.com/go-viper/mapstructure/v2 v2.0.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/google/uuid v1.6.0 // indirect + github.com/hashicorp/go-version v1.7.0 // indirect github.com/json-iterator/go v1.1.12 // indirect + github.com/knadh/koanf/maps v0.1.1 // indirect + github.com/knadh/koanf/providers/confmap v0.1.0 // indirect + github.com/knadh/koanf/v2 v2.1.1 // indirect + github.com/mitchellh/copystructure v1.2.0 // indirect + github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect @@ -37,8 +45,10 @@ require ( github.com/prometheus/common v0.55.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect go.opentelemetry.io/collector/config/configtelemetry v0.107.0 // indirect - go.opentelemetry.io/collector/consumer/consumerprofiles v0.106.1 // indirect - go.opentelemetry.io/collector/pdata/pprofile v0.106.1 // indirect + go.opentelemetry.io/collector/consumer/consumerprofiles v0.107.0 // indirect + go.opentelemetry.io/collector/featuregate v1.13.0 // indirect + go.opentelemetry.io/collector/internal/globalgates v0.107.0 // indirect + go.opentelemetry.io/collector/pdata/pprofile v0.107.0 // indirect go.opentelemetry.io/otel v1.28.0 // indirect go.opentelemetry.io/otel/exporters/prometheus v0.50.0 // indirect go.opentelemetry.io/otel/metric v1.28.0 // indirect @@ -46,9 +56,9 @@ require ( go.opentelemetry.io/otel/sdk/metric v1.28.0 // indirect go.opentelemetry.io/otel/trace v1.28.0 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/net v0.27.0 // indirect - golang.org/x/sys v0.22.0 // indirect - golang.org/x/text v0.16.0 // indirect + golang.org/x/net v0.28.0 // indirect + golang.org/x/sys v0.23.0 // indirect + golang.org/x/text v0.17.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094 // indirect google.golang.org/grpc v1.65.0 // indirect google.golang.org/protobuf v1.34.2 // indirect diff --git a/collector/receiver/telemetryapireceiver/go.sum b/collector/receiver/telemetryapireceiver/go.sum index 864167af18..4998268d81 100644 --- a/collector/receiver/telemetryapireceiver/go.sum +++ b/collector/receiver/telemetryapireceiver/go.sum @@ -11,6 +11,8 @@ github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/go-viper/mapstructure/v2 v2.0.0 h1:dhn8MZ1gZ0mzeodTG3jt5Vj/o87xZKuNAprG2mQfMfc= +github.com/go-viper/mapstructure/v2 v2.0.0/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang-collections/go-datastructures v0.0.0-20150211160725-59788d5eb259 h1:ZHJ7+IGpuOXtVf6Zk/a3WuHQgkC+vXwaqfUBDFwahtI= @@ -20,14 +22,26 @@ github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKeRZfjY= +github.com/hashicorp/go-version v1.7.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/knadh/koanf/maps v0.1.1 h1:G5TjmUh2D7G2YWf5SQQqSiHRJEjaicvU0KpypqB3NIs= +github.com/knadh/koanf/maps v0.1.1/go.mod h1:npD/QZY3V6ghQDdcQzl1W4ICNVTkohC8E73eI2xW4yI= +github.com/knadh/koanf/providers/confmap v0.1.0 h1:gOkxhHkemwG4LezxxN8DMOFopOPghxRVp7JbIvdvqzU= +github.com/knadh/koanf/providers/confmap v0.1.0/go.mod h1:2uLhxQzJnyHKfxG927awZC7+fyHFdQkd697K4MdLnIU= +github.com/knadh/koanf/v2 v2.1.1 h1:/R8eXqasSTsmDCsAyYj+81Wteg8AqrV9CP6gvsTsOmM= +github.com/knadh/koanf/v2 v2.1.1/go.mod h1:4mnTRbZCK+ALuBXHZMjDfG9y714L7TykVnZkXbMU3Es= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw= +github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s= +github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ= +github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -54,27 +68,33 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= -go.opentelemetry.io/collector v0.106.1 h1:ZSQMpFGzFP3RILe1/+K80kCCT2ahn3MKt5e3u0Yz7Rs= +go.opentelemetry.io/collector v0.107.0 h1:C1Mng03iE73flGhEg795IFVlr3qhDLef5GESjIVtx5g= go.opentelemetry.io/collector/component v0.107.0 h1:3ReaEAtKwrPj7HrlKjEGBDKbBaxdRMPC2mfZ9b6zjXE= go.opentelemetry.io/collector/component v0.107.0/go.mod h1:1xMIYKvpnP7laipjgEw7kq1ozG7ySLkA0Evhr2Bp8M4= go.opentelemetry.io/collector/config/configtelemetry v0.107.0 h1:pSGd4FWQ/Up/Af+XZTR8JNneH/wmQ/TAU4Z16JHQeUc= go.opentelemetry.io/collector/config/configtelemetry v0.107.0/go.mod h1:WxWKNVAQJg/Io1nA3xLgn/DWLE/W1QOB2+/Js3ACi40= -go.opentelemetry.io/collector/consumer v0.106.1 h1:+AQ/Kmoc/g0WP8thwymNkXk1jeWsHDK6XyYfdezcxcc= -go.opentelemetry.io/collector/consumer v0.106.1/go.mod h1:oy6pR/v5o/N9cxsICskyt//bU8k8EG0JeOO1MTDfs5A= -go.opentelemetry.io/collector/consumer/consumerprofiles v0.106.1 h1:uxQjWm2XE7d1OncQDM9tL1ha+otGt1HjoRYIcQRMOfQ= -go.opentelemetry.io/collector/consumer/consumerprofiles v0.106.1/go.mod h1:xQScBf9/PORFaYM6JVPOr7/TcRVEuKcW5XbAXfJByRs= -go.opentelemetry.io/collector/consumer/consumertest v0.106.1 h1:hDdFeVjCLIJ6iLfbiYcV9s+4iboFXbkJ/k3h09qusPw= -go.opentelemetry.io/collector/consumer/consumertest v0.106.1/go.mod h1:WRTYnQ8bYHQrEN6eJZ80oC4pNI7VeDRdsTZI6xs9o5M= +go.opentelemetry.io/collector/confmap v0.107.0 h1:M2o7jvQM9bnMU3pE2N6BK4KHYtSnvsSZkegUD89y8BU= +go.opentelemetry.io/collector/confmap v0.107.0/go.mod h1:9Fs/ZEIeiMa38VqkqIpn+JKQkcPf/lhAKA9fHu6c9GY= +go.opentelemetry.io/collector/consumer v0.107.0 h1:fF/+xyv9BfXQUvuJqkljrpzKyBQExDQt6zB5rzGyuHs= +go.opentelemetry.io/collector/consumer v0.107.0/go.mod h1:wgWpFes9sbnZ11XeJPSeutU8GJx6dT/gzSUqHpaZZQA= +go.opentelemetry.io/collector/consumer/consumerprofiles v0.107.0 h1:SEP5rLm4KgBaELciRQO4m9U2q3xn16KGjpIw8zQn6Ik= +go.opentelemetry.io/collector/consumer/consumerprofiles v0.107.0/go.mod h1:Vi/aqlZjCBdGgGu+iOEfUyHvq2TJBar0WfsQSOMhR6Y= +go.opentelemetry.io/collector/consumer/consumertest v0.107.0 h1:BfjFHHAqbTmCN32akYvMhWKYC+ayHTX935/fRChwohM= +go.opentelemetry.io/collector/consumer/consumertest v0.107.0/go.mod h1:qNMedscdVyuxbV+wWUt4yGKQM3c0YEgQJTFeAtGZjRY= +go.opentelemetry.io/collector/featuregate v1.13.0 h1:rc84eCf5hesXQ8/bP6Zc15wqthbomfLBHmox5tT7AwM= +go.opentelemetry.io/collector/featuregate v1.13.0/go.mod h1:PsOINaGgTiFc+Tzu2K/X2jP+Ngmlp7YKGV1XrnBkH7U= +go.opentelemetry.io/collector/internal/globalgates v0.107.0 h1:PaD6WgQg80YTVxg8OF+YEqgI7WRd13wMu/R6GIG7uNU= +go.opentelemetry.io/collector/internal/globalgates v0.107.0/go.mod h1:hca7Tpzu6JmBrAOgmlyp/ZM6kxprPRMKqSYoq/Tdzjw= go.opentelemetry.io/collector/pdata v1.13.0 h1:eV3NQt2f1UcaibkziMvGTQI34LlpiYBUGp1yP0G/Cxw= go.opentelemetry.io/collector/pdata v1.13.0/go.mod h1:MYeB0MmMAxeM0hstCFrCqWLzdyeYySim2dG6pDT6nYI= -go.opentelemetry.io/collector/pdata/pprofile v0.106.1 h1:nOLo25YnluNi+zAbU7G24RN86cJ1/EZJc6VEayBlOPo= -go.opentelemetry.io/collector/pdata/pprofile v0.106.1/go.mod h1:chr7lMJIzyXkccnPRkIPhyXtqLZLSReZYhwsggOGEfg= -go.opentelemetry.io/collector/pdata/testdata v0.106.1 h1:JUyLAwKD8o/9jgkBi16zOClxOyY028A7XIXHPV4mNmM= -go.opentelemetry.io/collector/pdata/testdata v0.106.1/go.mod h1:ghdz2RDEzsfigW0J+9oqA4fGmQJ/DJYUhE3vYU6JfhM= -go.opentelemetry.io/collector/receiver v0.106.1 h1:9kDLDJmInnz+AzAV9oV/UGMoc1+oI1pwMMs7+uMiJq4= -go.opentelemetry.io/collector/receiver v0.106.1/go.mod h1:3j9asWz7mqsgE77rPaNhlNQhRwgFhRynf0UEPs/4rkM= -go.opentelemetry.io/collector/semconv v0.106.1 h1:x0OSXrQCFinqZNUPTKrHU0gnbwngOVOPyhedQCDyDoQ= -go.opentelemetry.io/collector/semconv v0.106.1/go.mod h1:yMVUCNoQPZVq/IPfrHrnntZTWsLf5YGZ7qwKulIl5hw= +go.opentelemetry.io/collector/pdata/pprofile v0.107.0 h1:F25VZrEkSaneIBNcNI9LEBWf9nRC/WHKluSBTP0gKAA= +go.opentelemetry.io/collector/pdata/pprofile v0.107.0/go.mod h1:1GrwsKtgogRCt8aG/0lfJ037yDdFtYqF+OtJr+snxRQ= +go.opentelemetry.io/collector/pdata/testdata v0.107.0 h1:02CqvJrYjkrBlWDD+6yrByN1AhG2zT61OScLPhyyMwU= +go.opentelemetry.io/collector/pdata/testdata v0.107.0/go.mod h1:bqaeiDH1Lc5DFJXvjVHwO50x00TXj+oFre+EbOVeZXs= +go.opentelemetry.io/collector/receiver v0.107.0 h1:zfqvvYw5EmGsHT0WAfRyBv1WDN1uSXYRVNuHlYswTmQ= +go.opentelemetry.io/collector/receiver v0.107.0/go.mod h1:b29OEGTLMTit+2Xj8MA59PFbZVXpiTMGnVR0SuzqrI0= +go.opentelemetry.io/collector/semconv v0.107.0 h1:MrrUR4L4tu3IE1JxsxtT/PxjVUqvd6SC9d/dQzk/OxA= +go.opentelemetry.io/collector/semconv v0.107.0/go.mod h1:yMVUCNoQPZVq/IPfrHrnntZTWsLf5YGZ7qwKulIl5hw= go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo= go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4= go.opentelemetry.io/otel/exporters/prometheus v0.50.0 h1:2Ewsda6hejmbhGFyUvWZjUThC98Cf8Zy6g0zkIimOng= @@ -102,20 +122,20 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys= -golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= +golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= +golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= -golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.23.0 h1:YfKFowiIMvtgl1UERQoTPPToxltDeZfbj4H7dVUCwmM= +golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= -golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= +golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= +golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= diff --git a/collector/receiver/telemetryapireceiver/internal/sharedcomponent/sharedcomponent.go b/collector/receiver/telemetryapireceiver/internal/sharedcomponent/sharedcomponent.go new file mode 100644 index 0000000000..b297d81586 --- /dev/null +++ b/collector/receiver/telemetryapireceiver/internal/sharedcomponent/sharedcomponent.go @@ -0,0 +1,76 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package sharedcomponent exposes util functionality for receivers and exporters +// that need to share state between different signal types instances such as net.Listener or os.File. +package sharedcomponent // import "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver/internal/sharedcomponent" + +import ( + "context" + "sync" + + "go.opentelemetry.io/collector/component" +) + +// SharedComponents a map that keeps reference of all created instances for a given configuration, +// and ensures that the shared state is started and stopped only once. +type SharedComponents struct { + comps map[any]*SharedComponent +} + +// NewSharedComponents returns a new empty SharedComponents. +func NewSharedComponents() *SharedComponents { + return &SharedComponents{ + comps: make(map[any]*SharedComponent), + } +} + +// GetOrAdd returns the already created instance if exists, otherwise creates a new instance +// and adds it to the map of references. +func (scs *SharedComponents) GetOrAdd(key any, create func() component.Component) *SharedComponent { + if c, ok := scs.comps[key]; ok { + return c + } + newComp := &SharedComponent{ + Component: create(), + removeFunc: func() { + delete(scs.comps, key) + }, + } + scs.comps[key] = newComp + return newComp +} + +// SharedComponent ensures that the wrapped component is started and stopped only once. +// When stopped it is removed from the SharedComponents map. +type SharedComponent struct { + component.Component + + startOnce sync.Once + stopOnce sync.Once + removeFunc func() +} + +// Unwrap returns the original component. +func (r *SharedComponent) Unwrap() component.Component { + return r.Component +} + +// Start implements component.Component. +func (r *SharedComponent) Start(ctx context.Context, host component.Host) error { + var err error + r.startOnce.Do(func() { + err = r.Component.Start(ctx, host) + }) + return err +} + +// Shutdown implements component.Component. +func (r *SharedComponent) Shutdown(ctx context.Context) error { + var err error + r.stopOnce.Do(func() { + err = r.Component.Shutdown(ctx) + r.removeFunc() + }) + return err +} diff --git a/collector/receiver/telemetryapireceiver/internal/sharedcomponent/sharedcomponent_test.go b/collector/receiver/telemetryapireceiver/internal/sharedcomponent/sharedcomponent_test.go new file mode 100644 index 0000000000..dad4886c17 --- /dev/null +++ b/collector/receiver/telemetryapireceiver/internal/sharedcomponent/sharedcomponent_test.go @@ -0,0 +1,72 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package sharedcomponent + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" +) + +var id = component.MustNewID("test") + +func TestNewSharedComponents(t *testing.T) { + comps := NewSharedComponents() + assert.Len(t, comps.comps, 0) +} + +type mockComponent struct { + component.StartFunc + component.ShutdownFunc +} + +func TestSharedComponents_GetOrAdd(t *testing.T) { + nop := &mockComponent{} + createNop := func() component.Component { return nop } + + comps := NewSharedComponents() + got := comps.GetOrAdd(id, createNop) + assert.Len(t, comps.comps, 1) + assert.Same(t, nop, got.Unwrap()) + assert.Same(t, got, comps.GetOrAdd(id, createNop)) + + // Shutdown nop will remove + assert.NoError(t, got.Shutdown(context.Background())) + assert.Len(t, comps.comps, 0) + assert.NotSame(t, got, comps.GetOrAdd(id, createNop)) +} + +func TestSharedComponent(t *testing.T) { + wantErr := errors.New("my error") + calledStart := 0 + calledStop := 0 + comp := &mockComponent{ + StartFunc: func(_ context.Context, _ component.Host) error { + calledStart++ + return wantErr + }, + ShutdownFunc: func(_ context.Context) error { + calledStop++ + return wantErr + }, + } + createComp := func() component.Component { return comp } + + comps := NewSharedComponents() + got := comps.GetOrAdd(id, createComp) + assert.Equal(t, wantErr, got.Start(context.Background(), componenttest.NewNopHost())) + assert.Equal(t, 1, calledStart) + // Second time is not called anymore. + assert.NoError(t, got.Start(context.Background(), componenttest.NewNopHost())) + assert.Equal(t, 1, calledStart) + assert.Equal(t, wantErr, got.Shutdown(context.Background())) + assert.Equal(t, 1, calledStop) + // Second time is not called anymore. + assert.NoError(t, got.Shutdown(context.Background())) + assert.Equal(t, 1, calledStop) +} diff --git a/collector/receiver/telemetryapireceiver/receiver.go b/collector/receiver/telemetryapireceiver/receiver.go index 0e484454bd..6fbc8a6774 100644 --- a/collector/receiver/telemetryapireceiver/receiver.go +++ b/collector/receiver/telemetryapireceiver/receiver.go @@ -24,36 +24,42 @@ import ( "math/rand" "net/http" "os" + "strconv" + "strings" "time" "github.com/golang-collections/go-datastructures/queue" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/receiver" - semconv "go.opentelemetry.io/collector/semconv/v1.5.0" + semconv "go.opentelemetry.io/collector/semconv/v1.25.0" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-lambda/collector/internal/telemetryapi" ) -const defaultListenerPort = "4325" const initialQueueSize = 5 +const scopeName = "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapi" type telemetryAPIReceiver struct { httpServer *http.Server logger *zap.Logger queue *queue.Queue // queue is a synchronous queue and is used to put the received log events to be dispatched later - nextConsumer consumer.Traces + nextTraces consumer.Traces + nextLogs consumer.Logs lastPlatformStartTime string lastPlatformEndTime string extensionID string + port int + types []telemetryapi.EventType resource pcommon.Resource } func (r *telemetryAPIReceiver) Start(ctx context.Context, host component.Host) error { - address := listenOnAddress() + address := listenOnAddress(r.port) r.logger.Info("Listening for requests", zap.String("address", address)) mux := http.NewServeMux() @@ -64,10 +70,12 @@ func (r *telemetryAPIReceiver) Start(ctx context.Context, host component.Host) e }() telemetryClient := telemetryapi.NewClient(r.logger) - _, err := telemetryClient.Subscribe(ctx, r.extensionID, fmt.Sprintf("http://%s/", address)) - if err != nil { - r.logger.Info("Listening for requests", zap.String("address", address), zap.String("extensionID", r.extensionID)) - return err + if len(r.types) > 0 { + _, err := telemetryClient.Subscribe(ctx, r.types, r.extensionID, fmt.Sprintf("http://%s/", address)) + if err != nil { + r.logger.Info("Listening for requests", zap.String("address", address), zap.String("extensionID", r.extensionID)) + return err + } } return nil } @@ -147,12 +155,26 @@ func (r *telemetryAPIReceiver) httpHandler(w http.ResponseWriter, req *http.Requ } if len(r.lastPlatformStartTime) > 0 && len(r.lastPlatformEndTime) > 0 { if td, err := r.createPlatformInitSpan(r.lastPlatformStartTime, r.lastPlatformEndTime); err == nil { - err := r.nextConsumer.ConsumeTraces(context.Background(), td) - if err == nil { - r.lastPlatformEndTime = "" - r.lastPlatformStartTime = "" - } else { - r.logger.Error("error receiving traces", zap.Error(err)) + if r.nextTraces != nil { + err := r.nextTraces.ConsumeTraces(context.Background(), td) + if err == nil { + r.lastPlatformEndTime = "" + r.lastPlatformStartTime = "" + } else { + r.logger.Error("error receiving traces", zap.Error(err)) + } + } + } + } + + // Logs + if r.nextLogs != nil { + if logs, err := r.createLogs(slice); err == nil { + if logs.LogRecordCount() > 0 { + err := r.nextLogs.ConsumeLogs(context.Background(), logs) + if err != nil { + r.logger.Error("error receiving logs", zap.Error(err)) + } } } } @@ -161,26 +183,119 @@ func (r *telemetryAPIReceiver) httpHandler(w http.ResponseWriter, req *http.Requ slice = nil } +func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) { + log := plog.NewLogs() + resourceLog := log.ResourceLogs().AppendEmpty() + r.resource.CopyTo(resourceLog.Resource()) + scopeLog := resourceLog.ScopeLogs().AppendEmpty() + scopeLog.Scope().SetName(scopeName) + for _, el := range slice { + r.logger.Debug(fmt.Sprintf("Event: %s", el.Type), zap.Any("event", el)) + logRecord := scopeLog.LogRecords().AppendEmpty() + logRecord.Attributes().PutStr("type", el.Type) + if t, err := time.Parse(time.RFC3339, el.Time); err == nil { + logRecord.SetTimestamp(pcommon.NewTimestampFromTime(t)) + logRecord.SetObservedTimestamp(pcommon.NewTimestampFromTime(time.Now())) + } else { + r.logger.Error("error parsing time", zap.Error(err)) + return plog.Logs{}, err + } + if el.Type == string(telemetryapi.Function) || el.Type == string(telemetryapi.Extension) { + if record, ok := el.Record.(map[string]interface{}); ok { + // in JSON format https://docs.aws.amazon.com/lambda/latest/dg/telemetry-schema-reference.html#telemetry-api-function + if timestamp, ok := record["timestamp"].(string); ok { + if t, err := time.Parse(time.RFC3339, timestamp); err == nil { + logRecord.SetTimestamp(pcommon.NewTimestampFromTime(t)) + } else { + r.logger.Error("error parsing time", zap.Error(err)) + return plog.Logs{}, err + } + } + if level, ok := record["level"].(string); ok { + logRecord.SetSeverityNumber(severityTextToNumber(strings.ToUpper(level))) + logRecord.SetSeverityText(logRecord.SeverityNumber().String()) + } + if requestId, ok := record["requestId"].(string); ok { + logRecord.Attributes().PutStr(semconv.AttributeFaaSInvocationID, requestId) + } + if line, ok := record["message"].(string); ok { + logRecord.Body().SetStr(line) + } + } else { + // in plain text https://docs.aws.amazon.com/lambda/latest/dg/telemetry-schema-reference.html#telemetry-api-function + if line, ok := el.Record.(string); ok { + logRecord.Body().SetStr(line) + } + } + } + } + return log, nil +} + +func severityTextToNumber(severityText string) plog.SeverityNumber { + mapping := map[string]plog.SeverityNumber{ + "TRACE": plog.SeverityNumberTrace, + "TRACE2": plog.SeverityNumberTrace2, + "TRACE3": plog.SeverityNumberTrace3, + "TRACE4": plog.SeverityNumberTrace4, + "DEBUG": plog.SeverityNumberDebug, + "DEBUG2": plog.SeverityNumberDebug2, + "DEBUG3": plog.SeverityNumberDebug3, + "DEBUG4": plog.SeverityNumberDebug4, + "INFO": plog.SeverityNumberInfo, + "INFO2": plog.SeverityNumberInfo2, + "INFO3": plog.SeverityNumberInfo3, + "INFO4": plog.SeverityNumberInfo4, + "WARN": plog.SeverityNumberWarn, + "WARN2": plog.SeverityNumberWarn2, + "WARN3": plog.SeverityNumberWarn3, + "WARN4": plog.SeverityNumberWarn4, + "ERROR": plog.SeverityNumberError, + "ERROR2": plog.SeverityNumberError2, + "ERROR3": plog.SeverityNumberError3, + "ERROR4": plog.SeverityNumberError4, + "FATAL": plog.SeverityNumberFatal, + "FATAL2": plog.SeverityNumberFatal2, + "FATAL3": plog.SeverityNumberFatal3, + "FATAL4": plog.SeverityNumberFatal4, + "CRITICAL": plog.SeverityNumberFatal, + "ALL": plog.SeverityNumberTrace, + "WARNING": plog.SeverityNumberWarn, + } + if ans, ok := mapping[strings.ToUpper(severityText)]; ok { + return ans + } else { + return plog.SeverityNumberUnspecified + } +} + +func (r *telemetryAPIReceiver) registerTracesConsumer(next consumer.Traces) { + r.nextTraces = next +} + +func (r *telemetryAPIReceiver) registerLogsConsumer(next consumer.Logs) { + r.nextLogs = next +} + func (r *telemetryAPIReceiver) createPlatformInitSpan(start, end string) (ptrace.Traces, error) { traceData := ptrace.NewTraces() rs := traceData.ResourceSpans().AppendEmpty() r.resource.CopyTo(rs.Resource()) ss := rs.ScopeSpans().AppendEmpty() - ss.Scope().SetName("github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapi") + ss.Scope().SetName(scopeName) span := ss.Spans().AppendEmpty() span.SetTraceID(newTraceID()) span.SetSpanID(newSpanID()) span.SetName("platform.initRuntimeDone") span.SetKind(ptrace.SpanKindInternal) span.Attributes().PutBool(semconv.AttributeFaaSColdstart, true) - layout := "2006-01-02T15:04:05.000Z" - startTime, err := time.Parse(layout, start) + startTime, err := time.Parse(time.RFC3339, start) if err != nil { return ptrace.Traces{}, err } span.SetStartTimestamp(pcommon.NewTimestampFromTime(startTime)) - endTime, err := time.Parse(layout, end) + endTime, err := time.Parse(time.RFC3339, end) if err != nil { return ptrace.Traces{}, err } @@ -190,7 +305,6 @@ func (r *telemetryAPIReceiver) createPlatformInitSpan(start, end string) (ptrace func newTelemetryAPIReceiver( cfg *Config, - next consumer.Traces, set receiver.Settings, ) (*telemetryAPIReceiver, error) { envResourceMap := map[string]string{ @@ -212,22 +326,36 @@ func newTelemetryAPIReceiver( r.Attributes().PutStr(resourceAttribute, val) } } + + subscribedTypes := []telemetryapi.EventType{} + for _, val := range cfg.Types { + switch val { + case "platform": + subscribedTypes = append(subscribedTypes, telemetryapi.Platform) + case "function": + subscribedTypes = append(subscribedTypes, telemetryapi.Function) + case "extension": + subscribedTypes = append(subscribedTypes, telemetryapi.Extension) + } + } + return &telemetryAPIReceiver{ - logger: set.Logger, - queue: queue.New(initialQueueSize), - nextConsumer: next, - extensionID: cfg.extensionID, - resource: r, + logger: set.Logger, + queue: queue.New(initialQueueSize), + extensionID: cfg.extensionID, + port: cfg.Port, + types: subscribedTypes, + resource: r, }, nil } -func listenOnAddress() string { +func listenOnAddress(port int) string { envAwsLocal, ok := os.LookupEnv("AWS_SAM_LOCAL") var addr string if ok && envAwsLocal == "true" { - addr = ":" + defaultListenerPort + addr = ":" + strconv.Itoa(port) } else { - addr = "sandbox.localdomain:" + defaultListenerPort + addr = "sandbox.localdomain:" + strconv.Itoa(port) } return addr diff --git a/collector/receiver/telemetryapireceiver/receiver_test.go b/collector/receiver/telemetryapireceiver/receiver_test.go index 59344dd723..691d3fa7c1 100644 --- a/collector/receiver/telemetryapireceiver/receiver_test.go +++ b/collector/receiver/telemetryapireceiver/receiver_test.go @@ -19,11 +19,15 @@ import ( "net/http/httptest" "strings" "testing" + "time" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/receiver/receivertest" + semconv "go.opentelemetry.io/collector/semconv/v1.25.0" ) func TestListenOnAddress(t *testing.T) { @@ -34,7 +38,7 @@ func TestListenOnAddress(t *testing.T) { { desc: "listen on address without AWS_SAM_LOCAL env variable", testFunc: func(t *testing.T) { - addr := listenOnAddress() + addr := listenOnAddress(4325) require.EqualValues(t, "sandbox.localdomain:4325", addr) }, }, @@ -42,7 +46,7 @@ func TestListenOnAddress(t *testing.T) { desc: "listen on address with AWS_SAM_LOCAL env variable", testFunc: func(t *testing.T) { t.Setenv("AWS_SAM_LOCAL", "true") - addr := listenOnAddress() + addr := listenOnAddress(4325) require.EqualValues(t, ":4325", addr) }, }, @@ -61,11 +65,17 @@ func (c *mockConsumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) erro return nil } +func (c *mockConsumer) ConsumeLogs(ctx context.Context, td plog.Logs) error { + return nil +} + func (c *mockConsumer) Capabilities() consumer.Capabilities { return consumer.Capabilities{MutatesData: true} } func TestHandler(t *testing.T) { + t.Parallel() + testCases := []struct { desc string body string @@ -101,10 +111,10 @@ func TestHandler(t *testing.T) { consumer := mockConsumer{} r, err := newTelemetryAPIReceiver( &Config{}, - &consumer, receivertest.NewNopSettings(), ) require.NoError(t, err) + r.registerTracesConsumer(&consumer) req := httptest.NewRequest("POST", "http://localhost:53612/someevent", strings.NewReader(tc.body)) rec := httptest.NewRecorder() @@ -148,7 +158,6 @@ func TestCreatePlatformInitSpan(t *testing.T) { t.Run(tc.desc, func(t *testing.T) { r, err := newTelemetryAPIReceiver( &Config{}, - nil, receivertest.NewNopSettings(), ) require.NoError(t, err) @@ -161,3 +170,225 @@ func TestCreatePlatformInitSpan(t *testing.T) { }) } } + +func TestCreateLogs(t *testing.T) { + t.Parallel() + + testCases := []struct { + desc string + slice []event + expectedLogRecords int + expectedType string + expectedTimestamp string + expectedBody string + expectedSeverityText string + expectedContainsRequestId bool + expectedRequestId string + expectedSeverityNumber plog.SeverityNumber + expectError bool + }{ + { + desc: "no slice", + expectedLogRecords: 0, + expectError: false, + }, + { + desc: "Invalid Timestamp", + slice: []event{ + { + Time: "invalid", + Type: "function", + Record: "[INFO] Hello world, I am an extension!", + }, + }, + expectError: true, + }, + { + desc: "function text", + slice: []event{ + { + Time: "2022-10-12T00:03:50.000Z", + Type: "function", + Record: "[INFO] Hello world, I am an extension!", + }, + }, + expectedLogRecords: 1, + expectedType: "function", + expectedTimestamp: "2022-10-12T00:03:50.000Z", + expectedBody: "[INFO] Hello world, I am an extension!", + expectedContainsRequestId: false, + expectedSeverityText: "", + expectedSeverityNumber: plog.SeverityNumberUnspecified, + expectError: false, + }, + { + desc: "function json", + slice: []event{ + { + Time: "2022-10-12T00:03:50.000Z", + Type: "function", + Record: map[string]any{ + "timestamp": "2022-10-12T00:03:50.000Z", + "level": "INFO", + "requestId": "79b4f56e-95b1-4643-9700-2807f4e68189", + "message": "Hello world, I am a function!", + }, + }, + }, + expectedLogRecords: 1, + expectedType: "function", + expectedTimestamp: "2022-10-12T00:03:50.000Z", + expectedBody: "Hello world, I am a function!", + expectedContainsRequestId: true, + expectedRequestId: "79b4f56e-95b1-4643-9700-2807f4e68189", + expectedSeverityText: "Info", + expectedSeverityNumber: plog.SeverityNumberInfo, + expectError: false, + }, + { + desc: "extension text", + slice: []event{ + { + Time: "2022-10-12T00:03:50.000Z", + Type: "extension", + Record: "[INFO] Hello world, I am an extension!", + }, + }, + expectedLogRecords: 1, + expectedType: "extension", + expectedTimestamp: "2022-10-12T00:03:50.000Z", + expectedBody: "[INFO] Hello world, I am an extension!", + expectedContainsRequestId: false, + expectedSeverityText: "", + expectedSeverityNumber: plog.SeverityNumberUnspecified, + expectError: false, + }, + { + desc: "extension json", + slice: []event{ + { + Time: "2022-10-12T00:03:50.000Z", + Type: "extension", + Record: map[string]any{ + "timestamp": "2022-10-12T00:03:50.000Z", + "level": "INFO", + "requestId": "79b4f56e-95b1-4643-9700-2807f4e68689", + "message": "Hello world, I am an extension!", + }, + }, + }, + expectedLogRecords: 1, + expectedType: "extension", + expectedTimestamp: "2022-10-12T00:03:50.000Z", + expectedBody: "Hello world, I am an extension!", + expectedContainsRequestId: true, + expectedRequestId: "79b4f56e-95b1-4643-9700-2807f4e68689", + expectedSeverityText: "Info", + expectedSeverityNumber: plog.SeverityNumberInfo, + expectError: false, + }, + { + desc: "extension json anything", + slice: []event{ + { + Time: "2022-10-12T00:03:50.000Z", + Type: "extension", + Record: map[string]any{ + "timestamp": "2022-10-12T00:03:50.000Z", + "level": "anything", + "requestId": "79b4f56e-95b1-4643-9700-2807f4e68689", + "message": "Hello world, I am an extension!", + }, + }, + }, + expectedLogRecords: 1, + expectedType: "extension", + expectedTimestamp: "2022-10-12T00:03:50.000Z", + expectedBody: "Hello world, I am an extension!", + expectedContainsRequestId: true, + expectedRequestId: "79b4f56e-95b1-4643-9700-2807f4e68689", + expectedSeverityText: "Unspecified", + expectedSeverityNumber: plog.SeverityNumberUnspecified, + expectError: false, + }, + } + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + r, err := newTelemetryAPIReceiver( + &Config{}, + receivertest.NewNopSettings(), + ) + require.NoError(t, err) + log, err := r.createLogs(tc.slice) + if tc.expectError { + require.Error(t, err) + } else { + require.Equal(t, 1, log.ResourceLogs().Len()) + resourceLog := log.ResourceLogs().At(0) + require.Equal(t, 1, resourceLog.ScopeLogs().Len()) + scopeLog := resourceLog.ScopeLogs().At(0) + require.Equal(t, scopeName, scopeLog.Scope().Name()) + require.Equal(t, tc.expectedLogRecords, scopeLog.LogRecords().Len()) + if scopeLog.LogRecords().Len() > 0 { + logRecord := scopeLog.LogRecords().At(0) + attr, ok := logRecord.Attributes().Get("type") + require.True(t, ok) + require.Equal(t, tc.expectedType, attr.Str()) + expectedTime, err := time.Parse(time.RFC3339, tc.expectedTimestamp) + require.NoError(t, err) + require.Equal(t, pcommon.NewTimestampFromTime(expectedTime), logRecord.Timestamp()) + requestId, ok := logRecord.Attributes().Get(semconv.AttributeFaaSInvocationID) + require.Equal(t, tc.expectedContainsRequestId, ok) + if ok { + require.Equal(t, tc.expectedRequestId, requestId.Str()) + } + require.Equal(t, tc.expectedSeverityText, logRecord.SeverityText()) + require.Equal(t, tc.expectedSeverityNumber, logRecord.SeverityNumber()) + require.Equal(t, tc.expectedBody, logRecord.Body().Str()) + } + } + }) + } +} + +func TestSeverityTextToNumber(t *testing.T) { + t.Parallel() + + goldenMapping := map[string]plog.SeverityNumber{ + "TRACE": plog.SeverityNumberTrace, + "TRACE2": plog.SeverityNumberTrace2, + "TRACE3": plog.SeverityNumberTrace3, + "TRACE4": plog.SeverityNumberTrace4, + "DEBUG": plog.SeverityNumberDebug, + "DEBUG2": plog.SeverityNumberDebug2, + "DEBUG3": plog.SeverityNumberDebug3, + "DEBUG4": plog.SeverityNumberDebug4, + "INFO": plog.SeverityNumberInfo, + "INFO2": plog.SeverityNumberInfo2, + "INFO3": plog.SeverityNumberInfo3, + "INFO4": plog.SeverityNumberInfo4, + "WARN": plog.SeverityNumberWarn, + "WARN2": plog.SeverityNumberWarn2, + "WARN3": plog.SeverityNumberWarn3, + "WARN4": plog.SeverityNumberWarn4, + "ERROR": plog.SeverityNumberError, + "ERROR2": plog.SeverityNumberError2, + "ERROR3": plog.SeverityNumberError3, + "ERROR4": plog.SeverityNumberError4, + "FATAL": plog.SeverityNumberFatal, + "FATAL2": plog.SeverityNumberFatal2, + "FATAL3": plog.SeverityNumberFatal3, + "FATAL4": plog.SeverityNumberFatal4, + "CRITICAL": plog.SeverityNumberFatal, + "ALL": plog.SeverityNumberTrace, + "WARNING": plog.SeverityNumberWarn, + } + for level, number := range goldenMapping { + require.Equal(t, number, severityTextToNumber(level)) + } + + others := []string{"", "UNKNOWN", "other", "anything"} + for _, level := range others { + require.Equal(t, plog.SeverityNumberUnspecified, severityTextToNumber(level)) + } +} diff --git a/collector/receiver/telemetryapireceiver/testdata/config.yaml b/collector/receiver/telemetryapireceiver/testdata/config.yaml new file mode 100644 index 0000000000..ce43e718f9 --- /dev/null +++ b/collector/receiver/telemetryapireceiver/testdata/config.yaml @@ -0,0 +1,32 @@ +telemetryapi: +telemetryapi/1: + port: 12345 +telemetryapi/2: + port: 12345 + types: ["platform"] +telemetryapi/3: + port: 12345 + types: ["function"] +telemetryapi/4: + port: 12345 + types: ["extension"] +telemetryapi/5: + port: 12345 + types: ["platform", "function"] +telemetryapi/6: + port: 12345 + types: ["platform", "extension"] +telemetryapi/7: + port: 12345 + types: ["function", "extension"] +telemetryapi/8: + port: 12345 + types: [] +telemetryapi/9: + port: 12345 + types: + - function + - extension +telemetryapi/10: + port: 12345 + types: [function, extension]