Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NH-76996 Otelcol: collecting Lambda function logs #18

Merged
merged 15 commits into from
May 16, 2024
8 changes: 8 additions & 0 deletions collector/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ receivers:
endpoint: "localhost:4317"
http:
endpoint: "localhost:4318"
telemetryapi:

processors:
decouple:
Expand All @@ -22,6 +23,9 @@ processors:
- key: sw.cloud.aws.resource.type
value: "Lambda"
action: upsert
- key: sw.data.module
value: "apm"
action: upsert

exporters:
logging:
Expand All @@ -42,6 +46,10 @@ service:
receivers: [otlp]
processors: [resource,resourcedetection,decouple]
exporters: [otlp,logging]
logs:
receivers: [telemetryapi]
processors: [resource,resourcedetection,decouple]
exporters: [otlp,logging]
telemetry:
metrics:
address: localhost:8888
2 changes: 1 addition & 1 deletion collector/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ require (
go.opentelemetry.io/collector/processor/memorylimiterprocessor v0.99.0 // indirect
go.opentelemetry.io/collector/receiver v0.99.0 // indirect
go.opentelemetry.io/collector/receiver/otlpreceiver v0.99.0 // indirect
go.opentelemetry.io/collector/semconv v0.99.0 // indirect
go.opentelemetry.io/collector/semconv v0.100.0 // indirect
go.opentelemetry.io/collector/service v0.99.0 // indirect
go.opentelemetry.io/contrib/config v0.5.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.50.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions collector/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -879,6 +879,8 @@ go.opentelemetry.io/collector/receiver/otlpreceiver v0.99.0 h1:bAICAUW2K67kL9NvN
go.opentelemetry.io/collector/receiver/otlpreceiver v0.99.0/go.mod h1:0IUYqbqx2ZxxW3iYVHjW0ulA4bPkUJSLVCFJq4MR8FI=
go.opentelemetry.io/collector/semconv v0.99.0 h1:6xCezUbjdeMdrP2HtoEJQue99dgrZhqHCgjYRcuEGBg=
go.opentelemetry.io/collector/semconv v0.99.0/go.mod h1:8ElcRZ8Cdw5JnvhTOQOdYizkJaQ10Z2fS+R6djOnj6A=
go.opentelemetry.io/collector/semconv v0.100.0 h1:QArUvWcbmsMjM4PV0zngUHRizZeUXibsPBWjDuNJXAs=
go.opentelemetry.io/collector/semconv v0.100.0/go.mod h1:8ElcRZ8Cdw5JnvhTOQOdYizkJaQ10Z2fS+R6djOnj6A=
go.opentelemetry.io/collector/service v0.99.0 h1:Ods9uVHAZb1PW1nTC5XOzC+lC1qrA+EVyt1NNzT8Uqk=
go.opentelemetry.io/collector/service v0.99.0/go.mod h1:R5dcvSHaqdx3xojrq0rJdLyKeGZMke37ZWgzCbLhapQ=
go.opentelemetry.io/contrib/config v0.5.0 h1:7jLbj1losnHOq1rarCVMEDrmkHWixEIJ11pDtT4KrGM=
Expand Down
24 changes: 15 additions & 9 deletions collector/internal/telemetryapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@ import (
)

const (
ApiVersion20220701 = "2022-07-01"
ApiVersionLatest = ApiVersion20220701
SchemaVersion20220701 = "2022-07-01"
SchemaVersionLatest = SchemaVersion20220701
SchemaVersion20221213 = "2022-12-13"
SchemaVersionLatest = SchemaVersion20221213
lambdaAgentIdentifierHeaderKey = "Lambda-Extension-Identifier"
)

Expand All @@ -42,17 +45,11 @@ func NewClient(logger *zap.Logger) *Client {
return &Client{
logger: logger.Named("telemetryAPI.Client"),
httpClient: &http.Client{},
baseURL: fmt.Sprintf("http://%s/%s/telemetry", os.Getenv("AWS_LAMBDA_RUNTIME_API"), SchemaVersionLatest),
baseURL: fmt.Sprintf("http://%s/%s/telemetry", os.Getenv("AWS_LAMBDA_RUNTIME_API"), ApiVersionLatest),
}
}

func (c *Client) Subscribe(ctx context.Context, extensionID string, listenerURI string) (string, error) {
eventTypes := []EventType{
Platform,
// Function,
// Extension,
}

func (c *Client) SubscribeEvents(ctx context.Context, eventTypes []EventType, extensionID string, listenerURI string) (string, error) {
bufferingConfig := BufferingCfg{
MaxItems: 1000,
MaxBytes: 256 * 1024,
Expand Down Expand Up @@ -107,6 +104,15 @@ func (c *Client) Subscribe(ctx context.Context, extensionID string, listenerURI
return string(body), nil
}

func (c *Client) Subscribe(ctx context.Context, extensionID string, listenerURI string) (string, error) {
eventTypes := []EventType{
Platform,
// Function,
// Extension,
Comment on lines +110 to +111

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Do we still need these for later?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I keep it there so it doesn't have much difference when compared to upstream. (Easier to sync)

}
return c.SubscribeEvents(ctx, eventTypes, extensionID, listenerURI)
}

func httpPutWithHeaders(ctx context.Context, client *http.Client, url string, data []byte, headers map[string]string) (*http.Response, error) {
req, err := http.NewRequestWithContext(ctx, "PUT", url, bytes.NewBuffer(data))
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion collector/lambdacomponents/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ require (
go.opentelemetry.io/collector/extension/auth v0.99.0 // indirect
go.opentelemetry.io/collector/featuregate v1.6.0 // indirect
go.opentelemetry.io/collector/pdata v1.6.0 // indirect
go.opentelemetry.io/collector/semconv v0.99.0 // indirect
go.opentelemetry.io/collector/semconv v0.100.0 // indirect
go.opentelemetry.io/collector/service v0.99.0 // indirect
go.opentelemetry.io/contrib/config v0.5.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.50.0 // indirect
Expand Down
1 change: 1 addition & 0 deletions collector/lambdacomponents/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -867,6 +867,7 @@ go.opentelemetry.io/collector/receiver/otlpreceiver v0.99.0 h1:bAICAUW2K67kL9NvN
go.opentelemetry.io/collector/receiver/otlpreceiver v0.99.0/go.mod h1:0IUYqbqx2ZxxW3iYVHjW0ulA4bPkUJSLVCFJq4MR8FI=
go.opentelemetry.io/collector/semconv v0.99.0 h1:6xCezUbjdeMdrP2HtoEJQue99dgrZhqHCgjYRcuEGBg=
go.opentelemetry.io/collector/semconv v0.99.0/go.mod h1:8ElcRZ8Cdw5JnvhTOQOdYizkJaQ10Z2fS+R6djOnj6A=
go.opentelemetry.io/collector/semconv v0.100.0/go.mod h1:8ElcRZ8Cdw5JnvhTOQOdYizkJaQ10Z2fS+R6djOnj6A=
go.opentelemetry.io/collector/service v0.99.0 h1:Ods9uVHAZb1PW1nTC5XOzC+lC1qrA+EVyt1NNzT8Uqk=
go.opentelemetry.io/collector/service v0.99.0/go.mod h1:R5dcvSHaqdx3xojrq0rJdLyKeGZMke37ZWgzCbLhapQ=
go.opentelemetry.io/contrib/config v0.5.0 h1:7jLbj1losnHOq1rarCVMEDrmkHWixEIJ11pDtT4KrGM=
Expand Down
12 changes: 11 additions & 1 deletion collector/receiver/telemetryapireceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ func NewFactory(extensionID string) receiver.Factory {
extensionID: extensionID,
}
},
receiver.WithTraces(createTracesReceiver, stability))
receiver.WithTraces(createTracesReceiver, stability),
receiver.WithLogs(createLogsReceiver, stability))
}

func createTracesReceiver(ctx context.Context, params receiver.CreateSettings, rConf component.Config, next consumer.Traces) (receiver.Traces, error) {
Expand All @@ -50,3 +51,12 @@ func createTracesReceiver(ctx context.Context, params receiver.CreateSettings, r

return newTelemetryAPIReceiver(cfg, next, params)
}

func createLogsReceiver(ctx context.Context, params receiver.CreateSettings, rConf component.Config, next consumer.Logs) (receiver.Logs, error) {
cfg, ok := rConf.(*Config)
if !ok {
return nil, errConfigNotTelemetryAPI
}

return newTelemetryAPILogsReceiver(cfg, next, params)
}
2 changes: 1 addition & 1 deletion collector/receiver/telemetryapireceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
go.opentelemetry.io/collector/consumer v0.99.0
go.opentelemetry.io/collector/pdata v1.6.0
go.opentelemetry.io/collector/receiver v0.99.0
go.opentelemetry.io/collector/semconv v0.99.0
go.opentelemetry.io/collector/semconv v0.100.0
go.uber.org/zap v1.27.0
)

Expand Down
2 changes: 2 additions & 0 deletions collector/receiver/telemetryapireceiver/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ go.opentelemetry.io/collector/receiver v0.99.0 h1:NdYShaEaabxVBRQaxK/HcKqRGl1eUF
go.opentelemetry.io/collector/receiver v0.99.0/go.mod h1:aU9ftU4FhdEY9/eREf86FWHmZHz8kufXchfpHrTTrn0=
go.opentelemetry.io/collector/semconv v0.99.0 h1:6xCezUbjdeMdrP2HtoEJQue99dgrZhqHCgjYRcuEGBg=
go.opentelemetry.io/collector/semconv v0.99.0/go.mod h1:8ElcRZ8Cdw5JnvhTOQOdYizkJaQ10Z2fS+R6djOnj6A=
go.opentelemetry.io/collector/semconv v0.100.0 h1:QArUvWcbmsMjM4PV0zngUHRizZeUXibsPBWjDuNJXAs=
go.opentelemetry.io/collector/semconv v0.100.0/go.mod h1:8ElcRZ8Cdw5JnvhTOQOdYizkJaQ10Z2fS+R6djOnj6A=
go.opentelemetry.io/otel v1.25.0 h1:gldB5FfhRl7OJQbUHt/8s0a7cE8fbsPAtdpRaApKy4k=
go.opentelemetry.io/otel v1.25.0/go.mod h1:Wa2ds5NOXEMkCmUou1WA7ZBfLTHWIsp034OVD7AO+Vg=
go.opentelemetry.io/otel/exporters/prometheus v0.47.0 h1:OL6yk1Z/pEGdDnrBbxSsH+t4FY1zXfBRGd7bjwhlMLU=
Expand Down
239 changes: 239 additions & 0 deletions collector/receiver/telemetryapireceiver/logs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
package telemetryapireceiver // import "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver"

import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"strings"
"time"

"github.com/golang-collections/go-datastructures/queue"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/receiver"
semconv "go.opentelemetry.io/collector/semconv/v1.25.0"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-lambda/collector/internal/telemetryapi"
)

const defaultLogsListenerPort = "4327"
const initialLogsQueueSize = 5
const timeFormatLayout = "2006-01-02T15:04:05.000Z"
const scopeName = "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapi"

type telemetryAPILogsReceiver struct {
httpServer *http.Server
logger *zap.Logger
queue *queue.Queue // queue is a synchronous queue and is used to put the received log events to be dispatched later
nextConsumer consumer.Logs
extensionID string
resource pcommon.Resource
}

func (r *telemetryAPILogsReceiver) Start(ctx context.Context, host component.Host) error {
address := listenOnLogsAddress()
r.logger.Info("Listening for requests", zap.String("address", address))

mux := http.NewServeMux()
mux.HandleFunc("/", r.httpHandler)
r.httpServer = &http.Server{Addr: address, Handler: mux}
go func() {
_ = r.httpServer.ListenAndServe()
}()

telemetryClient := telemetryapi.NewClient(r.logger)
_, err := telemetryClient.SubscribeEvents(ctx, []telemetryapi.EventType{telemetryapi.Platform, telemetryapi.Function}, r.extensionID, fmt.Sprintf("http://%s/", address))
if err != nil {
r.logger.Info("Listening for requests", zap.String("address", address), zap.String("extensionID", r.extensionID))
return err
}
return nil
}

func (r *telemetryAPILogsReceiver) Shutdown(ctx context.Context) error {
return nil
}

func (r *telemetryAPILogsReceiver) httpHandler(w http.ResponseWriter, req *http.Request) {
body, err := io.ReadAll(req.Body)
if err != nil {
r.logger.Error("error reading body", zap.Error(err))
return
}

var slice []event
if err := json.Unmarshal(body, &slice); err != nil {
r.logger.Error("error unmarshalling body", zap.Error(err))
return
}

if logs, err := r.createLogs(slice); err == nil {
err := r.nextConsumer.ConsumeLogs(context.Background(), logs)
if err != nil {
r.logger.Error("error receiving logs", zap.Error(err))
}
}

r.logger.Debug("logEvents received", zap.Int("count", len(slice)), zap.Int64("queue_length", r.queue.Len()))
slice = nil
}

func (r *telemetryAPILogsReceiver) createLogs(slice []event) (plog.Logs, error) {
log := plog.NewLogs()
resourceLog := log.ResourceLogs().AppendEmpty()
r.resource.CopyTo(resourceLog.Resource())
scopeLog := resourceLog.ScopeLogs().AppendEmpty()
scopeLog.Scope().SetName(scopeName)
for _, el := range slice {
r.logger.Debug(fmt.Sprintf("Event: %s", el.Type), zap.Any("event", el))
logRecord := scopeLog.LogRecords().AppendEmpty()
logRecord.Attributes().PutStr("type", el.Type)
if t, err := time.Parse(timeFormatLayout, el.Time); err == nil {
logRecord.SetTimestamp(pcommon.NewTimestampFromTime(t))
logRecord.SetObservedTimestamp(pcommon.NewTimestampFromTime(time.Now()))
} else {
r.logger.Error("error parsing time", zap.Error(err))
return plog.Logs{}, err
}
if el.Type == string(telemetryapi.Function) || el.Type == string(telemetryapi.Extension) {
if record, ok := el.Record.(map[string]interface{}); ok {
// in JSON format https://docs.aws.amazon.com/lambda/latest/dg/telemetry-schema-reference.html#telemetry-api-function
if timestamp, ok := record["timestamp"].(string); ok {
if observedTime, err := time.Parse(timeFormatLayout, timestamp); err == nil {
logRecord.SetTimestamp(pcommon.NewTimestampFromTime(observedTime))
} else {
r.logger.Error("error parsing time", zap.Error(err))
return plog.Logs{}, err
}
}
if level, ok := record["level"].(string); ok {
level = strings.ToUpper(level)
logRecord.SetSeverityText(level)
switch level {
case "TRACE":
logRecord.SetSeverityNumber(1)
case "TRACE2":
logRecord.SetSeverityNumber(2)
case "TRACE3":
logRecord.SetSeverityNumber(3)
case "TRACE4":
logRecord.SetSeverityNumber(4)
case "DEBUG":
logRecord.SetSeverityNumber(5)
case "DEBUG2":
logRecord.SetSeverityNumber(6)
case "DEBUG3":
logRecord.SetSeverityNumber(7)
case "DEBUG4":
logRecord.SetSeverityNumber(8)
case "INFO":
logRecord.SetSeverityNumber(9)
case "INFO2":
logRecord.SetSeverityNumber(10)
case "INFO3":
logRecord.SetSeverityNumber(11)
case "INFO4":
logRecord.SetSeverityNumber(12)
case "WARN":
logRecord.SetSeverityNumber(13)
case "WARN2":
logRecord.SetSeverityNumber(14)
case "WARN3":
logRecord.SetSeverityNumber(15)
case "WARN4":
logRecord.SetSeverityNumber(16)
case "ERROR":
logRecord.SetSeverityNumber(17)
case "ERROR2":
logRecord.SetSeverityNumber(18)
case "ERROR3":
logRecord.SetSeverityNumber(19)
case "ERROR4":
logRecord.SetSeverityNumber(20)
case "FATAL":
logRecord.SetSeverityNumber(21)
case "FATAL2":
logRecord.SetSeverityNumber(22)
case "FATAL3":
logRecord.SetSeverityNumber(23)
case "FATAL4":
logRecord.SetSeverityNumber(24)
default:
}
}
if requestId, ok := record["requestId"].(string); ok {
logRecord.Attributes().PutStr(semconv.AttributeFaaSInvocationID, requestId)
}
if line, ok := record["message"].(string); ok {
logRecord.Body().SetStr(line)
}
} else {
// in plain text https://docs.aws.amazon.com/lambda/latest/dg/telemetry-schema-reference.html#telemetry-api-function
if line, ok := el.Record.(string); ok {
logRecord.Body().SetStr(line)
}
}
} else {
logRecord.SetSeverityText("INFO")
logRecord.SetSeverityNumber(9)
if j, err := json.Marshal(el.Record); err == nil {
logRecord.Body().SetStr(string(j))
} else {
r.logger.Error("error stringify record", zap.Error(err))
return plog.Logs{}, err
}
}
}
return log, nil
}

func newTelemetryAPILogsReceiver(
cfg *Config,
next consumer.Logs,
set receiver.CreateSettings,
) (*telemetryAPILogsReceiver, error) {
envResourceMap := map[string]string{
"AWS_LAMBDA_FUNCTION_MEMORY_SIZE": semconv.AttributeFaaSMaxMemory,
"AWS_LAMBDA_FUNCTION_VERSION": semconv.AttributeFaaSVersion,
"AWS_REGION": semconv.AttributeFaaSInvokedRegion,
}
r := pcommon.NewResource()
r.Attributes().PutStr(semconv.AttributeFaaSInvokedProvider, semconv.AttributeFaaSInvokedProviderAWS)
if val, ok := os.LookupEnv("AWS_LAMBDA_FUNCTION_NAME"); ok {
r.Attributes().PutStr(semconv.AttributeServiceName, val)
r.Attributes().PutStr(semconv.AttributeFaaSName, val)
} else {
r.Attributes().PutStr(semconv.AttributeServiceName, "unknown_service")
}

for env, resourceAttribute := range envResourceMap {
if val, ok := os.LookupEnv(env); ok {
r.Attributes().PutStr(resourceAttribute, val)
}
}
return &telemetryAPILogsReceiver{
logger: set.Logger,
queue: queue.New(initialLogsQueueSize),
nextConsumer: next,
extensionID: cfg.extensionID,
resource: r,
}, nil
}

func listenOnLogsAddress() string {
envAwsLocal, ok := os.LookupEnv("AWS_SAM_LOCAL")
var addr string
if ok && envAwsLocal == "true" {
addr = ":" + defaultLogsListenerPort
} else {
addr = "sandbox.localdomain:" + defaultLogsListenerPort
}

return addr
}
Loading