diff --git a/cmd/kops/BUILD.bazel b/cmd/kops/BUILD.bazel index ec9e2a5d09a63..5e2e15c8b37ce 100644 --- a/cmd/kops/BUILD.bazel +++ b/cmd/kops/BUILD.bazel @@ -74,6 +74,7 @@ go_library( "//pkg/featureflag:go_default_library", "//pkg/formatter:go_default_library", "//pkg/instancegroups:go_default_library", + "//pkg/k8sclient:go_default_library", "//pkg/kopscodecs:go_default_library", "//pkg/kubeconfig:go_default_library", "//pkg/model/components:go_default_library", @@ -113,7 +114,6 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/validation/field:go_default_library", "//vendor/k8s.io/cli-runtime/pkg/genericclioptions:go_default_library", - "//vendor/k8s.io/client-go/kubernetes:go_default_library", "//vendor/k8s.io/client-go/plugin/pkg/client/auth:go_default_library", "//vendor/k8s.io/client-go/tools/clientcmd:go_default_library", "//vendor/k8s.io/client-go/util/homedir:go_default_library", diff --git a/cmd/kops/rollingupdatecluster.go b/cmd/kops/rollingupdatecluster.go index 67ecc6132518c..245c3994141c3 100644 --- a/cmd/kops/rollingupdatecluster.go +++ b/cmd/kops/rollingupdatecluster.go @@ -29,12 +29,12 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/cli-runtime/pkg/genericclioptions" - "k8s.io/client-go/kubernetes" _ "k8s.io/client-go/plugin/pkg/client/auth" "k8s.io/kops/cmd/kops/util" kopsapi "k8s.io/kops/pkg/apis/kops" "k8s.io/kops/pkg/cloudinstances" "k8s.io/kops/pkg/instancegroups" + "k8s.io/kops/pkg/k8sclient" "k8s.io/kops/pkg/pretty" "k8s.io/kops/pkg/validation" "k8s.io/kops/upup/pkg/fi/cloudup" @@ -244,14 +244,14 @@ func RunRollingUpdateCluster(ctx context.Context, f *util.Factory, out io.Writer } var nodes []v1.Node - var k8sClient kubernetes.Interface + var k8sClient k8sclient.Interface if !options.CloudOnly { - k8sClient, err = kubernetes.NewForConfig(config) + k8sClient, err = k8sclient.NewForConfig(config) if err != nil { return fmt.Errorf("cannot build kube client for %q: %v", contextName, err) } - nodeList, err := k8sClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{}) + nodeList, err := k8sClient.ListNodes(ctx) if err != nil { fmt.Fprintf(os.Stderr, "Unable to reach the kubernetes API.\n") fmt.Fprintf(os.Stderr, "Use --cloudonly to do a rolling-update without confirming progress with the k8s API\n\n") diff --git a/cmd/kops/validate_cluster.go b/cmd/kops/validate_cluster.go index 5dc70ebad95ab..749c1bacd1c4e 100644 --- a/cmd/kops/validate_cluster.go +++ b/cmd/kops/validate_cluster.go @@ -31,11 +31,11 @@ import ( "github.com/spf13/cobra" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" "k8s.io/klog" "k8s.io/kops/cmd/kops/util" kopsapi "k8s.io/kops/pkg/apis/kops" + "k8s.io/kops/pkg/k8sclient" "k8s.io/kops/pkg/validation" "k8s.io/kops/util/pkg/tables" ) @@ -136,7 +136,7 @@ func RunValidateCluster(ctx context.Context, f *util.Factory, cmd *cobra.Command return nil, fmt.Errorf("cannot load kubecfg settings for %q: %v", contextName, err) } - k8sClient, err := kubernetes.NewForConfig(config) + k8sClient, err := k8sclient.NewForConfig(config) if err != nil { return nil, fmt.Errorf("cannot build kubernetes api client for %q: %v", contextName, err) } diff --git a/hack/.packages b/hack/.packages index f0f1ebc012413..a5a593ceae558 100644 --- a/hack/.packages +++ b/hack/.packages @@ -88,6 +88,8 @@ k8s.io/kops/pkg/flagbuilder k8s.io/kops/pkg/formatter k8s.io/kops/pkg/instancegroups k8s.io/kops/pkg/jsonutils +k8s.io/kops/pkg/k8sclient +k8s.io/kops/pkg/k8sclient/fakek8sclient k8s.io/kops/pkg/k8scodecs k8s.io/kops/pkg/k8sversion k8s.io/kops/pkg/kopscodecs diff --git a/pkg/instancegroups/BUILD.bazel b/pkg/instancegroups/BUILD.bazel index d14c8991cae8e..55f3b8b6c0ce2 100644 --- a/pkg/instancegroups/BUILD.bazel +++ b/pkg/instancegroups/BUILD.bazel @@ -15,16 +15,13 @@ go_library( "//pkg/client/simple:go_default_library", "//pkg/cloudinstances:go_default_library", "//pkg/featureflag:go_default_library", + "//pkg/k8sclient:go_default_library", "//pkg/validation:go_default_library", "//upup/pkg/fi:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/util/json:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library", - "//vendor/k8s.io/client-go/kubernetes:go_default_library", "//vendor/k8s.io/klog:go_default_library", "//vendor/k8s.io/kubectl/pkg/drain:go_default_library", ], @@ -41,6 +38,8 @@ go_test( "//cloudmock/aws/mockautoscaling:go_default_library", "//pkg/apis/kops:go_default_library", "//pkg/cloudinstances:go_default_library", + "//pkg/k8sclient:go_default_library", + "//pkg/k8sclient/fakek8sclient:go_default_library", "//pkg/validation:go_default_library", "//upup/pkg/fi/cloudup/awsup:go_default_library", "//vendor/github.com/aws/aws-sdk-go/aws:go_default_library", @@ -52,7 +51,6 @@ go_test( "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library", - "//vendor/k8s.io/client-go/kubernetes:go_default_library", "//vendor/k8s.io/client-go/kubernetes/fake:go_default_library", "//vendor/k8s.io/client-go/testing:go_default_library", ], diff --git a/pkg/instancegroups/instancegroups.go b/pkg/instancegroups/instancegroups.go index a4c94ead80f5d..9e8a71bf93d83 100644 --- a/pkg/instancegroups/instancegroups.go +++ b/pkg/instancegroups/instancegroups.go @@ -26,13 +26,10 @@ import ( corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/json" - "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/klog" api "k8s.io/kops/pkg/apis/kops" "k8s.io/kops/pkg/cloudinstances" + "k8s.io/kops/pkg/k8sclient" "k8s.io/kubectl/pkg/drain" ) @@ -281,45 +278,24 @@ func (c *RollingUpdateCluster) taintAllNeedUpdate(ctx context.Context, group *cl } klog.Infof("Tainting %d %s in %q instancegroup.", len(toTaint), noun, group.InstanceGroup.Name) for _, n := range toTaint { - if err := c.patchTaint(ctx, n); err != nil { - if c.FailOnDrainError { + taint := corev1.Taint{ + Key: rollingUpdateTaintKey, + Effect: corev1.TaintEffectPreferNoSchedule, + } + if err := k8sclient.TaintNode(ctx, c.K8sClient, n, taint); err != nil { + if apierrors.IsNotFound(err) { + klog.V(2).Infof("ignoring not-found error tainting node %q: %v", n, err) + } else if c.FailOnDrainError { return fmt.Errorf("failed to taint node %q: %v", n, err) + } else { + klog.Infof("Ignoring error tainting node %q: %v", n, err) } - klog.Infof("Ignoring error tainting node %q: %v", n, err) } } } return nil } -func (c *RollingUpdateCluster) patchTaint(ctx context.Context, node *corev1.Node) error { - oldData, err := json.Marshal(node) - if err != nil { - return err - } - - node.Spec.Taints = append(node.Spec.Taints, corev1.Taint{ - Key: rollingUpdateTaintKey, - Effect: corev1.TaintEffectPreferNoSchedule, - }) - - newData, err := json.Marshal(node) - if err != nil { - return err - } - - patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, node) - if err != nil { - return err - } - - _, err = c.K8sClient.CoreV1().Nodes().Patch(ctx, node.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}) - if apierrors.IsNotFound(err) { - return nil - } - return err -} - func (c *RollingUpdateCluster) drainTerminateAndWait(ctx context.Context, u *cloudinstances.CloudInstanceGroupMember, isBastion bool, sleepAfterTerminate time.Duration) error { instanceID := u.ID @@ -511,7 +487,7 @@ func (c *RollingUpdateCluster) drainNode(u *cloudinstances.CloudInstanceGroupMem } helper := &drain.Helper{ - Client: c.K8sClient, + Client: c.K8sClient.RawClient(), Force: true, GracePeriodSeconds: -1, IgnoreAllDaemonSets: true, @@ -549,8 +525,7 @@ func (c *RollingUpdateCluster) drainNode(u *cloudinstances.CloudInstanceGroupMem // deleteNode deletes a node from the k8s API. It does not delete the underlying instance. func (c *RollingUpdateCluster) deleteNode(ctx context.Context, node *corev1.Node) error { - var options metav1.DeleteOptions - err := c.K8sClient.CoreV1().Nodes().Delete(ctx, node.Name, options) + err := c.K8sClient.DeleteNode(ctx, node.Name) if err != nil { if apierrors.IsNotFound(err) { return nil diff --git a/pkg/instancegroups/rollingupdate.go b/pkg/instancegroups/rollingupdate.go index 0ce3b36c4c7b6..75bbc08313f7a 100644 --- a/pkg/instancegroups/rollingupdate.go +++ b/pkg/instancegroups/rollingupdate.go @@ -22,10 +22,10 @@ import ( "sync" "time" - "k8s.io/client-go/kubernetes" "k8s.io/klog" api "k8s.io/kops/pkg/apis/kops" "k8s.io/kops/pkg/cloudinstances" + "k8s.io/kops/pkg/k8sclient" "k8s.io/kops/pkg/validation" "k8s.io/kops/upup/pkg/fi" ) @@ -46,7 +46,7 @@ type RollingUpdateCluster struct { Force bool // K8sClient is the kubernetes client, used for draining etc - K8sClient kubernetes.Interface + K8sClient k8sclient.Interface // ClusterValidator is used for validating the cluster. Unused if CloudOnly ClusterValidator validation.ClusterValidator diff --git a/pkg/instancegroups/rollingupdate_test.go b/pkg/instancegroups/rollingupdate_test.go index 53ab1637981cc..e50088bf24067 100644 --- a/pkg/instancegroups/rollingupdate_test.go +++ b/pkg/instancegroups/rollingupdate_test.go @@ -33,12 +33,13 @@ import ( v1 "k8s.io/api/core/v1" v1meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" - "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" testingclient "k8s.io/client-go/testing" "k8s.io/kops/cloudmock/aws/mockautoscaling" kopsapi "k8s.io/kops/pkg/apis/kops" "k8s.io/kops/pkg/cloudinstances" + "k8s.io/kops/pkg/k8sclient" + "k8s.io/kops/pkg/k8sclient/fakek8sclient" "k8s.io/kops/pkg/validation" "k8s.io/kops/upup/pkg/fi/cloudup/awsup" ) @@ -49,8 +50,7 @@ const ( ) func getTestSetup() (*RollingUpdateCluster, *awsup.MockAWSCloud, *kopsapi.Cluster) { - k8sClient := fake.NewSimpleClientset() - + k8sClient := &fakek8sclient.Fake{Clientset: fake.NewSimpleClientset()} mockcloud := awsup.BuildMockAWSCloud("us-east-1", "abc") mockAutoscaling := &mockautoscaling.MockAutoscaling{} mockcloud.MockAutoscaling = mockAutoscaling @@ -111,8 +111,8 @@ func (v *assertNotCalledClusterValidator) Validate() (*validation.ValidationClus return nil, errors.New("validator called unexpectedly") } -func makeGroup(groups map[string]*cloudinstances.CloudInstanceGroup, k8sClient kubernetes.Interface, cloud awsup.AWSCloud, name string, role kopsapi.InstanceGroupRole, count int, needUpdate int) { - fakeClient := k8sClient.(*fake.Clientset) +func makeGroup(groups map[string]*cloudinstances.CloudInstanceGroup, k8sClient k8sclient.Interface, cloud awsup.AWSCloud, name string, role kopsapi.InstanceGroupRole, count int, needUpdate int) { + fakeClient := k8sClient.(*fakek8sclient.Fake).Clientset groups[name] = &cloudinstances.CloudInstanceGroup{ HumanName: name, @@ -161,7 +161,7 @@ func makeGroup(groups map[string]*cloudinstances.CloudInstanceGroup, k8sClient k }) } -func getGroups(k8sClient kubernetes.Interface, cloud awsup.AWSCloud) map[string]*cloudinstances.CloudInstanceGroup { +func getGroups(k8sClient k8sclient.Interface, cloud awsup.AWSCloud) map[string]*cloudinstances.CloudInstanceGroup { groups := make(map[string]*cloudinstances.CloudInstanceGroup) makeGroup(groups, k8sClient, cloud, "node-1", kopsapi.InstanceGroupRoleNode, 3, 0) makeGroup(groups, k8sClient, cloud, "node-2", kopsapi.InstanceGroupRoleNode, 3, 0) @@ -170,7 +170,7 @@ func getGroups(k8sClient kubernetes.Interface, cloud awsup.AWSCloud) map[string] return groups } -func getGroupsAllNeedUpdate(k8sClient kubernetes.Interface, cloud awsup.AWSCloud) map[string]*cloudinstances.CloudInstanceGroup { +func getGroupsAllNeedUpdate(k8sClient k8sclient.Interface, cloud awsup.AWSCloud) map[string]*cloudinstances.CloudInstanceGroup { groups := make(map[string]*cloudinstances.CloudInstanceGroup) makeGroup(groups, k8sClient, cloud, "node-1", kopsapi.InstanceGroupRoleNode, 3, 3) makeGroup(groups, k8sClient, cloud, "node-2", kopsapi.InstanceGroupRoleNode, 3, 3) @@ -191,7 +191,7 @@ func TestRollingUpdateAllNeedUpdate(t *testing.T) { cordoned := "" tainted := map[string]bool{} deleted := map[string]bool{} - for _, action := range c.K8sClient.(*fake.Clientset).Actions() { + for _, action := range c.K8sClient.(*fakek8sclient.Fake).Actions() { switch a := action.(type) { case testingclient.PatchAction: if string(a.GetPatch()) == cordonPatch { @@ -240,7 +240,7 @@ func TestRollingUpdateAllNeedUpdateCloudonly(t *testing.T) { err := c.RollingUpdate(ctx, groups, cluster, &kopsapi.InstanceGroupList{}) assert.NoError(t, err, "rolling update") - assert.Empty(t, c.K8sClient.(*fake.Clientset).Actions()) + assert.Empty(t, c.K8sClient.(*fakek8sclient.Fake).Actions()) asgGroups, _ := cloud.Autoscaling().DescribeAutoScalingGroups(&autoscaling.DescribeAutoScalingGroupsInput{}) for _, group := range asgGroups.AutoScalingGroups { @@ -275,7 +275,7 @@ func TestRollingUpdateNoneNeedUpdate(t *testing.T) { err := c.RollingUpdate(ctx, groups, cluster, &kopsapi.InstanceGroupList{}) assert.NoError(t, err, "rolling update") - assert.Empty(t, c.K8sClient.(*fake.Clientset).Actions()) + assert.Empty(t, c.K8sClient.(*fakek8sclient.Fake).Actions()) assertGroupInstanceCount(t, cloud, "node-1", 3) assertGroupInstanceCount(t, cloud, "node-2", 3) @@ -718,7 +718,7 @@ func TestRollingUpdateTaintAllButOneNeedUpdate(t *testing.T) { cordoned := "" tainted := map[string]bool{} deleted := map[string]bool{} - for _, action := range c.K8sClient.(*fake.Clientset).Actions() { + for _, action := range c.K8sClient.(*fakek8sclient.Fake).Actions() { switch a := action.(type) { case testingclient.PatchAction: if string(a.GetPatch()) == cordonPatch { @@ -765,7 +765,7 @@ func TestRollingUpdateMaxSurgeIgnoredForMaster(t *testing.T) { cordoned := "" tainted := map[string]bool{} deleted := map[string]bool{} - for _, action := range c.K8sClient.(*fake.Clientset).Actions() { + for _, action := range c.K8sClient.(*fakek8sclient.Fake).Actions() { switch a := action.(type) { case testingclient.PatchAction: if string(a.GetPatch()) == cordonPatch { diff --git a/pkg/k8sclient/BUILD.bazel b/pkg/k8sclient/BUILD.bazel new file mode 100644 index 0000000000000..c44475b3c48a5 --- /dev/null +++ b/pkg/k8sclient/BUILD.bazel @@ -0,0 +1,20 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = ["client.go"], + importpath = "k8s.io/kops/pkg/k8sclient", + visibility = ["//visibility:public"], + deps = [ + "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/json:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library", + "//vendor/k8s.io/client-go/kubernetes:go_default_library", + "//vendor/k8s.io/client-go/plugin/pkg/client/auth:go_default_library", + "//vendor/k8s.io/client-go/rest:go_default_library", + "//vendor/k8s.io/klog:go_default_library", + ], +) diff --git a/pkg/k8sclient/client.go b/pkg/k8sclient/client.go new file mode 100644 index 0000000000000..45a00eec08768 --- /dev/null +++ b/pkg/k8sclient/client.go @@ -0,0 +1,177 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package k8sclient + +import ( + "context" + "net/http" + "strings" + "time" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/json" + apimachinerynet "k8s.io/apimachinery/pkg/util/net" + "k8s.io/apimachinery/pkg/util/strategicpatch" + "k8s.io/client-go/kubernetes" + _ "k8s.io/client-go/plugin/pkg/client/auth" + "k8s.io/client-go/rest" + "k8s.io/klog" +) + +const maxTimeout = 15 * time.Second + +// Interface is a wrapper around kubernetes.Interface, that recovers +// better from network failures. This error handling is only +// available on the helper methods, but for compatability RawClient +// exposes the underlying kubernetes.Interface +type Interface interface { + // RawClient returns the current kubernetes.Interface; it should not be cached + // Using wrapper methods is preferrable because we can do richer retry logic and error handling. + // Deprecated: use wrapper methods instead; this is used to ease the transition. + RawClient() kubernetes.Interface + + // DeleteNode wraps CoreV1.Nodes.Delete + DeleteNode(ctx context.Context, nodeName string) error + + // ListNodes wraps CoreV1.Nodes.List + ListNodes(ctx context.Context) (*corev1.NodeList, error) +} + +var _ Interface = &client{} + +type client struct { + inner kubernetes.Interface +} + +// NewForConfig creates a client for the specified rest.Config config. +// It is a wrapper around kubernetes.NewForConfig, but ensures a short +// timeout. +func NewForConfig(config *rest.Config) (Interface, error) { + // Set a lower timeout, to work around + // https://github.com/kubernetes/client-go/issues/374 We + // trigger a timeout error and then to recover we need to + // reset any existing connections + if config.Timeout < maxTimeout { + config.Timeout = maxTimeout + } + + c, err := kubernetes.NewForConfig(config) + if err != nil { + return nil, err + } + + return &client{inner: c}, nil +} + +func (c *client) RawClient() kubernetes.Interface { + return c.inner +} + +func (c *client) ListNodes(ctx context.Context) (*corev1.NodeList, error) { + client := c.RawClient() + nodes, err := client.CoreV1().Nodes().List(ctx, metav1.ListOptions{}) + c.handleError(err) + return nodes, err +} + +func (c *client) DeleteNode(ctx context.Context, nodeName string) error { + client := c.RawClient() + err := client.CoreV1().Nodes().Delete(ctx, nodeName, metav1.DeleteOptions{}) + c.handleError(err) + return err +} + +// TaintNode applies the taint to the specified node +func TaintNode(ctx context.Context, k8sClient Interface, node *corev1.Node, taint corev1.Taint) error { + oldData, err := json.Marshal(node) + if err != nil { + return err + } + + node.Spec.Taints = append(node.Spec.Taints, taint) + + newData, err := json.Marshal(node) + if err != nil { + return err + } + + patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, node) + if err != nil { + return err + } + + _, err = k8sClient.RawClient().CoreV1().Nodes().Patch(ctx, node.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}) + if c, ok := k8sClient.(*client); ok { + c.handleError(err) + } else { + klog.Warningf("unexpected type for client %T", c) + } + return err +} + +// handleError is called with all potential errors. +// When it observes a timeout error, it closes all open / idle connections. +func (c *client) handleError(err error) { + if err == nil { + return + } + + isTimeout := false + s := err.Error() + // TODO: move this to the chained errors framework when that's available + if strings.Contains(s, "Client.Timeout exceeded while awaiting headers") { + isTimeout = true + } + + if isTimeout { + restClientInterface := c.inner.CoreV1().RESTClient() + restClient, ok := restClientInterface.(*rest.RESTClient) + if !ok { + klog.Warningf("client timed out, but rest client was not of expected type, was %T", restClientInterface) + return + } + + httpTransport := findHTTPTransport(restClient.Client.Transport) + if httpTransport == nil { + klog.Warningf("client timed out, but http transport was not of expected type, was %T", restClient.Client.Transport) + return + } + httpTransport.CloseIdleConnections() + klog.Infof("client timed out; reset connections") + } +} + +// findHTTPTransport returns the http.Transport under a RoundTripper. +// If it cannot be determined, it returns nil. +func findHTTPTransport(transport http.RoundTripper) *http.Transport { + httpTransport, ok := transport.(*http.Transport) + if ok { + return httpTransport + } + + wrapper, ok := transport.(apimachinerynet.RoundTripperWrapper) + if ok { + wrapped := wrapper.WrappedRoundTripper() + if wrapped != nil { + return findHTTPTransport(wrapped) + } + } + + return nil +} diff --git a/pkg/k8sclient/fakek8sclient/BUILD.bazel b/pkg/k8sclient/fakek8sclient/BUILD.bazel new file mode 100644 index 0000000000000..37305ddb9af4e --- /dev/null +++ b/pkg/k8sclient/fakek8sclient/BUILD.bazel @@ -0,0 +1,15 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = ["fake.go"], + importpath = "k8s.io/kops/pkg/k8sclient/fakek8sclient", + visibility = ["//visibility:public"], + deps = [ + "//pkg/k8sclient:go_default_library", + "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/client-go/kubernetes:go_default_library", + "//vendor/k8s.io/client-go/kubernetes/fake:go_default_library", + ], +) diff --git a/pkg/k8sclient/fakek8sclient/fake.go b/pkg/k8sclient/fakek8sclient/fake.go new file mode 100644 index 0000000000000..b0762a5493343 --- /dev/null +++ b/pkg/k8sclient/fakek8sclient/fake.go @@ -0,0 +1,45 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fakek8sclient + +import ( + "context" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/kops/pkg/k8sclient" +) + +type Fake struct { + *fake.Clientset +} + +var _ k8sclient.Interface = &Fake{} + +func (f *Fake) RawClient() kubernetes.Interface { + return f.Clientset +} + +func (f *Fake) DeleteNode(ctx context.Context, nodeName string) error { + return f.CoreV1().Nodes().Delete(ctx, nodeName, metav1.DeleteOptions{}) +} + +func (f *Fake) ListNodes(ctx context.Context) (*corev1.NodeList, error) { + return f.CoreV1().Nodes().List(ctx, metav1.ListOptions{}) +} diff --git a/pkg/validation/BUILD.bazel b/pkg/validation/BUILD.bazel index 4687848b7b3dc..78812e74e6d1c 100644 --- a/pkg/validation/BUILD.bazel +++ b/pkg/validation/BUILD.bazel @@ -12,6 +12,7 @@ go_library( "//pkg/apis/kops:go_default_library", "//pkg/cloudinstances:go_default_library", "//pkg/dns:go_default_library", + "//pkg/k8sclient:go_default_library", "//upup/pkg/fi:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", @@ -30,6 +31,7 @@ go_test( deps = [ "//pkg/apis/kops:go_default_library", "//pkg/cloudinstances:go_default_library", + "//pkg/k8sclient/fakek8sclient:go_default_library", "//upup/pkg/fi:go_default_library", "//upup/pkg/fi/cloudup/awsup:go_default_library", "//vendor/github.com/stretchr/testify/assert:go_default_library", diff --git a/pkg/validation/validate_cluster.go b/pkg/validation/validate_cluster.go index 8ab668ec29831..e704b82a2ccfb 100644 --- a/pkg/validation/validate_cluster.go +++ b/pkg/validation/validate_cluster.go @@ -35,6 +35,7 @@ import ( "k8s.io/kops/pkg/apis/kops" "k8s.io/kops/pkg/cloudinstances" "k8s.io/kops/pkg/dns" + "k8s.io/kops/pkg/k8sclient" ) // ValidationCluster uses a cluster to validate. @@ -60,7 +61,7 @@ type clusterValidatorImpl struct { cluster *kops.Cluster cloud fi.Cloud instanceGroups []*kops.InstanceGroup - k8sClient kubernetes.Interface + k8sClient k8sclient.Interface } func (v *ValidationCluster) addError(failure *ValidationError) { @@ -104,7 +105,7 @@ func hasPlaceHolderIP(clusterName string) (bool, error) { return false, nil } -func NewClusterValidator(cluster *kops.Cluster, cloud fi.Cloud, instanceGroupList *kops.InstanceGroupList, k8sClient kubernetes.Interface) (ClusterValidator, error) { +func NewClusterValidator(cluster *kops.Cluster, cloud fi.Cloud, instanceGroupList *kops.InstanceGroupList, k8sClient k8sclient.Interface) (ClusterValidator, error) { var instanceGroups []*kops.InstanceGroup for i := range instanceGroupList.Items { @@ -156,7 +157,7 @@ func (v *clusterValidatorImpl) Validate() (*ValidationCluster, error) { } } - nodeList, err := v.k8sClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{}) + nodeList, err := v.k8sClient.ListNodes(ctx) if err != nil { return nil, fmt.Errorf("error listing nodes: %v", err) } @@ -168,11 +169,11 @@ func (v *clusterValidatorImpl) Validate() (*ValidationCluster, error) { } readyNodes := validation.validateNodes(cloudGroups, v.instanceGroups) - if err := validation.collectComponentFailures(ctx, v.k8sClient); err != nil { + if err := validation.collectComponentFailures(ctx, v.k8sClient.RawClient()); err != nil { return nil, fmt.Errorf("cannot get component status for %q: %v", clusterName, err) } - if err := validation.collectPodFailures(ctx, v.k8sClient, readyNodes); err != nil { + if err := validation.collectPodFailures(ctx, v.k8sClient.RawClient(), readyNodes); err != nil { return nil, fmt.Errorf("cannot get pod health for %q: %v", clusterName, err) } diff --git a/pkg/validation/validate_cluster_test.go b/pkg/validation/validate_cluster_test.go index 895f27339a4f2..5d88c2a1fd686 100644 --- a/pkg/validation/validate_cluster_test.go +++ b/pkg/validation/validate_cluster_test.go @@ -28,6 +28,7 @@ import ( "k8s.io/client-go/kubernetes/fake" kopsapi "k8s.io/kops/pkg/apis/kops" "k8s.io/kops/pkg/cloudinstances" + "k8s.io/kops/pkg/k8sclient/fakek8sclient" "k8s.io/kops/upup/pkg/fi" "k8s.io/kops/upup/pkg/fi/cloudup/awsup" ) @@ -121,7 +122,7 @@ func testValidate(t *testing.T, groups map[string]*cloudinstances.CloudInstanceG mockcloud := BuildMockCloud(t, groups, cluster, instanceGroups) - validator, err := NewClusterValidator(cluster, mockcloud, &kopsapi.InstanceGroupList{Items: instanceGroups}, fake.NewSimpleClientset(objects...)) + validator, err := NewClusterValidator(cluster, mockcloud, &kopsapi.InstanceGroupList{Items: instanceGroups}, &fakek8sclient.Fake{Clientset: fake.NewSimpleClientset(objects...)}) if err != nil { return nil, err } @@ -145,7 +146,7 @@ func Test_ValidateCloudGroupMissing(t *testing.T) { mockcloud := BuildMockCloud(t, nil, cluster, instanceGroups) - validator, err := NewClusterValidator(cluster, mockcloud, &kopsapi.InstanceGroupList{Items: instanceGroups}, fake.NewSimpleClientset()) + validator, err := NewClusterValidator(cluster, mockcloud, &kopsapi.InstanceGroupList{Items: instanceGroups}, &fakek8sclient.Fake{Clientset: fake.NewSimpleClientset()}) require.NoError(t, err) v, err := validator.Validate() require.NoError(t, err)