Skip to content

Commit

Permalink
Add filter for watch nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
huikang committed Jun 22, 2023
1 parent 7c0660f commit 82099ca
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 0 deletions.
7 changes: 7 additions & 0 deletions api/watch/funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,20 @@ func servicesWatch(params map[string]interface{}) (WatcherFunc, error) {
// nodesWatch is used to watch the list of available nodes
func nodesWatch(params map[string]interface{}) (WatcherFunc, error) {
stale := false
filter := ""
if err := assignValueBool(params, "stale", &stale); err != nil {
return nil, err
}
if err := assignValue(params, "filter", &filter); err != nil {
return nil, err
}

fn := func(p *Plan) (BlockingParamVal, interface{}, error) {
catalog := p.client.Catalog()
opts := makeQueryOptionsWithContext(p, stale)
if filter != "" {
opts.Filter = filter
}
defer p.cancelFunc()
nodes, meta, err := catalog.Nodes(&opts)
if err != nil {
Expand Down
76 changes: 76 additions & 0 deletions api/watch/funcs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,82 @@ func TestNodesWatch(t *testing.T) {
}
}

func TestNodesWatch_Filter(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()

s.WaitForSerfCheck(t) // wait for AE to sync

var (
wakeups [][]*api.Node
notifyCh = make(chan struct{})
)

plan := mustParse(t, `{"type":"nodes", "filter":"Node == foo"}`)
plan.Handler = func(idx uint64, raw interface{}) {
if raw == nil {
return // ignore
}
v, ok := raw.([]*api.Node)
if !ok {
return // ignore
}
wakeups = append(wakeups, v)
notifyCh <- struct{}{}
}

// Register 2 nodes
{
catalog := c.Catalog()

// we want to find this node
reg := &api.CatalogRegistration{
Node: "foo",
Address: "1.1.1.1",
Datacenter: "dc1",
}
if _, err := catalog.Register(reg, nil); err != nil {
t.Fatalf("err: %v", err)
}

// we don't want to find this node
reg = &api.CatalogRegistration{
Node: "bar",
Address: "2.2.2.2",
Datacenter: "dc1",
}
if _, err := catalog.Register(reg, nil); err != nil {
t.Fatalf("err: %v", err)
}
}

var wg sync.WaitGroup
wg.Add(1)
// Start the watch nodes plan
go func() {
defer wg.Done()
if err := plan.Run(s.HTTPAddr); err != nil {
t.Errorf("err: %v", err)
}
}()
defer plan.Stop()

// Wait for first wakeup.
<-notifyCh

plan.Stop()
wg.Wait()

require.Len(t, wakeups, 1)

{
v := wakeups[0]
require.Len(t, v, 1)
require.Equal(t, "foo", v[0].Node)
}
}

func TestServiceWatch(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
Expand Down

0 comments on commit 82099ca

Please sign in to comment.