Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use replicaset and Job MetaGen based on watchers #35483

Merged
merged 17 commits into from
May 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ automatic splitting at root level, if root level element is an array. {pull}3415
- Allow users to enable features via configuration, starting with the FQDN reporting feature. {issue}1070[1070] {pull}34456[34456]
- Add Hetzner Cloud as a provider for `add_cloud_metadata`. {pull}35456[35456]
- Reload Beat when TLS certificates or key files are modified. {issue}34408[34408] {pull}34416[34416]
- Upgrade version of elastic-agent-autodiscover to v0.6.1 for improved memory consumption on k8s. {pull}35483[35483]


*Auditbeat*
Expand Down
16 changes: 8 additions & 8 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11963,11 +11963,11 @@ SOFTWARE.

--------------------------------------------------------------------------------
Dependency : github.com/elastic/elastic-agent-autodiscover
Version: v0.5.0
Version: v0.6.1
Licence type (autodetected): Apache-2.0
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-autodiscover@v0.5.0/LICENSE:
Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-autodiscover@v0.6.1/LICENSE:

Apache License
Version 2.0, January 2004
Expand Down Expand Up @@ -24360,11 +24360,11 @@ THE SOFTWARE.

--------------------------------------------------------------------------------
Dependency : golang.org/x/crypto
Version: v0.0.0-20220622213112-05595931fe9d
Version: v0.1.0
Licence type (autodetected): BSD-3-Clause
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/golang.org/x/crypto@v0.0.0-20220622213112-05595931fe9d/LICENSE:
Contents of probable licence file $GOMODCACHE/golang.org/x/crypto@v0.1.0/LICENSE:

Copyright (c) 2009 The Go Authors. All rights reserved.

Expand Down Expand Up @@ -45512,11 +45512,11 @@ WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

--------------------------------------------------------------------------------
Dependency : github.com/onsi/gomega
Version: v1.10.3
Version: v1.10.1
Licence type (autodetected): MIT
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/onsi/[email protected].3/LICENSE:
Contents of probable licence file $GOMODCACHE/github.com/onsi/[email protected].1/LICENSE:

Copyright (c) 2013-2014 Onsi Fakhouri

Expand Down Expand Up @@ -47860,11 +47860,11 @@ Contents of probable licence file $GOMODCACHE/github.com/urso/[email protected]

--------------------------------------------------------------------------------
Dependency : github.com/vishvananda/netlink
Version: v1.1.1-0.20201029203352-d40f9887b852
Version: v1.1.0
Licence type (autodetected): Apache-2.0
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/vishvananda/[email protected].1-0.20201029203352-d40f9887b852/LICENSE:
Contents of probable licence file $GOMODCACHE/github.com/vishvananda/[email protected].0/LICENSE:


