From 8842b600cb442826f130b3cd08e910ee1f2b2e8f Mon Sep 17 00:00:00 2001 From: Wassim Dhif Date: Wed, 3 Jul 2024 16:45:38 +0200 Subject: [PATCH] feat(dogstatsd): implement local data header (#27155) Signed-off-by: Wassim DHIF --- comp/dogstatsd/server/parse.go | 94 +++++++++++++----- comp/dogstatsd/server/parse_events.go | 4 +- comp/dogstatsd/server/parse_service_checks.go | 4 +- comp/dogstatsd/server/parse_test.go | 99 +++++++++++++++++-- 4 files changed, 165 insertions(+), 36 deletions(-) diff --git a/comp/dogstatsd/server/parse.go b/comp/dogstatsd/server/parse.go index 220bc5a62055d8..38dd565a390d08 100644 --- a/comp/dogstatsd/server/parse.go +++ b/comp/dogstatsd/server/parse.go @@ -36,11 +36,19 @@ var ( colonSeparator = []byte(":") commaSeparator = []byte(",") - // containerIDFieldPrefix is the prefix for a common field holding the sender's container ID - containerIDFieldPrefix = []byte("c:") - - // containerInodeFieldPrefix is the prefix for a notation holding the sender's container Inode in the containerIDField - containerIDFieldInodePrefix = []byte("in-") + // LocalDataPrefix is the prefix for a common field which contains the local data for Origin Detection. + // The Local Data is a list that can contain one or two (split by a ',') of either: + // * "cid-" or "ci-" for the container ID. + // * "in-" for the cgroupv2 inode. + // Possible values: + // * "cid-" + // * "ci-,in-" + LocalDataPrefix = []byte("c:") + + // containerIDPrefix is the prefix for a notation holding the sender's container Inode in the containerIDField + containerIDPrefix = []byte("ci-") + // inodePrefix is the prefix for a notation holding the sender's container Inode in the containerIDField + inodePrefix = []byte("in-") ) // parser parses dogstatsd messages @@ -194,8 +202,8 @@ func (p *parser) parseMetricSample(message []byte) (dogstatsdMetricSample, error } timestamp = time.Unix(ts, 0) // container ID - case p.dsdOriginEnabled && bytes.HasPrefix(optionalField, containerIDFieldPrefix): - containerID = p.extractContainerID(optionalField) + case p.dsdOriginEnabled && bytes.HasPrefix(optionalField, LocalDataPrefix): + containerID = p.resolveContainerIDFromLocalData(optionalField) } } @@ -249,28 +257,66 @@ func (p *parser) parseFloat64List(rawFloats []byte) ([]float64, error) { return values, nil } -// extractContainerID parses the value of the container ID field. -// If the field is prefixed by `in-`, it corresponds to the cgroup controller's inode of the source -// and is used for ContainerID resolution. -func (p *parser) extractContainerID(rawContainerIDField []byte) []byte { - containerIDField := rawContainerIDField[len(containerIDFieldPrefix):] +// resolveContainerIDFromLocalData returns the container ID for the given Local Data. +// The Local Data is a list that can contain one or two (split by a ',') of either: +// * "ci-" for the container ID. +// * "in-" for the cgroupv2 inode. +// Possible values: +// * "" +// * "ci-" +// * "ci-,in-" +func (p *parser) resolveContainerIDFromLocalData(RawLocalData []byte) []byte { + // Remove prefix from Local Data + LocalData := RawLocalData[len(LocalDataPrefix):] - if bytes.HasPrefix(containerIDField[:len(containerIDFieldInodePrefix)], containerIDFieldInodePrefix) { - inodeField, err := strconv.ParseUint(string(containerIDField[len(containerIDFieldPrefix)+1:]), 10, 64) - if err != nil { - log.Debugf("Failed to parse inode from %s, got %v", containerIDField, err) - return nil + var containerID []byte + var containerIDFromInode []byte + + if bytes.Contains(LocalData, []byte(",")) { + // The Local Data can contain a list + items := bytes.Split(LocalData, []byte{','}) + for _, item := range items { + if bytes.HasPrefix(item, containerIDPrefix) { + containerID = item[len(containerIDPrefix):] + } else if bytes.HasPrefix(item, inodePrefix) { + containerIDFromInode = p.resolveContainerIDFromInode(item[len(inodePrefix):]) + } } - - containerID, err := p.provider.GetMetaCollector().GetContainerIDForInode(inodeField, cacheValidity) - if err != nil { - log.Debugf("Failed to get container ID, got %v", err) - return nil + if containerID == nil { + containerID = containerIDFromInode + } + } else { + // The Local Data can contain a single value + if bytes.HasPrefix(LocalData, containerIDPrefix) { // Container ID with new format: ci- + containerID = LocalData[len(containerIDPrefix):] + } else if bytes.HasPrefix(LocalData, inodePrefix) { // Cgroupv2 inode format: in- + containerID = p.resolveContainerIDFromInode(LocalData[len(inodePrefix):]) + } else { // Container ID with old format: + containerID = LocalData } - return []byte(containerID) } - return containerIDField + if containerID == nil { + log.Debugf("Could not parse container ID from Local Data: %s", LocalData) + } + + return containerID +} + +// resolveContainerIDFromInode returns the container ID for the given cgroupv2 inode. +func (p *parser) resolveContainerIDFromInode(inode []byte) []byte { + inodeField, err := strconv.ParseUint(string(inode), 10, 64) + if err != nil { + log.Debugf("Failed to parse inode from %s, got %v", inode, err) + return nil + } + + containerID, err := p.provider.GetMetaCollector().GetContainerIDForInode(inodeField, cacheValidity) + if err != nil { + log.Debugf("Failed to get container ID, got %v", err) + return nil + } + return []byte(containerID) } // the std API does not have methods to do []byte => float parsing diff --git a/comp/dogstatsd/server/parse_events.go b/comp/dogstatsd/server/parse_events.go index 5e3430dd908646..d0024be5b93504 100644 --- a/comp/dogstatsd/server/parse_events.go +++ b/comp/dogstatsd/server/parse_events.go @@ -163,8 +163,8 @@ func (p *parser) applyEventOptionalField(event dogstatsdEvent, optionalField []b newEvent.alertType, err = parseEventAlertType(optionalField[len(eventAlertTypePrefix):]) case bytes.HasPrefix(optionalField, eventTagsPrefix): newEvent.tags = p.parseTags(optionalField[len(eventTagsPrefix):]) - case p.dsdOriginEnabled && bytes.HasPrefix(optionalField, containerIDFieldPrefix): - newEvent.containerID = p.extractContainerID(optionalField) + case p.dsdOriginEnabled && bytes.HasPrefix(optionalField, LocalDataPrefix): + newEvent.containerID = p.resolveContainerIDFromLocalData(optionalField) } if err != nil { return event, err diff --git a/comp/dogstatsd/server/parse_service_checks.go b/comp/dogstatsd/server/parse_service_checks.go index 140658822f1074..889b4f08d6390e 100644 --- a/comp/dogstatsd/server/parse_service_checks.go +++ b/comp/dogstatsd/server/parse_service_checks.go @@ -97,8 +97,8 @@ func (p *parser) applyServiceCheckOptionalField(serviceCheck dogstatsdServiceChe newServiceCheck.tags = p.parseTags(optionalField[len(serviceCheckTagsPrefix):]) case bytes.HasPrefix(optionalField, serviceCheckMessagePrefix): newServiceCheck.message = string(optionalField[len(serviceCheckMessagePrefix):]) - case p.dsdOriginEnabled && bytes.HasPrefix(optionalField, containerIDFieldPrefix): - newServiceCheck.containerID = p.extractContainerID(optionalField) + case p.dsdOriginEnabled && bytes.HasPrefix(optionalField, LocalDataPrefix): + newServiceCheck.containerID = p.resolveContainerIDFromLocalData(optionalField) } if err != nil { return serviceCheck, err diff --git a/comp/dogstatsd/server/parse_test.go b/comp/dogstatsd/server/parse_test.go index f4fb88ea56a8d7..5e6e81632e8a75 100644 --- a/comp/dogstatsd/server/parse_test.go +++ b/comp/dogstatsd/server/parse_test.go @@ -112,21 +112,104 @@ func TestUnsafeParseInt(t *testing.T) { assert.Equal(t, integer, unsafeInteger) } -func TestExtractContainerID(t *testing.T) { +func TestResolveContainerIDFromLocalData(t *testing.T) { + const ( + LocalDataPrefix = "c:" + containerIDPrefix = "ci-" + inodePrefix = "in-" + containerID = "abcdef" + containerInode = "4242" + ) + deps := newServerDeps(t) stringInternerTelemetry := newSiTelemetry(false, deps.Telemetry) p := newParser(deps.Config, newFloat64ListPool(deps.Telemetry), 1, deps.WMeta, stringInternerTelemetry) - // Testing with a container ID - containerID := p.extractContainerID([]byte("c:1234567890abcdef")) - assert.Equal(t, []byte("1234567890abcdef"), containerID) - // Testing with an Inode + + // Mock the provider to resolve the container ID from the inode mockProvider := mock.NewMetricsProvider() + containerInodeUint, _ := strconv.ParseUint(containerInode, 10, 64) mockProvider.RegisterMetaCollector(&mock.MetaCollector{ CIDFromInode: map[uint64]string{ - 1234567890: "1234567890abcdef", + containerInodeUint: containerID, }, }) p.provider = mockProvider - containerIDFromInode := p.extractContainerID([]byte("c:in-1234567890")) - assert.Equal(t, []byte("1234567890abcdef"), containerIDFromInode) + + tests := []struct { + name string + input []byte + expected []byte + }{ + { + name: "Empty LocalData", + input: []byte(LocalDataPrefix), + expected: []byte{}, + }, + { + name: "LocalData with new container ID", + input: []byte(LocalDataPrefix + containerIDPrefix + containerID), + expected: []byte(containerID), + }, + { + name: "LocalData with old container ID format", + input: []byte(LocalDataPrefix + containerID), + expected: []byte(containerID), + }, + { + name: "LocalData with inode", + input: []byte(LocalDataPrefix + inodePrefix + containerInode), + expected: []byte(containerID), + }, + { + name: "LocalData with invalid inode", + input: []byte(LocalDataPrefix + inodePrefix + "invalid"), + expected: []byte(nil), + }, + { + name: "LocalData as a list", + input: []byte(LocalDataPrefix + containerIDPrefix + containerID + "," + inodePrefix + containerInode), + expected: []byte(containerID), + }, + { + name: "LocalData as a list with only inode", + input: []byte(LocalDataPrefix + inodePrefix + containerInode), + expected: []byte(containerID), + }, + { + name: "LocalData as a list with only container ID", + input: []byte(LocalDataPrefix + containerIDPrefix + containerID), + expected: []byte(containerID), + }, + { + name: "LocalData as a list with only inode with trailing comma", + input: []byte(LocalDataPrefix + inodePrefix + containerInode + ","), + expected: []byte(containerID), + }, + { + name: "LocalData as a list with only container ID with trailing comma", + input: []byte(LocalDataPrefix + containerIDPrefix + containerID + ","), + expected: []byte(containerID), + }, + { + name: "LocalData as a list with only inode surrounded by commas", + input: []byte(LocalDataPrefix + "," + inodePrefix + containerInode + ","), // This is an invalid format, but we should still be able to extract the container ID + expected: []byte(containerID), + }, + { + name: "LocalData as a list with only inode surrounded by commas", + input: []byte(LocalDataPrefix + "," + containerIDPrefix + containerID + ","), // This is an invalid format, but we should still be able to extract the container ID + expected: []byte(containerID), + }, + { + name: "LocalData as an invalid list", + input: []byte(LocalDataPrefix + ","), + expected: []byte(nil), + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + assert.Equal(t, tc.expected, p.resolveContainerIDFromLocalData(tc.input)) + }) + } }