Skip to content

Commit

Permalink
Use replicaset and Job MetaGen based on watchers (#35483)
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrsMark authored and chrisberkhout committed Jun 1, 2023
1 parent f35f9db commit eecb1c0
Show file tree
Hide file tree
Showing 10 changed files with 239 additions and 358 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ automatic splitting at root level, if root level element is an array. {pull}3415
- Allow users to enable features via configuration, starting with the FQDN reporting feature. {issue}1070[1070] {pull}34456[34456]
- Add Hetzner Cloud as a provider for `add_cloud_metadata`. {pull}35456[35456]
- Reload Beat when TLS certificates or key files are modified. {issue}34408[34408] {pull}34416[34416]
- Upgrade version of elastic-agent-autodiscover to v0.6.1 for improved memory consumption on k8s. {pull}35483[35483]


*Auditbeat*
Expand Down
16 changes: 8 additions & 8 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11963,11 +11963,11 @@ SOFTWARE.

--------------------------------------------------------------------------------
Dependency : github.com/elastic/elastic-agent-autodiscover
Version: v0.5.0
Version: v0.6.1
Licence type (autodetected): Apache-2.0
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-autodiscover@v0.5.0/LICENSE:
Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-autodiscover@v0.6.1/LICENSE:

Apache License
Version 2.0, January 2004
Expand Down Expand Up @@ -24360,11 +24360,11 @@ THE SOFTWARE.

--------------------------------------------------------------------------------
Dependency : golang.org/x/crypto
Version: v0.0.0-20220622213112-05595931fe9d
Version: v0.1.0
Licence type (autodetected): BSD-3-Clause
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/golang.org/x/crypto@v0.0.0-20220622213112-05595931fe9d/LICENSE:
Contents of probable licence file $GOMODCACHE/golang.org/x/crypto@v0.1.0/LICENSE:

Copyright (c) 2009 The Go Authors. All rights reserved.

Expand Down Expand Up @@ -45512,11 +45512,11 @@ WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

--------------------------------------------------------------------------------
Dependency : github.com/onsi/gomega
Version: v1.10.3
Version: v1.10.1
Licence type (autodetected): MIT
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/onsi/[email protected].3/LICENSE:
Contents of probable licence file $GOMODCACHE/github.com/onsi/[email protected].1/LICENSE:

Copyright (c) 2013-2014 Onsi Fakhouri

Expand Down Expand Up @@ -47860,11 +47860,11 @@ Contents of probable licence file $GOMODCACHE/github.com/urso/[email protected]

--------------------------------------------------------------------------------
Dependency : github.com/vishvananda/netlink
Version: v1.1.1-0.20201029203352-d40f9887b852
Version: v1.1.0
Licence type (autodetected): Apache-2.0
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/vishvananda/[email protected].1-0.20201029203352-d40f9887b852/LICENSE:
Contents of probable licence file $GOMODCACHE/github.com/vishvananda/[email protected].0/LICENSE:


Apache License
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ require (
go.uber.org/atomic v1.10.0
go.uber.org/multierr v1.10.0
go.uber.org/zap v1.24.0
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d
golang.org/x/crypto v0.1.0
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616
golang.org/x/mod v0.9.0
golang.org/x/net v0.9.0
Expand Down Expand Up @@ -195,7 +195,7 @@ require (
github.com/aws/smithy-go v1.13.5
github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20220623125934-28468a6701b5
github.com/elastic/bayeux v1.0.5
github.com/elastic/elastic-agent-autodiscover v0.5.0
github.com/elastic/elastic-agent-autodiscover v0.6.1
github.com/elastic/elastic-agent-libs v0.3.8
github.com/elastic/elastic-agent-shipper-client v0.5.1-0.20230228231646-f04347b666f3
github.com/elastic/elastic-agent-system-metrics v0.6.1
Expand Down
312 changes: 6 additions & 306 deletions go.sum

Large diffs are not rendered by default.

84 changes: 67 additions & 17 deletions libbeat/autodiscover/providers/kubernetes/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,16 @@ import (
)

type pod struct {
uuid uuid.UUID
config *Config
metagen metadata.MetaGen
logger *logp.Logger
publishFunc func([]bus.Event)
watcher kubernetes.Watcher
nodeWatcher kubernetes.Watcher
namespaceWatcher kubernetes.Watcher
uuid uuid.UUID
config *Config
metagen metadata.MetaGen
logger *logp.Logger
publishFunc func([]bus.Event)
watcher kubernetes.Watcher
nodeWatcher kubernetes.Watcher
namespaceWatcher kubernetes.Watcher
replicasetWatcher kubernetes.Watcher
jobWatcher kubernetes.Watcher

// Mutex used by configuration updates not triggered by the main watcher,
// to avoid race conditions between cross updates and deletions.
Expand All @@ -57,6 +59,8 @@ type pod struct {
func NewPodEventer(uuid uuid.UUID, cfg *conf.C, client k8s.Interface, publish func(event []bus.Event)) (Eventer, error) {
logger := logp.NewLogger("autodiscover.pod")

var replicaSetWatcher, jobWatcher kubernetes.Watcher

config := defaultConfig()
err := cfg.Unpack(&config)
if err != nil {
Expand Down Expand Up @@ -109,17 +113,41 @@ func NewPodEventer(uuid uuid.UUID, cfg *conf.C, client k8s.Interface, publish fu
if err != nil {
logger.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Namespace{}, err)
}
metaGen := metadata.GetPodMetaGen(cfg, watcher, nodeWatcher, namespaceWatcher, metaConf)

// Resource is Pod so we need to create watchers for Replicasets and Jobs that it might belongs to
// in order to be able to retrieve 2nd layer Owner metadata like in case of:
// Deployment -> Replicaset -> Pod
// CronJob -> job -> Pod
if metaConf.Deployment {
replicaSetWatcher, err = kubernetes.NewNamedWatcher("resource_metadata_enricher_rs", client, &kubernetes.ReplicaSet{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
}, nil)
if err != nil {
logger.Errorf("Error creating watcher for %T due to error %+v", &kubernetes.ReplicaSet{}, err)
}
}
if metaConf.CronJob {
jobWatcher, err = kubernetes.NewNamedWatcher("resource_metadata_enricher_job", client, &kubernetes.Job{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
}, nil)
if err != nil {
logger.Errorf("Error creating watcher for %T due to error %+v", &kubernetes.Job{}, err)
}
}

metaGen := metadata.GetPodMetaGen(cfg, watcher, nodeWatcher, namespaceWatcher, replicaSetWatcher, jobWatcher, metaConf)

p := &pod{
config: config,
uuid: uuid,
publishFunc: publish,
metagen: metaGen,
logger: logger,
watcher: watcher,
nodeWatcher: nodeWatcher,
namespaceWatcher: namespaceWatcher,
config: config,
uuid: uuid,
publishFunc: publish,
metagen: metaGen,
logger: logger,
watcher: watcher,
nodeWatcher: nodeWatcher,
namespaceWatcher: namespaceWatcher,
replicasetWatcher: replicaSetWatcher,
jobWatcher: jobWatcher,
}

watcher.AddEventHandler(p)
Expand Down Expand Up @@ -244,6 +272,20 @@ func (p *pod) Start() error {
}
}

if p.replicasetWatcher != nil {
err := p.replicasetWatcher.Start()
if err != nil {
return err
}
}

if p.jobWatcher != nil {
err := p.jobWatcher.Start()
if err != nil {
return err
}
}

return p.watcher.Start()
}

Expand All @@ -258,6 +300,14 @@ func (p *pod) Stop() {
if p.nodeWatcher != nil {
p.nodeWatcher.Stop()
}

if p.replicasetWatcher != nil {
p.replicasetWatcher.Stop()
}

if p.jobWatcher != nil {
p.jobWatcher.Stop()
}
}

// emit emits the events for the given pod according to its state and
Expand Down
2 changes: 1 addition & 1 deletion libbeat/autodiscover/providers/kubernetes/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1935,7 +1935,7 @@ func TestPod_EmitEvent(t *testing.T) {
t.Fatal(err)
}

metaGen := metadata.NewPodMetadataGenerator(conf.NewConfig(), nil, client, nil, nil, addResourceMetadata)
metaGen := metadata.NewPodMetadataGenerator(conf.NewConfig(), nil, client, nil, nil, nil, nil, addResourceMetadata)
p := &Provider{
config: defaultConfig(),
bus: bus.New(logp.NewLogger("bus"), "test"),
Expand Down
8 changes: 4 additions & 4 deletions libbeat/processors/add_kubernetes_metadata/indexers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
)

var addResourceMetadata = metadata.GetDefaultResourceMetadataConfig()
var metagen = metadata.NewPodMetadataGenerator(config.NewConfig(), nil, nil, nil, nil, addResourceMetadata)
var metagen = metadata.NewPodMetadataGenerator(config.NewConfig(), nil, nil, nil, nil, nil, nil, addResourceMetadata)

func TestPodIndexer(t *testing.T) {
var testConfig = config.NewConfig()
Expand Down Expand Up @@ -91,7 +91,7 @@ func TestPodIndexer(t *testing.T) {
func TestPodUIDIndexer(t *testing.T) {
var testConfig = config.NewConfig()

metaGenWithPodUID := metadata.NewPodMetadataGenerator(config.NewConfig(), nil, nil, nil, nil, addResourceMetadata)
metaGenWithPodUID := metadata.NewPodMetadataGenerator(config.NewConfig(), nil, nil, nil, nil, nil, nil, addResourceMetadata)

podUIDIndexer, err := NewPodUIDIndexer(*testConfig, metaGenWithPodUID)
assert.NoError(t, err)
Expand Down Expand Up @@ -302,7 +302,7 @@ func TestFilteredGenMeta(t *testing.T) {
})
assert.NoError(t, err)

filteredGen := metadata.NewPodMetadataGenerator(config, nil, nil, nil, nil, addResourceMetadata)
filteredGen := metadata.NewPodMetadataGenerator(config, nil, nil, nil, nil, nil, nil, addResourceMetadata)

podIndexer, err = NewPodNameIndexer(*testConfig, filteredGen)
assert.NoError(t, err)
Expand Down Expand Up @@ -339,7 +339,7 @@ func TestFilteredGenMetaExclusion(t *testing.T) {
})
assert.NoError(t, err)

filteredGen := metadata.NewPodMetadataGenerator(config, nil, nil, nil, nil, addResourceMetadata)
filteredGen := metadata.NewPodMetadataGenerator(config, nil, nil, nil, nil, nil, nil, addResourceMetadata)

podIndexer, err := NewPodNameIndexer(*testConfig, filteredGen)
assert.NoError(t, err)
Expand Down
38 changes: 37 additions & 1 deletion libbeat/processors/add_kubernetes_metadata/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ func newProcessorConfig(cfg *config.C, register *Register) (kubeAnnotatorConfig,

func (k *kubernetesAnnotator) init(config kubeAnnotatorConfig, cfg *config.C) {
k.initOnce.Do(func() {
var replicaSetWatcher, jobWatcher kubernetes.Watcher

client, err := kubernetes.GetKubernetesClient(config.KubeConfig, config.KubeClientOptions)
if err != nil {
if kubernetes.IsInCluster(config.KubeConfig) {
Expand Down Expand Up @@ -208,8 +210,30 @@ func (k *kubernetesAnnotator) init(config kubeAnnotatorConfig, cfg *config.C) {
if err != nil {
k.log.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Namespace{}, err)
}

// Resource is Pod so we need to create watchers for Replicasets and Jobs that it might belongs to
// in order to be able to retrieve 2nd layer Owner metadata like in case of:
// Deployment -> Replicaset -> Pod
// CronJob -> job -> Pod
if metaConf.Deployment {
replicaSetWatcher, err = kubernetes.NewNamedWatcher("resource_metadata_enricher_rs", client, &kubernetes.ReplicaSet{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
}, nil)
if err != nil {
k.log.Errorf("Error creating watcher for %T due to error %+v", &kubernetes.ReplicaSet{}, err)
}
}
if metaConf.CronJob {
jobWatcher, err = kubernetes.NewNamedWatcher("resource_metadata_enricher_job", client, &kubernetes.Job{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
}, nil)
if err != nil {
k.log.Errorf("Error creating watcher for %T due to error %+v", &kubernetes.Job{}, err)
}
}

// TODO: refactor the above section to a common function to be used by NeWPodEventer too
metaGen := metadata.GetPodMetaGen(cfg, watcher, nodeWatcher, namespaceWatcher, metaConf)
metaGen := metadata.GetPodMetaGen(cfg, watcher, nodeWatcher, namespaceWatcher, replicaSetWatcher, jobWatcher, metaConf)

k.indexers = NewIndexers(config.Indexers, metaGen)
k.watcher = watcher
Expand Down Expand Up @@ -247,6 +271,18 @@ func (k *kubernetesAnnotator) init(config kubeAnnotatorConfig, cfg *config.C) {
return
}
}
if replicaSetWatcher != nil {
if err := replicaSetWatcher.Start(); err != nil {
k.log.Debugf("add_kubernetes_metadata", "Couldn't start replicaSet watcher: %v", err)
return
}
}
if jobWatcher != nil {
if err := jobWatcher.Start(); err != nil {
k.log.Debugf("add_kubernetes_metadata", "Couldn't start job watcher: %v", err)
return
}
}
if err := watcher.Start(); err != nil {
k.log.Debugf("add_kubernetes_metadata", "Couldn't start pod watcher: %v", err)
return
Expand Down
Loading

0 comments on commit eecb1c0

Please sign in to comment.