Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a simple informer implementation. #28

Merged
merged 1 commit into from
Jun 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions examples/informer/informer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package main

import (
"context"
"fmt"
"time"

"github.com/kubernetes-client/go/kubernetes/client"
"github.com/kubernetes-client/go/kubernetes/config"
)

type handler struct{}

func (h handler) OnAdd(obj interface{}) {
ns := obj.(*client.V1Namespace)
fmt.Printf("Added %s\n", ns.Metadata.Name)
}

func (h handler) OnUpdate(oldObj, newObj interface{}) {
ns := newObj.(*client.V1Namespace)
fmt.Printf("Updated %s\n", ns.Metadata.Name)
}

func (h handler) OnDelete(obj interface{}) {
ns := obj.(*client.V1Namespace)
fmt.Printf("Deleted %s\n", ns.Metadata.Name)
}

func main() {
c, err := config.LoadKubeConfig()
if err != nil {
panic(err.Error())
}

// create the clientset
clientset := client.NewAPIClient(c)

lister := func() ([]interface{}, string, error) {
namespaces, _, err := clientset.CoreV1Api.ListNamespace(context.Background(), nil)
if err != nil {
return nil, "", err
}
result := make([]interface{}, len(namespaces.Items))
for ix := range namespaces.Items {
result[ix] = &namespaces.Items[ix]
}
return result, namespaces.Metadata.ResourceVersion, nil
}

watcher := func(resourceVersion string) (<-chan *client.Result, <-chan error) {
watch := client.WatchClient{
Cfg: c,
Client: clientset,
Path: "/api/v1/namespaces",
MakerFn: func() interface{} {
return &client.V1Namespace{}
},
}
results, errors, err := watch.Connect(context.Background(), resourceVersion)
if err != nil {
fmt.Printf("err: %s\n", err.Error())
}
return results, errors
}

extractor := func(obj interface{}) *client.V1ObjectMeta {
return obj.(*client.V1Namespace).Metadata
}

cache := client.Cache{
Extractor: extractor,
Lister: lister,
Watcher: watcher,
}
cache.AddEventHandler(handler{})
go cache.Run(make(chan bool))

for {
fmt.Printf("----------\n")
list := cache.List()
for ix := range list {
ns := list[ix].(*client.V1Namespace)
fmt.Printf("%s %#v\n", ns.Metadata.Name, ns.Metadata.Labels)
}
fmt.Printf("----------\n")
time.Sleep(5 * time.Second)
}
}
187 changes: 187 additions & 0 deletions kubernetes/client/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
package client

import (
"log"
"time"
)

type objEntry struct {
metadata *V1ObjectMeta
obj interface{}
}

// ObjectLister is a function that knows how to list objects.
type ObjectLister func() ([]interface{}, string, error)

// ObjectWatcher is a function that knows how to perform a watch.
type ObjectWatcher func(resourceVersion string) (results <-chan *Result, errors <-chan error)

// EventHandler is implemented by objects that want event notifications
type EventHandler interface {
OnAdd(obj interface{})
OnUpdate(oldObj, newObj interface{})
OnDelete(obj interface{})
}

// Informer is an interface for things that can provide notifications
type Informer interface {
AddEventHandler(handler EventHandler)
}

// Lister is an interface for things that can list objects for all namespaces or by namespace
type Lister interface {
List() []interface{}
ByNamespace(namespace string) []interface{}
}

// Validate that we implement the interfaces
var _ Lister = &Cache{}
var _ Informer = &Cache{}

// Cache is an implementation of a List/Watch cache
type Cache struct {
Extractor func(interface{}) *V1ObjectMeta
Lister ObjectLister
Watcher ObjectWatcher
allObjects []objEntry
namespaceObjects map[string][]objEntry
eventHandlers []EventHandler
}

func (c *Cache) AddEventHandler(handler EventHandler) {
c.eventHandlers = append(c.eventHandlers, handler)
}

const maxSleep = 60 * time.Second

func (c *Cache) Run(stop <-chan bool) {
sleep := 1 * time.Second
for {
select {
case <-stop:
return
default:
// pass
}
if err := c.ListWatch(); err != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exponential backoff? otherwise there could be hot looping.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added.

log.Printf("%s\n", err.Error())
time.Sleep(sleep)
sleep = sleep * 2
if sleep > maxSleep {
sleep = maxSleep
}
} else {
sleep = 1
}
}
}

