Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add autodiscover for kubernetes #6055

Merged
merged 4 commits into from
Jan 12, 2018
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Add autodiscover for kubernetes
vjsamuel committed Jan 12, 2018
commit 9123dcc923312aeaf6be0832a2e741d1255aab9b
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
@@ -129,6 +129,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di
`logging.metrics` feature. {pull}5915[5915]
- Add the ability to log to the Windows Event Log. {pull}5913[5813]
- Improve Elasticsearch output metrics to count number of dropped and duplicate (if event ID is given) events. {pull}5811[5811]
- Add autodiscover for kubernetes. {pull}6055[6055]

*Auditbeat*

30 changes: 30 additions & 0 deletions libbeat/autodiscover/providers/kubernetes/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package kubernetes

import (
"time"

"github.com/elastic/beats/libbeat/autodiscover/template"
)

// Config for kubernetes autodiscover provider
type Config struct {
InCluster bool `config:"in_cluster"`
KubeConfig string `config:"kube_config"`
Host string `config:"host"`
Namespace string `config:"namespace"`
SyncPeriod time.Duration `config:"sync_period"`
CleanupTimeout time.Duration `config:"cleanup_timeout"`

IncludeLabels []string `config:"include_labels"`
ExcludeLabels []string `config:"exclude_labels"`
IncludeAnnotations []string `config:"include_annotations"`

Templates template.MapperSettings `config:"templates"`
}

func defaultConfig() *Config {
return &Config{
InCluster: true,
SyncPeriod: 1 * time.Second,
}
}
175 changes: 175 additions & 0 deletions libbeat/autodiscover/providers/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
package kubernetes

import (
"github.com/elastic/beats/libbeat/autodiscover"
"github.com/elastic/beats/libbeat/autodiscover/template"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/bus"
"github.com/elastic/beats/libbeat/common/kubernetes"
"github.com/elastic/beats/libbeat/logp"
)

func init() {
autodiscover.ProviderRegistry.AddProvider("kubernetes", AutodiscoverBuilder)
}

// Provider implements autodiscover provider for docker containers
type Provider struct {
config *Config
bus bus.Bus
watcher kubernetes.Watcher
metagen kubernetes.MetaGenerator
templates *template.Mapper
stop chan interface{}
startListener bus.Listener
stopListener bus.Listener
updateListener bus.Listener
}

// AutodiscoverBuilder builds and returns an autodiscover provider
func AutodiscoverBuilder(bus bus.Bus, c *common.Config) (autodiscover.Provider, error) {
config := defaultConfig()
err := c.Unpack(&config)
if err != nil {
return nil, err
}

mapper, err := template.NewConfigMapper(config.Templates)
if err != nil {
return nil, err
}

client, err := kubernetes.GetKubernetesClient(config.InCluster, config.KubeConfig)
if err != nil {
return nil, err
}

metagen := kubernetes.NewMetaGenerator(config.IncludeAnnotations, config.IncludeLabels, config.ExcludeLabels)

config.Host = kubernetes.DiscoverKubernetesNode(config.Host, client)
watcher := kubernetes.NewWatcher(client.CoreV1(), config.SyncPeriod, config.CleanupTimeout, config.Host)

start := watcher.ListenStart()
stop := watcher.ListenStop()
update := watcher.ListenUpdate()

if err := watcher.Start(); err != nil {
return nil, err
}

return &Provider{
config: config,
bus: bus,
templates: mapper,
metagen: metagen,
watcher: watcher,
stop: make(chan interface{}),
startListener: start,
stopListener: stop,
updateListener: update,
}, nil
}

func (p *Provider) Start() {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported method Provider.Start should have comment or be unexported

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vjsamuel lately we have been trying to make hound happy for most of its messages, could you add these comments?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

go func() {
for {
select {
case <-p.stop:
p.startListener.Stop()
p.stopListener.Stop()
return

case event := <-p.startListener.Events():
p.emit(event, "start")

case event := <-p.stopListener.Events():
p.emit(event, "stop")

case event := <-p.updateListener.Events():
//On updates, first send a stop signal followed by a start signal to simulate a restart
p.emit(event, "stop")
p.emit(event, "start")
}
}
}()
}

func (p *Provider) emit(event bus.Event, flag string) {
pod, ok := event["pod"].(*kubernetes.Pod)
if !ok {
logp.Err("Couldn't get a pod from watcher event")
return
}

host := pod.Status.PodIP

// Emit pod container IDs
for _, c := range append(pod.Status.ContainerStatuses, pod.Status.InitContainerStatuses...) {
cmeta := common.MapStr{
"id": c.GetContainerID(),
"name": c.Name,
"image": c.Image,
}

// Metadata appended to each event
meta := p.metagen.ContainerMetadata(pod, c.Name)

// Information that can be used in discovering a workload
kubemeta := meta.Clone()
kubemeta["container"] = cmeta

// Emit container info
p.publish(bus.Event{
flag: true,
"host": host,
"kubernetes": kubemeta,
"meta": common.MapStr{
"kubernetes": meta,
},
})
}

// Emit pod ports
for _, c := range pod.Spec.Containers {
cmeta := common.MapStr{
"name": c.Name,
"image": c.Image,
}

// Metadata appended to each event
meta := p.metagen.ContainerMetadata(pod, c.Name)

// Information that can be used in discovering a workload
kubemeta := meta.Clone()
kubemeta["container"] = cmeta

for _, port := range c.Ports {
event := bus.Event{
flag: true,
"host": host,
"port": port.ContainerPort,
"kubernetes": kubemeta,
"meta": common.MapStr{
"kubernetes": meta,
},
}
p.publish(event)
}
}
}

func (p *Provider) publish(event bus.Event) {
// Try to match a config
if config := p.templates.GetConfig(event); config != nil {
event["config"] = config
}
p.bus.Publish(event)
}

