Skip to content
This repository has been archived by the owner on Mar 28, 2020. It is now read-only.

Commit

Permalink
operator: Add option to act as cluster wide (#1777)
Browse files Browse the repository at this point in the history
* Add option for operator to act as cluster wide

for a cluster to be managed widely, user have to add annotation:
  etcd.database.coreos.com/scope: clusterwide
And admin have to run an operator with option "-cluster-wide".

Current implementation have a lack of locking if many operators with
cluster-wide run on different namespaces.
  • Loading branch information
guilhem authored and hongchaodeng committed Feb 13, 2018
1 parent 82b3065 commit 0a4c2c1
Show file tree
Hide file tree
Showing 9 changed files with 129 additions and 19 deletions.
5 changes: 2 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -280,9 +280,8 @@ Follow the [etcd backup operator walkthrough](./doc/user/walkthrough/backup-oper

Follow the [etcd restore operator walkthrough](./doc/user/walkthrough/restore-operator.md) to restore an etcd cluster on Kubernetes from backup.

### Limitations

- The etcd operator only manages the etcd cluster created in the same namespace. Users need to create multiple operators in different namespaces to manage etcd clusters in different namespaces.
### Manage etcd clusters in all namespaces

See [instructions on clusterwide feature](doc/user/clusterwide.md).

[k8s-home]: http://kubernetes.io
4 changes: 4 additions & 0 deletions cmd/operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ var (
printVersion bool

createCRD bool

clusterWide bool
)

func init() {
Expand All @@ -66,6 +68,7 @@ func init() {
flag.BoolVar(&printVersion, "version", false, "Show version and quit")
flag.BoolVar(&createCRD, "create-crd", true, "The operator will not create the EtcdCluster CRD when this flag is set to false.")
flag.DurationVar(&gcInterval, "gc-interval", 10*time.Minute, "GC interval")
flag.BoolVar(&clusterWide, "cluster-wide", false, "Enable operator to watch clusters in all namespaces")
flag.Parse()
}

Expand Down Expand Up @@ -151,6 +154,7 @@ func newControllerConfig() controller.Config {

cfg := controller.Config{
Namespace: namespace,
ClusterWide: clusterWide,
ServiceAccount: serviceAccount,
KubeCli: kubecli,
KubeExtCli: k8sutil.MustNewKubeExtClient(),
Expand Down
12 changes: 12 additions & 0 deletions doc/user/clusterwide.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Manage clusters in all namespaces

Default etcd operator behavior is to only manage etcd clusters created in the same namespace.
It is possible to deploy an etcd operator with special option to manage clusterwide etcd clusters.

## Install etcd operator

etcd operator have to run with `-cluster-wide` arg option.

More information in [install guide](doc/user/install_guide.md).

See the example in [example/example-etcd-cluster.yaml](../../example/example-etcd-cluster.yaml)
2 changes: 2 additions & 0 deletions example/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ spec:
image: quay.io/coreos/etcd-operator:v0.9.0
command:
- etcd-operator
# Uncomment to act for resources in all namespaces. More information in doc/clusterwide.md
#- -cluster-wide
env:
- name: MY_POD_NAMESPACE
valueFrom:
Expand Down
4 changes: 4 additions & 0 deletions example/example-etcd-cluster.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ apiVersion: "etcd.database.coreos.com/v1beta2"
kind: "EtcdCluster"
metadata:
name: "example-etcd-cluster"
## Adding this annotation make this cluster managed by clusterwide operators
## namespaced operators ignore it
# annotations:
# etcd.database.coreos.com/scope: clusterwide
spec:
size: 3
version: "3.2.13"
22 changes: 14 additions & 8 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type Controller struct {

type Config struct {
Namespace string
ClusterWide bool
ServiceAccount string
KubeCli kubernetes.Interface
KubeExtCli apiextensionsclient.Interface
Expand All @@ -61,28 +62,33 @@ func New(cfg Config) *Controller {
}
}

func (c *Controller) handleClusterEvent(event *Event) error {
// handleClusterEvent returns true if cluster is ignored (not managed) by this instance.
func (c *Controller) handleClusterEvent(event *Event) (bool, error) {
clus := event.Object

if !c.managed(clus) {
return true, nil
}

if clus.Status.IsFailed() {
clustersFailed.Inc()
if event.Type == kwatch.Deleted {
delete(c.clusters, clus.Name)
return nil
return false, nil
}
return fmt.Errorf("ignore failed cluster (%s). Please delete its CR", clus.Name)
return false, fmt.Errorf("ignore failed cluster (%s). Please delete its CR", clus.Name)
}

clus.SetDefaults()

if err := clus.Spec.Validate(); err != nil {
return fmt.Errorf("invalid cluster spec. please fix the following problem with the cluster spec: %v", err)
return false, fmt.Errorf("invalid cluster spec. please fix the following problem with the cluster spec: %v", err)
}

switch event.Type {
case kwatch.Added:
if _, ok := c.clusters[clus.Name]; ok {
return fmt.Errorf("unsafe state. cluster (%s) was created before but we received event (%s)", clus.Name, event.Type)
return false, fmt.Errorf("unsafe state. cluster (%s) was created before but we received event (%s)", clus.Name, event.Type)
}

nc := cluster.New(c.makeClusterConfig(), clus)
Expand All @@ -94,21 +100,21 @@ func (c *Controller) handleClusterEvent(event *Event) error {

case kwatch.Modified:
if _, ok := c.clusters[clus.Name]; !ok {
return fmt.Errorf("unsafe state. cluster (%s) was never created but we received event (%s)", clus.Name, event.Type)
return false, fmt.Errorf("unsafe state. cluster (%s) was never created but we received event (%s)", clus.Name, event.Type)
}
c.clusters[clus.Name].Update(clus)
clustersModified.Inc()

case kwatch.Deleted:
if _, ok := c.clusters[clus.Name]; !ok {
return fmt.Errorf("unsafe state. cluster (%s) was never created but we received event (%s)", clus.Name, event.Type)
return false, fmt.Errorf("unsafe state. cluster (%s) was never created but we received event (%s)", clus.Name, event.Type)
}
c.clusters[clus.Name].Delete()
delete(c.clusters, clus.Name)
clustersDeleted.Inc()
clustersTotal.Dec()
}
return nil
return false, nil
}

func (c *Controller) makeClusterConfig() cluster.Config {
Expand Down
66 changes: 61 additions & 5 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@ import (
"strings"
"testing"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"

api "github.com/coreos/etcd-operator/pkg/apis/etcd/v1beta2"
"github.com/coreos/etcd-operator/pkg/cluster"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
)

func TestHandleClusterEventUpdateFailedCluster(t *testing.T) {
Expand All @@ -40,7 +39,7 @@ func TestHandleClusterEventUpdateFailedCluster(t *testing.T) {
Type: watch.Modified,
Object: clus,
}
err := c.handleClusterEvent(e)
_, err := c.handleClusterEvent(e)
prefix := "ignore failed cluster"
if !strings.HasPrefix(err.Error(), prefix) {
t.Errorf("expect err='%s...', get=%v", prefix, err)
Expand All @@ -65,11 +64,68 @@ func TestHandleClusterEventDeleteFailedCluster(t *testing.T) {

c.clusters[name] = &cluster.Cluster{}

if err := c.handleClusterEvent(e); err != nil {
if _, err := c.handleClusterEvent(e); err != nil {
t.Fatal(err)
}

if c.clusters[name] != nil {
t.Errorf("failed cluster not cleaned up after delete event, cluster struct: %v", c.clusters[name])
}
}

func TestHandleClusterEventClusterwide(t *testing.T) {
c := New(Config{ClusterWide: true})

clus := &api.EtcdCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Annotations: map[string]string{
"etcd.database.coreos.com/scope": "clusterwide",
},
},
}
e := &Event{
Type: watch.Modified,
Object: clus,
}
if ignored, _ := c.handleClusterEvent(e); ignored {
t.Errorf("cluster shouldn't be ignored")
}
}

func TestHandleClusterEventClusterwideIgnored(t *testing.T) {
c := New(Config{ClusterWide: true})

clus := &api.EtcdCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
}
e := &Event{
Type: watch.Modified,
Object: clus,
}
if ignored, _ := c.handleClusterEvent(e); !ignored {
t.Errorf("cluster should be ignored")
}
}

func TestHandleClusterEventNamespacedIgnored(t *testing.T) {
c := New(Config{})

clus := &api.EtcdCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Annotations: map[string]string{
"etcd.database.coreos.com/scope": "clusterwide",
},
},
}
e := &Event{
Type: watch.Modified,
Object: clus,
}
if ignored, _ := c.handleClusterEvent(e); !ignored {
t.Errorf("cluster should be ignored")
}
}
28 changes: 25 additions & 3 deletions pkg/controller/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import (
"time"

api "github.com/coreos/etcd-operator/pkg/apis/etcd/v1beta2"
"github.com/coreos/etcd-operator/pkg/util/k8sutil"
"github.com/coreos/etcd-operator/pkg/util/probe"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
kwatch "k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -52,10 +54,17 @@ func (c *Controller) Start() error {
}

func (c *Controller) run() {
var ns string
if c.Config.ClusterWide {
ns = metav1.NamespaceAll
} else {
ns = c.Config.Namespace
}

source := cache.NewListWatchFromClient(
c.Config.EtcdCRCli.EtcdV1beta2().RESTClient(),
api.EtcdClusterResourcePlural,
c.Config.Namespace,
ns,
fields.Everything())

_, informer := cache.NewIndexerInformer(source, &api.EtcdCluster{}, 0, cache.ResourceEventHandlerFuncs{
Expand Down Expand Up @@ -105,7 +114,7 @@ func (c *Controller) onDeleteEtcdClus(obj interface{}) {
}

pt.start()
err := c.handleClusterEvent(ev)
_, err := c.handleClusterEvent(ev)
if err != nil {
c.logger.Warningf("fail to handle event: %v", err)
}
Expand All @@ -125,9 +134,22 @@ func (c *Controller) syncEtcdClus(clus *api.EtcdCluster) {
}

pt.start()
err := c.handleClusterEvent(ev)
_, err := c.handleClusterEvent(ev)
if err != nil {
c.logger.Warningf("fail to handle event: %v", err)
}
pt.stop()
}

func (c *Controller) managed(clus *api.EtcdCluster) bool {
if v, ok := clus.Annotations[k8sutil.AnnotationScope]; ok {
if c.Config.ClusterWide {
return v == k8sutil.AnnotationClusterWide
}
} else {
if !c.Config.ClusterWide {
return true
}
}
return false
}
5 changes: 5 additions & 0 deletions pkg/util/k8sutil/k8sutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ const (
maxNameLength = 63 - randomSuffixLength - 1

defaultKubeAPIRequestTimeout = 30 * time.Second

// AnnotationScope annotation name for defining instance scope. Used for specifing cluster wide clusters.
AnnotationScope = "etcd.database.coreos.com/scope"
//AnnotationClusterWide annotation value for cluster wide clusters.
AnnotationClusterWide = "clusterwide"
)

const TolerateUnreadyEndpointsAnnotation = "service.alpha.kubernetes.io/tolerate-unready-endpoints"
Expand Down

0 comments on commit 0a4c2c1

Please sign in to comment.