Skip to content
This repository has been archived by the owner on Jan 13, 2021. It is now read-only.

Migrate to Custom Resources #113

Merged
merged 2 commits into from
Jul 30, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@
branch = "master"
name = "github.com/stretchr/testify"

[[constraint]]
name = "k8s.io/apiextensions-apiserver"
revision = "be41f5093e2b05c7a0befe35b04b715eb325ab43" # just before refactor to move to k8s.io/api/

# Transitive dependencies used by Service Catalog. Not resolved properly, have to override :(

[[override]]
Expand Down
48 changes: 28 additions & 20 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,41 +17,49 @@ It may or may not fulfil the requirements of https://github.com/kubernetes/kuber
## The idea

What if we build a service that allows us to manage Kubernetes' built-in resources and other
[Third Party Resources](https://github.com/kubernetes/kubernetes/blob/master/docs/design/extending-api.md)
(TPR) in a generic way? Similar to how AWS CloudFormation (or Google Deployment Manager) allows us to manage any
[Custom Resources](https://kubernetes.io/docs/concepts/api-extension/custom-resources/) (CRs) in a generic way?
Similar to how AWS CloudFormation (or Google Deployment Manager) allows us to manage any
AWS/GCE and custom resource. Then we could expose all the resources we need
to integrate as Third Party Resources and manage them declaratively. This is an open architecture
with Kubernetes as its core. Other microservices can create/update/watch TPRs to co-ordinate their work/lifecycle.
with Kubernetes as its core. Other microservices can create/update/watch CRs to co-ordinate their work/lifecycle.

## Implementation

A group of resources is defined using a Bundle (just like a Stack for AWS CloudFormation).
The Bundle itself is also a Kubernetes TPR.
The Bundle itself is also a Kubernetes CR.
Smith watches for new instances of a Bundle (and events to existing ones), picks them up and processes them.

Processing involves parsing the bundle, building a dependency graph (which is implicitly defined in the bundle),
walking the graph, and creating/updating necessary resources. Each created/referenced resource gets
an label pointing at the origin Bundle.

### Example bundle
TPR definitions:
CR definitions:
```yaml
apiVersion: extensions/v1beta1
kind: ThirdPartyResource
description: "Resource bundle definition"
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
name: bundle.smith.atlassian.com
versions:
- name: v1
name: bundles.smith.atlassian.com
spec:
group: smith.atlassian.com
version: v1
names:
kind: Bundle
plural: bundles
singular: bundle
```
```yaml
apiVersion: extensions/v1beta1
kind: ThirdPartyResource
description: "Postgresql resource definition"
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
name: postgresql-resource.smith-sql.atlassian.com # must use another group due to https://github.com/kubernetes/kubernetes/issues/23831
versions:
- name: v1
name: postgresql-resources.smith.atlassian.com
spec:
group: smith.atlassian.com
version: v1
names:
kind: PostgresqlResource
plural: postgresqlresources
singular: postgresqlresource
```
Bundle:
```yaml
Expand All @@ -64,7 +72,7 @@ spec:
resources:
- name: db1
spec:
apiVersion: smith-sql.atlassian.com/v1
apiVersion: smith.atlassian.com/v1
kind: PostgresqlResource
metadata:
name: db1
Expand Down Expand Up @@ -122,7 +130,7 @@ and following the same behaviour, semantics and code "style" as native Kubernete
- [Service Catalog](https://github.com/kubernetes-incubator/service-catalog) support: objects with kind `Instance` and `Binding`.
See [an example](https://github.com/atlassian/smith/tree/master/examples/service_catalog) and
[recording of the presentation](https://youtu.be/7fgPgtQh5Es) to [Service Catalog SIG](https://github.com/kubernetes/community/tree/master/sig-service-catalog);
- Dynamic TPR support via [special annotations](https://github.com/atlassian/smith/blob/master/docs/design/managing-resources.md#defined-annotations);
- Dynamic Custom Resources support via [special annotations](https://github.com/atlassian/smith/blob/master/docs/design/managing-resources.md#defined-annotations);
- References between objects in the graph to pull parts of objects/fields from dependencies;
- Smith will delete objects which were removed from a Bundle when Bundle reconciliation is performed (e.g. on a Bundle update).

Expand All @@ -137,7 +145,7 @@ Mirantis App Controller (discussed here https://github.com/kubernetes/kubernetes

* Graph of dependencies is defined explicitly.
* It uses polling and blocks while waiting for the resource to become READY.
* The goal of Smith is to manage instances of TPRs. App Controller cannot manage them as of this writing.
* The goal of Smith is to manage instances of CRs. App Controller cannot manage them as of this writing.

### On [Helm](https://helm.sh/)
Helm is a package manager for Kubernetes. Smith operates on a lower level, even though it can be used by a human,
Expand Down
53 changes: 30 additions & 23 deletions cmd/smith/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ import (
"github.com/ash2k/stager"
sc_v1a1 "github.com/kubernetes-incubator/service-catalog/pkg/apis/servicecatalog/v1alpha1"
scClientset "github.com/kubernetes-incubator/service-catalog/pkg/client/clientset_generated/clientset"
crdClientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
crdInformers "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
api_v1 "k8s.io/client-go/pkg/api/v1"
apps_v1b1 "k8s.io/client-go/pkg/apis/apps/v1beta1"
Expand All @@ -36,7 +37,7 @@ import (
)

var (
tprCreationBackoff = wait.Backoff{
crdCreationBackoff = wait.Backoff{
Duration: 1 * time.Second,
Factor: 1.2,
Jitter: 0.1,
Expand Down Expand Up @@ -70,6 +71,10 @@ func (a *App) Run(ctx context.Context) error {
return err
}
}
crdClient, err := crdClientset.NewForConfig(a.RestConfig)
if err != nil {
return err
}
sc := smart.NewClient(a.RestConfig, a.ServiceCatalogConfig, clientset, scClient)
scheme, err := FullScheme(a.ServiceCatalogConfig != nil)
if err != nil {
Expand All @@ -89,25 +94,30 @@ func (a *App) Run(ctx context.Context) error {
bundleInf := client.BundleInformer(bundleClient, a.Namespace, a.ResyncPeriod)
multiStore.AddInformer(smith.BundleGVK, bundleInf)

resourceInfs, tprInf := a.resourceInformers(clientset, scClient)
multiStore.AddInformer(tprGVK, tprInf)
informerFactory := crdInformers.NewSharedInformerFactory(crdClient, a.ResyncPeriod)
crdInf := informerFactory.Apiextensions().V1beta1().CustomResourceDefinitions().Informer()
crdStore, err := store.NewCrd(crdInf, scheme.DeepCopy)
if err != nil {
return err
}
multiStore.AddInformer(crdGVK, crdInf)
stage = stgr.NextStage()
stage.StartWithChannel(tprInf.Run) // Must be after store.AddInformer()
stage.StartWithChannel(crdInf.Run) // Must be after store.AddInformer()

// We must wait for tprInf to populate its cache to avoid reading from an empty cache
// in Ready Checker and in EnsureTprExists().
if !cache.WaitForCacheSync(ctx.Done(), tprInf.HasSynced) {
return errors.New("wait for TPR Informer was cancelled")
// We must wait for crdInf to populate its cache to avoid reading from an empty cache
// in Ready Checker and in EnsureCrdExists().
if !cache.WaitForCacheSync(ctx.Done(), crdInf.HasSynced) {
return errors.New("wait for CRD Informer was cancelled")
}

// Ensure ThirdPartyResource Bundle exists
err = wait.ExponentialBackoff(tprCreationBackoff, func() (bool /*done*/, error) {
if err := resources.EnsureTprExists(ctx, clientset, multiStore, resources.BundleTpr()); err != nil {
// Ensure CRD Bundle exists
err = wait.ExponentialBackoff(crdCreationBackoff, func() (bool /*done*/, error) {
if err := resources.EnsureCrdExists(ctx, scheme, crdClient, multiStore, resources.BundleCrd()); err != nil {
// TODO be smarter about what is retried
if err == context.Canceled || err == context.DeadlineExceeded {
return true, err
}
log.Printf("Failed to create TPR %s: %v", smith.BundleResourceName, err)
log.Printf("Failed to create CRD %s: %v", smith.BundleResourceName, err)
return false, nil
}
return true, nil
Expand All @@ -121,7 +131,8 @@ func (a *App) Run(ctx context.Context) error {
if err != nil {
return err
}
cntrlr := a.controller(bundleInf, tprInf, bundleClient, bs, sc, scheme, multiStore, resourceInfs)
resourceInfs := a.resourceInformers(clientset, scClient)
cntrlr := a.controller(bundleInf, crdInf, bundleClient, bs, crdStore, sc, scheme, multiStore, resourceInfs)

// Add all informers to Multi store and start them
for gvk, inf := range resourceInfs {
Expand All @@ -134,15 +145,15 @@ func (a *App) Run(ctx context.Context) error {
return ctx.Err()
}

func (a *App) controller(bundleInf, tprInf cache.SharedIndexInformer, bundleClient rest.Interface, bundleStore controller.BundleStore,
func (a *App) controller(bundleInf, crdInf cache.SharedIndexInformer, bundleClient rest.Interface, bundleStore controller.BundleStore, crdStore readychecker.CrdStore,
sc smith.SmartClient, scheme *runtime.Scheme, cStore controller.Store, resourceInfs map[schema.GroupVersionKind]cache.SharedIndexInformer) *controller.BundleController {

// Ready Checker
readyTypes := []map[schema.GroupKind]readychecker.IsObjectReady{ready_types.MainKnownTypes}
if a.ServiceCatalogConfig != nil {
readyTypes = append(readyTypes, ready_types.ServiceCatalogKnownTypes)
}
rc := readychecker.New(&store.Tpr{Store: cStore}, readyTypes...)
rc := readychecker.New(crdStore, readyTypes...)

// Object cleanup
cleanupTypes := []map[schema.GroupKind]cleanup.SpecCleanup{clean_types.MainKnownTypes}
Expand All @@ -158,12 +169,10 @@ func (a *App) controller(bundleInf, tprInf cache.SharedIndexInformer, bundleClie
}
queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "bundle")

return controller.New(bundleInf, tprInf, bundleClient, bundleStore, sc, rc, scheme, cStore, specCheck, queue, a.Workers, a.ResyncPeriod, resourceInfs)
return controller.New(bundleInf, crdInf, bundleClient, bundleStore, sc, rc, scheme, cStore, specCheck, queue, a.Workers, a.ResyncPeriod, resourceInfs)
}

func (a *App) resourceInformers(mainClient kubernetes.Interface, scClient scClientset.Interface) (map[schema.GroupVersionKind]cache.SharedIndexInformer, cache.SharedIndexInformer) {
mainSif := informers.NewSharedInformerFactory(mainClient, a.ResyncPeriod)

func (a *App) resourceInformers(mainClient kubernetes.Interface, scClient scClientset.Interface) map[schema.GroupVersionKind]cache.SharedIndexInformer {
// Core API types
infs := map[schema.GroupVersionKind]cache.SharedIndexInformer{
ext_v1b1.SchemeGroupVersion.WithKind("Ingress"): a.ingressInformer(mainClient),
Expand All @@ -182,7 +191,5 @@ func (a *App) resourceInformers(mainClient kubernetes.Interface, scClient scClie
infs[sc_v1a1.SchemeGroupVersion.WithKind("Instance")] = a.instanceInformer(scClient)
}

tprInf := mainSif.Extensions().V1beta1().ThirdPartyResources().Informer()

return infs, tprInf
return infs
}
4 changes: 3 additions & 1 deletion cmd/smith/app/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/atlassian/smith"

sc_v1a1 "github.com/kubernetes-incubator/service-catalog/pkg/apis/servicecatalog/v1alpha1"
apiext_v1b1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
api_v1 "k8s.io/client-go/pkg/api/v1"
Expand All @@ -13,7 +14,7 @@ import (
)

var (
tprGVK = ext_v1b1.SchemeGroupVersion.WithKind("ThirdPartyResource")
crdGVK = apiext_v1b1.SchemeGroupVersion.WithKind("CustomResourceDefinition")
)

func FullScheme(serviceCatalog bool) (*runtime.Scheme, error) {
Expand All @@ -24,6 +25,7 @@ func FullScheme(serviceCatalog bool) (*runtime.Scheme, error) {
sb.Register(api_v1.SchemeBuilder...)
sb.Register(apps_v1b1.SchemeBuilder...)
sb.Register(settings_v1a1.SchemeBuilder...)
sb.Register(apiext_v1b1.SchemeBuilder...)
if serviceCatalog {
sb.Register(sc_v1a1.SchemeBuilder...)
} else {
Expand Down
Loading