Skip to content

Commit

Permalink
Use a custom kubernetes client, to handle network failures
Browse files Browse the repository at this point in the history
By setting a low HTTP timeout, and reseting connections on error, we
should be able to work around the http2 connection issues:
kubernetes/client-go#374

I observed this bug specifically when rolling an HA control plane
without a load balancer.
  • Loading branch information
justinsb committed Apr 28, 2020
1 parent f00e538 commit 744b555
Show file tree
Hide file tree
Showing 10 changed files with 225 additions and 56 deletions.
2 changes: 1 addition & 1 deletion cmd/kops/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ go_library(
"//pkg/featureflag:go_default_library",
"//pkg/formatter:go_default_library",
"//pkg/instancegroups:go_default_library",
"//pkg/k8sclient:go_default_library",
"//pkg/k8sversion:go_default_library",
"//pkg/kopscodecs:go_default_library",
"//pkg/kubeconfig:go_default_library",
Expand Down Expand Up @@ -115,7 +116,6 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/validation/field:go_default_library",
"//vendor/k8s.io/cli-runtime/pkg/genericclioptions:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/plugin/pkg/client/auth:go_default_library",
"//vendor/k8s.io/client-go/tools/clientcmd:go_default_library",
"//vendor/k8s.io/client-go/util/homedir:go_default_library",
Expand Down
8 changes: 4 additions & 4 deletions cmd/kops/rollingupdatecluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/client-go/kubernetes"
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/kops/cmd/kops/util"
kopsapi "k8s.io/kops/pkg/apis/kops"
"k8s.io/kops/pkg/cloudinstances"
"k8s.io/kops/pkg/instancegroups"
"k8s.io/kops/pkg/k8sclient"
"k8s.io/kops/pkg/pretty"
"k8s.io/kops/pkg/validation"
"k8s.io/kops/upup/pkg/fi/cloudup"
Expand Down Expand Up @@ -244,14 +244,14 @@ func RunRollingUpdateCluster(ctx context.Context, f *util.Factory, out io.Writer
}

var nodes []v1.Node
var k8sClient kubernetes.Interface
var k8sClient k8sclient.Interface
if !options.CloudOnly {
k8sClient, err = kubernetes.NewForConfig(config)
k8sClient, err = k8sclient.NewForConfig(config)
if err != nil {
return fmt.Errorf("cannot build kube client for %q: %v", contextName, err)
}

nodeList, err := k8sClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
nodeList, err := k8sClient.ListNodes(ctx)
if err != nil {
fmt.Fprintf(os.Stderr, "Unable to reach the kubernetes API.\n")
fmt.Fprintf(os.Stderr, "Use --cloudonly to do a rolling-update without confirming progress with the k8s API\n\n")
Expand Down
4 changes: 2 additions & 2 deletions cmd/kops/validate_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ import (
"github.com/spf13/cobra"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog"
"k8s.io/kops/cmd/kops/util"
kopsapi "k8s.io/kops/pkg/apis/kops"
"k8s.io/kops/pkg/k8sclient"
"k8s.io/kops/pkg/validation"
"k8s.io/kops/util/pkg/tables"
)
Expand Down Expand Up @@ -136,7 +136,7 @@ func RunValidateCluster(ctx context.Context, f *util.Factory, cmd *cobra.Command
return nil, fmt.Errorf("cannot load kubecfg settings for %q: %v", contextName, err)
}

k8sClient, err := kubernetes.NewForConfig(config)
k8sClient, err := k8sclient.NewForConfig(config)
if err != nil {
return nil, fmt.Errorf("cannot build kubernetes api client for %q: %v", contextName, err)
}
Expand Down
5 changes: 1 addition & 4 deletions pkg/instancegroups/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,13 @@ go_library(
"//pkg/client/simple:go_default_library",
"//pkg/cloudinstances:go_default_library",
"//pkg/featureflag:go_default_library",
"//pkg/k8sclient:go_default_library",
"//pkg/validation:go_default_library",
"//upup/pkg/fi:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/json:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
"//vendor/k8s.io/kubectl/pkg/drain:go_default_library",
],
Expand Down
50 changes: 12 additions & 38 deletions pkg/instancegroups/instancegroups.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,6 @@ import (

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/json"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/klog"
api "k8s.io/kops/pkg/apis/kops"
"k8s.io/kops/pkg/cloudinstances"
Expand Down Expand Up @@ -289,45 +285,24 @@ func (c *RollingUpdateCluster) taintAllNeedUpdate(ctx context.Context, group *cl
}
klog.Infof("Tainting %d %s in %q instancegroup.", len(toTaint), noun, group.InstanceGroup.Name)
for _, n := range toTaint {
if err := c.patchTaint(ctx, n); err != nil {
if c.FailOnDrainError {
taint := corev1.Taint{
Key: rollingUpdateTaintKey,
Effect: corev1.TaintEffectPreferNoSchedule,
}
if err := c.K8sClient.TaintNode(ctx, n, taint); err != nil {
if apierrors.IsNotFound(err) {
klog.V(2).Infof("ignoring not-found error tainting node %q: %v", n, err)
} else if c.FailOnDrainError {
return fmt.Errorf("failed to taint node %q: %v", n, err)
} else {
klog.Infof("Ignoring error tainting node %q: %v", n, err)
}
klog.Infof("Ignoring error tainting node %q: %v", n, err)
}
}
}
return nil
}

func (c *RollingUpdateCluster) patchTaint(ctx context.Context, node *corev1.Node) error {
oldData, err := json.Marshal(node)
if err != nil {
return err
}

node.Spec.Taints = append(node.Spec.Taints, corev1.Taint{
Key: rollingUpdateTaintKey,
Effect: corev1.TaintEffectPreferNoSchedule,
})

newData, err := json.Marshal(node)
if err != nil {
return err
}

patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, node)
if err != nil {
return err
}

_, err = c.K8sClient.CoreV1().Nodes().Patch(ctx, node.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
if apierrors.IsNotFound(err) {
return nil
}
return err
}

func (c *RollingUpdateCluster) drainTerminateAndWait(ctx context.Context, u *cloudinstances.CloudInstanceGroupMember, isBastion bool, sleepAfterTerminate time.Duration) error {
instanceId := u.ID

Expand Down Expand Up @@ -545,7 +520,7 @@ func (c *RollingUpdateCluster) drainNode(u *cloudinstances.CloudInstanceGroupMem
}

helper := &drain.Helper{
Client: c.K8sClient,
Client: c.K8sClient.RawClient(),
Force: true,
GracePeriodSeconds: -1,
IgnoreAllDaemonSets: true,
Expand Down Expand Up @@ -583,8 +558,7 @@ func (c *RollingUpdateCluster) drainNode(u *cloudinstances.CloudInstanceGroupMem

// deleteNode deletes a node from the k8s API. It does not delete the underlying instance.
func (c *RollingUpdateCluster) deleteNode(ctx context.Context, node *corev1.Node) error {
var options metav1.DeleteOptions
err := c.K8sClient.CoreV1().Nodes().Delete(ctx, node.Name, options)
err := c.K8sClient.DeleteNode(ctx, node.Name)
if err != nil {
if apierrors.IsNotFound(err) {
return nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/instancegroups/rollingupdate.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ import (
"sync"
"time"

"k8s.io/client-go/kubernetes"
"k8s.io/klog"
api "k8s.io/kops/pkg/apis/kops"
"k8s.io/kops/pkg/cloudinstances"
"k8s.io/kops/pkg/k8sclient"
"k8s.io/kops/pkg/validation"
"k8s.io/kops/upup/pkg/fi"
)
Expand All @@ -46,7 +46,7 @@ type RollingUpdateCluster struct {
Force bool

// K8sClient is the kubernetes client, used for draining etc
K8sClient kubernetes.Interface
K8sClient k8sclient.Interface

// ClusterValidator is used for validating the cluster. Unused if CloudOnly
ClusterValidator validation.ClusterValidator
Expand Down
20 changes: 20 additions & 0 deletions pkg/k8sclient/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "go_default_library",
srcs = ["client.go"],
importpath = "k8s.io/kops/pkg/k8sclient",
visibility = ["//visibility:public"],
deps = [
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/json:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/plugin/pkg/client/auth:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)
176 changes: 176 additions & 0 deletions pkg/k8sclient/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package k8sclient

import (
"context"
"fmt"
"net/http"
"strings"
"time"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/json"
apimachinerynet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/client-go/kubernetes"
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/rest"
"k8s.io/klog"
)

const maxTimeout = 15 * time.Second

// Interface is a wrapper around kubernetes.Interface, that recovers
// better from network failures. This error handling is only
// available on the helper methods, but for compatability RawClient
// exposes the underlying kubernetes.Interface
type Interface interface {
// RawClient returns the current kubernetes.Interface; it should not be cached
// Using wrapper methods is preferrable because we can do richer retry logic and error handling.
RawClient() kubernetes.Interface

// DeleteNode wraps CoreV1.Nodes.Delete
DeleteNode(ctx context.Context, nodeName string) error

// ListNodes wraps CoreV1.Nodes.List
ListNodes(ctx context.Context) (*corev1.NodeList, error)

// TaintNode adds a taint to the specified Node
TaintNode(ctx context.Context, node *corev1.Node, taint corev1.Taint) error
}

var _ Interface = &client{}

type client struct {
inner kubernetes.Interface
}

// NewForConfig creates a client for the specified rest.Config config.
// It is a wrapper around kubernetes.NewForConfig, but ensures a short
// timeout.
func NewForConfig(config *rest.Config) (Interface, error) {
// Set a lower timeout, to work around
// https://github.com/kubernetes/client-go/issues/374 We
// trigger a timeout error and then to recover we need to
// reset any existing connections
if config.Timeout < maxTimeout {
config.Timeout = maxTimeout
}

c, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, fmt.Errorf("error creating kubernetes client: %v", err)
}

return &client{inner: c}, nil
}

func (c *client) RawClient() kubernetes.Interface {
return c.inner
}

func (c *client) ListNodes(ctx context.Context) (*corev1.NodeList, error) {
client := c.RawClient()
nodes, err := client.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
c.handleError(err)
return nodes, err
}

func (c *client) DeleteNode(ctx context.Context, nodeName string) error {
client := c.RawClient()
err := client.CoreV1().Nodes().Delete(ctx, nodeName, metav1.DeleteOptions{})
c.handleError(err)
return err
}

func (c *client) TaintNode(ctx context.Context, node *corev1.Node, taint corev1.Taint) error {
oldData, err := json.Marshal(node)
if err != nil {
return err
}

node.Spec.Taints = append(node.Spec.Taints, taint)

newData, err := json.Marshal(node)
if err != nil {
return err
}

patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, node)
if err != nil {
return err
}

client := c.RawClient()
_, err = client.CoreV1().Nodes().Patch(ctx, node.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
c.handleError(err)
return err
}

// handleError is called with all potential errors.
// When it observes a timeout error, it closes all open / idle connections.
func (c *client) handleError(err error) {
if err == nil {
return
}

isTimeout := false
s := err.Error()
// TODO: move this to the chained errors framework when that's available
if strings.Contains(s, "Client.Timeout exceeded while awaiting headers") {
isTimeout = true
}

if isTimeout {
restClientInterface := c.inner.CoreV1().RESTClient()
restClient, ok := restClientInterface.(*rest.RESTClient)
if !ok {
klog.Warningf("client timed out, but rest client was not of expected type, was %T", restClientInterface)
return
}

httpTransport := findHTTPTransport(restClient.Client.Transport)
if httpTransport == nil {
klog.Warningf("client timed out, but http transport was not of expected type, was %T", restClient.Client.Transport)
return
}
httpTransport.CloseIdleConnections()
klog.Infof("client timed out; reset connections")
}
}

// findHTTPTransport returns the http.Transport under a RoundTripper.
// If it cannot be determined, it returns nil.
func findHTTPTransport(transport http.RoundTripper) *http.Transport {
httpTransport, ok := transport.(*http.Transport)
if ok {
return httpTransport
}

wrapper, ok := transport.(apimachinerynet.RoundTripperWrapper)
if ok {
wrapped := wrapper.WrappedRoundTripper()
if wrapped != nil {
return findHTTPTransport(wrapped)
}
}

return nil
}
Loading

0 comments on commit 744b555

Please sign in to comment.