Skip to content

Commit

Permalink
Add GRPC client and server metrics (#5615)
Browse files Browse the repository at this point in the history
Signed-off-by: Ali Aqel <[email protected]>
  • Loading branch information
aliaqel-stripe authored Apr 10, 2024
1 parent 9be5740 commit 08aeb57
Show file tree
Hide file tree
Showing 25 changed files with 1,601 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ Here is an overview of all new **experimental** features:

- **General**: Add active trigger name in ScaledObject's scale out event ([#5577](https://github.com/kedacore/keda/issues/5577))
- **General**: Add command-line flag in Adapter to allow override of gRPC Authority Header ([#5449](https://github.com/kedacore/keda/issues/5449))
- **General**: Add GRPC Client and Server metrics ([#5502](https://github.com/kedacore/keda/issues/5502))
- **General**: Add GRPC Healthchecks ([#5590](https://github.com/kedacore/keda/issues/5590))
- **General**: Add OPENTELEMETRY flag in e2e test YAML ([#5375](https://github.com/kedacore/keda/issues/5375))
- **General**: Add support for cross tenant/cloud authentication when using Azure Workload Identity for TriggerAuthentication ([#5441](https://github.com/kedacore/keda/issues/5441))
Expand Down
30 changes: 29 additions & 1 deletion cmd/adapter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"net/http"
"os"

grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
_ "go.uber.org/automaxprocs"
appsv1 "k8s.io/api/apps/v1"
Expand Down Expand Up @@ -105,6 +107,8 @@ func (a *Adapter) makeProvider(ctx context.Context) (provider.ExternalMetricsPro
cfg.Burst = adapterClientRequestBurst
cfg.DisableCompression = disableCompression

clientMetrics := getMetricInterceptor()

mgr, err := ctrl.NewManager(cfg, ctrl.Options{
Metrics: server.Options{
BindAddress: "0", // disabled since we use our own server to serve metrics
Expand All @@ -124,7 +128,7 @@ func (a *Adapter) makeProvider(ctx context.Context) (provider.ExternalMetricsPro
}

logger.Info("Connecting Metrics Service gRPC client to the server", "address", metricsServiceAddr)
grpcClient, err := metricsservice.NewGrpcClient(metricsServiceAddr, a.SecureServing.ServerCert.CertDirectory, metricsServiceGRPCAuthority)
grpcClient, err := metricsservice.NewGrpcClient(metricsServiceAddr, a.SecureServing.ServerCert.CertDirectory, metricsServiceGRPCAuthority, clientMetrics)
if err != nil {
logger.Error(err, "error connecting Metrics Service gRPC client to the server", "address", metricsServiceAddr)
return nil, nil, err
Expand Down Expand Up @@ -158,6 +162,30 @@ func getMetricHandler() http.HandlerFunc {
}
}

// getMetricInterceptor returns a metrics inceptor that records metrics between the adapter and opertaor
func getMetricInterceptor() *grpcprom.ClientMetrics {
metricsNamespace := "keda_internal_metricsservice"

counterNamespace := func(o *prometheus.CounterOpts) {
o.Namespace = metricsNamespace
}

histogramNamespace := func(o *prometheus.HistogramOpts) {
o.Namespace = metricsNamespace
}

clientMetrics := grpcprom.NewClientMetrics(
grpcprom.WithClientHandlingTimeHistogram(
grpcprom.WithHistogramBuckets([]float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}),
histogramNamespace,
),
grpcprom.WithClientCounterOptions(counterNamespace),
)
legacyregistry.Registerer().MustRegister(clientMetrics)

return clientMetrics
}

// RunMetricsServer runs a http listener and handles the /metrics endpoint
// this is needed to consolidate apiserver and controller-runtime metrics
// we have to use a separate http server & can't rely on the controller-runtime implementation
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ require (
github.com/google/go-github/v50 v50.2.0
github.com/google/uuid v1.6.0
github.com/gophercloud/gophercloud v1.8.0
github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.0
github.com/hashicorp/vault/api v1.11.0
github.com/influxdata/influxdb-client-go/v2 v2.13.0
github.com/jackc/pgx/v5 v5.5.2
Expand Down Expand Up @@ -112,6 +113,8 @@ require (
sigs.k8s.io/kustomize/kustomize/v5 v5.3.0
)

require github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.0-rc.3 // indirect

replace (
// pin k8s.io to v0.28.5
github.com/google/cel-go => github.com/google/cel-go v0.16.1
Expand Down
7 changes: 7 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1348,6 +1348,10 @@ github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZH
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 h1:UH//fgunKIs4JdUbpDl1VZCDaL56wXCB/5+wF6uHfaI=
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0/go.mod h1:g5qyo/la0ALbONm6Vbp88Yd8NsDy6rZz+RcrMPxvld8=
github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.0 h1:f4tggROQKKcnh4eItay6z/HbHLqghBxS8g7pyMhmDio=
github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.0/go.mod h1:hKAkSgNkL0FII46ZkJcpVEAai4KV+swlIWCKfekd1pA=
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.0-rc.3 h1:o95KDiV/b1xdkumY5YbLR0/n2+wBxUpgf3HgfKgTyLI=
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.0-rc.3/go.mod h1:hTxjzRcX49ogbTGVJ1sM5mz5s+SSgiGIyL3jjPxl32E=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92BcuyuQ/YW4NSIpoGtfXNho=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
Expand Down Expand Up @@ -1944,6 +1948,7 @@ golang.org/x/oauth2 v0.0.0-20201208152858-08078c50e5b5/go.mod h1:KelEdhl1UZF7XfJ
golang.org/x/oauth2 v0.0.0-20210218202405-ba52d332ba99/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20210220000619-9bb904979d93/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20210313182246-cd4f82c27b84/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20210413134643-5e61552d6c78/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20210628180205-a41e5a781914/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20210805134026-6f1e6394065a/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
Expand Down Expand Up @@ -2283,6 +2288,7 @@ google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEY
google.golang.org/genproto v0.0.0-20200618031413-b414f8b61790/go.mod h1:jDfRM7FcilCzHH/e9qn6dsT145K34l5v+OpcnNgKAAA=
google.golang.org/genproto v0.0.0-20200729003335-053ba62fc06f/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20200804131852-c06518451d9c/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20200806141610-86f49bd18e98/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20200825200019-8632dd797987/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20200904004341-0bd0a958aa1d/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20201109203340-2640f1f9cdfb/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
Expand Down Expand Up @@ -2432,6 +2438,7 @@ google.golang.org/grpc v1.60.1/go.mod h1:OlCHIeLYqSSsLi6i49B5QGdzaMZK9+M7LXN2FKz
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw=
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.3.0 h1:rNBFJjBCOgVr9pWD7rs/knKL4FRTKgpZmsRfV214zcA=
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.3.0/go.mod h1:Dk1tviKTvMCz5tvh7t+fh94dhmQVHuCt2OzJB3CTW9Y=
google.golang.org/grpc/examples v0.0.0-20210424002626-9572fd6faeae/go.mod h1:Ly7ZA/ARzg8fnPU9TyZIxoz33sEUuWX7txiqs8lPTgE=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
Expand Down
17 changes: 16 additions & 1 deletion pkg/metricscollector/metricscollectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ limitations under the License.

package metricscollector

import (
grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
)

const (
ClusterTriggerAuthenticationResource = "cluster_trigger_authentication"
TriggerAuthenticationResource = "trigger_authentication"
Expand All @@ -27,7 +31,8 @@ const (
)

var (
collectors []MetricsCollector
collectors []MetricsCollector
promServerMetrics *grpcprom.ServerMetrics
)

type MetricsCollector interface {
Expand Down Expand Up @@ -76,6 +81,10 @@ func NewMetricsCollectors(enablePrometheusMetrics bool, enableOpenTelemetryMetri
if enablePrometheusMetrics {
promometrics := NewPromMetrics()
collectors = append(collectors, promometrics)

if promServerMetrics == nil {
promServerMetrics = newPromServerMetrics()
}
}

if enableOpenTelemetryMetrics {
Expand Down Expand Up @@ -184,3 +193,9 @@ func RecordCloudEventQueueStatus(namespace string, value int) {
element.RecordCloudEventQueueStatus(namespace, value)
}
}

// Returns the ServerMetrics object for GRPC Server metrics. Used to initialize the GRPC server with the proper intercepts
// Currently, only Prometheus metrics are supported.
func GetServerMetrics() *grpcprom.ServerMetrics {
return promServerMetrics
}
27 changes: 27 additions & 0 deletions pkg/metricscollector/prommetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"runtime"
"strconv"

grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
"github.com/prometheus/client_golang/prometheus"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/metrics"
Expand Down Expand Up @@ -329,3 +330,29 @@ func (p *PromMetrics) RecordCloudEventEmittedError(namespace string, cloudevents
func (p *PromMetrics) RecordCloudEventQueueStatus(namespace string, value int) {
cloudeventQueueStatus.With(prometheus.Labels{"namespace": namespace}).Set(float64(value))
}

// Returns a grpcprom server Metrics object and registers the metrics. The object contains
// interceptors to chain to the server so that all requests served are observed. Intended to be called
// as part of initialization of metricscollector, hence why this function is not exported
func newPromServerMetrics() *grpcprom.ServerMetrics {
metricsNamespace := "keda_internal_metricsservice"

counterNamespace := func(o *prometheus.CounterOpts) {
o.Namespace = metricsNamespace
}

histogramNamespace := func(o *prometheus.HistogramOpts) {
o.Namespace = metricsNamespace
}

serverMetrics := grpcprom.NewServerMetrics(
grpcprom.WithServerHandlingTimeHistogram(
grpcprom.WithHistogramBuckets([]float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}),
histogramNamespace,
),
grpcprom.WithServerCounterOptions(counterNamespace),
)
metrics.Registry.MustRegister(serverMetrics)

return serverMetrics
}
9 changes: 8 additions & 1 deletion pkg/metricsservice/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/go-logr/logr"
grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"k8s.io/metrics/pkg/apis/external_metrics"
Expand All @@ -36,7 +37,7 @@ type GrpcClient struct {
connection *grpc.ClientConn
}

func NewGrpcClient(url, certDir, authority string) (*GrpcClient, error) {
func NewGrpcClient(url, certDir, authority string, clientMetrics *grpcprom.ClientMetrics) (*GrpcClient, error) {
defaultConfig := fmt.Sprintf(`{
"methodConfig": [{
"timeout": "3s",
Expand Down Expand Up @@ -64,6 +65,12 @@ func NewGrpcClient(url, certDir, authority string) (*GrpcClient, error) {
grpc.WithDefaultServiceConfig(defaultConfig),
}

opts = append(
opts,
grpc.WithChainUnaryInterceptor(clientMetrics.UnaryClientInterceptor()),
grpc.WithChainStreamInterceptor(clientMetrics.StreamClientInterceptor()),
)

if authority != "" {
// If an Authority header override is specified, add it to the client so it is set on every request.
// This is useful when the address used to dial the GRPC server does not match any hosts provided in the TLS certificate's
Expand Down
16 changes: 15 additions & 1 deletion pkg/metricsservice/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"k8s.io/metrics/pkg/apis/external_metrics/v1beta1"
logf "sigs.k8s.io/controller-runtime/pkg/log"

"github.com/kedacore/keda/v2/pkg/metricscollector"
"github.com/kedacore/keda/v2/pkg/metricsservice/api"
"github.com/kedacore/keda/v2/pkg/metricsservice/utils"
"github.com/kedacore/keda/v2/pkg/scaling"
Expand Down Expand Up @@ -96,7 +97,20 @@ func (s *GrpcServer) Start(ctx context.Context) error {
if err != nil {
return err
}
s.server = grpc.NewServer(grpc.Creds(creds))

grpcServerOpts := []grpc.ServerOption{
grpc.Creds(creds),
}

if metricscollector.GetServerMetrics() != nil {
grpcServerOpts = append(
grpcServerOpts,
grpc.ChainStreamInterceptor(metricscollector.GetServerMetrics().StreamServerInterceptor()),
grpc.ChainUnaryInterceptor(metricscollector.GetServerMetrics().UnaryServerInterceptor()),
)
}

s.server = grpc.NewServer(grpcServerOpts...)
api.RegisterMetricsServiceServer(s.server, s)

s.healthServer = health.NewServer()
Expand Down
Loading

0 comments on commit 08aeb57

Please sign in to comment.