Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick #18979 to 7.x: Fix metrics hints builder to avoid wrong container metadata usage when port is not exposed #19233

Merged
merged 1 commit into from
Jun 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ field. You can revert this change by configuring tags for the module and omittin
- The `monitoring.elasticsearch.api_key` value is correctly base64-encoded before being sent to the monitoring Elasticsearch cluster. {issue}18939[18939] {pull}18945[18945]
- Fix kafka topic setting not allowing upper case characters. {pull}18854[18854] {issue}18640[18640]
- Fix redis key setting not allowing upper case characters. {pull}18854[18854] {issue}18640[18640]
- Fix metrics hints builder to avoid wrong container metadata usage when port is not exposed {pull}18979[18979]

*Auditbeat*

Expand Down
72 changes: 55 additions & 17 deletions libbeat/autodiscover/providers/kubernetes/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,26 @@ func (p *pod) emitEvents(pod *kubernetes.Pod, flag string, containers []kubernet
}
}

// Pass annotations to all events so that it can be used in templating and by annotation builders.
var (
annotations = common.MapStr{}
nsAnn = common.MapStr{}
)
for k, v := range pod.GetObjectMeta().GetAnnotations() {
safemapstr.Put(annotations, k, v)
}

if p.namespaceWatcher != nil {
if rawNs, ok, err := p.namespaceWatcher.Store().GetByKey(pod.Namespace); ok && err == nil {
if namespace, ok := rawNs.(*kubernetes.Namespace); ok {
for k, v := range namespace.GetAnnotations() {
safemapstr.Put(nsAnn, k, v)
}
}
}
}

