Skip to content

Commit

Permalink
Add kubernetes composable inputs provider (#21480) (#21586)
Browse files Browse the repository at this point in the history
* Add kubernetes composable inputs provider

(cherry picked from commit d6ee968)
  • Loading branch information
Carlos Pérez-Aradros Herce authored Oct 6, 2020
1 parent 1045e6e commit 5feee4a
Show file tree
Hide file tree
Showing 4 changed files with 248 additions and 0 deletions.
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
- Add support for EQL based condition on inputs {pull}20994[20994]
- Send `fleet.host.id` to Endpoint Security {pull}21042[21042]
- Add `install` and `uninstall` subcommands {pull}21206[21206]
- Add `kubernetes` composable dynamic provider. {pull}21480[21480]
- Send updating state {pull}21461[21461]
- Add `elastic.agent.id` and `elastic.agent.version` to published events from filebeat and metricbeat {pull}21543[21543]
- Add `upgrade` subcommand to perform upgrade of installed Elastic Agent {pull}21425[21425]
1 change: 1 addition & 0 deletions x-pack/elastic-agent/pkg/agent/cmd/include.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/docker"
_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/env"
_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/host"
_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/kubernetes"
_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/local"
_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/localdynamic"
_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/path"
Expand Down
31 changes: 31 additions & 0 deletions x-pack/elastic-agent/pkg/composable/providers/kubernetes/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

// TODO review the need for this
// +build linux darwin windows

package kubernetes

import (
"time"
)

// Config for kubernetes provider
type Config struct {
KubeConfig string `config:"kube_config"`
SyncPeriod time.Duration `config:"sync_period"`
CleanupTimeout time.Duration `config:"cleanup_timeout" validate:"positive"`

// Needed when resource is a pod
Node string `config:"node"`

// Scope of the provider (cluster or node)
Scope string `config:"scope"`
}

// InitDefaults initializes the default values for the config.
func (c *Config) InitDefaults() {
c.SyncPeriod = 10 * time.Minute
c.CleanupTimeout = 60 * time.Second
}
215 changes: 215 additions & 0 deletions x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package kubernetes

import (
"fmt"
"time"

"github.com/elastic/beats/v7/libbeat/common/kubernetes"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
)

func init() {
composable.Providers.AddDynamicProvider("kubernetes", DynamicProviderBuilder)
}

type dynamicProvider struct {
logger *logger.Logger
config *Config
}

type eventWatcher struct {
logger *logger.Logger
cleanupTimeout time.Duration
comm composable.DynamicProviderComm
}

// DynamicProviderBuilder builds the dynamic provider.
func DynamicProviderBuilder(logger *logger.Logger, c *config.Config) (composable.DynamicProvider, error) {
var cfg Config
if c == nil {
c = config.New()
}
err := c.Unpack(&cfg)
if err != nil {
return nil, errors.New(err, "failed to unpack configuration")
}
return &dynamicProvider{logger, &cfg}, nil
}

// Run runs the environment context provider.
func (p *dynamicProvider) Run(comm composable.DynamicProviderComm) error {
client, err := kubernetes.GetKubernetesClient(p.config.KubeConfig)
if err != nil {
// info only; return nil (do nothing)
p.logger.Debugf("Kubernetes provider skipped, unable to connect: %s", err)
return nil
}

// Ensure that node is set correctly whenever the scope is set to "node". Make sure that node is empty
// when cluster scope is enforced.
if p.config.Scope == "node" {
p.config.Node = kubernetes.DiscoverKubernetesNode(p.logger, p.config.Node, kubernetes.IsInCluster(p.config.KubeConfig), client)
} else {
p.config.Node = ""
}
p.logger.Infof("Kubernetes provider started with %s scope", p.config.Scope)
p.logger.Debugf("Initializing Kubernetes watcher using node: %v", p.config.Node)

watcher, err := kubernetes.NewWatcher(client, &kubernetes.Pod{}, kubernetes.WatchOptions{
SyncTimeout: p.config.SyncPeriod,
Node: p.config.Node,
//Namespace: p.config.Namespace,
}, nil)
if err != nil {
return errors.New(err, "couldn't create kubernetes watcher")
}

watcher.AddEventHandler(&eventWatcher{p.logger, p.config.CleanupTimeout, comm})

err = watcher.Start()
if err != nil {
return errors.New(err, "couldn't start kubernetes watcher")
}

return nil
}

func (p *eventWatcher) emitRunning(pod *kubernetes.Pod) {
mapping := map[string]interface{}{
"namespace": pod.GetNamespace(),
"pod": map[string]interface{}{
"uid": string(pod.GetUID()),
"name": pod.GetName(),
"labels": pod.GetLabels(),
"ip": pod.Status.PodIP,
},
}

processors := []map[string]interface{}{
{
"add_fields": map[string]interface{}{
"fields": mapping,
"target": "kubernetes",
},
},
}

// Emit the pod
// We emit Pod + containers to ensure that configs matching Pod only
// get Pod metadata (not specific to any container)
p.comm.AddOrUpdate(string(pod.GetUID()), mapping, processors)

// Emit all containers in the pod
p.emitContainers(pod, pod.Spec.Containers, pod.Status.ContainerStatuses)

// TODO deal with init containers stopping after initialization
p.emitContainers(pod, pod.Spec.InitContainers, pod.Status.InitContainerStatuses)
}

func (p *eventWatcher) emitContainers(pod *kubernetes.Pod, containers []kubernetes.Container, containerstatuses []kubernetes.PodContainerStatus) {
// Collect all runtimes from status information.
containerIDs := map[string]string{}
runtimes := map[string]string{}
for _, c := range containerstatuses {
cid, runtime := kubernetes.ContainerIDWithRuntime(c)
containerIDs[c.Name] = cid
runtimes[c.Name] = runtime
}

for _, c := range containers {
// If it doesn't have an ID, container doesn't exist in
// the runtime, emit only an event if we are stopping, so
// we are sure of cleaning up configurations.
cid := containerIDs[c.Name]
if cid == "" {
continue
}

// ID is the combination of pod UID + container name
eventID := fmt.Sprintf("%s.%s", pod.GetObjectMeta().GetUID(), c.Name)

mapping := map[string]interface{}{
"namespace": pod.GetNamespace(),
"pod": map[string]interface{}{
"uid": string(pod.GetUID()),
"name": pod.GetName(),
"labels": pod.GetLabels(),
"ip": pod.Status.PodIP,
},
"container": map[string]interface{}{
"id": cid,
"name": c.Name,
"image": c.Image,
"runtime": runtimes[c.Name],
},
}

processors := []map[string]interface{}{
{
"add_fields": map[string]interface{}{
"fields": mapping,
"target": "kubernetes",
},
},
}

// Emit the container
p.comm.AddOrUpdate(eventID, mapping, processors)
}
}

func (p *eventWatcher) emitStopped(pod *kubernetes.Pod) {
p.comm.Remove(string(pod.GetUID()))

for _, c := range pod.Spec.Containers {
// ID is the combination of pod UID + container name
eventID := fmt.Sprintf("%s.%s", pod.GetObjectMeta().GetUID(), c.Name)
p.comm.Remove(eventID)
}

for _, c := range pod.Spec.InitContainers {
// ID is the combination of pod UID + container name
eventID := fmt.Sprintf("%s.%s", pod.GetObjectMeta().GetUID(), c.Name)
p.comm.Remove(eventID)
}
}

// OnAdd ensures processing of pod objects that are newly added
func (p *eventWatcher) OnAdd(obj interface{}) {
p.logger.Debugf("pod add: %+v", obj)
p.emitRunning(obj.(*kubernetes.Pod))
}

// OnUpdate emits events for a given pod depending on the state of the pod,
// if it is terminating, a stop event is scheduled, if not, a stop and a start
// events are sent sequentially to recreate the resources assotiated to the pod.
func (p *eventWatcher) OnUpdate(obj interface{}) {
pod := obj.(*kubernetes.Pod)

p.logger.Debugf("pod update for pod: %+v, status: %+v", pod.Name, pod.Status.Phase)
switch pod.Status.Phase {
case kubernetes.PodSucceeded, kubernetes.PodFailed:
time.AfterFunc(p.cleanupTimeout, func() { p.emitStopped(pod) })
return
case kubernetes.PodPending:
p.logger.Debugf("pod update (pending): don't know what to do with this pod yet, skipping for now: %+v", obj)
return
}

p.logger.Debugf("pod update: %+v", obj)
p.emitRunning(pod)
}

// OnDelete stops pod objects that are deleted
func (p *eventWatcher) OnDelete(obj interface{}) {
p.logger.Debugf("pod delete: %+v", obj)
pod := obj.(*kubernetes.Pod)
time.AfterFunc(p.cleanupTimeout, func() { p.emitStopped(pod) })
}

0 comments on commit 5feee4a

Please sign in to comment.