diff --git a/examples/watch/watch.go b/examples/watch/watch.go new file mode 100644 index 0000000..d197b71 --- /dev/null +++ b/examples/watch/watch.go @@ -0,0 +1,52 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Note: the example only works with the code within the same release/branch. +package main + +import ( + "context" + "fmt" + + "github.com/kubernetes-client/go/kubernetes/client" + "github.com/kubernetes-client/go/kubernetes/config" +) + +func main() { + c, err := config.LoadKubeConfig() + if err != nil { + panic(err.Error()) + } + cl := client.NewAPIClient(c) + + watch := client.WatchClient{ + Cfg: c, + Client: cl, + Path: "/api/v1/namespaces", + MakerFn: func() interface{} { return &client.V1Namespace{} }, + } + if resultChan, errChan, err := watch.Connect(context.Background()); err != nil { + panic(err) + } else { + for obj := range resultChan { + fmt.Printf("%s\n%#v\n", obj.Type, obj.Object) + } + for err := range errChan { + fmt.Printf("ERROR: %#v", err) + } + fmt.Printf("Closed!\n") + } +} diff --git a/kubernetes/client/watch.go b/kubernetes/client/watch.go new file mode 100644 index 0000000..111d0b5 --- /dev/null +++ b/kubernetes/client/watch.go @@ -0,0 +1,98 @@ +package client + +import ( + "bufio" + "context" + "encoding/json" + "fmt" + "io" + "strings" +) + +// Result is a watch result +type Result struct { + Type string + Object interface{} +} + +// WatchClient is a client for Watching the Kubernetes API +type WatchClient struct { + Cfg *Configuration + Client *APIClient + Path string + MakerFn func() interface{} +} + +func (w *WatchClient) Connect(ctx context.Context) (<-chan *Result, <-chan error, error) { + url := w.Cfg.Scheme + "://" + w.Cfg.Host + w.Path + "?watch=true" + req, err := w.Client.prepareRequest(ctx, url, "GET", nil, nil, nil, nil, "", []byte{}) + if err != nil { + return nil, nil, err + } + res, err := w.Client.callAPI(req) + if err != nil { + return nil, nil, err + } + if res.StatusCode != 200 { + return nil, nil, fmt.Errorf("Error connecting watch (%d: %s)", res.StatusCode, res.Status) + } + resultChan := make(chan *Result) + errChan := make(chan error) + processWatch(res.Body, w.MakerFn, resultChan, errChan) + return resultChan, errChan, nil +} + +func processWatch(stream io.Reader, makerFn func() interface{}, resultChan chan<- *Result, errChan chan<- error) { + scanner := bufio.NewScanner(stream) + go func() { + defer close(resultChan) + defer close(errChan) + for scanner.Scan() { + watchObj, err := decode(scanner.Text(), makerFn) + if err != nil { + errChan <- err + return + } + if watchObj != nil { + resultChan <- watchObj + } + } + if err := scanner.Err(); err != nil { + errChan <- err + } + }() +} + +func decode(line string, makerFn func() interface{}) (*Result, error) { + if len(line) == 0 { + return nil, nil + } + decoder := json.NewDecoder(strings.NewReader(line)) + result := &Result{} + for decoder.More() { + name, err := decoder.Token() + if err != nil { + return nil, err + } + if name == "type" { + token, err := decoder.Token() + if err != nil { + return nil, err + } + var ok bool + result.Type, ok = token.(string) + if !ok { + return nil, fmt.Errorf("Error casting %v to string", token) + } + } + if name == "object" { + obj := makerFn() + if err := decoder.Decode(&obj); err != nil { + return nil, err + } + result.Object = obj + return result, nil + } + } + return nil, nil +} diff --git a/kubernetes/client/watch_test.go b/kubernetes/client/watch_test.go new file mode 100644 index 0000000..3c34815 --- /dev/null +++ b/kubernetes/client/watch_test.go @@ -0,0 +1,55 @@ +package client + +import ( + "strings" + "testing" +) + +func makerFn() interface{} { return &V1Namespace{} } + +func TestDecode(t *testing.T) { + line := `{"type":"ADDED","object":{"kind":"Namespace","apiVersion":"v1","metadata":{"name":"kube-system","selfLink":"/api/v1/namespaces/kube-system","uid":"164931a7-3d75-11e9-a0a0-2683b9459238","resourceVersion":"227","creationTimestamp":"2019-03-03T05:27:50Z","annotations":{"kubectl.kubernetes.io/last-applied-configuration":"{\"apiVersion\":\"v1\",\"kind\":\"Namespace\",\"metadata\":{\"annotations\":{},\"name\":\"kube-system\",\"namespace\":\"\"}}\n"}},"spec":{"finalizers":["kubernetes"]},"status":{"phase":"Active"}}}` + result, err := decode(line, makerFn) + if err != nil { + t.Errorf("Unexpeceted non-nil: %v", err) + } + ns, ok := result.Object.(*V1Namespace) + if !ok { + t.Errorf("Cast failed: %v", result.Object) + } + if ns.Kind != "Namespace" || ns.Metadata.Name != "kube-system" { + t.Errorf("Unexpected value %#v", *ns) + } +} + +func TestMultiDecode(t *testing.T) { + lines := ` + {"type":"ADDED","object":{"kind":"Namespace","apiVersion":"v1","metadata":{"name":"kube-system","selfLink":"/api/v1/namespaces/kube-system","uid":"164931a7-3d75-11e9-a0a0-2683b9459238","resourceVersion":"227","creationTimestamp":"2019-03-03T05:27:50Z","annotations":{"kubectl.kubernetes.io/last-applied-configuration":"{\"apiVersion\":\"v1\",\"kind\":\"Namespace\",\"metadata\":{\"annotations\":{},\"name\":\"kube-system\",\"namespace\":\"\"}}\n"}},"spec":{"finalizers":["kubernetes"]},"status":{"phase":"Active"}}} + {"type":"MODIFIED","object":{"kind":"Namespace","apiVersion":"v1","metadata":{"name":"kube-system-2","selfLink":"/api/v1/namespaces/kube-system","uid":"164931a7-3d75-11e9-a0a0-2683b9459238","resourceVersion":"227","creationTimestamp":"2019-03-03T05:27:50Z","annotations":{"kubectl.kubernetes.io/last-applied-configuration":"{\"apiVersion\":\"v1\",\"kind\":\"Namespace\",\"metadata\":{\"annotations\":{},\"name\":\"kube-system\",\"namespace\":\"\"}}\n"}},"spec":{"finalizers":["kubernetes"]},"status":{"phase":"Active"}}} + {"type":"DELETED","object":{"kind":"Namespace","apiVersion":"v1","metadata":{"name":"kube-system-3","selfLink":"/api/v1/namespaces/kube-system","uid":"164931a7-3d75-11e9-a0a0-2683b9459238","resourceVersion":"227","creationTimestamp":"2019-03-03T05:27:50Z","annotations":{"kubectl.kubernetes.io/last-applied-configuration":"{\"apiVersion\":\"v1\",\"kind\":\"Namespace\",\"metadata\":{\"annotations\":{},\"name\":\"kube-system\",\"namespace\":\"\"}}\n"}},"spec":{"finalizers":["kubernetes"]},"status":{"phase":"Active"}}} + ` + + results := make(chan *Result) + errs := make(chan error) + processWatch(strings.NewReader(lines), makerFn, results, errs) + + out := []*Result{} + outErrs := []error{} + + for r := range results { + out = append(out, r) + } + + for e := range errs { + outErrs = append(outErrs, e) + } + + if len(out) != 3 { + t.Errorf("unexpected results: %v", out) + } + for ix, val := range []string{"ADDED", "MODIFIED", "DELETED"} { + if out[ix].Type != val { + t.Errorf("unexpected value (%d): %v", ix, out[ix]) + } + } +}