add configurable throughput for clients #91

Closed · wants to merge 2 commits
33 changes: 29 additions & 4 deletions cmd/csi-external-health-monitor-controller/main.go
@@ -18,12 +18,12 @@ package main

import (
"context"
"flag"
"fmt"
"net/http"
"os"
"time"

flag "github.com/spf13/pflag"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
@@ -32,6 +32,7 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"

"github.com/container-storage-interface/spec/lib/go/csi"
@@ -74,6 +75,11 @@ var (
metricsAddress = flag.String("metrics-address", "", "(deprecated) The TCP network address where the prometheus metrics endpoint will listen (example: `:8080`). The default is empty string, which means metrics endpoint is disabled. Only one of `--metrics-address` and `--http-endpoint` can be set.")
httpEndpoint = flag.String("http-endpoint", "", "The TCP network address where the HTTP server for diagnostics, including metrics and leader election health check, will listen (example: `:8080`). The default is empty string, which means the server is disabled. Only one of `--metrics-address` and `--http-endpoint` can be set.")
metricsPath = flag.String("metrics-path", "/metrics", "The HTTP path where prometheus metrics will be exposed. Default is `/metrics`.")

retryIntervalStart = flag.Duration("retry-interval-start", time.Second, "Initial retry interval of failed pv monitoring. It doubles with each failure, up to retry-interval-max. Default is 1 second.")
retryIntervalMax = flag.Duration("retry-interval-max", 5*time.Minute, "Maximum retry interval of failed pv monitoring. Default is 5 minutes.")
kubeAPIQPS = flag.Float32("kube-api-qps", 5, "QPS to use while communicating with the kubernetes apiserver. Defaults to 5.0.")
Contributor: Since we are trying to decrease the API call frequency, I wonder if we should set the default lower than 5.0. @pohly @msau42, what do you think?

Contributor: I'm not sure. You would have to run a scalability experiment to determine which default works better. Also note that client-side throttling is being replaced by API priority and fairness.

IMHO, more important than tweaks like this is to look at why the health monitor causes API traffic. Wasn't it because it establishes watches for objects that change a lot (nodes, pods)? Then throttling won't help at all, because it is applied to outbound requests, not to incoming watch updates.

Contributor: API priority and fairness is now beta. Have you run any experiments that can demonstrate that this change is still effective?

kubeAPIBurst = flag.Int("kube-api-burst", 10, "Burst to use while communicating with the kubernetes apiserver. Defaults to 10.")
)

var (
@@ -112,6 +118,9 @@ func main() {
os.Exit(1)
}

config.QPS = *kubeAPIQPS
config.Burst = *kubeAPIBurst

clientset, err := kubernetes.NewForConfig(config)
if err != nil {
klog.Error(err.Error())
@@ -210,8 +219,18 @@ func main() {
broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: clientset.CoreV1().Events(v1.NamespaceAll)})
eventRecorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: fmt.Sprintf("csi-pv-monitor-controller-%s", option.DriverName)})

monitorController := monitorcontroller.NewPVMonitorController(clientset, csiConn, factory.Core().V1().PersistentVolumes(),
factory.Core().V1().PersistentVolumeClaims(), factory.Core().V1().Pods(), factory.Core().V1().Nodes(), factory.Core().V1().Events(), eventRecorder, &option)
monitorController := monitorcontroller.NewPVMonitorController(
clientset,
csiConn,
factory.Core().V1().PersistentVolumes(),
factory.Core().V1().PersistentVolumeClaims(),
factory.Core().V1().Pods(),
factory.Core().V1().Nodes(),
factory.Core().V1().Events(),
eventRecorder,
&option,
workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax),
)

run := func(ctx context.Context) {
stopCh := ctx.Done()
@@ -224,7 +243,13 @@ func main() {
} else {
// Name of config map with leader election lock
lockName := "external-health-monitor-leader-" + storageDriver
le := leaderelection.NewLeaderElection(clientset, lockName, run)
// create a new clientset for leader election
leClientset, err := kubernetes.NewForConfig(config)
if err != nil {
klog.Fatalf("Failed to create leaderelection client: %v", err)
}
le := leaderelection.NewLeaderElection(leClientset, lockName, run)

if *httpEndpoint != "" {
le.PrepareHealthCheck(mux, leaderelection.DefaultHealthCheckTimeout)
}
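The review thread above asks what the new client-side throttling knobs actually buy. As a rough illustration only (a hypothetical standalone program, not the sidecar's code): once `--kube-api-qps` and `--kube-api-burst` are copied into `config.QPS` and `config.Burst`, client-go limits outbound API requests with a token bucket, which the sketch below reproduces directly via `k8s.io/client-go/util/flowcontrol`. Incoming watch events are not affected, which is the reviewer's caveat above.

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/flowcontrol"
)

func main() {
	// Same values as the new flag defaults: 5 requests/s with bursts of 10.
	limiter := flowcontrol.NewTokenBucketRateLimiter(5, 10)

	start := time.Now()
	for i := 0; i < 15; i++ {
		limiter.Accept() // blocks until a token is available
		fmt.Printf("request %2d sent after %v\n", i, time.Since(start))
	}
	// Roughly the first 10 requests go out immediately (the burst); the
	// remainder are spaced at about 200ms, i.e. 1/QPS.
}

On the command line this corresponds to starting the sidecar with, for example, `--kube-api-qps=5 --kube-api-burst=10`.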
1 change: 1 addition & 0 deletions go.mod
@@ -9,6 +9,7 @@ require (
github.com/imdario/mergo v0.3.11 // indirect
github.com/kubernetes-csi/csi-lib-utils v0.10.0
github.com/kubernetes-csi/csi-test/v3 v3.1.2-0.20200722022205-189919973123
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.7.0
golang.org/x/oauth2 v0.0.0-20201208152858-08078c50e5b5 // indirect
google.golang.org/appengine v1.6.7 // indirect
14 changes: 13 additions & 1 deletion pkg/controller/framework_test.go
@@ -5,6 +5,8 @@ import (
"testing"
"time"

"k8s.io/client-go/util/workqueue"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
@@ -92,7 +94,17 @@ func runTest(t *testing.T, tc *testCase) {
}

mockCSIcontrollerServer(controllerServer, tc.supportListVolumes, volumes)
pvMonitorController := NewPVMonitorController(client, csiConn, pvInformer, pvcInformer, podInformer, nodeInformer, eventInformer, &eventRecorder, option)
pvMonitorController := NewPVMonitorController(client,
csiConn,
pvInformer,
pvcInformer,
podInformer,
nodeInformer,
eventInformer,
&eventRecorder,
option,
workqueue.NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Minute),
)
assert.NotNil(pvMonitorController)

if tc.hasRecoveryEvent {
16 changes: 12 additions & 4 deletions pkg/controller/node_watcher.go
@@ -71,9 +71,17 @@ type NodeWatcher struct {
}

// NewNodeWatcher creates a node watcher object that will watch the nodes
func NewNodeWatcher(driverName string, client kubernetes.Interface, volumeLister corelisters.PersistentVolumeLister,
pvcLister corelisters.PersistentVolumeClaimLister, nodeInformer coreinformers.NodeInformer,
recorder record.EventRecorder, pvcToPodsCache *util.PVCToPodsCache, nodeWorkerExecuteInterval time.Duration, nodeListAndAddInterval time.Duration) *NodeWatcher {
func NewNodeWatcher(driverName string,
client kubernetes.Interface,
volumeLister corelisters.PersistentVolumeLister,
pvcLister corelisters.PersistentVolumeClaimLister,
nodeInformer coreinformers.NodeInformer,
recorder record.EventRecorder,
pvcToPodsCache *util.PVCToPodsCache,
nodeWorkerExecuteInterval time.Duration,
nodeListAndAddInterval time.Duration,
contentRateLimiter workqueue.RateLimiter,
) *NodeWatcher {

watcher := &NodeWatcher{
driverName: driverName,
@@ -83,7 +91,7 @@ func NewNodeWatcher(driverName string, client kubernetes.Interface, volumeLister
recorder: recorder,
volumeLister: volumeLister,
pvcLister: pvcLister,
nodeQueue: workqueue.NewNamed("nodes"),
nodeQueue: workqueue.NewNamedRateLimitingQueue(contentRateLimiter, "nodes"),
nodeFirstBrokenMap: make(map[string]time.Time),
nodeEverMarkedDown: make(map[string]bool),
pvcToPodsCache: pvcToPodsCache,
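To make the effect of the new `--retry-interval-start` and `--retry-interval-max` flags concrete: the `workqueue.NewItemExponentialFailureRateLimiter` that main.go now injects returns a per-item delay that doubles on every failed requeue of the same key and is capped at the maximum. A minimal sketch of that behavior, assuming the default values of 1 second and 5 minutes (illustration only, not part of the PR; the key "pv-1" is arbitrary):

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Same values as the new flag defaults: start at 1s, cap at 5m.
	rl := workqueue.NewItemExponentialFailureRateLimiter(time.Second, 5*time.Minute)

	// Each call to When for the same key counts as another failure and
	// doubles the delay: 1s, 2s, 4s, ... until the 5-minute cap is reached.
	for i := 0; i < 12; i++ {
		fmt.Printf("failure %2d for pv-1: retry in %v\n", i+1, rl.When("pv-1"))
	}

	// A successful sync calls Forget, which resets the backoff for that key.
	rl.Forget("pv-1")
	fmt.Println("after Forget:", rl.When("pv-1")) // back to 1s
}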
29 changes: 25 additions & 4 deletions pkg/controller/pv_monitor_controller.go
@@ -96,8 +96,18 @@ type PVMonitorOptions struct {
}

// NewPVMonitorController creates PV monitor controller
func NewPVMonitorController(client kubernetes.Interface, conn *grpc.ClientConn, pvInformer coreinformers.PersistentVolumeInformer,
pvcInformer coreinformers.PersistentVolumeClaimInformer, podInformer coreinformers.PodInformer, nodeInformer coreinformers.NodeInformer, eventInformer coreinformers.EventInformer, eventRecorder record.EventRecorder, option *PVMonitorOptions) *PVMonitorController {
func NewPVMonitorController(
client kubernetes.Interface,
conn *grpc.ClientConn,
pvInformer coreinformers.PersistentVolumeInformer,
pvcInformer coreinformers.PersistentVolumeClaimInformer,
podInformer coreinformers.PodInformer,
nodeInformer coreinformers.NodeInformer,
eventInformer coreinformers.EventInformer,
eventRecorder record.EventRecorder,
option *PVMonitorOptions,
contentRateLimiter workqueue.RateLimiter,
) *PVMonitorController {

ctrl := &PVMonitorController{
csiConn: conn,
@@ -106,7 +116,7 @@ func NewPVMonitorController(client kubernetes.Interface, conn *grpc.ClientConn,
enableNodeWatcher: option.EnableNodeWatcher,
client: client,
driverName: option.DriverName,
pvQueue: workqueue.NewNamed("csi-monitor-pv-queue"),
pvQueue: workqueue.NewNamedRateLimitingQueue(contentRateLimiter, "csi-monitor-pv-queue"),

pvcToPodsCache: util.NewPVCToPodsCache(),
pvEnqueued: make(map[string]bool),
@@ -151,7 +161,18 @@ func NewPVMonitorController(client kubernetes.Interface, conn *grpc.ClientConn,
})

if ctrl.enableNodeWatcher {
ctrl.nodeWatcher = NewNodeWatcher(ctrl.driverName, ctrl.client, ctrl.pvLister, ctrl.pvcLister, nodeInformer, ctrl.eventRecorder, ctrl.pvcToPodsCache, option.NodeWorkerExecuteInterval, option.NodeListAndAddInterval)
ctrl.nodeWatcher = NewNodeWatcher(
ctrl.driverName,
ctrl.client,
ctrl.pvLister,
ctrl.pvcLister,
nodeInformer,
ctrl.eventRecorder,
ctrl.pvcToPodsCache,
option.NodeWorkerExecuteInterval,
option.NodeListAndAddInterval,
contentRateLimiter,
)
}

ctrl.pvChecker = handler.NewPVHealthConditionChecker(option.DriverName, conn, client, option.ContextTimeout, ctrl.pvcLister, ctrl.pvLister, eventInformer, ctrl.eventRecorder)
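Because `pvQueue` and `nodeQueue` are now built with `workqueue.NewNamedRateLimitingQueue` instead of `workqueue.NewNamed`, failed keys can be requeued through the injected limiter rather than immediately. The sketch below shows the standard client-go worker pattern such a queue supports; it is illustrative only, and `processPV` is a hypothetical stub, not this controller's actual sync logic.

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

// processPV stands in for the per-PV health check (hypothetical stub).
func processPV(key string) error {
	fmt.Println("checking", key)
	return nil
}

// worker drains the queue; failed keys are requeued with the limiter's backoff.
func worker(queue workqueue.RateLimitingInterface) {
	for {
		item, quit := queue.Get()
		if quit {
			return
		}
		key := item.(string)
		if err := processPV(key); err != nil {
			queue.AddRateLimited(key) // retry later; the delay grows per key up to retry-interval-max
		} else {
			queue.Forget(key) // success: reset this key's backoff
		}
		queue.Done(item)
	}
}

func main() {
	limiter := workqueue.NewItemExponentialFailureRateLimiter(time.Second, 5*time.Minute)
	queue := workqueue.NewNamedRateLimitingQueue(limiter, "csi-monitor-pv-queue")

	queue.Add("pv-1")
	go worker(queue)

	time.Sleep(100 * time.Millisecond)
	queue.ShutDown()
}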
1 change: 1 addition & 0 deletions vendor/modules.txt
@@ -86,6 +86,7 @@ github.com/prometheus/procfs
github.com/prometheus/procfs/internal/fs
github.com/prometheus/procfs/internal/util
# github.com/spf13/pflag v1.0.5
## explicit
github.com/spf13/pflag
# github.com/stretchr/testify v1.7.0
## explicit