diff --git a/Gopkg.lock b/Gopkg.lock index 724e64ff2410..d979caf4cb4e 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -104,6 +104,17 @@ revision = "0ca9ea5df5451ffdf184b4428c902747c2c11cd7" version = "v1.0.0" +[[projects]] + branch = "master" + digest = "1:eaa7c96baf38f6abde2f720aac540a49dfc2229b74c3c591c3f84d2ff7e84269" + name = "github.com/go-log/log" + packages = [ + ".", + "info", + ] + pruneopts = "UT" + revision = "a514cf01a3eb56ec6aff9ae5663453d990e68ed3" + [[projects]] branch = "master" digest = "1:edd2fa4578eb086265db78a9201d15e76b298dfd0d5c379da83e9c61712cf6df" @@ -364,6 +375,14 @@ revision = "65fb64232476ad9046e57c26cd0bff3d3a8dc6cd" version = "v1.4.3" +[[projects]] + branch = "master" + digest = "1:f7646c654e93258958dba300641f8f674d5a9ed015c11119793ba1156e2acbe9" + name = "github.com/openshift/kubernetes-drain" + packages = ["."] + pruneopts = "UT" + revision = "c2e51be1758efa30d71a4d30dc4e2db86b70a4df" + [[projects]] digest = "1:e5d0bd87abc2781d14e274807a470acd180f0499f8bf5bb18606e9ec22ad9de9" name = "github.com/pborman/uuid" @@ -1201,8 +1220,10 @@ input-imports = [ "github.com/davecgh/go-spew/spew", "github.com/emicklei/go-restful", + "github.com/go-log/log/info", "github.com/onsi/ginkgo", "github.com/onsi/gomega", + "github.com/openshift/kubernetes-drain", "github.com/pkg/errors", "github.com/sergi/go-diff/diffmatchpatch", "github.com/spf13/cobra", diff --git a/pkg/controller/machine/BUILD.bazel b/pkg/controller/machine/BUILD.bazel index 97904693b7f9..ee70c72aed08 100644 --- a/pkg/controller/machine/BUILD.bazel +++ b/pkg/controller/machine/BUILD.bazel @@ -13,10 +13,15 @@ go_library( "//pkg/apis/machine/v1beta1:go_default_library", "//pkg/controller/error:go_default_library", "//pkg/util:go_default_library", + "//vendor/github.com/go-log/log/info:go_default_library", + "//vendor/github.com/openshift/kubernetes-drain:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//vendor/k8s.io/client-go/kubernetes:go_default_library", + "//vendor/k8s.io/client-go/rest:go_default_library", + "//vendor/k8s.io/client-go/tools/record:go_default_library", "//vendor/k8s.io/klog:go_default_library", "//vendor/sigs.k8s.io/controller-runtime/pkg/client:go_default_library", "//vendor/sigs.k8s.io/controller-runtime/pkg/controller:go_default_library", diff --git a/pkg/controller/machine/controller.go b/pkg/controller/machine/controller.go index 28df79fee619..c47a6f74345a 100644 --- a/pkg/controller/machine/controller.go +++ b/pkg/controller/machine/controller.go @@ -18,15 +18,22 @@ package machine import ( "context" + "fmt" "os" + "time" + "github.com/go-log/log/info" machinev1 "github.com/openshift/cluster-api/pkg/apis/machine/v1beta1" controllerError "github.com/openshift/cluster-api/pkg/controller/error" "github.com/openshift/cluster-api/pkg/util" + kubedrain "github.com/openshift/kubernetes-drain" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/record" "k8s.io/klog" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" @@ -38,6 +45,9 @@ import ( const ( NodeNameEnvVar = "NODE_NAME" + + // ExcludeNodeDrainingAnnotation annotation explicitly skips node draining 
if set + ExcludeNodeDrainingAnnotation = "machine.openshift.io/exclude-node-draining" ) var DefaultActuator Actuator @@ -49,10 +59,12 @@ func AddWithActuator(mgr manager.Manager, actuator Actuator) error { // newReconciler returns a new reconcile.Reconciler func newReconciler(mgr manager.Manager, actuator Actuator) reconcile.Reconciler { r := &ReconcileMachine{ - Client: mgr.GetClient(), - scheme: mgr.GetScheme(), - nodeName: os.Getenv(NodeNameEnvVar), - actuator: actuator, + Client: mgr.GetClient(), + eventRecorder: mgr.GetRecorder("machine-controller"), + config: mgr.GetConfig(), + scheme: mgr.GetScheme(), + nodeName: os.Getenv(NodeNameEnvVar), + actuator: actuator, } if r.nodeName == "" { @@ -80,8 +92,11 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error { // ReconcileMachine reconciles a Machine object type ReconcileMachine struct { client.Client + config *rest.Config scheme *runtime.Scheme + eventRecorder record.EventRecorder + actuator Actuator // nodeName is the name of the node on which the machine controller is running, if not present, it is loaded from NODE_NAME. @@ -168,6 +183,18 @@ func (r *ReconcileMachine) Reconcile(request reconcile.Request) (reconcile.Resul } klog.Infof("Reconciling machine %q triggers delete", name) + + // Drain node before deletion + // If a machine is not linked to a node, just delete the machine. Since a node + // can be unlinked from a machine when the node goes NotReady and is removed + // by cloud controller manager. In that case some machines would never get + // deleted without a manual intervention. + if _, exists := m.ObjectMeta.Annotations[ExcludeNodeDrainingAnnotation]; !exists && m.Status.NodeRef != nil { + if err := r.drainNode(m); err != nil { + return reconcile.Result{}, err + } + } + if err := r.actuator.Delete(ctx, cluster, m); err != nil { if requeueErr, ok := err.(*controllerError.RequeueAfterError); ok { klog.Infof("Actuator returned requeue-after error: %v", requeueErr) @@ -233,6 +260,41 @@ func (r *ReconcileMachine) Reconcile(request reconcile.Request) (reconcile.Resul return reconcile.Result{}, nil } +func (r *ReconcileMachine) drainNode(machine *machinev1.Machine) error { + kubeClient, err := kubernetes.NewForConfig(r.config) + if err != nil { + return fmt.Errorf("unable to build kube client: %v", err) + } + node, err := kubeClient.CoreV1().Nodes().Get(machine.Status.NodeRef.Name, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("unable to get node %q: %v", machine.Status.NodeRef.Name, err) + } + + if err := kubedrain.Drain( + kubeClient, + []*corev1.Node{node}, + &kubedrain.DrainOptions{ + Force: true, + IgnoreDaemonsets: true, + DeleteLocalData: true, + GracePeriodSeconds: -1, + Logger: info.New(klog.V(0)), + // If a pod is not evicted in 20 second, retry the eviction next time the + // machine gets reconciled again (to allow other machines to be reconciled) + Timeout: 20 * time.Second, + }, + ); err != nil { + // Machine still tries to terminate after drain failure + klog.Warningf("drain failed for machine %q: %v", machine.Name, err) + return &controllerError.RequeueAfterError{RequeueAfter: 20 * time.Second} + } + + klog.Infof("drain successful for machine %q", machine.Name) + r.eventRecorder.Eventf(machine, corev1.EventTypeNormal, "Deleted", "Node %q drained", node.Name) + + return nil +} + func (r *ReconcileMachine) getCluster(ctx context.Context, machine *machinev1.Machine) (*machinev1.Cluster, error) { if machine.Labels[machinev1.MachineClusterLabelName] == "" { klog.Infof("Machine %q in namespace %q 
doesn't specify %q label, assuming nil cluster", machine.Name, machinev1.MachineClusterLabelName, machine.Namespace) diff --git a/vendor/github.com/go-log/log/BUILD.bazel b/vendor/github.com/go-log/log/BUILD.bazel new file mode 100644 index 000000000000..97983463eea1 --- /dev/null +++ b/vendor/github.com/go-log/log/BUILD.bazel @@ -0,0 +1,9 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = ["log.go"], + importmap = "github.com/openshift/cluster-api/vendor/github.com/go-log/log", + importpath = "github.com/go-log/log", + visibility = ["//visibility:public"], +) diff --git a/vendor/github.com/go-log/log/LICENSE b/vendor/github.com/go-log/log/LICENSE new file mode 100644 index 000000000000..0522e076ec1d --- /dev/null +++ b/vendor/github.com/go-log/log/LICENSE @@ -0,0 +1,19 @@ +Copyright (c) 2017 Go Log + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/vendor/github.com/go-log/log/README.md b/vendor/github.com/go-log/log/README.md new file mode 100644 index 000000000000..d36cdedd6e06 --- /dev/null +++ b/vendor/github.com/go-log/log/README.md @@ -0,0 +1,92 @@ +# Log [![GoDoc](https://godoc.org/github.com/go-log/log?status.svg)](https://godoc.org/github.com/go-log/log) + +Log is a logging interface for Go. That's it. Pass around the interface. + +## Rationale + +Users want to standardise logging. Sometimes libraries log. We leave the underlying logging implementation to the user +while allowing libraries to log by simply expecting something that satisfies the `Logger` interface. This leaves +the user free to pre-configure structure, output, etc. + +## Interface + +The interface is minimalistic on purpose: + +```go +type Logger interface { + Log(v ...interface{}) + Logf(format string, v ...interface{}) +} +``` + +For more motivation for this minimal interface, see [Dave Cheney's blog post][cheney]. + +## Implementations + +Libraries will only need [the `Logger` interface](#interface), although they may choose to use [the `nest` package][nest] to create subloggers with additional context. + +Calling code will need to create a `Logger` interface, and there are a number of implementations and wrappers available to make that easy: + +* [capture][] is an implementation that saves logged lines in memory. + It is especially useful for unit tests that want to check for logged messages. +* [fmt][] is an implementation wrapping [an `io.Writer`][io.Writer] like [`os.Stdout`][os.Stdout]. 
+ It uses [`fmt.Sprint`][fmt.Sprint] and [`Sprintf`][fmt.Sprintf] to generate the logged lines. +* [info][] is an implementation wrapping `Info` and `Infof` calls. + It can be used to wrap implementations like [`glog.Verbose`][glog.Verbose] and [`logrus.Entry`][logrus.Entry.Info]. +* [print][] is an implementation wrapping `Print` and `Printf` calls. + It can be used to wrap implementations like [`glog.Verbose`][logrus.Entry.Print]. +* [log][] is an implementation wrapping [`log.Print`][log.Print] and [`log.Printf`][log.Printf]. + +Outside of this repository, there are additional wrappers for: + +* [appengine/log][appengine], [here][appengine-wrapper]. +* [logrus][], [here][logrus-wrapper]. + Although as mentioned above, you can also use the [info][] and [print][] wrappers for logrus. + +The `Logger` interface is also simple enough to make writing your own implementation or wrapper very straightforward. + +## Example + +Pre-configure a logger using [`WithFields`][logrus.WithFields] and pass it as an option to a library: + +```go +import ( + "github.com/go-log/log/print" + "github.com/lib/foo" + "github.com/sirupsen/logrus" +) + +logger := print.New(logrus.WithFields(logrus.Fields{ + "library": "github.com/lib/foo", +})) + +f := foo.New(logger) +``` + +## Related projects + +[github.com/go-logr/logr][logr] is a similar interface approach to logging, although [the `logr.Logger` interface][logr.Logger] is more elaborate. + +[appengine]: https://cloud.google.com/appengine/docs/standard/go/logs/ +[appengine-wrapper]: https://github.com/go-log/appengine +[capture]: https://godoc.org/github.com/go-log/log/capture +[cheney]: https://dave.cheney.net/2015/11/05/lets-talk-about-logging +[fmt]: https://godoc.org/github.com/go-log/log/fmt +[fmt.Sprint]: https://golang.org/pkg/fmt/#Sprint +[fmt.Sprintf]: https://golang.org/pkg/fmt/#Sprintf +[glog.Verbose]: https://godoc.org/github.com/golang/glog#Verbose.Info +[info]: https://godoc.org/github.com/go-log/log/info +[io.Writer]: https://golang.org/pkg/io/#Writer +[log]: https://godoc.org/github.com/go-log/log/log +[log.Print]: https://golang.org/pkg/log/#Print +[log.Printf]: https://golang.org/pkg/log/#Printf +[logr]: https://github.com/go-logr/logr +[logr.Logger]: https://godoc.org/github.com/go-logr/logr#Logger +[logrus]: https://github.com/sirupsen/logrus +[logrus-wrapper]: https://github.com/go-log/logrus +[logrus.Entry.Info]: https://godoc.org/github.com/sirupsen/logrus#Entry.Info +[logrus.Entry.Print]: https://godoc.org/github.com/sirupsen/logrus#Entry.Print +[logrus.WithFields]: https://godoc.org/github.com/sirupsen/logrus#WithFields +[nest]: https://godoc.org/github.com/go-log/log/nest +[os.Stdout]: https://golang.org/pkg/os/#Stdout +[print]: https://godoc.org/github.com/go-log/log/print diff --git a/vendor/github.com/go-log/log/info/BUILD.bazel b/vendor/github.com/go-log/log/info/BUILD.bazel new file mode 100644 index 000000000000..dfc26bc6c523 --- /dev/null +++ b/vendor/github.com/go-log/log/info/BUILD.bazel @@ -0,0 +1,9 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = ["info.go"], + importmap = "github.com/openshift/cluster-api/vendor/github.com/go-log/log/info", + importpath = "github.com/go-log/log/info", + visibility = ["//visibility:public"], +) diff --git a/vendor/github.com/go-log/log/info/info.go b/vendor/github.com/go-log/log/info/info.go new file mode 100644 index 000000000000..f65647e10bb8 --- /dev/null +++ b/vendor/github.com/go-log/log/info/info.go @@ -0,0 +1,28 @@ +// 
Package info allows users to create a Logger interface from any +// object that supports Info and Infof. +package info + +// Info is an interface for Info and Infof. +type Info interface { + Info(v ...interface{}) + Infof(format string, v ...interface{}) +} + +type logger struct{ + info Info +} + +func (logger *logger) Log(v ...interface{}) { + logger.info.Info(v...) +} + +func (logger *logger) Logf(format string, v ...interface{}) { + logger.info.Infof(format, v...) +} + +// New creates a new logger wrapping info. +func New(info Info) *logger { + return &logger{ + info: info, + } +} diff --git a/vendor/github.com/go-log/log/log.go b/vendor/github.com/go-log/log/log.go new file mode 100644 index 000000000000..f0662a662866 --- /dev/null +++ b/vendor/github.com/go-log/log/log.go @@ -0,0 +1,35 @@ +// Package log provides a log interface +package log + +// Logger is a generic logging interface +type Logger interface { + // Log inserts a log entry. Arguments may be handled in the manner + // of fmt.Print, but the underlying logger may also decide to handle + // them differently. + Log(v ...interface{}) + // Logf insets a log entry. Arguments are handled in the manner of + // fmt.Printf. + Logf(format string, v ...interface{}) +} + +var ( + // The global default logger + DefaultLogger Logger = &noOpLogger{} +) + +// noOpLogger is used as a placeholder for the default logger +type noOpLogger struct{} + +func (n *noOpLogger) Log(v ...interface{}) {} + +func (n *noOpLogger) Logf(format string, v ...interface{}) {} + +// Log logs using the default logger +func Log(v ...interface{}) { + DefaultLogger.Log(v...) +} + +// Logf logs formatted using the default logger +func Logf(format string, v ...interface{}) { + DefaultLogger.Logf(format, v...) +} diff --git a/vendor/github.com/openshift/kubernetes-drain/BUILD.bazel b/vendor/github.com/openshift/kubernetes-drain/BUILD.bazel new file mode 100644 index 000000000000..0bff964a2b3e --- /dev/null +++ b/vendor/github.com/openshift/kubernetes-drain/BUILD.bazel @@ -0,0 +1,26 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = ["drain.go"], + importmap = "github.com/openshift/cluster-api/vendor/github.com/openshift/kubernetes-drain", + importpath = "github.com/openshift/kubernetes-drain", + visibility = ["//visibility:public"], + deps = [ + "//vendor/github.com/go-log/log:go_default_library", + "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/api/policy/v1beta1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/fields:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//vendor/k8s.io/client-go/kubernetes:go_default_library", + "//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", + "//vendor/k8s.io/client-go/kubernetes/typed/extensions/v1beta1:go_default_library", + "//vendor/k8s.io/client-go/kubernetes/typed/policy/v1beta1:go_default_library", + ], +) diff --git a/vendor/github.com/openshift/kubernetes-drain/LICENSE b/vendor/github.com/openshift/kubernetes-drain/LICENSE new file mode 100644 index 000000000000..d64569567334 
--- /dev/null +++ b/vendor/github.com/openshift/kubernetes-drain/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. 
Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. 
Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. 
+ You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/vendor/github.com/openshift/kubernetes-drain/drain.go b/vendor/github.com/openshift/kubernetes-drain/drain.go new file mode 100644 index 000000000000..a6e21ee774f6 --- /dev/null +++ b/vendor/github.com/openshift/kubernetes-drain/drain.go @@ -0,0 +1,594 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package drain + +import ( + "errors" + "fmt" + "math" + "sort" + "strings" + "time" + + golog "github.com/go-log/log" + + corev1 "k8s.io/api/core/v1" + policyv1beta1 "k8s.io/api/policy/v1beta1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" + utilerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" + typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" + typedextensionsv1beta1 "k8s.io/client-go/kubernetes/typed/extensions/v1beta1" + typedpolicyv1beta1 "k8s.io/client-go/kubernetes/typed/policy/v1beta1" +) + +type DrainOptions struct { + // Continue even if there are pods not managed by a ReplicationController, ReplicaSet, Job, DaemonSet or StatefulSet. + Force bool + + // Ignore DaemonSet-managed pods. + IgnoreDaemonsets bool + + // Period of time in seconds given to each pod to terminate + // gracefully. If negative, the default value specified in the pod + // will be used. + GracePeriodSeconds int + + // The length of time to wait before giving up on deletion or + // eviction. Zero means infinite. + Timeout time.Duration + + // Continue even if there are pods using emptyDir (local data that + // will be deleted when the node is drained). + DeleteLocalData bool + + // Namespace to filter pods on the node. + Namespace string + + // Label selector to filter pods on the node. + Selector labels.Selector + + // Logger allows callers to plug in their preferred logger. + Logger golog.Logger +} + +// Takes a pod and returns a bool indicating whether or not to operate on the +// pod, an optional warning message, and an optional fatal error. 
+type podFilter func(corev1.Pod) (include bool, w *warning, f *fatal) +type warning struct { + string +} +type fatal struct { + string +} + +const ( + EvictionKind = "Eviction" + EvictionSubresource = "pods/eviction" + + kDaemonsetFatal = "DaemonSet-managed pods (use IgnoreDaemonsets to ignore)" + kDaemonsetWarning = "ignoring DaemonSet-managed pods" + kLocalStorageFatal = "pods with local storage (use DeleteLocalData to override)" + kLocalStorageWarning = "deleting pods with local storage" + kUnmanagedFatal = "pods not managed by ReplicationController, ReplicaSet, Job, DaemonSet or StatefulSet (use Force to override)" + kUnmanagedWarning = "deleting pods not managed by ReplicationController, ReplicaSet, Job, DaemonSet or StatefulSet" +) + +// GetNodes looks up the nodes (either given by name as arguments or +// by the Selector option). +func GetNodes(client typedcorev1.NodeInterface, nodes []string, selector string) (out []*corev1.Node, err error) { + if len(nodes) == 0 && len(selector) == 0 { + return nil, nil + } + + if len(selector) > 0 && len(nodes) > 0 { + return nil, errors.New("cannot specify both node names and a selector option") + } + + out = []*corev1.Node{} + + for _, node := range nodes { + node, err := client.Get(node, metav1.GetOptions{}) + if err != nil { + return nil, err + } + out = append(out, node) + } + + if len(selector) > 0 { + nodes, err := client.List(metav1.ListOptions{ + LabelSelector: selector, + }) + if err != nil { + return nil, err + } + for _, node := range nodes.Items { + out = append(out, &node) + } + } + + return out, nil +} + +// Drain nodes in preparation for maintenance. +// +// The given nodes will be marked unschedulable to prevent new pods from arriving. +// Drain evicts the pods if the APIServer supports eviction +// (http://kubernetes.io/docs/admin/disruptions/). Otherwise, it will use normal DELETE +// to delete the pods. +// Drain evicts or deletes all pods except mirror pods (which cannot be deleted through +// the API server). If there are DaemonSet-managed pods, Drain will not proceed +// without IgnoreDaemonsets, and regardless it will not delete any +// DaemonSet-managed pods, because those pods would be immediately replaced by the +// DaemonSet controller, which ignores unschedulable markings. If there are any +// pods that are neither mirror pods nor managed by ReplicationController, +// ReplicaSet, DaemonSet, StatefulSet or Job, then Drain will not delete any pods unless you +// use Force. Force will also allow deletion to proceed if the managing resource of one +// or more pods is missing. +// +// Drain waits for graceful termination. You should not operate on the machine until +// the command completes. +// +// When you are ready to put the nodes back into service, use Uncordon, which +// will make the nodes schedulable again. 
+// +// ![Workflow](http://kubernetes.io/images/docs/kubectl_drain.svg) +func Drain(client kubernetes.Interface, nodes []*corev1.Node, options *DrainOptions) (err error) { + nodeInterface := client.CoreV1().Nodes() + for _, node := range nodes { + if err := Cordon(nodeInterface, node, options.Logger); err != nil { + return err + } + } + + drainedNodes := sets.NewString() + var fatal error + + for _, node := range nodes { + err := DeleteOrEvictPods(client, node, options) + if err == nil { + drainedNodes.Insert(node.Name) + logf(options.Logger, "drained node %q", node.Name) + } else { + log(options.Logger, err) + logf(options.Logger, "unable to drain node %q", node.Name) + remainingNodes := []string{} + fatal = err + for _, remainingNode := range nodes { + if drainedNodes.Has(remainingNode.Name) { + continue + } + remainingNodes = append(remainingNodes, remainingNode.Name) + } + + if len(remainingNodes) > 0 { + sort.Strings(remainingNodes) + logf(options.Logger, "there are pending nodes to be drained: %s", strings.Join(remainingNodes, ",")) + } + } + } + + return fatal +} + +// DeleteOrEvictPods deletes or (where supported) evicts pods from the +// target node and waits until the deletion/eviction completes, +// Timeout elapses, or an error occurs. +func DeleteOrEvictPods(client kubernetes.Interface, node *corev1.Node, options *DrainOptions) error { + pods, err := getPodsForDeletion(client, node, options) + if err != nil { + return err + } + + err = deleteOrEvictPods(client, pods, options) + if err != nil { + pendingPods, newErr := getPodsForDeletion(client, node, options) + if newErr != nil { + return newErr + } + pendingNames := make([]string, len(pendingPods)) + for i, pendingPod := range pendingPods { + pendingNames[i] = pendingPod.Name + } + sort.Strings(pendingNames) + logf(options.Logger, "failed to evict pods from node %q (pending pods: %s): %v", node.Name, strings.Join(pendingNames, ","), err) + } + return err +} + +func getPodController(pod corev1.Pod) *metav1.OwnerReference { + return metav1.GetControllerOf(&pod) +} + +func (o *DrainOptions) unreplicatedFilter(pod corev1.Pod) (bool, *warning, *fatal) { + // any finished pod can be removed + if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed { + return true, nil, nil + } + + controllerRef := getPodController(pod) + if controllerRef != nil { + return true, nil, nil + } + if o.Force { + return true, &warning{kUnmanagedWarning}, nil + } + + return false, nil, &fatal{kUnmanagedFatal} +} + +type DaemonSetFilterOptions struct { + client typedextensionsv1beta1.ExtensionsV1beta1Interface + force bool + ignoreDaemonSets bool +} + +func (o *DaemonSetFilterOptions) daemonSetFilter(pod corev1.Pod) (bool, *warning, *fatal) { + // Note that we return false in cases where the pod is DaemonSet managed, + // regardless of flags. We never delete them, the only question is whether + // their presence constitutes an error. + // + // The exception is for pods that are orphaned (the referencing + // management resource - including DaemonSet - is not found). + // Such pods will be deleted if Force is used. 
+ controllerRef := getPodController(pod) + if controllerRef == nil || controllerRef.Kind != "DaemonSet" { + return true, nil, nil + } + + if _, err := o.client.DaemonSets(pod.Namespace).Get(controllerRef.Name, metav1.GetOptions{}); err != nil { + // remove orphaned pods with a warning if Force is used + if apierrors.IsNotFound(err) && o.force { + return true, &warning{err.Error()}, nil + } + return false, nil, &fatal{err.Error()} + } + + if !o.ignoreDaemonSets { + return false, nil, &fatal{kDaemonsetFatal} + } + + return false, &warning{kDaemonsetWarning}, nil +} + +func mirrorPodFilter(pod corev1.Pod) (bool, *warning, *fatal) { + if _, found := pod.ObjectMeta.Annotations[corev1.MirrorPodAnnotationKey]; found { + return false, nil, nil + } + return true, nil, nil +} + +func hasLocalStorage(pod corev1.Pod) bool { + for _, volume := range pod.Spec.Volumes { + if volume.EmptyDir != nil { + return true + } + } + + return false +} + +func (o *DrainOptions) localStorageFilter(pod corev1.Pod) (bool, *warning, *fatal) { + if !hasLocalStorage(pod) { + return true, nil, nil + } + if !o.DeleteLocalData { + return false, nil, &fatal{kLocalStorageFatal} + } + return true, &warning{kLocalStorageWarning}, nil +} + +// Map of status message to a list of pod names having that status. +type podStatuses map[string][]string + +func (ps podStatuses) message() string { + msgs := []string{} + + for key, pods := range ps { + msgs = append(msgs, fmt.Sprintf("%s: %s", key, strings.Join(pods, ", "))) + } + return strings.Join(msgs, "; ") +} + +// getPodsForDeletion receives resource info for a node, and returns all the pods from the given node that we +// are planning on deleting. If there are any pods preventing us from deleting, we return that list in an error. +func getPodsForDeletion(client kubernetes.Interface, node *corev1.Node, options *DrainOptions) (pods []corev1.Pod, err error) { + listOptions := metav1.ListOptions{ + FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": node.Name}).String(), + } + if options.Selector != nil { + listOptions.LabelSelector = options.Selector.String() + } + podList, err := client.CoreV1().Pods(options.Namespace).List(listOptions) + if err != nil { + return pods, err + } + + ws := podStatuses{} + fs := podStatuses{} + + daemonSetOptions := &DaemonSetFilterOptions{ + client: client.ExtensionsV1beta1(), + force: options.Force, + ignoreDaemonSets: options.IgnoreDaemonsets, + } + + for _, pod := range podList.Items { + podOk := true + for _, filt := range []podFilter{daemonSetOptions.daemonSetFilter, mirrorPodFilter, options.localStorageFilter, options.unreplicatedFilter} { + filterOk, w, f := filt(pod) + + podOk = podOk && filterOk + if w != nil { + ws[w.string] = append(ws[w.string], pod.Name) + } + if f != nil { + fs[f.string] = append(fs[f.string], pod.Name) + } + + // short-circuit as soon as pod not ok + // at that point, there is no reason to run pod + // through any additional filters + if !podOk { + break + } + } + if podOk { + pods = append(pods, pod) + } + } + + if len(fs) > 0 { + return []corev1.Pod{}, errors.New(fs.message()) + } + if len(ws) > 0 { + log(options.Logger, ws.message()) + } + return pods, nil +} + +func evictPod(client typedpolicyv1beta1.PolicyV1beta1Interface, pod corev1.Pod, policyGroupVersion string, gracePeriodSeconds int) error { + deleteOptions := &metav1.DeleteOptions{} + if gracePeriodSeconds >= 0 { + gracePeriod := int64(gracePeriodSeconds) + deleteOptions.GracePeriodSeconds = &gracePeriod + } + eviction := &policyv1beta1.Eviction{ + 
TypeMeta: metav1.TypeMeta{ + APIVersion: policyGroupVersion, + Kind: EvictionKind, + }, + ObjectMeta: metav1.ObjectMeta{ + Name: pod.Name, + Namespace: pod.Namespace, + }, + DeleteOptions: deleteOptions, + } + return client.Evictions(eviction.Namespace).Evict(eviction) +} + +// deleteOrEvictPods deletes or evicts the pods on the api server +func deleteOrEvictPods(client kubernetes.Interface, pods []corev1.Pod, options *DrainOptions) error { + if len(pods) == 0 { + return nil + } + + policyGroupVersion, err := SupportEviction(client) + if err != nil { + return err + } + + getPodFn := func(namespace, name string) (*corev1.Pod, error) { + return client.CoreV1().Pods(options.Namespace).Get(name, metav1.GetOptions{}) + } + + if len(policyGroupVersion) > 0 { + // Remember to change change the URL manipulation func when Evction's version change + return evictPods(client.PolicyV1beta1(), pods, policyGroupVersion, options, getPodFn) + } else { + return deletePods(client.CoreV1(), pods, options, getPodFn) + } +} + +func evictPods(client typedpolicyv1beta1.PolicyV1beta1Interface, pods []corev1.Pod, policyGroupVersion string, options *DrainOptions, getPodFn func(namespace, name string) (*corev1.Pod, error)) error { + returnCh := make(chan error, 1) + + for _, pod := range pods { + go func(pod corev1.Pod, returnCh chan error) { + var err error + for { + err = evictPod(client, pod, policyGroupVersion, options.GracePeriodSeconds) + if err == nil { + break + } else if apierrors.IsNotFound(err) { + returnCh <- nil + return + } else if apierrors.IsTooManyRequests(err) { + logf(options.Logger, "error when evicting pod %q (will retry after 5s): %v", pod.Name, err) + time.Sleep(5 * time.Second) + } else { + returnCh <- fmt.Errorf("error when evicting pod %q: %v", pod.Name, err) + return + } + } + podArray := []corev1.Pod{pod} + _, err = waitForDelete(podArray, 1*time.Second, time.Duration(math.MaxInt64), true, options.Logger, getPodFn) + if err == nil { + returnCh <- nil + } else { + returnCh <- fmt.Errorf("error when waiting for pod %q terminating: %v", pod.Name, err) + } + }(pod, returnCh) + } + + doneCount := 0 + var errors []error + + // 0 timeout means infinite, we use MaxInt64 to represent it. + var globalTimeout time.Duration + if options.Timeout == 0 { + globalTimeout = time.Duration(math.MaxInt64) + } else { + globalTimeout = options.Timeout + } + globalTimeoutCh := time.After(globalTimeout) + numPods := len(pods) + for doneCount < numPods { + select { + case err := <-returnCh: + doneCount++ + if err != nil { + errors = append(errors, err) + } + case <-globalTimeoutCh: + return fmt.Errorf("Drain did not complete within %v", globalTimeout) + } + } + return utilerrors.NewAggregate(errors) +} + +func deletePods(client typedcorev1.CoreV1Interface, pods []corev1.Pod, options *DrainOptions, getPodFn func(namespace, name string) (*corev1.Pod, error)) error { + // 0 timeout means infinite, we use MaxInt64 to represent it. 
+ var globalTimeout time.Duration + if options.Timeout == 0 { + globalTimeout = time.Duration(math.MaxInt64) + } else { + globalTimeout = options.Timeout + } + deleteOptions := &metav1.DeleteOptions{} + if options.GracePeriodSeconds >= 0 { + gracePeriodSeconds := int64(options.GracePeriodSeconds) + deleteOptions.GracePeriodSeconds = &gracePeriodSeconds + } + for _, pod := range pods { + err := client.Pods(pod.Namespace).Delete(pod.Name, deleteOptions) + if err != nil && !apierrors.IsNotFound(err) { + return err + } + } + _, err := waitForDelete(pods, 1*time.Second, globalTimeout, false, options.Logger, getPodFn) + return err +} + +func waitForDelete(pods []corev1.Pod, interval, timeout time.Duration, usingEviction bool, logger golog.Logger, getPodFn func(string, string) (*corev1.Pod, error)) ([]corev1.Pod, error) { + var verbStr string + if usingEviction { + verbStr = "evicted" + } else { + verbStr = "deleted" + } + + err := wait.PollImmediate(interval, timeout, func() (bool, error) { + pendingPods := []corev1.Pod{} + for i, pod := range pods { + p, err := getPodFn(pod.Namespace, pod.Name) + if apierrors.IsNotFound(err) || (p != nil && p.ObjectMeta.UID != pod.ObjectMeta.UID) { + logf(logger, "pod %q removed (%s)", pod.Name, verbStr) + continue + } else if err != nil { + return false, err + } else { + pendingPods = append(pendingPods, pods[i]) + } + } + pods = pendingPods + if len(pendingPods) > 0 { + return false, nil + } + return true, nil + }) + return pods, err +} + +// SupportEviction uses Discovery API to find out if the server +// supports the eviction subresource. If supported, it will return +// its groupVersion; otherwise it will return an empty string. +func SupportEviction(clientset kubernetes.Interface) (string, error) { + discoveryClient := clientset.Discovery() + groupList, err := discoveryClient.ServerGroups() + if err != nil { + return "", err + } + foundPolicyGroup := false + var policyGroupVersion string + for _, group := range groupList.Groups { + if group.Name == "policy" { + foundPolicyGroup = true + policyGroupVersion = group.PreferredVersion.GroupVersion + break + } + } + if !foundPolicyGroup { + return "", nil + } + resourceList, err := discoveryClient.ServerResourcesForGroupVersion("v1") + if err != nil { + return "", err + } + for _, resource := range resourceList.APIResources { + if resource.Name == EvictionSubresource && resource.Kind == EvictionKind { + return policyGroupVersion, nil + } + } + return "", nil +} + +// Cordon marks a node "Unschedulable". This method is idempotent. +func Cordon(client typedcorev1.NodeInterface, node *corev1.Node, logger golog.Logger) error { + return cordonOrUncordon(client, node, logger, true) +} + +// Uncordon marks a node "Schedulable". This method is idempotent. +func Uncordon(client typedcorev1.NodeInterface, node *corev1.Node, logger golog.Logger) error { + return cordonOrUncordon(client, node, logger, false) +} + +func cordonOrUncordon(client typedcorev1.NodeInterface, node *corev1.Node, logger golog.Logger, desired bool) error { + unsched := node.Spec.Unschedulable + if unsched == desired { + return nil + } + + patch := []byte(fmt.Sprintf("{\"spec\":{\"unschedulable\":%t}}", desired)) + _, err := client.Patch(node.Name, types.StrategicMergePatchType, patch) + if err == nil { + verbStr := "cordoned" + if !desired { + verbStr = "un" + verbStr + } + logf(logger, "%s node %q", verbStr, node.Name) + } + return err +} + +func log(logger golog.Logger, v ...interface{}) { + if logger != nil { + logger.Log(v...) 
+ } +} + +func logf(logger golog.Logger, format string, v ...interface{}) { + if logger != nil { + logger.Logf(format, v...) + } +}
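
Note on the logging glue in `drainNode` above: `kubedrain.DrainOptions.Logger` expects the minimal go-log `Logger` interface (`Log`/`Logf`), while the controller logs through klog. The vendored `info` package bridges the two, since `klog.V(0)` already provides `Info`/`Infof`. A minimal sketch of that adapter chain (the standalone package and function name are illustrative, not part of this change):

```go
package example

import (
	golog "github.com/go-log/log"
	"github.com/go-log/log/info"
	"k8s.io/klog"
)

// drainLogger mirrors the wiring used in drainNode: klog.V(0) exposes
// Info/Infof, which satisfies the info.Info interface, and info.New wraps
// that into the Log/Logf methods required by the go-log Logger interface
// consumed by kubedrain.DrainOptions.Logger.
func drainLogger() golog.Logger {
	return info.New(klog.V(0))
}
```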
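
For operators, draining can be skipped per machine by setting the `machine.openshift.io/exclude-node-draining` annotation introduced above; the controller only checks for the key's presence before deleting the machine. A hedged sketch using the controller-runtime client already vendored by this repository (package, function name, and error handling are illustrative assumptions, not part of this diff):

```go
package example

import (
	"context"

	machinev1 "github.com/openshift/cluster-api/pkg/apis/machine/v1beta1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// excludeFromDraining annotates a machine so the controller deletes it
// without draining its node first (see ExcludeNodeDrainingAnnotation).
func excludeFromDraining(ctx context.Context, c client.Client, namespace, name string) error {
	machine := &machinev1.Machine{}
	if err := c.Get(ctx, client.ObjectKey{Namespace: namespace, Name: name}, machine); err != nil {
		return err
	}
	if machine.Annotations == nil {
		machine.Annotations = map[string]string{}
	}
	// Only the key's presence matters; the value is ignored.
	machine.Annotations["machine.openshift.io/exclude-node-draining"] = ""
	return c.Update(ctx, machine)
}
```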