From 9123dcc923312aeaf6be0832a2e741d1255aab9b Mon Sep 17 00:00:00 2001 From: Vijay Samuel Date: Thu, 11 Jan 2018 10:16:11 -0800 Subject: [PATCH] Add autodiscover for kubernetes --- CHANGELOG.asciidoc | 1 + .../providers/kubernetes/config.go | 30 +++ .../providers/kubernetes/kubernetes.go | 175 ++++++++++++++++++ libbeat/cmd/instance/beat.go | 1 + libbeat/common/kubernetes/metadata.go | 3 + libbeat/common/kubernetes/types.go | 12 ++ libbeat/common/kubernetes/util.go | 62 +++++++ .../add_kubernetes_metadata/indexers.go | 16 +- .../add_kubernetes_metadata/kubernetes.go | 49 +---- 9 files changed, 290 insertions(+), 59 deletions(-) create mode 100644 libbeat/autodiscover/providers/kubernetes/config.go create mode 100644 libbeat/autodiscover/providers/kubernetes/kubernetes.go create mode 100644 libbeat/common/kubernetes/util.go diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 958e0452ba1..628795ed8bc 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -129,6 +129,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di `logging.metrics` feature. {pull}5915[5915] - Add the ability to log to the Windows Event Log. {pull}5913[5813] - Improve Elasticsearch output metrics to count number of dropped and duplicate (if event ID is given) events. {pull}5811[5811] +- Add autodiscover for kubernetes. {pull}6055[6055] *Auditbeat* diff --git a/libbeat/autodiscover/providers/kubernetes/config.go b/libbeat/autodiscover/providers/kubernetes/config.go new file mode 100644 index 00000000000..ae7064a2e02 --- /dev/null +++ b/libbeat/autodiscover/providers/kubernetes/config.go @@ -0,0 +1,30 @@ +package kubernetes + +import ( + "time" + + "github.com/elastic/beats/libbeat/autodiscover/template" +) + +// Config for kubernetes autodiscover provider +type Config struct { + InCluster bool `config:"in_cluster"` + KubeConfig string `config:"kube_config"` + Host string `config:"host"` + Namespace string `config:"namespace"` + SyncPeriod time.Duration `config:"sync_period"` + CleanupTimeout time.Duration `config:"cleanup_timeout"` + + IncludeLabels []string `config:"include_labels"` + ExcludeLabels []string `config:"exclude_labels"` + IncludeAnnotations []string `config:"include_annotations"` + + Templates template.MapperSettings `config:"templates"` +} + +func defaultConfig() *Config { + return &Config{ + InCluster: true, + SyncPeriod: 1 * time.Second, + } +} diff --git a/libbeat/autodiscover/providers/kubernetes/kubernetes.go b/libbeat/autodiscover/providers/kubernetes/kubernetes.go new file mode 100644 index 00000000000..3a6d5738dc4 --- /dev/null +++ b/libbeat/autodiscover/providers/kubernetes/kubernetes.go @@ -0,0 +1,175 @@ +package kubernetes + +import ( + "github.com/elastic/beats/libbeat/autodiscover" + "github.com/elastic/beats/libbeat/autodiscover/template" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/bus" + "github.com/elastic/beats/libbeat/common/kubernetes" + "github.com/elastic/beats/libbeat/logp" +) + +func init() { + autodiscover.ProviderRegistry.AddProvider("kubernetes", AutodiscoverBuilder) +} + +// Provider implements autodiscover provider for docker containers +type Provider struct { + config *Config + bus bus.Bus + watcher kubernetes.Watcher + metagen kubernetes.MetaGenerator + templates *template.Mapper + stop chan interface{} + startListener bus.Listener + stopListener bus.Listener + updateListener bus.Listener +} + +// AutodiscoverBuilder builds and returns an autodiscover provider +func AutodiscoverBuilder(bus bus.Bus, c *common.Config) (autodiscover.Provider, error) { + config := defaultConfig() + err := c.Unpack(&config) + if err != nil { + return nil, err + } + + mapper, err := template.NewConfigMapper(config.Templates) + if err != nil { + return nil, err + } + + client, err := kubernetes.GetKubernetesClient(config.InCluster, config.KubeConfig) + if err != nil { + return nil, err + } + + metagen := kubernetes.NewMetaGenerator(config.IncludeAnnotations, config.IncludeLabels, config.ExcludeLabels) + + config.Host = kubernetes.DiscoverKubernetesNode(config.Host, client) + watcher := kubernetes.NewWatcher(client.CoreV1(), config.SyncPeriod, config.CleanupTimeout, config.Host) + + start := watcher.ListenStart() + stop := watcher.ListenStop() + update := watcher.ListenUpdate() + + if err := watcher.Start(); err != nil { + return nil, err + } + + return &Provider{ + config: config, + bus: bus, + templates: mapper, + metagen: metagen, + watcher: watcher, + stop: make(chan interface{}), + startListener: start, + stopListener: stop, + updateListener: update, + }, nil +} + +func (p *Provider) Start() { + go func() { + for { + select { + case <-p.stop: + p.startListener.Stop() + p.stopListener.Stop() + return + + case event := <-p.startListener.Events(): + p.emit(event, "start") + + case event := <-p.stopListener.Events(): + p.emit(event, "stop") + + case event := <-p.updateListener.Events(): + //On updates, first send a stop signal followed by a start signal to simulate a restart + p.emit(event, "stop") + p.emit(event, "start") + } + } + }() +} + +func (p *Provider) emit(event bus.Event, flag string) { + pod, ok := event["pod"].(*kubernetes.Pod) + if !ok { + logp.Err("Couldn't get a pod from watcher event") + return + } + + host := pod.Status.PodIP + + // Emit pod container IDs + for _, c := range append(pod.Status.ContainerStatuses, pod.Status.InitContainerStatuses...) { + cmeta := common.MapStr{ + "id": c.GetContainerID(), + "name": c.Name, + "image": c.Image, + } + + // Metadata appended to each event + meta := p.metagen.ContainerMetadata(pod, c.Name) + + // Information that can be used in discovering a workload + kubemeta := meta.Clone() + kubemeta["container"] = cmeta + + // Emit container info + p.publish(bus.Event{ + flag: true, + "host": host, + "kubernetes": kubemeta, + "meta": common.MapStr{ + "kubernetes": meta, + }, + }) + } + + // Emit pod ports + for _, c := range pod.Spec.Containers { + cmeta := common.MapStr{ + "name": c.Name, + "image": c.Image, + } + + // Metadata appended to each event + meta := p.metagen.ContainerMetadata(pod, c.Name) + + // Information that can be used in discovering a workload + kubemeta := meta.Clone() + kubemeta["container"] = cmeta + + for _, port := range c.Ports { + event := bus.Event{ + flag: true, + "host": host, + "port": port.ContainerPort, + "kubernetes": kubemeta, + "meta": common.MapStr{ + "kubernetes": meta, + }, + } + p.publish(event) + } + } +} + +func (p *Provider) publish(event bus.Event) { + // Try to match a config + if config := p.templates.GetConfig(event); config != nil { + event["config"] = config + } + p.bus.Publish(event) +} + +func (p *Provider) Stop() { + close(p.stop) +} + +func (p *Provider) String() string { + return "kubernetes" +} diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 6a5664d375b..4621776fcbc 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -50,6 +50,7 @@ import ( // Register autodiscover providers _ "github.com/elastic/beats/libbeat/autodiscover/providers/docker" + _ "github.com/elastic/beats/libbeat/autodiscover/providers/kubernetes" // Register default monitoring reporting _ "github.com/elastic/beats/libbeat/monitoring/report/elasticsearch" diff --git a/libbeat/common/kubernetes/metadata.go b/libbeat/common/kubernetes/metadata.go index 481f4100332..9ae5b32cf5e 100644 --- a/libbeat/common/kubernetes/metadata.go +++ b/libbeat/common/kubernetes/metadata.go @@ -47,6 +47,9 @@ func (g *metaGenerator) PodMetadata(pod *Pod) common.MapStr { "pod": common.MapStr{ "name": pod.Metadata.Name, }, + "node": common.MapStr{ + "name": pod.Spec.NodeName, + }, "namespace": pod.Metadata.Namespace, } diff --git a/libbeat/common/kubernetes/types.go b/libbeat/common/kubernetes/types.go index 05501094343..ccadf17651d 100644 --- a/libbeat/common/kubernetes/types.go +++ b/libbeat/common/kubernetes/types.go @@ -2,6 +2,7 @@ package kubernetes import ( "encoding/json" + "strings" "github.com/elastic/beats/libbeat/logp" @@ -107,6 +108,17 @@ type Pod struct { Status PodStatus `json:"status"` } +func (s *PodContainerStatus) GetContainerID() string { + cID := s.ContainerID + if cID != "" { + parts := strings.Split(cID, "//") + if len(parts) == 2 { + return parts[1] + } + } + return "" +} + // GetPod converts Pod to our own type func GetPod(pod *corev1.Pod) *Pod { bytes, err := json.Marshal(pod) diff --git a/libbeat/common/kubernetes/util.go b/libbeat/common/kubernetes/util.go new file mode 100644 index 00000000000..4a74928ffed --- /dev/null +++ b/libbeat/common/kubernetes/util.go @@ -0,0 +1,62 @@ +package kubernetes + +import ( + "context" + "fmt" + "io/ioutil" + "os" + + "github.com/ericchiang/k8s" + "github.com/ghodss/yaml" + + "github.com/elastic/beats/libbeat/logp" +) + +func GetKubernetesClient(in_cluster bool, kube_config string) (client *k8s.Client, err error) { + if in_cluster == true { + client, err = k8s.NewInClusterClient() + if err != nil { + return nil, fmt.Errorf("Unable to get in cluster configuration: %v", err) + } + } else { + data, err := ioutil.ReadFile(kube_config) + if err != nil { + return nil, fmt.Errorf("read kubeconfig: %v", err) + } + + // Unmarshal YAML into a Kubernetes config object. + var config k8s.Config + if err = yaml.Unmarshal(data, &config); err != nil { + return nil, fmt.Errorf("unmarshal kubeconfig: %v", err) + } + client, err = k8s.NewClient(&config) + if err != nil { + return nil, err + } + } + + return client, nil +} + +func DiscoverKubernetesNode(host string, client *k8s.Client) string { + ctx := context.Background() + if host == "" { + podName := os.Getenv("HOSTNAME") + logp.Info("Using pod name %s and namespace %s", podName, client.Namespace) + if podName == "localhost" { + host = "localhost" + } else { + pod, error := client.CoreV1().GetPod(ctx, podName, client.Namespace) + if error != nil { + logp.Err("Querying for pod failed with error: ", error.Error()) + logp.Info("Unable to find pod, setting host to localhost") + host = "localhost" + } else { + host = pod.Spec.GetNodeName() + } + + } + } + + return host +} diff --git a/libbeat/processors/add_kubernetes_metadata/indexers.go b/libbeat/processors/add_kubernetes_metadata/indexers.go index 57e4e58584b..f535253f151 100644 --- a/libbeat/processors/add_kubernetes_metadata/indexers.go +++ b/libbeat/processors/add_kubernetes_metadata/indexers.go @@ -2,7 +2,6 @@ package add_kubernetes_metadata import ( "fmt" - "strings" "sync" "github.com/elastic/beats/libbeat/common" @@ -143,7 +142,7 @@ func NewContainerIndexer(_ common.Config, metaGen kubernetes.MetaGenerator) (Ind func (c *ContainerIndexer) GetMetadata(pod *kubernetes.Pod) []MetadataIndex { var metadata []MetadataIndex for _, status := range append(pod.Status.ContainerStatuses, pod.Status.InitContainerStatuses...) { - cID := containerID(status) + cID := status.ContainerID if cID == "" { continue } @@ -160,7 +159,7 @@ func (c *ContainerIndexer) GetMetadata(pod *kubernetes.Pod) []MetadataIndex { func (c *ContainerIndexer) GetIndexes(pod *kubernetes.Pod) []string { var containers []string for _, status := range append(pod.Status.ContainerStatuses, pod.Status.InitContainerStatuses...) { - cID := containerID(status) + cID := status.GetContainerID() if cID == "" { continue } @@ -169,17 +168,6 @@ func (c *ContainerIndexer) GetIndexes(pod *kubernetes.Pod) []string { return containers } -func containerID(status kubernetes.PodContainerStatus) string { - cID := status.ContainerID - if cID != "" { - parts := strings.Split(cID, "//") - if len(parts) == 2 { - return parts[1] - } - } - return "" -} - // IPPortIndexer indexes pods based on all their host:port combinations type IPPortIndexer struct { metaGen kubernetes.MetaGenerator diff --git a/libbeat/processors/add_kubernetes_metadata/kubernetes.go b/libbeat/processors/add_kubernetes_metadata/kubernetes.go index 87cacf74520..ec43b1f4f6a 100644 --- a/libbeat/processors/add_kubernetes_metadata/kubernetes.go +++ b/libbeat/processors/add_kubernetes_metadata/kubernetes.go @@ -1,11 +1,8 @@ package add_kubernetes_metadata import ( - "context" "errors" "fmt" - "io/ioutil" - "os" "sync" "time" @@ -16,9 +13,6 @@ import ( "github.com/elastic/beats/libbeat/common/kubernetes" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/processors" - - "github.com/ericchiang/k8s" - "github.com/ghodss/yaml" ) const ( @@ -93,47 +87,12 @@ func newKubernetesAnnotator(cfg *common.Config) (processors.Processor, error) { return nil, fmt.Errorf("Can not initialize kubernetes plugin with zero matcher plugins") } - var client *k8s.Client - if config.InCluster == true { - client, err = k8s.NewInClusterClient() - if err != nil { - return nil, fmt.Errorf("Unable to get in cluster configuration: %v", err) - } - } else { - data, err := ioutil.ReadFile(config.KubeConfig) - if err != nil { - return nil, fmt.Errorf("read kubeconfig: %v", err) - } - - // Unmarshal YAML into a Kubernetes config object. - var config k8s.Config - if err = yaml.Unmarshal(data, &config); err != nil { - return nil, fmt.Errorf("unmarshal kubeconfig: %v", err) - } - client, err = k8s.NewClient(&config) - if err != nil { - return nil, err - } + client, err := kubernetes.GetKubernetesClient(config.InCluster, config.KubeConfig) + if err != nil { + return nil, err } - ctx := context.Background() - if config.Host == "" { - podName := os.Getenv("HOSTNAME") - logp.Info("Using pod name %s and namespace %s", podName, client.Namespace) - if podName == "localhost" { - config.Host = "localhost" - } else { - pod, error := client.CoreV1().GetPod(ctx, podName, client.Namespace) - if error != nil { - logp.Err("Querying for pod failed with error: ", error.Error()) - logp.Info("Unable to find pod, setting host to localhost") - config.Host = "localhost" - } else { - config.Host = pod.Spec.GetNodeName() - } - - } - } + config.Host = kubernetes.DiscoverKubernetesNode(config.Host, client) logp.Debug("kubernetes", "Using host ", config.Host) logp.Debug("kubernetes", "Initializing watcher")