Skip to content

Commit

Permalink
Revert: reducing scope of informer (#1245)
Browse files Browse the repository at this point in the history
  • Loading branch information
mariomac authored Oct 11, 2024
1 parent 8c3c9db commit 392baa8
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 48 deletions.
11 changes: 5 additions & 6 deletions pkg/components/beyla.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,11 @@ func buildCommonContextInfo(
ctxInfo := &global.ContextInfo{
Prometheus: promMgr,
K8sInformer: kube.NewMetadataProvider(kube.MetadataConfig{
Enable: config.Attributes.Kubernetes.Enable,
EnableNetworkMetadata: config.Enabled(beyla.FeatureNetO11y),
KubeConfigPath: config.Attributes.Kubernetes.KubeconfigPath,
SyncTimeout: config.Attributes.Kubernetes.InformersSyncTimeout,
ResyncPeriod: config.Attributes.Kubernetes.InformersResyncPeriod,
DisabledInformers: config.Attributes.Kubernetes.DisableInformers,
Enable: config.Attributes.Kubernetes.Enable,
KubeConfigPath: config.Attributes.Kubernetes.KubeconfigPath,
SyncTimeout: config.Attributes.Kubernetes.InformersSyncTimeout,
ResyncPeriod: config.Attributes.Kubernetes.InformersResyncPeriod,
DisabledInformers: config.Attributes.Kubernetes.DisableInformers,
}),
}
switch {
Expand Down
4 changes: 2 additions & 2 deletions pkg/internal/discover/watcher_kube_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestWatcherKubeEnricher(t *testing.T) {
// Setup a fake K8s API connected to the watcherKubeEnricher
k8sClient := fakek8sclientset.NewSimpleClientset()
informer := kube.Metadata{SyncTimeout: 30 * time.Minute}
require.NoError(t, informer.InitFromClient(context.TODO(), k8sClient, ""))
require.NoError(t, informer.InitFromClient(context.TODO(), k8sClient))
wkeNodeFunc, err := WatcherKubeEnricherProvider(context.TODO(), &informerProvider{informer: &informer})()
require.NoError(t, err)
inputCh, outputCh := make(chan []Event[processAttrs], 10), make(chan []Event[processAttrs], 10)
Expand Down Expand Up @@ -118,7 +118,7 @@ func TestWatcherKubeEnricherWithMatcher(t *testing.T) {
// Setup a fake K8s API connected to the watcherKubeEnricher
k8sClient := fakek8sclientset.NewSimpleClientset()
informer := kube.Metadata{SyncTimeout: 30 * time.Minute}
require.NoError(t, informer.InitFromClient(context.TODO(), k8sClient, ""))
require.NoError(t, informer.InitFromClient(context.TODO(), k8sClient))
wkeNodeFunc, err := WatcherKubeEnricherProvider(context.TODO(), &informerProvider{informer: &informer})()
require.NoError(t, err)
pipeConfig := beyla.Config{}
Expand Down
24 changes: 4 additions & 20 deletions pkg/internal/kube/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
Expand Down Expand Up @@ -231,10 +230,10 @@ func rmContainerIDSchema(containerID string) string {
return containerID
}

func (k *Metadata) InitFromClient(ctx context.Context, client kubernetes.Interface, restrictNode string) error {
func (k *Metadata) InitFromClient(ctx context.Context, client kubernetes.Interface) error {
// Initialization variables
k.log = klog()
return k.initInformers(ctx, client, restrictNode)
return k.initInformers(ctx, client)
}

func LoadConfig(kubeConfigPath string) (*rest.Config, error) {
Expand Down Expand Up @@ -264,23 +263,8 @@ func LoadConfig(kubeConfigPath string) (*rest.Config, error) {
return config, nil
}

func (k *Metadata) initInformers(ctx context.Context, client kubernetes.Interface, restrictNode string) error {
var informerFactory informers.SharedInformerFactory
if restrictNode == "" {
k.log.Debug("no node selector provided. Listening to global resources")
informerFactory = informers.NewSharedInformerFactory(client, k.resyncPeriod)
} else {
fieldSelector := fields.OneTermEqualSelector("spec.nodeName", restrictNode).String()
k.log.Debug("using field selector", "selector", fieldSelector)
opts := informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.FieldSelector = fieldSelector
})
informerFactory = informers.NewSharedInformerFactoryWithOptions(client, k.resyncPeriod, opts)
// In the App O11y use case, we restrict to local nodes as we don't need to listen to global resources.
// In App O11y, we don't need neither Node nor Service informers, so we disable them.
k.disabledInformers |= InformerNode
k.disabledInformers |= InformerService
}
func (k *Metadata) initInformers(ctx context.Context, client kubernetes.Interface) error {
informerFactory := informers.NewSharedInformerFactory(client, k.resyncPeriod)

if err := k.initPodInformer(informerFactory); err != nil {
return err
Expand Down
26 changes: 6 additions & 20 deletions pkg/internal/kube/informer_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@ import (
)

type MetadataConfig struct {
Enable kubeflags.EnableFlag
DisabledInformers []string
KubeConfigPath string
EnableNetworkMetadata bool
SyncTimeout time.Duration
ResyncPeriod time.Duration
Enable kubeflags.EnableFlag
DisabledInformers []string
KubeConfigPath string
SyncTimeout time.Duration
ResyncPeriod time.Duration
}

type MetadataProvider struct {
Expand All @@ -35,7 +34,6 @@ type MetadataProvider struct {

enable atomic.Value
disabledInformers maps.Bits
enableNetworkMeta bool
}

func NewMetadataProvider(config MetadataConfig) *MetadataProvider {
Expand All @@ -46,7 +44,6 @@ func NewMetadataProvider(config MetadataConfig) *MetadataProvider {
config.ResyncPeriod = defaultResyncTime
}
mp := &MetadataProvider{
enableNetworkMeta: config.EnableNetworkMetadata,
kubeConfigPath: config.KubeConfigPath,
syncTimeout: config.SyncTimeout,
resyncPeriod: config.ResyncPeriod,
Expand Down Expand Up @@ -106,23 +103,12 @@ func (mp *MetadataProvider) Get(ctx context.Context) (*Metadata, error) {
return nil, fmt.Errorf("kubernetes client can't be initialized: %w", err)
}

// restricting the node name of the informers for App O11y, as we will only decorate
// instances running on the same node that Beyla
// however, for network o11y, we need to get all the nodes so the node name restriction
// would remain unset
restrictNodeName := ""
if !mp.enableNetworkMeta {
restrictNodeName, err = mp.CurrentNodeName(ctx)
if err != nil {
return nil, fmt.Errorf("can't get current node name: %w", err)
}
}
mp.metadata = &Metadata{
disabledInformers: mp.disabledInformers,
SyncTimeout: mp.syncTimeout,
resyncPeriod: mp.resyncPeriod,
}
if err := mp.metadata.InitFromClient(ctx, kubeClient, restrictNodeName); err != nil {
if err := mp.metadata.InitFromClient(ctx, kubeClient); err != nil {
return nil, fmt.Errorf("can't initialize kubernetes metadata: %w", err)
}
return mp.metadata, nil
Expand Down

0 comments on commit 392baa8

Please sign in to comment.