Skip to content

Commit

Permalink
Merge pull request #30 from opsramp/release/v0.104.x
Browse files Browse the repository at this point in the history
periodic list support for list-watch mode
  • Loading branch information
opsrampdeveloper authored Oct 3, 2024
2 parents 97fad21 + 85b3f82 commit 779daf8
Showing 1 changed file with 40 additions and 0 deletions.
40 changes: 40 additions & 0 deletions receiver/k8sobjectsreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,11 +273,51 @@ func (kr *k8sobjectsreceiver) getResourceVersion(ctx context.Context, config *K8
logRecordCount := logs.LogRecordCount()
err = kr.consumer.ConsumeLogs(obsCtx, logs)
kr.obsrecv.EndLogsOp(obsCtx, metadata.Type.String(), logRecordCount, err)
if config.Interval != 0 {
go kr.startPeriodicList(ctx, config, resource)
}
}
}
return resourceVersion, nil
}

func (kr *k8sobjectsreceiver) startPeriodicList(ctx context.Context, config *K8sObjectsConfig, resource dynamic.ResourceInterface) {
stopperChan := make(chan struct{})
kr.mu.Lock()
kr.stopperChanList = append(kr.stopperChanList, stopperChan)
kr.mu.Unlock()
ticker := time.NewTicker(config.Interval)
listOption := metav1.ListOptions{
FieldSelector: config.FieldSelector,
LabelSelector: config.LabelSelector,
}

if config.ResourceVersion != "" {
listOption.ResourceVersion = config.ResourceVersion
listOption.ResourceVersionMatch = metav1.ResourceVersionMatchExact
}

defer ticker.Stop()
for {
select {
case <-ticker.C:
objects, err := resource.List(ctx, listOption)
if err != nil {
kr.setting.Logger.Error("error in pulling object", zap.String("resource", config.gvr.String()), zap.Error(err))
} else if len(objects.Items) > 0 {
logs := pullObjectsToLogData(objects, time.Now(), config)
obsCtx := kr.obsrecv.StartLogsOp(ctx)
logRecordCount := logs.LogRecordCount()
err = kr.consumer.ConsumeLogs(obsCtx, logs)
kr.obsrecv.EndLogsOp(obsCtx, metadata.Type.String(), logRecordCount, err)
}
case <-stopperChan:
return
}

}
}

// Start ticking immediately.
// Ref: https://stackoverflow.com/questions/32705582/how-to-get-time-tick-to-tick-immediately
func newTicker(ctx context.Context, repeat time.Duration) *time.Ticker {
Expand Down

0 comments on commit 779daf8

Please sign in to comment.