Skip to content

Commit

Permalink
Add watch from resource version (and tests)
Browse files Browse the repository at this point in the history
  • Loading branch information
brendandburns committed Mar 30, 2019
1 parent 3ce0a05 commit 583d7de
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 8 deletions.
2 changes: 1 addition & 1 deletion examples/watch/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func main() {
Path: "/api/v1/namespaces",
MakerFn: func() interface{} { return &client.V1Namespace{} },
}
if resultChan, errChan, err := watch.Connect(context.Background()); err != nil {
if resultChan, errChan, err := watch.Connect(context.Background(), ""); err != nil {
panic(err)
} else {
for obj := range resultChan {
Expand Down
11 changes: 8 additions & 3 deletions kubernetes/client/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,14 @@ type WatchClient struct {
MakerFn func() interface{}
}

// Connect initiates a watch to the server. TODO: support watch from resource version
func (w *WatchClient) Connect(ctx context.Context) (<-chan *Result, <-chan error, error) {
url := w.Cfg.Scheme + "://" + w.Cfg.Host + w.Path + "?watch=true"
// Connect initiates a watch to the server.
func (w *WatchClient) Connect(ctx context.Context, resourceVersion string) (<-chan *Result, <-chan error, error) {
params := []string{"watch=true"}
if len(resourceVersion) != 0 {
params = append(params, "resourceVersion="+resourceVersion)
}
queryStr := "?" + strings.Join(params, "&")
url := w.Cfg.Scheme + "://" + w.Cfg.Host + w.Path + queryStr
req, err := w.Client.prepareRequest(ctx, url, "GET", nil, nil, nil, nil, "", []byte{})
if err != nil {
return nil, nil, err
Expand Down
45 changes: 41 additions & 4 deletions kubernetes/client/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@ import (
func makerFn() interface{} { return &V1Namespace{} }

type staticHandler struct {
Code int
Body string
Code int
Body string
QueryParams url.Values
}

func (s *staticHandler) ServeHTTP(res http.ResponseWriter, req *http.Request) {
res.WriteHeader(s.Code)
res.Write([]byte(s.Body))
s.QueryParams = req.URL.Query()
}

func TestFullError(t *testing.T) {
Expand All @@ -43,7 +45,7 @@ func TestFullError(t *testing.T) {
MakerFn: makerFn,
}

if _, _, err := watch.Connect(context.Background()); err == nil {
if _, _, err := watch.Connect(context.Background(), ""); err == nil {
t.Error("unexpected nil error")
}
}
Expand All @@ -70,7 +72,7 @@ func TestFull(t *testing.T) {
MakerFn: makerFn,
}

resultChan, errChan, err := watch.Connect(context.Background())
resultChan, errChan, err := watch.Connect(context.Background(), "")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand Down Expand Up @@ -100,6 +102,41 @@ func TestFull(t *testing.T) {
}
}

func TestResourceVersion(t *testing.T) {
handler := &staticHandler{
Code: 200,
Body: `{"type":"ADDED","object":{"kind":"Namespace","apiVersion":"v1","metadata":{"name":"kube-system","selfLink":"/api/v1/namespaces/kube-system","uid":"164931a7-3d75-11e9-a0a0-2683b9459238","resourceVersion":"227","creationTimestamp":"2019-03-03T05:27:50Z","annotations":{"kubectl.kubernetes.io/last-applied-configuration":"{\"apiVersion\":\"v1\",\"kind\":\"Namespace\",\"metadata\":{\"annotations\":{},\"name\":\"kube-system\",\"namespace\":\"\"}}\n"}},"spec":{"finalizers":["kubernetes"]},"status":{"phase":"Active"}}}\n`,
}
server := httptest.NewServer(handler)
defer server.Close()

u, err := url.Parse(server.URL)
if err != nil {
t.Errorf("unexpected error: %v", err)
}

cfg := &Configuration{}
cfg.Host = u.Host
cfg.Scheme = u.Scheme

watch := WatchClient{
Cfg: cfg,
Client: NewAPIClient(cfg),
MakerFn: makerFn,
}

version := "12345"
_, _, err = watch.Connect(context.Background(), version)
if err != nil {
t.Errorf("unexpected error: %v", err)
}

versionOut := handler.QueryParams["resourceVersion"][0]
if versionOut != version {
t.Errorf("unexpected resource version %s vs %s", version, versionOut)
}
}

func TestDecode(t *testing.T) {
line := `{"type":"ADDED","object":{"kind":"Namespace","apiVersion":"v1","metadata":{"name":"kube-system","selfLink":"/api/v1/namespaces/kube-system","uid":"164931a7-3d75-11e9-a0a0-2683b9459238","resourceVersion":"227","creationTimestamp":"2019-03-03T05:27:50Z","annotations":{"kubectl.kubernetes.io/last-applied-configuration":"{\"apiVersion\":\"v1\",\"kind\":\"Namespace\",\"metadata\":{\"annotations\":{},\"name\":\"kube-system\",\"namespace\":\"\"}}\n"}},"spec":{"finalizers":["kubernetes"]},"status":{"phase":"Active"}}}`
result, err := decode(line, makerFn)
Expand Down

0 comments on commit 583d7de

Please sign in to comment.