Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CONTP-477] Add workload store to DCA for storing fail over application load from node agent #30562

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
6 changes: 6 additions & 0 deletions cmd/cluster-agent/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

languagedetection "github.com/DataDog/datadog-agent/cmd/cluster-agent/api/v1/languagedetection"
"github.com/DataDog/datadog-agent/cmd/cluster-agent/api/v2/series"

"github.com/gorilla/mux"
grpc_auth "github.com/grpc-ecosystem/go-grpc-middleware/auth"
Expand Down Expand Up @@ -69,6 +70,10 @@ func StartServer(ctx context.Context, w workloadmeta.Component, taggerComp tagge
// API V1 Language Detection APIs
languagedetection.InstallLanguageDetectionEndpoints(ctx, apiRouter, w, cfg)

// API V2 Series APIs
v2ApiRouter := router.PathPrefix("/api/v2").Subrouter()
series.InstallNodeMetricsEndpoints(ctx, v2ApiRouter, cfg)

// Validate token for every request
router.Use(validateToken)

Expand Down Expand Up @@ -192,6 +197,7 @@ func isExternalPath(path string) bool {
return strings.HasPrefix(path, "/api/v1/metadata/") && len(strings.Split(path, "/")) == 7 || // support for agents < 6.5.0
path == "/version" ||
path == "/api/v1/languagedetection" ||
path == "/api/v2/series" ||
strings.HasPrefix(path, "/api/v1/annotations/node/") && len(strings.Split(path, "/")) == 6 ||
strings.HasPrefix(path, "/api/v1/cf/apps") && len(strings.Split(path, "/")) == 5 ||
strings.HasPrefix(path, "/api/v1/cf/apps/") && len(strings.Split(path, "/")) == 6 ||
Expand Down
9 changes: 9 additions & 0 deletions cmd/cluster-agent/api/v2/series/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

/*
Package series implements the API handler for metric series submitted by the node agent.
*/
package series
130 changes: 130 additions & 0 deletions cmd/cluster-agent/api/v2/series/job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

//go:build kubeapiserver

package series

import (
"context"
"sync"
"time"

"github.com/DataDog/agent-payload/v5/gogen"
loadstore "github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/workload/loadstore"
"github.com/DataDog/datadog-agent/pkg/telemetry"
"github.com/DataDog/datadog-agent/pkg/util/log"
"golang.org/x/time/rate"
"k8s.io/client-go/util/workqueue"
)

const (
// subsystem is the telemetry subsystem prefix for all metrics below.
subsystem = "autoscaling_workload"
// payloadProcessQPS / payloadProcessRateBurst bound how fast queued
// payloads are handed to the store (token-bucket rate limiter).
payloadProcessQPS = 1000
payloadProcessRateBurst = 50
)

var (
// NoDoubleUnderscoreSep keeps metric names free of "__" separators.
commonOpts = telemetry.Options{NoDoubleUnderscoreSep: true}

// telemetryWorkloadEntities tracks, per namespace/deployment/metric name,
// how many entities the load store currently holds.
telemetryWorkloadEntities = telemetry.NewGaugeWithOpts(
subsystem,
"store_load_entities",
[]string{"namespace", "deployment", "loadname"},
"Number of entities in the store",
commonOpts,
)

// telemetryWorkloadJobQueueLength counts payloads entering ("queued") and
// leaving ("processed") the job queue; the in-flight backlog can be derived
// as queued - processed. NOTE(review): this is a monotonically increasing
// counter, not an instantaneous length, despite the help text.
telemetryWorkloadJobQueueLength = telemetry.NewCounterWithOpts(
subsystem,
"store_job_queue_length",
[]string{"status"},
"Length of the job queue",
commonOpts,
)
)

// jobQueue wraps a typed rate-limiting work queue so that metric payloads
// received over HTTP are buffered and processed asynchronously at a bounded
// rate before being written to the load store.
type jobQueue struct {
// taskQueue holds payloads awaiting processing, rate-limited on Get.
taskQueue workqueue.TypedRateLimitingInterface[*gogen.MetricPayload]
// isStarted records whether the processing loop has begun; guarded by m.
isStarted bool
// store is the destination workload load store for processed payloads.
store loadstore.Store
m sync.Mutex
}

// newJobQueue creates a jobQueue backed by the shared workload metric store
// and immediately launches its processing loop in a background goroutine.
// Items are added without delay; consumption is rate-limited.
func newJobQueue(ctx context.Context) *jobQueue {
	// Token-bucket limiter bounding how quickly payloads are dequeued.
	limiter := workqueue.NewTypedMaxOfRateLimiter(
		&workqueue.TypedBucketRateLimiter[*gogen.MetricPayload]{
			Limiter: rate.NewLimiter(rate.Limit(payloadProcessQPS), payloadProcessRateBurst),
		},
	)
	jq := &jobQueue{
		taskQueue: workqueue.NewTypedRateLimitingQueue(limiter),
		store:     loadstore.GetWorkloadMetricStore(ctx),
		isStarted: false,
	}
	go jq.start(ctx)
	return jq
}

// start runs the payload-processing loop until ctx is cancelled. It is
// idempotent: a second call observes isStarted and returns immediately.
func (jq *jobQueue) start(ctx context.Context) {
	jq.m.Lock()
	if jq.isStarted {
		// BUG fix: previously returned while still holding jq.m, which would
		// deadlock any subsequent caller touching the mutex.
		jq.m.Unlock()
		return
	}
	jq.isStarted = true
	jq.m.Unlock()
	defer jq.taskQueue.ShutDown()
	jq.reportTelemetry(ctx)
	for {
		select {
		case <-ctx.Done():
			log.Infof("Stopping series payload job queue")
			return
		default:
			// BUG fix: stop once the queue reports shutdown instead of
			// busy-spinning on a closed queue forever.
			if !jq.processNextWorkItem() {
				return
			}
		}
	}
}

// processNextWorkItem takes one payload off the queue (blocking, rate
// limited) and feeds it to the load store. It returns false only when the
// queue has been shut down.
func (jq *jobQueue) processNextWorkItem() bool {
	payload, shutdown := jq.taskQueue.Get()
	if shutdown {
		return false
	}
	defer jq.taskQueue.Done(payload)
	telemetryWorkloadJobQueueLength.Inc("processed")
	loadstore.ProcessLoadPayload(payload, jq.store)
	return true
}

// addJob enqueues a metric payload for asynchronous processing and records
// it in the queue-length telemetry under the "queued" status.
func (jq *jobQueue) addJob(payload *gogen.MetricPayload) {
	telemetryWorkloadJobQueueLength.Inc("queued")
	jq.taskQueue.Add(payload)
}

// reportTelemetry spawns a goroutine that, once a minute, publishes
// per-entity gauges from the load store's stats. The goroutine exits when
// ctx is cancelled.
func (jq *jobQueue) reportTelemetry(ctx context.Context) {
	go func() {
		infoTicker := time.NewTicker(60 * time.Second)
		// Fix: release the ticker's resources when the goroutine exits;
		// it was previously never stopped.
		defer infoTicker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-infoTicker.C:
				// The store may legitimately be nil (e.g. not initialized);
				// skip this tick rather than panic.
				if jq.store == nil {
					continue
				}
				info := jq.store.GetStoreInfo()
				for _, statsResult := range info.StatsResults {
					telemetryWorkloadEntities.Set(float64(statsResult.Count),
						statsResult.Namespace,
						statsResult.PodOwner,
						statsResult.MetricName)
				}
			}
		}
	}()
}
93 changes: 93 additions & 0 deletions cmd/cluster-agent/api/v2/series/series.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

