From 5032a8389b6391b298fbfb33960671bb047fc35b Mon Sep 17 00:00:00 2001 From: Jerry Leung Date: Mon, 22 Jul 2024 12:32:47 -0700 Subject: [PATCH] Updated --- .../extension.go | 4 +- collector/lambdacomponents/go.mod | 1 + .../receiver/telemetryapireceiver/go.mod | 3 + .../internal/telemetryapi/client.go | 120 -------------- .../internal/telemetryapi/listener.go | 151 ------------------ .../internal/telemetryapi/types.go | 93 ----------- .../receiver/telemetryapireceiver/receiver.go | 6 +- .../telemetryapireceiver/receiver_test.go | 2 +- 8 files changed, 11 insertions(+), 369 deletions(-) delete mode 100644 collector/receiver/telemetryapireceiver/internal/telemetryapi/client.go delete mode 100644 collector/receiver/telemetryapireceiver/internal/telemetryapi/listener.go delete mode 100644 collector/receiver/telemetryapireceiver/internal/telemetryapi/types.go diff --git a/collector/extension/solarwindsapmsettingsextension/extension.go b/collector/extension/solarwindsapmsettingsextension/extension.go index 435d2d4931..5c2dfd1f39 100644 --- a/collector/extension/solarwindsapmsettingsextension/extension.go +++ b/collector/extension/solarwindsapmsettingsextension/extension.go @@ -21,7 +21,7 @@ import ( const ( JSONOutputFile = "/tmp/solarwinds-apm-settings.json" - GrpcContextDeadline = time.Duration(10) * time.Second + GrpcContextDeadline = time.Duration(5) * time.Second ) type solarwindsapmSettingsExtension struct { @@ -176,12 +176,14 @@ func (extension *solarwindsapmSettingsExtension) Start(ctx context.Context, _ co ctx, extension.cancel = context.WithCancel(ctx) systemCertPool, err := x509.SystemCertPool() if err != nil { + extension.logger.Error("Getting system cert pool failed: ", zap.Error(err)) return err } subjects := systemCertPool.Subjects() extension.logger.Info("Loading system certificates", zap.Int("numberOfCertificates", len(subjects))) extension.conn, err = grpc.NewClient(extension.config.Endpoint, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{RootCAs: systemCertPool}))) if err != nil { + extension.logger.Error("grpc.NewClient creation failed: ", zap.Error(err)) return err } extension.logger.Info("Created a grpc.NewClient", zap.String("endpoint", extension.config.Endpoint)) diff --git a/collector/lambdacomponents/go.mod b/collector/lambdacomponents/go.mod index fd158085a4..1f8de62c75 100644 --- a/collector/lambdacomponents/go.mod +++ b/collector/lambdacomponents/go.mod @@ -128,6 +128,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling v0.102.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus v0.102.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheusremotewrite v0.102.0 // indirect + github.com/open-telemetry/opentelemetry-lambda/collector v0.0.0-00010101000000-000000000000 // indirect github.com/open-telemetry/opentelemetry-lambda/collector/lambdalifecycle v0.0.0-00010101000000-000000000000 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.1.0 // indirect diff --git a/collector/receiver/telemetryapireceiver/go.mod b/collector/receiver/telemetryapireceiver/go.mod index 7f6da3b005..431c707a30 100644 --- a/collector/receiver/telemetryapireceiver/go.mod +++ b/collector/receiver/telemetryapireceiver/go.mod @@ -4,8 +4,11 @@ go 1.21.0 toolchain go1.21.4 +replace github.com/open-telemetry/opentelemetry-lambda/collector => ../../ + require ( github.com/golang-collections/go-datastructures v0.0.0-20150211160725-59788d5eb259 + github.com/open-telemetry/opentelemetry-lambda/collector v0.0.0-00010101000000-000000000000 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector/component v0.102.1 go.opentelemetry.io/collector/confmap v0.102.1 diff --git a/collector/receiver/telemetryapireceiver/internal/telemetryapi/client.go b/collector/receiver/telemetryapireceiver/internal/telemetryapi/client.go deleted file mode 100644 index 9b02fcb282..0000000000 --- a/collector/receiver/telemetryapireceiver/internal/telemetryapi/client.go +++ /dev/null @@ -1,120 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package telemetryapireceiver - -import ( - "bytes" - "context" - "encoding/json" - "fmt" - "io" - "net/http" - "os" - - "go.uber.org/zap" -) - -const ( - ApiVersion20220701 = "2022-07-01" - ApiVersionLatest = ApiVersion20220701 - SchemaVersion20220701 = "2022-07-01" - SchemaVersion20221213 = "2022-12-13" - SchemaVersionLatest = SchemaVersion20221213 - lambdaAgentIdentifierHeaderKey = "Lambda-Extension-Identifier" -) - -type Client struct { - logger *zap.Logger - httpClient *http.Client - baseURL string -} - -func NewClient(logger *zap.Logger) *Client { - return &Client{ - logger: logger.Named("telemetryAPIReceiver.Client"), - httpClient: &http.Client{}, - baseURL: fmt.Sprintf("http://%s/%s/telemetry", os.Getenv("AWS_LAMBDA_RUNTIME_API"), ApiVersionLatest), - } -} - -func (c *Client) Subscribe(ctx context.Context, eventTypes []EventType, extensionID string, listenerURI string) (string, error) { - bufferingConfig := BufferingCfg{ - MaxItems: 1000, - MaxBytes: 256 * 1024, - TimeoutMS: 25, - } - - destination := Destination{ - Protocol: HttpProto, - HttpMethod: HttpPost, - Encoding: JSON, - URI: URI(listenerURI), - } - - data, err := json.Marshal( - &SubscribeRequest{ - SchemaVersion: SchemaVersionLatest, - EventTypes: eventTypes, - BufferingCfg: bufferingConfig, - Destination: destination, - }) - - if err != nil { - return "", fmt.Errorf("Failed to marshal SubscribeRequest: %w", err) - } - - headers := make(map[string]string) - headers[lambdaAgentIdentifierHeaderKey] = extensionID - - c.logger.Info("Subscribing", zap.String("baseURL", c.baseURL), zap.Any("types", eventTypes), zap.String("extensionID", extensionID)) - resp, err := httpPutWithHeaders(ctx, c.httpClient, c.baseURL, data, headers) - if err != nil { - c.logger.Error("Subscription failed", zap.Error(err)) - return "", err - } - defer resp.Body.Close() - - if resp.StatusCode == http.StatusAccepted { - c.logger.Error("Subscription failed. Logs API is not supported! Is this extension running in a local sandbox?", zap.Int("status_code", resp.StatusCode)) - } else if resp.StatusCode != http.StatusOK { - c.logger.Error("Subscription failed") - body, err := io.ReadAll(resp.Body) - if err != nil { - return "", fmt.Errorf("request to %s failed: %d[%s]: %w", c.baseURL, resp.StatusCode, resp.Status, err) - } - - return "", fmt.Errorf("request to %s failed: %d[%s] %s", c.baseURL, resp.StatusCode, resp.Status, string(body)) - } - - body, _ := io.ReadAll(resp.Body) - c.logger.Info("Subscription success", zap.String("response", string(body))) - - return string(body), nil -} - -func httpPutWithHeaders(ctx context.Context, client *http.Client, url string, data []byte, headers map[string]string) (*http.Response, error) { - req, err := http.NewRequestWithContext(ctx, "PUT", url, bytes.NewBuffer(data)) - if err != nil { - return nil, err - } - - contentType := "application/json" - req.Header.Set("Content-Type", contentType) - for k, v := range headers { - req.Header.Set(k, v) - } - - return client.Do(req) -} diff --git a/collector/receiver/telemetryapireceiver/internal/telemetryapi/listener.go b/collector/receiver/telemetryapireceiver/internal/telemetryapi/listener.go deleted file mode 100644 index 1ad35221b9..0000000000 --- a/collector/receiver/telemetryapireceiver/internal/telemetryapi/listener.go +++ /dev/null @@ -1,151 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package telemetryapireceiver - -import ( - "context" - "encoding/json" - "fmt" - "io" - "net/http" - "os" - "time" - - "github.com/golang-collections/go-datastructures/queue" - "go.uber.org/zap" -) - -const defaultListenerPort = "53612" -const initialQueueSize = 5 - -// Listener is used to listen to the Telemetry API -type Listener struct { - httpServer *http.Server - logger *zap.Logger - // queue is a synchronous queue and is used to put the received log events to be dispatched later - queue *queue.Queue -} - -func NewListener(logger *zap.Logger) *Listener { - return &Listener{ - httpServer: nil, - logger: logger.Named("telemetryAPI.Listener"), - queue: queue.New(initialQueueSize), - } -} - -func listenOnAddress() string { - envAwsLocal, ok := os.LookupEnv("AWS_SAM_LOCAL") - var addr string - if ok && envAwsLocal == "true" { - addr = ":" + defaultListenerPort - } else { - addr = "sandbox.localdomain:" + defaultListenerPort - } - - return addr -} - -// Start the server in a goroutine where the log events will be sent -func (s *Listener) Start() (string, error) { - address := listenOnAddress() - s.logger.Info("Listening for requests", zap.String("address", address)) - s.httpServer = &http.Server{Addr: address} - http.HandleFunc("/", s.httpHandler) - go func() { - err := s.httpServer.ListenAndServe() - if err != http.ErrServerClosed { - s.logger.Error("Unexpected stop on HTTP Server", zap.Error(err)) - s.Shutdown() - } else { - s.logger.Info("HTTP Server closed:", zap.Error(err)) - } - }() - return fmt.Sprintf("http://%s/", address), nil -} - -// httpHandler handles the requests coming from the Telemetry API. -// Everytime Telemetry API sends log events, this function will read them from the response body -// and put into a synchronous queue to be dispatched later. -// Logging or printing besides the error cases below is not recommended if you have subscribed to -// receive extension logs. Otherwise, logging here will cause Telemetry API to send new logs for -// the printed lines which may create an infinite loop. -func (s *Listener) httpHandler(w http.ResponseWriter, r *http.Request) { - body, err := io.ReadAll(r.Body) - if err != nil { - s.logger.Error("error reading body", zap.Error(err)) - return - } - - // Parse and put the log messages into the queue - var slice []Event - _ = json.Unmarshal(body, &slice) - - for _, el := range slice { - if err := s.queue.Put(el); err != nil { - s.logger.Error("Failed to put event in queue", zap.Error(err)) - } - } - - s.logger.Debug("logEvents received", zap.Int("count", len(slice)), zap.Int64("queue_length", s.queue.Len())) - slice = nil -} - -// Shutdown the HTTP server listening for logs -func (s *Listener) Shutdown() { - if s.httpServer != nil { - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) - defer cancel() - err := s.httpServer.Shutdown(ctx) - if err != nil { - s.logger.Error("Failed to shutdown HTTP server gracefully", zap.Error(err)) - } else { - s.httpServer = nil - } - } -} - -func (s *Listener) Wait(ctx context.Context, reqID string) error { - for { - select { - case <-ctx.Done(): - return ctx.Err() - default: - s.logger.Debug("looking for platform.runtimeDone event") - items, err := s.queue.Get(10) - if err != nil { - return fmt.Errorf("unable to get telemetry events from queue: %w", err) - } - - for _, item := range items { - i, ok := item.(Event) - if !ok { - s.logger.Warn("non-Event found in queue", zap.Any("item", item)) - continue - } - s.logger.Debug("Event processed", zap.Any("event", i)) - if i.Type != "platform.runtimeDone" { - continue - } - - if record, ok := i.Record.(map[string]any); ok { - if record["requestId"] == reqID { - return nil - } - } - } - } - } -} diff --git a/collector/receiver/telemetryapireceiver/internal/telemetryapi/types.go b/collector/receiver/telemetryapireceiver/internal/telemetryapi/types.go deleted file mode 100644 index 1039877e4e..0000000000 --- a/collector/receiver/telemetryapireceiver/internal/telemetryapi/types.go +++ /dev/null @@ -1,93 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package telemetryapireceiver - -// EventType represents the type of log events in Lambda -type EventType string - -const ( - // Platform is used to receive log events emitted by the Lambda platform - Platform EventType = "platform" - // PlatformInitStart is used when function initialization started. - PlatformInitStart EventType = Platform + ".initStart" - // PlatformInitRuntimeDone is used when function initialization ended. - PlatformInitRuntimeDone EventType = Platform + ".initRuntimeDone" - // Function is used to receive log events emitted by the function - Function EventType = "function" - // Extension is used is to receive log events emitted by the extension - Extension EventType = "extension" -) - -// BufferingCfg holds configuration for receiving telemetry from the Telemetry API. -// Telemetry will be sent to your listener when one of the conditions below is met. -type BufferingCfg struct { - // Maximum number of log events to be buffered in memory. (default: 10000, minimum: 1000, maximum: 10000) - MaxItems uint32 `json:"maxItems"` - // Maximum size in bytes of the log events to be buffered in memory. (default: 262144, minimum: 262144, maximum: 1048576) - MaxBytes uint32 `json:"maxBytes"` - // Maximum time (in milliseconds) for a batch to be buffered. (default: 1000, minimum: 100, maximum: 30000) - TimeoutMS uint32 `json:"timeoutMs"` -} - -// URI is used to set the endpoint where the logs will be sent to -type URI string - -// HTTPMethod represents the HTTP method used to receive events from the Telemetry API -type HTTPMethod string - -const ( - // Receive log events via POST requests to the listener - HttpPost HTTPMethod = "POST" - // Receive log events via PUT requests to the listener - HttpPut HTTPMethod = "PUT" -) - -// Used to specify the protocol when subscribing to Telemetry API for HTTP -type HTTPProtocol string - -const ( - HttpProto HTTPProtocol = "HTTP" -) - -// Denotes what the content is encoded in -type HTTPEncoding string - -const ( - JSON HTTPEncoding = "JSON" -) - -// Configuration for listeners that would like to receive telemetry via HTTP -type Destination struct { - Protocol HTTPProtocol `json:"protocol"` - URI URI `json:"URI"` - HttpMethod HTTPMethod `json:"method"` - Encoding HTTPEncoding `json:"encoding"` -} - -type SchemaVersion string - -// Request body that is sent to the Telemetry API on subscribe -type SubscribeRequest struct { - SchemaVersion SchemaVersion `json:"schemaVersion"` - EventTypes []EventType `json:"types"` - BufferingCfg BufferingCfg `json:"buffering"` - Destination Destination `json:"destination"` -} - -type Event struct { - Time string `json:"time"` - Type string `json:"type"` - Record any `json:"record"` -} diff --git a/collector/receiver/telemetryapireceiver/receiver.go b/collector/receiver/telemetryapireceiver/receiver.go index 231c3cc457..79c0d1401a 100644 --- a/collector/receiver/telemetryapireceiver/receiver.go +++ b/collector/receiver/telemetryapireceiver/receiver.go @@ -30,7 +30,7 @@ import ( "time" "github.com/golang-collections/go-datastructures/queue" - telemetryapi "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver/internal/telemetryapi" + telemetryapi "github.com/open-telemetry/opentelemetry-lambda/collector/internal/telemetryapi" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pcommon" @@ -71,9 +71,9 @@ func (r *telemetryAPIReceiver) Start(ctx context.Context, _ component.Host) erro telemetryClient := telemetryapi.NewClient(r.logger) if len(r.types) > 0 { - _, err := telemetryClient.Subscribe(ctx, r.types, r.extensionID, fmt.Sprintf("http://%s/", address)) + _, err := telemetryClient.Subscribe(context.Background(), r.types, r.extensionID, fmt.Sprintf("http://%s/", address)) if err != nil { - r.logger.Info("Listening for requests", zap.String("address", address), zap.String("extensionID", r.extensionID)) + r.logger.Error("Cannot register Telemetry API client", zap.Error(err)) return err } } diff --git a/collector/receiver/telemetryapireceiver/receiver_test.go b/collector/receiver/telemetryapireceiver/receiver_test.go index ac2a0349e4..8d12e51113 100644 --- a/collector/receiver/telemetryapireceiver/receiver_test.go +++ b/collector/receiver/telemetryapireceiver/receiver_test.go @@ -21,7 +21,7 @@ import ( "testing" "time" - telemetryapi "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver/internal/telemetryapi" + telemetryapi "github.com/open-telemetry/opentelemetry-lambda/collector/internal/telemetryapi" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pcommon"