Skip to content

Commit

Permalink
update telemetry
Browse files Browse the repository at this point in the history
  • Loading branch information
zhuminyi committed Nov 17, 2024
1 parent 8a2146f commit ee354a8
Showing 1 changed file with 47 additions and 39 deletions.
86 changes: 47 additions & 39 deletions cmd/cluster-agent/api/v2/series/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ import (
loadstore "github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/workload/loadstore"
"github.com/DataDog/datadog-agent/pkg/telemetry"
"github.com/DataDog/datadog-agent/pkg/util/log"
"k8s.io/apimachinery/pkg/util/wait"
"golang.org/x/time/rate"
"k8s.io/client-go/util/workqueue"
)

const (
subsystem = "autoscaling_workload"
subsystem = "autoscaling_workload"
payloadProcessQPS = 500
payloadProcessRateBurst = 50
)

var (
Expand All @@ -30,37 +32,37 @@ var (
// telemetryWorkloadStoreMemory tracks the total memory usage of the store
telemetryWorkloadStoreMemory = telemetry.NewGaugeWithOpts(
subsystem,
"load_store_memory_usage",
"store_memory_usage",
nil,
"Total memory usage of the store",
commonOpts,
)
telemetryWorkloadMetricEntities = telemetry.NewGaugeWithOpts(
subsystem,
"load_store_metric_entities",
"store_metric_entities",
[]string{"metric"},
"Number of entities by metric names in the store",
commonOpts,
)
telemetryWorkloadNamespaceEntities = telemetry.NewGaugeWithOpts(
subsystem,
"load_store_namespace_entities",
"store_namespace_entities",
[]string{"namespace"},
"Number of entities by namespaces in the store",
commonOpts,
)
telemetryWorkloadJobQueueLength = telemetry.NewGaugeWithOpts(
telemetryWorkloadJobQueueLength = telemetry.NewCounterWithOpts(
subsystem,
"load_store_job_queue_length",
nil,
"store_job_queue_length",
[]string{"status"},
"Length of the job queue",
commonOpts,
)
)

// jobQueue is a wrapper around workqueue.DelayingInterface to make it thread-safe.
type jobQueue struct {
queue workqueue.DelayingInterface
taskQueue workqueue.TypedRateLimitingInterface[*gogen.MetricPayload]
isStarted bool
store loadstore.Store
m sync.Mutex
Expand All @@ -69,66 +71,72 @@ type jobQueue struct {
// newJobQueue creates a new jobQueue with no delay for adding items
func newJobQueue(ctx context.Context) *jobQueue {
q := jobQueue{
queue: workqueue.NewDelayingQueueWithConfig(workqueue.DelayingQueueConfig{
Name: "seriesPayloadJobQueue",
}),
taskQueue: workqueue.NewTypedRateLimitingQueue(workqueue.NewTypedMaxOfRateLimiter(
&workqueue.TypedBucketRateLimiter[*gogen.MetricPayload]{
Limiter: rate.NewLimiter(rate.Limit(payloadProcessQPS), payloadProcessRateBurst),
},
)),
store: loadstore.NewEntityStore(ctx),
isStarted: false,
}
go q.start(ctx)
return &q
}

func (jq *jobQueue) worker() {
for jq.processNextWorkItem() {
}
}

func (jq *jobQueue) start(ctx context.Context) {
jq.m.Lock()
if jq.isStarted {
return
}
jq.isStarted = true
jq.m.Unlock()
defer jq.queue.ShutDown()
go wait.Until(jq.worker, time.Second, ctx.Done())
infoTicker := time.NewTicker(60 * time.Second)
defer jq.taskQueue.ShutDown()
jq.reportTelemetry(ctx)
for {
select {
case <-ctx.Done():
log.Infof("Stopping series payload job queue")
return
case <-infoTicker.C:
info := jq.store.GetStoreInfo()
telemetryWorkloadStoreMemory.Set(float64(info.TotalMemoryUsage))
for k, v := range info.EntityCountByMetric {
telemetryWorkloadMetricEntities.Set(float64(v), k)
}
for k, v := range info.EntityCountByNamespace {
telemetryWorkloadNamespaceEntities.Set(float64(v), k)
}
telemetryWorkloadJobQueueLength.Set(float64(jq.queue.Len()))
log.Debugf("Store info: %+v", info)
default:
jq.processNextWorkItem()
}
}
}

func (jq *jobQueue) processNextWorkItem() bool {
obj, shutdown := jq.queue.Get()
metricPayload, shutdown := jq.taskQueue.Get()
if shutdown {
return false
}
defer jq.queue.Done(obj)
metricPayload, ok := obj.(*gogen.MetricPayload)
if !ok {
log.Errorf("Expected MetricPayload but got %T", obj)
return true
}
defer jq.taskQueue.Done(metricPayload)
telemetryWorkloadJobQueueLength.Inc("processed")
loadstore.ProcessLoadPayload(metricPayload, jq.store)
return true
}

func (jq *jobQueue) addJob(payload *gogen.MetricPayload) {
jq.queue.Add(payload)
jq.taskQueue.Add(payload)
telemetryWorkloadJobQueueLength.Inc("queued")
}

func (jq *jobQueue) reportTelemetry(ctx context.Context) {
go func() {
infoTicker := time.NewTicker(60 * time.Second)
for {
select {
case <-ctx.Done():
return
case <-infoTicker.C:
info := jq.store.GetStoreInfo()
telemetryWorkloadStoreMemory.Set(float64(info.TotalMemoryUsage))
for k, v := range info.EntityCountByMetric {
telemetryWorkloadMetricEntities.Set(float64(v), k)
}
for k, v := range info.EntityCountByNamespace {
telemetryWorkloadNamespaceEntities.Set(float64(v), k)
}
log.Infof("Store info: %+v", info)
}
}
}()
}

0 comments on commit ee354a8

Please sign in to comment.