//go:build kubeapiserver

package series

import (
"compress/gzip"
"compress/zlib"
"context"
"io"
"net/http"

"github.com/DataDog/agent-payload/v5/gogen"
"github.com/DataDog/datadog-agent/comp/core/config"
"github.com/DataDog/datadog-agent/pkg/clusteragent/api"
"github.com/DataDog/datadog-agent/pkg/util/log"
"github.com/gorilla/mux"
)

const (
// Supported Content-Encoding values for incoming payloads.
encodingGzip = "gzip"
encodingDeflate = "deflate"
// loadMetricsHandlerName identifies this handler in the leader proxy and
// telemetry wrappers.
loadMetricsHandlerName = "load-metrics-handler"
)

// InstallNodeMetricsEndpoints registers the POST /series handler used by node
// agents to submit metric series. Requests are proxied to the leader cluster
// agent; the pre-handler rejects them when the failover store is disabled or
// the body is missing.
func InstallNodeMetricsEndpoints(ctx context.Context, r *mux.Router, cfg config.Component) {
	leaderHandler := newSeriesHandler(ctx)
	// preHandler runs on every replica before proxying; returning false
	// short-circuits the request.
	preHandler := func(w http.ResponseWriter, r *http.Request) bool {
		if !cfg.GetBool("autoscaling.failover.enabled") {
			http.Error(w, "Autoscaling workload failover store is disabled on the cluster agent", http.StatusServiceUnavailable)
			return false
		}
		if r.Body == nil {
			http.Error(w, "Request body is empty", http.StatusBadRequest)
			return false
		}
		return true
	}
	handler := api.WithLeaderProxyHandler(loadMetricsHandlerName, preHandler, leaderHandler.handle)
	r.HandleFunc("/series", api.WithTelemetryWrapper(loadMetricsHandlerName, handler)).Methods("POST")
}