func (p *Provider) Stop() {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported method Provider.Stop should have comment or be unexported

close(p.stop)
}

func (p *Provider) String() string {
return "kubernetes"
}
1 change: 1 addition & 0 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
@@ -50,6 +50,7 @@ import (

// Register autodiscover providers
_ "github.com/elastic/beats/libbeat/autodiscover/providers/docker"
_ "github.com/elastic/beats/libbeat/autodiscover/providers/kubernetes"

// Register default monitoring reporting
_ "github.com/elastic/beats/libbeat/monitoring/report/elasticsearch"
3 changes: 3 additions & 0 deletions libbeat/common/kubernetes/metadata.go
Original file line number Diff line number Diff line change
@@ -47,6 +47,9 @@ func (g *metaGenerator) PodMetadata(pod *Pod) common.MapStr {
"pod": common.MapStr{
"name": pod.Metadata.Name,
},
"node": common.MapStr{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this need an update to our fields.yml or is node already in there?

"name": pod.Spec.NodeName,
},
"namespace": pod.Metadata.Namespace,
}

12 changes: 12 additions & 0 deletions libbeat/common/kubernetes/types.go
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@ package kubernetes

import (
"encoding/json"
"strings"

"github.com/elastic/beats/libbeat/logp"

@@ -107,6 +108,17 @@ type Pod struct {
Status PodStatus `json:"status"`
}

func (s *PodContainerStatus) GetContainerID() string {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported method PodContainerStatus.GetContainerID should have comment or be unexported

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported method PodContainerStatus.GetContainerID should have comment or be unexported

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported method PodContainerStatus.GetContainerID should have comment or be unexported

cID := s.ContainerID
if cID != "" {
parts := strings.Split(cID, "//")
if len(parts) == 2 {
return parts[1]
}
}
return ""
}

// GetPod converts Pod to our own type
func GetPod(pod *corev1.Pod) *Pod {
bytes, err := json.Marshal(pod)
62 changes: 62 additions & 0 deletions libbeat/common/kubernetes/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package kubernetes

import (
"context"
"fmt"
"io/ioutil"
"os"

"github.com/ericchiang/k8s"
"github.com/ghodss/yaml"

"github.com/elastic/beats/libbeat/logp"
)

func GetKubernetesClient(in_cluster bool, kube_config string) (client *k8s.Client, err error) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported function GetKubernetesClient should have comment or be unexported
don't use underscores in Go names; func parameter in_cluster should be inCluster
don't use underscores in Go names; func parameter kube_config should be kubeConfig

if in_cluster == true {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We normally use inClusterfor naming, same for kubeConfig

client, err = k8s.NewInClusterClient()
if err != nil {
return nil, fmt.Errorf("Unable to get in cluster configuration: %v", err)
}
} else {
data, err := ioutil.ReadFile(kube_config)
if err != nil {
return nil, fmt.Errorf("read kubeconfig: %v", err)
}

// Unmarshal YAML into a Kubernetes config object.
var config k8s.Config
if err = yaml.Unmarshal(data, &config); err != nil {
return nil, fmt.Errorf("unmarshal kubeconfig: %v", err)
}
client, err = k8s.NewClient(&config)
if err != nil {
return nil, err
}
}

return client, nil
}

func DiscoverKubernetesNode(host string, client *k8s.Client) string {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported function DiscoverKubernetesNode should have comment or be unexported

ctx := context.Background()
if host == "" {
podName := os.Getenv("HOSTNAME")
logp.Info("Using pod name %s and namespace %s", podName, client.Namespace)
if podName == "localhost" {
host = "localhost"
} else {
pod, error := client.CoreV1().GetPod(ctx, podName, client.Namespace)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: We often use err instead of error

if error != nil {
logp.Err("Querying for pod failed with error: ", error.Error())
logp.Info("Unable to find pod, setting host to localhost")
host = "localhost"
} else {
host = pod.Spec.GetNodeName()
}

}
}

return host
}
16 changes: 2 additions & 14 deletions libbeat/processors/add_kubernetes_metadata/indexers.go
Original file line number Diff line number Diff line change
@@ -2,7 +2,6 @@ package add_kubernetes_metadata

import (
"fmt"
"strings"
"sync"

"github.com/elastic/beats/libbeat/common"
@@ -143,7 +142,7 @@ func NewContainerIndexer(_ common.Config, metaGen kubernetes.MetaGenerator) (Ind
func (c *ContainerIndexer) GetMetadata(pod *kubernetes.Pod) []MetadataIndex {
var metadata []MetadataIndex
for _, status := range append(pod.Status.ContainerStatuses, pod.Status.InitContainerStatuses...) {
cID := containerID(status)
cID := status.ContainerID
if cID == "" {
continue
}
@@ -160,7 +159,7 @@ func (c *ContainerIndexer) GetMetadata(pod *kubernetes.Pod) []MetadataIndex {
func (c *ContainerIndexer) GetIndexes(pod *kubernetes.Pod) []string {
var containers []string
for _, status := range append(pod.Status.ContainerStatuses, pod.Status.InitContainerStatuses...) {
cID := containerID(status)
cID := status.GetContainerID()
if cID == "" {
continue
}
@@ -169,17 +168,6 @@ func (c *ContainerIndexer) GetIndexes(pod *kubernetes.Pod) []string {
return containers
}

func containerID(status kubernetes.PodContainerStatus) string {
cID := status.ContainerID
if cID != "" {
parts := strings.Split(cID, "//")
if len(parts) == 2 {
return parts[1]
}
}
return ""
}

// IPPortIndexer indexes pods based on all their host:port combinations
type IPPortIndexer struct {
metaGen kubernetes.MetaGenerator
Loading