Skip to content

Commit

Permalink
experiment with metricAPI
Browse files Browse the repository at this point in the history
  • Loading branch information
chaudharysaket committed Nov 7, 2024
1 parent 0b44e36 commit 4165628
Show file tree
Hide file tree
Showing 2 changed files with 200 additions and 6 deletions.
21 changes: 15 additions & 6 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,11 +154,11 @@ func main() {
}()

// Call next, and process telemetry, until we're shut down
eventCounter := mainLoop(ctx, invocationClient, batch, telemetryChan, logServer, telemetryClient)
eventCounter := mainLoop(ctx, conf, invocationClient, batch, telemetryChan, logServer, telemetryClient)

util.Logf("New Relic Extension shutting down after %v events\n", eventCounter)

pollLogServer(logServer, batch)
pollLogServer(conf, logServer, batch)
err = logServer.Close()
if err != nil {
util.Logln("Error shutting down Log API server", err)
Expand Down Expand Up @@ -191,7 +191,7 @@ func logShipLoop(ctx context.Context, logServer *logserver.LogServer, telemetryC
}

// mainLoop repeatedly calls the /next api, and processes telemetry and platform logs. The timing is rather complicated.
func mainLoop(ctx context.Context, invocationClient *client.InvocationClient, batch *telemetry.Batch, telemetryChan chan []byte, logServer *logserver.LogServer, telemetryClient *telemetry.Client) int {
func mainLoop(ctx context.Context, conf *config.Configuration, invocationClient *client.InvocationClient, batch *telemetry.Batch, telemetryChan chan []byte, logServer *logserver.LogServer, telemetryClient *telemetry.Client) int {
eventCounter := 0
probablyTimeout := false

Expand Down Expand Up @@ -277,7 +277,7 @@ func mainLoop(ctx context.Context, invocationClient *client.InvocationClient, ba
// Before we begin to await telemetry, harvest and ship. Ripe telemetry will mostly be handled here. Even that is a
// minority of invocations. Putting this here lets us run the HTTP request to send to NR in parallel with the Lambda
// handler, reducing or eliminating our latency impact.
pollLogServer(logServer, batch)
pollLogServer(conf, logServer, batch)
shipHarvest(ctx, batch.Harvest(time.Now()), telemetryClient)

select {
Expand All @@ -299,7 +299,7 @@ func mainLoop(ctx context.Context, invocationClient *client.InvocationClient, ba

// Opportunity for an aggressive harvest, in which case, we definitely want to wait for the HTTP POST
// to complete. Mostly, nothing really happens here.
pollLogServer(logServer, batch)
pollLogServer(conf, logServer, batch)
shipHarvest(ctx, batch.Harvest(time.Now()), telemetryClient)
}

Expand All @@ -309,8 +309,17 @@ func mainLoop(ctx context.Context, invocationClient *client.InvocationClient, ba
}

// pollLogServer polls for platform logs, and annotates telemetry
func pollLogServer(logServer *logserver.LogServer, batch *telemetry.Batch) {
func pollLogServer(conf *config.Configuration , logServer *logserver.LogServer, batch *telemetry.Batch) {
for _, platformLog := range logServer.PollPlatformChannel() {
lambdaMetrics, _ := telemetry.ParseLambdaLog(string(platformLog.Content))
metrics := lambdaMetrics.ConvertToMetrics("lambda.function")
statusCode, responseBody, err := telemetry.SendMetrics(conf.LicenseKey, metrics, true)
if err != nil {
util.Logf("Error sending metric: %v", err)
}
fmt.Printf("Response Status: %d\n", statusCode)
fmt.Printf("Response Body: %s\n", responseBody)
util.Logf("Platform log: %s", platformLog.Content)
inv := batch.AddTelemetry(platformLog.RequestID, platformLog.Content)
if inv == nil {
util.Debugf("Skipping platform log for request %v", platformLog.RequestID)
Expand Down
185 changes: 185 additions & 0 deletions telemetry/send_metric.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
package telemetry

import (
"bytes"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"net/http"
"regexp"
"strconv"

"github.com/newrelic/newrelic-lambda-extension/util"
)

const (
MetricEndpointEU string = "https://staging-metric-api.eu.newrelic.com/metric/v1"
MetricEndpointUS string = "https://staging-metric-api.newrelic.com/metric/v1"
)

type Metric struct {
Name string `json:"name"`
Type string `json:"type"`
Value float64 `json:"value"`
Timestamp int64 `json:"timestamp"`
Attributes map[string]string `json:"attributes"`
}

type MetricPayload struct {
Metrics []Metric `json:"metrics"`
}

type LambdaMetrics struct {
RequestID string
Duration float64
BilledDuration float64
MemorySize int64
MaxMemoryUsed int64
InitDuration *float64
}

func ParseLambdaLog(logLine string) (*LambdaMetrics, error) {
basicPattern := `RequestId: (\S+)\s+Duration: ([\d.]+) ms\s+Billed Duration: (\d+) ms\s+Memory Size: (\d+) MB\s+Max Memory Used: (\d+) MB`
initPattern := `Init Duration: ([\d.]+) ms`

basicRe := regexp.MustCompile(basicPattern)
basicMatches := basicRe.FindStringSubmatch(logLine)
if basicMatches == nil {
return nil, fmt.Errorf("invalid log format")
}

duration, err := strconv.ParseFloat(basicMatches[2], 64)
if err != nil {
return nil, fmt.Errorf("error parsing duration: %v", err)
}

billedDuration, err := strconv.ParseInt(basicMatches[3], 10, 64)
if err != nil {
return nil, fmt.Errorf("error parsing billed duration: %v", err)
}

memorySize, err := strconv.ParseInt(basicMatches[4], 10, 64)
if err != nil {
return nil, fmt.Errorf("error parsing memory size: %v", err)
}

maxMemoryUsed, err := strconv.ParseInt(basicMatches[5], 10, 64)
if err != nil {
return nil, fmt.Errorf("error parsing max memory used: %v", err)
}

metrics := &LambdaMetrics{
RequestID: basicMatches[1],
Duration: duration,
BilledDuration: float64(billedDuration),
MemorySize: memorySize,
MaxMemoryUsed: maxMemoryUsed,
InitDuration: nil, // Default to nil for no init duration
}

// Check for init duration if present
initRe := regexp.MustCompile(initPattern)
initMatches := initRe.FindStringSubmatch(logLine)
if initMatches != nil {
initDuration, err := strconv.ParseFloat(initMatches[1], 64)
if err == nil { // Only set if parsing succeeds
metrics.InitDuration = &initDuration
}
}

return metrics, nil
}

// ConvertToMetrics converts LambdaMetrics to a slice of NewRelic metrics
func (lm *LambdaMetrics) ConvertToMetrics(prefix string) []Metric {
timestamp := util.Timestamp()
attributes := map[string]string{
"requestId": lm.RequestID,
}

metrics := []Metric{
{
Name: prefix + ".duration",
Type: "gauge",
Value: lm.Duration,
Timestamp: timestamp,
Attributes: attributes,
},
{
Name: prefix + ".billed_duration",
Type: "gauge",
Value: lm.BilledDuration,
Timestamp: timestamp,
Attributes: attributes,
},
{
Name: prefix + ".memory_size",
Type: "gauge",
Value: float64(lm.MemorySize),
Timestamp: timestamp,
Attributes: attributes,
},
{
Name: prefix + ".max_memory_used",
Type: "gauge",
Value: float64(lm.MaxMemoryUsed),
Timestamp: timestamp,
Attributes: attributes,
},
}

// Add init duration metric only if it exists
if lm.InitDuration != nil {
metrics = append(metrics, Metric{
Name: prefix + ".init_duration",
Type: "gauge",
Value: *lm.InitDuration,
Timestamp: timestamp,
Attributes: attributes,
})
}

return metrics
}


func SendMetrics(apiKey string, metrics []Metric, skipTLSVerify bool) (int, string, error) {
payload := []MetricPayload{
{
Metrics: metrics,
},
}

jsonData, err := json.Marshal(payload)
fmt.Printf("jsonData: %s\n", jsonData)

if err != nil {
return 0, "", fmt.Errorf("error marshaling JSON: %v", err)
}

req, err := http.NewRequest("POST", MetricEndpointUS, bytes.NewBuffer(jsonData))
if err != nil {
return 0, "", fmt.Errorf("error creating request: %v", err)
}

req.Header.Set("Content-Type", "application/json")
req.Header.Set("Api-Key", apiKey)

tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: skipTLSVerify},
}
client := &http.Client{Transport: tr}
resp, err := client.Do(req)
if err != nil {
return 0, "", fmt.Errorf("error sending request: %v", err)
}
defer resp.Body.Close()

body, err := io.ReadAll(resp.Body)
if err != nil {
return resp.StatusCode, "", fmt.Errorf("error reading response: %v", err)
}

return resp.StatusCode, string(body), nil
}

Check failure on line 185 in telemetry/send_metric.go

View workflow job for this annotation

GitHub Actions / coverage

syntax error: unexpected var after top level declaration

0 comments on commit 4165628

Please sign in to comment.