func (c *Cache) ListWatch() error {
objects, resourceVersion, err := c.Lister()
if err != nil {
return err
}
for ix := range objects {
meta := c.Extractor(objects[ix])
c.AddOrUpdate(meta, objects[ix])
}
results, errors := c.Watcher(resourceVersion)
for {
select {
case result, ok := <-results:
if !ok {
return nil
}
c.ProcessResult(result)
case err := <-errors:
return err
}
}
}

func (c *Cache) ProcessResult(res *Result) {
metadata := c.Extractor(res.Object)

switch res.Type {
case Added, Modified:
c.AddOrUpdate(metadata, res.Object)
case Deleted:
c.Delete(metadata, res.Object)
}
}

func (c *Cache) AddOrUpdate(metadata *V1ObjectMeta, obj interface{}) {
var oldObj interface{}
c.allObjects, oldObj = InsertOrUpdate(c.allObjects, metadata, obj)
if len(metadata.Namespace) > 0 {
c.namespaceObjects[metadata.Namespace], _ =
InsertOrUpdate(c.namespaceObjects[metadata.Namespace], metadata, obj)
}
for ix := range c.eventHandlers {
if oldObj == nil {
c.eventHandlers[ix].OnAdd(obj)
} else {
c.eventHandlers[ix].OnUpdate(oldObj, obj)
}
}
}

func (c *Cache) Delete(metadata *V1ObjectMeta, obj interface{}) {
var deleted bool
c.allObjects, deleted = Delete(c.allObjects, metadata)
if len(metadata.Namespace) > 0 {
c.namespaceObjects[metadata.Namespace], _ =
Delete(c.namespaceObjects[metadata.Namespace], metadata)
}
if deleted {
for ix := range c.eventHandlers {
c.eventHandlers[ix].OnDelete(obj)
}
}
}

func (c *Cache) List() []interface{} {
result := make([]interface{}, len(c.allObjects))
for ix := range c.allObjects {
result[ix] = c.allObjects[ix].obj
}
return result
}

func (c *Cache) ByNamespace(namespace string) []interface{} {
list := c.namespaceObjects[namespace]
result := make([]interface{}, len(list))
for ix := range list {
result[ix] = list[ix].obj
}
return result
}

func InsertOrUpdate(list []objEntry, metadata *V1ObjectMeta, obj interface{}) ([]objEntry, interface{}) {
ix := FindObject(list, metadata)
if ix == -1 {
return append(list, objEntry{metadata: metadata, obj: obj}), nil
}
oldObj := list[ix]
list[ix] = objEntry{metadata: metadata, obj: obj}
return list, oldObj
}

func Delete(list []objEntry, metadata *V1ObjectMeta) ([]objEntry, bool) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you mean to expose the function?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

going to add tests, so yes.

ix := FindObject(list, metadata)
if ix == -1 {
return list, false
}
return append(list[:ix], list[ix+1:]...), true
}

func FindObject(list []objEntry, metadata *V1ObjectMeta) int {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you mean to expose the function?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as above.

for ix := range list {
entry := &list[ix]
if entry.metadata.Namespace == metadata.Namespace &&
entry.metadata.Name == metadata.Name {
return ix
}
}
return -1
}
4 changes: 4 additions & 0 deletions kubernetes/client/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ type Result struct {
Object interface{}
}

const Added = "ADDED"
const Modified = "MODIFIED"
const Deleted = "DELETED"

// WatchClient is a client for Watching the Kubernetes API
type WatchClient struct {
Cfg *Configuration
Expand Down
2 changes: 1 addition & 1 deletion kubernetes/config/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (l *KubeConfigLoader) refreshAzureToken() error {
AccessToken: l.user.AuthProvider.Config["access-token"],
RefreshToken: l.user.AuthProvider.Config["refresh-token"],
ExpiresIn: json.Number(l.user.AuthProvider.Config["expires-in"]),
ExpiresOn: json.Number(l.user.AuthProvider.Config["expires-in"]),
ExpiresOn: json.Number(l.user.AuthProvider.Config["expires-on"]),
}
sptToken, err := adal.NewServicePrincipalTokenFromManualToken(*config, clientID, resource, token)
if err := sptToken.Refresh(); err != nil {
Expand Down