Skip to content

Commit

Permalink
[pkg/stanza/adapter] Fix scope parsing (open-telemetry#32033)
Browse files Browse the repository at this point in the history
**Description:** <Describe what has changed.>
Fix:   open-telemetry#23387 
Adds grouping of log records based on scope:

 - adds scope grouping and sets the scope name
 - adds a benchmark to the integration tests

**Link to tracking Issue:**  open-telemetry#23387 

**Testing:**
1. Ran benchmarks on this build; results below:
On main branch: `BenchmarkEmitterToConsumer`: ~3.12s
On my branch: `BenchmarkEmitterToConsumer`: ~3.26s
On my branch: `BenchmarkEmitterToConsumerScopeGroupping` (with 2 host
variations and 2 scope variations): ~4.2s
2. Added unit tests

**Documentation:** <Describe the documentation added.>

---------

Co-authored-by: Dan Jaglowski <[email protected]>
  • Loading branch information
2 people authored and lokesh.balla committed May 5, 2024
1 parent f3203d9 commit e2b62bf
Show file tree
Hide file tree
Showing 4 changed files with 195 additions and 2 deletions.
27 changes: 27 additions & 0 deletions .chloggen/ft-scope-groupping.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: filelogreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix missing scope name and group logs based on scope

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [23387]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
20 changes: 18 additions & 2 deletions pkg/stanza/adapter/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,19 +148,35 @@ func (c *Converter) workerLoop() {
}

resourceHashToIdx := make(map[uint64]int)
scopeIdxByResource := make(map[uint64]map[string]int)

pLogs := plog.NewLogs()
var sl plog.ScopeLogs

for _, e := range entries {
resourceID := HashResource(e.Resource)
var rl plog.ResourceLogs

resourceIdx, ok := resourceHashToIdx[resourceID]
if !ok {
resourceHashToIdx[resourceID] = pLogs.ResourceLogs().Len()
rl := pLogs.ResourceLogs().AppendEmpty()

rl = pLogs.ResourceLogs().AppendEmpty()
upsertToMap(e.Resource, rl.Resource().Attributes())

scopeIdxByResource[resourceID] = map[string]int{e.ScopeName: 0}
sl = rl.ScopeLogs().AppendEmpty()
sl.Scope().SetName(e.ScopeName)
} else {
sl = pLogs.ResourceLogs().At(resourceIdx).ScopeLogs().At(0)
rl = pLogs.ResourceLogs().At(resourceIdx)
scopeIdxInResource, ok := scopeIdxByResource[resourceID][e.ScopeName]
if !ok {
scopeIdxByResource[resourceID][e.ScopeName] = rl.ScopeLogs().Len()
sl = rl.ScopeLogs().AppendEmpty()
sl.Scope().SetName(e.ScopeName)
} else {
sl = pLogs.ResourceLogs().At(resourceIdx).ScopeLogs().At(scopeIdxInResource)
}
}
convertInto(e, sl.LogRecords().AppendEmpty())
}
Expand Down
111 changes: 111 additions & 0 deletions pkg/stanza/adapter/converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,53 @@ func complexEntriesForNDifferentHosts(count int, n int) []*entry.Entry {
return ret
}

// complexEntriesForNDifferentHostsMDifferentScopes returns count entries that
// all carry the same severity and nested body. The "host" resource attribute
// cycles through n values (host-0 through host-(n-1)) and the scope name
// cycles through m values (scope-0 through scope-(m-1)); both are derived
// from the entry's index.
//
// n must be non-zero whenever entries are built, because the host index is
// computed as i%n. When m <= 0 no entries are built and the returned slice
// holds count nil elements.
func complexEntriesForNDifferentHostsMDifferentScopes(count int, n int, m int) []*entry.Entry {
	ret := make([]*entry.Entry, count)
	if m <= 0 {
		// No scopes requested: leave every slot nil.
		return ret
	}

	for i := 0; i < count; i++ {
		// One entry per slot; host and scope both depend only on the index.
		e := entry.New()
		e.Severity = entry.Error
		e.Resource = map[string]any{
			"host":   fmt.Sprintf("host-%d", i%n),
			"bool":   true,
			"int":    123,
			"double": 12.34,
			"string": "hello",
			"object": map[string]any{
				"bool":   true,
				"int":    123,
				"double": 12.34,
				"string": "hello",
			},
		}
		e.Body = map[string]any{
			"bool":   true,
			"int":    123,
			"double": 12.34,
			"string": "hello",
			"bytes":  []byte("asdf"),
			"object": map[string]any{
				"bool":   true,
				"int":    123,
				"double": 12.34,
				"string": "hello",
				"bytes":  []byte("asdf"),
				"object": map[string]any{
					"bool":   true,
					"int":    123,
					"double": 12.34,
					"string": "hello",
					"bytes":  []byte("asdf"),
				},
			},
		}
		e.ScopeName = fmt.Sprintf("scope-%d", i%m)
		ret[i] = e
	}
	return ret
}

func complexEntry() *entry.Entry {
e := entry.New()
e.Severity = entry.Error
Expand Down Expand Up @@ -320,6 +367,70 @@ func TestHashResource(t *testing.T) {
}
}

// TestAllConvertedEntriesScopeGrouping sends one batch of entries that share a
// single resource through the converter and verifies that the resulting logs
// are grouped into one ScopeLogs per scope name, each holding the expected
// number of records.
func TestAllConvertedEntriesScopeGrouping(t *testing.T) {
	t.Parallel()

	// Total number of entries per batch; logsPerScope below must add up to it.
	const entryCount = 100

	testcases := []struct {
		numberOfScopes int
		logsPerScope   int
	}{
		{numberOfScopes: 1, logsPerScope: 100},
		{numberOfScopes: 2, logsPerScope: 50},
	}

	for i, tc := range testcases {
		tc := tc

		t.Run(strconv.Itoa(i), func(t *testing.T) {
			t.Parallel()

			converter := NewConverter(zap.NewNop())
			converter.Start()
			defer converter.Stop()

			go func() {
				entries := complexEntriesForNDifferentHostsMDifferentScopes(entryCount, 1, tc.numberOfScopes)
				assert.NoError(t, converter.Batch(entries))
			}()

			timeoutTimer := time.NewTimer(10 * time.Second)
			defer timeoutTimer.Stop()

			select {
			case pLogs, ok := <-converter.OutChannel():
				// A closed channel means nothing was converted; that is a failure, not a pass.
				require.True(t, ok, "converter output channel was closed before any logs were received")

				// All entries use the same host, so exactly one resource is expected.
				rLogs := pLogs.ResourceLogs()
				require.Equal(t, 1, rLogs.Len())

				scopeLogs := rLogs.At(0).ScopeLogs()
				require.Equal(t, tc.numberOfScopes, scopeLogs.Len())

				// Scopes are expected in first-seen order: scope-0, scope-1, ...
				for idx := 0; idx < tc.numberOfScopes; idx++ {
					sl := scopeLogs.At(idx)
					require.Equal(t, fmt.Sprintf("scope-%d", idx), sl.Scope().Name())
					require.Equal(t, tc.logsPerScope, sl.LogRecords().Len())
				}

			case <-timeoutTimer.C:
				// Without this the test would pass without checking anything.
				require.FailNow(t, "timed out waiting for converted logs")
			}
		})
	}
}

func TestAllConvertedEntriesAreSentAndReceived(t *testing.T) {
t.Parallel()

Expand Down
39 changes: 39 additions & 0 deletions pkg/stanza/adapter/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,45 @@ func BenchmarkEmitterToConsumer(b *testing.B) {
}
}

// BenchmarkEmitterToConsumerScopeGroupping measures how long it takes for
// entryCount entries, spread over hostsCount hosts and scopesCount scopes, to
// travel from the receiver's emitter to the logs sink.
func BenchmarkEmitterToConsumerScopeGroupping(b *testing.B) {
	const (
		entryCount  = 1_000_000
		hostsCount  = 2
		scopesCount = 2

		// How long one iteration may wait for all records, and how often it checks.
		receiveTimeout = 30 * time.Second
		pollInterval   = 5 * time.Millisecond
	)

	entries := complexEntriesForNDifferentHostsMDifferentScopes(entryCount, hostsCount, scopesCount)

	cl := &consumertest.LogsSink{}
	logsReceiver, err := createNoopReceiver(cl)
	require.NoError(b, err)

	// NOTE(review): the receiver is started but never shut down in this
	// benchmark; confirm whether a Shutdown call is needed here.
	err = logsReceiver.Start(context.Background(), componenttest.NewNopHost())
	require.NoError(b, err)

	// Exclude entry generation and receiver startup from the measurement.
	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		cl.Reset()

		go func() {
			ctx := context.Background()
			for _, e := range entries {
				// Processing errors are ignored on purpose: a lost entry shows
				// up below as a record count that never reaches entryCount.
				_ = logsReceiver.emitter.Process(ctx, e)
			}
		}()

		// Poll with an explicit deadline so that the failure message reports
		// the count observed at the time of the timeout.
		deadline := time.Now().Add(receiveTimeout)
		for cl.LogRecordCount() != entryCount {
			if time.Now().After(deadline) {
				b.Fatalf("Did not receive all logs (only received %d)", cl.LogRecordCount())
			}
			time.Sleep(pollInterval)
		}
	}
}

func TestEmitterToConsumer(t *testing.T) {
const (
entryCount = 1_000
Expand Down

0 comments on commit e2b62bf

Please sign in to comment.