
Add leader election. #48

Merged: 1 commit, May 14, 2018
2 changes: 1 addition & 1 deletion .travis.yml
@@ -9,7 +9,7 @@ script:
- go fmt $(go list ./... | grep -v vendor) | wc -l | grep 0
- go vet $(go list ./... | grep -v vendor)
- go test $(go list ./... | grep -v vendor)
- go build cmd/csi-attacher/main.go
- make
after_success:
- if [ "${TRAVIS_BRANCH}" == "master" ] && [ "${TRAVIS_PULL_REQUEST}" == "false" ]; then
make container;
7 changes: 4 additions & 3 deletions README.md
@@ -77,19 +77,20 @@ $ csi-attacher -dummy -kubeconfig ~/.kube/config -v 5
### Real attacher

#### Running on command line
With `hack/local-up-cluster.sh`:
For debugging, it's possible to run the attacher on the command line:

```sh
$ csi-attacher -kubeconfig ~/.kube/config -v 5 -csi-address /run/csi/socket
```

#### Running in a stateful set
#### Running in a deployment
It is necessary to create a new service account and give it enough privileges to run the attacher. We provide a single all-in-one YAML file that creates everything that's necessary; in production it should be split into multiple files.

```sh
$ kubectl create -f deploy/kubernetes/statefulset.yaml
$ kubectl create -f deploy/kubernetes/deployment.yaml
```

Note that the attacher does not scale with more replicas. Only one attacher is elected as leader and running; the others wait for the leader to die and re-elect a new active leader within ~15 seconds of the old leader's death.

## Vendoring

83 changes: 83 additions & 0 deletions cmd/csi-attacher/leader.go
@@ -0,0 +1,83 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"fmt"
"os"
"time"

"github.com/golang/glog"
"github.com/kubernetes-csi/external-attacher/pkg/connection"
"k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/tools/record"
)

const (
leaseDuration = 15 * time.Second
renewDeadline = 10 * time.Second
retryPeriod = 5 * time.Second
)

// waitForLeader waits until this particular external attacher becomes a leader.
func waitForLeader(clientset *kubernetes.Clientset, namespace string, identity string, lockName string) {
broadcaster := record.NewBroadcaster()
broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: clientset.CoreV1().Events(namespace)})
eventRecorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: fmt.Sprintf("%s %s", lockName, string(identity))})

rlConfig := resourcelock.ResourceLockConfig{
Identity: identity,
EventRecorder: eventRecorder,
}
lock, err := resourcelock.New(resourcelock.ConfigMapsResourceLock, namespace, connection.SanitizeDriverName(lockName), clientset.CoreV1(), rlConfig)
if err != nil {
glog.Error(err)
os.Exit(1)
}

elected := make(chan struct{})

leaderConfig := leaderelection.LeaderElectionConfig{
Lock: lock,
LeaseDuration: leaseDuration,
RenewDeadline: renewDeadline,
RetryPeriod: retryPeriod,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(stop <-chan struct{}) {
glog.V(2).Info("Became leader, starting")
close(elected)
},
OnStoppedLeading: func() {
glog.Error("Stopped leading")
os.Exit(1)
Member:

This leader election code is new to me. Why quit and kill the program? That will restart the container, which may look like many errors to users. Is there another way to just go back into waiting?

Contributor Author:

There is, but the fact that the leader lost its leadership means that something really bad happened to the leader (or the API server): it was not able to update an API object for 30 seconds.

Kubernetes controller-manager dies too:

https://github.com/jsafrane/kubernetes/blob/master/cmd/kube-controller-manager/app/controllermanager.go#L211

Member:
Cool 👍

},
OnNewLeader: func(identity string) {
glog.V(3).Infof("Current leader: %s", identity)
},
},
}

go leaderelection.RunOrDie(leaderConfig)

// wait for being elected
<-elected
}
20 changes: 20 additions & 0 deletions cmd/csi-attacher/main.go
@@ -53,6 +53,10 @@ var (
csiAddress = flag.String("csi-address", "/run/csi/socket", "Address of the CSI driver socket.")
dummy = flag.Bool("dummy", false, "Run in dummy mode, i.e. not connecting to CSI driver and marking everything as attached. Expected CSI driver name is \"csi/dummy\".")
showVersion = flag.Bool("version", false, "Show version.")

enableLeaderElection = flag.Bool("leader-election", false, "Enable leader election.")
leaderElectionNamespace = flag.String("leader-election-namespace", "", "Namespace where this attacher runs.")
leaderElectionIdentity = flag.String("leader-election-identity", "", "Unique identity of this attacher. Typically the name of the pod where the attacher runs.")
)

var (
@@ -81,6 +85,7 @@ func main() {
glog.Error(err.Error())
os.Exit(1)
}

factory := informers.NewSharedInformerFactory(clientset, *resync)

var handler controller.Handler
@@ -142,6 +147,21 @@ func main() {
}
}

if *enableLeaderElection {
// Leader election was requested.
if leaderElectionNamespace == nil || *leaderElectionNamespace == "" {
glog.Error("-leader-election-namespace must not be empty")
os.Exit(1)
}
if leaderElectionIdentity == nil || *leaderElectionIdentity == "" {
glog.Error("-leader-election-identity must not be empty")
os.Exit(1)
}
// Name of config map with leader election lock
lockName := "external-attacher-leader-" + attacher
waitForLeader(clientset, *leaderElectionNamespace, *leaderElectionIdentity, lockName)
}

ctrl := controller.NewCSIAttachController(
clientset,
attacher,
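For illustration, a minimal sketch of a local invocation that exercises the new flags (the socket path and kubeconfig match the README examples above; the namespace and identity values are placeholders — in a pod they would typically come from the Downward API, as in the deployment below):

```sh
# Sketch: run the attacher locally with leader election enabled.
# "default" and $(hostname) are example values for the namespace and identity.
$ csi-attacher -v 5 \
    -kubeconfig ~/.kube/config \
    -csi-address /run/csi/socket \
    -leader-election \
    -leader-election-namespace default \
    -leader-election-identity $(hostname)
```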
@@ -13,6 +13,7 @@ metadata:
name: csi-attacher

---
# Attacher must be able to work with PVs, nodes and VolumeAttachments
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
@@ -36,12 +37,44 @@ metadata:
subjects:
- kind: ServiceAccount
name: csi-attacher
# replace with non-default namespace name
namespace: default
roleRef:
kind: ClusterRole
name: external-attacher-runner
apiGroup: rbac.authorization.k8s.io

---
# Attacher must be able to work with config maps in the current namespace
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
# replace with non-default namespace name
namespace: default
name: external-attacher-cfg
rules:
- apiGroups: [""]
resources: ["configmaps"]
verbs: ["get", "watch", "list", "delete", "update", "create"]

---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: csi-attacher-role-cfg
# replace with non-default namespace name
namespace: default
subjects:
- kind: ServiceAccount
name: csi-attacher
# replace with non-default namespace name
namespace: default
roleRef:
kind: Role
name: external-attacher-cfg
apiGroup: rbac.authorization.k8s.io


---
kind: Service
apiVersion: v1
@@ -57,26 +90,39 @@ spec:
port: 12345

---
kind: StatefulSet
apiVersion: apps/v1beta1
kind: Deployment
apiVersion: apps/v1
metadata:
name: csi-attacher
spec:
serviceName: "csi-attacher"
replicas: 1
replicas: 3
selector:
matchLabels:
external-attacher: mock-driver
template:
metadata:
labels:
app: csi-attacher
external-attacher: mock-driver
spec:
serviceAccount: csi-attacher
containers:
- name: csi-attacher
image: docker.io/k8scsi/csi-attacher
image: quay.io/k8scsi/csi-attacher
args:
- "--v=5"
- "--csi-address=$(ADDRESS)"
- "--leader-election"
- "--leader-election-namespace=$(MY_NAMESPACE)"
- "--leader-election-identity=$(MY_NAME)"
env:
- name: MY_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: MY_NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: ADDRESS
value: /var/lib/csi/sockets/pluginproxy/mock.socket
imagePullPolicy: "IfNotPresent"
@@ -85,7 +131,8 @@ spec:
mountPath: /var/lib/csi/sockets/pluginproxy/

- name: mock-driver
image: docker.io/k8scsi/mock-plugin
image: quay.io/k8scsi/mock-plugin
imagePullPolicy: "IfNotPresent"
env:
- name: CSI_ENDPOINT
value: /var/lib/csi/sockets/pluginproxy/mock.socket
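A rough way to observe the election with the deployment above (pod names below are placeholders; the lock ConfigMap name is "external-attacher-leader-" plus the sanitized CSI driver name, so the exact name depends on the driver):

```sh
# With 3 replicas, only one pod should log "Became leader, starting";
# the others keep waiting for the leader to die.
$ kubectl get pods -l external-attacher=mock-driver
$ kubectl logs <csi-attacher-pod> -c csi-attacher | grep -i leader

# The election lock is stored as a ConfigMap in the attacher's namespace.
$ kubectl get configmaps -n default

# Deleting the current leader should trigger re-election of a new leader
# within roughly 15 seconds.
$ kubectl delete pod <csi-attacher-pod>
```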