Skip to content

Commit

Permalink
Add autodiscover for kubernetes
Browse files Browse the repository at this point in the history
  • Loading branch information
vjsamuel committed Jan 12, 2018
1 parent a3970ee commit 9123dcc
Show file tree
Hide file tree
Showing 9 changed files with 290 additions and 59 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di
`logging.metrics` feature. {pull}5915[5915]
- Add the ability to log to the Windows Event Log. {pull}5913[5813]
- Improve Elasticsearch output metrics to count number of dropped and duplicate (if event ID is given) events. {pull}5811[5811]
- Add autodiscover for kubernetes. {pull}6055[6055]

*Auditbeat*

Expand Down
30 changes: 30 additions & 0 deletions libbeat/autodiscover/providers/kubernetes/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package kubernetes

import (
"time"

"github.com/elastic/beats/libbeat/autodiscover/template"
)

// Config for kubernetes autodiscover provider
type Config struct {
InCluster bool `config:"in_cluster"`
KubeConfig string `config:"kube_config"`
Host string `config:"host"`
Namespace string `config:"namespace"`
SyncPeriod time.Duration `config:"sync_period"`
CleanupTimeout time.Duration `config:"cleanup_timeout"`

IncludeLabels []string `config:"include_labels"`
ExcludeLabels []string `config:"exclude_labels"`
IncludeAnnotations []string `config:"include_annotations"`

Templates template.MapperSettings `config:"templates"`
}

func defaultConfig() *Config {
return &Config{
InCluster: true,
SyncPeriod: 1 * time.Second,
}
}
175 changes: 175 additions & 0 deletions libbeat/autodiscover/providers/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
package kubernetes

import (
"github.com/elastic/beats/libbeat/autodiscover"
"github.com/elastic/beats/libbeat/autodiscover/template"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/bus"
"github.com/elastic/beats/libbeat/common/kubernetes"
"github.com/elastic/beats/libbeat/logp"
)

func init() {
autodiscover.ProviderRegistry.AddProvider("kubernetes", AutodiscoverBuilder)
}

// Provider implements autodiscover provider for docker containers
type Provider struct {
config *Config
bus bus.Bus
watcher kubernetes.Watcher
metagen kubernetes.MetaGenerator
templates *template.Mapper
stop chan interface{}
startListener bus.Listener
stopListener bus.Listener
updateListener bus.Listener
}

// AutodiscoverBuilder builds and returns an autodiscover provider
func AutodiscoverBuilder(bus bus.Bus, c *common.Config) (autodiscover.Provider, error) {
config := defaultConfig()
err := c.Unpack(&config)
if err != nil {
return nil, err
}

mapper, err := template.NewConfigMapper(config.Templates)
if err != nil {
return nil, err
}

client, err := kubernetes.GetKubernetesClient(config.InCluster, config.KubeConfig)
if err != nil {
return nil, err
}

metagen := kubernetes.NewMetaGenerator(config.IncludeAnnotations, config.IncludeLabels, config.ExcludeLabels)

config.Host = kubernetes.DiscoverKubernetesNode(config.Host, client)
watcher := kubernetes.NewWatcher(client.CoreV1(), config.SyncPeriod, config.CleanupTimeout, config.Host)

start := watcher.ListenStart()
stop := watcher.ListenStop()
update := watcher.ListenUpdate()

if err := watcher.Start(); err != nil {
return nil, err
}

return &Provider{
config: config,
bus: bus,
templates: mapper,
metagen: metagen,
watcher: watcher,
stop: make(chan interface{}),
startListener: start,
stopListener: stop,
updateListener: update,
}, nil
}

func (p *Provider) Start() {
go func() {
for {
select {
case <-p.stop:
p.startListener.Stop()
p.stopListener.Stop()
return

case event := <-p.startListener.Events():
p.emit(event, "start")

case event := <-p.stopListener.Events():
p.emit(event, "stop")

case event := <-p.updateListener.Events():
//On updates, first send a stop signal followed by a start signal to simulate a restart
p.emit(event, "stop")
p.emit(event, "start")
}
}
}()
}

func (p *Provider) emit(event bus.Event, flag string) {
pod, ok := event["pod"].(*kubernetes.Pod)
if !ok {
logp.Err("Couldn't get a pod from watcher event")
return
}

host := pod.Status.PodIP

// Emit pod container IDs
for _, c := range append(pod.Status.ContainerStatuses, pod.Status.InitContainerStatuses...) {
cmeta := common.MapStr{
"id": c.GetContainerID(),
"name": c.Name,
"image": c.Image,
}

// Metadata appended to each event
meta := p.metagen.ContainerMetadata(pod, c.Name)

// Information that can be used in discovering a workload
kubemeta := meta.Clone()
kubemeta["container"] = cmeta

// Emit container info
p.publish(bus.Event{
flag: true,
"host": host,
"kubernetes": kubemeta,
"meta": common.MapStr{
"kubernetes": meta,
},
})
}

// Emit pod ports
for _, c := range pod.Spec.Containers {
cmeta := common.MapStr{
"name": c.Name,
"image": c.Image,
}

// Metadata appended to each event
meta := p.metagen.ContainerMetadata(pod, c.Name)

// Information that can be used in discovering a workload
kubemeta := meta.Clone()
kubemeta["container"] = cmeta

for _, port := range c.Ports {
event := bus.Event{
flag: true,
"host": host,
"port": port.ContainerPort,
"kubernetes": kubemeta,
"meta": common.MapStr{
"kubernetes": meta,
},
}
p.publish(event)
}
}
}

