Skip to content

Commit

Permalink
Fix metrics hints builder to avoid wrong container metadata usage whe…
Browse files Browse the repository at this point in the history
…n port is not exposed (#18979)
  • Loading branch information
vjsamuel authored Jun 16, 2020
1 parent 81b0c3a commit 2f7b501
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 28 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- The `monitoring.elasticsearch.api_key` value is correctly base64-encoded before being sent to the monitoring Elasticsearch cluster. {issue}18939[18939] {pull}18945[18945]
- Fix kafka topic setting not allowing upper case characters. {pull}18854[18854] {issue}18640[18640]
- Fix redis key setting not allowing upper case characters. {pull}18854[18854] {issue}18640[18640]
- Fix metrics hints builder to avoid wrong container metadata usage when port is not exposed {pull}18979[18979]

*Auditbeat*

Expand Down
72 changes: 55 additions & 17 deletions libbeat/autodiscover/providers/kubernetes/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,26 @@ func (p *pod) emitEvents(pod *kubernetes.Pod, flag string, containers []kubernet
}
}

// Pass annotations to all events so that it can be used in templating and by annotation builders.
var (
annotations = common.MapStr{}
nsAnn = common.MapStr{}
)
for k, v := range pod.GetObjectMeta().GetAnnotations() {
safemapstr.Put(annotations, k, v)
}

if p.namespaceWatcher != nil {
if rawNs, ok, err := p.namespaceWatcher.Store().GetByKey(pod.Namespace); ok && err == nil {
if namespace, ok := rawNs.(*kubernetes.Namespace); ok {
for k, v := range namespace.GetAnnotations() {
safemapstr.Put(nsAnn, k, v)
}
}
}
}

