Skip to content

Commit

Permalink
Add filter for watch nodes, services, and service
Browse files Browse the repository at this point in the history
- unit test added
  • Loading branch information
huikang committed Jun 22, 2023
1 parent 7c0660f commit af5e30a
Show file tree
Hide file tree
Showing 2 changed files with 261 additions and 0 deletions.
21 changes: 21 additions & 0 deletions api/watch/funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,20 @@ func keyPrefixWatch(params map[string]interface{}) (WatcherFunc, error) {
// servicesWatch is used to watch the list of available services
func servicesWatch(params map[string]interface{}) (WatcherFunc, error) {
stale := false
filter := ""
if err := assignValueBool(params, "stale", &stale); err != nil {
return nil, err
}
if err := assignValue(params, "filter", &filter); err != nil {
return nil, err
}

fn := func(p *Plan) (BlockingParamVal, interface{}, error) {
catalog := p.client.Catalog()
opts := makeQueryOptionsWithContext(p, stale)
if filter != "" {
opts.Filter = filter
}
defer p.cancelFunc()
services, meta, err := catalog.Services(&opts)
if err != nil {
Expand All @@ -112,13 +119,20 @@ func servicesWatch(params map[string]interface{}) (WatcherFunc, error) {
// nodesWatch is used to watch the list of available nodes
func nodesWatch(params map[string]interface{}) (WatcherFunc, error) {
stale := false
filter := ""
if err := assignValueBool(params, "stale", &stale); err != nil {
return nil, err
}
if err := assignValue(params, "filter", &filter); err != nil {
return nil, err
}

fn := func(p *Plan) (BlockingParamVal, interface{}, error) {
catalog := p.client.Catalog()
opts := makeQueryOptionsWithContext(p, stale)
if filter != "" {
opts.Filter = filter
}
defer p.cancelFunc()
nodes, meta, err := catalog.Nodes(&opts)
if err != nil {
Expand All @@ -132,9 +146,13 @@ func nodesWatch(params map[string]interface{}) (WatcherFunc, error) {
// serviceWatch is used to watch a specific service for changes
func serviceWatch(params map[string]interface{}) (WatcherFunc, error) {
stale := false
filter := ""
if err := assignValueBool(params, "stale", &stale); err != nil {
return nil, err
}
if err := assignValue(params, "filter", &filter); err != nil {
return nil, err
}

var (
service string
Expand All @@ -158,6 +176,9 @@ func serviceWatch(params map[string]interface{}) (WatcherFunc, error) {
fn := func(p *Plan) (BlockingParamVal, interface{}, error) {
health := p.client.Health()
opts := makeQueryOptionsWithContext(p, stale)
if filter != "" {
opts.Filter = filter
}
defer p.cancelFunc()
nodes, meta, err := health.ServiceMultipleTags(service, tags, passingOnly, &opts)
if err != nil {
Expand Down
240 changes: 240 additions & 0 deletions api/watch/funcs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,82 @@ func TestServicesWatch(t *testing.T) {

}

func TestServicesWatch_Filter(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()

s.WaitForSerfCheck(t)

var (
wakeups []map[string][]string
notifyCh = make(chan struct{})
)

plan := mustParse(t, `{"type":"services", "filter":"b in ServiceTags and a in ServiceTags"}`)
plan.Handler = func(idx uint64, raw interface{}) {
if raw == nil {
return // ignore
}
v, ok := raw.(map[string][]string)
if !ok {
return // ignore
}
wakeups = append(wakeups, v)
notifyCh <- struct{}{}
}

// Register some services
{
agent := c.Agent()

// we don't want to find this
reg := &api.AgentServiceRegistration{
ID: "foo",
Name: "foo",
Tags: []string{"b"},
}
if err := agent.ServiceRegister(reg); err != nil {
t.Fatalf("err: %v", err)
}

// // we want to find this
reg = &api.AgentServiceRegistration{
ID: "bar",
Name: "bar",
Tags: []string{"a", "b"},
}
if err := agent.ServiceRegister(reg); err != nil {
t.Fatalf("err: %v", err)
}
}

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
if err := plan.Run(s.HTTPAddr); err != nil {
t.Errorf("err: %v", err)
}
}()
defer plan.Stop()

// Wait for second wakeup.
<-notifyCh

plan.Stop()
wg.Wait()

require.Len(t, wakeups, 1)

{
v := wakeups[0]
require.Len(t, v, 1)
_, ok := v["bar"]
require.True(t, ok)
}
}

func TestNodesWatch(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
Expand Down Expand Up @@ -453,6 +529,82 @@ func TestNodesWatch(t *testing.T) {
}
}

func TestNodesWatch_Filter(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()

s.WaitForSerfCheck(t) // wait for AE to sync

var (
wakeups [][]*api.Node
notifyCh = make(chan struct{})
)

plan := mustParse(t, `{"type":"nodes", "filter":"Node == foo"}`)
plan.Handler = func(idx uint64, raw interface{}) {
if raw == nil {
return // ignore
}
v, ok := raw.([]*api.Node)
if !ok {
return // ignore
}
wakeups = append(wakeups, v)
notifyCh <- struct{}{}
}

// Register 2 nodes
{
catalog := c.Catalog()

// we want to find this node
reg := &api.CatalogRegistration{
Node: "foo",
Address: "1.1.1.1",
Datacenter: "dc1",
}
if _, err := catalog.Register(reg, nil); err != nil {
t.Fatalf("err: %v", err)
}

// we don't want to find this node
reg = &api.CatalogRegistration{
Node: "bar",
Address: "2.2.2.2",
Datacenter: "dc1",
}
if _, err := catalog.Register(reg, nil); err != nil {
t.Fatalf("err: %v", err)
}
}

var wg sync.WaitGroup
wg.Add(1)
// Start the watch nodes plan
go func() {
defer wg.Done()
if err := plan.Run(s.HTTPAddr); err != nil {
t.Errorf("err: %v", err)
}
}()
defer plan.Stop()

// Wait for first wakeup.
<-notifyCh

plan.Stop()
wg.Wait()

require.Len(t, wakeups, 1)

{
v := wakeups[0]
require.Len(t, v, 1)
require.Equal(t, "foo", v[0].Node)
}
}

func TestServiceWatch(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
Expand Down Expand Up @@ -616,6 +768,94 @@ func TestServiceMultipleTagsWatch(t *testing.T) {
}
}

func TestServiceWatch_Filter(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()

s.WaitForSerfCheck(t)

var (
wakeups [][]*api.ServiceEntry
notifyCh = make(chan struct{})
)

plan := mustParse(t, `{"type":"service", "service":"foo", "filter":"bar in Service.Tags and buzz in Service.Tags"}`)
plan.Handler = func(idx uint64, raw interface{}) {
if raw == nil {
return // ignore
}
v, ok := raw.([]*api.ServiceEntry)
if !ok {
return // ignore
}

wakeups = append(wakeups, v)
notifyCh <- struct{}{}
}

// register some services
{
agent := c.Agent()

// we do not want to find this one.
reg := &api.AgentServiceRegistration{
ID: "foobarbiff",
Name: "foo",
Tags: []string{"bar", "biff"},
}
if err := agent.ServiceRegister(reg); err != nil {
t.Fatalf("err: %v", err)
}

// we do not want to find this one.
reg = &api.AgentServiceRegistration{
ID: "foobuzzbiff",
Name: "foo",
Tags: []string{"buzz", "biff"},
}
if err := agent.ServiceRegister(reg); err != nil {
t.Fatalf("err: %v", err)
}

// we want to find this one
reg = &api.AgentServiceRegistration{
ID: "foobarbuzzbiff",
Name: "foo",
Tags: []string{"bar", "buzz", "biff"},
}
if err := agent.ServiceRegister(reg); err != nil {
t.Fatalf("err: %v", err)
}
}

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
if err := plan.Run(s.HTTPAddr); err != nil {
t.Errorf("err: %v", err)
}
}()
defer plan.Stop()

// Wait for second wakeup.
<-notifyCh

plan.Stop()
wg.Wait()

require.Len(t, wakeups, 1)

{
v := wakeups[0]
require.Len(t, v, 1)

require.Equal(t, "foobarbuzzbiff", v[0].Service.ID)
require.ElementsMatch(t, []string{"bar", "buzz", "biff"}, v[0].Service.Tags)
}
}

func TestChecksWatch_State(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
Expand Down

0 comments on commit af5e30a

Please sign in to comment.