From db2cd97f1224917c4577ef72a7ad465cf1e3c606 Mon Sep 17 00:00:00 2001 From: Konrad Delong Date: Fri, 15 Nov 2024 12:07:33 +0100 Subject: [PATCH 1/3] changed KNP agent clientset to pick up the larger of the two server counts --- pkg/agent/clientset.go | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/pkg/agent/clientset.go b/pkg/agent/clientset.go index 785f14f48..71a5b07f4 100644 --- a/pkg/agent/clientset.go +++ b/pkg/agent/clientset.go @@ -218,15 +218,25 @@ func (cs *ClientSet) sync() { } func (cs *ClientSet) ServerCount() int { - var serverCount int + countFromLeases := 0 if cs.leaseCounter != nil { - serverCount = cs.leaseCounter.Count() - } else { - serverCount = cs.lastReceivedServerCount + countFromLeases = cs.leaseCounter.Count() + } + countFromResponses := cs.lastReceivedServerCount + + serverCount := countFromLeases + countSource := "KNP server lease count" + if countFromResponses > serverCount { + serverCount = countFromResponses + countSource = "KNP server response headers" + } + if serverCount == 0 { + serverCount = 1 + countSource = "fallback to 1" } if serverCount != cs.lastServerCount { - klog.Warningf("change detected in proxy server count (was: %d, now: %d)", cs.lastServerCount, serverCount) + klog.Warningf("change detected in proxy server count (was: %d, now: %d, source: %q)", cs.lastServerCount, serverCount, countSource) cs.lastServerCount = serverCount } From 1eec50b5b6f46e8d1e61d16eb939e6c2f2d101d6 Mon Sep 17 00:00:00 2001 From: Konrad Delong Date: Fri, 15 Nov 2024 15:21:43 +0100 Subject: [PATCH 2/3] wrapped ServerLeaseCounter into an interface and added a unit test for the ServerCount() method in clientset --- pkg/agent/clientset.go | 4 +- pkg/agent/clientset_test.go | 75 +++++++++++++++++++++++++++++++++++++ pkg/agent/lease_counter.go | 4 ++ 3 files changed, 81 insertions(+), 2 deletions(-) create mode 100644 pkg/agent/clientset_test.go diff --git a/pkg/agent/clientset.go b/pkg/agent/clientset.go index 71a5b07f4..324dd12d4 100644 --- a/pkg/agent/clientset.go +++ b/pkg/agent/clientset.go @@ -39,7 +39,7 @@ type ClientSet struct { agentID string // ID of this agent address string // proxy server address. Assuming HA proxy server - leaseCounter *ServerLeaseCounter // counts number of proxy server leases + leaseCounter ServerCounter // counts number of proxy server leases lastReceivedServerCount int // last server count received from a proxy server lastServerCount int // last server count value from either lease system or proxy server, former takes priority @@ -147,7 +147,7 @@ type ClientSetConfig struct { WarnOnChannelLimit bool SyncForever bool XfrChannelSize int - ServerLeaseCounter *ServerLeaseCounter + ServerLeaseCounter ServerCounter } func (cc *ClientSetConfig) NewAgentClientSet(drainCh, stopCh <-chan struct{}) *ClientSet { diff --git a/pkg/agent/clientset_test.go b/pkg/agent/clientset_test.go new file mode 100644 index 000000000..be9860daa --- /dev/null +++ b/pkg/agent/clientset_test.go @@ -0,0 +1,75 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package agent + +import ( + "testing" +) + +type FakeServerCounter struct { + count int +} + +func (f *FakeServerCounter) Count() int { + return f.count +} + +func TestServerCount(t *testing.T) { + testCases := []struct{ + name string + responseCount int + leaseCount int + want int + } { + { + name: "higher from response", + responseCount: 42, + leaseCount: 24, + want: 42, + }, + { + name: "higher from leases", + responseCount: 3, + leaseCount: 6, + want: 6, + }, + { + name: "both zero", + responseCount: 0, + leaseCount: 0, + want: 1, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + lc := &FakeServerCounter{ + count: tc.leaseCount, + } + + cs := &ClientSet{ + clients: make(map[string]*Client), + leaseCounter: lc, + } + cs.lastReceivedServerCount = tc.responseCount + if got := cs.ServerCount(); got != tc.want { + t.Errorf("cs.ServerCount() = %v, want: %v", got, tc.want) + } + }) + } + +} diff --git a/pkg/agent/lease_counter.go b/pkg/agent/lease_counter.go index 4175d4ffe..cdc74ce70 100644 --- a/pkg/agent/lease_counter.go +++ b/pkg/agent/lease_counter.go @@ -36,6 +36,10 @@ import ( coordinationv1lister "k8s.io/client-go/listers/coordination/v1" ) +type ServerCounter interface { + Count() int +} + // A ServerLeaseCounter counts leases in the k8s apiserver to determine the // current proxy server count. type ServerLeaseCounter struct { From 1b41692d9c53384f1f468320560ad020c694574f Mon Sep 17 00:00:00 2001 From: Konrad Delong Date: Fri, 15 Nov 2024 20:20:26 +0100 Subject: [PATCH 3/3] added a flag server-count-source to control the behavior when two sources are present --- cmd/agent/app/options/options.go | 11 ++++++ cmd/agent/app/options/options_test.go | 4 ++ pkg/agent/clientset.go | 55 +++++++++++++++++++-------- pkg/agent/clientset_test.go | 34 +++++++++++++---- 4 files changed, 81 insertions(+), 23 deletions(-) diff --git a/cmd/agent/app/options/options.go b/cmd/agent/app/options/options.go index 3f0950963..e97189e7c 100644 --- a/cmd/agent/app/options/options.go +++ b/cmd/agent/app/options/options.go @@ -89,6 +89,8 @@ type GrpcProxyAgentOptions struct { LeaseNamespace string // Labels on which lease objects are managed. LeaseLabel string + // ServerCountSource describes how server counts should be combined. + ServerCountSource string // Path to kubeconfig (used by kubernetes client for lease listing) KubeconfigPath string // Content type of requests sent to apiserver. @@ -108,6 +110,7 @@ func (o *GrpcProxyAgentOptions) ClientSetConfig(dialOptions ...grpc.DialOption) WarnOnChannelLimit: o.WarnOnChannelLimit, SyncForever: o.SyncForever, XfrChannelSize: o.XfrChannelSize, + ServerCountSource: o.ServerCountSource, } } @@ -138,6 +141,7 @@ func (o *GrpcProxyAgentOptions) Flags() *pflag.FlagSet { flags.BoolVar(&o.CountServerLeases, "count-server-leases", o.CountServerLeases, "Enables lease counting system to determine the number of proxy servers to connect to.") flags.StringVar(&o.LeaseNamespace, "lease-namespace", o.LeaseNamespace, "Namespace where lease objects are managed.") flags.StringVar(&o.LeaseLabel, "lease-label", o.LeaseLabel, "The labels on which the lease objects are managed.") + flags.StringVar(&o.ServerCountSource, "server-count-source", o.ServerCountSource, "Defines how the server counts from lease and from server responses are combined. Possible values: 'default' to use only one source (server or leases depending on other flags), 'max' to take the larger value.") flags.StringVar(&o.KubeconfigPath, "kubeconfig", o.KubeconfigPath, "Path to the kubeconfig file") flags.StringVar(&o.APIContentType, "kube-api-content-type", o.APIContentType, "Content type of requests sent to apiserver.") return flags @@ -168,6 +172,7 @@ func (o *GrpcProxyAgentOptions) Print() { klog.V(1).Infof("CountServerLeases set to %v.\n", o.CountServerLeases) klog.V(1).Infof("LeaseNamespace set to %s.\n", o.LeaseNamespace) klog.V(1).Infof("LeaseLabel set to %s.\n", o.LeaseLabel) + klog.V(1).Infof("ServerCountSource set to %s.\n", o.ServerCountSource) klog.V(1).Infof("ChannelSize set to %d.\n", o.XfrChannelSize) klog.V(1).Infof("APIContentType set to %v.\n", o.APIContentType) } @@ -232,6 +237,11 @@ func (o *GrpcProxyAgentOptions) Validate() error { return err } } + if o.ServerCountSource != "" { + if o.ServerCountSource != "default" && o.ServerCountSource != "max" { + return fmt.Errorf("--server-count-source must be one of '', 'default', 'max', got %v", o.ServerCountSource) + } + } return nil } @@ -281,6 +291,7 @@ func NewGrpcProxyAgentOptions() *GrpcProxyAgentOptions { CountServerLeases: false, LeaseNamespace: "kube-system", LeaseLabel: "k8s-app=konnectivity-server", + ServerCountSource: "default", KubeconfigPath: "", APIContentType: runtime.ContentTypeProtobuf, } diff --git a/cmd/agent/app/options/options_test.go b/cmd/agent/app/options/options_test.go index 7ce38b2da..9432217e7 100644 --- a/cmd/agent/app/options/options_test.go +++ b/cmd/agent/app/options/options_test.go @@ -156,6 +156,10 @@ func TestValidate(t *testing.T) { fieldMap: map[string]interface{}{"XfrChannelSize": -10}, expected: fmt.Errorf("channel size -10 must be greater than 0"), }, + "ServerCountSource": { + fieldMap: map[string]interface{}{"ServerCountSource": "foobar"}, + expected: fmt.Errorf("--server-count-source must be one of '', 'default', 'max', got foobar"), + }, } { t.Run(desc, func(t *testing.T) { testAgentOptions := NewGrpcProxyAgentOptions() diff --git a/pkg/agent/clientset.go b/pkg/agent/clientset.go index 324dd12d4..a17948b99 100644 --- a/pkg/agent/clientset.go +++ b/pkg/agent/clientset.go @@ -30,6 +30,12 @@ import ( "sigs.k8s.io/apiserver-network-proxy/pkg/agent/metrics" ) +const ( + fromResponses = "KNP server response headers" + fromLeases = "KNP lease count" + fromFallback = "fallback to 1" +) + // ClientSet consists of clients connected to each instance of an HA proxy server. type ClientSet struct { mu sync.Mutex //protects the clients. @@ -68,6 +74,7 @@ type ClientSet struct { xfrChannelSize int syncForever bool // Continue syncing (support dynamic server count). + serverCountSource string } func (cs *ClientSet) ClientsCount() int { @@ -148,6 +155,7 @@ type ClientSetConfig struct { SyncForever bool XfrChannelSize int ServerLeaseCounter ServerCounter + ServerCountSource string } func (cc *ClientSetConfig) NewAgentClientSet(drainCh, stopCh <-chan struct{}) *ClientSet { @@ -167,6 +175,7 @@ func (cc *ClientSetConfig) NewAgentClientSet(drainCh, stopCh <-chan struct{}) *C xfrChannelSize: cc.XfrChannelSize, stopCh: stopCh, leaseCounter: cc.ServerLeaseCounter, + serverCountSource: cc.ServerCountSource, } } @@ -218,25 +227,41 @@ func (cs *ClientSet) sync() { } func (cs *ClientSet) ServerCount() int { - countFromLeases := 0 - if cs.leaseCounter != nil { - countFromLeases = cs.leaseCounter.Count() - } - countFromResponses := cs.lastReceivedServerCount - serverCount := countFromLeases - countSource := "KNP server lease count" - if countFromResponses > serverCount { - serverCount = countFromResponses - countSource = "KNP server response headers" - } - if serverCount == 0 { - serverCount = 1 - countSource = "fallback to 1" + var serverCount int + var countSourceLabel string + + switch cs.serverCountSource { + case "", "default": + if cs.leaseCounter != nil { + serverCount = cs.leaseCounter.Count() + countSourceLabel = fromLeases + } else { + serverCount = cs.lastReceivedServerCount + countSourceLabel = fromResponses + } + case "max": + countFromLeases := 0 + if cs.leaseCounter != nil { + countFromLeases = cs.leaseCounter.Count() + } + countFromResponses := cs.lastReceivedServerCount + + serverCount = countFromLeases + countSourceLabel = fromLeases + if countFromResponses > serverCount { + serverCount = countFromResponses + countSourceLabel = fromResponses + } + if serverCount == 0 { + serverCount = 1 + countSourceLabel = fromFallback + } + } if serverCount != cs.lastServerCount { - klog.Warningf("change detected in proxy server count (was: %d, now: %d, source: %q)", cs.lastServerCount, serverCount, countSource) + klog.Warningf("change detected in proxy server count (was: %d, now: %d, source: %q)", cs.lastServerCount, serverCount, countSourceLabel) cs.lastServerCount = serverCount } diff --git a/pkg/agent/clientset_test.go b/pkg/agent/clientset_test.go index be9860daa..a7b674cf9 100644 --- a/pkg/agent/clientset_test.go +++ b/pkg/agent/clientset_test.go @@ -31,39 +31,57 @@ func (f *FakeServerCounter) Count() int { func TestServerCount(t *testing.T) { testCases := []struct{ name string + serverCountSource string + leaseCounter ServerCounter responseCount int - leaseCount int want int } { { name: "higher from response", + serverCountSource: "max", responseCount: 42, - leaseCount: 24, + leaseCounter: &FakeServerCounter{24}, want: 42, }, { name: "higher from leases", + serverCountSource: "max", responseCount: 3, - leaseCount: 6, + leaseCounter: &FakeServerCounter{6}, want: 6, }, { name: "both zero", + serverCountSource: "max", responseCount: 0, - leaseCount: 0, + leaseCounter: &FakeServerCounter{0}, want: 1, }, + + { + name: "response picked by default when no lease counter", + serverCountSource: "default", + responseCount: 3, + leaseCounter: nil, + want: 3, + }, + { + name: "lease counter always picked when present", + serverCountSource: "default", + responseCount: 6, + leaseCounter: &FakeServerCounter{3}, + want: 3, + }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - lc := &FakeServerCounter{ - count: tc.leaseCount, - } cs := &ClientSet{ clients: make(map[string]*Client), - leaseCounter: lc, + leaseCounter: tc.leaseCounter, + serverCountSource: tc.serverCountSource, + } cs.lastReceivedServerCount = tc.responseCount if got := cs.ServerCount(); got != tc.want {