emitted := 0
// Emit container and port information
for _, c := range containers {
// If it doesn't have an ID, container doesn't exist in
Expand All @@ -301,39 +321,27 @@ func (p *pod) emitEvents(pod *kubernetes.Pod, flag string, containers []kubernet
// Information that can be used in discovering a workload
kubemeta := meta.Clone()
kubemeta["container"] = cmeta

// Pass annotations to all events so that it can be used in templating and by annotation builders.
annotations := common.MapStr{}
for k, v := range pod.GetObjectMeta().GetAnnotations() {
safemapstr.Put(annotations, k, v)
}
kubemeta["annotations"] = annotations
if p.namespaceWatcher != nil {
if rawNs, ok, err := p.namespaceWatcher.Store().GetByKey(pod.Namespace); ok && err == nil {
if namespace, ok := rawNs.(*kubernetes.Namespace); ok {
nsAnn := common.MapStr{}

for k, v := range namespace.GetAnnotations() {
safemapstr.Put(nsAnn, k, v)
}
kubemeta["namespace_annotations"] = nsAnn
}
}
if len(nsAnn) != 0 {
kubemeta["namespace_annotations"] = nsAnn
}

// Without this check there would be overlapping configurations with and without ports.
if len(c.Ports) == 0 {
// Set a zero port on the event to signify that the event is from a container
event := bus.Event{
"provider": p.uuid,
"id": eventID,
flag: true,
"host": host,
"port": 0,
"kubernetes": kubemeta,
"meta": common.MapStr{
"kubernetes": meta,
},
}
p.publish(event)
emitted++
}

for _, port := range c.Ports {
Expand All @@ -349,6 +357,36 @@ func (p *pod) emitEvents(pod *kubernetes.Pod, flag string, containers []kubernet
},
}
p.publish(event)
emitted++
}
}

// Finally publish a pod level event so that hints that have no exposed ports can get processed.
// Log hints would just ignore this event as there is no ${data.container.id}
// Publish the pod level hint only if atleast one container level hint was emitted. This ensures that there is
// no unnecessary pod level events emitted prematurely.
if emitted != 0 {
meta := p.metagen.Generate(pod)

// Information that can be used in discovering a workload
kubemeta := meta.Clone()
kubemeta["annotations"] = annotations
if len(nsAnn) != 0 {
kubemeta["namespace_annotations"] = nsAnn
}

// Don't set a port on the event
event := bus.Event{
"provider": p.uuid,
"id": fmt.Sprint(pod.GetObjectMeta().GetUID()),
flag: true,
"host": host,
"kubernetes": kubemeta,
"meta": common.MapStr{
"kubernetes": meta,
},
}
p.publish(event)

}
}
3 changes: 3 additions & 0 deletions libbeat/autodiscover/providers/kubernetes/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ func TestEmitEvent(t *testing.T) {
Expected: bus.Event{
"start": true,
"host": "127.0.0.1",
"port": 0,
"id": cid,
"provider": UUID,
"kubernetes": common.MapStr{
Expand Down Expand Up @@ -525,6 +526,7 @@ func TestEmitEvent(t *testing.T) {
"stop": true,
"host": "",
"id": cid,
"port": 0,
"provider": UUID,
"kubernetes": common.MapStr{
"container": common.MapStr{
Expand Down Expand Up @@ -593,6 +595,7 @@ func TestEmitEvent(t *testing.T) {
Expected: bus.Event{
"stop": true,
"host": "127.0.0.1",
"port": 0,
"id": cid,
"provider": UUID,
"kubernetes": common.MapStr{
Expand Down
30 changes: 19 additions & 11 deletions metricbeat/autodiscover/builder/hints/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ const (
type metricHints struct {
Key string
Registry *mb.Register

logger *logp.Logger
}

// NewMetricHints builds a new metrics builder based on hints
Expand All @@ -67,18 +69,24 @@ func NewMetricHints(cfg *common.Config) (autodiscover.Builder, error) {
return nil, fmt.Errorf("unable to unpack hints config due to error: %v", err)
}

return &metricHints{config.Key, config.Registry}, nil
return &metricHints{config.Key, config.Registry, logp.NewLogger("hints.builder")}, nil
}

// Create configs based on hints passed from providers
func (m *metricHints) CreateConfig(event bus.Event, options ...ucfg.Option) []*common.Config {
var config []*common.Config
var (
config []*common.Config
noPort bool
)
host, _ := event["host"].(string)
if host == "" {
return config
}

port, _ := common.TryToInt(event["port"])
port, ok := common.TryToInt(event["port"])
if !ok {
noPort = true
}

hints, ok := event["hints"].(common.MapStr)
if !ok {
Expand All @@ -105,7 +113,7 @@ func (m *metricHints) CreateConfig(event bus.Event, options ...ucfg.Option) []*c
return config
}

hosts, ok := m.getHostsWithPort(hints, port)
hosts, ok := m.getHostsWithPort(hints, port, noPort)
if !ok {
return config
}
Expand Down Expand Up @@ -144,14 +152,14 @@ func (m *metricHints) CreateConfig(event bus.Event, options ...ucfg.Option) []*c
moduleConfig["password"] = password
}

logp.Debug("hints.builder", "generated config: %v", moduleConfig)
m.logger.Debug("generated config: %v", moduleConfig)

// Create config object
cfg, err := common.NewConfigFrom(moduleConfig)
if err != nil {
logp.Debug("hints.builder", "config merge failed with error: %v", err)
logp.Debug("", "config merge failed with error: %v", err)
}
logp.Debug("hints.builder", "generated config: %+v", common.DebugString(cfg, true))
m.logger.Debug("generated config: %+v", common.DebugString(cfg, true))
config = append(config, cfg)

// Apply information in event to the template to generate the final config
Expand Down Expand Up @@ -181,22 +189,22 @@ func (m *metricHints) getMetricSets(hints common.MapStr, module string) []string
return msets
}

func (m *metricHints) getHostsWithPort(hints common.MapStr, port int) ([]string, bool) {
func (m *metricHints) getHostsWithPort(hints common.MapStr, port int, noPort bool) ([]string, bool) {
var result []string
thosts := builder.GetHintAsList(hints, m.Key, hosts)

// Only pick hosts that have ${data.port} or the port on current event. This will make
// sure that incorrect meta mapping doesn't happen
for _, h := range thosts {
if strings.Contains(h, "data.port") || m.checkHostPort(h, port) ||
if strings.Contains(h, "data.port") && port != 0 && !noPort || m.checkHostPort(h, port) ||
// Use the event that has no port config if there is a ${data.host}:9090 like input
(port == 0 && strings.Contains(h, "data.host")) {
(noPort && strings.Contains(h, "data.host")) {
result = append(result, h)
}
}

if len(thosts) > 0 && len(result) == 0 {
logp.Debug("hints.builder", "no hosts selected for port %d with hints: %+v", port, thosts)
m.logger.Debug("no hosts selected for port %d with hints: %+v", port, thosts)
return nil, false
}

Expand Down
18 changes: 18 additions & 0 deletions metricbeat/autodiscover/builder/hints/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/bus"
"github.com/elastic/beats/v7/libbeat/keystore"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/metricbeat/mb"
)

Expand Down Expand Up @@ -266,6 +267,21 @@ func TestGenerateHints(t *testing.T) {
},
},
},
{
message: "Module with data.host defined and a zero port should not return a config",
event: bus.Event{
"host": "1.2.3.4",
"port": 0,
"hints": common.MapStr{
"metrics": common.MapStr{
"module": "mockmoduledefaults",
"namespace": "test",
"hosts": "${data.host}:9090",
},
},
},
len: 0,
},
{
message: "Module, namespace, host hint should return valid config",
event: bus.Event{
Expand Down Expand Up @@ -340,6 +356,7 @@ func TestGenerateHints(t *testing.T) {
m := metricHints{
Key: defaultConfig().Key,
Registry: mockRegister,
logger: logp.NewLogger("hints.builder"),
}
cfgs := m.CreateConfig(test.event)
assert.Equal(t, len(cfgs), test.len)
Expand Down Expand Up @@ -413,6 +430,7 @@ func TestGenerateHintsDoesNotAccessGlobalKeystore(t *testing.T) {
m := metricHints{
Key: defaultConfig().Key,
Registry: mockRegister,
logger: logp.NewLogger("hints.builder"),
}
cfgs := m.CreateConfig(test.event)
assert.Equal(t, len(cfgs), test.len)
Expand Down