Skip to content

Commit

Permalink
Merge pull request kubernetes-sigs#675 from konryd/leaserollout
Browse files Browse the repository at this point in the history
KNP agent should pick the larger of the two server counts
  • Loading branch information
k8s-ci-robot authored Nov 15, 2024
2 parents 23b100b + 1b41692 commit 9808173
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 7 deletions.
11 changes: 11 additions & 0 deletions cmd/agent/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ type GrpcProxyAgentOptions struct {
LeaseNamespace string
// Labels on which lease objects are managed.
LeaseLabel string
// ServerCountSource describes how server counts should be combined.
ServerCountSource string
// Path to kubeconfig (used by kubernetes client for lease listing)
KubeconfigPath string
// Content type of requests sent to apiserver.
Expand All @@ -108,6 +110,7 @@ func (o *GrpcProxyAgentOptions) ClientSetConfig(dialOptions ...grpc.DialOption)
WarnOnChannelLimit: o.WarnOnChannelLimit,
SyncForever: o.SyncForever,
XfrChannelSize: o.XfrChannelSize,
ServerCountSource: o.ServerCountSource,
}
}

Expand Down Expand Up @@ -138,6 +141,7 @@ func (o *GrpcProxyAgentOptions) Flags() *pflag.FlagSet {
flags.BoolVar(&o.CountServerLeases, "count-server-leases", o.CountServerLeases, "Enables lease counting system to determine the number of proxy servers to connect to.")
flags.StringVar(&o.LeaseNamespace, "lease-namespace", o.LeaseNamespace, "Namespace where lease objects are managed.")
flags.StringVar(&o.LeaseLabel, "lease-label", o.LeaseLabel, "The labels on which the lease objects are managed.")
flags.StringVar(&o.ServerCountSource, "server-count-source", o.ServerCountSource, "Defines how the server counts from lease and from server responses are combined. Possible values: 'default' to use only one source (server or leases depending on other flags), 'max' to take the larger value.")
flags.StringVar(&o.KubeconfigPath, "kubeconfig", o.KubeconfigPath, "Path to the kubeconfig file")
flags.StringVar(&o.APIContentType, "kube-api-content-type", o.APIContentType, "Content type of requests sent to apiserver.")
return flags
Expand Down Expand Up @@ -168,6 +172,7 @@ func (o *GrpcProxyAgentOptions) Print() {
klog.V(1).Infof("CountServerLeases set to %v.\n", o.CountServerLeases)
klog.V(1).Infof("LeaseNamespace set to %s.\n", o.LeaseNamespace)
klog.V(1).Infof("LeaseLabel set to %s.\n", o.LeaseLabel)
klog.V(1).Infof("ServerCountSource set to %s.\n", o.ServerCountSource)
klog.V(1).Infof("ChannelSize set to %d.\n", o.XfrChannelSize)
klog.V(1).Infof("APIContentType set to %v.\n", o.APIContentType)
}
Expand Down Expand Up @@ -232,6 +237,11 @@ func (o *GrpcProxyAgentOptions) Validate() error {
return err
}
}
if o.ServerCountSource != "" {
if o.ServerCountSource != "default" && o.ServerCountSource != "max" {
return fmt.Errorf("--server-count-source must be one of '', 'default', 'max', got %v", o.ServerCountSource)
}
}