func (p *Provider) publish(event bus.Event) {
// Try to match a config
if config := p.templates.GetConfig(event); config != nil {
event["config"] = config
}
p.bus.Publish(event)
}

func (p *Provider) Stop() {
close(p.stop)
}

func (p *Provider) String() string {
return "kubernetes"
}
1 change: 1 addition & 0 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (

// Register autodiscover providers
_ "github.com/elastic/beats/libbeat/autodiscover/providers/docker"
_ "github.com/elastic/beats/libbeat/autodiscover/providers/kubernetes"

// Register default monitoring reporting
_ "github.com/elastic/beats/libbeat/monitoring/report/elasticsearch"
Expand Down
3 changes: 3 additions & 0 deletions libbeat/common/kubernetes/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ func (g *metaGenerator) PodMetadata(pod *Pod) common.MapStr {
"pod": common.MapStr{
"name": pod.Metadata.Name,
},
"node": common.MapStr{
"name": pod.Spec.NodeName,
},
"namespace": pod.Metadata.Namespace,
}

Expand Down
12 changes: 12 additions & 0 deletions libbeat/common/kubernetes/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kubernetes

import (
"encoding/json"
"strings"

"github.com/elastic/beats/libbeat/logp"

Expand Down Expand Up @@ -107,6 +108,17 @@ type Pod struct {
Status PodStatus `json:"status"`
}

func (s *PodContainerStatus) GetContainerID() string {
cID := s.ContainerID
if cID != "" {
parts := strings.Split(cID, "//")
if len(parts) == 2 {
return parts[1]
}
}
return ""
}

// GetPod converts Pod to our own type
func GetPod(pod *corev1.Pod) *Pod {
bytes, err := json.Marshal(pod)
Expand Down
62 changes: 62 additions & 0 deletions libbeat/common/kubernetes/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package kubernetes

import (
"context"
"fmt"
"io/ioutil"
"os"

"github.com/ericchiang/k8s"
"github.com/ghodss/yaml"

"github.com/elastic/beats/libbeat/logp"
)

func GetKubernetesClient(in_cluster bool, kube_config string) (client *k8s.Client, err error) {
if in_cluster == true {
client, err = k8s.NewInClusterClient()
if err != nil {
return nil, fmt.Errorf("Unable to get in cluster configuration: %v", err)
}
} else {
data, err := ioutil.ReadFile(kube_config)
if err != nil {
return nil, fmt.Errorf("read kubeconfig: %v", err)
}

// Unmarshal YAML into a Kubernetes config object.
var config k8s.Config
if err = yaml.Unmarshal(data, &config); err != nil {
return nil, fmt.Errorf("unmarshal kubeconfig: %v", err)
}
client, err = k8s.NewClient(&config)
if err != nil {
return nil, err
}
}

return client, nil
}

func DiscoverKubernetesNode(host string, client *k8s.Client) string {
ctx := context.Background()
if host == "" {
podName := os.Getenv("HOSTNAME")
logp.Info("Using pod name %s and namespace %s", podName, client.Namespace)
if podName == "localhost" {
host = "localhost"
} else {
pod, error := client.CoreV1().GetPod(ctx, podName, client.Namespace)
if error != nil {
logp.Err("Querying for pod failed with error: ", error.Error())
logp.Info("Unable to find pod, setting host to localhost")
host = "localhost"
} else {
host = pod.Spec.GetNodeName()
}

}
}

return host
}
16 changes: 2 additions & 14 deletions libbeat/processors/add_kubernetes_metadata/indexers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package add_kubernetes_metadata

import (
"fmt"
"strings"
"sync"

"github.com/elastic/beats/libbeat/common"
Expand Down Expand Up @@ -143,7 +142,7 @@ func NewContainerIndexer(_ common.Config, metaGen kubernetes.MetaGenerator) (Ind
func (c *ContainerIndexer) GetMetadata(pod *kubernetes.Pod) []MetadataIndex {
var metadata []MetadataIndex
for _, status := range append(pod.Status.ContainerStatuses, pod.Status.InitContainerStatuses...) {
cID := containerID(status)
cID := status.ContainerID
if cID == "" {
continue
}
Expand All @@ -160,7 +159,7 @@ func (c *ContainerIndexer) GetMetadata(pod *kubernetes.Pod) []MetadataIndex {
func (c *ContainerIndexer) GetIndexes(pod *kubernetes.Pod) []string {
var containers []string
for _, status := range append(pod.Status.ContainerStatuses, pod.Status.InitContainerStatuses...) {
cID := containerID(status)
cID := status.GetContainerID()
if cID == "" {
continue
}
Expand All @@ -169,17 +168,6 @@ func (c *ContainerIndexer) GetIndexes(pod *kubernetes.Pod) []string {
return containers
}

func containerID(status kubernetes.PodContainerStatus) string {
cID := status.ContainerID
if cID != "" {
parts := strings.Split(cID, "//")
if len(parts) == 2 {
return parts[1]
}
}
return ""
}

// IPPortIndexer indexes pods based on all their host:port combinations
type IPPortIndexer struct {
metaGen kubernetes.MetaGenerator
Expand Down
Loading

0 comments on commit 9123dcc

Please sign in to comment.