From 6abeed0ccc83bcc904d51308d27af4d274fd78d5 Mon Sep 17 00:00:00 2001 From: cskh Date: Thu, 15 Jun 2023 19:01:24 +0000 Subject: [PATCH 1/6] backport of commit 0d5d06fab871eba4abe50147c156efbdc84152aa --- api/watch/funcs.go | 37 ++++++++++++++ api/watch/funcs_test.go | 104 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 141 insertions(+) diff --git a/api/watch/funcs.go b/api/watch/funcs.go index cc4f33327711..3cd575268398 100644 --- a/api/watch/funcs.go +++ b/api/watch/funcs.go @@ -186,6 +186,13 @@ func checksWatch(params map[string]interface{}) (WatcherFunc, error) { state = "any" } + var ( + tags []string + ) + if err := assignValueStringSlice(params, "tag", &tags); err != nil { + return nil, err + } + fn := func(p *Plan) (BlockingParamVal, interface{}, error) { health := p.client.Health() opts := makeQueryOptionsWithContext(p, stale) @@ -201,11 +208,41 @@ func checksWatch(params map[string]interface{}) (WatcherFunc, error) { if err != nil { return nil, nil, err } + if len(tags) > 0 { + checks = filterChecksByTags(checks, tags) + } + return WaitIndexVal(meta.LastIndex), checks, err } return fn, nil } +// filterChecksByTags filters HealthChecks by the tags +func filterChecksByTags(checks consulapi.HealthChecks, tags []string) consulapi.HealthChecks { + filteredChecks := consulapi.HealthChecks{} + + for _, check := range checks { + // the check is appended to the filteredChecks if its tags + // contain every tag in tags + svcTagMap := map[string]struct{}{} + for _, svcTag := range check.ServiceTags { + svcTagMap[svcTag] = struct{}{} + } + + count := 0 + for _, tag := range tags { + if _, ok := svcTagMap[tag]; ok { + count++ + } + + if count == len(tags) { + filteredChecks = append(filteredChecks, check) + } + } + } + return filteredChecks +} + // eventWatch is used to watch for events, optionally filtering on name func eventWatch(params map[string]interface{}) (WatcherFunc, error) { // The stale setting doesn't apply to events. diff --git a/api/watch/funcs_test.go b/api/watch/funcs_test.go index 1c23654f0426..c7e02044066e 100644 --- a/api/watch/funcs_test.go +++ b/api/watch/funcs_test.go @@ -769,6 +769,110 @@ func TestChecksWatch_Service(t *testing.T) { } } +func TestChecksWatch_Service_Tags(t *testing.T) { + t.Parallel() + c, s := makeClient(t) + defer s.Stop() + + s.WaitForSerfCheck(t) + + var ( + wakeups [][]*api.HealthCheck + notifyCh = make(chan struct{}) + ) + + plan := mustParse(t, `{"type":"checks", "tag":["a", "b"]}`) + plan.Handler = func(idx uint64, raw interface{}) { + if raw == nil { + return // ignore + } + v, ok := raw.([]*api.HealthCheck) + if !ok { + return // ignore + } + wakeups = append(wakeups, v) + notifyCh <- struct{}{} + } + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + if err := plan.Run(s.HTTPAddr); err != nil { + t.Errorf("err: %v", err) + } + }() + defer plan.Stop() + + // Wait for first wakeup. + <-notifyCh + { + catalog := c.Catalog() + + // we don't want to find this one + reg := &api.CatalogRegistration{ + Node: "foo", + Address: "1.1.1.1", + Datacenter: "dc1", + Service: &api.AgentService{ + ID: "foo", + Service: "foo", + Tags: []string{"a"}, + }, + Check: &api.AgentCheck{ + Node: "foo", + CheckID: "foo", + Name: "foo", + Status: api.HealthPassing, + ServiceID: "foo", + }, + } + if _, err := catalog.Register(reg, nil); err != nil { + t.Fatalf("err: %v", err) + } + + // we want to find this one + reg = &api.CatalogRegistration{ + Node: "bar", + Address: "2.2.2.2", + Datacenter: "dc1", + Service: &api.AgentService{ + ID: "bar", + Service: "bar", + Tags: []string{"a", "b"}, + }, + Check: &api.AgentCheck{ + Node: "bar", + CheckID: "bar", + Name: "bar", + Status: api.HealthPassing, + ServiceID: "bar", + }, + } + if _, err := catalog.Register(reg, nil); err != nil { + t.Fatalf("err: %v", err) + } + } + + // Wait for second wakeup. + <-notifyCh + + plan.Stop() + wg.Wait() + + require.Len(t, wakeups, 2) + + { + v := wakeups[0] + require.Len(t, v, 0) + } + { + v := wakeups[1] + require.Len(t, v, 1) + require.Equal(t, "bar", v[0].CheckID) + } +} + func TestEventWatch(t *testing.T) { t.Parallel() c, s := makeClient(t) From 2ce9b34a0b96e9dfba172c6c3cd7bddcf06a7e7c Mon Sep 17 00:00:00 2001 From: cskh Date: Fri, 16 Jun 2023 03:26:04 +0000 Subject: [PATCH 2/6] backport of commit eaf50ef6e012c5379aecf21f60c2ff984f8a34f8 --- agent/consul/health_endpoint_test.go | 6 ++++++ agent/health_endpoint.go | 13 ++++++++++++ api/health.go | 13 ++++++++++++ api/watch/funcs.go | 31 +--------------------------- api/watch/funcs_test.go | 2 +- 5 files changed, 34 insertions(+), 31 deletions(-) diff --git a/agent/consul/health_endpoint_test.go b/agent/consul/health_endpoint_test.go index fc1e34fae397..e31263a98718 100644 --- a/agent/consul/health_endpoint_test.go +++ b/agent/consul/health_endpoint_test.go @@ -1765,5 +1765,11 @@ func TestHealth_RPC_Filter(t *testing.T) { out = new(structs.IndexedHealthChecks) require.NoError(t, msgpackrpc.CallWithCodec(codec, "Health.ChecksInState", &args, out)) require.Len(t, out.HealthChecks, 1) + + args.State = api.HealthAny + args.Filter = "connect in ServiceTags and v2 in ServiceTags" + out = new(structs.IndexedHealthChecks) + require.NoError(t, msgpackrpc.CallWithCodec(codec, "Health.ChecksInState", &args, out)) + require.Len(t, out.HealthChecks, 1) }) } diff --git a/agent/health_endpoint.go b/agent/health_endpoint.go index 6ea64528b07b..9edb50e64bc2 100644 --- a/agent/health_endpoint.go +++ b/agent/health_endpoint.go @@ -1,6 +1,7 @@ package agent import ( + "fmt" "net/http" "net/url" "strconv" @@ -34,6 +35,18 @@ func (s *HTTPHandlers) HealthChecksInState(resp http.ResponseWriter, req *http.R return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: "Missing check state"} } + // build tag filter + params := req.URL.Query() + if tags, ok := params["tag"]; ok { + for i, tag := range tags { + expr := fmt.Sprintf(`%s in ServiceTags`, tag) + if i < len(tags)-1 { + expr += " and " + } + args.Filter += expr + } + } + // Make the RPC request var out structs.IndexedHealthChecks defer setMeta(resp, &out.QueryMeta) diff --git a/api/health.go b/api/health.go index a89b4b7273f5..a9fe717e772d 100644 --- a/api/health.go +++ b/api/health.go @@ -363,6 +363,12 @@ func (h *Health) service(service string, tags []string, passingOnly bool, q *Que // State is used to retrieve all the checks in a given state. // The wildcard "any" state can also be used for all checks. func (h *Health) State(state string, q *QueryOptions) (HealthChecks, *QueryMeta, error) { + return h.StateTags(state, nil, q) +} + +// StateTags is used to retrieve all the checks in a given state and tags. +// The wildcard "any" state can also be used for all checks. +func (h *Health) StateTags(state string, tags []string, q *QueryOptions) (HealthChecks, *QueryMeta, error) { switch state { case HealthAny: case HealthWarning: @@ -373,6 +379,13 @@ func (h *Health) State(state string, q *QueryOptions) (HealthChecks, *QueryMeta, } r := h.c.newRequest("GET", "/v1/health/state/"+state) r.setQueryOptions(q) + + if len(tags) > 0 { + for _, tag := range tags { + r.params.Add("tag", tag) + } + } + rtt, resp, err := h.c.doRequest(r) if err != nil { return nil, nil, err diff --git a/api/watch/funcs.go b/api/watch/funcs.go index 3cd575268398..39337447f6aa 100644 --- a/api/watch/funcs.go +++ b/api/watch/funcs.go @@ -201,48 +201,19 @@ func checksWatch(params map[string]interface{}) (WatcherFunc, error) { var meta *consulapi.QueryMeta var err error if state != "" { - checks, meta, err = health.State(state, &opts) + checks, meta, err = health.StateTags(state, tags, &opts) } else { checks, meta, err = health.Checks(service, &opts) } if err != nil { return nil, nil, err } - if len(tags) > 0 { - checks = filterChecksByTags(checks, tags) - } return WaitIndexVal(meta.LastIndex), checks, err } return fn, nil } -// filterChecksByTags filters HealthChecks by the tags -func filterChecksByTags(checks consulapi.HealthChecks, tags []string) consulapi.HealthChecks { - filteredChecks := consulapi.HealthChecks{} - - for _, check := range checks { - // the check is appended to the filteredChecks if its tags - // contain every tag in tags - svcTagMap := map[string]struct{}{} - for _, svcTag := range check.ServiceTags { - svcTagMap[svcTag] = struct{}{} - } - - count := 0 - for _, tag := range tags { - if _, ok := svcTagMap[tag]; ok { - count++ - } - - if count == len(tags) { - filteredChecks = append(filteredChecks, check) - } - } - } - return filteredChecks -} - // eventWatch is used to watch for events, optionally filtering on name func eventWatch(params map[string]interface{}) (WatcherFunc, error) { // The stale setting doesn't apply to events. diff --git a/api/watch/funcs_test.go b/api/watch/funcs_test.go index c7e02044066e..5044cb5edf45 100644 --- a/api/watch/funcs_test.go +++ b/api/watch/funcs_test.go @@ -781,7 +781,7 @@ func TestChecksWatch_Service_Tags(t *testing.T) { notifyCh = make(chan struct{}) ) - plan := mustParse(t, `{"type":"checks", "tag":["a", "b"]}`) + plan := mustParse(t, `{"type":"checks", "tag":["b", "a"]}`) plan.Handler = func(idx uint64, raw interface{}) { if raw == nil { return // ignore From 601611f8217275d5bae836d46186369e9ab25583 Mon Sep 17 00:00:00 2001 From: cskh Date: Fri, 16 Jun 2023 03:53:02 +0000 Subject: [PATCH 3/6] backport of commit 55d927d1ae7704f2b3ab9598cd1bb71620a1a24a --- agent/health_endpoint.go | 12 +++++ api/health.go | 10 ++++ api/watch/funcs.go | 2 +- api/watch/funcs_test.go | 106 ++++++++++++++++++++++++++++++++++++++- 4 files changed, 128 insertions(+), 2 deletions(-) diff --git a/agent/health_endpoint.go b/agent/health_endpoint.go index 9edb50e64bc2..2c15e81d6e3f 100644 --- a/agent/health_endpoint.go +++ b/agent/health_endpoint.go @@ -137,6 +137,18 @@ func (s *HTTPHandlers) HealthServiceChecks(resp http.ResponseWriter, req *http.R return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: "Missing service name"} } + // build tag filter + params := req.URL.Query() + if tags, ok := params["tag"]; ok { + for i, tag := range tags { + expr := fmt.Sprintf(`%s in ServiceTags`, tag) + if i < len(tags)-1 { + expr += " and " + } + args.Filter += expr + } + } + // Make the RPC request var out structs.IndexedHealthChecks defer setMeta(resp, &out.QueryMeta) diff --git a/api/health.go b/api/health.go index a9fe717e772d..67ec9e92cc0c 100644 --- a/api/health.go +++ b/api/health.go @@ -258,7 +258,17 @@ func (h *Health) Node(node string, q *QueryOptions) (HealthChecks, *QueryMeta, e // Checks is used to return the checks associated with a service func (h *Health) Checks(service string, q *QueryOptions) (HealthChecks, *QueryMeta, error) { + return h.ChecksTags(service, nil, q) +} + +// ChecksTags is used to return the checks associated with a service filtered by tags +func (h *Health) ChecksTags(service string, tags []string, q *QueryOptions) (HealthChecks, *QueryMeta, error) { r := h.c.newRequest("GET", "/v1/health/checks/"+service) + if len(tags) > 0 { + for _, tag := range tags { + r.params.Add("tag", tag) + } + } r.setQueryOptions(q) rtt, resp, err := h.c.doRequest(r) if err != nil { diff --git a/api/watch/funcs.go b/api/watch/funcs.go index 39337447f6aa..60cd11ef7ef6 100644 --- a/api/watch/funcs.go +++ b/api/watch/funcs.go @@ -203,7 +203,7 @@ func checksWatch(params map[string]interface{}) (WatcherFunc, error) { if state != "" { checks, meta, err = health.StateTags(state, tags, &opts) } else { - checks, meta, err = health.Checks(service, &opts) + checks, meta, err = health.ChecksTags(service, tags, &opts) } if err != nil { return nil, nil, err diff --git a/api/watch/funcs_test.go b/api/watch/funcs_test.go index 5044cb5edf45..8bdd2016931d 100644 --- a/api/watch/funcs_test.go +++ b/api/watch/funcs_test.go @@ -769,7 +769,111 @@ func TestChecksWatch_Service(t *testing.T) { } } -func TestChecksWatch_Service_Tags(t *testing.T) { +func TestChecksWatch_Service_Tag(t *testing.T) { + t.Parallel() + c, s := makeClient(t) + defer s.Stop() + + s.WaitForSerfCheck(t) + + var ( + wakeups [][]*api.HealthCheck + notifyCh = make(chan struct{}) + ) + + plan := mustParse(t, `{"type":"checks", "service":"foobar", "tag":["b", "a"]}`) + plan.Handler = func(idx uint64, raw interface{}) { + if raw == nil { + return // ignore + } + v, ok := raw.([]*api.HealthCheck) + if !ok { + return // ignore + } + wakeups = append(wakeups, v) + notifyCh <- struct{}{} + } + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + if err := plan.Run(s.HTTPAddr); err != nil { + t.Errorf("err: %v", err) + } + }() + defer plan.Stop() + + // Wait for first wakeup. + <-notifyCh + { + catalog := c.Catalog() + + // we want to find this one + reg := &api.CatalogRegistration{ + Node: "foobar", + Address: "1.1.1.1", + Datacenter: "dc1", + Service: &api.AgentService{ + ID: "foobar", + Service: "foobar", + Tags: []string{"a", "b"}, + }, + Check: &api.AgentCheck{ + Node: "foobar", + CheckID: "foobar", + Name: "foobar", + Status: api.HealthPassing, + ServiceID: "foobar", + }, + } + if _, err := catalog.Register(reg, nil); err != nil { + t.Fatalf("err: %v", err) + } + + // we don't want to find this one + reg = &api.CatalogRegistration{ + Node: "bar", + Address: "2.2.2.2", + Datacenter: "dc1", + Service: &api.AgentService{ + ID: "foobar", + Service: "foobar", + Tags: []string{"a"}, + }, + Check: &api.AgentCheck{ + Node: "bar", + CheckID: "foobar", + Name: "foobar", + Status: api.HealthPassing, + ServiceID: "foobar", + }, + } + if _, err := catalog.Register(reg, nil); err != nil { + t.Fatalf("err: %v", err) + } + } + + // Wait for second wakeup. + <-notifyCh + + plan.Stop() + wg.Wait() + + require.Len(t, wakeups, 2) + + { + v := wakeups[0] + require.Len(t, v, 0) + } + { + v := wakeups[1] + require.Len(t, v, 1) + require.Equal(t, "foobar", v[0].CheckID) + } +} + +func TestChecksWatch_Tag(t *testing.T) { t.Parallel() c, s := makeClient(t) defer s.Stop() From dac5d18ac1bfb5854b15c6215017b8ad622c7cea Mon Sep 17 00:00:00 2001 From: cskh Date: Thu, 22 Jun 2023 13:53:22 +0000 Subject: [PATCH 4/6] backport of commit 7c0660f52204d2f3fe172dea84393665d1f3d05e --- agent/health_endpoint.go | 25 ------------------------- api/health.go | 23 ----------------------- api/watch/funcs.go | 20 +++++++++----------- api/watch/funcs_test.go | 32 ++++---------------------------- command/watch/watch.go | 5 +++++ 5 files changed, 18 insertions(+), 87 deletions(-) diff --git a/agent/health_endpoint.go b/agent/health_endpoint.go index 2c15e81d6e3f..6ea64528b07b 100644 --- a/agent/health_endpoint.go +++ b/agent/health_endpoint.go @@ -1,7 +1,6 @@ package agent import ( - "fmt" "net/http" "net/url" "strconv" @@ -35,18 +34,6 @@ func (s *HTTPHandlers) HealthChecksInState(resp http.ResponseWriter, req *http.R return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: "Missing check state"} } - // build tag filter - params := req.URL.Query() - if tags, ok := params["tag"]; ok { - for i, tag := range tags { - expr := fmt.Sprintf(`%s in ServiceTags`, tag) - if i < len(tags)-1 { - expr += " and " - } - args.Filter += expr - } - } - // Make the RPC request var out structs.IndexedHealthChecks defer setMeta(resp, &out.QueryMeta) @@ -137,18 +124,6 @@ func (s *HTTPHandlers) HealthServiceChecks(resp http.ResponseWriter, req *http.R return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: "Missing service name"} } - // build tag filter - params := req.URL.Query() - if tags, ok := params["tag"]; ok { - for i, tag := range tags { - expr := fmt.Sprintf(`%s in ServiceTags`, tag) - if i < len(tags)-1 { - expr += " and " - } - args.Filter += expr - } - } - // Make the RPC request var out structs.IndexedHealthChecks defer setMeta(resp, &out.QueryMeta) diff --git a/api/health.go b/api/health.go index 67ec9e92cc0c..a89b4b7273f5 100644 --- a/api/health.go +++ b/api/health.go @@ -258,17 +258,7 @@ func (h *Health) Node(node string, q *QueryOptions) (HealthChecks, *QueryMeta, e // Checks is used to return the checks associated with a service func (h *Health) Checks(service string, q *QueryOptions) (HealthChecks, *QueryMeta, error) { - return h.ChecksTags(service, nil, q) -} - -// ChecksTags is used to return the checks associated with a service filtered by tags -func (h *Health) ChecksTags(service string, tags []string, q *QueryOptions) (HealthChecks, *QueryMeta, error) { r := h.c.newRequest("GET", "/v1/health/checks/"+service) - if len(tags) > 0 { - for _, tag := range tags { - r.params.Add("tag", tag) - } - } r.setQueryOptions(q) rtt, resp, err := h.c.doRequest(r) if err != nil { @@ -373,12 +363,6 @@ func (h *Health) service(service string, tags []string, passingOnly bool, q *Que // State is used to retrieve all the checks in a given state. // The wildcard "any" state can also be used for all checks. func (h *Health) State(state string, q *QueryOptions) (HealthChecks, *QueryMeta, error) { - return h.StateTags(state, nil, q) -} - -// StateTags is used to retrieve all the checks in a given state and tags. -// The wildcard "any" state can also be used for all checks. -func (h *Health) StateTags(state string, tags []string, q *QueryOptions) (HealthChecks, *QueryMeta, error) { switch state { case HealthAny: case HealthWarning: @@ -389,13 +373,6 @@ func (h *Health) StateTags(state string, tags []string, q *QueryOptions) (Health } r := h.c.newRequest("GET", "/v1/health/state/"+state) r.setQueryOptions(q) - - if len(tags) > 0 { - for _, tag := range tags { - r.params.Add("tag", tag) - } - } - rtt, resp, err := h.c.doRequest(r) if err != nil { return nil, nil, err diff --git a/api/watch/funcs.go b/api/watch/funcs.go index 60cd11ef7ef6..1c798b536694 100644 --- a/api/watch/funcs.go +++ b/api/watch/funcs.go @@ -172,13 +172,16 @@ func checksWatch(params map[string]interface{}) (WatcherFunc, error) { return nil, err } - var service, state string + var service, state, filter string if err := assignValue(params, "service", &service); err != nil { return nil, err } if err := assignValue(params, "state", &state); err != nil { return nil, err } + if err := assignValue(params, "filter", &filter); err != nil { + return nil, err + } if service != "" && state != "" { return nil, fmt.Errorf("Cannot specify service and state") } @@ -186,13 +189,6 @@ func checksWatch(params map[string]interface{}) (WatcherFunc, error) { state = "any" } - var ( - tags []string - ) - if err := assignValueStringSlice(params, "tag", &tags); err != nil { - return nil, err - } - fn := func(p *Plan) (BlockingParamVal, interface{}, error) { health := p.client.Health() opts := makeQueryOptionsWithContext(p, stale) @@ -200,15 +196,17 @@ func checksWatch(params map[string]interface{}) (WatcherFunc, error) { var checks []*consulapi.HealthCheck var meta *consulapi.QueryMeta var err error + if filter != "" { + opts.Filter = filter + } if state != "" { - checks, meta, err = health.StateTags(state, tags, &opts) + checks, meta, err = health.State(state, &opts) } else { - checks, meta, err = health.ChecksTags(service, tags, &opts) + checks, meta, err = health.Checks(service, &opts) } if err != nil { return nil, nil, err } - return WaitIndexVal(meta.LastIndex), checks, err } return fn, nil diff --git a/api/watch/funcs_test.go b/api/watch/funcs_test.go index 8bdd2016931d..2baf9047bf9e 100644 --- a/api/watch/funcs_test.go +++ b/api/watch/funcs_test.go @@ -769,7 +769,7 @@ func TestChecksWatch_Service(t *testing.T) { } } -func TestChecksWatch_Service_Tag(t *testing.T) { +func TestChecksWatch_Service_Filter(t *testing.T) { t.Parallel() c, s := makeClient(t) defer s.Stop() @@ -781,7 +781,7 @@ func TestChecksWatch_Service_Tag(t *testing.T) { notifyCh = make(chan struct{}) ) - plan := mustParse(t, `{"type":"checks", "service":"foobar", "tag":["b", "a"]}`) + plan := mustParse(t, `{"type":"checks", "filter":"b in ServiceTags and a in ServiceTags"}`) plan.Handler = func(idx uint64, raw interface{}) { if raw == nil { return // ignore @@ -808,8 +808,6 @@ func TestChecksWatch_Service_Tag(t *testing.T) { <-notifyCh { catalog := c.Catalog() - - // we want to find this one reg := &api.CatalogRegistration{ Node: "foobar", Address: "1.1.1.1", @@ -830,28 +828,6 @@ func TestChecksWatch_Service_Tag(t *testing.T) { if _, err := catalog.Register(reg, nil); err != nil { t.Fatalf("err: %v", err) } - - // we don't want to find this one - reg = &api.CatalogRegistration{ - Node: "bar", - Address: "2.2.2.2", - Datacenter: "dc1", - Service: &api.AgentService{ - ID: "foobar", - Service: "foobar", - Tags: []string{"a"}, - }, - Check: &api.AgentCheck{ - Node: "bar", - CheckID: "foobar", - Name: "foobar", - Status: api.HealthPassing, - ServiceID: "foobar", - }, - } - if _, err := catalog.Register(reg, nil); err != nil { - t.Fatalf("err: %v", err) - } } // Wait for second wakeup. @@ -873,7 +849,7 @@ func TestChecksWatch_Service_Tag(t *testing.T) { } } -func TestChecksWatch_Tag(t *testing.T) { +func TestChecksWatch_Filter(t *testing.T) { t.Parallel() c, s := makeClient(t) defer s.Stop() @@ -885,7 +861,7 @@ func TestChecksWatch_Tag(t *testing.T) { notifyCh = make(chan struct{}) ) - plan := mustParse(t, `{"type":"checks", "tag":["b", "a"]}`) + plan := mustParse(t, `{"type":"checks", "filter":"b in ServiceTags and a in ServiceTags"}`) plan.Handler = func(idx uint64, raw interface{}) { if raw == nil { return // ignore diff --git a/command/watch/watch.go b/command/watch/watch.go index f4e9211c9c6a..09de50871374 100644 --- a/command/watch/watch.go +++ b/command/watch/watch.go @@ -42,6 +42,7 @@ type cmd struct { state string name string shell bool + filter string } func (c *cmd) init() { @@ -68,6 +69,7 @@ func (c *cmd) init() { "Specifies the states to watch. Optional for 'checks' type.") c.flags.StringVar(&c.name, "name", "", "Specifies an event name to watch. Only for 'event' type.") + c.flags.StringVar(&c.filter, "filter", "", "Filter to use with the request") c.http = &flags.HTTPFlags{} flags.Merge(c.flags, c.http.ClientFlags()) @@ -125,6 +127,9 @@ func (c *cmd) Run(args []string) int { if c.service != "" { params["service"] = c.service } + if c.filter != "" { + params["filter"] = c.filter + } if len(c.tag) > 0 { params["tag"] = c.tag } From 585ba754ac6d57b0b67119eecd03df339776c27b Mon Sep 17 00:00:00 2001 From: cskh Date: Thu, 22 Jun 2023 15:51:20 +0000 Subject: [PATCH 5/6] backport of commit af5e30a1e35c6b762cdc47ce56f38d8440bb3f28 --- api/watch/funcs.go | 21 ++++ api/watch/funcs_test.go | 240 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 261 insertions(+) diff --git a/api/watch/funcs.go b/api/watch/funcs.go index 1c798b536694..5fcfbffa1fe0 100644 --- a/api/watch/funcs.go +++ b/api/watch/funcs.go @@ -89,13 +89,20 @@ func keyPrefixWatch(params map[string]interface{}) (WatcherFunc, error) { // servicesWatch is used to watch the list of available services func servicesWatch(params map[string]interface{}) (WatcherFunc, error) { stale := false + filter := "" if err := assignValueBool(params, "stale", &stale); err != nil { return nil, err } + if err := assignValue(params, "filter", &filter); err != nil { + return nil, err + } fn := func(p *Plan) (BlockingParamVal, interface{}, error) { catalog := p.client.Catalog() opts := makeQueryOptionsWithContext(p, stale) + if filter != "" { + opts.Filter = filter + } defer p.cancelFunc() services, meta, err := catalog.Services(&opts) if err != nil { @@ -109,13 +116,20 @@ func servicesWatch(params map[string]interface{}) (WatcherFunc, error) { // nodesWatch is used to watch the list of available nodes func nodesWatch(params map[string]interface{}) (WatcherFunc, error) { stale := false + filter := "" if err := assignValueBool(params, "stale", &stale); err != nil { return nil, err } + if err := assignValue(params, "filter", &filter); err != nil { + return nil, err + } fn := func(p *Plan) (BlockingParamVal, interface{}, error) { catalog := p.client.Catalog() opts := makeQueryOptionsWithContext(p, stale) + if filter != "" { + opts.Filter = filter + } defer p.cancelFunc() nodes, meta, err := catalog.Nodes(&opts) if err != nil { @@ -129,9 +143,13 @@ func nodesWatch(params map[string]interface{}) (WatcherFunc, error) { // serviceWatch is used to watch a specific service for changes func serviceWatch(params map[string]interface{}) (WatcherFunc, error) { stale := false + filter := "" if err := assignValueBool(params, "stale", &stale); err != nil { return nil, err } + if err := assignValue(params, "filter", &filter); err != nil { + return nil, err + } var ( service string @@ -155,6 +173,9 @@ func serviceWatch(params map[string]interface{}) (WatcherFunc, error) { fn := func(p *Plan) (BlockingParamVal, interface{}, error) { health := p.client.Health() opts := makeQueryOptionsWithContext(p, stale) + if filter != "" { + opts.Filter = filter + } defer p.cancelFunc() nodes, meta, err := health.ServiceMultipleTags(service, tags, passingOnly, &opts) if err != nil { diff --git a/api/watch/funcs_test.go b/api/watch/funcs_test.go index 2baf9047bf9e..db2d4a70e720 100644 --- a/api/watch/funcs_test.go +++ b/api/watch/funcs_test.go @@ -375,6 +375,82 @@ func TestServicesWatch(t *testing.T) { } +func TestServicesWatch_Filter(t *testing.T) { + t.Parallel() + c, s := makeClient(t) + defer s.Stop() + + s.WaitForSerfCheck(t) + + var ( + wakeups []map[string][]string + notifyCh = make(chan struct{}) + ) + + plan := mustParse(t, `{"type":"services", "filter":"b in ServiceTags and a in ServiceTags"}`) + plan.Handler = func(idx uint64, raw interface{}) { + if raw == nil { + return // ignore + } + v, ok := raw.(map[string][]string) + if !ok { + return // ignore + } + wakeups = append(wakeups, v) + notifyCh <- struct{}{} + } + + // Register some services + { + agent := c.Agent() + + // we don't want to find this + reg := &api.AgentServiceRegistration{ + ID: "foo", + Name: "foo", + Tags: []string{"b"}, + } + if err := agent.ServiceRegister(reg); err != nil { + t.Fatalf("err: %v", err) + } + + // // we want to find this + reg = &api.AgentServiceRegistration{ + ID: "bar", + Name: "bar", + Tags: []string{"a", "b"}, + } + if err := agent.ServiceRegister(reg); err != nil { + t.Fatalf("err: %v", err) + } + } + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + if err := plan.Run(s.HTTPAddr); err != nil { + t.Errorf("err: %v", err) + } + }() + defer plan.Stop() + + // Wait for second wakeup. + <-notifyCh + + plan.Stop() + wg.Wait() + + require.Len(t, wakeups, 1) + + { + v := wakeups[0] + require.Len(t, v, 1) + _, ok := v["bar"] + require.True(t, ok) + } +} + func TestNodesWatch(t *testing.T) { t.Parallel() c, s := makeClient(t) @@ -450,6 +526,82 @@ func TestNodesWatch(t *testing.T) { } } +func TestNodesWatch_Filter(t *testing.T) { + t.Parallel() + c, s := makeClient(t) + defer s.Stop() + + s.WaitForSerfCheck(t) // wait for AE to sync + + var ( + wakeups [][]*api.Node + notifyCh = make(chan struct{}) + ) + + plan := mustParse(t, `{"type":"nodes", "filter":"Node == foo"}`) + plan.Handler = func(idx uint64, raw interface{}) { + if raw == nil { + return // ignore + } + v, ok := raw.([]*api.Node) + if !ok { + return // ignore + } + wakeups = append(wakeups, v) + notifyCh <- struct{}{} + } + + // Register 2 nodes + { + catalog := c.Catalog() + + // we want to find this node + reg := &api.CatalogRegistration{ + Node: "foo", + Address: "1.1.1.1", + Datacenter: "dc1", + } + if _, err := catalog.Register(reg, nil); err != nil { + t.Fatalf("err: %v", err) + } + + // we don't want to find this node + reg = &api.CatalogRegistration{ + Node: "bar", + Address: "2.2.2.2", + Datacenter: "dc1", + } + if _, err := catalog.Register(reg, nil); err != nil { + t.Fatalf("err: %v", err) + } + } + + var wg sync.WaitGroup + wg.Add(1) + // Start the watch nodes plan + go func() { + defer wg.Done() + if err := plan.Run(s.HTTPAddr); err != nil { + t.Errorf("err: %v", err) + } + }() + defer plan.Stop() + + // Wait for first wakeup. + <-notifyCh + + plan.Stop() + wg.Wait() + + require.Len(t, wakeups, 1) + + { + v := wakeups[0] + require.Len(t, v, 1) + require.Equal(t, "foo", v[0].Node) + } +} + func TestServiceWatch(t *testing.T) { t.Parallel() c, s := makeClient(t) @@ -613,6 +765,94 @@ func TestServiceMultipleTagsWatch(t *testing.T) { } } +func TestServiceWatch_Filter(t *testing.T) { + t.Parallel() + c, s := makeClient(t) + defer s.Stop() + + s.WaitForSerfCheck(t) + + var ( + wakeups [][]*api.ServiceEntry + notifyCh = make(chan struct{}) + ) + + plan := mustParse(t, `{"type":"service", "service":"foo", "filter":"bar in Service.Tags and buzz in Service.Tags"}`) + plan.Handler = func(idx uint64, raw interface{}) { + if raw == nil { + return // ignore + } + v, ok := raw.([]*api.ServiceEntry) + if !ok { + return // ignore + } + + wakeups = append(wakeups, v) + notifyCh <- struct{}{} + } + + // register some services + { + agent := c.Agent() + + // we do not want to find this one. + reg := &api.AgentServiceRegistration{ + ID: "foobarbiff", + Name: "foo", + Tags: []string{"bar", "biff"}, + } + if err := agent.ServiceRegister(reg); err != nil { + t.Fatalf("err: %v", err) + } + + // we do not want to find this one. + reg = &api.AgentServiceRegistration{ + ID: "foobuzzbiff", + Name: "foo", + Tags: []string{"buzz", "biff"}, + } + if err := agent.ServiceRegister(reg); err != nil { + t.Fatalf("err: %v", err) + } + + // we want to find this one + reg = &api.AgentServiceRegistration{ + ID: "foobarbuzzbiff", + Name: "foo", + Tags: []string{"bar", "buzz", "biff"}, + } + if err := agent.ServiceRegister(reg); err != nil { + t.Fatalf("err: %v", err) + } + } + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + if err := plan.Run(s.HTTPAddr); err != nil { + t.Errorf("err: %v", err) + } + }() + defer plan.Stop() + + // Wait for second wakeup. + <-notifyCh + + plan.Stop() + wg.Wait() + + require.Len(t, wakeups, 1) + + { + v := wakeups[0] + require.Len(t, v, 1) + + require.Equal(t, "foobarbuzzbiff", v[0].Service.ID) + require.ElementsMatch(t, []string{"bar", "buzz", "biff"}, v[0].Service.Tags) + } +} + func TestChecksWatch_State(t *testing.T) { t.Parallel() c, s := makeClient(t) From fe861b23eef77ed31336929eda9dbcd36154e30e Mon Sep 17 00:00:00 2001 From: cskh Date: Thu, 22 Jun 2023 19:04:21 +0000 Subject: [PATCH 6/6] backport of commit 10a501b42be418e3bb6d66b152ed4f079f1d9100 --- .changelog/17780.txt | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 .changelog/17780.txt diff --git a/.changelog/17780.txt b/.changelog/17780.txt new file mode 100644 index 000000000000..b90925a8b9fd --- /dev/null +++ b/.changelog/17780.txt @@ -0,0 +1,3 @@ +```release-note:feature +cli: `consul watch` command uses `-filter` expression to filter response from checks, services, nodes, and service. +```