From e8a4a5ffebef30a050770d58b95a7d45acedb0c2 Mon Sep 17 00:00:00 2001 From: Jerry Leung Date: Thu, 9 May 2024 16:00:57 -0700 Subject: [PATCH 01/14] Subscribed function and extension log --- collector/internal/telemetryapi/client.go | 4 ++-- collector/internal/telemetryapi/listener.go | 6 ++++-- collector/internal/telemetryapi/types.go | 6 +++--- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/collector/internal/telemetryapi/client.go b/collector/internal/telemetryapi/client.go index db57573fc2..b5b001a86f 100644 --- a/collector/internal/telemetryapi/client.go +++ b/collector/internal/telemetryapi/client.go @@ -49,8 +49,8 @@ func NewClient(logger *zap.Logger) *Client { func (c *Client) Subscribe(ctx context.Context, extensionID string, listenerURI string) (string, error) { eventTypes := []EventType{ Platform, - // Function, - // Extension, + Function, + Extension, } bufferingConfig := BufferingCfg{ diff --git a/collector/internal/telemetryapi/listener.go b/collector/internal/telemetryapi/listener.go index 8499d4e402..d078bbd53f 100644 --- a/collector/internal/telemetryapi/listener.go +++ b/collector/internal/telemetryapi/listener.go @@ -140,8 +140,10 @@ func (s *Listener) Wait(ctx context.Context, reqID string) error { continue } - if i.Record["requestId"] == reqID { - return nil + if record, ok := i.Record.(map[string]interface{}); ok { + if record["requestId"] == reqID { + return nil + } } } } diff --git a/collector/internal/telemetryapi/types.go b/collector/internal/telemetryapi/types.go index 56ca237e6f..18ccb5c79f 100644 --- a/collector/internal/telemetryapi/types.go +++ b/collector/internal/telemetryapi/types.go @@ -87,7 +87,7 @@ type SubscribeRequest struct { } type Event struct { - Time string `json:"time"` - Type string `json:"type"` - Record map[string]any `json:"record"` + Time string `json:"time"` + Type string `json:"type"` + Record any `json:"record"` } From e983516dffa1f4256539ef75a998d4a5dbbb3678 Mon Sep 17 00:00:00 2001 From: Jerry Leung Date: Fri, 10 May 2024 12:01:10 -0700 Subject: [PATCH 02/14] nasty --- collector/go.mod | 2 +- collector/go.sum | 2 + collector/internal/telemetryapi/client.go | 65 ++++- collector/internal/tools/go.mod | 4 +- collector/lambdacomponents/go.mod | 5 +- collector/lambdacomponents/go.sum | 1 + .../receiver/telemetryapireceiver/factory.go | 12 +- .../receiver/telemetryapireceiver/go.mod | 6 +- .../receiver/telemetryapireceiver/go.sum | 1 + .../receiver/telemetryapireceiver/logs.go | 250 ++++++++++++++++++ .../receiver/telemetryapireceiver/types.go | 6 +- 11 files changed, 342 insertions(+), 12 deletions(-) create mode 100644 collector/receiver/telemetryapireceiver/logs.go diff --git a/collector/go.mod b/collector/go.mod index c5f1a830ae..d028160c05 100644 --- a/collector/go.mod +++ b/collector/go.mod @@ -156,7 +156,7 @@ require ( go.opentelemetry.io/collector/processor/memorylimiterprocessor v0.99.0 // indirect go.opentelemetry.io/collector/receiver v0.99.0 // indirect go.opentelemetry.io/collector/receiver/otlpreceiver v0.99.0 // indirect - go.opentelemetry.io/collector/semconv v0.99.0 // indirect + go.opentelemetry.io/collector/semconv v0.100.0 // indirect go.opentelemetry.io/collector/service v0.99.0 // indirect go.opentelemetry.io/contrib/config v0.5.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.50.0 // indirect diff --git a/collector/go.sum b/collector/go.sum index 8d14bcd412..77f55e763b 100644 --- a/collector/go.sum +++ b/collector/go.sum @@ -595,6 +595,8 @@ go.opentelemetry.io/collector/receiver/otlpreceiver v0.99.0 h1:bAICAUW2K67kL9NvN go.opentelemetry.io/collector/receiver/otlpreceiver v0.99.0/go.mod h1:0IUYqbqx2ZxxW3iYVHjW0ulA4bPkUJSLVCFJq4MR8FI= go.opentelemetry.io/collector/semconv v0.99.0 h1:6xCezUbjdeMdrP2HtoEJQue99dgrZhqHCgjYRcuEGBg= go.opentelemetry.io/collector/semconv v0.99.0/go.mod h1:8ElcRZ8Cdw5JnvhTOQOdYizkJaQ10Z2fS+R6djOnj6A= +go.opentelemetry.io/collector/semconv v0.100.0 h1:QArUvWcbmsMjM4PV0zngUHRizZeUXibsPBWjDuNJXAs= +go.opentelemetry.io/collector/semconv v0.100.0/go.mod h1:8ElcRZ8Cdw5JnvhTOQOdYizkJaQ10Z2fS+R6djOnj6A= go.opentelemetry.io/collector/service v0.99.0 h1:Ods9uVHAZb1PW1nTC5XOzC+lC1qrA+EVyt1NNzT8Uqk= go.opentelemetry.io/collector/service v0.99.0/go.mod h1:R5dcvSHaqdx3xojrq0rJdLyKeGZMke37ZWgzCbLhapQ= go.opentelemetry.io/contrib/config v0.5.0 h1:7jLbj1losnHOq1rarCVMEDrmkHWixEIJ11pDtT4KrGM= diff --git a/collector/internal/telemetryapi/client.go b/collector/internal/telemetryapi/client.go index b5b001a86f..f41b6c74e1 100644 --- a/collector/internal/telemetryapi/client.go +++ b/collector/internal/telemetryapi/client.go @@ -46,9 +46,9 @@ func NewClient(logger *zap.Logger) *Client { } } -func (c *Client) Subscribe(ctx context.Context, extensionID string, listenerURI string) (string, error) { +func (c *Client) SubscribeLogs(ctx context.Context, extensionID string, listenerURI string) (string, error) { eventTypes := []EventType{ - Platform, + // Platform, Function, Extension, } @@ -107,6 +107,67 @@ func (c *Client) Subscribe(ctx context.Context, extensionID string, listenerURI return string(body), nil } +func (c *Client) Subscribe(ctx context.Context, extensionID string, listenerURI string) (string, error) { + eventTypes := []EventType{ + Platform, + // Function, + // Extension, + } + + bufferingConfig := BufferingCfg{ + MaxItems: 1000, + MaxBytes: 256 * 1024, + TimeoutMS: 25, + } + + destination := Destination{ + Protocol: HttpProto, + HttpMethod: HttpPost, + Encoding: JSON, + URI: URI(listenerURI), + } + + data, err := json.Marshal( + &SubscribeRequest{ + SchemaVersion: SchemaVersionLatest, + EventTypes: eventTypes, + BufferingCfg: bufferingConfig, + Destination: destination, + }) + + if err != nil { + return "", fmt.Errorf("Failed to marshal SubscribeRequest: %w", err) + } + + headers := make(map[string]string) + headers[lambdaAgentIdentifierHeaderKey] = extensionID + + c.logger.Info("Subscribing", zap.String("baseURL", c.baseURL)) + resp, err := httpPutWithHeaders(ctx, c.httpClient, c.baseURL, data, headers) + if err != nil { + c.logger.Error("Subscription failed", zap.Error(err)) + return "", err + } + defer resp.Body.Close() + + if resp.StatusCode == http.StatusAccepted { + c.logger.Error("Subscription failed. Logs API is not supported! Is this extension running in a local sandbox?", zap.Int("status_code", resp.StatusCode)) + } else if resp.StatusCode != http.StatusOK { + c.logger.Error("Subscription failed") + body, err := io.ReadAll(resp.Body) + if err != nil { + return "", fmt.Errorf("request to %s failed: %d[%s]: %w", c.baseURL, resp.StatusCode, resp.Status, err) + } + + return "", fmt.Errorf("request to %s failed: %d[%s] %s", c.baseURL, resp.StatusCode, resp.Status, string(body)) + } + + body, _ := io.ReadAll(resp.Body) + c.logger.Info("Subscription success", zap.String("response", string(body))) + + return string(body), nil +} + func httpPutWithHeaders(ctx context.Context, client *http.Client, url string, data []byte, headers map[string]string) (*http.Response, error) { req, err := http.NewRequestWithContext(ctx, "PUT", url, bytes.NewBuffer(data)) if err != nil { diff --git a/collector/internal/tools/go.mod b/collector/internal/tools/go.mod index c4720ec5c7..b0a26c1ebf 100644 --- a/collector/internal/tools/go.mod +++ b/collector/internal/tools/go.mod @@ -1,6 +1,8 @@ module github.com/open-telemetry/opentelemetry-lambda/collector/internal/tools -go 1.19 +go 1.21 + +toolchain go1.21.1 require ( github.com/client9/misspell v0.3.4 diff --git a/collector/lambdacomponents/go.mod b/collector/lambdacomponents/go.mod index 9dcee13dda..ad1f79f229 100644 --- a/collector/lambdacomponents/go.mod +++ b/collector/lambdacomponents/go.mod @@ -1,6 +1,7 @@ module github.com/open-telemetry/opentelemetry-lambda/collector/lambdacomponents -go 1.21 +go 1.21.0 + toolchain go1.22.2 require ( @@ -128,7 +129,7 @@ require ( go.opentelemetry.io/collector/extension/auth v0.99.0 // indirect go.opentelemetry.io/collector/featuregate v1.6.0 // indirect go.opentelemetry.io/collector/pdata v1.6.0 // indirect - go.opentelemetry.io/collector/semconv v0.99.0 // indirect + go.opentelemetry.io/collector/semconv v0.100.0 // indirect go.opentelemetry.io/collector/service v0.99.0 // indirect go.opentelemetry.io/contrib/config v0.5.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.50.0 // indirect diff --git a/collector/lambdacomponents/go.sum b/collector/lambdacomponents/go.sum index 3784ca56da..16b80d3808 100644 --- a/collector/lambdacomponents/go.sum +++ b/collector/lambdacomponents/go.sum @@ -583,6 +583,7 @@ go.opentelemetry.io/collector/receiver/otlpreceiver v0.99.0 h1:bAICAUW2K67kL9NvN go.opentelemetry.io/collector/receiver/otlpreceiver v0.99.0/go.mod h1:0IUYqbqx2ZxxW3iYVHjW0ulA4bPkUJSLVCFJq4MR8FI= go.opentelemetry.io/collector/semconv v0.99.0 h1:6xCezUbjdeMdrP2HtoEJQue99dgrZhqHCgjYRcuEGBg= go.opentelemetry.io/collector/semconv v0.99.0/go.mod h1:8ElcRZ8Cdw5JnvhTOQOdYizkJaQ10Z2fS+R6djOnj6A= +go.opentelemetry.io/collector/semconv v0.100.0/go.mod h1:8ElcRZ8Cdw5JnvhTOQOdYizkJaQ10Z2fS+R6djOnj6A= go.opentelemetry.io/collector/service v0.99.0 h1:Ods9uVHAZb1PW1nTC5XOzC+lC1qrA+EVyt1NNzT8Uqk= go.opentelemetry.io/collector/service v0.99.0/go.mod h1:R5dcvSHaqdx3xojrq0rJdLyKeGZMke37ZWgzCbLhapQ= go.opentelemetry.io/contrib/config v0.5.0 h1:7jLbj1losnHOq1rarCVMEDrmkHWixEIJ11pDtT4KrGM= diff --git a/collector/receiver/telemetryapireceiver/factory.go b/collector/receiver/telemetryapireceiver/factory.go index c89dcbeac8..dadc0d96d6 100644 --- a/collector/receiver/telemetryapireceiver/factory.go +++ b/collector/receiver/telemetryapireceiver/factory.go @@ -39,7 +39,8 @@ func NewFactory(extensionID string) receiver.Factory { extensionID: extensionID, } }, - receiver.WithTraces(createTracesReceiver, stability)) + receiver.WithTraces(createTracesReceiver, stability), + receiver.WithLogs(createLogsReceiver, stability)) } func createTracesReceiver(ctx context.Context, params receiver.CreateSettings, rConf component.Config, next consumer.Traces) (receiver.Traces, error) { @@ -50,3 +51,12 @@ func createTracesReceiver(ctx context.Context, params receiver.CreateSettings, r return newTelemetryAPIReceiver(cfg, next, params) } + +func createLogsReceiver(ctx context.Context, params receiver.CreateSettings, rConf component.Config, next consumer.Logs) (receiver.Logs, error) { + cfg, ok := rConf.(*Config) + if !ok { + return nil, errConfigNotTelemetryAPI + } + + return newTelemetryAPILogsReceiver(cfg, next, params) +} diff --git a/collector/receiver/telemetryapireceiver/go.mod b/collector/receiver/telemetryapireceiver/go.mod index 0b04a1a462..b4cb92178d 100644 --- a/collector/receiver/telemetryapireceiver/go.mod +++ b/collector/receiver/telemetryapireceiver/go.mod @@ -1,6 +1,8 @@ module github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver -go 1.21 +go 1.21.0 + +toolchain go1.21.1 replace github.com/open-telemetry/opentelemetry-lambda/collector => ../../ @@ -12,7 +14,7 @@ require ( go.opentelemetry.io/collector/consumer v0.99.0 go.opentelemetry.io/collector/pdata v1.6.0 go.opentelemetry.io/collector/receiver v0.99.0 - go.opentelemetry.io/collector/semconv v0.99.0 + go.opentelemetry.io/collector/semconv v0.100.0 go.uber.org/zap v1.27.0 ) diff --git a/collector/receiver/telemetryapireceiver/go.sum b/collector/receiver/telemetryapireceiver/go.sum index 8af8cad4b4..22606e3372 100644 --- a/collector/receiver/telemetryapireceiver/go.sum +++ b/collector/receiver/telemetryapireceiver/go.sum @@ -81,6 +81,7 @@ go.opentelemetry.io/collector/receiver v0.99.0 h1:NdYShaEaabxVBRQaxK/HcKqRGl1eUF go.opentelemetry.io/collector/receiver v0.99.0/go.mod h1:aU9ftU4FhdEY9/eREf86FWHmZHz8kufXchfpHrTTrn0= go.opentelemetry.io/collector/semconv v0.99.0 h1:6xCezUbjdeMdrP2HtoEJQue99dgrZhqHCgjYRcuEGBg= go.opentelemetry.io/collector/semconv v0.99.0/go.mod h1:8ElcRZ8Cdw5JnvhTOQOdYizkJaQ10Z2fS+R6djOnj6A= +go.opentelemetry.io/collector/semconv v0.100.0/go.mod h1:8ElcRZ8Cdw5JnvhTOQOdYizkJaQ10Z2fS+R6djOnj6A= go.opentelemetry.io/otel v1.25.0 h1:gldB5FfhRl7OJQbUHt/8s0a7cE8fbsPAtdpRaApKy4k= go.opentelemetry.io/otel v1.25.0/go.mod h1:Wa2ds5NOXEMkCmUou1WA7ZBfLTHWIsp034OVD7AO+Vg= go.opentelemetry.io/otel/exporters/prometheus v0.47.0 h1:OL6yk1Z/pEGdDnrBbxSsH+t4FY1zXfBRGd7bjwhlMLU= diff --git a/collector/receiver/telemetryapireceiver/logs.go b/collector/receiver/telemetryapireceiver/logs.go new file mode 100644 index 0000000000..5d71481028 --- /dev/null +++ b/collector/receiver/telemetryapireceiver/logs.go @@ -0,0 +1,250 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package telemetryapireceiver // import "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver" + +import ( + "context" + "encoding/json" + "fmt" + "github.com/golang-collections/go-datastructures/queue" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/receiver" + semconv "go.opentelemetry.io/collector/semconv/v1.25.0" + "go.uber.org/zap" + "io" + "net/http" + "os" + "strings" + "time" + + "github.com/open-telemetry/opentelemetry-lambda/collector/internal/telemetryapi" +) + +const defaultLogsListenerPort = "4327" +const initialLogsQueueSize = 5 + +type telemetryAPILogsReceiver struct { + httpServer *http.Server + logger *zap.Logger + queue *queue.Queue // queue is a synchronous queue and is used to put the received log events to be dispatched later + nextConsumer consumer.Logs + extensionID string + resource pcommon.Resource +} + +func (r *telemetryAPILogsReceiver) Start(ctx context.Context, host component.Host) error { + address := listenOnLogsAddress() + r.logger.Info("Listening for requests", zap.String("address", address)) + + mux := http.NewServeMux() + mux.HandleFunc("/", r.httpHandler) + r.httpServer = &http.Server{Addr: address, Handler: mux} + go func() { + _ = r.httpServer.ListenAndServe() + }() + + telemetryClient := telemetryapi.NewClient(r.logger) + _, err := telemetryClient.SubscribeLogs(ctx, r.extensionID, fmt.Sprintf("http://%s/", address)) + if err != nil { + r.logger.Info("Listening for requests", zap.String("address", address), zap.String("extensionID", r.extensionID)) + return err + } + return nil +} + +func (r *telemetryAPILogsReceiver) Shutdown(ctx context.Context) error { + return nil +} + +//func newSpanID() pcommon.SpanID { +// var rngSeed int64 +// _ = binary.Read(crand.Reader, binary.LittleEndian, &rngSeed) +// randSource := rand.New(rand.NewSource(rngSeed)) +// sid := pcommon.SpanID{} +// _, _ = randSource.Read(sid[:]) +// return sid +//} +// +//func newTraceID() pcommon.TraceID { +// var rngSeed int64 +// _ = binary.Read(crand.Reader, binary.LittleEndian, &rngSeed) +// randSource := rand.New(rand.NewSource(rngSeed)) +// tid := pcommon.TraceID{} +// _, _ = randSource.Read(tid[:]) +// return tid +//} + +// httpHandler handles the requests coming from the Telemetry API. +// Everytime Telemetry API sends events, this function will read them from the response body +// and put into a synchronous queue to be dispatched later. +// Logging or printing besides the error cases below is not recommended if you have subscribed to +// receive extension logs. Otherwise, logging here will cause Telemetry API to send new logs for +// the printed lines which may create an infinite loop. +func (r *telemetryAPILogsReceiver) httpHandler(w http.ResponseWriter, req *http.Request) { + body, err := io.ReadAll(req.Body) + if err != nil { + r.logger.Error("error reading body", zap.Error(err)) + return + } + + var slice []event + if err := json.Unmarshal(body, &slice); err != nil { + r.logger.Error("error unmarshalling body", zap.Error(err)) + return + } + + log := plog.NewLogs() + resourceLog := log.ResourceLogs().AppendEmpty() + r.resource.CopyTo(resourceLog.Resource()) + scopeLog := resourceLog.ScopeLogs().AppendEmpty() + scopeLog.Scope().SetName("github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapi") + + for _, el := range slice { + r.logger.Debug(fmt.Sprintf("Event: %s", el.Type), zap.Any("event", el)) + logRecord := scopeLog.LogRecords().AppendEmpty() + logRecord.Attributes().PutStr("type", el.Type) + + layout := "2006-01-02T15:04:05.000Z" + if time, err := time.Parse(layout, el.Time); err == nil { + logRecord.SetTimestamp(pcommon.NewTimestampFromTime(time)) + } + if record, ok := el.Record.(map[string]interface{}); ok { + // in JSON format https://docs.aws.amazon.com/lambda/latest/dg/telemetry-schema-reference.html#telemetry-api-function + if timestamp, ok := record["timestamp"].(string); ok { + if observedTime, err := time.Parse(layout, timestamp); err == nil { + logRecord.SetObservedTimestamp(pcommon.NewTimestampFromTime(observedTime)) + } + } + if level, ok := record["level"].(string); ok { + level = strings.ToUpper(level) + logRecord.SetSeverityText(level) + switch level { + case "TRACE": + logRecord.SetSeverityNumber(1) + case "TRACE2": + logRecord.SetSeverityNumber(2) + case "TRACE3": + logRecord.SetSeverityNumber(3) + case "TRACE4": + logRecord.SetSeverityNumber(4) + case "DEBUG": + logRecord.SetSeverityNumber(5) + case "DEBUG2": + logRecord.SetSeverityNumber(6) + case "DEBUG3": + logRecord.SetSeverityNumber(7) + case "DEBUG4": + logRecord.SetSeverityNumber(8) + case "INFO": + logRecord.SetSeverityNumber(9) + case "INFO2": + logRecord.SetSeverityNumber(10) + case "INFO3": + logRecord.SetSeverityNumber(11) + case "INFO4": + logRecord.SetSeverityNumber(12) + case "WARN": + logRecord.SetSeverityNumber(13) + case "WARN2": + logRecord.SetSeverityNumber(14) + case "WARN3": + logRecord.SetSeverityNumber(15) + case "WARN4": + logRecord.SetSeverityNumber(16) + case "ERROR": + logRecord.SetSeverityNumber(17) + case "ERROR2": + logRecord.SetSeverityNumber(18) + case "ERROR3": + logRecord.SetSeverityNumber(19) + case "ERROR4": + logRecord.SetSeverityNumber(20) + case "FATAL": + logRecord.SetSeverityNumber(21) + case "FATAL2": + logRecord.SetSeverityNumber(22) + case "FATAL3": + logRecord.SetSeverityNumber(23) + case "FATAL4": + logRecord.SetSeverityNumber(24) + default: + } + } + if requestId, ok := record["requestId"].(string); ok { + logRecord.Attributes().PutStr(semconv.AttributeFaaSInvocationID, requestId) + } + if line, ok := record["message"].(string); ok { + logRecord.Body().SetStr(line) + } + } else { + // in plain text https://docs.aws.amazon.com/lambda/latest/dg/telemetry-schema-reference.html#telemetry-api-function + line := el.Record.(string) + logRecord.Body().SetStr(line) + } + } + if err = r.nextConsumer.ConsumeLogs(context.Background(), log); err != nil { + r.logger.Error("error receiving logs", zap.Error(err)) + } + r.logger.Debug("logEvents received", zap.Int("count", len(slice)), zap.Int64("queue_length", r.queue.Len())) + slice = nil +} + +func newTelemetryAPILogsReceiver( + cfg *Config, + next consumer.Logs, + set receiver.CreateSettings, +) (*telemetryAPILogsReceiver, error) { + envResourceMap := map[string]string{ + "AWS_LAMBDA_FUNCTION_MEMORY_SIZE": semconv.AttributeFaaSMaxMemory, + "AWS_LAMBDA_FUNCTION_VERSION": semconv.AttributeFaaSVersion, + "AWS_REGION": semconv.AttributeFaaSInvokedRegion, + } + r := pcommon.NewResource() + r.Attributes().PutStr(semconv.AttributeFaaSInvokedProvider, semconv.AttributeFaaSInvokedProviderAWS) + if val, ok := os.LookupEnv("AWS_LAMBDA_FUNCTION_NAME"); ok { + r.Attributes().PutStr(semconv.AttributeServiceName, val) + r.Attributes().PutStr(semconv.AttributeFaaSName, val) + } else { + r.Attributes().PutStr(semconv.AttributeServiceName, "unknown_service") + } + + for env, resourceAttribute := range envResourceMap { + if val, ok := os.LookupEnv(env); ok { + r.Attributes().PutStr(resourceAttribute, val) + } + } + return &telemetryAPILogsReceiver{ + logger: set.Logger, + queue: queue.New(initialLogsQueueSize), + nextConsumer: next, + extensionID: cfg.extensionID, + resource: r, + }, nil +} + +func listenOnLogsAddress() string { + envAwsLocal, ok := os.LookupEnv("AWS_SAM_LOCAL") + var addr string + if ok && envAwsLocal == "true" { + addr = ":" + defaultLogsListenerPort + } else { + addr = "sandbox.localdomain:" + defaultLogsListenerPort + } + + return addr +} diff --git a/collector/receiver/telemetryapireceiver/types.go b/collector/receiver/telemetryapireceiver/types.go index 40bbc6ff94..fdcb4e3f28 100644 --- a/collector/receiver/telemetryapireceiver/types.go +++ b/collector/receiver/telemetryapireceiver/types.go @@ -15,7 +15,7 @@ package telemetryapireceiver // import "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver" type event struct { - Time string `json:"time"` - Type string `json:"type"` - Record map[string]any `json:"record"` + Time string `json:"time"` + Type string `json:"type"` + Record any `json:"record"` } From 456d81b33bee87e89dd2666ac351f031ce91e79e Mon Sep 17 00:00:00 2001 From: Jerry Leung Date: Fri, 10 May 2024 12:08:10 -0700 Subject: [PATCH 03/14] Updated --- collector/internal/telemetryapi/client.go | 2 +- .../receiver/telemetryapireceiver/logs.go | 19 +------------------ 2 files changed, 2 insertions(+), 19 deletions(-) diff --git a/collector/internal/telemetryapi/client.go b/collector/internal/telemetryapi/client.go index f41b6c74e1..c969b97472 100644 --- a/collector/internal/telemetryapi/client.go +++ b/collector/internal/telemetryapi/client.go @@ -50,7 +50,7 @@ func (c *Client) SubscribeLogs(ctx context.Context, extensionID string, listener eventTypes := []EventType{ // Platform, Function, - Extension, + // Extension, // overkill } bufferingConfig := BufferingCfg{ diff --git a/collector/receiver/telemetryapireceiver/logs.go b/collector/receiver/telemetryapireceiver/logs.go index 5d71481028..0e2808b325 100644 --- a/collector/receiver/telemetryapireceiver/logs.go +++ b/collector/receiver/telemetryapireceiver/logs.go @@ -71,24 +71,6 @@ func (r *telemetryAPILogsReceiver) Shutdown(ctx context.Context) error { return nil } -//func newSpanID() pcommon.SpanID { -// var rngSeed int64 -// _ = binary.Read(crand.Reader, binary.LittleEndian, &rngSeed) -// randSource := rand.New(rand.NewSource(rngSeed)) -// sid := pcommon.SpanID{} -// _, _ = randSource.Read(sid[:]) -// return sid -//} -// -//func newTraceID() pcommon.TraceID { -// var rngSeed int64 -// _ = binary.Read(crand.Reader, binary.LittleEndian, &rngSeed) -// randSource := rand.New(rand.NewSource(rngSeed)) -// tid := pcommon.TraceID{} -// _, _ = randSource.Read(tid[:]) -// return tid -//} - // httpHandler handles the requests coming from the Telemetry API. // Everytime Telemetry API sends events, this function will read them from the response body // and put into a synchronous queue to be dispatched later. @@ -122,6 +104,7 @@ func (r *telemetryAPILogsReceiver) httpHandler(w http.ResponseWriter, req *http. layout := "2006-01-02T15:04:05.000Z" if time, err := time.Parse(layout, el.Time); err == nil { logRecord.SetTimestamp(pcommon.NewTimestampFromTime(time)) + logRecord.SetObservedTimestamp(pcommon.NewTimestampFromTime(time)) } if record, ok := el.Record.(map[string]interface{}); ok { // in JSON format https://docs.aws.amazon.com/lambda/latest/dg/telemetry-schema-reference.html#telemetry-api-function From 4b7eed60164b372859d552a8f22883f577207284 Mon Sep 17 00:00:00 2001 From: Jerry Leung Date: Fri, 10 May 2024 13:57:10 -0700 Subject: [PATCH 04/14] Updated --- collector/internal/telemetryapi/client.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/collector/internal/telemetryapi/client.go b/collector/internal/telemetryapi/client.go index c969b97472..489dca2229 100644 --- a/collector/internal/telemetryapi/client.go +++ b/collector/internal/telemetryapi/client.go @@ -27,8 +27,11 @@ import ( ) const ( + ApiVersion20220701 = "2022-07-01" + ApiVersionLatest = ApiVersion20220701 SchemaVersion20220701 = "2022-07-01" - SchemaVersionLatest = SchemaVersion20220701 + SchemaVersion20221213 = "2022-12-13" + SchemaVersionLatest = SchemaVersion20221213 lambdaAgentIdentifierHeaderKey = "Lambda-Extension-Identifier" ) @@ -42,7 +45,7 @@ func NewClient(logger *zap.Logger) *Client { return &Client{ logger: logger.Named("telemetryAPI.Client"), httpClient: &http.Client{}, - baseURL: fmt.Sprintf("http://%s/%s/telemetry", os.Getenv("AWS_LAMBDA_RUNTIME_API"), SchemaVersionLatest), + baseURL: fmt.Sprintf("http://%s/%s/telemetry", os.Getenv("AWS_LAMBDA_RUNTIME_API"), ApiVersionLatest), } } @@ -50,7 +53,7 @@ func (c *Client) SubscribeLogs(ctx context.Context, extensionID string, listener eventTypes := []EventType{ // Platform, Function, - // Extension, // overkill + // Extension, } bufferingConfig := BufferingCfg{ From e83093fb1acef6b4fdfbad9ae920d7298616481e Mon Sep 17 00:00:00 2001 From: Jerry Leung Date: Mon, 13 May 2024 21:43:39 -0700 Subject: [PATCH 05/14] Updated --- collector/internal/telemetryapi/client.go | 8 +- .../receiver/telemetryapireceiver/go.sum | 1 + .../receiver/telemetryapireceiver/logs.go | 88 +++++++---- .../telemetryapireceiver/logs_test.go | 142 ++++++++++++++++++ .../receiver/telemetryapireceiver/receiver.go | 2 +- 5 files changed, 205 insertions(+), 36 deletions(-) create mode 100644 collector/receiver/telemetryapireceiver/logs_test.go diff --git a/collector/internal/telemetryapi/client.go b/collector/internal/telemetryapi/client.go index 489dca2229..a7a45df455 100644 --- a/collector/internal/telemetryapi/client.go +++ b/collector/internal/telemetryapi/client.go @@ -49,13 +49,7 @@ func NewClient(logger *zap.Logger) *Client { } } -func (c *Client) SubscribeLogs(ctx context.Context, extensionID string, listenerURI string) (string, error) { - eventTypes := []EventType{ - // Platform, - Function, - // Extension, - } - +func (c *Client) SubscribeEvents(ctx context.Context, eventTypes []EventType, extensionID string, listenerURI string) (string, error) { bufferingConfig := BufferingCfg{ MaxItems: 1000, MaxBytes: 256 * 1024, diff --git a/collector/receiver/telemetryapireceiver/go.sum b/collector/receiver/telemetryapireceiver/go.sum index 22606e3372..8a013c97ab 100644 --- a/collector/receiver/telemetryapireceiver/go.sum +++ b/collector/receiver/telemetryapireceiver/go.sum @@ -81,6 +81,7 @@ go.opentelemetry.io/collector/receiver v0.99.0 h1:NdYShaEaabxVBRQaxK/HcKqRGl1eUF go.opentelemetry.io/collector/receiver v0.99.0/go.mod h1:aU9ftU4FhdEY9/eREf86FWHmZHz8kufXchfpHrTTrn0= go.opentelemetry.io/collector/semconv v0.99.0 h1:6xCezUbjdeMdrP2HtoEJQue99dgrZhqHCgjYRcuEGBg= go.opentelemetry.io/collector/semconv v0.99.0/go.mod h1:8ElcRZ8Cdw5JnvhTOQOdYizkJaQ10Z2fS+R6djOnj6A= +go.opentelemetry.io/collector/semconv v0.100.0 h1:QArUvWcbmsMjM4PV0zngUHRizZeUXibsPBWjDuNJXAs= go.opentelemetry.io/collector/semconv v0.100.0/go.mod h1:8ElcRZ8Cdw5JnvhTOQOdYizkJaQ10Z2fS+R6djOnj6A= go.opentelemetry.io/otel v1.25.0 h1:gldB5FfhRl7OJQbUHt/8s0a7cE8fbsPAtdpRaApKy4k= go.opentelemetry.io/otel v1.25.0/go.mod h1:Wa2ds5NOXEMkCmUou1WA7ZBfLTHWIsp034OVD7AO+Vg= diff --git a/collector/receiver/telemetryapireceiver/logs.go b/collector/receiver/telemetryapireceiver/logs.go index 0e2808b325..65e6c5b0f5 100644 --- a/collector/receiver/telemetryapireceiver/logs.go +++ b/collector/receiver/telemetryapireceiver/logs.go @@ -1,17 +1,3 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - package telemetryapireceiver // import "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver" import ( @@ -52,14 +38,20 @@ func (r *telemetryAPILogsReceiver) Start(ctx context.Context, host component.Hos r.logger.Info("Listening for requests", zap.String("address", address)) mux := http.NewServeMux() - mux.HandleFunc("/", r.httpHandler) + mux.HandleFunc("/function", r.httpFunctionHandler) + mux.HandleFunc("/platform", r.httpPlatformHandler) r.httpServer = &http.Server{Addr: address, Handler: mux} go func() { _ = r.httpServer.ListenAndServe() }() telemetryClient := telemetryapi.NewClient(r.logger) - _, err := telemetryClient.SubscribeLogs(ctx, r.extensionID, fmt.Sprintf("http://%s/", address)) + _, err := telemetryClient.SubscribeEvents(ctx, []telemetryapi.EventType{telemetryapi.Platform}, r.extensionID, fmt.Sprintf("http://%s/platform", address)) + if err != nil { + r.logger.Info("Listening for requests", zap.String("address", address), zap.String("extensionID", r.extensionID)) + return err + } + _, err = telemetryClient.SubscribeEvents(ctx, []telemetryapi.EventType{telemetryapi.Function}, r.extensionID, fmt.Sprintf("http://%s/function", address)) if err != nil { r.logger.Info("Listening for requests", zap.String("address", address), zap.String("extensionID", r.extensionID)) return err @@ -71,13 +63,7 @@ func (r *telemetryAPILogsReceiver) Shutdown(ctx context.Context) error { return nil } -// httpHandler handles the requests coming from the Telemetry API. -// Everytime Telemetry API sends events, this function will read them from the response body -// and put into a synchronous queue to be dispatched later. -// Logging or printing besides the error cases below is not recommended if you have subscribed to -// receive extension logs. Otherwise, logging here will cause Telemetry API to send new logs for -// the printed lines which may create an infinite loop. -func (r *telemetryAPILogsReceiver) httpHandler(w http.ResponseWriter, req *http.Request) { +func (r *telemetryAPILogsReceiver) httpPlatformHandler(w http.ResponseWriter, req *http.Request) { body, err := io.ReadAll(req.Body) if err != nil { r.logger.Error("error reading body", zap.Error(err)) @@ -102,9 +88,54 @@ func (r *telemetryAPILogsReceiver) httpHandler(w http.ResponseWriter, req *http. logRecord.Attributes().PutStr("type", el.Type) layout := "2006-01-02T15:04:05.000Z" - if time, err := time.Parse(layout, el.Time); err == nil { - logRecord.SetTimestamp(pcommon.NewTimestampFromTime(time)) - logRecord.SetObservedTimestamp(pcommon.NewTimestampFromTime(time)) + if t, err := time.Parse(layout, el.Time); err == nil { + logRecord.SetTimestamp(pcommon.NewTimestampFromTime(time.Now())) + logRecord.SetObservedTimestamp(pcommon.NewTimestampFromTime(t)) + } + logRecord.SetSeverityText("INFO") + logRecord.SetSeverityNumber(9) + if j, err := json.Marshal(el.Record); err == nil { + logRecord.Body().SetStr(string(j)) + } else { + r.logger.Error("error stringify record", zap.Error(err)) + } + + if err = r.nextConsumer.ConsumeLogs(context.Background(), log); err != nil { + r.logger.Error("error receiving logs", zap.Error(err)) + } + } + r.logger.Debug("logEvents received", zap.Int("count", len(slice)), zap.Int64("queue_length", r.queue.Len())) + slice = nil +} + +func (r *telemetryAPILogsReceiver) httpFunctionHandler(w http.ResponseWriter, req *http.Request) { + body, err := io.ReadAll(req.Body) + if err != nil { + r.logger.Error("error reading body", zap.Error(err)) + return + } + + var slice []event + if err := json.Unmarshal(body, &slice); err != nil { + r.logger.Error("error unmarshalling body", zap.Error(err)) + return + } + + log := plog.NewLogs() + resourceLog := log.ResourceLogs().AppendEmpty() + r.resource.CopyTo(resourceLog.Resource()) + scopeLog := resourceLog.ScopeLogs().AppendEmpty() + scopeLog.Scope().SetName("github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapi") + + for _, el := range slice { + r.logger.Debug(fmt.Sprintf("Event: %s", el.Type), zap.Any("event", el)) + logRecord := scopeLog.LogRecords().AppendEmpty() + logRecord.Attributes().PutStr("type", el.Type) + + layout := "2006-01-02T15:04:05.000Z" + if t, err := time.Parse(layout, el.Time); err == nil { + logRecord.SetTimestamp(pcommon.NewTimestampFromTime(time.Now())) + logRecord.SetObservedTimestamp(pcommon.NewTimestampFromTime(t)) } if record, ok := el.Record.(map[string]interface{}); ok { // in JSON format https://docs.aws.amazon.com/lambda/latest/dg/telemetry-schema-reference.html#telemetry-api-function @@ -176,8 +207,9 @@ func (r *telemetryAPILogsReceiver) httpHandler(w http.ResponseWriter, req *http. } } else { // in plain text https://docs.aws.amazon.com/lambda/latest/dg/telemetry-schema-reference.html#telemetry-api-function - line := el.Record.(string) - logRecord.Body().SetStr(line) + if line, ok := el.Record.(string); ok { + logRecord.Body().SetStr(line) + } } } if err = r.nextConsumer.ConsumeLogs(context.Background(), log); err != nil { diff --git a/collector/receiver/telemetryapireceiver/logs_test.go b/collector/receiver/telemetryapireceiver/logs_test.go new file mode 100644 index 0000000000..b356753312 --- /dev/null +++ b/collector/receiver/telemetryapireceiver/logs_test.go @@ -0,0 +1,142 @@ +package telemetryapireceiver // import "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver" + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestListenOnLogsAddress(t *testing.T) { + testCases := []struct { + desc string + testFunc func(*testing.T) + }{ + { + desc: "listen on address without AWS_SAM_LOCAL env variable", + testFunc: func(t *testing.T) { + addr := listenOnLogsAddress() + require.EqualValues(t, "sandbox.localdomain:4327", addr) + }, + }, + { + desc: "listen on address with AWS_SAM_LOCAL env variable", + testFunc: func(t *testing.T) { + t.Setenv("AWS_SAM_LOCAL", "true") + addr := listenOnLogsAddress() + require.EqualValues(t, ":4327", addr) + }, + }, + } + for _, tc := range testCases { + t.Run(tc.desc, tc.testFunc) + } +} + +//type mockConsumer struct { +// consumed int +//} + +//func (c *mockConsumer) ConsumeLogs(ctx context.Context, td plog.Logs) error { +// return nil +//} + +// func (c *mockConsumer) Capabilities() consumer.Capabilities { +// return consumer.Capabilities{MutatesData: true} +// } +//func TestHandler(t *testing.T) { +// testCases := []struct { +// desc string +// body string +// expectedSpans int +// }{ +// { +// desc: "empty body", +// body: `{}`, +// }, +// { +// desc: "invalid json", +// body: `invalid json`, +// }, +// { +// desc: "valid event", +// body: `[{"time":"", "type":"", "record": {}}]`, +// }, +// { +// desc: "valid event", +// body: `[{"time":"", "type":"platform.initStart", "record": {}}]`, +// }, +// { +// desc: "valid start/end events", +// body: `[ +// {"time":"2006-01-02T15:04:04.000Z", "type":"platform.initStart", "record": {}}, +// {"time":"2006-01-02T15:04:05.000Z", "type":"platform.initRuntimeDone", "record": {}} +// ]`, +// expectedLogs: 1, +// }, +// } +// for _, tc := range testCases { +// t.Run(tc.desc, func(t *testing.T) { +// consumer := mockConsumer{} +// r, err := newTelemetryAPIReceiver( +// &Config{}, +// &consumer, +// receivertest.NewNopCreateSettings(), +// ) +// require.NoError(t, err) +// req := httptest.NewRequest("POST", +// "http://localhost:53612/someevent", strings.NewReader(tc.body)) +// rec := httptest.NewRecorder() +// r.httpHandler(rec, req) +// require.Equal(t, tc.expectedSpans, consumer.consumed) +// }) +// } +//} + +// +//func TestCreatePlatformInitSpan(t *testing.T) { +// testCases := []struct { +// desc string +// start string +// end string +// expected int +// expectError bool +// }{ +// { +// desc: "no start/end times", +// expectError: true, +// }, +// { +// desc: "no end time", +// start: "2006-01-02T15:04:05.000Z", +// expectError: true, +// }, +// { +// desc: "no start times", +// end: "2006-01-02T15:04:05.000Z", +// expectError: true, +// }, +// { +// desc: "valid times", +// start: "2006-01-02T15:04:04.000Z", +// end: "2006-01-02T15:04:05.000Z", +// expected: 1, +// expectError: false, +// }, +// } +// for _, tc := range testCases { +// t.Run(tc.desc, func(t *testing.T) { +// r, err := newTelemetryAPIReceiver( +// &Config{}, +// nil, +// receivertest.NewNopCreateSettings(), +// ) +// require.NoError(t, err) +// td, err := r.createPlatformInitSpan(tc.start, tc.end) +// if tc.expectError { +// require.Error(t, err) +// } else { +// require.Equal(t, tc.expected, td.SpanCount()) +// } +// }) +// } +//} diff --git a/collector/receiver/telemetryapireceiver/receiver.go b/collector/receiver/telemetryapireceiver/receiver.go index 4df8b7764d..52510c0441 100644 --- a/collector/receiver/telemetryapireceiver/receiver.go +++ b/collector/receiver/telemetryapireceiver/receiver.go @@ -32,7 +32,7 @@ import ( "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/receiver" - semconv "go.opentelemetry.io/collector/semconv/v1.5.0" + semconv "go.opentelemetry.io/collector/semconv/v1.25.0" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-lambda/collector/internal/telemetryapi" From c25a8a59103a15eb1d36ea63ee455462b4c91ca9 Mon Sep 17 00:00:00 2001 From: Jerry Leung Date: Tue, 14 May 2024 09:56:45 -0700 Subject: [PATCH 06/14] Fix timestamp --- collector/receiver/telemetryapireceiver/logs.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/collector/receiver/telemetryapireceiver/logs.go b/collector/receiver/telemetryapireceiver/logs.go index 65e6c5b0f5..bf376ad2e4 100644 --- a/collector/receiver/telemetryapireceiver/logs.go +++ b/collector/receiver/telemetryapireceiver/logs.go @@ -89,8 +89,8 @@ func (r *telemetryAPILogsReceiver) httpPlatformHandler(w http.ResponseWriter, re layout := "2006-01-02T15:04:05.000Z" if t, err := time.Parse(layout, el.Time); err == nil { - logRecord.SetTimestamp(pcommon.NewTimestampFromTime(time.Now())) - logRecord.SetObservedTimestamp(pcommon.NewTimestampFromTime(t)) + logRecord.SetTimestamp(pcommon.NewTimestampFromTime(t)) + logRecord.SetObservedTimestamp(pcommon.NewTimestampFromTime(time.Now())) } logRecord.SetSeverityText("INFO") logRecord.SetSeverityNumber(9) @@ -134,14 +134,14 @@ func (r *telemetryAPILogsReceiver) httpFunctionHandler(w http.ResponseWriter, re layout := "2006-01-02T15:04:05.000Z" if t, err := time.Parse(layout, el.Time); err == nil { - logRecord.SetTimestamp(pcommon.NewTimestampFromTime(time.Now())) - logRecord.SetObservedTimestamp(pcommon.NewTimestampFromTime(t)) + logRecord.SetTimestamp(pcommon.NewTimestampFromTime(t)) + logRecord.SetObservedTimestamp(pcommon.NewTimestampFromTime(time.Now())) } if record, ok := el.Record.(map[string]interface{}); ok { // in JSON format https://docs.aws.amazon.com/lambda/latest/dg/telemetry-schema-reference.html#telemetry-api-function if timestamp, ok := record["timestamp"].(string); ok { if observedTime, err := time.Parse(layout, timestamp); err == nil { - logRecord.SetObservedTimestamp(pcommon.NewTimestampFromTime(observedTime)) + logRecord.SetTimestamp(pcommon.NewTimestampFromTime(observedTime)) } } if level, ok := record["level"].(string); ok { From 902dca0c956ff37b894cb82749b1c0990605decb Mon Sep 17 00:00:00 2001 From: Jerry Leung Date: Tue, 14 May 2024 14:36:08 -0700 Subject: [PATCH 07/14] Reverted --- collector/internal/telemetryapi/listener.go | 6 ++---- collector/internal/telemetryapi/types.go | 6 +++--- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/collector/internal/telemetryapi/listener.go b/collector/internal/telemetryapi/listener.go index d078bbd53f..8499d4e402 100644 --- a/collector/internal/telemetryapi/listener.go +++ b/collector/internal/telemetryapi/listener.go @@ -140,10 +140,8 @@ func (s *Listener) Wait(ctx context.Context, reqID string) error { continue } - if record, ok := i.Record.(map[string]interface{}); ok { - if record["requestId"] == reqID { - return nil - } + if i.Record["requestId"] == reqID { + return nil } } } diff --git a/collector/internal/telemetryapi/types.go b/collector/internal/telemetryapi/types.go index 18ccb5c79f..56ca237e6f 100644 --- a/collector/internal/telemetryapi/types.go +++ b/collector/internal/telemetryapi/types.go @@ -87,7 +87,7 @@ type SubscribeRequest struct { } type Event struct { - Time string `json:"time"` - Type string `json:"type"` - Record any `json:"record"` + Time string `json:"time"` + Type string `json:"type"` + Record map[string]any `json:"record"` } From ea56f469affbc15be26cf96e46abc6a6bd23c985 Mon Sep 17 00:00:00 2001 From: Jerry Leung Date: Tue, 14 May 2024 14:38:51 -0700 Subject: [PATCH 08/14] Refactor --- collector/internal/telemetryapi/client.go | 54 +---------------------- 1 file changed, 1 insertion(+), 53 deletions(-) diff --git a/collector/internal/telemetryapi/client.go b/collector/internal/telemetryapi/client.go index a7a45df455..6ab172f3c9 100644 --- a/collector/internal/telemetryapi/client.go +++ b/collector/internal/telemetryapi/client.go @@ -110,59 +110,7 @@ func (c *Client) Subscribe(ctx context.Context, extensionID string, listenerURI // Function, // Extension, } - - bufferingConfig := BufferingCfg{ - MaxItems: 1000, - MaxBytes: 256 * 1024, - TimeoutMS: 25, - } - - destination := Destination{ - Protocol: HttpProto, - HttpMethod: HttpPost, - Encoding: JSON, - URI: URI(listenerURI), - } - - data, err := json.Marshal( - &SubscribeRequest{ - SchemaVersion: SchemaVersionLatest, - EventTypes: eventTypes, - BufferingCfg: bufferingConfig, - Destination: destination, - }) - - if err != nil { - return "", fmt.Errorf("Failed to marshal SubscribeRequest: %w", err) - } - - headers := make(map[string]string) - headers[lambdaAgentIdentifierHeaderKey] = extensionID - - c.logger.Info("Subscribing", zap.String("baseURL", c.baseURL)) - resp, err := httpPutWithHeaders(ctx, c.httpClient, c.baseURL, data, headers) - if err != nil { - c.logger.Error("Subscription failed", zap.Error(err)) - return "", err - } - defer resp.Body.Close() - - if resp.StatusCode == http.StatusAccepted { - c.logger.Error("Subscription failed. Logs API is not supported! Is this extension running in a local sandbox?", zap.Int("status_code", resp.StatusCode)) - } else if resp.StatusCode != http.StatusOK { - c.logger.Error("Subscription failed") - body, err := io.ReadAll(resp.Body) - if err != nil { - return "", fmt.Errorf("request to %s failed: %d[%s]: %w", c.baseURL, resp.StatusCode, resp.Status, err) - } - - return "", fmt.Errorf("request to %s failed: %d[%s] %s", c.baseURL, resp.StatusCode, resp.Status, string(body)) - } - - body, _ := io.ReadAll(resp.Body) - c.logger.Info("Subscription success", zap.String("response", string(body))) - - return string(body), nil + return c.SubscribeEvents(ctx, eventTypes, extensionID, listenerURI) } func httpPutWithHeaders(ctx context.Context, client *http.Client, url string, data []byte, headers map[string]string) (*http.Response, error) { From 9f7b2528cc362155f2043d1e8a61336b80b41eed Mon Sep 17 00:00:00 2001 From: Jerry Leung Date: Tue, 14 May 2024 16:06:27 -0700 Subject: [PATCH 09/14] Updated --- collector/config.yaml | 5 +++++ collector/receiver/telemetryapireceiver/logs.go | 3 ++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/collector/config.yaml b/collector/config.yaml index e46947e7f7..0016926f2d 100644 --- a/collector/config.yaml +++ b/collector/config.yaml @@ -10,6 +10,7 @@ receivers: endpoint: "localhost:4317" http: endpoint: "localhost:4318" + telemetryapi: processors: decouple: @@ -42,6 +43,10 @@ service: receivers: [otlp] processors: [resource,resourcedetection,decouple] exporters: [otlp,logging] + logs: + receivers: [telemetryapi] + processors: [resource,resourcedetection,decouple] + exporters: [otlp,logging] telemetry: metrics: address: localhost:8888 diff --git a/collector/receiver/telemetryapireceiver/logs.go b/collector/receiver/telemetryapireceiver/logs.go index bf376ad2e4..5ae4aeb3d1 100644 --- a/collector/receiver/telemetryapireceiver/logs.go +++ b/collector/receiver/telemetryapireceiver/logs.go @@ -46,7 +46,8 @@ func (r *telemetryAPILogsReceiver) Start(ctx context.Context, host component.Hos }() telemetryClient := telemetryapi.NewClient(r.logger) - _, err := telemetryClient.SubscribeEvents(ctx, []telemetryapi.EventType{telemetryapi.Platform}, r.extensionID, fmt.Sprintf("http://%s/platform", address)) + var err error + _, err = telemetryClient.SubscribeEvents(ctx, []telemetryapi.EventType{telemetryapi.Platform}, r.extensionID, fmt.Sprintf("http://%s/platform", address)) if err != nil { r.logger.Info("Listening for requests", zap.String("address", address), zap.String("extensionID", r.extensionID)) return err From f649a7115f5c0ee5db48ca7b51908fbfed5aa7de Mon Sep 17 00:00:00 2001 From: Jerry Leung Date: Wed, 15 May 2024 10:46:44 -0700 Subject: [PATCH 10/14] refactored --- collector/config.yaml | 3 + .../receiver/telemetryapireceiver/logs.go | 223 ++++++++---------- 2 files changed, 95 insertions(+), 131 deletions(-) diff --git a/collector/config.yaml b/collector/config.yaml index 0016926f2d..0a809356ab 100644 --- a/collector/config.yaml +++ b/collector/config.yaml @@ -23,6 +23,9 @@ processors: - key: sw.cloud.aws.resource.type value: "Lambda" action: upsert + - key: sw.data.module + value: "apm" + action: upsert exporters: logging: diff --git a/collector/receiver/telemetryapireceiver/logs.go b/collector/receiver/telemetryapireceiver/logs.go index 5ae4aeb3d1..36c841ae5e 100644 --- a/collector/receiver/telemetryapireceiver/logs.go +++ b/collector/receiver/telemetryapireceiver/logs.go @@ -4,6 +4,12 @@ import ( "context" "encoding/json" "fmt" + "io" + "net/http" + "os" + "strings" + "time" + "github.com/golang-collections/go-datastructures/queue" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" @@ -12,17 +18,13 @@ import ( "go.opentelemetry.io/collector/receiver" semconv "go.opentelemetry.io/collector/semconv/v1.25.0" "go.uber.org/zap" - "io" - "net/http" - "os" - "strings" - "time" "github.com/open-telemetry/opentelemetry-lambda/collector/internal/telemetryapi" ) const defaultLogsListenerPort = "4327" const initialLogsQueueSize = 5 +const timeFormatLayout = "2006-01-02T15:04:05.000Z" type telemetryAPILogsReceiver struct { httpServer *http.Server @@ -38,21 +40,14 @@ func (r *telemetryAPILogsReceiver) Start(ctx context.Context, host component.Hos r.logger.Info("Listening for requests", zap.String("address", address)) mux := http.NewServeMux() - mux.HandleFunc("/function", r.httpFunctionHandler) - mux.HandleFunc("/platform", r.httpPlatformHandler) + mux.HandleFunc("/", r.httpHandler) r.httpServer = &http.Server{Addr: address, Handler: mux} go func() { _ = r.httpServer.ListenAndServe() }() telemetryClient := telemetryapi.NewClient(r.logger) - var err error - _, err = telemetryClient.SubscribeEvents(ctx, []telemetryapi.EventType{telemetryapi.Platform}, r.extensionID, fmt.Sprintf("http://%s/platform", address)) - if err != nil { - r.logger.Info("Listening for requests", zap.String("address", address), zap.String("extensionID", r.extensionID)) - return err - } - _, err = telemetryClient.SubscribeEvents(ctx, []telemetryapi.EventType{telemetryapi.Function}, r.extensionID, fmt.Sprintf("http://%s/function", address)) + _, err := telemetryClient.SubscribeEvents(ctx, []telemetryapi.EventType{telemetryapi.Platform, telemetryapi.Function}, r.extensionID, fmt.Sprintf("http://%s/", address)) if err != nil { r.logger.Info("Listening for requests", zap.String("address", address), zap.String("extensionID", r.extensionID)) return err @@ -64,7 +59,7 @@ func (r *telemetryAPILogsReceiver) Shutdown(ctx context.Context) error { return nil } -func (r *telemetryAPILogsReceiver) httpPlatformHandler(w http.ResponseWriter, req *http.Request) { +func (r *telemetryAPILogsReceiver) httpHandler(w http.ResponseWriter, req *http.Request) { body, err := io.ReadAll(req.Body) if err != nil { r.logger.Error("error reading body", zap.Error(err)) @@ -82,134 +77,100 @@ func (r *telemetryAPILogsReceiver) httpPlatformHandler(w http.ResponseWriter, re r.resource.CopyTo(resourceLog.Resource()) scopeLog := resourceLog.ScopeLogs().AppendEmpty() scopeLog.Scope().SetName("github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapi") - for _, el := range slice { r.logger.Debug(fmt.Sprintf("Event: %s", el.Type), zap.Any("event", el)) logRecord := scopeLog.LogRecords().AppendEmpty() logRecord.Attributes().PutStr("type", el.Type) - - layout := "2006-01-02T15:04:05.000Z" - if t, err := time.Parse(layout, el.Time); err == nil { + if t, err := time.Parse(timeFormatLayout, el.Time); err == nil { logRecord.SetTimestamp(pcommon.NewTimestampFromTime(t)) logRecord.SetObservedTimestamp(pcommon.NewTimestampFromTime(time.Now())) - } - logRecord.SetSeverityText("INFO") - logRecord.SetSeverityNumber(9) - if j, err := json.Marshal(el.Record); err == nil { - logRecord.Body().SetStr(string(j)) } else { - r.logger.Error("error stringify record", zap.Error(err)) + r.logger.Error("error parsing time", zap.Error(err)) } - - if err = r.nextConsumer.ConsumeLogs(context.Background(), log); err != nil { - r.logger.Error("error receiving logs", zap.Error(err)) - } - } - r.logger.Debug("logEvents received", zap.Int("count", len(slice)), zap.Int64("queue_length", r.queue.Len())) - slice = nil -} - -func (r *telemetryAPILogsReceiver) httpFunctionHandler(w http.ResponseWriter, req *http.Request) { - body, err := io.ReadAll(req.Body) - if err != nil { - r.logger.Error("error reading body", zap.Error(err)) - return - } - - var slice []event - if err := json.Unmarshal(body, &slice); err != nil { - r.logger.Error("error unmarshalling body", zap.Error(err)) - return - } - - log := plog.NewLogs() - resourceLog := log.ResourceLogs().AppendEmpty() - r.resource.CopyTo(resourceLog.Resource()) - scopeLog := resourceLog.ScopeLogs().AppendEmpty() - scopeLog.Scope().SetName("github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapi") - - for _, el := range slice { - r.logger.Debug(fmt.Sprintf("Event: %s", el.Type), zap.Any("event", el)) - logRecord := scopeLog.LogRecords().AppendEmpty() - logRecord.Attributes().PutStr("type", el.Type) - - layout := "2006-01-02T15:04:05.000Z" - if t, err := time.Parse(layout, el.Time); err == nil { - logRecord.SetTimestamp(pcommon.NewTimestampFromTime(t)) - logRecord.SetObservedTimestamp(pcommon.NewTimestampFromTime(time.Now())) - } - if record, ok := el.Record.(map[string]interface{}); ok { - // in JSON format https://docs.aws.amazon.com/lambda/latest/dg/telemetry-schema-reference.html#telemetry-api-function - if timestamp, ok := record["timestamp"].(string); ok { - if observedTime, err := time.Parse(layout, timestamp); err == nil { - logRecord.SetTimestamp(pcommon.NewTimestampFromTime(observedTime)) + if el.Type == string(telemetryapi.Function) || el.Type == string(telemetryapi.Extension) { + if record, ok := el.Record.(map[string]interface{}); ok { + // in JSON format https://docs.aws.amazon.com/lambda/latest/dg/telemetry-schema-reference.html#telemetry-api-function + if timestamp, ok := record["timestamp"].(string); ok { + if observedTime, err := time.Parse(timeFormatLayout, timestamp); err == nil { + logRecord.SetTimestamp(pcommon.NewTimestampFromTime(observedTime)) + } else { + r.logger.Error("error parsing time", zap.Error(err)) + } } - } - if level, ok := record["level"].(string); ok { - level = strings.ToUpper(level) - logRecord.SetSeverityText(level) - switch level { - case "TRACE": - logRecord.SetSeverityNumber(1) - case "TRACE2": - logRecord.SetSeverityNumber(2) - case "TRACE3": - logRecord.SetSeverityNumber(3) - case "TRACE4": - logRecord.SetSeverityNumber(4) - case "DEBUG": - logRecord.SetSeverityNumber(5) - case "DEBUG2": - logRecord.SetSeverityNumber(6) - case "DEBUG3": - logRecord.SetSeverityNumber(7) - case "DEBUG4": - logRecord.SetSeverityNumber(8) - case "INFO": - logRecord.SetSeverityNumber(9) - case "INFO2": - logRecord.SetSeverityNumber(10) - case "INFO3": - logRecord.SetSeverityNumber(11) - case "INFO4": - logRecord.SetSeverityNumber(12) - case "WARN": - logRecord.SetSeverityNumber(13) - case "WARN2": - logRecord.SetSeverityNumber(14) - case "WARN3": - logRecord.SetSeverityNumber(15) - case "WARN4": - logRecord.SetSeverityNumber(16) - case "ERROR": - logRecord.SetSeverityNumber(17) - case "ERROR2": - logRecord.SetSeverityNumber(18) - case "ERROR3": - logRecord.SetSeverityNumber(19) - case "ERROR4": - logRecord.SetSeverityNumber(20) - case "FATAL": - logRecord.SetSeverityNumber(21) - case "FATAL2": - logRecord.SetSeverityNumber(22) - case "FATAL3": - logRecord.SetSeverityNumber(23) - case "FATAL4": - logRecord.SetSeverityNumber(24) - default: + if level, ok := record["level"].(string); ok { + level = strings.ToUpper(level) + logRecord.SetSeverityText(level) + switch level { + case "TRACE": + logRecord.SetSeverityNumber(1) + case "TRACE2": + logRecord.SetSeverityNumber(2) + case "TRACE3": + logRecord.SetSeverityNumber(3) + case "TRACE4": + logRecord.SetSeverityNumber(4) + case "DEBUG": + logRecord.SetSeverityNumber(5) + case "DEBUG2": + logRecord.SetSeverityNumber(6) + case "DEBUG3": + logRecord.SetSeverityNumber(7) + case "DEBUG4": + logRecord.SetSeverityNumber(8) + case "INFO": + logRecord.SetSeverityNumber(9) + case "INFO2": + logRecord.SetSeverityNumber(10) + case "INFO3": + logRecord.SetSeverityNumber(11) + case "INFO4": + logRecord.SetSeverityNumber(12) + case "WARN": + logRecord.SetSeverityNumber(13) + case "WARN2": + logRecord.SetSeverityNumber(14) + case "WARN3": + logRecord.SetSeverityNumber(15) + case "WARN4": + logRecord.SetSeverityNumber(16) + case "ERROR": + logRecord.SetSeverityNumber(17) + case "ERROR2": + logRecord.SetSeverityNumber(18) + case "ERROR3": + logRecord.SetSeverityNumber(19) + case "ERROR4": + logRecord.SetSeverityNumber(20) + case "FATAL": + logRecord.SetSeverityNumber(21) + case "FATAL2": + logRecord.SetSeverityNumber(22) + case "FATAL3": + logRecord.SetSeverityNumber(23) + case "FATAL4": + logRecord.SetSeverityNumber(24) + default: + } + } + if requestId, ok := record["requestId"].(string); ok { + logRecord.Attributes().PutStr(semconv.AttributeFaaSInvocationID, requestId) + } + if line, ok := record["message"].(string); ok { + logRecord.Body().SetStr(line) + } + } else { + // in plain text https://docs.aws.amazon.com/lambda/latest/dg/telemetry-schema-reference.html#telemetry-api-function + if line, ok := el.Record.(string); ok { + logRecord.Body().SetStr(line) } - } - if requestId, ok := record["requestId"].(string); ok { - logRecord.Attributes().PutStr(semconv.AttributeFaaSInvocationID, requestId) - } - if line, ok := record["message"].(string); ok { - logRecord.Body().SetStr(line) } } else { - // in plain text https://docs.aws.amazon.com/lambda/latest/dg/telemetry-schema-reference.html#telemetry-api-function - if line, ok := el.Record.(string); ok { - logRecord.Body().SetStr(line) + logRecord.SetSeverityText("INFO") + logRecord.SetSeverityNumber(9) + if j, err := json.Marshal(el.Record); err == nil { + logRecord.Body().SetStr(string(j)) + } else { + r.logger.Error("error stringify record", zap.Error(err)) } } } From 785a728975d5a2251f84710ab53bccca8fe022c0 Mon Sep 17 00:00:00 2001 From: Jerry Leung Date: Wed, 15 May 2024 20:22:32 -0700 Subject: [PATCH 11/14] polish --- .../receiver/telemetryapireceiver/logs.go | 18 +- .../telemetryapireceiver/logs_test.go | 457 ++++++++++++++---- 2 files changed, 365 insertions(+), 110 deletions(-) diff --git a/collector/receiver/telemetryapireceiver/logs.go b/collector/receiver/telemetryapireceiver/logs.go index 36c841ae5e..c9b208a516 100644 --- a/collector/receiver/telemetryapireceiver/logs.go +++ b/collector/receiver/telemetryapireceiver/logs.go @@ -72,6 +72,18 @@ func (r *telemetryAPILogsReceiver) httpHandler(w http.ResponseWriter, req *http. return } + if logs, err := r.createLogs(slice); err == nil { + err := r.nextConsumer.ConsumeLogs(context.Background(), logs) + if err != nil { + r.logger.Error("error receiving logs", zap.Error(err)) + } + } + + r.logger.Debug("logEvents received", zap.Int("count", len(slice)), zap.Int64("queue_length", r.queue.Len())) + slice = nil +} + +func (r *telemetryAPILogsReceiver) createLogs(slice []event) (plog.Logs, error) { log := plog.NewLogs() resourceLog := log.ResourceLogs().AppendEmpty() r.resource.CopyTo(resourceLog.Resource()) @@ -174,11 +186,7 @@ func (r *telemetryAPILogsReceiver) httpHandler(w http.ResponseWriter, req *http. } } } - if err = r.nextConsumer.ConsumeLogs(context.Background(), log); err != nil { - r.logger.Error("error receiving logs", zap.Error(err)) - } - r.logger.Debug("logEvents received", zap.Int("count", len(slice)), zap.Int64("queue_length", r.queue.Len())) - slice = nil + return log, nil } func newTelemetryAPILogsReceiver( diff --git a/collector/receiver/telemetryapireceiver/logs_test.go b/collector/receiver/telemetryapireceiver/logs_test.go index b356753312..fb0afbbc2a 100644 --- a/collector/receiver/telemetryapireceiver/logs_test.go +++ b/collector/receiver/telemetryapireceiver/logs_test.go @@ -1,9 +1,15 @@ package telemetryapireceiver // import "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver" import ( + "context" + "net/http/httptest" + "strings" "testing" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/receiver/receivertest" ) func TestListenOnLogsAddress(t *testing.T) { @@ -32,111 +38,352 @@ func TestListenOnLogsAddress(t *testing.T) { } } -//type mockConsumer struct { -// consumed int -//} +type mockLogsConsumer struct { + consumed int +} -//func (c *mockConsumer) ConsumeLogs(ctx context.Context, td plog.Logs) error { -// return nil -//} +func (c *mockLogsConsumer) ConsumeLogs(ctx context.Context, td plog.Logs) error { + c.consumed += td.LogRecordCount() + return nil +} -// func (c *mockConsumer) Capabilities() consumer.Capabilities { -// return consumer.Capabilities{MutatesData: true} -// } -//func TestHandler(t *testing.T) { -// testCases := []struct { -// desc string -// body string -// expectedSpans int -// }{ -// { -// desc: "empty body", -// body: `{}`, -// }, -// { -// desc: "invalid json", -// body: `invalid json`, -// }, -// { -// desc: "valid event", -// body: `[{"time":"", "type":"", "record": {}}]`, -// }, -// { -// desc: "valid event", -// body: `[{"time":"", "type":"platform.initStart", "record": {}}]`, -// }, -// { -// desc: "valid start/end events", -// body: `[ -// {"time":"2006-01-02T15:04:04.000Z", "type":"platform.initStart", "record": {}}, -// {"time":"2006-01-02T15:04:05.000Z", "type":"platform.initRuntimeDone", "record": {}} -// ]`, -// expectedLogs: 1, -// }, -// } -// for _, tc := range testCases { -// t.Run(tc.desc, func(t *testing.T) { -// consumer := mockConsumer{} -// r, err := newTelemetryAPIReceiver( -// &Config{}, -// &consumer, -// receivertest.NewNopCreateSettings(), -// ) -// require.NoError(t, err) -// req := httptest.NewRequest("POST", -// "http://localhost:53612/someevent", strings.NewReader(tc.body)) -// rec := httptest.NewRecorder() -// r.httpHandler(rec, req) -// require.Equal(t, tc.expectedSpans, consumer.consumed) -// }) -// } -//} +func (c *mockLogsConsumer) Capabilities() consumer.Capabilities { + return consumer.Capabilities{MutatesData: false} +} -// -//func TestCreatePlatformInitSpan(t *testing.T) { -// testCases := []struct { -// desc string -// start string -// end string -// expected int -// expectError bool -// }{ -// { -// desc: "no start/end times", -// expectError: true, -// }, -// { -// desc: "no end time", -// start: "2006-01-02T15:04:05.000Z", -// expectError: true, -// }, -// { -// desc: "no start times", -// end: "2006-01-02T15:04:05.000Z", -// expectError: true, -// }, -// { -// desc: "valid times", -// start: "2006-01-02T15:04:04.000Z", -// end: "2006-01-02T15:04:05.000Z", -// expected: 1, -// expectError: false, -// }, -// } -// for _, tc := range testCases { -// t.Run(tc.desc, func(t *testing.T) { -// r, err := newTelemetryAPIReceiver( -// &Config{}, -// nil, -// receivertest.NewNopCreateSettings(), -// ) -// require.NoError(t, err) -// td, err := r.createPlatformInitSpan(tc.start, tc.end) -// if tc.expectError { -// require.Error(t, err) -// } else { -// require.Equal(t, tc.expected, td.SpanCount()) -// } -// }) -// } -//} +func TestLogsHandler(t *testing.T) { + testCases := []struct { + desc string + body string + expectedLogs int + }{ + { + desc: "empty body", + body: `{}`, + expectedLogs: 0, + }, + { + desc: "invalid json", + body: `invalid json`, + expectedLogs: 0, + }, + { + desc: "valid event", + body: `[{"time":"", "type":"", "record": {}}]`, + expectedLogs: 1, + }, + { + desc: "valid event", + body: `[{"time":"", "type":"platform.initStart", "record": {}}]`, + expectedLogs: 1, + }, + { + desc: "platform.initStart", + body: `[ + { + "time": "2024-05-15T18:10:29.635Z", + "type": "platform.initStart", + "record": { + "functionName": "opentelemetry-lambda-nodejs-experimental-arm64", + "functionVersion": "$LATEST", + "initializationType": "on-demand", + "phase": "init", + "runtimeVersion": "nodejs:20.v22", + "runtimeVersionArn": "arn:aws:lambda:us-east-1::runtime:da57c20c4b965d5b75540f6865a35fc8030358e33ec44ecfed33e90901a27a72" + } + } + ]`, + expectedLogs: 1, + }, + { + desc: "platform.telemetrySubscription", + body: `[ + { + "time": "2024-05-15T18:10:30.010Z", + "type": "platform.telemetrySubscription", + "record": { + "name": "collector", + "state": "Subscribed", + "types": [ + "platform" + ] + } + } + ]`, + expectedLogs: 1, + }, + { + desc: "platform.telemetrySubscription", + body: `[ + { + "time": "2024-05-15T18:10:30.511Z", + "type": "platform.telemetrySubscription", + "record": { + "name": "collector", + "state": "Subscribed", + "types": [ + "platform", + "function" + ] + } + } + ]`, + expectedLogs: 1, + }, + { + desc: "platform.initRuntimeDone", + body: `[ + { + "time": "2024-05-15T23:58:26.857Z", + "type": "platform.initRuntimeDone", + "record": { + "initializationType": "on-demand", + "phase": "init", + "status": "success" + } + } + ]`, + expectedLogs: 1, + }, + { + desc: "platform.extension", + body: `[ + { + "time": "2024-05-15T23:58:26.857Z", + "type": "platform.extension", + "record": { + "events": [ + "INVOKE", + "SHUTDOWN" + ], + "name": "collector", + "state": "Ready" + } + } + ]`, + expectedLogs: 1, + }, + { + desc: "platform.initReport", + body: `[ + { + "time": "2024-05-15T23:58:26.858Z", + "type": "platform.initReport", + "record": { + "initializationType": "on-demand", + "metrics": { + "durationMs": 1819.081 + }, + "phase": "init", + "status": "success" + } + } + ]`, + expectedLogs: 1, + }, + { + desc: "platform.runtimeDone", + body: `[ + { + "time": "2024-05-15T23:58:35.063Z", + "type": "platform.runtimeDone", + "record": { + "metrics": { + "durationMs": 8202.659, + "producedBytes": 50 + }, + "requestId": "882e9658-570e-4b2f-aaa8-5dfb88f7eccb", + "spans": [ + { + "durationMs": 8200.68, + "name": "responseLatency", + "start": "2024-05-15T23:58:26.860Z" + }, + { + "durationMs": 0.226, + "name": "responseDuration", + "start": "2024-05-15T23:58:35.061Z" + }, + { + "durationMs": 1.502, + "name": "runtimeOverhead", + "start": "2024-05-15T23:58:35.061Z" + } + ], + "status": "success" + } + } + ]`, + expectedLogs: 1, + }, + { + desc: "platform.report", + body: `[ + { + "time": "2024-05-15T23:58:39.317Z", + "type": "platform.report", + "record": { + "metrics": { + "billedDurationMs": 12456, + "durationMs": 12455.155, + "initDurationMs": 1819.881, + "maxMemoryUsedMB": 128, + "memorySizeMB": 128 + }, + "requestId": "882e9658-570e-4b2f-aaa8-5dfb88f7eccb", + "status": "success" + } + } + ]`, + expectedLogs: 1, + }, + { + desc: "platform.start", + body: `[ + { + "time": "2024-05-15T23:58:41.153Z", + "type": "platform.start", + "record": { + "requestId": "15b0ebbb-5cf8-49e2-8cbe-1d58a18330d2", + "version": "$LATEST" + } + } + ]`, + expectedLogs: 1, + }, + { + desc: "platform.restoreStart", + body: `[ + { + "time": "2022-10-12T00:00:15.064Z", + "type": "platform.restoreStart", + "record": { + "runtimeVersion": "nodejs-14.v3", + "runtimeVersionArn": "arn", + "functionName": "myFunction", + "functionVersion": "$LATEST", + "instanceId": "82561ce0-53dd-47d1-90e0-c8f5e063e62e", + "instanceMaxMemory": 256 + } + } + ]`, + expectedLogs: 1, + }, + { + desc: "platform.restoreRuntimeDone", + body: `[ + { + "time": "2022-10-12T00:00:15.064Z", + "type": "platform.restoreRuntimeDone", + "record": { + "status": "success", + "spans": [ + { + "name": "someTimeSpan", + "start": "2022-08-02T12:01:23:521Z", + "durationMs": 80.0 + } + ] + } + } + ]`, + expectedLogs: 1, + }, + { + desc: "platform.restoreReport", + body: `[ + { + "time": "2022-10-12T00:00:15.064Z", + "type": "platform.restoreReport", + "record": { + "status": "success", + "metrics": { + "durationMs": 15.19 + }, + "spans": [ + { + "name": "someTimeSpan", + "start": "2022-08-02T12:01:23:521Z", + "durationMs": 30.0 + } + ] + } + } + ]`, + expectedLogs: 1, + }, + { + desc: "platform.logsDropped", + body: `[ + { + "time": "2022-10-12T00:02:35.000Z", + "type": "platform.logsDropped", + "record": { + "droppedBytes": 12345, + "droppedRecords": 123, + "reason": "Some logs were dropped because the downstream consumer is slower than the logs production rate" + } + } + ]`, + expectedLogs: 1, + }, + { + desc: "function", + body: `[ + { + "time": "2024-05-15T23:59:20.159Z", + "type": "function", + "record": "2024-05-15T23:59:20.159Z\t8c181f94-d34c-4c65-abed-6977e17dd06b\tWARN\twarn from console\n" + }, + { + "time": "2022-10-12T00:03:50.000Z", + "type": "function", + "record": { + "timestamp": "2022-10-12T00:03:50.000Z", + "level": "INFO", + "requestId": "79b4f56e-95b1-4643-9700-2807f4e68189", + "message": "Hello world, I am a function!" + } + } + ]`, + expectedLogs: 2, + }, + { + desc: "extension", + body: `[ + { + "time": "2022-10-12T00:03:50.000Z", + "type": "extension", + "record": "[INFO] Hello world, I am an extension!" + }, + { + "time": "2022-10-12T00:03:50.000Z", + "type": "extension", + "record": { + "timestamp": "2022-10-12T00:03:50.000Z", + "level": "INFO", + "requestId": "79b4f56e-95b1-4643-9700-2807f4e68189", + "message": "Hello world, I am an extension!" + } + } + ]`, + expectedLogs: 2, + }, + } + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + consumer := mockLogsConsumer{} + r, err := newTelemetryAPILogsReceiver( + &Config{}, + &consumer, + receivertest.NewNopCreateSettings(), + ) + require.NoError(t, err) + req := httptest.NewRequest("POST", + "http://localhost:53612/someevent", strings.NewReader(tc.body)) + rec := httptest.NewRecorder() + r.httpHandler(rec, req) + require.Equal(t, tc.expectedLogs, consumer.consumed) + }) + } +} From 0fe0bd0b10d4132bef82d69a11bc514c47e6bb40 Mon Sep 17 00:00:00 2001 From: Jerry Leung Date: Thu, 16 May 2024 11:18:52 -0700 Subject: [PATCH 12/14] Added 1 more test case --- .../receiver/telemetryapireceiver/logs.go | 6 +- .../telemetryapireceiver/logs_test.go | 168 ++++++++++++++++++ 2 files changed, 173 insertions(+), 1 deletion(-) diff --git a/collector/receiver/telemetryapireceiver/logs.go b/collector/receiver/telemetryapireceiver/logs.go index c9b208a516..4dabad9996 100644 --- a/collector/receiver/telemetryapireceiver/logs.go +++ b/collector/receiver/telemetryapireceiver/logs.go @@ -25,6 +25,7 @@ import ( const defaultLogsListenerPort = "4327" const initialLogsQueueSize = 5 const timeFormatLayout = "2006-01-02T15:04:05.000Z" +const scopeName = "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapi" type telemetryAPILogsReceiver struct { httpServer *http.Server @@ -88,7 +89,7 @@ func (r *telemetryAPILogsReceiver) createLogs(slice []event) (plog.Logs, error) resourceLog := log.ResourceLogs().AppendEmpty() r.resource.CopyTo(resourceLog.Resource()) scopeLog := resourceLog.ScopeLogs().AppendEmpty() - scopeLog.Scope().SetName("github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapi") + scopeLog.Scope().SetName(scopeName) for _, el := range slice { r.logger.Debug(fmt.Sprintf("Event: %s", el.Type), zap.Any("event", el)) logRecord := scopeLog.LogRecords().AppendEmpty() @@ -98,6 +99,7 @@ func (r *telemetryAPILogsReceiver) createLogs(slice []event) (plog.Logs, error) logRecord.SetObservedTimestamp(pcommon.NewTimestampFromTime(time.Now())) } else { r.logger.Error("error parsing time", zap.Error(err)) + return plog.Logs{}, err } if el.Type == string(telemetryapi.Function) || el.Type == string(telemetryapi.Extension) { if record, ok := el.Record.(map[string]interface{}); ok { @@ -107,6 +109,7 @@ func (r *telemetryAPILogsReceiver) createLogs(slice []event) (plog.Logs, error) logRecord.SetTimestamp(pcommon.NewTimestampFromTime(observedTime)) } else { r.logger.Error("error parsing time", zap.Error(err)) + return plog.Logs{}, err } } if level, ok := record["level"].(string); ok { @@ -183,6 +186,7 @@ func (r *telemetryAPILogsReceiver) createLogs(slice []event) (plog.Logs, error) logRecord.Body().SetStr(string(j)) } else { r.logger.Error("error stringify record", zap.Error(err)) + return plog.Logs{}, err } } } diff --git a/collector/receiver/telemetryapireceiver/logs_test.go b/collector/receiver/telemetryapireceiver/logs_test.go index fb0afbbc2a..7a38ba9b8b 100644 --- a/collector/receiver/telemetryapireceiver/logs_test.go +++ b/collector/receiver/telemetryapireceiver/logs_test.go @@ -2,9 +2,11 @@ package telemetryapireceiver // import "github.com/open-telemetry/opentelemetry- import ( "context" + "go.opentelemetry.io/collector/pdata/pcommon" "net/http/httptest" "strings" "testing" + "time" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/consumer" @@ -387,3 +389,169 @@ func TestLogsHandler(t *testing.T) { }) } } + +func TestCreateLogs(t *testing.T) { + testCases := []struct { + desc string + slice []event + expectedLogRecords int + expectedType string + expectedTimestamp string + expectedBody string + expectedSeverityText string + expectedSeverityNumber plog.SeverityNumber + expectError bool + }{ + { + desc: "no slice", + expectedLogRecords: 0, + expectError: false, + }, + { + desc: "Invalid Timestamp", + slice: []event{ + { + Time: "invalid", + Type: "function", + Record: "[INFO] Hello world, I am an extension!", + }, + }, + expectError: true, + }, + { + desc: "function text", + slice: []event{ + { + Time: "2022-10-12T00:03:50.000Z", + Type: "function", + Record: "[INFO] Hello world, I am an extension!", + }, + }, + expectedLogRecords: 1, + expectedType: "function", + expectedTimestamp: "2022-10-12T00:03:50.000Z", + expectedBody: "[INFO] Hello world, I am an extension!", + expectedSeverityText: "", + expectedSeverityNumber: plog.SeverityNumberUnspecified, + expectError: false, + }, + { + desc: "function json", + slice: []event{ + { + Time: "2022-10-12T00:03:50.000Z", + Type: "function", + Record: map[string]any{ + "timestamp": "2022-10-12T00:03:50.000Z", + "level": "INFO", + "requestId": "79b4f56e-95b1-4643-9700-2807f4e68189", + "message": "Hello world, I am a function!", + }, + }, + }, + expectedLogRecords: 1, + expectedType: "function", + expectedTimestamp: "2022-10-12T00:03:50.000Z", + expectedBody: "Hello world, I am a function!", + expectedSeverityText: "INFO", + expectedSeverityNumber: plog.SeverityNumberInfo, + expectError: false, + }, + { + desc: "extension text", + slice: []event{ + { + Time: "2022-10-12T00:03:50.000Z", + Type: "extension", + Record: "[INFO] Hello world, I am an extension!", + }, + }, + expectedLogRecords: 1, + expectedType: "extension", + expectedTimestamp: "2022-10-12T00:03:50.000Z", + expectedBody: "[INFO] Hello world, I am an extension!", + expectedSeverityText: "", + expectedSeverityNumber: plog.SeverityNumberUnspecified, + expectError: false, + }, + { + desc: "extension json", + slice: []event{ + { + Time: "2022-10-12T00:03:50.000Z", + Type: "extension", + Record: map[string]any{ + "timestamp": "2022-10-12T00:03:50.000Z", + "level": "INFO", + "requestId": "79b4f56e-95b1-4643-9700-2807f4e68189", + "message": "Hello world, I am an extension!", + }, + }, + }, + expectedLogRecords: 1, + expectedType: "extension", + expectedTimestamp: "2022-10-12T00:03:50.000Z", + expectedBody: "Hello world, I am an extension!", + expectedSeverityText: "INFO", + expectedSeverityNumber: plog.SeverityNumberInfo, + expectError: false, + }, + { + desc: "platform.initReport", + slice: []event{ + { + Time: "2024-05-15T23:58:26.858Z", + Type: "platform.initReport", + Record: map[string]any{ + "initializationType": "on-demand", + "metrics": map[string]any{ + "durationMs": 1819.081, + }, + "phase": "init", + "status": "success", + }, + }, + }, + expectedLogRecords: 1, + expectedType: "platform.initReport", + expectedTimestamp: "2024-05-15T23:58:26.858Z", + expectedBody: "{\"initializationType\":\"on-demand\",\"metrics\":{\"durationMs\":1819.081},\"phase\":\"init\",\"status\":\"success\"}", + expectedSeverityText: "INFO", + expectedSeverityNumber: plog.SeverityNumberInfo, + expectError: false, + }, + } + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + r, err := newTelemetryAPILogsReceiver( + &Config{}, + nil, + receivertest.NewNopCreateSettings(), + ) + require.NoError(t, err) + log, err := r.createLogs(tc.slice) + if tc.expectError { + require.Error(t, err) + } else { + require.Equal(t, 1, log.ResourceLogs().Len()) + resourceLog := log.ResourceLogs().At(0) + require.Equal(t, 1, resourceLog.ScopeLogs().Len()) + scopeLog := resourceLog.ScopeLogs().At(0) + require.Equal(t, scopeName, scopeLog.Scope().Name()) + require.Equal(t, tc.expectedLogRecords, scopeLog.LogRecords().Len()) + if scopeLog.LogRecords().Len() > 0 { + logRecord := scopeLog.LogRecords().At(0) + attr, ok := logRecord.Attributes().Get("type") + require.True(t, ok) + require.Equal(t, tc.expectedType, attr.Str()) + expectedTime, err := time.Parse(timeFormatLayout, tc.expectedTimestamp) + require.NoError(t, err) + require.Equal(t, pcommon.NewTimestampFromTime(expectedTime), logRecord.Timestamp()) + require.Equal(t, tc.expectedSeverityText, logRecord.SeverityText()) + require.Equal(t, tc.expectedSeverityNumber, logRecord.SeverityNumber()) + require.Equal(t, tc.expectedBody, logRecord.Body().Str()) + } + } + }) + } +} From eb58dcda2e3a2a4a58834b09f85322c9f7235539 Mon Sep 17 00:00:00 2001 From: Jerry Leung Date: Thu, 16 May 2024 11:19:49 -0700 Subject: [PATCH 13/14] nits --- collector/receiver/telemetryapireceiver/logs_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/collector/receiver/telemetryapireceiver/logs_test.go b/collector/receiver/telemetryapireceiver/logs_test.go index 7a38ba9b8b..dc56e48cbf 100644 --- a/collector/receiver/telemetryapireceiver/logs_test.go +++ b/collector/receiver/telemetryapireceiver/logs_test.go @@ -2,7 +2,6 @@ package telemetryapireceiver // import "github.com/open-telemetry/opentelemetry- import ( "context" - "go.opentelemetry.io/collector/pdata/pcommon" "net/http/httptest" "strings" "testing" @@ -10,6 +9,7 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/receiver/receivertest" ) From 5e4a40b69a6472aafc7e9b08b2c935a0acfc2b0b Mon Sep 17 00:00:00 2001 From: Jerry Leung Date: Thu, 16 May 2024 11:35:29 -0700 Subject: [PATCH 14/14] Updated --- .../telemetryapireceiver/logs_test.go | 105 ++++++++++-------- 1 file changed, 60 insertions(+), 45 deletions(-) diff --git a/collector/receiver/telemetryapireceiver/logs_test.go b/collector/receiver/telemetryapireceiver/logs_test.go index dc56e48cbf..1449b13087 100644 --- a/collector/receiver/telemetryapireceiver/logs_test.go +++ b/collector/receiver/telemetryapireceiver/logs_test.go @@ -12,6 +12,7 @@ import ( "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/receiver/receivertest" + semconv "go.opentelemetry.io/collector/semconv/v1.25.0" ) func TestListenOnLogsAddress(t *testing.T) { @@ -392,15 +393,17 @@ func TestLogsHandler(t *testing.T) { func TestCreateLogs(t *testing.T) { testCases := []struct { - desc string - slice []event - expectedLogRecords int - expectedType string - expectedTimestamp string - expectedBody string - expectedSeverityText string - expectedSeverityNumber plog.SeverityNumber - expectError bool + desc string + slice []event + expectedLogRecords int + expectedType string + expectedTimestamp string + expectedBody string + expectedSeverityText string + expectedContainsRequestId bool + expectedRequestId string + expectedSeverityNumber plog.SeverityNumber + expectError bool }{ { desc: "no slice", @@ -427,13 +430,14 @@ func TestCreateLogs(t *testing.T) { Record: "[INFO] Hello world, I am an extension!", }, }, - expectedLogRecords: 1, - expectedType: "function", - expectedTimestamp: "2022-10-12T00:03:50.000Z", - expectedBody: "[INFO] Hello world, I am an extension!", - expectedSeverityText: "", - expectedSeverityNumber: plog.SeverityNumberUnspecified, - expectError: false, + expectedLogRecords: 1, + expectedType: "function", + expectedTimestamp: "2022-10-12T00:03:50.000Z", + expectedBody: "[INFO] Hello world, I am an extension!", + expectedContainsRequestId: false, + expectedSeverityText: "", + expectedSeverityNumber: plog.SeverityNumberUnspecified, + expectError: false, }, { desc: "function json", @@ -449,13 +453,15 @@ func TestCreateLogs(t *testing.T) { }, }, }, - expectedLogRecords: 1, - expectedType: "function", - expectedTimestamp: "2022-10-12T00:03:50.000Z", - expectedBody: "Hello world, I am a function!", - expectedSeverityText: "INFO", - expectedSeverityNumber: plog.SeverityNumberInfo, - expectError: false, + expectedLogRecords: 1, + expectedType: "function", + expectedTimestamp: "2022-10-12T00:03:50.000Z", + expectedBody: "Hello world, I am a function!", + expectedContainsRequestId: true, + expectedRequestId: "79b4f56e-95b1-4643-9700-2807f4e68189", + expectedSeverityText: "INFO", + expectedSeverityNumber: plog.SeverityNumberInfo, + expectError: false, }, { desc: "extension text", @@ -466,13 +472,14 @@ func TestCreateLogs(t *testing.T) { Record: "[INFO] Hello world, I am an extension!", }, }, - expectedLogRecords: 1, - expectedType: "extension", - expectedTimestamp: "2022-10-12T00:03:50.000Z", - expectedBody: "[INFO] Hello world, I am an extension!", - expectedSeverityText: "", - expectedSeverityNumber: plog.SeverityNumberUnspecified, - expectError: false, + expectedLogRecords: 1, + expectedType: "extension", + expectedTimestamp: "2022-10-12T00:03:50.000Z", + expectedBody: "[INFO] Hello world, I am an extension!", + expectedContainsRequestId: false, + expectedSeverityText: "", + expectedSeverityNumber: plog.SeverityNumberUnspecified, + expectError: false, }, { desc: "extension json", @@ -483,18 +490,20 @@ func TestCreateLogs(t *testing.T) { Record: map[string]any{ "timestamp": "2022-10-12T00:03:50.000Z", "level": "INFO", - "requestId": "79b4f56e-95b1-4643-9700-2807f4e68189", + "requestId": "79b4f56e-95b1-4643-9700-2807f4e68689", "message": "Hello world, I am an extension!", }, }, }, - expectedLogRecords: 1, - expectedType: "extension", - expectedTimestamp: "2022-10-12T00:03:50.000Z", - expectedBody: "Hello world, I am an extension!", - expectedSeverityText: "INFO", - expectedSeverityNumber: plog.SeverityNumberInfo, - expectError: false, + expectedLogRecords: 1, + expectedType: "extension", + expectedTimestamp: "2022-10-12T00:03:50.000Z", + expectedBody: "Hello world, I am an extension!", + expectedContainsRequestId: true, + expectedRequestId: "79b4f56e-95b1-4643-9700-2807f4e68689", + expectedSeverityText: "INFO", + expectedSeverityNumber: plog.SeverityNumberInfo, + expectError: false, }, { desc: "platform.initReport", @@ -512,13 +521,14 @@ func TestCreateLogs(t *testing.T) { }, }, }, - expectedLogRecords: 1, - expectedType: "platform.initReport", - expectedTimestamp: "2024-05-15T23:58:26.858Z", - expectedBody: "{\"initializationType\":\"on-demand\",\"metrics\":{\"durationMs\":1819.081},\"phase\":\"init\",\"status\":\"success\"}", - expectedSeverityText: "INFO", - expectedSeverityNumber: plog.SeverityNumberInfo, - expectError: false, + expectedLogRecords: 1, + expectedType: "platform.initReport", + expectedTimestamp: "2024-05-15T23:58:26.858Z", + expectedBody: "{\"initializationType\":\"on-demand\",\"metrics\":{\"durationMs\":1819.081},\"phase\":\"init\",\"status\":\"success\"}", + expectedContainsRequestId: false, + expectedSeverityText: "INFO", + expectedSeverityNumber: plog.SeverityNumberInfo, + expectError: false, }, } for _, tc := range testCases { @@ -547,6 +557,11 @@ func TestCreateLogs(t *testing.T) { expectedTime, err := time.Parse(timeFormatLayout, tc.expectedTimestamp) require.NoError(t, err) require.Equal(t, pcommon.NewTimestampFromTime(expectedTime), logRecord.Timestamp()) + requestId, ok := logRecord.Attributes().Get(semconv.AttributeFaaSInvocationID) + require.Equal(t, tc.expectedContainsRequestId, ok) + if ok { + require.Equal(t, tc.expectedRequestId, requestId.Str()) + } require.Equal(t, tc.expectedSeverityText, logRecord.SeverityText()) require.Equal(t, tc.expectedSeverityNumber, logRecord.SeverityNumber()) require.Equal(t, tc.expectedBody, logRecord.Body().Str())