Skip to content

Commit

Permalink
Merge 55d927d into backport/gh-17642-watch-checks-tag/regularly-just-…
Browse files Browse the repository at this point in the history
…macaw
  • Loading branch information
hc-github-team-consul-core authored Jun 30, 2023
2 parents c7327cd + 55d927d commit 48429e5
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 2 deletions.
12 changes: 12 additions & 0 deletions agent/health_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,18 @@ func (s *HTTPHandlers) HealthServiceChecks(resp http.ResponseWriter, req *http.R
return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: "Missing service name"}
}

// build tag filter
params := req.URL.Query()
if tags, ok := params["tag"]; ok {
for i, tag := range tags {
expr := fmt.Sprintf(`%s in ServiceTags`, tag)
if i < len(tags)-1 {
expr += " and "
}
args.Filter += expr
}
}

// Make the RPC request
var out structs.IndexedHealthChecks
defer setMeta(resp, &out.QueryMeta)
Expand Down
10 changes: 10 additions & 0 deletions api/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,17 @@ func (h *Health) Node(node string, q *QueryOptions) (HealthChecks, *QueryMeta, e

// Checks is used to return the checks associated with a service
func (h *Health) Checks(service string, q *QueryOptions) (HealthChecks, *QueryMeta, error) {
return h.ChecksTags(service, nil, q)
}

// ChecksTags is used to return the checks associated with a service filtered by tags
func (h *Health) ChecksTags(service string, tags []string, q *QueryOptions) (HealthChecks, *QueryMeta, error) {
r := h.c.newRequest("GET", "/v1/health/checks/"+service)
if len(tags) > 0 {
for _, tag := range tags {
r.params.Add("tag", tag)
}
}
r.setQueryOptions(q)
rtt, resp, err := h.c.doRequest(r)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion api/watch/funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func checksWatch(params map[string]interface{}) (WatcherFunc, error) {
if state != "" {
checks, meta, err = health.StateTags(state, tags, &opts)
} else {
checks, meta, err = health.Checks(service, &opts)
checks, meta, err = health.ChecksTags(service, tags, &opts)
}
if err != nil {
return nil, nil, err
Expand Down
106 changes: 105 additions & 1 deletion api/watch/funcs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -769,7 +769,111 @@ func TestChecksWatch_Service(t *testing.T) {
}
}

func TestChecksWatch_Service_Tags(t *testing.T) {
func TestChecksWatch_Service_Tag(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()

s.WaitForSerfCheck(t)

var (
wakeups [][]*api.HealthCheck
notifyCh = make(chan struct{})
)

plan := mustParse(t, `{"type":"checks", "service":"foobar", "tag":["b", "a"]}`)
plan.Handler = func(idx uint64, raw interface{}) {
if raw == nil {
return // ignore
}
v, ok := raw.([]*api.HealthCheck)
if !ok {
return // ignore
}
wakeups = append(wakeups, v)
notifyCh <- struct{}{}
}

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
if err := plan.Run(s.HTTPAddr); err != nil {
t.Errorf("err: %v", err)
}
}()
defer plan.Stop()

// Wait for first wakeup.
<-notifyCh
{
catalog := c.Catalog()

// we want to find this one
reg := &api.CatalogRegistration{
Node: "foobar",
Address: "1.1.1.1",
Datacenter: "dc1",
Service: &api.AgentService{
ID: "foobar",
Service: "foobar",
Tags: []string{"a", "b"},
},
Check: &api.AgentCheck{
Node: "foobar",
CheckID: "foobar",
Name: "foobar",
Status: api.HealthPassing,
ServiceID: "foobar",
},
}
if _, err := catalog.Register(reg, nil); err != nil {
t.Fatalf("err: %v", err)
}

// we don't want to find this one
reg = &api.CatalogRegistration{
Node: "bar",
Address: "2.2.2.2",
Datacenter: "dc1",
Service: &api.AgentService{
ID: "foobar",
Service: "foobar",
Tags: []string{"a"},
},
Check: &api.AgentCheck{
Node: "bar",
CheckID: "foobar",
Name: "foobar",
Status: api.HealthPassing,
ServiceID: "foobar",
},
}
if _, err := catalog.Register(reg, nil); err != nil {
t.Fatalf("err: %v", err)
}
}

// Wait for second wakeup.
<-notifyCh

plan.Stop()
wg.Wait()

require.Len(t, wakeups, 2)

{
v := wakeups[0]
require.Len(t, v, 0)
}
{
v := wakeups[1]
require.Len(t, v, 1)
require.Equal(t, "foobar", v[0].CheckID)
}
}

func TestChecksWatch_Tag(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
Expand Down

0 comments on commit 48429e5

Please sign in to comment.