From 85b3f821b3b168e7bd5e51e20b9b63f23b266e6d Mon Sep 17 00:00:00 2001 From: Durgababu Neelam Date: Thu, 3 Oct 2024 14:49:52 +0530 Subject: [PATCH] periodic list support for list-watch mode --- receiver/k8sobjectsreceiver/receiver.go | 40 +++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/receiver/k8sobjectsreceiver/receiver.go b/receiver/k8sobjectsreceiver/receiver.go index c6600bf1ac3e..9d4d0bdbecf3 100644 --- a/receiver/k8sobjectsreceiver/receiver.go +++ b/receiver/k8sobjectsreceiver/receiver.go @@ -273,11 +273,51 @@ func (kr *k8sobjectsreceiver) getResourceVersion(ctx context.Context, config *K8 logRecordCount := logs.LogRecordCount() err = kr.consumer.ConsumeLogs(obsCtx, logs) kr.obsrecv.EndLogsOp(obsCtx, metadata.Type.String(), logRecordCount, err) + if config.Interval != 0 { + go kr.startPeriodicList(ctx, config, resource) + } } } return resourceVersion, nil } +func (kr *k8sobjectsreceiver) startPeriodicList(ctx context.Context, config *K8sObjectsConfig, resource dynamic.ResourceInterface) { + stopperChan := make(chan struct{}) + kr.mu.Lock() + kr.stopperChanList = append(kr.stopperChanList, stopperChan) + kr.mu.Unlock() + ticker := time.NewTicker(config.Interval) + listOption := metav1.ListOptions{ + FieldSelector: config.FieldSelector, + LabelSelector: config.LabelSelector, + } + + if config.ResourceVersion != "" { + listOption.ResourceVersion = config.ResourceVersion + listOption.ResourceVersionMatch = metav1.ResourceVersionMatchExact + } + + defer ticker.Stop() + for { + select { + case <-ticker.C: + objects, err := resource.List(ctx, listOption) + if err != nil { + kr.setting.Logger.Error("error in pulling object", zap.String("resource", config.gvr.String()), zap.Error(err)) + } else if len(objects.Items) > 0 { + logs := pullObjectsToLogData(objects, time.Now(), config) + obsCtx := kr.obsrecv.StartLogsOp(ctx) + logRecordCount := logs.LogRecordCount() + err = kr.consumer.ConsumeLogs(obsCtx, logs) + kr.obsrecv.EndLogsOp(obsCtx, metadata.Type.String(), logRecordCount, err) + } + case <-stopperChan: + return + } + + } +} + // Start ticking immediately. // Ref: https://stackoverflow.com/questions/32705582/how-to-get-time-tick-to-tick-immediately func newTicker(ctx context.Context, repeat time.Duration) *time.Ticker {