From 82099caedae5e9a761d1639a5ac3a22407ba089e Mon Sep 17 00:00:00 2001 From: cskh Date: Thu, 22 Jun 2023 11:51:20 -0400 Subject: [PATCH] Add filter for watch nodes --- api/watch/funcs.go | 7 ++++ api/watch/funcs_test.go | 76 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 83 insertions(+) diff --git a/api/watch/funcs.go b/api/watch/funcs.go index 2f141277fe14d..d6275da5233d3 100644 --- a/api/watch/funcs.go +++ b/api/watch/funcs.go @@ -112,13 +112,20 @@ func servicesWatch(params map[string]interface{}) (WatcherFunc, error) { // nodesWatch is used to watch the list of available nodes func nodesWatch(params map[string]interface{}) (WatcherFunc, error) { stale := false + filter := "" if err := assignValueBool(params, "stale", &stale); err != nil { return nil, err } + if err := assignValue(params, "filter", &filter); err != nil { + return nil, err + } fn := func(p *Plan) (BlockingParamVal, interface{}, error) { catalog := p.client.Catalog() opts := makeQueryOptionsWithContext(p, stale) + if filter != "" { + opts.Filter = filter + } defer p.cancelFunc() nodes, meta, err := catalog.Nodes(&opts) if err != nil { diff --git a/api/watch/funcs_test.go b/api/watch/funcs_test.go index fbc20cb6a2fb5..7d4cf66f4cced 100644 --- a/api/watch/funcs_test.go +++ b/api/watch/funcs_test.go @@ -453,6 +453,82 @@ func TestNodesWatch(t *testing.T) { } } +func TestNodesWatch_Filter(t *testing.T) { + t.Parallel() + c, s := makeClient(t) + defer s.Stop() + + s.WaitForSerfCheck(t) // wait for AE to sync + + var ( + wakeups [][]*api.Node + notifyCh = make(chan struct{}) + ) + + plan := mustParse(t, `{"type":"nodes", "filter":"Node == foo"}`) + plan.Handler = func(idx uint64, raw interface{}) { + if raw == nil { + return // ignore + } + v, ok := raw.([]*api.Node) + if !ok { + return // ignore + } + wakeups = append(wakeups, v) + notifyCh <- struct{}{} + } + + // Register 2 nodes + { + catalog := c.Catalog() + + // we want to find this node + reg := &api.CatalogRegistration{ + Node: "foo", + Address: "1.1.1.1", + Datacenter: "dc1", + } + if _, err := catalog.Register(reg, nil); err != nil { + t.Fatalf("err: %v", err) + } + + // we don't want to find this node + reg = &api.CatalogRegistration{ + Node: "bar", + Address: "2.2.2.2", + Datacenter: "dc1", + } + if _, err := catalog.Register(reg, nil); err != nil { + t.Fatalf("err: %v", err) + } + } + + var wg sync.WaitGroup + wg.Add(1) + // Start the watch nodes plan + go func() { + defer wg.Done() + if err := plan.Run(s.HTTPAddr); err != nil { + t.Errorf("err: %v", err) + } + }() + defer plan.Stop() + + // Wait for first wakeup. + <-notifyCh + + plan.Stop() + wg.Wait() + + require.Len(t, wakeups, 1) + + { + v := wakeups[0] + require.Len(t, v, 1) + require.Equal(t, "foo", v[0].Node) + } +} + func TestServiceWatch(t *testing.T) { t.Parallel() c, s := makeClient(t)