diff --git a/api/watch/funcs.go b/api/watch/funcs.go index 2f141277fe14..0d0f6e100c2b 100644 --- a/api/watch/funcs.go +++ b/api/watch/funcs.go @@ -92,13 +92,20 @@ func keyPrefixWatch(params map[string]interface{}) (WatcherFunc, error) { // servicesWatch is used to watch the list of available services func servicesWatch(params map[string]interface{}) (WatcherFunc, error) { stale := false + filter := "" if err := assignValueBool(params, "stale", &stale); err != nil { return nil, err } + if err := assignValue(params, "filter", &filter); err != nil { + return nil, err + } fn := func(p *Plan) (BlockingParamVal, interface{}, error) { catalog := p.client.Catalog() opts := makeQueryOptionsWithContext(p, stale) + if filter != "" { + opts.Filter = filter + } defer p.cancelFunc() services, meta, err := catalog.Services(&opts) if err != nil { @@ -112,13 +119,20 @@ func servicesWatch(params map[string]interface{}) (WatcherFunc, error) { // nodesWatch is used to watch the list of available nodes func nodesWatch(params map[string]interface{}) (WatcherFunc, error) { stale := false + filter := "" if err := assignValueBool(params, "stale", &stale); err != nil { return nil, err } + if err := assignValue(params, "filter", &filter); err != nil { + return nil, err + } fn := func(p *Plan) (BlockingParamVal, interface{}, error) { catalog := p.client.Catalog() opts := makeQueryOptionsWithContext(p, stale) + if filter != "" { + opts.Filter = filter + } defer p.cancelFunc() nodes, meta, err := catalog.Nodes(&opts) if err != nil { @@ -132,9 +146,13 @@ func nodesWatch(params map[string]interface{}) (WatcherFunc, error) { // serviceWatch is used to watch a specific service for changes func serviceWatch(params map[string]interface{}) (WatcherFunc, error) { stale := false + filter := "" if err := assignValueBool(params, "stale", &stale); err != nil { return nil, err } + if err := assignValue(params, "filter", &filter); err != nil { + return nil, err + } var ( service string @@ -158,6 +176,9 @@ func serviceWatch(params map[string]interface{}) (WatcherFunc, error) { fn := func(p *Plan) (BlockingParamVal, interface{}, error) { health := p.client.Health() opts := makeQueryOptionsWithContext(p, stale) + if filter != "" { + opts.Filter = filter + } defer p.cancelFunc() nodes, meta, err := health.ServiceMultipleTags(service, tags, passingOnly, &opts) if err != nil { diff --git a/api/watch/funcs_test.go b/api/watch/funcs_test.go index fbc20cb6a2fb..91318009ceac 100644 --- a/api/watch/funcs_test.go +++ b/api/watch/funcs_test.go @@ -378,6 +378,82 @@ func TestServicesWatch(t *testing.T) { } +func TestServicesWatch_Filter(t *testing.T) { + t.Parallel() + c, s := makeClient(t) + defer s.Stop() + + s.WaitForSerfCheck(t) + + var ( + wakeups []map[string][]string + notifyCh = make(chan struct{}) + ) + + plan := mustParse(t, `{"type":"services", "filter":"b in ServiceTags and a in ServiceTags"}`) + plan.Handler = func(idx uint64, raw interface{}) { + if raw == nil { + return // ignore + } + v, ok := raw.(map[string][]string) + if !ok { + return // ignore + } + wakeups = append(wakeups, v) + notifyCh <- struct{}{} + } + + // Register some services + { + agent := c.Agent() + + // we don't want to find this + reg := &api.AgentServiceRegistration{ + ID: "foo", + Name: "foo", + Tags: []string{"b"}, + } + if err := agent.ServiceRegister(reg); err != nil { + t.Fatalf("err: %v", err) + } + + // // we want to find this + reg = &api.AgentServiceRegistration{ + ID: "bar", + Name: "bar", + Tags: []string{"a", "b"}, + } + if err := agent.ServiceRegister(reg); err != nil { + t.Fatalf("err: %v", err) + } + } + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + if err := plan.Run(s.HTTPAddr); err != nil { + t.Errorf("err: %v", err) + } + }() + defer plan.Stop() + + // Wait for second wakeup. + <-notifyCh + + plan.Stop() + wg.Wait() + + require.Len(t, wakeups, 1) + + { + v := wakeups[0] + require.Len(t, v, 1) + _, ok := v["bar"] + require.True(t, ok) + } +} + func TestNodesWatch(t *testing.T) { t.Parallel() c, s := makeClient(t) @@ -453,6 +529,82 @@ func TestNodesWatch(t *testing.T) { } } +func TestNodesWatch_Filter(t *testing.T) { + t.Parallel() + c, s := makeClient(t) + defer s.Stop() + + s.WaitForSerfCheck(t) // wait for AE to sync + + var ( + wakeups [][]*api.Node + notifyCh = make(chan struct{}) + ) + + plan := mustParse(t, `{"type":"nodes", "filter":"Node == foo"}`) + plan.Handler = func(idx uint64, raw interface{}) { + if raw == nil { + return // ignore + } + v, ok := raw.([]*api.Node) + if !ok { + return // ignore + } + wakeups = append(wakeups, v) + notifyCh <- struct{}{} + } + + // Register 2 nodes + { + catalog := c.Catalog() + + // we want to find this node + reg := &api.CatalogRegistration{ + Node: "foo", + Address: "1.1.1.1", + Datacenter: "dc1", + } + if _, err := catalog.Register(reg, nil); err != nil { + t.Fatalf("err: %v", err) + } + + // we don't want to find this node + reg = &api.CatalogRegistration{ + Node: "bar", + Address: "2.2.2.2", + Datacenter: "dc1", + } + if _, err := catalog.Register(reg, nil); err != nil { + t.Fatalf("err: %v", err) + } + } + + var wg sync.WaitGroup + wg.Add(1) + // Start the watch nodes plan + go func() { + defer wg.Done() + if err := plan.Run(s.HTTPAddr); err != nil { + t.Errorf("err: %v", err) + } + }() + defer plan.Stop() + + // Wait for first wakeup. + <-notifyCh + + plan.Stop() + wg.Wait() + + require.Len(t, wakeups, 1) + + { + v := wakeups[0] + require.Len(t, v, 1) + require.Equal(t, "foo", v[0].Node) + } +} + func TestServiceWatch(t *testing.T) { t.Parallel() c, s := makeClient(t) @@ -616,6 +768,94 @@ func TestServiceMultipleTagsWatch(t *testing.T) { } } +func TestServiceWatch_Filter(t *testing.T) { + t.Parallel() + c, s := makeClient(t) + defer s.Stop() + + s.WaitForSerfCheck(t) + + var ( + wakeups [][]*api.ServiceEntry + notifyCh = make(chan struct{}) + ) + + plan := mustParse(t, `{"type":"service", "service":"foo", "filter":"bar in Service.Tags and buzz in Service.Tags"}`) + plan.Handler = func(idx uint64, raw interface{}) { + if raw == nil { + return // ignore + } + v, ok := raw.([]*api.ServiceEntry) + if !ok { + return // ignore + } + + wakeups = append(wakeups, v) + notifyCh <- struct{}{} + } + + // register some services + { + agent := c.Agent() + + // we do not want to find this one. + reg := &api.AgentServiceRegistration{ + ID: "foobarbiff", + Name: "foo", + Tags: []string{"bar", "biff"}, + } + if err := agent.ServiceRegister(reg); err != nil { + t.Fatalf("err: %v", err) + } + + // we do not want to find this one. + reg = &api.AgentServiceRegistration{ + ID: "foobuzzbiff", + Name: "foo", + Tags: []string{"buzz", "biff"}, + } + if err := agent.ServiceRegister(reg); err != nil { + t.Fatalf("err: %v", err) + } + + // we want to find this one + reg = &api.AgentServiceRegistration{ + ID: "foobarbuzzbiff", + Name: "foo", + Tags: []string{"bar", "buzz", "biff"}, + } + if err := agent.ServiceRegister(reg); err != nil { + t.Fatalf("err: %v", err) + } + } + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + if err := plan.Run(s.HTTPAddr); err != nil { + t.Errorf("err: %v", err) + } + }() + defer plan.Stop() + + // Wait for second wakeup. + <-notifyCh + + plan.Stop() + wg.Wait() + + require.Len(t, wakeups, 1) + + { + v := wakeups[0] + require.Len(t, v, 1) + + require.Equal(t, "foobarbuzzbiff", v[0].Service.ID) + require.ElementsMatch(t, []string{"bar", "buzz", "biff"}, v[0].Service.Tags) + } +} + func TestChecksWatch_State(t *testing.T) { t.Parallel() c, s := makeClient(t)