From e68f77bad89ca47e79366f6c651fef28905aa2cb Mon Sep 17 00:00:00 2001 From: Antoni Zawodny Date: Mon, 16 Sep 2024 16:43:16 +0200 Subject: [PATCH] Use protobuf encoding for core K8s APIs in apiserver-network-proxy --- cmd/agent/app/options/options.go | 6 ++++++ cmd/agent/app/options/options_test.go | 1 + cmd/agent/app/server.go | 1 + cmd/server/app/options/options.go | 6 ++++++ cmd/server/app/options/options_test.go | 1 + cmd/server/app/server.go | 1 + 6 files changed, 16 insertions(+) diff --git a/cmd/agent/app/options/options.go b/cmd/agent/app/options/options.go index c6fd21360..f164d15cd 100644 --- a/cmd/agent/app/options/options.go +++ b/cmd/agent/app/options/options.go @@ -27,6 +27,7 @@ import ( "github.com/google/uuid" "github.com/spf13/pflag" "google.golang.org/grpc" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/klog/v2" "sigs.k8s.io/apiserver-network-proxy/pkg/agent" @@ -86,6 +87,8 @@ type GrpcProxyAgentOptions struct { CountServerLeases bool // Path to kubeconfig (used by kubernetes client for lease listing) KubeconfigPath string + // Content type of requests sent to apiserver. + APIContentType string } func (o *GrpcProxyAgentOptions) ClientSetConfig(dialOptions ...grpc.DialOption) *agent.ClientSetConfig { @@ -130,6 +133,7 @@ func (o *GrpcProxyAgentOptions) Flags() *pflag.FlagSet { flags.IntVar(&o.XfrChannelSize, "xfr-channel-size", 150, "Set the size of the channel for transferring data between the agent and the proxy server.") flags.BoolVar(&o.CountServerLeases, "count-server-leases", o.CountServerLeases, "Enables lease counting system to determine the number of proxy servers to connect to.") flags.StringVar(&o.KubeconfigPath, "kubeconfig", o.KubeconfigPath, "Path to the kubeconfig file") + flags.StringVar(&o.APIContentType, "kube-api-content-type", o.APIContentType, "Content type of requests sent to apiserver.") return flags } @@ -156,6 +160,7 @@ func (o *GrpcProxyAgentOptions) Print() { klog.V(1).Infof("WarnOnChannelLimit set to %t.\n", o.WarnOnChannelLimit) klog.V(1).Infof("SyncForever set to %v.\n", o.SyncForever) klog.V(1).Infof("ChannelSize set to %d.\n", o.XfrChannelSize) + klog.V(1).Infof("APIContentType set to %v.\n", o.APIContentType) } func (o *GrpcProxyAgentOptions) Validate() error { @@ -259,6 +264,7 @@ func NewGrpcProxyAgentOptions() *GrpcProxyAgentOptions { XfrChannelSize: 150, CountServerLeases: false, KubeconfigPath: "", + APIContentType: runtime.ContentTypeProtobuf, } return &o } diff --git a/cmd/agent/app/options/options_test.go b/cmd/agent/app/options/options_test.go index e302f4b58..7ce38b2da 100644 --- a/cmd/agent/app/options/options_test.go +++ b/cmd/agent/app/options/options_test.go @@ -51,6 +51,7 @@ func TestDefaultServerOptions(t *testing.T) { assertDefaultValue(t, "WarnOnChannelLimit", defaultAgentOptions.WarnOnChannelLimit, false) assertDefaultValue(t, "SyncForever", defaultAgentOptions.SyncForever, false) assertDefaultValue(t, "XfrChannelSize", defaultAgentOptions.XfrChannelSize, 150) + assertDefaultValue(t, "APIContentType", defaultAgentOptions.APIContentType, "application/vnd.kubernetes.protobuf") } func assertDefaultValue(t *testing.T, fieldName string, actual, expected interface{}) { diff --git a/cmd/agent/app/server.go b/cmd/agent/app/server.go index ce0c6d355..0bd0ad321 100644 --- a/cmd/agent/app/server.go +++ b/cmd/agent/app/server.go @@ -157,6 +157,7 @@ func (a *Agent) runProxyConnection(o *options.GrpcProxyAgentOptions, drainCh, st return nil, fmt.Errorf("failed to load in cluster kubernetes client config: %w", err) } } + config.ContentType = o.APIContentType k8sClient, err := kubernetes.NewForConfig(config) if err != nil { diff --git a/cmd/server/app/options/options.go b/cmd/server/app/options/options.go index 1b0a7066d..de0b89902 100644 --- a/cmd/server/app/options/options.go +++ b/cmd/server/app/options/options.go @@ -23,6 +23,7 @@ import ( "github.com/google/uuid" "github.com/spf13/pflag" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/klog/v2" "sigs.k8s.io/apiserver-network-proxy/pkg/server" @@ -86,6 +87,8 @@ type ProxyRunOptions struct { KubeconfigQPS float32 // Client maximum burst for throttle. KubeconfigBurst int + // Content type of requests sent to apiserver. + APIContentType string // Proxy strategies used by the server. // NOTE the order of the strategies matters. e.g., for list @@ -137,6 +140,7 @@ func (o *ProxyRunOptions) Flags() *pflag.FlagSet { flags.StringVar(&o.KubeconfigPath, "kubeconfig", o.KubeconfigPath, "absolute path to the kubeconfig file (used with agent-namespace, agent-service-account, authentication-audience).") flags.Float32Var(&o.KubeconfigQPS, "kubeconfig-qps", o.KubeconfigQPS, "Maximum client QPS (proxy server uses this client to authenticate agent tokens).") flags.IntVar(&o.KubeconfigBurst, "kubeconfig-burst", o.KubeconfigBurst, "Maximum client burst (proxy server uses this client to authenticate agent tokens).") + flags.StringVar(&o.APIContentType, "kube-api-content-type", o.APIContentType, "Content type of requests sent to apiserver.") flags.StringVar(&o.AuthenticationAudience, "authentication-audience", o.AuthenticationAudience, "Expected agent's token authentication audience (used with agent-namespace, agent-service-account, kubeconfig).") flags.StringVar(&o.ProxyStrategies, "proxy-strategies", o.ProxyStrategies, "The list of proxy strategies used by the server to pick an agent/tunnel, available strategies are: default, destHost, defaultRoute.") flags.StringSliceVar(&o.CipherSuites, "cipher-suites", o.CipherSuites, "The comma separated list of allowed cipher suites. Has no effect on TLS1.3. Empty means allow default list.") @@ -178,6 +182,7 @@ func (o *ProxyRunOptions) Print() { klog.V(1).Infof("KubeconfigPath set to %q.\n", o.KubeconfigPath) klog.V(1).Infof("KubeconfigQPS set to %f.\n", o.KubeconfigQPS) klog.V(1).Infof("KubeconfigBurst set to %d.\n", o.KubeconfigBurst) + klog.V(1).Infof("APIContentType set to %v.\n", o.APIContentType) klog.V(1).Infof("ProxyStrategies set to %q.\n", o.ProxyStrategies) klog.V(1).Infof("CipherSuites set to %q.\n", o.CipherSuites) klog.V(1).Infof("XfrChannelSize set to %d.\n", o.XfrChannelSize) @@ -350,6 +355,7 @@ func NewProxyRunOptions() *ProxyRunOptions { KubeconfigPath: "", KubeconfigQPS: 0, KubeconfigBurst: 0, + APIContentType: runtime.ContentTypeProtobuf, AuthenticationAudience: "", ProxyStrategies: "default", CipherSuites: make([]string, 0), diff --git a/cmd/server/app/options/options_test.go b/cmd/server/app/options/options_test.go index 2f3ba1766..8e85b51d6 100644 --- a/cmd/server/app/options/options_test.go +++ b/cmd/server/app/options/options_test.go @@ -62,6 +62,7 @@ func TestDefaultServerOptions(t *testing.T) { assertDefaultValue(t, "ProxyStrategies", defaultServerOptions.ProxyStrategies, "default") assertDefaultValue(t, "CipherSuites", defaultServerOptions.CipherSuites, make([]string, 0)) assertDefaultValue(t, "XfrChannelSize", defaultServerOptions.XfrChannelSize, 10) + assertDefaultValue(t, "APIContentType", defaultServerOptions.APIContentType, "application/vnd.kubernetes.protobuf") } diff --git a/cmd/server/app/server.go b/cmd/server/app/server.go index 2e9129f20..4ab967586 100644 --- a/cmd/server/app/server.go +++ b/cmd/server/app/server.go @@ -120,6 +120,7 @@ func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error { klog.V(1).Infof("Setting k8s client Burst: %v", o.KubeconfigBurst) config.Burst = o.KubeconfigBurst } + config.ContentType = o.APIContentType k8sClient, err = kubernetes.NewForConfig(config) if err != nil { return fmt.Errorf("failed to create kubernetes clientset: %v", err)