Apache License
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ require (
go.uber.org/atomic v1.10.0
go.uber.org/multierr v1.10.0
go.uber.org/zap v1.24.0
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d
golang.org/x/crypto v0.1.0
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616
golang.org/x/mod v0.9.0
golang.org/x/net v0.9.0
Expand Down Expand Up @@ -195,7 +195,7 @@ require (
github.com/aws/smithy-go v1.13.5
github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20220623125934-28468a6701b5
github.com/elastic/bayeux v1.0.5
github.com/elastic/elastic-agent-autodiscover v0.5.0
github.com/elastic/elastic-agent-autodiscover v0.6.1
github.com/elastic/elastic-agent-libs v0.3.8
github.com/elastic/elastic-agent-shipper-client v0.5.1-0.20230228231646-f04347b666f3
github.com/elastic/elastic-agent-system-metrics v0.6.1
Expand Down
312 changes: 6 additions & 306 deletions go.sum

Large diffs are not rendered by default.

84 changes: 67 additions & 17 deletions libbeat/autodiscover/providers/kubernetes/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,16 @@ import (
)

type pod struct {
uuid uuid.UUID
config *Config
metagen metadata.MetaGen
logger *logp.Logger
publishFunc func([]bus.Event)
watcher kubernetes.Watcher
nodeWatcher kubernetes.Watcher
namespaceWatcher kubernetes.Watcher
uuid uuid.UUID
config *Config
metagen metadata.MetaGen
logger *logp.Logger
publishFunc func([]bus.Event)
watcher kubernetes.Watcher
nodeWatcher kubernetes.Watcher
namespaceWatcher kubernetes.Watcher
replicasetWatcher kubernetes.Watcher
jobWatcher kubernetes.Watcher

// Mutex used by configuration updates not triggered by the main watcher,
// to avoid race conditions between cross updates and deletions.
Expand All @@ -57,6 +59,8 @@ type pod struct {
func NewPodEventer(uuid uuid.UUID, cfg *conf.C, client k8s.Interface, publish func(event []bus.Event)) (Eventer, error) {
logger := logp.NewLogger("autodiscover.pod")

var replicaSetWatcher, jobWatcher kubernetes.Watcher

config := defaultConfig()
err := cfg.Unpack(&config)
if err != nil {
Expand Down Expand Up @@ -109,17 +113,41 @@ func NewPodEventer(uuid uuid.UUID, cfg *conf.C, client k8s.Interface, publish fu
if err != nil {
logger.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Namespace{}, err)
}
metaGen := metadata.GetPodMetaGen(cfg, watcher, nodeWatcher, namespaceWatcher, metaConf)

// Resource is Pod so we need to create watchers for Replicasets and Jobs that it might belongs to
// in order to be able to retrieve 2nd layer Owner metadata like in case of:
// Deployment -> Replicaset -> Pod
// CronJob -> job -> Pod
if metaConf.Deployment {
replicaSetWatcher, err = kubernetes.NewNamedWatcher("resource_metadata_enricher_rs", client, &kubernetes.ReplicaSet{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
}, nil)
if err != nil {
logger.Errorf("Error creating watcher for %T due to error %+v", &kubernetes.ReplicaSet{}, err)
}
}
if metaConf.CronJob {
jobWatcher, err = kubernetes.NewNamedWatcher("resource_metadata_enricher_job", client, &kubernetes.Job{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
}, nil)
if err != nil {
logger.Errorf("Error creating watcher for %T due to error %+v", &kubernetes.Job{}, err)
}
}

metaGen := metadata.GetPodMetaGen(cfg, watcher, nodeWatcher, namespaceWatcher, replicaSetWatcher, jobWatcher, metaConf)

p := &pod{
config: config,
uuid: uuid,
publishFunc: publish,
metagen: metaGen,
logger: logger,
watcher: watcher,
nodeWatcher: nodeWatcher,
namespaceWatcher: namespaceWatcher,
config: config,
uuid: uuid,
publishFunc: publish,
metagen: metaGen,
logger: logger,
watcher: watcher,
nodeWatcher: nodeWatcher,
namespaceWatcher: namespaceWatcher,
replicasetWatcher: replicaSetWatcher,
jobWatcher: jobWatcher,
}

watcher.AddEventHandler(p)
Expand Down Expand Up @@ -244,6 +272,20 @@ func (p *pod) Start() error {
}
}

if p.replicasetWatcher != nil {
err := p.replicasetWatcher.Start()
if err != nil {
return err
}
}

if p.jobWatcher != nil {
err := p.jobWatcher.Start()
if err != nil {
return err
}
}

return p.watcher.Start()
}

Expand All @@ -258,6 +300,14 @@ func (p *pod) Stop() {
if p.nodeWatcher != nil {
p.nodeWatcher.Stop()
}

if p.replicasetWatcher != nil {
p.replicasetWatcher.Stop()
}

if p.jobWatcher != nil {
p.jobWatcher.Stop()
}
}

// emit emits the events for the given pod according to its state and
Expand Down
2 changes: 1 addition & 1 deletion libbeat/autodiscover/providers/kubernetes/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1935,7 +1935,7 @@ func TestPod_EmitEvent(t *testing.T) {
t.Fatal(err)
}

metaGen := metadata.NewPodMetadataGenerator(conf.NewConfig(), nil, client, nil, nil, addResourceMetadata)
metaGen := metadata.NewPodMetadataGenerator(conf.NewConfig(), nil, client, nil, nil, nil, nil, addResourceMetadata)
p := &Provider{
config: defaultConfig(),
bus: bus.New(logp.NewLogger("bus"), "test"),
Expand Down
8 changes: 4 additions & 4 deletions libbeat/processors/add_kubernetes_metadata/indexers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
)

var addResourceMetadata = metadata.GetDefaultResourceMetadataConfig()
var metagen = metadata.NewPodMetadataGenerator(config.NewConfig(), nil, nil, nil, nil, addResourceMetadata)
var metagen = metadata.NewPodMetadataGenerator(config.NewConfig(), nil, nil, nil, nil, nil, nil, addResourceMetadata)

func TestPodIndexer(t *testing.T) {
var testConfig = config.NewConfig()
Expand Down Expand Up @@ -91,7 +91,7 @@ func TestPodIndexer(t *testing.T) {
func TestPodUIDIndexer(t *testing.T) {
var testConfig = config.NewConfig()

metaGenWithPodUID := metadata.NewPodMetadataGenerator(config.NewConfig(), nil, nil, nil, nil, addResourceMetadata)
metaGenWithPodUID := metadata.NewPodMetadataGenerator(config.NewConfig(), nil, nil, nil, nil, nil, nil, addResourceMetadata)

podUIDIndexer, err := NewPodUIDIndexer(*testConfig, metaGenWithPodUID)
assert.NoError(t, err)
Expand Down Expand Up @@ -302,7 +302,7 @@ func TestFilteredGenMeta(t *testing.T) {
})
assert.NoError(t, err)

filteredGen := metadata.NewPodMetadataGenerator(config, nil, nil, nil, nil, addResourceMetadata)
filteredGen := metadata.NewPodMetadataGenerator(config, nil, nil, nil, nil, nil, nil, addResourceMetadata)

podIndexer, err = NewPodNameIndexer(*testConfig, filteredGen)
assert.NoError(t, err)
Expand Down Expand Up @@ -339,7 +339,7 @@ func TestFilteredGenMetaExclusion(t *testing.T) {
})
assert.NoError(t, err)

filteredGen := metadata.NewPodMetadataGenerator(config, nil, nil, nil, nil, addResourceMetadata)
filteredGen := metadata.NewPodMetadataGenerator(config, nil, nil, nil, nil, nil, nil, addResourceMetadata)

podIndexer, err := NewPodNameIndexer(*testConfig, filteredGen)
assert.NoError(t, err)
Expand Down
38 changes: 37 additions & 1 deletion libbeat/processors/add_kubernetes_metadata/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ func newProcessorConfig(cfg *config.C, register *Register) (kubeAnnotatorConfig,

func (k *kubernetesAnnotator) init(config kubeAnnotatorConfig, cfg *config.C) {
k.initOnce.Do(func() {
var replicaSetWatcher, jobWatcher kubernetes.Watcher

client, err := kubernetes.GetKubernetesClient(config.KubeConfig, config.KubeClientOptions)
if err != nil {
if kubernetes.IsInCluster(config.KubeConfig) {
Expand Down Expand Up @@ -208,8 +210,30 @@ func (k *kubernetesAnnotator) init(config kubeAnnotatorConfig, cfg *config.C) {
if err != nil {
k.log.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Namespace{}, err)
}

// Resource is Pod so we need to create watchers for Replicasets and Jobs that it might belongs to
// in order to be able to retrieve 2nd layer Owner metadata like in case of:
// Deployment -> Replicaset -> Pod
// CronJob -> job -> Pod
if metaConf.Deployment {
replicaSetWatcher, err = kubernetes.NewNamedWatcher("resource_metadata_enricher_rs", client, &kubernetes.ReplicaSet{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
}, nil)
if err != nil {
k.log.Errorf("Error creating watcher for %T due to error %+v", &kubernetes.ReplicaSet{}, err)
}
}
if metaConf.CronJob {
jobWatcher, err = kubernetes.NewNamedWatcher("resource_metadata_enricher_job", client, &kubernetes.Job{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
}, nil)
if err != nil {
k.log.Errorf("Error creating watcher for %T due to error %+v", &kubernetes.Job{}, err)
}
}

// TODO: refactor the above section to a common function to be used by NeWPodEventer too
metaGen := metadata.GetPodMetaGen(cfg, watcher, nodeWatcher, namespaceWatcher, metaConf)
metaGen := metadata.GetPodMetaGen(cfg, watcher, nodeWatcher, namespaceWatcher, replicaSetWatcher, jobWatcher, metaConf)

k.indexers = NewIndexers(config.Indexers, metaGen)
k.watcher = watcher
Expand Down Expand Up @@ -247,6 +271,18 @@ func (k *kubernetesAnnotator) init(config kubeAnnotatorConfig, cfg *config.C) {
return
}
}
if replicaSetWatcher != nil {
if err := replicaSetWatcher.Start(); err != nil {
k.log.Debugf("add_kubernetes_metadata", "Couldn't start replicaSet watcher: %v", err)
return
}
}
if jobWatcher != nil {
if err := jobWatcher.Start(); err != nil {
k.log.Debugf("add_kubernetes_metadata", "Couldn't start job watcher: %v", err)
return
}
}
if err := watcher.Start(); err != nil {
k.log.Debugf("add_kubernetes_metadata", "Couldn't start pod watcher: %v", err)
return
Expand Down
Loading