diff --git a/docs/sources/configure/options.md b/docs/sources/configure/options.md index 482c84742..67bbecfd0 100644 --- a/docs/sources/configure/options.md +++ b/docs/sources/configure/options.md @@ -681,6 +681,25 @@ reduces the load of the Kubernetes API. The Pods informer can't be disabled. For that purpose, you should disable the whole Kubernetes metadata decoration. +| YAML | Environment variable | Type | Default | +|--------------------------|-------------------------------------|----------|---------| +| `informers_sync_timeout` | `BEYLA_KUBE_INFORMERS_SYNC_TIMEOUT` | Duration | 30s | + +Maximum time that Beyla waits for getting all the Kubernetes metadata before starting +to decorate metrics and traces. If this timeout is reached, Beyla starts normally but +the metadata attributes might be incomplete until all the Kubernetes metadata is locally +updated in background. + +| YAML | Environment variable | Type | Default | +|---------------------------|--------------------------------------|----------|---------| +| `informers_resync_period` | `BEYLA_KUBE_INFORMERS_RESYNC_PERIOD` | Duration | 30m | + +Beyla is subscribed to immediately receive any update on resources' metadata. In addition, +Beyla periodically resynchronizes the whole Kubernetes metadata at the frequency specified +by this property. + +Higher values reduce the load on the Kubernetes API service. + ## Routes decorator YAML section `routes`. diff --git a/pkg/beyla/config.go b/pkg/beyla/config.go index 79daff0d9..12255fc98 100644 --- a/pkg/beyla/config.go +++ b/pkg/beyla/config.go @@ -110,8 +110,9 @@ var DefaultConfig = Config{ HostnameDNSResolution: true, }, Kubernetes: transform.KubernetesDecorator{ - Enable: kubeflags.EnabledDefault, - InformersSyncTimeout: 30 * time.Second, + Enable: kubeflags.EnabledDefault, + InformersSyncTimeout: 30 * time.Second, + InformersResyncPeriod: 30 * time.Minute, }, HostID: HostIDConfig{ FetchTimeout: 500 * time.Millisecond, diff --git a/pkg/beyla/config_test.go b/pkg/beyla/config_test.go index ce8185e66..61cd67abd 100644 --- a/pkg/beyla/config_test.go +++ b/pkg/beyla/config_test.go @@ -174,9 +174,10 @@ network: HostnameDNSResolution: true, }, Kubernetes: transform.KubernetesDecorator{ - KubeconfigPath: "/foo/bar", - Enable: kubeflags.EnabledTrue, - InformersSyncTimeout: 30 * time.Second, + KubeconfigPath: "/foo/bar", + Enable: kubeflags.EnabledTrue, + InformersSyncTimeout: 30 * time.Second, + InformersResyncPeriod: 30 * time.Minute, }, HostID: HostIDConfig{ Override: "the-host-id", diff --git a/pkg/components/beyla.go b/pkg/components/beyla.go index 2c94ae7a8..4f1ceab7a 100644 --- a/pkg/components/beyla.go +++ b/pkg/components/beyla.go @@ -107,13 +107,14 @@ func buildCommonContextInfo( promMgr := &connector.PrometheusManager{} ctxInfo := &global.ContextInfo{ Prometheus: promMgr, - K8sInformer: kube.NewMetadataProvider( - config.Attributes.Kubernetes.Enable, - config.Attributes.Kubernetes.DisableInformers, - config.Attributes.Kubernetes.KubeconfigPath, - config.Enabled(beyla.FeatureNetO11y), - config.Attributes.Kubernetes.InformersSyncTimeout, - ), + K8sInformer: kube.NewMetadataProvider(kube.MetadataConfig{ + Enable: config.Attributes.Kubernetes.Enable, + EnableNetworkMetadata: config.Enabled(beyla.FeatureNetO11y), + KubeConfigPath: config.Attributes.Kubernetes.KubeconfigPath, + SyncTimeout: config.Attributes.Kubernetes.InformersSyncTimeout, + ResyncPeriod: config.Attributes.Kubernetes.InformersResyncPeriod, + DisabledInformers: config.Attributes.Kubernetes.DisableInformers, + }), } switch { case config.InternalMetrics.Prometheus.Port != 0: diff --git a/pkg/internal/discover/watcher_kube_test.go b/pkg/internal/discover/watcher_kube_test.go index ef5a5de27..4c3919f62 100644 --- a/pkg/internal/discover/watcher_kube_test.go +++ b/pkg/internal/discover/watcher_kube_test.go @@ -72,8 +72,8 @@ func TestWatcherKubeEnricher(t *testing.T) { containerInfoForPID = fakeContainerInfo // Setup a fake K8s API connected to the watcherKubeEnricher k8sClient := fakek8sclientset.NewSimpleClientset() - informer := kube.Metadata{} - require.NoError(t, informer.InitFromClient(context.TODO(), k8sClient, "", 30*time.Minute)) + informer := kube.Metadata{SyncTimeout: 30 * time.Minute} + require.NoError(t, informer.InitFromClient(context.TODO(), k8sClient, "")) wkeNodeFunc, err := WatcherKubeEnricherProvider(context.TODO(), &informerProvider{informer: &informer}, fakeInternalMetrics{})() require.NoError(t, err) inputCh, outputCh := make(chan []Event[processAttrs], 10), make(chan []Event[processAttrs], 10) @@ -118,8 +118,8 @@ func TestWatcherKubeEnricherWithMatcher(t *testing.T) { processInfo = fakeProcessInfo // Setup a fake K8s API connected to the watcherKubeEnricher k8sClient := fakek8sclientset.NewSimpleClientset() - informer := kube.Metadata{} - require.NoError(t, informer.InitFromClient(context.TODO(), k8sClient, "", 30*time.Minute)) + informer := kube.Metadata{SyncTimeout: 30 * time.Minute} + require.NoError(t, informer.InitFromClient(context.TODO(), k8sClient, "")) wkeNodeFunc, err := WatcherKubeEnricherProvider(context.TODO(), &informerProvider{informer: &informer}, fakeInternalMetrics{})() require.NoError(t, err) pipeConfig := beyla.Config{} diff --git a/pkg/internal/kube/informer.go b/pkg/internal/kube/informer.go index 56dd67c6f..ae8ca3afa 100644 --- a/pkg/internal/kube/informer.go +++ b/pkg/internal/kube/informer.go @@ -23,10 +23,9 @@ import ( const ( kubeConfigEnvVariable = "KUBECONFIG" - resyncTime = 10 * time.Minute - defaultSyncTimeout = 10 * time.Minute + defaultResyncTime = 30 * time.Minute + defaultSyncTimeout = 30 * time.Second IndexPodByContainerIDs = "idx_pod_by_container" - IndexReplicaSetNames = "idx_rs" IndexIP = "idx_ip" typeNode = "Node" typePod = "Pod" @@ -53,6 +52,8 @@ type Metadata struct { containerEventHandlers []ContainerEventHandler + SyncTimeout time.Duration + resyncPeriod time.Duration disabledInformers maps.Bits } @@ -230,10 +231,10 @@ func rmContainerIDSchema(containerID string) string { return containerID } -func (k *Metadata) InitFromClient(ctx context.Context, client kubernetes.Interface, restrictNode string, timeout time.Duration) error { +func (k *Metadata) InitFromClient(ctx context.Context, client kubernetes.Interface, restrictNode string) error { // Initialization variables k.log = klog() - return k.initInformers(ctx, client, restrictNode, timeout) + return k.initInformers(ctx, client, restrictNode) } func LoadConfig(kubeConfigPath string) (*rest.Config, error) { @@ -263,21 +264,18 @@ func LoadConfig(kubeConfigPath string) (*rest.Config, error) { return config, nil } -func (k *Metadata) initInformers(ctx context.Context, client kubernetes.Interface, restrictNode string, syncTimeout time.Duration) error { - if syncTimeout <= 0 { - syncTimeout = defaultSyncTimeout - } +func (k *Metadata) initInformers(ctx context.Context, client kubernetes.Interface, restrictNode string) error { var informerFactory informers.SharedInformerFactory if restrictNode == "" { k.log.Debug("no node selector provided. Listening to global resources") - informerFactory = informers.NewSharedInformerFactory(client, resyncTime) + informerFactory = informers.NewSharedInformerFactory(client, k.resyncPeriod) } else { fieldSelector := fields.OneTermEqualSelector("spec.nodeName", restrictNode).String() k.log.Debug("using field selector", "selector", fieldSelector) opts := informers.WithTweakListOptions(func(options *metav1.ListOptions) { options.FieldSelector = fieldSelector }) - informerFactory = informers.NewSharedInformerFactoryWithOptions(client, resyncTime, opts) + informerFactory = informers.NewSharedInformerFactoryWithOptions(client, k.resyncPeriod, opts) // In the App O11y use case, we restrict to local nodes as we don't need to listen to global resources. // In App O11y, we don't need neither Node nor Service informers, so we disable them. k.disabledInformers |= InformerNode @@ -304,10 +302,11 @@ func (k *Metadata) initInformers(ctx context.Context, client kubernetes.Interfac select { case <-finishedCacheSync: k.log.Debug("kubernetes informers started") - return nil - case <-time.After(syncTimeout): - return fmt.Errorf("kubernetes cache has not been synced after %s timeout", syncTimeout) + case <-time.After(k.SyncTimeout): + k.log.Warn("kubernetes cache has not been synced after timeout. The kubernetes attributes might be incomplete."+ + " Consider increasing the BEYLA_KUBE_INFORMERS_SYNC_TIMEOUT value", "timeout", k.SyncTimeout) } + return nil } func (k *Metadata) AddContainerEventHandler(eh ContainerEventHandler) { diff --git a/pkg/internal/kube/informer_provider.go b/pkg/internal/kube/informer_provider.go index 0e6f821c2..face3b134 100644 --- a/pkg/internal/kube/informer_provider.go +++ b/pkg/internal/kube/informer_provider.go @@ -16,32 +16,43 @@ import ( "github.com/grafana/beyla/pkg/kubeflags" ) +type MetadataConfig struct { + Enable kubeflags.EnableFlag + DisabledInformers []string + KubeConfigPath string + EnableNetworkMetadata bool + SyncTimeout time.Duration + ResyncPeriod time.Duration +} + type MetadataProvider struct { mt sync.Mutex metadata *Metadata kubeConfigPath string syncTimeout time.Duration + resyncPeriod time.Duration enable atomic.Value disabledInformers maps.Bits enableNetworkMeta bool } -func NewMetadataProvider( - enable kubeflags.EnableFlag, - disabledInformers []string, - kubeConfigPath string, - enableNetworkMetadata bool, - syncTimeout time.Duration, -) *MetadataProvider { +func NewMetadataProvider(config MetadataConfig) *MetadataProvider { + if config.SyncTimeout == 0 { + config.SyncTimeout = defaultSyncTimeout + } + if config.ResyncPeriod == 0 { + config.ResyncPeriod = defaultResyncTime + } mp := &MetadataProvider{ - enableNetworkMeta: enableNetworkMetadata, - kubeConfigPath: kubeConfigPath, - syncTimeout: syncTimeout, - disabledInformers: informerTypes(disabledInformers), + enableNetworkMeta: config.EnableNetworkMetadata, + kubeConfigPath: config.KubeConfigPath, + syncTimeout: config.SyncTimeout, + resyncPeriod: config.ResyncPeriod, + disabledInformers: informerTypes(config.DisabledInformers), } - mp.enable.Store(enable) + mp.enable.Store(config.Enable) return mp } @@ -106,8 +117,12 @@ func (mp *MetadataProvider) Get(ctx context.Context) (*Metadata, error) { return nil, fmt.Errorf("can't get current node name: %w", err) } } - mp.metadata = &Metadata{disabledInformers: mp.disabledInformers} - if err := mp.metadata.InitFromClient(ctx, kubeClient, restrictNodeName, mp.syncTimeout); err != nil { + mp.metadata = &Metadata{ + disabledInformers: mp.disabledInformers, + SyncTimeout: mp.syncTimeout, + resyncPeriod: mp.resyncPeriod, + } + if err := mp.metadata.InitFromClient(ctx, kubeClient, restrictNodeName); err != nil { return nil, fmt.Errorf("can't initialize kubernetes metadata: %w", err) } return mp.metadata, nil diff --git a/pkg/internal/pipe/instrumenter_test.go b/pkg/internal/pipe/instrumenter_test.go index 18b6d74f6..ff39277cb 100644 --- a/pkg/internal/pipe/instrumenter_test.go +++ b/pkg/internal/pipe/instrumenter_test.go @@ -40,7 +40,7 @@ func gctx(groups attributes.AttrGroups) *global.ContextInfo { return &global.ContextInfo{ Metrics: imetrics.NoopReporter{}, MetricAttributeGroups: groups, - K8sInformer: kube.NewMetadataProvider(kubeflags.EnabledFalse, nil, "", false, 0), + K8sInformer: kube.NewMetadataProvider(kube.MetadataConfig{Enable: kubeflags.EnabledFalse}), HostID: "host-id", } } diff --git a/pkg/transform/k8s.go b/pkg/transform/k8s.go index 92011d205..0e3d3589a 100644 --- a/pkg/transform/k8s.go +++ b/pkg/transform/k8s.go @@ -31,6 +31,9 @@ type KubernetesDecorator struct { InformersSyncTimeout time.Duration `yaml:"informers_sync_timeout" env:"BEYLA_KUBE_INFORMERS_SYNC_TIMEOUT"` + // InformersResyncPeriod defaults to 30m. Higher values will reduce the load on the Kube API. + InformersResyncPeriod time.Duration `yaml:"informers_resync_period" env:"BEYLA_KUBE_INFORMERS_RESYNC_PERIOD"` + // DropExternal will drop, in NetO11y component, any flow where the source or destination // IPs are not matched to any kubernetes entity, assuming they are cluster-external DropExternal bool `yaml:"drop_external" env:"BEYLA_NETWORK_DROP_EXTERNAL"`