emitted := 0
// Emit container and port information
for _, c := range containers {
// If it doesn't have an ID, container doesn't exist in
Expand All @@ -301,39 +321,27 @@ func (p *pod) emitEvents(pod *kubernetes.Pod, flag string, containers []kubernet
// Information that can be used in discovering a workload
kubemeta := meta.Clone()
kubemeta["container"] = cmeta

// Pass annotations to all events so that it can be used in templating and by annotation builders.
annotations := common.MapStr{}
for k, v := range pod.GetObjectMeta().GetAnnotations() {
safemapstr.Put(annotations, k, v)
}
kubemeta["annotations"] = annotations
if p.namespaceWatcher != nil {
if rawNs, ok, err := p.namespaceWatcher.Store().GetByKey(pod.Namespace); ok && err == nil {
if namespace, ok := rawNs.(*kubernetes.Namespace); ok {
nsAnn := common.MapStr{}

for k, v := range namespace.GetAnnotations() {
safemapstr.Put(nsAnn, k, v)
}
kubemeta["namespace_annotations"] = nsAnn
}
}
if len(nsAnn) != 0 {
kubemeta["namespace_annotations"] = nsAnn
}

// Without this check there would be overlapping configurations with and without ports.
if len(c.Ports) == 0 {
// Set a zero port on the event to signify that the event is from a container
event := bus.Event{
"provider": p.uuid,
"id": eventID,
flag: true,
"host": host,
"port": 0,
"kubernetes": kubemeta,
"meta": common.MapStr{
"kubernetes": meta,
},
}
p.publish(event)
emitted++
}

for _, port := range c.Ports {
Expand All @@ -349,6 +357,36 @@ func (p *pod) emitEvents(pod *kubernetes.Pod, flag string, containers []kubernet
},
}
p.publish(event)
emitted++
}
}

// Finally publish a pod level event so that hints that have no exposed ports can get processed.
// Log hints would just ignore this event as there is no ${data.container.id}
// Publish the pod level hint only if atleast one container level hint was emitted. This ensures that there is
// no unnecessary pod level events emitted prematurely.
if emitted != 0 {
meta := p.metagen.Generate(pod)

// Information that can be used in discovering a workload
kubemeta := meta.Clone()
kubemeta["annotations"] = annotations
if len(nsAnn) != 0 {
kubemeta["namespace_annotations"] = nsAnn
}

// Don't set a port on the event
event := bus.Event{
"provider": p.uuid,
"id": fmt.Sprint(pod.GetObjectMeta().GetUID()),
flag: true,
"host": host,
"kubernetes": kubemeta,
"meta": common.MapStr{
"kubernetes": meta,
},
}
p.publish(event)

}
}
3 changes: 3 additions & 0 deletions libbeat/autodiscover/providers/kubernetes/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ func TestEmitEvent(t *testing.T) {
Expected: bus.Event{
"start": true,
"host": "127.0.0.1",
"port": 0,
"id": cid,
"provider": UUID,
"kubernetes": common.MapStr{
Expand Down Expand Up @@ -525,6 +526,7 @@ func TestEmitEvent(t *testing.T) {
"stop": true,
"host": "",
"id": cid,
"port": 0,
"provider": UUID,
"kubernetes": common.MapStr{
"container": common.MapStr{
Expand Down Expand Up @@ -593,6 +595,7 @@ func TestEmitEvent(t *testing.T) {
Expected: bus.Event{
"stop": true,
"host": "127.0.0.1",
"port": 0,
"id": cid,
"provider": UUID,
"kubernetes": common.MapStr{
Expand Down
30 changes: 19 additions & 11 deletions metricbeat/autodiscover/builder/hints/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ const (
type metricHints struct {
Key string
Registry *mb.Register

logger *logp.Logger
}

// NewMetricHints builds a new metrics builder based on hints
Expand All @@ -67,18 +69,24 @@ func NewMetricHints(cfg *common.Config) (autodiscover.Builder, error) {
return nil, fmt.Errorf("unable to unpack hints config due to error: %v", err)
}

return &metricHints{config.Key, config.Registry}, nil
return &metricHints{config.Key, config.Registry, logp.NewLogger("hints.builder")}, nil
}

// Create configs based on hints passed from providers
func (m *metricHints) CreateConfig(event bus.Event, options ...ucfg.Option) []*common.Config {
var config []*common.Config
var (
config []*common.Config
noPort bool
)
host, _ := event["host"].(string)
if host == "" {
return config
}

port, _ := common.TryToInt(event["port"])
port, ok := common.TryToInt(event["port"])
if !ok {
noPort = true
}

hints, ok := event["hints"].(common.MapStr)
if !ok {
Expand All @@ -105,7 +113,7 @@ func (m *metricHints) CreateConfig(event bus.Event, options ...ucfg.Option) []*c
return config
}

hosts, ok := m.getHostsWithPort(hints, port)
hosts, ok := m.getHostsWithPort(hints, port, noPort)
if !ok {
return config
}
Expand Down Expand Up @@ -144,14 +152,14 @@ func (m *metricHints) CreateConfig(event bus.Event, options ...ucfg.Option) []*c
moduleConfig["password"] = password
}

logp.Debug("hints.builder", "generated config: %v", moduleConfig)
m.logger.Debug("generated config: %v", moduleConfig)

// Create config object
cfg, err := common.NewConfigFrom(moduleConfig)
if err != nil {
logp.Debug("hints.builder", "config merge failed with error: %v", err)
logp.Debug("", "config merge failed with error: %v", err)
}
logp.Debug("hints.builder", "generated config: %+v", common.DebugString(cfg, true))
m.logger.Debug("generated config: %+v", common.DebugString(cfg, true))
config = append(config, cfg)

// Apply information in event to the template to generate the final config
Expand Down Expand Up @@ -181,22 +189,22 @@ func (m *metricHints) getMetricSets(hints common.MapStr, module string) []string
return msets
}

func (m *metricHints) getHostsWithPort(hints common.MapStr, port int) ([]string, bool) {
func (m *metricHints) getHostsWithPort(hints common.MapStr, port int, noPort bool) ([]string, bool) {
var result []string
thosts := builder.GetHintAsList(hints, m.Key, hosts)

// Only pick hosts that have ${data.port} or the port on current event. This will make
// sure that incorrect meta mapping doesn't happen
for _, h := range thosts {
if strings.Contains(h, "data.port") || m.checkHostPort(h, port) ||
if strings.Contains(h, "data.port") && port != 0 && !noPort || m.checkHostPort(h, port) ||
// Use the event that has no port config if there is a ${data.host}:9090 like input
(port == 0 && strings.Contains(h, "data.host")) {
(noPort && strings.Contains(h, "data.host")) {
result = append(result, h)
}
}

if len(thosts) > 0 && len(result) == 0 {
logp.Debug("hints.builder", "no hosts selected for port %d with hints: %+v", port, thosts)
m.logger.Debug("no hosts selected for port %d with hints: %+v", port, thosts)
return nil, false
}

Expand Down
18 changes: 18 additions & 0 deletions metricbeat/autodiscover/builder/hints/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/bus"
"github.com/elastic/beats/v7/libbeat/keystore"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/metricbeat/mb"
)

Expand Down Expand Up @@ -266,6 +267,21 @@ func TestGenerateHints(t *testing.T) {
},
},
},
{
message: "Module with data.host defined and a zero port should not return a config",
event: bus.Event{
"host": "1.2.3.4",
"port": 0,
"hints": common.MapStr{
"metrics": common.MapStr{
"module": "mockmoduledefaults",
"namespace": "test",
"hosts": "${data.host}:9090",
},
},
},
len: 0,
},
{
message: "Module, namespace, host hint should return valid config",
event: bus.Event{
Expand Down Expand Up @@ -340,6 +356,7 @@ func TestGenerateHints(t *testing.T) {
m := metricHints{
Key: defaultConfig().Key,
Registry: mockRegister,
logger: logp.NewLogger("hints.builder"),
}
cfgs := m.CreateConfig(test.event)
assert.Equal(t, len(cfgs), test.len)
Expand Down Expand Up @@ -413,6 +430,7 @@ func TestGenerateHintsDoesNotAccessGlobalKeystore(t *testing.T) {
m := metricHints{
Key: defaultConfig().Key,
Registry: mockRegister,
logger: logp.NewLogger("hints.builder"),
}
cfgs := m.CreateConfig(test.event)
assert.Equal(t, len(cfgs), test.len)
Expand Down

0 comments on commit 2f7b501

Please sign in to comment.