Skip to content

Commit

Permalink
Add support for watch.
Browse files Browse the repository at this point in the history
  • Loading branch information
brendandburns committed Mar 22, 2019
1 parent 1eac579 commit e0a743c
Show file tree
Hide file tree
Showing 3 changed files with 205 additions and 0 deletions.
52 changes: 52 additions & 0 deletions examples/watch/watch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Note: the example only works with the code within the same release/branch.
package main

import (
"context"
"fmt"

"github.com/kubernetes-client/go/kubernetes/client"
"github.com/kubernetes-client/go/kubernetes/config"
)

func main() {
c, err := config.LoadKubeConfig()
if err != nil {
panic(err.Error())
}
cl := client.NewAPIClient(c)

watch := client.WatchClient{
Cfg: c,
Client: cl,
Path: "/api/v1/namespaces",
MakerFn: func() interface{} { return &client.V1Namespace{} },
}
if resultChan, errChan, err := watch.Connect(context.Background()); err != nil {
panic(err)
} else {
for obj := range resultChan {
fmt.Printf("%s\n%#v\n", obj.Type, obj.Object)
}
for err := range errChan {
fmt.Printf("ERROR: %#v", err)
}
fmt.Printf("Closed!\n")
}
}
98 changes: 98 additions & 0 deletions kubernetes/client/watch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package client

import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"
"strings"
)

// Result is a watch result
type Result struct {
Type string
Object interface{}
}

// WatchClient is a client for Watching the Kubernetes API
type WatchClient struct {
Cfg *Configuration
Client *APIClient
Path string
MakerFn func() interface{}
}

func (w *WatchClient) Connect(ctx context.Context) (<-chan *Result, <-chan error, error) {
url := w.Cfg.Scheme + "://" + w.Cfg.Host + w.Path + "?watch=true"
req, err := w.Client.prepareRequest(ctx, url, "GET", nil, nil, nil, nil, "", []byte{})
if err != nil {
return nil, nil, err
}
res, err := w.Client.callAPI(req)
if err != nil {
return nil, nil, err
}
if res.StatusCode != 200 {
return nil, nil, fmt.Errorf("Error connecting watch (%d: %s)", res.StatusCode, res.Status)
}
resultChan := make(chan *Result)
errChan := make(chan error)
processWatch(res.Body, w.MakerFn, resultChan, errChan)
return resultChan, errChan, nil
}

func processWatch(stream io.Reader, makerFn func() interface{}, resultChan chan<- *Result, errChan chan<- error) {
scanner := bufio.NewScanner(stream)
go func() {
defer close(resultChan)
defer close(errChan)
for scanner.Scan() {
watchObj, err := decode(scanner.Text(), makerFn)
if err != nil {
errChan <- err
return
}
if watchObj != nil {
resultChan <- watchObj
}
}
if err := scanner.Err(); err != nil {
errChan <- err
}
}()
}

func decode(line string, makerFn func() interface{}) (*Result, error) {
if len(line) == 0 {
return nil, nil
}
decoder := json.NewDecoder(strings.NewReader(line))
result := &Result{}
for decoder.More() {
name, err := decoder.Token()
if err != nil {
return nil, err
}
if name == "type" {
token, err := decoder.Token()
if err != nil {
return nil, err
}
var ok bool
result.Type, ok = token.(string)
if !ok {
return nil, fmt.Errorf("Error casting %v to string", token)
}
}
if name == "object" {
obj := makerFn()
if err := decoder.Decode(&obj); err != nil {
return nil, err
}
result.Object = obj
return result, nil
}
}
return nil, nil
}
55 changes: 55 additions & 0 deletions kubernetes/client/watch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package client

import (
"strings"
"testing"
)

func makerFn() interface{} { return &V1Namespace{} }

func TestDecode(t *testing.T) {
line := `{"type":"ADDED","object":{"kind":"Namespace","apiVersion":"v1","metadata":{"name":"kube-system","selfLink":"/api/v1/namespaces/kube-system","uid":"164931a7-3d75-11e9-a0a0-2683b9459238","resourceVersion":"227","creationTimestamp":"2019-03-03T05:27:50Z","annotations":{"kubectl.kubernetes.io/last-applied-configuration":"{\"apiVersion\":\"v1\",\"kind\":\"Namespace\",\"metadata\":{\"annotations\":{},\"name\":\"kube-system\",\"namespace\":\"\"}}\n"}},"spec":{"finalizers":["kubernetes"]},"status":{"phase":"Active"}}}`
result, err := decode(line, makerFn)
if err != nil {
t.Errorf("Unexpeceted non-nil: %v", err)
}
ns, ok := result.Object.(*V1Namespace)
if !ok {
t.Errorf("Cast failed: %v", result.Object)
}
if ns.Kind != "Namespace" || ns.Metadata.Name != "kube-system" {
t.Errorf("Unexpected value %#v", *ns)
}
}

func TestMultiDecode(t *testing.T) {
lines := `
{"type":"ADDED","object":{"kind":"Namespace","apiVersion":"v1","metadata":{"name":"kube-system","selfLink":"/api/v1/namespaces/kube-system","uid":"164931a7-3d75-11e9-a0a0-2683b9459238","resourceVersion":"227","creationTimestamp":"2019-03-03T05:27:50Z","annotations":{"kubectl.kubernetes.io/last-applied-configuration":"{\"apiVersion\":\"v1\",\"kind\":\"Namespace\",\"metadata\":{\"annotations\":{},\"name\":\"kube-system\",\"namespace\":\"\"}}\n"}},"spec":{"finalizers":["kubernetes"]},"status":{"phase":"Active"}}}
{"type":"MODIFIED","object":{"kind":"Namespace","apiVersion":"v1","metadata":{"name":"kube-system-2","selfLink":"/api/v1/namespaces/kube-system","uid":"164931a7-3d75-11e9-a0a0-2683b9459238","resourceVersion":"227","creationTimestamp":"2019-03-03T05:27:50Z","annotations":{"kubectl.kubernetes.io/last-applied-configuration":"{\"apiVersion\":\"v1\",\"kind\":\"Namespace\",\"metadata\":{\"annotations\":{},\"name\":\"kube-system\",\"namespace\":\"\"}}\n"}},"spec":{"finalizers":["kubernetes"]},"status":{"phase":"Active"}}}
{"type":"DELETED","object":{"kind":"Namespace","apiVersion":"v1","metadata":{"name":"kube-system-3","selfLink":"/api/v1/namespaces/kube-system","uid":"164931a7-3d75-11e9-a0a0-2683b9459238","resourceVersion":"227","creationTimestamp":"2019-03-03T05:27:50Z","annotations":{"kubectl.kubernetes.io/last-applied-configuration":"{\"apiVersion\":\"v1\",\"kind\":\"Namespace\",\"metadata\":{\"annotations\":{},\"name\":\"kube-system\",\"namespace\":\"\"}}\n"}},"spec":{"finalizers":["kubernetes"]},"status":{"phase":"Active"}}}
`

results := make(chan *Result)
errs := make(chan error)
processWatch(strings.NewReader(lines), makerFn, results, errs)

out := []*Result{}
outErrs := []error{}

for r := range results {
out = append(out, r)
}

for e := range errs {
outErrs = append(outErrs, e)
}

if len(out) != 3 {
t.Errorf("unexpected results: %v", out)
}
for ix, val := range []string{"ADDED", "MODIFIED", "DELETED"} {
if out[ix].Type != val {
t.Errorf("unexpected value (%d): %v", ix, out[ix])
}
}
}

0 comments on commit e0a743c

Please sign in to comment.