diff --git a/internal/pkg/composable/providers/kubernetes/hints.go b/internal/pkg/composable/providers/kubernetes/hints.go index 0a439afd5da..934fcaf82e7 100644 --- a/internal/pkg/composable/providers/kubernetes/hints.go +++ b/internal/pkg/composable/providers/kubernetes/hints.go @@ -26,6 +26,9 @@ const ( username = "username" password = "password" stream = "stream" // this is the container stream: stdout/stderr + + hints = "hints" + processors = "processors" ) type hintsBuilder struct { @@ -250,83 +253,66 @@ func GenerateHintsMapping(hints mapstr.M, kubeMeta mapstr.M, logger *logp.Logger return hintsMapping } -// Generates the hints and processor mappings from provided pod annotation map +// GetHintsMapping Generates the hints and processor mappings from provided pod annotation map func GetHintsMapping(k8sMapping map[string]interface{}, logger *logp.Logger, prefix string, cID string) hintsData { hintData := hintsData{ composableMapping: mapstr.M{}, processors: []mapstr.M{}, } - var hints mapstr.M - var containerProcessors []mapstr.M - - if ann, ok := k8sMapping["annotations"]; ok { - annotations, _ := ann.(mapstr.M) - - if containerEntries, err := annotations.GetValue(prefix + ".hints"); err == nil { - entries, ok := containerEntries.(mapstr.M) - if ok && len(entries) > 0 { - for key := range entries { - parts := strings.Split(key, "/") - - if len(parts) > 1 { - if con, ok := k8sMapping["container"]; ok { - containers, ok := con.(mapstr.M) - if ok { - if cname, err := containers.GetValue("name"); err == nil { - if parts[0] == cname { - // If there are hints like co.elastic.hints./ then add the values after the / to the corresponding container - // Eg Annotation "co.elastic.hints.nginx/stream: stderr" will create a hints entry for container nginx - hints, containerProcessors = GenerateHintsForContainer(annotations, parts[0], prefix) - } - } - } - } - } - } - } - } else { - // If there are top level hints like co.elastic.hints/ then just add the values after the / - // Eg. Annotation "co.elastic.hints/stream: stderr" will will create a hints entries for all containers in the pod - hints = utils.GenerateHints(annotations, "", prefix) + + ann, ok := k8sMapping["annotations"] + if !ok { + return hintData + } + annotations, _ := ann.(mapstr.M) + + cName := "" + cHost := "" + // Get the name of the container from the metadata. We need it to extract the hints that affect it directly. + // E.g. co.elastic.hints./host: "..." + if con, ok := k8sMapping["container"]; ok { + containers, _ := con.(mapstr.M) + if name, err := containers.GetValue("name"); err == nil { + cName = name.(string) + } + if cPort, err := containers.GetValue("port"); err == nil { + // This is the default for the host value of a specific container. + cHost = "${kubernetes.pod.ip}:" + cPort.(string) } - logger.Debugf("Extracted hints are :%v", hints) + } - if len(hints) > 0 { - hintData = GenerateHintsResult(hints, k8sMapping, annotations, logger, prefix, cID) + hintsExtracted := utils.GenerateHints(annotations, cName, prefix) + if len(hintsExtracted) == 0 { + return hintData + } - // Only if there are processors defined in a specific container we append them to the processors of the top level - if len(containerProcessors) > 0 { - hintData.processors = append(hintData.processors, containerProcessors...) + // Check if host exists. Otherwise, add default entry for it. + if cHost != "" { + if hintsValues, ok := hintsExtracted[hints].(mapstr.M); ok { + if _, ok := hintsValues[host]; !ok { + hintsValues[host] = cHost + } + } else { + hintsExtracted[hints] = mapstr.M{ + host: cHost, } - logger.Debugf("Generated Processors mapping :%v", hintData.processors) } } - return hintData -} - -// Generates hints and processors list for specific containers -func GenerateHintsForContainer(annotations mapstr.M, parts, prefix string) (mapstr.M, []mapstr.M) { - hints := utils.GenerateHints(annotations, parts, prefix) - // Processors for specific container - // We need to make an extra check if we have processors added only to the specific containers - containerProcessors := utils.GetConfigs(annotations, prefix, "hints."+parts+"/processors") - - return hints, containerProcessors -} + logger.Debugf("Extracted hints are :%v", hintsExtracted) -// Generates the final hintData (hints and processors) struct that will be emitted in pods. -func GenerateHintsResult(hints mapstr.M, k8sMapping map[string]interface{}, annotations mapstr.M, logger *logp.Logger, prefix, cID string) hintsData { - hintData := hintsData{ - composableMapping: mapstr.M{}, - processors: []mapstr.M{}, - } - - hintData.composableMapping = GenerateHintsMapping(hints, k8sMapping, logger, cID) + hintData.composableMapping = GenerateHintsMapping(hintsExtracted, k8sMapping, logger, cID) logger.Debugf("Generated hints mappings :%v", hintData.composableMapping) - // Eg co.elastic.hints/processors.decode_json_fields.fields: "message" will add a processor in all containers of pod - hintData.processors = utils.GetConfigs(annotations, prefix, processorhints) + hintData.processors = utils.GetConfigs(annotations, prefix, hints+"/"+processors) + // We need to check the processors for the specific container, if they exist. + if cName != "" { + containerProcessors := utils.GetConfigs(annotations, prefix, hints+"."+cName+"/"+processors) + if len(containerProcessors) > 0 { + hintData.processors = append(hintData.processors, containerProcessors...) + } + } + logger.Debugf("Generated Processors mapping :%v", hintData.processors) return hintData } diff --git a/internal/pkg/composable/providers/kubernetes/hints_test.go b/internal/pkg/composable/providers/kubernetes/hints_test.go index 17e36b0d7c7..0cc709acaf7 100644 --- a/internal/pkg/composable/providers/kubernetes/hints_test.go +++ b/internal/pkg/composable/providers/kubernetes/hints_test.go @@ -591,3 +591,134 @@ func TestGenerateHintsMappingWithProcessorsForContainer(t *testing.T) { assert.Contains(t, expectedprocesors, hintData.processors[1]) } } + +func TestDefaultHost(t *testing.T) { + logger := getLogger() + cID := "abcd" + + mapping := map[string]interface{}{ + "namespace": "testns", + "pod": mapstr.M{ + "uid": string(types.UID(uid)), + "name": "testpod", + "ip": "127.0.0.5", + }, + "annotations": mapstr.M{ + "app": "production", + "co": mapstr.M{ + "elastic": mapstr.M{ + "hints/package": "redis", + "hints": mapstr.M{ + "redis-1/host": "${kubernetes.pod.ip}:6379", + "redis-1/stream": "stderr", + "redis-2/host": "${kubernetes.pod.ip}:6400", + "redis-4/stream": "stderr", + }, + }, + }, + }, + } + + addContainerMapping := func(mapping map[string]interface{}, container mapstr.M) map[string]interface{} { + clone := make(map[string]interface{}, len(mapping)) + for k, v := range mapping { + clone[k] = v + } + clone["container"] = container + return clone + } + + tests := []struct { + msg string + mapping map[string]interface{} + expected mapstr.M + }{ + { + msg: "Test container with two hints (redis-1), of which one is host.", + mapping: addContainerMapping(mapping, + mapstr.M{ + "name": "redis-1", + "port": "6379", + "id": cID, + }, + ), + expected: mapstr.M{ + "container_id": cID, + "redis": mapstr.M{ + "container_logs": mapstr.M{ + "enabled": true, + }, + "enabled": true, + "host": "127.0.0.5:6379", + "stream": "stderr", + }, + }, + }, + { + msg: "Test container with only one hint for host (redis-2).", + mapping: addContainerMapping(mapping, + mapstr.M{ + "name": "redis-2", + "port": "6400", + "id": cID, + }, + ), + expected: mapstr.M{ + "container_id": cID, + "redis": mapstr.M{ + "container_logs": mapstr.M{ + "enabled": true, + }, + "enabled": true, + "host": "127.0.0.5:6400", + }, + }, + }, + { + msg: "Test container without hints and check for the default host (redis-3).", + mapping: addContainerMapping(mapping, + mapstr.M{ + "name": "redis-3", + "port": "7000", + "id": cID, + }, + ), + expected: mapstr.M{ + "container_id": cID, + "redis": mapstr.M{ + "container_logs": mapstr.M{ + "enabled": true, + }, + "enabled": true, + "host": "127.0.0.5:7000", + }, + }, + }, + { + msg: "Test container with one hint for stream and without port defined (redis-4).", + mapping: addContainerMapping(mapping, + mapstr.M{ + "name": "redis-4", + "id": cID, + }, + ), + expected: mapstr.M{ + "container_id": cID, + "redis": mapstr.M{ + "container_logs": mapstr.M{ + "enabled": true, + }, + "enabled": true, + "stream": "stderr", + }, + }, + }, + } + + for _, test := range tests { + t.Run(test.msg, func(t *testing.T) { + hintData := GetHintsMapping(test.mapping, logger, "co.elastic", cID) + assert.Equal(t, test.expected, hintData.composableMapping) + }) + } +} diff --git a/internal/pkg/composable/providers/kubernetes/pod.go b/internal/pkg/composable/providers/kubernetes/pod.go index 1769e793183..ffe817218e7 100644 --- a/internal/pkg/composable/providers/kubernetes/pod.go +++ b/internal/pkg/composable/providers/kubernetes/pod.go @@ -24,10 +24,6 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/composable" ) -const ( - processorhints = "hints/processors" -) - type pod struct { watcher kubernetes.Watcher nodeWatcher kubernetes.Watcher