Skip to content

Commit

Permalink
Allow configuring informer resync time (#1216)
Browse files Browse the repository at this point in the history
* Allow configuring informer resync time

* make Vale happy
  • Loading branch information
mariomac authored Oct 2, 2024
1 parent 4c93dcd commit c6c0aa1
Show file tree
Hide file tree
Showing 9 changed files with 84 additions and 45 deletions.
19 changes: 19 additions & 0 deletions docs/sources/configure/options.md
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,25 @@ reduces the load of the Kubernetes API.
The Pods informer can't be disabled. For that purpose, you should disable the whole
Kubernetes metadata decoration.

| YAML | Environment variable | Type | Default |
|--------------------------|-------------------------------------|----------|---------|
| `informers_sync_timeout` | `BEYLA_KUBE_INFORMERS_SYNC_TIMEOUT` | Duration | 30s |

Maximum time that Beyla waits to retrieve all the Kubernetes metadata before starting
to decorate metrics and traces. If this timeout is reached, Beyla starts normally, but
the metadata attributes might be incomplete until all the Kubernetes metadata is updated
locally in the background.

| YAML | Environment variable | Type | Default |
|---------------------------|--------------------------------------|----------|---------|
| `informers_resync_period` | `BEYLA_KUBE_INFORMERS_RESYNC_PERIOD` | Duration | 30m |

Beyla subscribes to metadata updates, so it receives any change to a resource's metadata
immediately. In addition, Beyla periodically resynchronizes the whole Kubernetes metadata
at the interval specified by this property.

Higher values reduce the load on the Kubernetes API service.

## Routes decorator

YAML section `routes`.
Expand Down
5 changes: 3 additions & 2 deletions pkg/beyla/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,9 @@ var DefaultConfig = Config{
HostnameDNSResolution: true,
},
Kubernetes: transform.KubernetesDecorator{
Enable: kubeflags.EnabledDefault,
InformersSyncTimeout: 30 * time.Second,
Enable: kubeflags.EnabledDefault,
InformersSyncTimeout: 30 * time.Second,
InformersResyncPeriod: 30 * time.Minute,
},
HostID: HostIDConfig{
FetchTimeout: 500 * time.Millisecond,
Expand Down
7 changes: 4 additions & 3 deletions pkg/beyla/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,10 @@ network:
HostnameDNSResolution: true,
},
Kubernetes: transform.KubernetesDecorator{
KubeconfigPath: "/foo/bar",
Enable: kubeflags.EnabledTrue,
InformersSyncTimeout: 30 * time.Second,
KubeconfigPath: "/foo/bar",
Enable: kubeflags.EnabledTrue,
InformersSyncTimeout: 30 * time.Second,
InformersResyncPeriod: 30 * time.Minute,
},
HostID: HostIDConfig{
Override: "the-host-id",
Expand Down
15 changes: 8 additions & 7 deletions pkg/components/beyla.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,14 @@ func buildCommonContextInfo(
promMgr := &connector.PrometheusManager{}
ctxInfo := &global.ContextInfo{
Prometheus: promMgr,
K8sInformer: kube.NewMetadataProvider(
config.Attributes.Kubernetes.Enable,
config.Attributes.Kubernetes.DisableInformers,
config.Attributes.Kubernetes.KubeconfigPath,
config.Enabled(beyla.FeatureNetO11y),
config.Attributes.Kubernetes.InformersSyncTimeout,
),
K8sInformer: kube.NewMetadataProvider(kube.MetadataConfig{
Enable: config.Attributes.Kubernetes.Enable,
EnableNetworkMetadata: config.Enabled(beyla.FeatureNetO11y),
KubeConfigPath: config.Attributes.Kubernetes.KubeconfigPath,
SyncTimeout: config.Attributes.Kubernetes.InformersSyncTimeout,
ResyncPeriod: config.Attributes.Kubernetes.InformersResyncPeriod,
DisabledInformers: config.Attributes.Kubernetes.DisableInformers,
}),
}
switch {
case config.InternalMetrics.Prometheus.Port != 0:
Expand Down
8 changes: 4 additions & 4 deletions pkg/internal/discover/watcher_kube_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ func TestWatcherKubeEnricher(t *testing.T) {
containerInfoForPID = fakeContainerInfo
// Setup a fake K8s API connected to the watcherKubeEnricher
k8sClient := fakek8sclientset.NewSimpleClientset()
informer := kube.Metadata{}
require.NoError(t, informer.InitFromClient(context.TODO(), k8sClient, "", 30*time.Minute))
informer := kube.Metadata{SyncTimeout: 30 * time.Minute}
require.NoError(t, informer.InitFromClient(context.TODO(), k8sClient, ""))
wkeNodeFunc, err := WatcherKubeEnricherProvider(context.TODO(), &informerProvider{informer: &informer}, fakeInternalMetrics{})()
require.NoError(t, err)
inputCh, outputCh := make(chan []Event[processAttrs], 10), make(chan []Event[processAttrs], 10)
Expand Down Expand Up @@ -118,8 +118,8 @@ func TestWatcherKubeEnricherWithMatcher(t *testing.T) {
processInfo = fakeProcessInfo
// Setup a fake K8s API connected to the watcherKubeEnricher
k8sClient := fakek8sclientset.NewSimpleClientset()
informer := kube.Metadata{}
require.NoError(t, informer.InitFromClient(context.TODO(), k8sClient, "", 30*time.Minute))
informer := kube.Metadata{SyncTimeout: 30 * time.Minute}
require.NoError(t, informer.InitFromClient(context.TODO(), k8sClient, ""))
wkeNodeFunc, err := WatcherKubeEnricherProvider(context.TODO(), &informerProvider{informer: &informer}, fakeInternalMetrics{})()
require.NoError(t, err)
pipeConfig := beyla.Config{}
Expand Down
27 changes: 13 additions & 14 deletions pkg/internal/kube/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,9 @@ import (

const (
kubeConfigEnvVariable = "KUBECONFIG"
resyncTime = 10 * time.Minute
defaultSyncTimeout = 10 * time.Minute
defaultResyncTime = 30 * time.Minute
defaultSyncTimeout = 30 * time.Second
IndexPodByContainerIDs = "idx_pod_by_container"
IndexReplicaSetNames = "idx_rs"
IndexIP = "idx_ip"
typeNode = "Node"
typePod = "Pod"
Expand All @@ -53,6 +52,8 @@ type Metadata struct {

containerEventHandlers []ContainerEventHandler

SyncTimeout time.Duration
resyncPeriod time.Duration
disabledInformers maps.Bits
}

Expand Down Expand Up @@ -230,10 +231,10 @@ func rmContainerIDSchema(containerID string) string {
return containerID
}

func (k *Metadata) InitFromClient(ctx context.Context, client kubernetes.Interface, restrictNode string, timeout time.Duration) error {
func (k *Metadata) InitFromClient(ctx context.Context, client kubernetes.Interface, restrictNode string) error {
// Initialization variables
k.log = klog()
return k.initInformers(ctx, client, restrictNode, timeout)
return k.initInformers(ctx, client, restrictNode)
}

func LoadConfig(kubeConfigPath string) (*rest.Config, error) {
Expand Down Expand Up @@ -263,21 +264,18 @@ func LoadConfig(kubeConfigPath string) (*rest.Config, error) {
return config, nil
}

func (k *Metadata) initInformers(ctx context.Context, client kubernetes.Interface, restrictNode string, syncTimeout time.Duration) error {
if syncTimeout <= 0 {
syncTimeout = defaultSyncTimeout
}
func (k *Metadata) initInformers(ctx context.Context, client kubernetes.Interface, restrictNode string) error {
var informerFactory informers.SharedInformerFactory
if restrictNode == "" {
k.log.Debug("no node selector provided. Listening to global resources")
informerFactory = informers.NewSharedInformerFactory(client, resyncTime)
informerFactory = informers.NewSharedInformerFactory(client, k.resyncPeriod)
} else {
fieldSelector := fields.OneTermEqualSelector("spec.nodeName", restrictNode).String()
k.log.Debug("using field selector", "selector", fieldSelector)
opts := informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.FieldSelector = fieldSelector
})
informerFactory = informers.NewSharedInformerFactoryWithOptions(client, resyncTime, opts)
informerFactory = informers.NewSharedInformerFactoryWithOptions(client, k.resyncPeriod, opts)
// In the App O11y use case, we restrict to local nodes as we don't need to listen to global resources.
// In App O11y, we need neither Node nor Service informers, so we disable them.
k.disabledInformers |= InformerNode
Expand All @@ -304,10 +302,11 @@ func (k *Metadata) initInformers(ctx context.Context, client kubernetes.Interfac
select {
case <-finishedCacheSync:
k.log.Debug("kubernetes informers started")
return nil
case <-time.After(syncTimeout):
return fmt.Errorf("kubernetes cache has not been synced after %s timeout", syncTimeout)
case <-time.After(k.SyncTimeout):
k.log.Warn("kubernetes cache has not been synced after timeout. The kubernetes attributes might be incomplete."+
" Consider increasing the BEYLA_KUBE_INFORMERS_SYNC_TIMEOUT value", "timeout", k.SyncTimeout)
}
return nil
}

func (k *Metadata) AddContainerEventHandler(eh ContainerEventHandler) {
Expand Down
43 changes: 29 additions & 14 deletions pkg/internal/kube/informer_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,32 +16,43 @@ import (
"github.com/grafana/beyla/pkg/kubeflags"
)

// MetadataConfig groups the options accepted by NewMetadataProvider.
type MetadataConfig struct {
// Enable is the Kubernetes metadata enable flag; NewMetadataProvider stores it
// in the provider so it can be read atomically.
Enable kubeflags.EnableFlag
// DisabledInformers lists the informers to disable. NewMetadataProvider converts
// it with informerTypes into the bit set kept by the provider.
DisabledInformers []string
// KubeConfigPath is copied into the provider's kubeConfigPath.
// NOTE(review): presumably the path to the kubeconfig file used to reach the cluster — confirm against LoadConfig.
KubeConfigPath string
// EnableNetworkMetadata is copied into the provider's enableNetworkMeta.
// NOTE(review): the caller sets it when the network observability feature is enabled; how the provider uses it is not visible here.
EnableNetworkMetadata bool
// SyncTimeout is the maximum time to wait for the informers' cache to sync.
// A zero value is replaced by defaultSyncTimeout in NewMetadataProvider.
SyncTimeout time.Duration
// ResyncPeriod is the resync period passed to the shared informer factory.
// A zero value is replaced by defaultResyncTime in NewMetadataProvider.
ResyncPeriod time.Duration
}

// MetadataProvider holds the configuration needed to build a Metadata instance
// and keeps the instance that Get creates.
type MetadataProvider struct {
// NOTE(review): mt presumably guards the initialization of metadata in Get —
// the locking code is not visible here; confirm.
mt sync.Mutex
// metadata is the instance created and returned by Get.
metadata *Metadata

// kubeConfigPath is copied from MetadataConfig.KubeConfigPath.
kubeConfigPath string
// syncTimeout and resyncPeriod are copied into the Metadata instance that Get
// creates. NewMetadataProvider replaces zero values with the package defaults.
syncTimeout time.Duration
resyncPeriod time.Duration

// enable stores the kubeflags.EnableFlag received in MetadataConfig.Enable.
enable atomic.Value
// disabledInformers is the bit set resulting from informerTypes(MetadataConfig.DisabledInformers).
disabledInformers maps.Bits
// enableNetworkMeta is copied from MetadataConfig.EnableNetworkMetadata.
enableNetworkMeta bool
}

func NewMetadataProvider(
enable kubeflags.EnableFlag,
disabledInformers []string,
kubeConfigPath string,
enableNetworkMetadata bool,
syncTimeout time.Duration,
) *MetadataProvider {
func NewMetadataProvider(config MetadataConfig) *MetadataProvider {
if config.SyncTimeout == 0 {
config.SyncTimeout = defaultSyncTimeout
}
if config.ResyncPeriod == 0 {
config.ResyncPeriod = defaultResyncTime
}
mp := &MetadataProvider{
enableNetworkMeta: enableNetworkMetadata,
kubeConfigPath: kubeConfigPath,
syncTimeout: syncTimeout,
disabledInformers: informerTypes(disabledInformers),
enableNetworkMeta: config.EnableNetworkMetadata,
kubeConfigPath: config.KubeConfigPath,
syncTimeout: config.SyncTimeout,
resyncPeriod: config.ResyncPeriod,
disabledInformers: informerTypes(config.DisabledInformers),
}
mp.enable.Store(enable)
mp.enable.Store(config.Enable)
return mp
}

Expand Down Expand Up @@ -106,8 +117,12 @@ func (mp *MetadataProvider) Get(ctx context.Context) (*Metadata, error) {
return nil, fmt.Errorf("can't get current node name: %w", err)
}
}
mp.metadata = &Metadata{disabledInformers: mp.disabledInformers}
if err := mp.metadata.InitFromClient(ctx, kubeClient, restrictNodeName, mp.syncTimeout); err != nil {
mp.metadata = &Metadata{
disabledInformers: mp.disabledInformers,
SyncTimeout: mp.syncTimeout,
resyncPeriod: mp.resyncPeriod,
}
if err := mp.metadata.InitFromClient(ctx, kubeClient, restrictNodeName); err != nil {
return nil, fmt.Errorf("can't initialize kubernetes metadata: %w", err)
}
return mp.metadata, nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/internal/pipe/instrumenter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func gctx(groups attributes.AttrGroups) *global.ContextInfo {
return &global.ContextInfo{
Metrics: imetrics.NoopReporter{},
MetricAttributeGroups: groups,
K8sInformer: kube.NewMetadataProvider(kubeflags.EnabledFalse, nil, "", false, 0),
K8sInformer: kube.NewMetadataProvider(kube.MetadataConfig{Enable: kubeflags.EnabledFalse}),
HostID: "host-id",
}
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/transform/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ type KubernetesDecorator struct {

InformersSyncTimeout time.Duration `yaml:"informers_sync_timeout" env:"BEYLA_KUBE_INFORMERS_SYNC_TIMEOUT"`

// InformersResyncPeriod is the interval at which the informers resynchronize the whole
// Kubernetes metadata. It defaults to 30m. Higher values reduce the load on the Kube API.
InformersResyncPeriod time.Duration `yaml:"informers_resync_period" env:"BEYLA_KUBE_INFORMERS_RESYNC_PERIOD"`

// DropExternal will drop, in NetO11y component, any flow where the source or destination
// IPs are not matched to any kubernetes entity, assuming they are cluster-external
DropExternal bool `yaml:"drop_external" env:"BEYLA_NETWORK_DROP_EXTERNAL"`
Expand Down

0 comments on commit c6c0aa1

Please sign in to comment.