Skip to content

Commit

Permalink
Add default host.
Browse files Browse the repository at this point in the history
  • Loading branch information
constanca-m committed Oct 10, 2023
1 parent dcf2263 commit f2b5f50
Show file tree
Hide file tree
Showing 3 changed files with 179 additions and 66 deletions.
110 changes: 48 additions & 62 deletions internal/pkg/composable/providers/kubernetes/hints.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ const (
username = "username"
password = "password"
stream = "stream" // this is the container stream: stdout/stderr

hints = "hints"
processors = "processors"
)

type hintsBuilder struct {
Expand Down Expand Up @@ -250,83 +253,66 @@ func GenerateHintsMapping(hints mapstr.M, kubeMeta mapstr.M, logger *logp.Logger
return hintsMapping
}

// Generates the hints and processor mappings from provided pod annotation map
// GetHintsMapping Generates the hints and processor mappings from provided pod annotation map
func GetHintsMapping(k8sMapping map[string]interface{}, logger *logp.Logger, prefix string, cID string) hintsData {
hintData := hintsData{
composableMapping: mapstr.M{},
processors: []mapstr.M{},
}
var hints mapstr.M
var containerProcessors []mapstr.M

if ann, ok := k8sMapping["annotations"]; ok {
annotations, _ := ann.(mapstr.M)

if containerEntries, err := annotations.GetValue(prefix + ".hints"); err == nil {
entries, ok := containerEntries.(mapstr.M)
if ok && len(entries) > 0 {
for key := range entries {
parts := strings.Split(key, "/")

if len(parts) > 1 {
if con, ok := k8sMapping["container"]; ok {
containers, ok := con.(mapstr.M)
if ok {
if cname, err := containers.GetValue("name"); err == nil {
if parts[0] == cname {
// If there are hints like co.elastic.hints.<container_name>/ then add the values after the / to the corresponding container
// Eg Annotation "co.elastic.hints.nginx/stream: stderr" will create a hints entry for container nginx
hints, containerProcessors = GenerateHintsForContainer(annotations, parts[0], prefix)
}
}
}
}
}
}
}
} else {
// If there are top level hints like co.elastic.hints/ then just add the values after the /
// Eg. Annotation "co.elastic.hints/stream: stderr" will will create a hints entries for all containers in the pod
hints = utils.GenerateHints(annotations, "", prefix)

ann, ok := k8sMapping["annotations"]
if !ok {
return hintData
}
annotations, _ := ann.(mapstr.M)

cName := ""
cHost := ""
// Get the name of the container from the metadata. We need it to extract the hints that affect it directly.
// E.g. co.elastic.hints.<container-name>/host: "..."
if con, ok := k8sMapping["container"]; ok {
containers, _ := con.(mapstr.M)
if name, err := containers.GetValue("name"); err == nil {
cName = name.(string)
}
if cPort, err := containers.GetValue("port"); err == nil {
// This is the default for the host value of a specific container.
cHost = "${kubernetes.pod.ip}:" + cPort.(string)
}
logger.Debugf("Extracted hints are :%v", hints)
}

if len(hints) > 0 {
hintData = GenerateHintsResult(hints, k8sMapping, annotations, logger, prefix, cID)
hintsExtracted := utils.GenerateHints(annotations, cName, prefix)
if len(hintsExtracted) == 0 {
return hintData
}

// Only if there are processors defined in a specific container we append them to the processors of the top level
if len(containerProcessors) > 0 {
hintData.processors = append(hintData.processors, containerProcessors...)
// Check if host exists. Otherwise, add default entry for it.
if cHost != "" {
if hintsValues, ok := hintsExtracted[hints].(mapstr.M); ok {
if _, ok := hintsValues[host]; !ok {
hintsValues[host] = cHost
}
} else {
hintsExtracted[hints] = mapstr.M{
host: cHost,
}
logger.Debugf("Generated Processors mapping :%v", hintData.processors)
}
}

return hintData
}

// Generates hints and processors list for specific containers
func GenerateHintsForContainer(annotations mapstr.M, parts, prefix string) (mapstr.M, []mapstr.M) {
hints := utils.GenerateHints(annotations, parts, prefix)
// Processors for specific container
// We need to make an extra check if we have processors added only to the specific containers
containerProcessors := utils.GetConfigs(annotations, prefix, "hints."+parts+"/processors")

return hints, containerProcessors
}
logger.Debugf("Extracted hints are :%v", hintsExtracted)

// Generates the final hintData (hints and processors) struct that will be emitted in pods.
func GenerateHintsResult(hints mapstr.M, k8sMapping map[string]interface{}, annotations mapstr.M, logger *logp.Logger, prefix, cID string) hintsData {
hintData := hintsData{
composableMapping: mapstr.M{},
processors: []mapstr.M{},
}

hintData.composableMapping = GenerateHintsMapping(hints, k8sMapping, logger, cID)
hintData.composableMapping = GenerateHintsMapping(hintsExtracted, k8sMapping, logger, cID)
logger.Debugf("Generated hints mappings :%v", hintData.composableMapping)

// Eg co.elastic.hints/processors.decode_json_fields.fields: "message" will add a processor in all containers of pod
hintData.processors = utils.GetConfigs(annotations, prefix, processorhints)
hintData.processors = utils.GetConfigs(annotations, prefix, hints+"/"+processors)
// We need to check the processors for the specific container, if they exist.
if cName != "" {
containerProcessors := utils.GetConfigs(annotations, prefix, hints+"."+cName+"/"+processors)
if len(containerProcessors) > 0 {
hintData.processors = append(hintData.processors, containerProcessors...)
}
}
logger.Debugf("Generated Processors mapping :%v", hintData.processors)

return hintData
}
131 changes: 131 additions & 0 deletions internal/pkg/composable/providers/kubernetes/hints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,3 +591,134 @@ func TestGenerateHintsMappingWithProcessorsForContainer(t *testing.T) {
assert.Contains(t, expectedprocesors, hintData.processors[1])
}
}

func TestDefaultHost(t *testing.T) {
logger := getLogger()
cID := "abcd"

mapping := map[string]interface{}{
"namespace": "testns",
"pod": mapstr.M{
"uid": string(types.UID(uid)),
"name": "testpod",
"ip": "127.0.0.5",
},
"annotations": mapstr.M{
"app": "production",
"co": mapstr.M{
"elastic": mapstr.M{
"hints/package": "redis",
"hints": mapstr.M{
"redis-1/host": "${kubernetes.pod.ip}:6379",
"redis-1/stream": "stderr",
"redis-2/host": "${kubernetes.pod.ip}:6400",
"redis-4/stream": "stderr",
},
},
},
},
}

addContainerMapping := func(mapping map[string]interface{}, container mapstr.M) map[string]interface{} {
clone := make(map[string]interface{}, len(mapping))
for k, v := range mapping {
clone[k] = v
}
clone["container"] = container
return clone
}

tests := []struct {
msg string
mapping map[string]interface{}
expected mapstr.M
}{
{
msg: "Test container with two hints (redis-1), of which one is host.",
mapping: addContainerMapping(mapping,
mapstr.M{
"name": "redis-1",
"port": "6379",
"id": cID,
},
),
expected: mapstr.M{
"container_id": cID,
"redis": mapstr.M{
"container_logs": mapstr.M{
"enabled": true,
},
"enabled": true,
"host": "127.0.0.5:6379",
"stream": "stderr",
},
},
},
{
msg: "Test container with only one hint for host (redis-2).",
mapping: addContainerMapping(mapping,
mapstr.M{
"name": "redis-2",
"port": "6400",
"id": cID,
},
),
expected: mapstr.M{
"container_id": cID,
"redis": mapstr.M{
"container_logs": mapstr.M{
"enabled": true,
},
"enabled": true,
"host": "127.0.0.5:6400",
},
},
},
{
msg: "Test container without hints and check for the default host (redis-3).",
mapping: addContainerMapping(mapping,
mapstr.M{
"name": "redis-3",
"port": "7000",
"id": cID,
},
),
expected: mapstr.M{
"container_id": cID,
"redis": mapstr.M{
"container_logs": mapstr.M{
"enabled": true,
},
"enabled": true,
"host": "127.0.0.5:7000",
},
},
},
{
msg: "Test container with one hint for stream and without port defined (redis-4).",
mapping: addContainerMapping(mapping,
mapstr.M{
"name": "redis-4",
"id": cID,
},
),
expected: mapstr.M{
"container_id": cID,
"redis": mapstr.M{
"container_logs": mapstr.M{
"enabled": true,
},
"enabled": true,
"stream": "stderr",
},
},
},
}

for _, test := range tests {
t.Run(test.msg, func(t *testing.T) {
hintData := GetHintsMapping(test.mapping, logger, "co.elastic", cID)
assert.Equal(t, test.expected, hintData.composableMapping)
})
}
}
4 changes: 0 additions & 4 deletions internal/pkg/composable/providers/kubernetes/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,6 @@ import (
"github.com/elastic/elastic-agent/internal/pkg/composable"
)

const (
processorhints = "hints/processors"
)

type pod struct {
watcher kubernetes.Watcher
nodeWatcher kubernetes.Watcher
Expand Down

0 comments on commit f2b5f50

Please sign in to comment.