Skip to content

Commit

Permalink
feat(dogstatsd): implement local data header (#27155)
Browse files Browse the repository at this point in the history
Signed-off-by: Wassim DHIF <[email protected]>
  • Loading branch information
wdhif authored Jul 3, 2024
1 parent 00e95ee commit 8842b60
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 36 deletions.
94 changes: 70 additions & 24 deletions comp/dogstatsd/server/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,19 @@ var (
colonSeparator = []byte(":")
commaSeparator = []byte(",")

// containerIDFieldPrefix is the prefix for a common field holding the sender's container ID
containerIDFieldPrefix = []byte("c:")

// containerInodeFieldPrefix is the prefix for a notation holding the sender's container Inode in the containerIDField
containerIDFieldInodePrefix = []byte("in-")
// LocalDataPrefix is the prefix for a common field which contains the Local Data for Origin Detection.
// The Local Data is a list of one or two comma-separated items, each being either:
// * "ci-<container-id>" for the container ID.
// * "in-<cgroupv2-inode>" for the cgroupv2 inode.
// Possible values:
// * "<container-id>" (legacy format, without any item prefix)
// * "ci-<container-id>"
// * "in-<cgroupv2-inode>"
// * "ci-<container-id>,in-<cgroupv2-inode>"
LocalDataPrefix = []byte("c:")

// containerIDPrefix is the prefix for a notation holding the sender's container ID in the Local Data field
containerIDPrefix = []byte("ci-")
// inodePrefix is the prefix for a notation holding the sender's cgroupv2 inode in the Local Data field
inodePrefix = []byte("in-")
)

// parser parses dogstatsd messages
Expand Down Expand Up @@ -194,8 +202,8 @@ func (p *parser) parseMetricSample(message []byte) (dogstatsdMetricSample, error
}
timestamp = time.Unix(ts, 0)
// container ID
case p.dsdOriginEnabled && bytes.HasPrefix(optionalField, containerIDFieldPrefix):
containerID = p.extractContainerID(optionalField)
case p.dsdOriginEnabled && bytes.HasPrefix(optionalField, LocalDataPrefix):
containerID = p.resolveContainerIDFromLocalData(optionalField)
}
}

Expand Down Expand Up @@ -249,28 +257,66 @@ func (p *parser) parseFloat64List(rawFloats []byte) ([]float64, error) {
return values, nil
}

