Skip to content

Commit

Permalink
Use a custom kubernetes client, to handle network failures
Browse files Browse the repository at this point in the history
By setting a low HTTP timeout, and resetting connections on error, we
should be able to work around the http2 connection issues:
kubernetes/client-go#374

I observed this bug specifically when rolling an HA control plane
without a load balancer.
  • Loading branch information
justinsb committed Jun 9, 2020
1 parent 7a5dd52 commit bc685c0
Show file tree
Hide file tree
Showing 15 changed files with 307 additions and 71 deletions.
2 changes: 1 addition & 1 deletion cmd/kops/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ go_library(
"//pkg/featureflag:go_default_library",
"//pkg/formatter:go_default_library",
"//pkg/instancegroups:go_default_library",
"//pkg/k8sclient:go_default_library",
"//pkg/kopscodecs:go_default_library",
"//pkg/kubeconfig:go_default_library",
"//pkg/model/components:go_default_library",
Expand Down Expand Up @@ -113,7 +114,6 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/validation/field:go_default_library",
"//vendor/k8s.io/cli-runtime/pkg/genericclioptions:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/plugin/pkg/client/auth:go_default_library",
"//vendor/k8s.io/client-go/tools/clientcmd:go_default_library",
"//vendor/k8s.io/client-go/util/homedir:go_default_library",
Expand Down
8 changes: 4 additions & 4 deletions cmd/kops/rollingupdatecluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/client-go/kubernetes"
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/kops/cmd/kops/util"
kopsapi "k8s.io/kops/pkg/apis/kops"
"k8s.io/kops/pkg/cloudinstances"
"k8s.io/kops/pkg/instancegroups"
"k8s.io/kops/pkg/k8sclient"
"k8s.io/kops/pkg/pretty"
"k8s.io/kops/pkg/validation"
"k8s.io/kops/upup/pkg/fi/cloudup"
Expand Down Expand Up @@ -244,14 +244,14 @@ func RunRollingUpdateCluster(ctx context.Context, f *util.Factory, out io.Writer
}

var nodes []v1.Node
var k8sClient kubernetes.Interface
var k8sClient k8sclient.Interface
if !options.CloudOnly {
k8sClient, err = kubernetes.NewForConfig(config)
k8sClient, err = k8sclient.NewForConfig(config)
if err != nil {
return fmt.Errorf("cannot build kube client for %q: %v", contextName, err)
}

nodeList, err := k8sClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
nodeList, err := k8sClient.ListNodes(ctx)
if err != nil {
fmt.Fprintf(os.Stderr, "Unable to reach the kubernetes API.\n")
fmt.Fprintf(os.Stderr, "Use --cloudonly to do a rolling-update without confirming progress with the k8s API\n\n")
Expand Down
4 changes: 2 additions & 2 deletions cmd/kops/validate_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ import (
"github.com/spf13/cobra"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog"
"k8s.io/kops/cmd/kops/util"
kopsapi "k8s.io/kops/pkg/apis/kops"
"k8s.io/kops/pkg/k8sclient"
"k8s.io/kops/pkg/validation"
"k8s.io/kops/util/pkg/tables"
)
Expand Down Expand Up @@ -136,7 +136,7 @@ func RunValidateCluster(ctx context.Context, f *util.Factory, cmd *cobra.Command
return nil, fmt.Errorf("cannot load kubecfg settings for %q: %v", contextName, err)
}

k8sClient, err := kubernetes.NewForConfig(config)
k8sClient, err := k8sclient.NewForConfig(config)
if err != nil {
return nil, fmt.Errorf("cannot build kubernetes api client for %q: %v", contextName, err)
}
Expand Down
2 changes: 2 additions & 0 deletions hack/.packages
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ k8s.io/kops/pkg/flagbuilder
k8s.io/kops/pkg/formatter
k8s.io/kops/pkg/instancegroups
k8s.io/kops/pkg/jsonutils
k8s.io/kops/pkg/k8sclient
k8s.io/kops/pkg/k8sclient/fakek8sclient
k8s.io/kops/pkg/k8scodecs
k8s.io/kops/pkg/k8sversion
k8s.io/kops/pkg/kopscodecs
Expand Down
8 changes: 3 additions & 5 deletions pkg/instancegroups/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,13 @@ go_library(
"//pkg/client/simple:go_default_library",
"//pkg/cloudinstances:go_default_library",
"//pkg/featureflag:go_default_library",
"//pkg/k8sclient:go_default_library",
"//pkg/validation:go_default_library",
"//upup/pkg/fi:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/json:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
"//vendor/k8s.io/kubectl/pkg/drain:go_default_library",
],
Expand All @@ -41,6 +38,8 @@ go_test(
"//cloudmock/aws/mockautoscaling:go_default_library",
"//pkg/apis/kops:go_default_library",
"//pkg/cloudinstances:go_default_library",
"//pkg/k8sclient:go_default_library",
"//pkg/k8sclient/fakek8sclient:go_default_library",
"//pkg/validation:go_default_library",
"//upup/pkg/fi/cloudup/awsup:go_default_library",
"//vendor/github.com/aws/aws-sdk-go/aws:go_default_library",
Expand All @@ -52,7 +51,6 @@ go_test(
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
"//vendor/k8s.io/client-go/testing:go_default_library",
],
Expand Down
51 changes: 13 additions & 38 deletions pkg/instancegroups/instancegroups.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,10 @@ import (

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/json"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/klog"
api "k8s.io/kops/pkg/apis/kops"
"k8s.io/kops/pkg/cloudinstances"
"k8s.io/kops/pkg/k8sclient"
"k8s.io/kubectl/pkg/drain"
)

Expand Down Expand Up @@ -281,45 +278,24 @@ func (c *RollingUpdateCluster) taintAllNeedUpdate(ctx context.Context, group *cl
}
klog.Infof("Tainting %d %s in %q instancegroup.", len(toTaint), noun, group.InstanceGroup.Name)
for _, n := range toTaint {
if err := c.patchTaint(ctx, n); err != nil {
if c.FailOnDrainError {
taint := corev1.Taint{
Key: rollingUpdateTaintKey,
Effect: corev1.TaintEffectPreferNoSchedule,
}
if err := k8sclient.TaintNode(ctx, c.K8sClient, n, taint); err != nil {
if apierrors.IsNotFound(err) {
klog.V(2).Infof("ignoring not-found error tainting node %q: %v", n, err)
} else if c.FailOnDrainError {
return fmt.Errorf("failed to taint node %q: %v", n, err)
} else {
klog.Infof("Ignoring error tainting node %q: %v", n, err)
}
klog.Infof("Ignoring error tainting node %q: %v", n, err)
}
}
}
return nil
}

// patchTaint appends the rolling-update taint (key rollingUpdateTaintKey,
// effect PreferNoSchedule) to node and submits the change through
// c.K8sClient as a strategic merge patch on the node's name.
//
// The taint is appended to the passed-in node object itself, so the caller's
// copy is modified even if the patch request later fails. A not-found error
// from the Patch call is treated as success and nil is returned; any other
// error (including marshalling or patch-construction errors) is returned
// unchanged.
func (c *RollingUpdateCluster) patchTaint(ctx context.Context, node *corev1.Node) error {
// Serialize the node before mutating it; this is the "original" side of the patch.
oldData, err := json.Marshal(node)
if err != nil {
return err
}

// Mutates node in place. No check is made here for an existing taint with the same key.
node.Spec.Taints = append(node.Spec.Taints, corev1.Taint{
Key: rollingUpdateTaintKey,
Effect: corev1.TaintEffectPreferNoSchedule,
})

// Serialize again after the mutation; this is the "modified" side of the patch.
newData, err := json.Marshal(node)
if err != nil {
return err
}

// Compute the patch from the before/after JSON; node is passed as the data struct argument.
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, node)
if err != nil {
return err
}

_, err = c.K8sClient.CoreV1().Nodes().Patch(ctx, node.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
// A not-found result is deliberately not reported as an error.
if apierrors.IsNotFound(err) {
return nil
}
return err
}

func (c *RollingUpdateCluster) drainTerminateAndWait(ctx context.Context, u *cloudinstances.CloudInstanceGroupMember, isBastion bool, sleepAfterTerminate time.Duration) error {
instanceID := u.ID

Expand Down Expand Up @@ -511,7 +487,7 @@ func (c *RollingUpdateCluster) drainNode(u *cloudinstances.CloudInstanceGroupMem
}

helper := &drain.Helper{
Client: c.K8sClient,
Client: c.K8sClient.RawClient(),
Force: true,
GracePeriodSeconds: -1,
IgnoreAllDaemonSets: true,
Expand Down Expand Up @@ -549,8 +525,7 @@ func (c *RollingUpdateCluster) drainNode(u *cloudinstances.CloudInstanceGroupMem

// deleteNode deletes a node from the k8s API. It does not delete the underlying instance.
func (c *RollingUpdateCluster) deleteNode(ctx context.Context, node *corev1.Node) error {
var options metav1.DeleteOptions
err := c.K8sClient.CoreV1().Nodes().Delete(ctx, node.Name, options)
err := c.K8sClient.DeleteNode(ctx, node.Name)
if err != nil {
if apierrors.IsNotFound(err) {
return nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/instancegroups/rollingupdate.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ import (
"sync"
"time"

"k8s.io/client-go/kubernetes"
"k8s.io/klog"
api "k8s.io/kops/pkg/apis/kops"
"k8s.io/kops/pkg/cloudinstances"
"k8s.io/kops/pkg/k8sclient"
"k8s.io/kops/pkg/validation"
"k8s.io/kops/upup/pkg/fi"
)
Expand All @@ -46,7 +46,7 @@ type RollingUpdateCluster struct {
Force bool

// K8sClient is the kubernetes client, used for draining etc
K8sClient kubernetes.Interface
K8sClient k8sclient.Interface

// ClusterValidator is used for validating the cluster. Unused if CloudOnly
ClusterValidator validation.ClusterValidator
Expand Down
24 changes: 12 additions & 12 deletions pkg/instancegroups/rollingupdate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@ import (
v1 "k8s.io/api/core/v1"
v1meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
testingclient "k8s.io/client-go/testing"
"k8s.io/kops/cloudmock/aws/mockautoscaling"
kopsapi "k8s.io/kops/pkg/apis/kops"
"k8s.io/kops/pkg/cloudinstances"
"k8s.io/kops/pkg/k8sclient"
"k8s.io/kops/pkg/k8sclient/fakek8sclient"
"k8s.io/kops/pkg/validation"
"k8s.io/kops/upup/pkg/fi/cloudup/awsup"
)
Expand All @@ -49,8 +50,7 @@ const (
)

func getTestSetup() (*RollingUpdateCluster, *awsup.MockAWSCloud, *kopsapi.Cluster) {
k8sClient := fake.NewSimpleClientset()

k8sClient := &fakek8sclient.Fake{Clientset: fake.NewSimpleClientset()}
mockcloud := awsup.BuildMockAWSCloud("us-east-1", "abc")
mockAutoscaling := &mockautoscaling.MockAutoscaling{}
mockcloud.MockAutoscaling = mockAutoscaling
Expand Down Expand Up @@ -111,8 +111,8 @@ func (v *assertNotCalledClusterValidator) Validate() (*validation.ValidationClus
return nil, errors.New("validator called unexpectedly")
}

func makeGroup(groups map[string]*cloudinstances.CloudInstanceGroup, k8sClient kubernetes.Interface, cloud awsup.AWSCloud, name string, role kopsapi.InstanceGroupRole, count int, needUpdate int) {
fakeClient := k8sClient.(*fake.Clientset)
func makeGroup(groups map[string]*cloudinstances.CloudInstanceGroup, k8sClient k8sclient.Interface, cloud awsup.AWSCloud, name string, role kopsapi.InstanceGroupRole, count int, needUpdate int) {
fakeClient := k8sClient.(*fakek8sclient.Fake).Clientset

groups[name] = &cloudinstances.CloudInstanceGroup{
HumanName: name,
Expand Down Expand Up @@ -161,7 +161,7 @@ func makeGroup(groups map[string]*cloudinstances.CloudInstanceGroup, k8sClient k
})
}

func getGroups(k8sClient kubernetes.Interface, cloud awsup.AWSCloud) map[string]*cloudinstances.CloudInstanceGroup {
func getGroups(k8sClient k8sclient.Interface, cloud awsup.AWSCloud) map[string]*cloudinstances.CloudInstanceGroup {
groups := make(map[string]*cloudinstances.CloudInstanceGroup)
makeGroup(groups, k8sClient, cloud, "node-1", kopsapi.InstanceGroupRoleNode, 3, 0)
makeGroup(groups, k8sClient, cloud, "node-2", kopsapi.InstanceGroupRoleNode, 3, 0)
Expand All @@ -170,7 +170,7 @@ func getGroups(k8sClient kubernetes.Interface, cloud awsup.AWSCloud) map[string]
return groups
}

func getGroupsAllNeedUpdate(k8sClient kubernetes.Interface, cloud awsup.AWSCloud) map[string]*cloudinstances.CloudInstanceGroup {
func getGroupsAllNeedUpdate(k8sClient k8sclient.Interface, cloud awsup.AWSCloud) map[string]*cloudinstances.CloudInstanceGroup {
groups := make(map[string]*cloudinstances.CloudInstanceGroup)
makeGroup(groups, k8sClient, cloud, "node-1", kopsapi.InstanceGroupRoleNode, 3, 3)
makeGroup(groups, k8sClient, cloud, "node-2", kopsapi.InstanceGroupRoleNode, 3, 3)
Expand All @@ -191,7 +191,7 @@ func TestRollingUpdateAllNeedUpdate(t *testing.T) {
cordoned := ""
tainted := map[string]bool{}
deleted := map[string]bool{}
for _, action := range c.K8sClient.(*fake.Clientset).Actions() {
for _, action := range c.K8sClient.(*fakek8sclient.Fake).Actions() {
switch a := action.(type) {
case testingclient.PatchAction:
if string(a.GetPatch()) == cordonPatch {
Expand Down Expand Up @@ -240,7 +240,7 @@ func TestRollingUpdateAllNeedUpdateCloudonly(t *testing.T) {
err := c.RollingUpdate(ctx, groups, cluster, &kopsapi.InstanceGroupList{})
assert.NoError(t, err, "rolling update")

assert.Empty(t, c.K8sClient.(*fake.Clientset).Actions())
assert.Empty(t, c.K8sClient.(*fakek8sclient.Fake).Actions())

asgGroups, _ := cloud.Autoscaling().DescribeAutoScalingGroups(&autoscaling.DescribeAutoScalingGroupsInput{})
for _, group := range asgGroups.AutoScalingGroups {
Expand Down Expand Up @@ -275,7 +275,7 @@ func TestRollingUpdateNoneNeedUpdate(t *testing.T) {
err := c.RollingUpdate(ctx, groups, cluster, &kopsapi.InstanceGroupList{})
assert.NoError(t, err, "rolling update")

assert.Empty(t, c.K8sClient.(*fake.Clientset).Actions())
assert.Empty(t, c.K8sClient.(*fakek8sclient.Fake).Actions())

assertGroupInstanceCount(t, cloud, "node-1", 3)
assertGroupInstanceCount(t, cloud, "node-2", 3)
Expand Down Expand Up @@ -718,7 +718,7 @@ func TestRollingUpdateTaintAllButOneNeedUpdate(t *testing.T) {
cordoned := ""
tainted := map[string]bool{}
deleted := map[string]bool{}
for _, action := range c.K8sClient.(*fake.Clientset).Actions() {
for _, action := range c.K8sClient.(*fakek8sclient.Fake).Actions() {
switch a := action.(type) {
case testingclient.PatchAction:
if string(a.GetPatch()) == cordonPatch {
Expand Down Expand Up @@ -765,7 +765,7 @@ func TestRollingUpdateMaxSurgeIgnoredForMaster(t *testing.T) {
cordoned := ""
tainted := map[string]bool{}
deleted := map[string]bool{}
for _, action := range c.K8sClient.(*fake.Clientset).Actions() {
for _, action := range c.K8sClient.(*fakek8sclient.Fake).Actions() {
switch a := action.(type) {
case testingclient.PatchAction:
if string(a.GetPatch()) == cordonPatch {
Expand Down
20 changes: 20 additions & 0 deletions pkg/k8sclient/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

# Go library target for the k8sclient package: built from client.go,
# importable as k8s.io/kops/pkg/k8sclient, and visible to every package.
go_library(
name = "go_default_library",
srcs = ["client.go"],
importpath = "k8s.io/kops/pkg/k8sclient",
visibility = ["//visibility:public"],
deps = [
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/json:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/plugin/pkg/client/auth:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)
Loading

0 comments on commit bc685c0

Please sign in to comment.