// seriesHandler handles series requests on the leader and stores the decoded
// metrics into the load store via an asynchronous job queue.
type seriesHandler struct {
// jobQueue buffers decoded payloads for rate-limited processing.
jobQueue *jobQueue
}

// newSeriesHandler builds a seriesHandler whose job queue starts processing
// immediately and stops when ctx is cancelled.
func newSeriesHandler(ctx context.Context) *seriesHandler {
	return &seriesHandler{jobQueue: newJobQueue(ctx)}
}

// handle decodes a (possibly gzip- or zlib-compressed) protobuf MetricPayload
// from the request body and enqueues it for asynchronous storage. Responds
// 400 on any decompression/read/unmarshal failure, 200 on success.
func (h *seriesHandler) handle(w http.ResponseWriter, r *http.Request) {
	log.Tracef("Received series request from %s", r.RemoteAddr)
	var err error
	var rc io.ReadCloser
	switch r.Header.Get("Content-Encoding") {
	case encodingGzip:
		rc, err = gzip.NewReader(r.Body)
	case encodingDeflate:
		rc, err = zlib.NewReader(r.Body)
	default:
		rc = r.Body
	}
	if err != nil {
		w.WriteHeader(http.StatusBadRequest)
		return
	}
	// Fix: close the decompression reader (previously leaked). The request
	// body itself is closed by the http server, so only close the wrapper.
	if rc != r.Body {
		defer rc.Close()
	}

	payload, err := io.ReadAll(rc)
	if err != nil {
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	metricPayload := &gogen.MetricPayload{}
	if err := metricPayload.Unmarshal(payload); err != nil {
		w.WriteHeader(http.StatusBadRequest)
		return
	}
	h.jobQueue.addJob(metricPayload)
	w.WriteHeader(http.StatusOK)
}
19 changes: 19 additions & 0 deletions cmd/cluster-agent/api/v2/series/series_nocompile.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2023-present Datadog, Inc.

//go:build !kubeapiserver

package series

import (
"context"

"github.com/DataDog/datadog-agent/comp/core/config"
"github.com/gorilla/mux"
)

// InstallNodeMetricsEndpoints installs node metrics collection endpoints.
// This is the no-op stub compiled when the kubeapiserver build tag is absent;
// it keeps callers building regardless of build configuration.
func InstallNodeMetricsEndpoints(_ context.Context, _ *mux.Router, _ config.Component) {
}
6 changes: 4 additions & 2 deletions cmd/cluster-agent/subcommands/start/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,8 +266,10 @@ func start(log log.Component,
}
}()

// Setup the leader forwarder for language detection and cluster checks
if config.GetBool("cluster_checks.enabled") || (config.GetBool("language_detection.enabled") && config.GetBool("language_detection.reporting.enabled")) {
// Setup the leader forwarder for autoscaling failover store, language detection and cluster checks
if config.GetBool("cluster_checks.enabled") ||
(config.GetBool("language_detection.enabled") && config.GetBool("language_detection.reporting.enabled")) ||
config.GetBool("autoscaling.failover.enabled") {
apidca.NewGlobalLeaderForwarder(
config.GetInt("cluster_agent.cmd_port"),
config.GetInt("cluster_agent.max_connections"),
Expand Down
11 changes: 11 additions & 0 deletions pkg/clusteragent/autoscaling/workload/loadstore/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

//go:build kubeapiserver

/*
Package loadstore stores local failover metrics for workloads that need autoscaling.
*/
package loadstore
Loading
Loading