// extractContainerID parses the value of the container ID field.
// If the field is prefixed by `in-`, it corresponds to the cgroup controller's inode of the source
// and is used for ContainerID resolution.
func (p *parser) extractContainerID(rawContainerIDField []byte) []byte {
containerIDField := rawContainerIDField[len(containerIDFieldPrefix):]
// resolveContainerIDFromLocalData returns the container ID for the given Local Data.
// The Local Data is a list that can contain one or two (split by a ',') of either:
// * "ci-<container-id>" for the container ID.
// * "in-<cgroupv2-inode>" for the cgroupv2 inode.
// When a list carries both, the explicit container ID takes precedence over the one
// resolved from the inode.
// Possible values:
// * "<container-id>"
// * "ci-<container-id>"
// * "in-<cgroupv2-inode>"
// * "ci-<container-id>,in-<cgroupv2-inode>"
func (p *parser) resolveContainerIDFromLocalData(RawLocalData []byte) []byte {
// Remove prefix from Local Data
LocalData := RawLocalData[len(LocalDataPrefix):]

if bytes.HasPrefix(containerIDField[:len(containerIDFieldInodePrefix)], containerIDFieldInodePrefix) {
inodeField, err := strconv.ParseUint(string(containerIDField[len(containerIDFieldPrefix)+1:]), 10, 64)
if err != nil {
log.Debugf("Failed to parse inode from %s, got %v", containerIDField, err)
return nil
var containerID []byte
var containerIDFromInode []byte

if bytes.Contains(LocalData, []byte(",")) {
// The Local Data can contain a list
items := bytes.Split(LocalData, []byte{','})
for _, item := range items {
if bytes.HasPrefix(item, containerIDPrefix) {
containerID = item[len(containerIDPrefix):]
} else if bytes.HasPrefix(item, inodePrefix) {
containerIDFromInode = p.resolveContainerIDFromInode(item[len(inodePrefix):])
}
}

containerID, err := p.provider.GetMetaCollector().GetContainerIDForInode(inodeField, cacheValidity)
if err != nil {
log.Debugf("Failed to get container ID, got %v", err)
return nil
if containerID == nil {
containerID = containerIDFromInode
}
} else {
// The Local Data can contain a single value
if bytes.HasPrefix(LocalData, containerIDPrefix) { // Container ID with new format: ci-<container-id>
containerID = LocalData[len(containerIDPrefix):]
} else if bytes.HasPrefix(LocalData, inodePrefix) { // Cgroupv2 inode format: in-<cgroupv2-inode>
containerID = p.resolveContainerIDFromInode(LocalData[len(inodePrefix):])
} else { // Container ID with old format: <container-id>
containerID = LocalData
}
return []byte(containerID)
}

return containerIDField
if containerID == nil {
log.Debugf("Could not parse container ID from Local Data: %s", LocalData)
}

return containerID
}

// resolveContainerIDFromInode looks up the container ID associated with a cgroupv2 inode.
// The inode is given in its base-10 textual form. nil is returned (after a debug log)
// when the text is not a valid unsigned 64-bit integer or when the meta collector
// lookup reports an error.
func (p *parser) resolveContainerIDFromInode(inode []byte) []byte {
	parsedInode, parseErr := strconv.ParseUint(string(inode), 10, 64)
	if parseErr != nil {
		log.Debugf("Failed to parse inode from %s, got %v", inode, parseErr)
		return nil
	}

	// Ask the provider's meta collector for the container owning this inode.
	collector := p.provider.GetMetaCollector()
	resolved, lookupErr := collector.GetContainerIDForInode(parsedInode, cacheValidity)
	if lookupErr != nil {
		log.Debugf("Failed to get container ID, got %v", lookupErr)
		return nil
	}
	return []byte(resolved)
}

// the std API does not have methods to do []byte => float parsing
Expand Down
4 changes: 2 additions & 2 deletions comp/dogstatsd/server/parse_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,8 @@ func (p *parser) applyEventOptionalField(event dogstatsdEvent, optionalField []b
newEvent.alertType, err = parseEventAlertType(optionalField[len(eventAlertTypePrefix):])
case bytes.HasPrefix(optionalField, eventTagsPrefix):
newEvent.tags = p.parseTags(optionalField[len(eventTagsPrefix):])
case p.dsdOriginEnabled && bytes.HasPrefix(optionalField, containerIDFieldPrefix):
newEvent.containerID = p.extractContainerID(optionalField)
case p.dsdOriginEnabled && bytes.HasPrefix(optionalField, LocalDataPrefix):
newEvent.containerID = p.resolveContainerIDFromLocalData(optionalField)
}
if err != nil {
return event, err
Expand Down
4 changes: 2 additions & 2 deletions comp/dogstatsd/server/parse_service_checks.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ func (p *parser) applyServiceCheckOptionalField(serviceCheck dogstatsdServiceChe
newServiceCheck.tags = p.parseTags(optionalField[len(serviceCheckTagsPrefix):])
case bytes.HasPrefix(optionalField, serviceCheckMessagePrefix):
newServiceCheck.message = string(optionalField[len(serviceCheckMessagePrefix):])
case p.dsdOriginEnabled && bytes.HasPrefix(optionalField, containerIDFieldPrefix):
newServiceCheck.containerID = p.extractContainerID(optionalField)
case p.dsdOriginEnabled && bytes.HasPrefix(optionalField, LocalDataPrefix):
newServiceCheck.containerID = p.resolveContainerIDFromLocalData(optionalField)
}
if err != nil {
return serviceCheck, err
Expand Down
99 changes: 91 additions & 8 deletions comp/dogstatsd/server/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,21 +112,104 @@ func TestUnsafeParseInt(t *testing.T) {
assert.Equal(t, integer, unsafeInteger)
}

func TestExtractContainerID(t *testing.T) {
func TestResolveContainerIDFromLocalData(t *testing.T) {
const (
LocalDataPrefix = "c:"
containerIDPrefix = "ci-"
inodePrefix = "in-"
containerID = "abcdef"
containerInode = "4242"
)

deps := newServerDeps(t)
stringInternerTelemetry := newSiTelemetry(false, deps.Telemetry)
p := newParser(deps.Config, newFloat64ListPool(deps.Telemetry), 1, deps.WMeta, stringInternerTelemetry)
// Testing with a container ID
containerID := p.extractContainerID([]byte("c:1234567890abcdef"))
assert.Equal(t, []byte("1234567890abcdef"), containerID)
// Testing with an Inode

// Mock the provider to resolve the container ID from the inode
mockProvider := mock.NewMetricsProvider()
containerInodeUint, _ := strconv.ParseUint(containerInode, 10, 64)
mockProvider.RegisterMetaCollector(&mock.MetaCollector{
CIDFromInode: map[uint64]string{
1234567890: "1234567890abcdef",
containerInodeUint: containerID,
},
})
p.provider = mockProvider
containerIDFromInode := p.extractContainerID([]byte("c:in-1234567890"))
assert.Equal(t, []byte("1234567890abcdef"), containerIDFromInode)

tests := []struct {
name string
input []byte
expected []byte
}{
{
name: "Empty LocalData",
input: []byte(LocalDataPrefix),
expected: []byte{},
},
{
name: "LocalData with new container ID",
input: []byte(LocalDataPrefix + containerIDPrefix + containerID),
expected: []byte(containerID),
},
{
name: "LocalData with old container ID format",
input: []byte(LocalDataPrefix + containerID),
expected: []byte(containerID),
},
{
name: "LocalData with inode",
input: []byte(LocalDataPrefix + inodePrefix + containerInode),
expected: []byte(containerID),
},
{
name: "LocalData with invalid inode",
input: []byte(LocalDataPrefix + inodePrefix + "invalid"),
expected: []byte(nil),
},
{
name: "LocalData as a list",
input: []byte(LocalDataPrefix + containerIDPrefix + containerID + "," + inodePrefix + containerInode),
expected: []byte(containerID),
},
{
name: "LocalData as a list with only inode",
input: []byte(LocalDataPrefix + inodePrefix + containerInode),
expected: []byte(containerID),
},
{
name: "LocalData as a list with only container ID",
input: []byte(LocalDataPrefix + containerIDPrefix + containerID),
expected: []byte(containerID),
},
{
name: "LocalData as a list with only inode with trailing comma",
input: []byte(LocalDataPrefix + inodePrefix + containerInode + ","),
expected: []byte(containerID),
},
{
name: "LocalData as a list with only container ID with trailing comma",
input: []byte(LocalDataPrefix + containerIDPrefix + containerID + ","),
expected: []byte(containerID),
},
{
name: "LocalData as a list with only inode surrounded by commas",
input: []byte(LocalDataPrefix + "," + inodePrefix + containerInode + ","), // This is an invalid format, but we should still be able to extract the container ID
expected: []byte(containerID),
},
{
name: "LocalData as a list with only inode surrounded by commas",
input: []byte(LocalDataPrefix + "," + containerIDPrefix + containerID + ","), // This is an invalid format, but we should still be able to extract the container ID
expected: []byte(containerID),
},
{
name: "LocalData as an invalid list",
input: []byte(LocalDataPrefix + ","),
expected: []byte(nil),
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
assert.Equal(t, tc.expected, p.resolveContainerIDFromLocalData(tc.input))
})
}
}

0 comments on commit 8842b60

Please sign in to comment.