return nil
}
Expand Down Expand Up @@ -281,6 +291,7 @@ func NewGrpcProxyAgentOptions() *GrpcProxyAgentOptions {
CountServerLeases: false,
LeaseNamespace: "kube-system",
LeaseLabel: "k8s-app=konnectivity-server",
ServerCountSource: "default",
KubeconfigPath: "",
APIContentType: runtime.ContentTypeProtobuf,
}
Expand Down
4 changes: 4 additions & 0 deletions cmd/agent/app/options/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ func TestValidate(t *testing.T) {
fieldMap: map[string]interface{}{"XfrChannelSize": -10},
expected: fmt.Errorf("channel size -10 must be greater than 0"),
},
"ServerCountSource": {
fieldMap: map[string]interface{}{"ServerCountSource": "foobar"},
expected: fmt.Errorf("--server-count-source must be one of '', 'default', 'max', got foobar"),
},
} {
t.Run(desc, func(t *testing.T) {
testAgentOptions := NewGrpcProxyAgentOptions()
Expand Down
49 changes: 42 additions & 7 deletions pkg/agent/clientset.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ import (
"sigs.k8s.io/apiserver-network-proxy/pkg/agent/metrics"
)

const (
fromResponses = "KNP server response headers"
fromLeases = "KNP lease count"
fromFallback = "fallback to 1"
)

// ClientSet consists of clients connected to each instance of an HA proxy server.
type ClientSet struct {
mu sync.Mutex //protects the clients.
Expand All @@ -39,7 +45,7 @@ type ClientSet struct {
agentID string // ID of this agent
address string // proxy server address. Assuming HA proxy server

leaseCounter *ServerLeaseCounter // counts number of proxy server leases
leaseCounter ServerCounter // counts number of proxy server leases
lastReceivedServerCount int // last server count received from a proxy server
lastServerCount int // last server count value from either lease system or proxy server, former takes priority

Expand Down Expand Up @@ -68,6 +74,7 @@ type ClientSet struct {
xfrChannelSize int

syncForever bool // Continue syncing (support dynamic server count).
serverCountSource string
}

func (cs *ClientSet) ClientsCount() int {
Expand Down Expand Up @@ -147,7 +154,8 @@ type ClientSetConfig struct {
WarnOnChannelLimit bool
SyncForever bool
XfrChannelSize int
ServerLeaseCounter *ServerLeaseCounter
ServerLeaseCounter ServerCounter
ServerCountSource string
}

func (cc *ClientSetConfig) NewAgentClientSet(drainCh, stopCh <-chan struct{}) *ClientSet {
Expand All @@ -167,6 +175,7 @@ func (cc *ClientSetConfig) NewAgentClientSet(drainCh, stopCh <-chan struct{}) *C
xfrChannelSize: cc.XfrChannelSize,
stopCh: stopCh,
leaseCounter: cc.ServerLeaseCounter,
serverCountSource: cc.ServerCountSource,
}
}

Expand Down Expand Up @@ -218,15 +227,41 @@ func (cs *ClientSet) sync() {
}

func (cs *ClientSet) ServerCount() int {

var serverCount int
if cs.leaseCounter != nil {
serverCount = cs.leaseCounter.Count()
} else {
serverCount = cs.lastReceivedServerCount
var countSourceLabel string

switch cs.serverCountSource {
case "", "default":
if cs.leaseCounter != nil {
serverCount = cs.leaseCounter.Count()
countSourceLabel = fromLeases
} else {
serverCount = cs.lastReceivedServerCount
countSourceLabel = fromResponses
}
case "max":
countFromLeases := 0
if cs.leaseCounter != nil {
countFromLeases = cs.leaseCounter.Count()
}
countFromResponses := cs.lastReceivedServerCount

serverCount = countFromLeases
countSourceLabel = fromLeases
if countFromResponses > serverCount {
serverCount = countFromResponses
countSourceLabel = fromResponses
}
if serverCount == 0 {
serverCount = 1
countSourceLabel = fromFallback
}

}

if serverCount != cs.lastServerCount {
klog.Warningf("change detected in proxy server count (was: %d, now: %d)", cs.lastServerCount, serverCount)
klog.Warningf("change detected in proxy server count (was: %d, now: %d, source: %q)", cs.lastServerCount, serverCount, countSourceLabel)
cs.lastServerCount = serverCount
}

Expand Down
93 changes: 93 additions & 0 deletions pkg/agent/clientset_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package agent

import (
"testing"
)

type FakeServerCounter struct {
count int
}

func (f *FakeServerCounter) Count() int {
return f.count
}

func TestServerCount(t *testing.T) {
testCases := []struct{
name string
serverCountSource string
leaseCounter ServerCounter
responseCount int
want int
} {
{
name: "higher from response",
serverCountSource: "max",
responseCount: 42,
leaseCounter: &FakeServerCounter{24},
want: 42,
},
{
name: "higher from leases",
serverCountSource: "max",
responseCount: 3,
leaseCounter: &FakeServerCounter{6},
want: 6,
},
{
name: "both zero",
serverCountSource: "max",
responseCount: 0,
leaseCounter: &FakeServerCounter{0},
want: 1,
},

{
name: "response picked by default when no lease counter",
serverCountSource: "default",
responseCount: 3,
leaseCounter: nil,
want: 3,
},
{
name: "lease counter always picked when present",
serverCountSource: "default",
responseCount: 6,
leaseCounter: &FakeServerCounter{3},
want: 3,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {

cs := &ClientSet{
clients: make(map[string]*Client),
leaseCounter: tc.leaseCounter,
serverCountSource: tc.serverCountSource,

}
cs.lastReceivedServerCount = tc.responseCount
if got := cs.ServerCount(); got != tc.want {
t.Errorf("cs.ServerCount() = %v, want: %v", got, tc.want)
}
})
}

}
4 changes: 4 additions & 0 deletions pkg/agent/lease_counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ import (
coordinationv1lister "k8s.io/client-go/listers/coordination/v1"
)

type ServerCounter interface {
Count() int
}

// A ServerLeaseCounter counts leases in the k8s apiserver to determine the
// current proxy server count.
type ServerLeaseCounter struct {
Expand Down

0 comments on commit 9808173

Please sign in to comment.