diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index c76fd55cda0..1a83e98204f 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -133,6 +133,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - The `monitoring.elasticsearch.api_key` value is correctly base64-encoded before being sent to the monitoring Elasticsearch cluster. {issue}18939[18939] {pull}18945[18945] - Fix kafka topic setting not allowing upper case characters. {pull}18854[18854] {issue}18640[18640] - Fix redis key setting not allowing upper case characters. {pull}18854[18854] {issue}18640[18640] +- Fix metrics hints builder to avoid wrong container metadata usage when port is not exposed {pull}18979[18979] *Auditbeat* diff --git a/libbeat/autodiscover/providers/kubernetes/pod.go b/libbeat/autodiscover/providers/kubernetes/pod.go index d671cf14a22..a01d89b47cb 100644 --- a/libbeat/autodiscover/providers/kubernetes/pod.go +++ b/libbeat/autodiscover/providers/kubernetes/pod.go @@ -275,6 +275,26 @@ func (p *pod) emitEvents(pod *kubernetes.Pod, flag string, containers []kubernet } } + // Pass annotations to all events so that it can be used in templating and by annotation builders. + var ( + annotations = common.MapStr{} + nsAnn = common.MapStr{} + ) + for k, v := range pod.GetObjectMeta().GetAnnotations() { + safemapstr.Put(annotations, k, v) + } + + if p.namespaceWatcher != nil { + if rawNs, ok, err := p.namespaceWatcher.Store().GetByKey(pod.Namespace); ok && err == nil { + if namespace, ok := rawNs.(*kubernetes.Namespace); ok { + for k, v := range namespace.GetAnnotations() { + safemapstr.Put(nsAnn, k, v) + } + } + } + } + + emitted := 0 // Emit container and port information for _, c := range containers { // If it doesn't have an ID, container doesn't exist in @@ -301,39 +321,27 @@ func (p *pod) emitEvents(pod *kubernetes.Pod, flag string, containers []kubernet // Information that can be used in discovering a workload kubemeta := meta.Clone() kubemeta["container"] = cmeta - - // Pass annotations to all events so that it can be used in templating and by annotation builders. - annotations := common.MapStr{} - for k, v := range pod.GetObjectMeta().GetAnnotations() { - safemapstr.Put(annotations, k, v) - } kubemeta["annotations"] = annotations - if p.namespaceWatcher != nil { - if rawNs, ok, err := p.namespaceWatcher.Store().GetByKey(pod.Namespace); ok && err == nil { - if namespace, ok := rawNs.(*kubernetes.Namespace); ok { - nsAnn := common.MapStr{} - - for k, v := range namespace.GetAnnotations() { - safemapstr.Put(nsAnn, k, v) - } - kubemeta["namespace_annotations"] = nsAnn - } - } + if len(nsAnn) != 0 { + kubemeta["namespace_annotations"] = nsAnn } // Without this check there would be overlapping configurations with and without ports. if len(c.Ports) == 0 { + // Set a zero port on the event to signify that the event is from a container event := bus.Event{ "provider": p.uuid, "id": eventID, flag: true, "host": host, + "port": 0, "kubernetes": kubemeta, "meta": common.MapStr{ "kubernetes": meta, }, } p.publish(event) + emitted++ } for _, port := range c.Ports { @@ -349,6 +357,36 @@ func (p *pod) emitEvents(pod *kubernetes.Pod, flag string, containers []kubernet }, } p.publish(event) + emitted++ } } + + // Finally publish a pod level event so that hints that have no exposed ports can get processed. + // Log hints would just ignore this event as there is no ${data.container.id} + // Publish the pod level hint only if atleast one container level hint was emitted. This ensures that there is + // no unnecessary pod level events emitted prematurely. + if emitted != 0 { + meta := p.metagen.Generate(pod) + + // Information that can be used in discovering a workload + kubemeta := meta.Clone() + kubemeta["annotations"] = annotations + if len(nsAnn) != 0 { + kubemeta["namespace_annotations"] = nsAnn + } + + // Don't set a port on the event + event := bus.Event{ + "provider": p.uuid, + "id": fmt.Sprint(pod.GetObjectMeta().GetUID()), + flag: true, + "host": host, + "kubernetes": kubemeta, + "meta": common.MapStr{ + "kubernetes": meta, + }, + } + p.publish(event) + + } } diff --git a/libbeat/autodiscover/providers/kubernetes/pod_test.go b/libbeat/autodiscover/providers/kubernetes/pod_test.go index ce537523167..7e5d51b23b0 100644 --- a/libbeat/autodiscover/providers/kubernetes/pod_test.go +++ b/libbeat/autodiscover/providers/kubernetes/pod_test.go @@ -392,6 +392,7 @@ func TestEmitEvent(t *testing.T) { Expected: bus.Event{ "start": true, "host": "127.0.0.1", + "port": 0, "id": cid, "provider": UUID, "kubernetes": common.MapStr{ @@ -525,6 +526,7 @@ func TestEmitEvent(t *testing.T) { "stop": true, "host": "", "id": cid, + "port": 0, "provider": UUID, "kubernetes": common.MapStr{ "container": common.MapStr{ @@ -593,6 +595,7 @@ func TestEmitEvent(t *testing.T) { Expected: bus.Event{ "stop": true, "host": "127.0.0.1", + "port": 0, "id": cid, "provider": UUID, "kubernetes": common.MapStr{ diff --git a/metricbeat/autodiscover/builder/hints/metrics.go b/metricbeat/autodiscover/builder/hints/metrics.go index c90b4e55419..84ce8d65a86 100644 --- a/metricbeat/autodiscover/builder/hints/metrics.go +++ b/metricbeat/autodiscover/builder/hints/metrics.go @@ -56,6 +56,8 @@ const ( type metricHints struct { Key string Registry *mb.Register + + logger *logp.Logger } // NewMetricHints builds a new metrics builder based on hints @@ -67,18 +69,24 @@ func NewMetricHints(cfg *common.Config) (autodiscover.Builder, error) { return nil, fmt.Errorf("unable to unpack hints config due to error: %v", err) } - return &metricHints{config.Key, config.Registry}, nil + return &metricHints{config.Key, config.Registry, logp.NewLogger("hints.builder")}, nil } // Create configs based on hints passed from providers func (m *metricHints) CreateConfig(event bus.Event, options ...ucfg.Option) []*common.Config { - var config []*common.Config + var ( + config []*common.Config + noPort bool + ) host, _ := event["host"].(string) if host == "" { return config } - port, _ := common.TryToInt(event["port"]) + port, ok := common.TryToInt(event["port"]) + if !ok { + noPort = true + } hints, ok := event["hints"].(common.MapStr) if !ok { @@ -105,7 +113,7 @@ func (m *metricHints) CreateConfig(event bus.Event, options ...ucfg.Option) []*c return config } - hosts, ok := m.getHostsWithPort(hints, port) + hosts, ok := m.getHostsWithPort(hints, port, noPort) if !ok { return config } @@ -144,14 +152,14 @@ func (m *metricHints) CreateConfig(event bus.Event, options ...ucfg.Option) []*c moduleConfig["password"] = password } - logp.Debug("hints.builder", "generated config: %v", moduleConfig) + m.logger.Debug("generated config: %v", moduleConfig) // Create config object cfg, err := common.NewConfigFrom(moduleConfig) if err != nil { - logp.Debug("hints.builder", "config merge failed with error: %v", err) + logp.Debug("", "config merge failed with error: %v", err) } - logp.Debug("hints.builder", "generated config: %+v", common.DebugString(cfg, true)) + m.logger.Debug("generated config: %+v", common.DebugString(cfg, true)) config = append(config, cfg) // Apply information in event to the template to generate the final config @@ -181,22 +189,22 @@ func (m *metricHints) getMetricSets(hints common.MapStr, module string) []string return msets } -func (m *metricHints) getHostsWithPort(hints common.MapStr, port int) ([]string, bool) { +func (m *metricHints) getHostsWithPort(hints common.MapStr, port int, noPort bool) ([]string, bool) { var result []string thosts := builder.GetHintAsList(hints, m.Key, hosts) // Only pick hosts that have ${data.port} or the port on current event. This will make // sure that incorrect meta mapping doesn't happen for _, h := range thosts { - if strings.Contains(h, "data.port") || m.checkHostPort(h, port) || + if strings.Contains(h, "data.port") && port != 0 && !noPort || m.checkHostPort(h, port) || // Use the event that has no port config if there is a ${data.host}:9090 like input - (port == 0 && strings.Contains(h, "data.host")) { + (noPort && strings.Contains(h, "data.host")) { result = append(result, h) } } if len(thosts) > 0 && len(result) == 0 { - logp.Debug("hints.builder", "no hosts selected for port %d with hints: %+v", port, thosts) + m.logger.Debug("no hosts selected for port %d with hints: %+v", port, thosts) return nil, false } diff --git a/metricbeat/autodiscover/builder/hints/metrics_test.go b/metricbeat/autodiscover/builder/hints/metrics_test.go index f7159323640..3b306ac62c0 100644 --- a/metricbeat/autodiscover/builder/hints/metrics_test.go +++ b/metricbeat/autodiscover/builder/hints/metrics_test.go @@ -29,6 +29,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/bus" "github.com/elastic/beats/v7/libbeat/keystore" + "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/metricbeat/mb" ) @@ -266,6 +267,21 @@ func TestGenerateHints(t *testing.T) { }, }, }, + { + message: "Module with data.host defined and a zero port should not return a config", + event: bus.Event{ + "host": "1.2.3.4", + "port": 0, + "hints": common.MapStr{ + "metrics": common.MapStr{ + "module": "mockmoduledefaults", + "namespace": "test", + "hosts": "${data.host}:9090", + }, + }, + }, + len: 0, + }, { message: "Module, namespace, host hint should return valid config", event: bus.Event{ @@ -340,6 +356,7 @@ func TestGenerateHints(t *testing.T) { m := metricHints{ Key: defaultConfig().Key, Registry: mockRegister, + logger: logp.NewLogger("hints.builder"), } cfgs := m.CreateConfig(test.event) assert.Equal(t, len(cfgs), test.len) @@ -413,6 +430,7 @@ func TestGenerateHintsDoesNotAccessGlobalKeystore(t *testing.T) { m := metricHints{ Key: defaultConfig().Key, Registry: mockRegister, + logger: logp.NewLogger("hints.builder"), } cfgs := m.CreateConfig(test.event) assert.Equal(t, len(cfgs), test.len)