From 6056ba1765f2892a1ffba93b34d932be1293190c Mon Sep 17 00:00:00 2001 From: Garrybest Date: Mon, 29 Nov 2021 23:01:22 +0800 Subject: [PATCH] Reform main function to adapt custom resource registry Signed-off-by: Garrybest --- main.go | 269 +-------------------------------------- pkg/app/server.go | 313 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 319 insertions(+), 263 deletions(-) create mode 100644 pkg/app/server.go diff --git a/main.go b/main.go index a1a15b3ef5..ff8e5ff0cd 100644 --- a/main.go +++ b/main.go @@ -19,65 +19,21 @@ package main import ( "context" "fmt" - "net" - "net/http" - "net/http/pprof" "os" - "strconv" - "time" - "github.com/oklog/run" - "github.com/pkg/errors" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/collectors" - "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/common/version" - "github.com/prometheus/exporter-toolkit/web" - vpaclientset "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/clientset/versioned" - clientset "k8s.io/client-go/kubernetes" - _ "k8s.io/client-go/plugin/pkg/client/auth" - "k8s.io/client-go/tools/clientcmd" "k8s.io/klog/v2" - "k8s.io/kube-state-metrics/v2/internal/store" - "k8s.io/kube-state-metrics/v2/pkg/allowdenylist" - generator "k8s.io/kube-state-metrics/v2/pkg/metric_generator" - "k8s.io/kube-state-metrics/v2/pkg/metricshandler" - "k8s.io/kube-state-metrics/v2/pkg/optin" + "k8s.io/kube-state-metrics/v2/pkg/app" "k8s.io/kube-state-metrics/v2/pkg/options" - "k8s.io/kube-state-metrics/v2/pkg/util/proc" ) -const ( - metricsPath = "/metrics" - healthzPath = "/healthz" -) - -// promLogger implements promhttp.Logger -type promLogger struct{} - -func (pl promLogger) Println(v ...interface{}) { - klog.Error(v...) -} - -// promLogger implements the Logger interface -func (pl promLogger) Log(v ...interface{}) error { - klog.Info(v...) - return nil -} - func main() { opts := options.NewOptions() opts.AddFlags() - promLogger := promLogger{} - - ctx := context.Background() - - err := opts.Parse() - if err != nil { - klog.Fatalf("Error: %s", err) + if err := opts.Parse(); err != nil { + klog.Fatalf("Parsing flag definitions error: %v", err) } if opts.Version { @@ -89,222 +45,9 @@ func main() { opts.Usage() os.Exit(0) } - storeBuilder := store.NewBuilder() - - ksmMetricsRegistry := prometheus.NewRegistry() - ksmMetricsRegistry.MustRegister(version.NewCollector("kube_state_metrics")) - durationVec := promauto.With(ksmMetricsRegistry).NewHistogramVec( - prometheus.HistogramOpts{ - Name: "http_request_duration_seconds", - Help: "A histogram of requests for kube-state-metrics metrics handler.", - Buckets: prometheus.DefBuckets, - ConstLabels: prometheus.Labels{"handler": "metrics"}, - }, []string{"method"}, - ) - storeBuilder.WithMetrics(ksmMetricsRegistry) - - var resources []string - if len(opts.Resources) == 0 { - klog.Info("Using default resources") - resources = options.DefaultResources.AsSlice() - } else { - klog.Infof("Using resources %s", opts.Resources.String()) - resources = opts.Resources.AsSlice() - } - - if err := storeBuilder.WithEnabledResources(resources); err != nil { - klog.Fatalf("Failed to set up resources: %v", err) - } - - namespaces := opts.Namespaces.GetNamespaces() - nsFieldSelector := namespaces.GetExcludeNSFieldSelector(opts.NamespacesDenylist) - storeBuilder.WithNamespaces(namespaces, nsFieldSelector) - - allowDenyList, err := allowdenylist.New(opts.MetricAllowlist, opts.MetricDenylist) - if err != nil { - klog.Fatal(err) - } - - err = allowDenyList.Parse() - if err != nil { - klog.Fatalf("error initializing the allowdeny list : %v", err) - } - - klog.Infof("metric allow-denylisting: %v", allowDenyList.Status()) - - optInMetricFamilyFilter, err := optin.NewMetricFamilyFilter(opts.MetricOptInList) - if err != nil { - klog.Fatalf("error initializing the opt-in metric list : %v", err) - } - - if optInMetricFamilyFilter.Count() > 0 { - klog.Infof("metrics which were opted into: %v", optInMetricFamilyFilter.Status()) - } - - storeBuilder.WithFamilyGeneratorFilter(generator.NewCompositeFamilyGeneratorFilter( - allowDenyList, - optInMetricFamilyFilter, - )) - - storeBuilder.WithGenerateStoresFunc(storeBuilder.DefaultGenerateStoresFunc(), opts.UseAPIServerCache) - - proc.StartReaper() - - kubeClient, vpaClient, err := createKubeClient(opts.Apiserver, opts.Kubeconfig) - if err != nil { - klog.Fatalf("Failed to create client: %v", err) - } - storeBuilder.WithKubeClient(kubeClient) - storeBuilder.WithVPAClient(vpaClient) - storeBuilder.WithSharding(opts.Shard, opts.TotalShards) - storeBuilder.WithAllowAnnotations(opts.AnnotationsAllowList) - storeBuilder.WithAllowLabels(opts.LabelsAllowList) - ksmMetricsRegistry.MustRegister( - collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}), - collectors.NewGoCollector(), - ) - - var g run.Group - - m := metricshandler.New( - opts, - kubeClient, - storeBuilder, - opts.EnableGZIPEncoding, - ) - // Run MetricsHandler - { - ctxMetricsHandler, cancel := context.WithCancel(ctx) - g.Add(func() error { - return m.Run(ctxMetricsHandler) - }, func(error) { - cancel() - }) - } - - tlsConfig := opts.TLSConfig - - telemetryMux := buildTelemetryServer(ksmMetricsRegistry) - telemetryListenAddress := net.JoinHostPort(opts.TelemetryHost, strconv.Itoa(opts.TelemetryPort)) - telemetryServer := http.Server{Handler: telemetryMux, Addr: telemetryListenAddress} - - metricsMux := buildMetricsServer(m, durationVec) - metricsServerListenAddress := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port)) - metricsServer := http.Server{Handler: metricsMux, Addr: metricsServerListenAddress} - - // Run Telemetry server - { - g.Add(func() error { - klog.Infof("Starting kube-state-metrics self metrics server: %s", telemetryListenAddress) - return web.ListenAndServe(&telemetryServer, tlsConfig, promLogger) - }, func(error) { - ctxShutDown, cancel := context.WithTimeout(ctx, 3*time.Second) - defer cancel() - telemetryServer.Shutdown(ctxShutDown) - }) - } - // Run Metrics server - { - g.Add(func() error { - klog.Infof("Starting metrics server: %s", metricsServerListenAddress) - return web.ListenAndServe(&metricsServer, tlsConfig, promLogger) - }, func(error) { - ctxShutDown, cancel := context.WithTimeout(ctx, 3*time.Second) - defer cancel() - metricsServer.Shutdown(ctxShutDown) - }) - } - - if err := g.Run(); err != nil { - klog.Fatalf("RunGroup Error: %v", err) - } - klog.Info("Exiting") -} - -func createKubeClient(apiserver string, kubeconfig string) (clientset.Interface, vpaclientset.Interface, error) { - config, err := clientcmd.BuildConfigFromFlags(apiserver, kubeconfig) - if err != nil { - return nil, nil, err - } - - config.UserAgent = version.Version - config.AcceptContentTypes = "application/vnd.kubernetes.protobuf,application/json" - config.ContentType = "application/vnd.kubernetes.protobuf" - - kubeClient, err := clientset.NewForConfig(config) - if err != nil { - return nil, nil, err - } - - vpaClient, err := vpaclientset.NewForConfig(config) - if err != nil { - return nil, nil, err - } - // Informers don't seem to do a good job logging error messages when it - // can't reach the server, making debugging hard. This makes it easier to - // figure out if apiserver is configured incorrectly. - klog.Infof("Testing communication with server") - v, err := kubeClient.Discovery().ServerVersion() - if err != nil { - return nil, nil, errors.Wrap(err, "error while trying to communicate with apiserver") + ctx := context.Background() + if err := app.RunKubeStateMetrics(ctx, opts); err != nil { + klog.Fatalf("Failed to run kube-state-metrics: %v", err) } - klog.Infof("Running with Kubernetes cluster version: v%s.%s. git version: %s. git tree state: %s. commit: %s. platform: %s", - v.Major, v.Minor, v.GitVersion, v.GitTreeState, v.GitCommit, v.Platform) - klog.Infof("Communication with server successful") - - return kubeClient, vpaClient, nil -} - -func buildTelemetryServer(registry prometheus.Gatherer) *http.ServeMux { - mux := http.NewServeMux() - - // Add metricsPath - mux.Handle(metricsPath, promhttp.HandlerFor(registry, promhttp.HandlerOpts{ErrorLog: promLogger{}})) - // Add index - mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { - w.Write([]byte(` - Kube-State-Metrics Metrics Server - -

Kube-State-Metrics Metrics

- - - `)) - }) - return mux -} - -func buildMetricsServer(m *metricshandler.MetricsHandler, durationObserver prometheus.ObserverVec) *http.ServeMux { - mux := http.NewServeMux() - - // TODO: This doesn't belong into serveMetrics - mux.Handle("/debug/pprof/", http.HandlerFunc(pprof.Index)) - mux.Handle("/debug/pprof/cmdline", http.HandlerFunc(pprof.Cmdline)) - mux.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile)) - mux.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol)) - mux.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace)) - - mux.Handle(metricsPath, promhttp.InstrumentHandlerDuration(durationObserver, m)) - - // Add healthzPath - mux.HandleFunc(healthzPath, func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) - w.Write([]byte(http.StatusText(http.StatusOK))) - }) - // Add index - mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { - w.Write([]byte(` - Kube Metrics Server - -

Kube Metrics

- - - `)) - }) - return mux } diff --git a/pkg/app/server.go b/pkg/app/server.go new file mode 100644 index 0000000000..d8205de28f --- /dev/null +++ b/pkg/app/server.go @@ -0,0 +1,313 @@ +/* +Copyright 2021 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package app + +import ( + "context" + "fmt" + "net" + "net/http" + "net/http/pprof" + "strconv" + "time" + + "github.com/oklog/run" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/collectors" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/prometheus/common/version" + "github.com/prometheus/exporter-toolkit/web" + vpaclientset "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/clientset/versioned" + clientset "k8s.io/client-go/kubernetes" + _ "k8s.io/client-go/plugin/pkg/client/auth" // Initialize common client auth plugins. + "k8s.io/client-go/tools/clientcmd" + "k8s.io/klog/v2" + + "k8s.io/kube-state-metrics/v2/internal/store" + "k8s.io/kube-state-metrics/v2/pkg/allowdenylist" + "k8s.io/kube-state-metrics/v2/pkg/customresource" + generator "k8s.io/kube-state-metrics/v2/pkg/metric_generator" + "k8s.io/kube-state-metrics/v2/pkg/metricshandler" + "k8s.io/kube-state-metrics/v2/pkg/optin" + "k8s.io/kube-state-metrics/v2/pkg/options" + "k8s.io/kube-state-metrics/v2/pkg/util/proc" +) + +const ( + metricsPath = "/metrics" + healthzPath = "/healthz" +) + +// promLogger implements promhttp.Logger +type promLogger struct{} + +func (pl promLogger) Println(v ...interface{}) { + klog.Error(v...) +} + +// promLogger implements the Logger interface +func (pl promLogger) Log(v ...interface{}) error { + klog.Info(v...) + return nil +} + +// RunKubeStateMetrics will build and run the kube-state-metrics. +// Any out-of-tree custom resource metrics could be registered by newing a registry factory +// which implements customresource.RegistryFactory and pass all factories into this function. +func RunKubeStateMetrics(ctx context.Context, opts *options.Options, factories ...customresource.RegistryFactory) error { + promLogger := promLogger{} + + storeBuilder := store.NewBuilder() + storeBuilder.WithCustomResourceStoreFactories(factories...) + + ksmMetricsRegistry := prometheus.NewRegistry() + ksmMetricsRegistry.MustRegister(version.NewCollector("kube_state_metrics")) + durationVec := promauto.With(ksmMetricsRegistry).NewHistogramVec( + prometheus.HistogramOpts{ + Name: "http_request_duration_seconds", + Help: "A histogram of requests for kube-state-metrics metrics handler.", + Buckets: prometheus.DefBuckets, + ConstLabels: prometheus.Labels{"handler": "metrics"}, + }, []string{"method"}, + ) + storeBuilder.WithMetrics(ksmMetricsRegistry) + + var resources []string + if len(opts.Resources) == 0 { + klog.Info("Using default resources") + resources = options.DefaultResources.AsSlice() + // enable custom resource + for _, factory := range factories { + resources = append(resources, factory.Name()) + } + } else { + klog.Infof("Using resources %s", opts.Resources.String()) + resources = opts.Resources.AsSlice() + } + + if err := storeBuilder.WithEnabledResources(resources); err != nil { + return fmt.Errorf("failed to set up resources: %v", err) + } + + namespaces := opts.Namespaces.GetNamespaces() + nsFieldSelector := namespaces.GetExcludeNSFieldSelector(opts.NamespacesDenylist) + storeBuilder.WithNamespaces(namespaces, nsFieldSelector) + + allowDenyList, err := allowdenylist.New(opts.MetricAllowlist, opts.MetricDenylist) + if err != nil { + return err + } + + err = allowDenyList.Parse() + if err != nil { + return fmt.Errorf("error initializing the allowdeny list: %v", err) + } + + klog.Infof("Metric allow-denylisting: %v", allowDenyList.Status()) + + optInMetricFamilyFilter, err := optin.NewMetricFamilyFilter(opts.MetricOptInList) + if err != nil { + return fmt.Errorf("error initializing the opt-in metric list: %v", err) + } + + if optInMetricFamilyFilter.Count() > 0 { + klog.Infof("Metrics which were opted into: %v", optInMetricFamilyFilter.Status()) + } + + storeBuilder.WithFamilyGeneratorFilter(generator.NewCompositeFamilyGeneratorFilter( + allowDenyList, + optInMetricFamilyFilter, + )) + + storeBuilder.WithUsingAPIServerCache(opts.UseAPIServerCache) + storeBuilder.WithGenerateStoresFunc(storeBuilder.DefaultGenerateStoresFunc()) + storeBuilder.WithGenerateCustomResourceStoresFunc(storeBuilder.DefaultGenerateCustomResourceStoresFunc()) + + proc.StartReaper() + + kubeClient, vpaClient, customResourceClients, err := createKubeClient(opts.Apiserver, opts.Kubeconfig, factories...) + if err != nil { + return fmt.Errorf("failed to create client: %v", err) + } + storeBuilder.WithKubeClient(kubeClient) + storeBuilder.WithVPAClient(vpaClient) + storeBuilder.WithCustomResourceClients(customResourceClients) + storeBuilder.WithSharding(opts.Shard, opts.TotalShards) + storeBuilder.WithAllowAnnotations(opts.AnnotationsAllowList) + storeBuilder.WithAllowLabels(opts.LabelsAllowList) + + ksmMetricsRegistry.MustRegister( + collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}), + collectors.NewGoCollector(), + ) + + var g run.Group + + m := metricshandler.New( + opts, + kubeClient, + storeBuilder, + opts.EnableGZIPEncoding, + ) + // Run MetricsHandler + { + ctxMetricsHandler, cancel := context.WithCancel(ctx) + g.Add(func() error { + return m.Run(ctxMetricsHandler) + }, func(error) { + cancel() + }) + } + + tlsConfig := opts.TLSConfig + + telemetryMux := buildTelemetryServer(ksmMetricsRegistry) + telemetryListenAddress := net.JoinHostPort(opts.TelemetryHost, strconv.Itoa(opts.TelemetryPort)) + telemetryServer := http.Server{Handler: telemetryMux, Addr: telemetryListenAddress} + + metricsMux := buildMetricsServer(m, durationVec) + metricsServerListenAddress := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port)) + metricsServer := http.Server{Handler: metricsMux, Addr: metricsServerListenAddress} + + // Run Telemetry server + { + g.Add(func() error { + klog.Infof("Starting kube-state-metrics self metrics server: %s", telemetryListenAddress) + return web.ListenAndServe(&telemetryServer, tlsConfig, promLogger) + }, func(error) { + ctxShutDown, cancel := context.WithTimeout(ctx, 3*time.Second) + defer cancel() + telemetryServer.Shutdown(ctxShutDown) + }) + } + // Run Metrics server + { + g.Add(func() error { + klog.Infof("Starting metrics server: %s", metricsServerListenAddress) + return web.ListenAndServe(&metricsServer, tlsConfig, promLogger) + }, func(error) { + ctxShutDown, cancel := context.WithTimeout(ctx, 3*time.Second) + defer cancel() + metricsServer.Shutdown(ctxShutDown) + }) + } + + if err := g.Run(); err != nil { + return fmt.Errorf("run server group error: %v", err) + } + klog.Info("Exiting") + return nil +} + +func createKubeClient(apiserver string, kubeconfig string, factories ...customresource.RegistryFactory) (clientset.Interface, vpaclientset.Interface, map[string]interface{}, error) { + config, err := clientcmd.BuildConfigFromFlags(apiserver, kubeconfig) + if err != nil { + return nil, nil, nil, err + } + + config.UserAgent = version.Version + config.AcceptContentTypes = "application/vnd.kubernetes.protobuf,application/json" + config.ContentType = "application/vnd.kubernetes.protobuf" + + kubeClient, err := clientset.NewForConfig(config) + if err != nil { + return nil, nil, nil, err + } + + vpaClient, err := vpaclientset.NewForConfig(config) + if err != nil { + return nil, nil, nil, err + } + + customResourceClients := make(map[string]interface{}, len(factories)) + for _, f := range factories { + customResourceClient, err := f.CreateClient(config) + if err != nil { + return nil, nil, nil, err + } + customResourceClients[f.Name()] = customResourceClient + } + + // Informers don't seem to do a good job logging error messages when it + // can't reach the server, making debugging hard. This makes it easier to + // figure out if apiserver is configured incorrectly. + klog.Infof("Testing communication with server") + v, err := kubeClient.Discovery().ServerVersion() + if err != nil { + return nil, nil, nil, errors.Wrap(err, "error while trying to communicate with apiserver") + } + klog.Infof("Running with Kubernetes cluster version: v%s.%s. git version: %s. git tree state: %s. commit: %s. platform: %s", + v.Major, v.Minor, v.GitVersion, v.GitTreeState, v.GitCommit, v.Platform) + klog.Infof("Communication with server successful") + + return kubeClient, vpaClient, customResourceClients, nil +} + +func buildTelemetryServer(registry prometheus.Gatherer) *http.ServeMux { + mux := http.NewServeMux() + + // Add metricsPath + mux.Handle(metricsPath, promhttp.HandlerFor(registry, promhttp.HandlerOpts{ErrorLog: promLogger{}})) + // Add index + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte(` + Kube-State-Metrics Metrics Server + +

Kube-State-Metrics Metrics

+ + + `)) + }) + return mux +} + +func buildMetricsServer(m *metricshandler.MetricsHandler, durationObserver prometheus.ObserverVec) *http.ServeMux { + mux := http.NewServeMux() + + // TODO: This doesn't belong into serveMetrics + mux.Handle("/debug/pprof/", http.HandlerFunc(pprof.Index)) + mux.Handle("/debug/pprof/cmdline", http.HandlerFunc(pprof.Cmdline)) + mux.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile)) + mux.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol)) + mux.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace)) + + mux.Handle(metricsPath, promhttp.InstrumentHandlerDuration(durationObserver, m)) + + // Add healthzPath + mux.HandleFunc(healthzPath, func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + w.Write([]byte(http.StatusText(http.StatusOK))) + }) + // Add index + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte(` + Kube Metrics Server + +

Kube Metrics

+ + + `)) + }) + return mux +}