Skip to content

Commit

Permalink
Memory optimizations for various parts of the code (#1096)
Browse files Browse the repository at this point in the history
  • Loading branch information
grcevski authored Aug 20, 2024
1 parent d433fd4 commit 3fd9525
Show file tree
Hide file tree
Showing 13 changed files with 148 additions and 51 deletions.
2 changes: 1 addition & 1 deletion pkg/export/alloy/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (tr *tracesReceiver) provideLoop() (pipe.FinalFunc[[]request.Span], error)
for spans := range in {
for i := range spans {
span := &spans[i]
if span.IgnoreSpan == request.IgnoreTraces {
if span.IgnoreTraces() {
continue
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/export/debug/debug_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ func TestTracePrinterValidEnabled(t *testing.T) {
func traceFuncHelper(t *testing.T, tracePrinter TracePrinter) string {
fakeSpan := request.Span{
Type: request.EventTypeHTTP,
IgnoreSpan: request.IgnoreMetrics,
Method: "method",
Path: "path",
Route: "route",
Expand All @@ -63,6 +62,8 @@ func traceFuncHelper(t *testing.T, tracePrinter TracePrinter) string {
Statement: "statement",
}

fakeSpan.SetIgnoreMetrics()

// redirect the TracePrinter function stdout to a pipe so that we can
// capture and return its output
r, w, err := os.Pipe()
Expand Down
2 changes: 1 addition & 1 deletion pkg/export/otel/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -842,7 +842,7 @@ func (mr *MetricsReporter) reportMetrics(input <-chan []request.Span) {
s := &spans[i]

// If we are ignoring this span because of route patterns, don't do anything
if s.IgnoreSpan == request.IgnoreMetrics {
if s.IgnoreMetrics() {
continue
}
reporter, err := mr.reporters.For(&s.ServiceID)
Expand Down
23 changes: 21 additions & 2 deletions pkg/export/otel/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"strings"
"time"

expirable2 "github.com/hashicorp/golang-lru/v2/expirable"
"github.com/mariomac/pipes/pipe"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configgrpc"
Expand Down Expand Up @@ -41,6 +42,7 @@ import (
"github.com/grafana/beyla/pkg/internal/imetrics"
"github.com/grafana/beyla/pkg/internal/pipe/global"
"github.com/grafana/beyla/pkg/internal/request"
"github.com/grafana/beyla/pkg/internal/svc"
)

func tlog() *slog.Logger {
Expand All @@ -49,6 +51,8 @@ func tlog() *slog.Logger {

const reporterName = "github.com/grafana/beyla"

var serviceAttrCache = expirable2.NewLRU[svc.UID, []attribute.KeyValue](1024, nil, 5*time.Minute)

type TracesConfig struct {
CommonEndpoint string `yaml:"-" env:"OTEL_EXPORTER_OTLP_ENDPOINT"`
TracesEndpoint string `yaml:"endpoint" env:"OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"`
Expand Down Expand Up @@ -195,7 +199,7 @@ func (tr *tracesOTELReceiver) provideLoop() (pipe.FinalFunc[[]request.Span], err
for spans := range in {
for i := range spans {
span := &spans[i]
if span.IgnoreSpan == request.IgnoreTraces || !tr.acceptSpan(span) {
if span.IgnoreTraces() || !tr.acceptSpan(span) {
continue
}
traces := GenerateTraces(span, tr.ctxInfo.HostID, traceAttrs, envResourceAttrs)
Expand Down Expand Up @@ -364,6 +368,21 @@ func getRetrySettings(cfg TracesConfig) configretry.BackOffConfig {
return backOffCfg
}

func traceAppResourceAttrs(hostID string, service *svc.ID) []attribute.KeyValue {
if service.UID == "" {
return getAppResourceAttrs(hostID, service)
}

attrs, ok := serviceAttrCache.Get(service.UID)
if ok {
return attrs
}
attrs = getAppResourceAttrs(hostID, service)
serviceAttrCache.Add(service.UID, attrs)

return attrs
}

// GenerateTraces creates a ptrace.Traces from a request.Span
func GenerateTraces(span *request.Span, hostID string, userAttrs map[attr.Name]struct{}, envResourceAttrs []attribute.KeyValue) ptrace.Traces {
t := span.Timings()
Expand All @@ -372,7 +391,7 @@ func GenerateTraces(span *request.Span, hostID string, userAttrs map[attr.Name]s
traces := ptrace.NewTraces()
rs := traces.ResourceSpans().AppendEmpty()
ss := rs.ScopeSpans().AppendEmpty()
resourceAttrs := getAppResourceAttrs(hostID, &span.ServiceID)
resourceAttrs := traceAppResourceAttrs(hostID, &span.ServiceID)
resourceAttrs = append(resourceAttrs, envResourceAttrs...)
resourceAttrsMap := attrsToMap(resourceAttrs)
resourceAttrsMap.PutStr(string(semconv.OTelLibraryNameKey), reporterName)
Expand Down
35 changes: 33 additions & 2 deletions pkg/export/otel/traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -878,7 +878,6 @@ func TestSpanHostPeer(t *testing.T) {
}

func TestTracesInstrumentations(t *testing.T) {

tests := []InstrTest{
{
name: "all instrumentations",
Expand Down Expand Up @@ -959,6 +958,38 @@ func TestTracesInstrumentations(t *testing.T) {
}
}

func TestTracesAttrReuse(t *testing.T) {
tests := []struct {
name string
span request.Span
same bool
}{
{
name: "Reuses the trace attributes, with svc.UID defined",
span: request.Span{ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeHTTP, Method: "GET", Route: "/foo", RequestStart: 100, End: 200},
same: true,
},
{
name: "No UID, no caching of trace attributes",
span: request.Span{ServiceID: svc.ID{}, Type: request.EventTypeHTTP, Method: "GET", Route: "/foo", RequestStart: 100, End: 200},
same: false,
},
{
name: "No ServiceID, no caching of trace attributes",
span: request.Span{Type: request.EventTypeHTTP, Method: "GET", Route: "/foo", RequestStart: 100, End: 200},
same: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
attr1 := traceAppResourceAttrs("123", &tt.span.ServiceID)
attr2 := traceAppResourceAttrs("123", &tt.span.ServiceID)
assert.Equal(t, tt.same, &attr1[0] == &attr2[0], tt.name)
})
}
}

type fakeInternalTraces struct {
imetrics.NoopReporter
sum atomic.Int32
Expand Down Expand Up @@ -1152,7 +1183,7 @@ func generateTracesForSpans(t *testing.T, tr *tracesOTELReceiver, spans []reques
assert.NoError(t, err)
for i := range spans {
span := &spans[i]
if span.IgnoreSpan == request.IgnoreTraces || !tr.acceptSpan(span) {
if span.IgnoreTraces() || !tr.acceptSpan(span) {
continue
}
res = append(res, GenerateTraces(span, "host-id", traceAttrs, []attribute.KeyValue{}))
Expand Down
4 changes: 0 additions & 4 deletions pkg/internal/ebpf/common/http2grpc_transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (

"github.com/grafana/beyla/pkg/internal/ebpf/bhpack"
"github.com/grafana/beyla/pkg/internal/request"
"github.com/grafana/beyla/pkg/internal/svc"
)

type BPFHTTP2Info bpfHttp2GrpcRequestT
Expand Down Expand Up @@ -210,8 +209,6 @@ func readRetMetaFrame(conn *BPFConnInfo, fr *http2.Framer, hf *http2.HeadersFram
return status, grpc, ok
}

var genericServiceID = svc.ID{SDKLanguage: svc.InstrumentableGeneric}

func http2InfoToSpan(info *BPFHTTP2Info, method, path, peer, host string, status int, protocol Protocol) request.Span {
return request.Span{
Type: info.eventType(protocol),
Expand All @@ -226,7 +223,6 @@ func http2InfoToSpan(info *BPFHTTP2Info, method, path, peer, host string, status
Start: int64(info.StartMonotimeNs),
End: int64(info.EndMonotimeNs),
Status: status,
ServiceID: genericServiceID, // set generic service to be overwritten later by the PID filters
TraceID: trace.TraceID(info.Tp.TraceId),
SpanID: trace.SpanID(info.Tp.SpanId),
ParentSpanID: trace.SpanID(info.Tp.ParentId),
Expand Down
6 changes: 3 additions & 3 deletions pkg/internal/ebpf/common/httpfltr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func TestToRequestTrace(t *testing.T) {
Start: 123456,
End: 789012,
HostPort: 1,
ServiceID: svc.ID{SDKLanguage: svc.InstrumentableGeneric},
ServiceID: svc.ID{},
}
assert.Equal(t, expected, result)
}
Expand Down Expand Up @@ -153,7 +153,7 @@ func TestToRequestTraceNoConnection(t *testing.T) {
End: 789012,
Status: 200,
HostPort: 7033,
ServiceID: svc.ID{SDKLanguage: svc.InstrumentableGeneric},
ServiceID: svc.ID{},
}
assert.Equal(t, expected, result)
}
Expand Down Expand Up @@ -190,7 +190,7 @@ func TestToRequestTrace_BadHost(t *testing.T) {
Start: 123456,
End: 789012,
HostPort: 0,
ServiceID: svc.ID{SDKLanguage: svc.InstrumentableGeneric},
ServiceID: svc.ID{},
}
assert.Equal(t, expected, result)

Expand Down
2 changes: 0 additions & 2 deletions pkg/internal/ebpf/common/httpfltr_transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,6 @@ func HTTPInfoEventToSpan(event BPFHTTPInfo) (request.Span, bool, error) {
}
result.URL = event.url()
result.Method = event.method()
// set generic service to be overwritten later by the PID filters
result.Service = svc.ID{SDKLanguage: svc.InstrumentableGeneric}

return httpInfoToSpan(&result), false, nil
}
Expand Down
11 changes: 9 additions & 2 deletions pkg/internal/infraolly/process/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import (
"github.com/grafana/beyla/pkg/internal/helpers"
)

const unknown string = "-"

// CPUInfo represents CPU usage statistics at a given point
type CPUInfo struct {
// User time of CPU, in seconds
Expand Down Expand Up @@ -153,19 +155,24 @@ func (pw *linuxProcess) Username() (string, error) {
// try to get it from gopsutil and return it if ok
pw.user, err = pw.process.Username()
if err == nil {
if pw.user == "" {
pw.user = unknown
}
return pw.user, nil
}

// get the uid to be retrieved from getent
uid, err := pw.uid()
if err != nil {
return "", err
pw.user = unknown
return pw.user, err
}

// try to get it using getent
pw.user, err = usernameFromGetent(uid)
if err != nil {
return "", err
pw.user = unknown
return pw.user, err
}
}
return pw.user, nil
Expand Down
54 changes: 39 additions & 15 deletions pkg/internal/request/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,27 +60,27 @@ func (t EventType) MarshalText() ([]byte, error) {
return []byte(t.String()), nil
}

type IgnoreMode uint8
type ignoreMode uint8

const (
IgnoreMetrics IgnoreMode = iota + 1
IgnoreTraces
ignoreMetrics ignoreMode = 0x1
ignoreTraces ignoreMode = 0x2
)

func (m IgnoreMode) String() string {
switch m {
case IgnoreMetrics:
return "Metrics"
case IgnoreTraces:
return "Traces"
case 0:
return "(none)"
default:
return fmt.Sprintf("UNKNOWN (%d)", m)
func (m ignoreMode) String() string {
result := ""

if (m & ignoreMetrics) == ignoreMetrics {
result += "Metrics"
}
if (m & ignoreTraces) == ignoreTraces {
result += "Traces"
}

return result
}

func (m IgnoreMode) MarshalText() ([]byte, error) {
func (m ignoreMode) MarshalText() ([]byte, error) {
return []byte(m.String()), nil
}

Expand Down Expand Up @@ -113,7 +113,7 @@ type PidInfo struct {
// SpanPromGetters and getDefinitions in pkg/export/attributes/attr_defs.go
type Span struct {
Type EventType `json:"type"`
IgnoreSpan IgnoreMode `json:"ignoreSpan"`
IgnoreSpan ignoreMode `json:"ignoreSpan"`
Method string `json:"-"`
Path string `json:"-"`
Route string `json:"-"`
Expand Down Expand Up @@ -289,6 +289,30 @@ func (s *Span) IsClientSpan() bool {
return false
}

func (s *Span) setIgnoreFlag(flag ignoreMode) {
s.IgnoreSpan |= flag
}

func (s *Span) isIgnored(flag ignoreMode) bool {
return (s.IgnoreSpan & flag) == flag
}

func (s *Span) SetIgnoreMetrics() {
s.setIgnoreFlag(ignoreMetrics)
}

func (s *Span) SetIgnoreTraces() {
s.setIgnoreFlag(ignoreTraces)
}

func (s *Span) IgnoreMetrics() bool {
return s.isIgnored(ignoreMetrics)
}

func (s *Span) IgnoreTraces() bool {
return s.isIgnored(ignoreTraces)
}

func SpanStatusCode(span *Span) codes.Code {
switch span.Type {
case EventTypeHTTP, EventTypeHTTPClient:
Expand Down
12 changes: 6 additions & 6 deletions pkg/internal/request/span_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ func TestEventTypeString(t *testing.T) {
}

func TestIgnoreModeString(t *testing.T) {
modeStringMap := map[IgnoreMode]string{
IgnoreMetrics: "Metrics",
IgnoreTraces: "Traces",
IgnoreMode(0): "(none)",
IgnoreMode(99): "UNKNOWN (99)",
modeStringMap := map[ignoreMode]string{
ignoreMetrics: "Metrics",
ignoreTraces: "Traces",
ignoreMode(0): "",
ignoreMode(ignoreTraces | ignoreMetrics): "MetricsTraces",
}

for mode, str := range modeStringMap {
Expand Down Expand Up @@ -178,7 +178,7 @@ func TestSerializeJSONSpans(t *testing.T) {
test := func(t *testing.T, tData *testData) {
span := Span{
Type: tData.eventType,
IgnoreSpan: IgnoreMetrics,
IgnoreSpan: ignoreMetrics,
Method: "method",
Path: "path",
Route: "route",
Expand Down
Loading

0 comments on commit 3fd9525

Please sign in to comment.