-
Notifications
You must be signed in to change notification settings - Fork 2.4k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[chore] Add internal pdatautil.GroupByResourceLogs (#33311)
This is a first step towards a [Flatten/Unflatten workaround](#32080) for OTTL's data corruption issue, specifically this would support Unflatten. The workaround was discussed in a recent collector SIG and it sounded like it would be acceptable if available behind a feature gate and only for the transform processor. If this is accepted I'll work on a Flatten utility next, then integrate them into the tranform processor to prove end-to-end. Finally, I'll implement similar features for metrics and traces. This PR adds an internal utility package which simplifies grouping logs by resource and scope. I'm proposing this initially in `internal/pdatautil` but the functionality could eventually be merged into `pkg/pdatautil` if we find it useful elsewhere.
- Loading branch information
1 parent
6005ae8
commit 4f7a6b1
Showing
12 changed files
with
703 additions
and
0 deletions.
There are no files selected for viewing
Validating CODEOWNERS rules …
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
include ../../Makefile.Common |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
module github.com/open-telemetry/opentelemetry-collector-contrib/internal/pdatautil | ||
|
||
go 1.21.0 | ||
|
||
require ( | ||
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.101.0 | ||
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.101.0 | ||
github.com/stretchr/testify v1.9.0 | ||
go.opentelemetry.io/collector/pdata v1.8.1-0.20240529223953-eaab76e46d38 | ||
) | ||
|
||
require ( | ||
github.com/cespare/xxhash/v2 v2.3.0 // indirect | ||
github.com/davecgh/go-spew v1.1.1 // indirect | ||
github.com/gogo/protobuf v1.3.2 // indirect | ||
github.com/json-iterator/go v1.1.12 // indirect | ||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect | ||
github.com/modern-go/reflect2 v1.0.2 // indirect | ||
github.com/pmezard/go-difflib v1.0.0 // indirect | ||
go.uber.org/multierr v1.11.0 // indirect | ||
golang.org/x/net v0.24.0 // indirect | ||
golang.org/x/sys v0.20.0 // indirect | ||
golang.org/x/text v0.15.0 // indirect | ||
google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda // indirect | ||
google.golang.org/grpc v1.64.0 // indirect | ||
google.golang.org/protobuf v1.34.1 // indirect | ||
gopkg.in/yaml.v3 v3.0.1 // indirect | ||
) | ||
|
||
replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil | ||
|
||
replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest | ||
|
||
replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden |
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package pdatautil // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/pdatautil" | ||
|
||
import ( | ||
"go.opentelemetry.io/collector/pdata/pcommon" | ||
"go.opentelemetry.io/collector/pdata/plog" | ||
|
||
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil" | ||
) | ||
|
||
// GroupByResourceLogs groups ScopeLogs by Resource. Modifications are made in place. | ||
func GroupByResourceLogs(rls plog.ResourceLogsSlice) { | ||
// Hash each ResourceLogs based on identifying information. | ||
resourceHashes := make([][16]byte, rls.Len()) | ||
for i := 0; i < rls.Len(); i++ { | ||
resourceHashes[i] = pdatautil.MapHash(rls.At(i).Resource().Attributes()) | ||
} | ||
|
||
// Find the first occurrence of each hash and note the index. | ||
firstScopeIndex := make([]int, rls.Len()) | ||
for i := 0; i < rls.Len(); i++ { | ||
firstScopeIndex[i] = i | ||
for j := 0; j < i; j++ { | ||
if resourceHashes[i] == resourceHashes[j] { | ||
firstScopeIndex[i] = j | ||
break | ||
} | ||
} | ||
} | ||
|
||
// Merge Resources with the same hash. | ||
for i := 0; i < rls.Len(); i++ { | ||
if i == firstScopeIndex[i] { | ||
// This is the first occurrence of this hash. | ||
continue | ||
} | ||
rls.At(i).ScopeLogs().MoveAndAppendTo(rls.At(firstScopeIndex[i]).ScopeLogs()) | ||
} | ||
|
||
// Remove the ResourceLogs which were merged onto others. | ||
i := 0 | ||
rls.RemoveIf(func(plog.ResourceLogs) bool { | ||
remove := i != firstScopeIndex[i] | ||
i++ | ||
return remove | ||
}) | ||
|
||
// Merge ScopeLogs within each ResourceLogs. | ||
for i := 0; i < rls.Len(); i++ { | ||
GroupByScopeLogs(rls.At(i).ScopeLogs()) | ||
} | ||
} | ||
|
||
// GroupByScopeLogs groups LogRecords by scope. Modifications are made in place. | ||
func GroupByScopeLogs(sls plog.ScopeLogsSlice) { | ||
// Hash each ScopeLogs based on identifying information. | ||
scopeHashes := make([][16]byte, sls.Len()) | ||
for i := 0; i < sls.Len(); i++ { | ||
scopeHashes[i] = HashScopeLogs(sls.At(i)) | ||
} | ||
|
||
// Find the first occurrence of each hash and note the index. | ||
firstScopeIndex := make([]int, sls.Len()) | ||
for i := 0; i < sls.Len(); i++ { | ||
firstScopeIndex[i] = i | ||
for j := 0; j < i; j++ { | ||
if scopeHashes[i] == scopeHashes[j] { | ||
firstScopeIndex[i] = j | ||
break | ||
} | ||
} | ||
} | ||
|
||
// Merge ScopeLogs with the same hash. | ||
for i := 0; i < sls.Len(); i++ { | ||
if i == firstScopeIndex[i] { | ||
// This is the first occurrence of this hash. | ||
continue | ||
} | ||
sls.At(i).LogRecords().MoveAndAppendTo(sls.At(firstScopeIndex[i]).LogRecords()) | ||
} | ||
|
||
// Remove the ScopeLogs which were merged onto others. | ||
i := 0 | ||
sls.RemoveIf(func(plog.ScopeLogs) bool { | ||
remove := i != firstScopeIndex[i] | ||
i++ | ||
return remove | ||
}) | ||
} | ||
|
||
// Creates a hash based on the ScopeLogs attributes, name, and version | ||
func HashScopeLogs(sl plog.ScopeLogs) [16]byte { | ||
scopeHash := pcommon.NewMap() | ||
scopeHash.PutStr("schema_url", sl.SchemaUrl()) | ||
scopeHash.PutStr("name", sl.Scope().Name()) | ||
scopeHash.PutStr("version", sl.Scope().Version()) | ||
attrHash := pdatautil.MapHash(sl.Scope().Attributes()) | ||
scopeHash.PutStr("attributes_hash", string(attrHash[:])) | ||
return pdatautil.MapHash(scopeHash) | ||
} |
Oops, something went wrong.