From 2dc6c6bce7146829231ed8e2215f1fe84bf9701f Mon Sep 17 00:00:00 2001 From: Seth Hoenig Date: Fri, 6 May 2022 10:05:38 -0500 Subject: [PATCH] nomad: add support for querying consistent subset of services Nomad's service API is adding a ?choose parameter, making use of HRW hashing to deterministically select a subset of services given a hash key (e.g. allocID). Make use of this API in consul-template by expanding the nomadService function to accept 3 parameters (in addition to the existing 1). These parameters are , , . Example usage (in Nomad template): {{$allocID := env "NOMAD_ALLOC_ID" -}} {{range nomadService 3 $allocID "redis"}} {{.Address}} {{.Port}} | {{.Tags}} @ {{.Datacenter}} {{- end}} --- dependency/dependency.go | 16 ++++++ dependency/nomad_service.go | 23 ++++++++ dependency/nomad_service_test.go | 96 ++++++++++++++++++++++++++++++- dependency/nomad_services_test.go | 18 +++--- template/nomad_funcs.go | 50 +++++++++++----- 5 files changed, 178 insertions(+), 25 deletions(-) diff --git a/dependency/dependency.go b/dependency/dependency.go index 554e81da3..1202187ba 100644 --- a/dependency/dependency.go +++ b/dependency/dependency.go @@ -53,6 +53,7 @@ type QueryOptions struct { Datacenter string Region string Near string + Choose string RequireConsistent bool VaultGrace time.Duration WaitIndex uint64 @@ -92,6 +93,10 @@ func (q *QueryOptions) Merge(o *QueryOptions) *QueryOptions { r.Near = o.Near } + if o.Choose != "" { + r.Choose = o.Choose + } + if o.RequireConsistent != false { r.RequireConsistent = o.RequireConsistent } @@ -119,9 +124,16 @@ func (q *QueryOptions) ToConsulOpts() *consulapi.QueryOptions { } func (q *QueryOptions) ToNomadOpts() *nomadapi.QueryOptions { + var params map[string]string + if q.Choose != "" { + params = map[string]string{ + "choose": q.Choose, + } + } return &nomadapi.QueryOptions{ AllowStale: q.AllowStale, Region: q.Region, + Params: params, WaitIndex: q.WaitIndex, WaitTime: q.WaitTime, } @@ -146,6 +158,10 @@ func (q *QueryOptions) String() string { u.Add("near", q.Near) } + if q.Choose != "" { + u.Add("choose", q.Choose) + } + if q.RequireConsistent { u.Add("consistent", strconv.FormatBool(q.RequireConsistent)) } diff --git a/dependency/nomad_service.go b/dependency/nomad_service.go index b5532ff6e..e3e50f2cf 100644 --- a/dependency/nomad_service.go +++ b/dependency/nomad_service.go @@ -17,6 +17,8 @@ var ( // NomadServiceQueryRe is the regex that is used to understand a service // specific Nomad query. + // + // e.g. ".@" NomadServiceQueryRe = regexp.MustCompile(`\A` + tagRe + serviceNameRe + regionRe + `\z`) ) @@ -46,6 +48,7 @@ type NomadServiceQuery struct { region string name string tag string + choose string } // NewNomadServiceQuery parses a string into a NomadServiceQuery which is @@ -56,6 +59,7 @@ func NewNomadServiceQuery(s string) (*NomadServiceQuery, error) { } m := regexpMatch(NomadServiceQueryRe, s) + return &NomadServiceQuery{ stopCh: make(chan struct{}, 1), region: m["region"], @@ -64,6 +68,21 @@ func NewNomadServiceQuery(s string) (*NomadServiceQuery, error) { }, nil } +// NewNomadServiceChooseQuery parses s using NewNomadServiceQuery, and then also +// configures the resulting query with the choose parameter set according to the +// count and key arguments. +func NewNomadServiceChooseQuery(count int, key, s string) (*NomadServiceQuery, error) { + query, err := NewNomadServiceQuery(s) + if err != nil { + return nil, err + } + + choose := fmt.Sprintf("%d|%s", count, key) + query.choose = choose + + return query, nil +} + // Fetch queries the Nomad API defined by the given client and returns a slice // of NomadService objects. func (d *NomadServiceQuery) Fetch(client *ClientSet, opts *QueryOptions) (interface{}, *ResponseMetadata, error) { @@ -75,6 +94,7 @@ func (d *NomadServiceQuery) Fetch(client *ClientSet, opts *QueryOptions) (interf opts = opts.Merge(&QueryOptions{ Region: d.region, + Choose: d.choose, }) u := &url.URL{ @@ -138,6 +158,9 @@ func (d *NomadServiceQuery) String() string { if d.region != "" { name = name + "@" + d.region } + if d.choose != "" { + name = name + ":" + d.choose + } return fmt.Sprintf("nomad.service(%s)", name) } diff --git a/dependency/nomad_service_test.go b/dependency/nomad_service_test.go index a631e8084..3a882b450 100644 --- a/dependency/nomad_service_test.go +++ b/dependency/nomad_service_test.go @@ -9,7 +9,6 @@ import ( ) func TestNewNomadServiceQuery(t *testing.T) { - cases := []struct { name string i string @@ -97,8 +96,20 @@ func TestNewNomadServiceQuery(t *testing.T) { } } -func TestNomadServiceQuery_Fetch(t *testing.T) { +func TestNomadServiceChooseQuery(t *testing.T) { + query, err := NewNomadServiceChooseQuery(4, "abc123", "tag.name@us-east-1") + require.NoError(t, err) + + query.stopCh = nil + require.Equal(t, &NomadServiceQuery{ + region: "us-east-1", + name: "name", + tag: "tag", + choose: "4|abc123", + }, query) +} +func TestNomadServiceQuery_Fetch(t *testing.T) { cases := []struct { name string i string @@ -185,8 +196,59 @@ func TestNomadServiceQuery_Fetch(t *testing.T) { } } -func TestNomadServiceQuery_String(t *testing.T) { +func TestNomadServicesQuery_Fetch_3arg(t *testing.T) { + cases := []struct { + name string + service string + count int + key string + exp []*NomadService + }{ + { + name: "choose one", + service: "example-cache", + count: 1, + key: "abc123", + exp: []*NomadService{ + &NomadService{ + Name: "example-cache", + Address: "127.0.0.1", + Datacenter: "dc1", + Tags: ServiceTags([]string{"tag1", "tag2"}), + JobID: "example", + }, + }, + }, + } + + for i, tc := range cases { + t.Run(fmt.Sprintf("%d_%s", i, tc.name), func(t *testing.T) { + d, err := NewNomadServiceChooseQuery(tc.count, tc.key, tc.service) + if err != nil { + t.Fatal(err) + } + + actI, _, err := d.Fetch(testClients, nil) + if err != nil { + t.Fatal(err) + } + + act := actI.([]*NomadService) + + for _, s := range act { + // clear random fields + s.ID = "" + s.Node = "" + s.Port = 0 + s.AllocID = "" + } + + require.Equal(t, tc.exp, act) + }) + } +} +func TestNomadServiceQuery_String(t *testing.T) { cases := []struct { name string i string @@ -224,3 +286,31 @@ func TestNomadServiceQuery_String(t *testing.T) { }) } } + +func TestNomadServiceQuery_String_3arg(t *testing.T) { + cases := []struct { + name string + i string + count int + key string + exp string + }{ + { + "choose", + "redis", + 3, + "abc123", + "nomad.service(redis:3|abc123)", + }, + } + + for i, tc := range cases { + t.Run(fmt.Sprintf("%d_%s", i, tc.name), func(t *testing.T) { + d, err := NewNomadServiceChooseQuery(3, "abc123", tc.i) + if err != nil { + t.Fatal(err) + } + require.Equal(t, tc.exp, d.String()) + }) + } +} diff --git a/dependency/nomad_services_test.go b/dependency/nomad_services_test.go index c0736d774..76bb154d6 100644 --- a/dependency/nomad_services_test.go +++ b/dependency/nomad_services_test.go @@ -53,17 +53,16 @@ func TestNewNomadServicesQueryQuery(t *testing.T) { } } -func TestNomadServicesQuery_Fetch(t *testing.T) { - +func TestNomadServicesQuery_Fetch_1arg(t *testing.T) { cases := []struct { - name string - i string - exp []*NomadServicesSnippet + name string + service string + exp []*NomadServicesSnippet }{ { - "all", - "", - []*NomadServicesSnippet{ + name: "all", + service: "", + exp: []*NomadServicesSnippet{ &NomadServicesSnippet{ Name: "example-cache", Tags: ServiceTags([]string{"tag1", "tag2"}), @@ -74,7 +73,7 @@ func TestNomadServicesQuery_Fetch(t *testing.T) { for i, tc := range cases { t.Run(fmt.Sprintf("%d_%s", i, tc.name), func(t *testing.T) { - d, err := NewNomadServicesQuery(tc.i) + d, err := NewNomadServicesQuery(tc.service) if err != nil { t.Fatal(err) } @@ -88,6 +87,7 @@ func TestNomadServicesQuery_Fetch(t *testing.T) { }) } } + func TestNomadServicesQuery_String(t *testing.T) { cases := []struct { diff --git a/template/nomad_funcs.go b/template/nomad_funcs.go index 0ee913131..1bb2c1dba 100644 --- a/template/nomad_funcs.go +++ b/template/nomad_funcs.go @@ -1,6 +1,7 @@ package template import ( + "errors" "strings" dep "github.com/hashicorp/consul-template/dependency" @@ -10,7 +11,7 @@ import ( // stubs from Nomad. func nomadServicesFunc(b *Brain, used, missing *dep.Set) func(...string) ([]*dep.NomadServicesSnippet, error) { return func(s ...string) ([]*dep.NomadServicesSnippet, error) { - result := []*dep.NomadServicesSnippet{} + var result []*dep.NomadServicesSnippet d, err := dep.NewNomadServicesQuery(strings.Join(s, "")) if err != nil { @@ -31,23 +32,46 @@ func nomadServicesFunc(b *Brain, used, missing *dep.Set) func(...string) ([]*dep // nomadServiceFunc returns or accumulates a list of service registrations from // Nomad matching an individual name. -func nomadServiceFunc(b *Brain, used, missing *dep.Set) func(...string) ([]*dep.NomadService, error) { - return func(s ...string) ([]*dep.NomadService, error) { - result := []*dep.NomadService{} +func nomadServiceFunc(b *Brain, used, missing *dep.Set) func(...interface{}) ([]*dep.NomadService, error) { + return func(params ...interface{}) ([]*dep.NomadService, error) { + var query *dep.NomadServiceQuery + var err error - d, err := dep.NewNomadServiceQuery(strings.Join(s, "")) - if err != nil { - return nil, err + switch len(params) { + case 1: + service, ok := params[0].(string) + if !ok { + return nil, errors.New("expected string for argument") + } + if query, err = dep.NewNomadServiceQuery(service); err != nil { + return nil, err + } + case 3: + count, ok := params[0].(int) + if !ok { + return nil, errors.New("expected integer for argument") + } + hash, ok := params[1].(string) + if !ok { + return nil, errors.New("expected string for argument") + } + service, ok := params[2].(string) + if !ok { + return nil, errors.New("expected string for argument") + } + if query, err = dep.NewNomadServiceChooseQuery(count, hash, service); err != nil { + return nil, err + } + default: + return nil, errors.New("expected arguments or ") } - used.Add(d) - - if value, ok := b.Recall(d); ok { + used.Add(query) + if value, ok := b.Recall(query); ok { return value.([]*dep.NomadService), nil } + missing.Add(query) - missing.Add(d) - - return result, nil + return nil, nil } }