Skip to content

Commit

Permalink
nomad: add support for querying consistent subset of services
Browse files Browse the repository at this point in the history
Nomad's service API is adding a ?choose parameter, making use of
HRW hashing to deterministically select a subset of services given
a hash key (e.g. allocID).

Make use of this API in consul-template by expanding the nomadService
function to accept 3 parameters (in addition to the existing 1). These
parameters are <count>, <key>, <service>.

Example usage (in Nomad template):

{{$allocID := env "NOMAD_ALLOC_ID" -}}
{{range nomadService 3 $allocID "redis"}}
  {{.Address}} {{.Port}} | {{.Tags}} @ {{.Datacenter}}
{{- end}}
  • Loading branch information
shoenig committed May 12, 2022
1 parent 7ecad12 commit 64c9afd
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 24 deletions.
16 changes: 16 additions & 0 deletions dependency/dependency.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type QueryOptions struct {
Datacenter string
Region string
Near string
Choose string
RequireConsistent bool
VaultGrace time.Duration
WaitIndex uint64
Expand Down Expand Up @@ -92,6 +93,10 @@ func (q *QueryOptions) Merge(o *QueryOptions) *QueryOptions {
r.Near = o.Near
}

if o.Choose != "" {
r.Choose = o.Choose
}

if o.RequireConsistent != false {
r.RequireConsistent = o.RequireConsistent
}
Expand Down Expand Up @@ -119,9 +124,16 @@ func (q *QueryOptions) ToConsulOpts() *consulapi.QueryOptions {
}

func (q *QueryOptions) ToNomadOpts() *nomadapi.QueryOptions {
var params map[string]string
if q.Choose != "" {
params = map[string]string{
"choose": q.Choose,
}
}
return &nomadapi.QueryOptions{
AllowStale: q.AllowStale,
Region: q.Region,
Params: params,
WaitIndex: q.WaitIndex,
WaitTime: q.WaitTime,
}
Expand All @@ -146,6 +158,10 @@ func (q *QueryOptions) String() string {
u.Add("near", q.Near)
}

if q.Choose != "" {
u.Add("choose", q.Choose)
}

if q.RequireConsistent {
u.Add("consistent", strconv.FormatBool(q.RequireConsistent))
}
Expand Down
23 changes: 23 additions & 0 deletions dependency/nomad_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ var (

// NomadServiceQueryRe is the regex that is used to understand a service
// specific Nomad query.
//
// e.g. "<tag=value>.<name>@<region>"
NomadServiceQueryRe = regexp.MustCompile(`\A` + tagRe + serviceNameRe + regionRe + `\z`)
)

Expand Down Expand Up @@ -46,6 +48,7 @@ type NomadServiceQuery struct {
region string
name string
tag string
choose string
}

// NewNomadServiceQuery parses a string into a NomadServiceQuery which is
Expand All @@ -56,6 +59,7 @@ func NewNomadServiceQuery(s string) (*NomadServiceQuery, error) {
}

m := regexpMatch(NomadServiceQueryRe, s)

return &NomadServiceQuery{
stopCh: make(chan struct{}, 1),
region: m["region"],
Expand All @@ -64,6 +68,21 @@ func NewNomadServiceQuery(s string) (*NomadServiceQuery, error) {
}, nil
}

// NewNomadServiceChooseQuery parses s using NewNomadServiceQuery, and then also
// configures the resulting query with the choose parameter set according to the
// count and key arguments.
func NewNomadServiceChooseQuery(count int, key, s string) (*NomadServiceQuery, error) {
query, err := NewNomadServiceQuery(s)
if err != nil {
return nil, err
}

choose := fmt.Sprintf("%d|%s", count, key)
query.choose = choose

return query, nil
}

// Fetch queries the Nomad API defined by the given client and returns a slice
// of NomadService objects.
func (d *NomadServiceQuery) Fetch(client *ClientSet, opts *QueryOptions) (interface{}, *ResponseMetadata, error) {
Expand All @@ -75,6 +94,7 @@ func (d *NomadServiceQuery) Fetch(client *ClientSet, opts *QueryOptions) (interf

opts = opts.Merge(&QueryOptions{
Region: d.region,
Choose: d.choose,
})

u := &url.URL{
Expand Down Expand Up @@ -138,6 +158,9 @@ func (d *NomadServiceQuery) String() string {
if d.region != "" {
name = name + "@" + d.region
}
if d.choose != "" {
name = name + ":" + d.choose
}
return fmt.Sprintf("nomad.service(%s)", name)
}

Expand Down
44 changes: 41 additions & 3 deletions dependency/nomad_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
)

func TestNewNomadServiceQuery(t *testing.T) {

cases := []struct {
name string
i string
Expand Down Expand Up @@ -97,8 +96,20 @@ func TestNewNomadServiceQuery(t *testing.T) {
}
}

func TestNomadServiceQuery_Fetch(t *testing.T) {
func TestNomadServiceChooseQuery(t *testing.T) {
query, err := NewNomadServiceChooseQuery(4, "abc123", "tag.name@us-east-1")
require.NoError(t, err)

query.stopCh = nil
require.Equal(t, &NomadServiceQuery{
region: "us-east-1",
name: "name",
tag: "tag",
choose: "4|abc123",
}, query)
}

func TestNomadServiceQuery_Fetch(t *testing.T) {
cases := []struct {
name string
i string
Expand Down Expand Up @@ -186,7 +197,6 @@ func TestNomadServiceQuery_Fetch(t *testing.T) {
}

func TestNomadServiceQuery_String(t *testing.T) {

cases := []struct {
name string
i string
Expand Down Expand Up @@ -224,3 +234,31 @@ func TestNomadServiceQuery_String(t *testing.T) {
})
}
}

func TestNomadServiceQuery_String_3arg(t *testing.T) {
cases := []struct {
name string
i string
count int
key string
exp string
}{
{
"choose",
"redis",
3,
"abc123",
"nomad.service(redis:3|abc123)",
},
}

for i, tc := range cases {
t.Run(fmt.Sprintf("%d_%s", i, tc.name), func(t *testing.T) {
d, err := NewNomadServiceChooseQuery(3, "abc123", tc.i)
if err != nil {
t.Fatal(err)
}
require.Equal(t, tc.exp, d.String())
})
}
}
55 changes: 47 additions & 8 deletions dependency/nomad_services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,55 @@ func TestNewNomadServicesQueryQuery(t *testing.T) {
}
}

func TestNomadServicesQuery_Fetch(t *testing.T) {
func TestNomadServicesQuery_Fetch_1arg(t *testing.T) {
cases := []struct {
name string
service string
exp []*NomadServicesSnippet
}{
{
name: "all",
service: "",
exp: []*NomadServicesSnippet{
&NomadServicesSnippet{
Name: "example-cache",
Tags: ServiceTags([]string{"tag1", "tag2"}),
},
},
},
}

for i, tc := range cases {
t.Run(fmt.Sprintf("%d_%s", i, tc.name), func(t *testing.T) {
d, err := NewNomadServicesQuery(tc.service)
if err != nil {
t.Fatal(err)
}

act, _, err := d.Fetch(testClients, nil)
if err != nil {
t.Fatal(err)
}

require.Equal(t, tc.exp, act)
})
}
}

func TestNomadServicesQuery_Fetch_3arg(t *testing.T) {
cases := []struct {
name string
i string
exp []*NomadServicesSnippet
name string
service string
count int
key string
exp []*NomadServicesSnippet
}{
{
"all",
"",
[]*NomadServicesSnippet{
name: "choose one",
service: "example-cache",
count: 1,
key: "abc123",
exp: []*NomadServicesSnippet{
&NomadServicesSnippet{
Name: "example-cache",
Tags: ServiceTags([]string{"tag1", "tag2"}),
Expand All @@ -74,7 +112,7 @@ func TestNomadServicesQuery_Fetch(t *testing.T) {

for i, tc := range cases {
t.Run(fmt.Sprintf("%d_%s", i, tc.name), func(t *testing.T) {
d, err := NewNomadServicesQuery(tc.i)
d, err := NewNomadServiceChooseQuery(tc.count, tc.key, tc.service)
if err != nil {
t.Fatal(err)
}
Expand All @@ -88,6 +126,7 @@ func TestNomadServicesQuery_Fetch(t *testing.T) {
})
}
}

func TestNomadServicesQuery_String(t *testing.T) {

cases := []struct {
Expand Down
50 changes: 37 additions & 13 deletions template/nomad_funcs.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package template

import (
"errors"
"strings"

dep "github.com/hashicorp/consul-template/dependency"
Expand All @@ -10,7 +11,7 @@ import (
// stubs from Nomad.
func nomadServicesFunc(b *Brain, used, missing *dep.Set) func(...string) ([]*dep.NomadServicesSnippet, error) {
return func(s ...string) ([]*dep.NomadServicesSnippet, error) {
result := []*dep.NomadServicesSnippet{}
var result []*dep.NomadServicesSnippet

d, err := dep.NewNomadServicesQuery(strings.Join(s, ""))
if err != nil {
Expand All @@ -31,23 +32,46 @@ func nomadServicesFunc(b *Brain, used, missing *dep.Set) func(...string) ([]*dep

// nomadServiceFunc returns or accumulates a list of service registrations from
// Nomad matching an individual name.
func nomadServiceFunc(b *Brain, used, missing *dep.Set) func(...string) ([]*dep.NomadService, error) {
return func(s ...string) ([]*dep.NomadService, error) {
result := []*dep.NomadService{}
func nomadServiceFunc(b *Brain, used, missing *dep.Set) func(...interface{}) ([]*dep.NomadService, error) {
return func(params ...interface{}) ([]*dep.NomadService, error) {
var query *dep.NomadServiceQuery
var err error

d, err := dep.NewNomadServiceQuery(strings.Join(s, ""))
if err != nil {
return nil, err
switch len(params) {
case 1:
service, ok := params[0].(string)
if !ok {
return nil, errors.New("expected string for <service> argument")
}
if query, err = dep.NewNomadServiceQuery(service); err != nil {
return nil, err
}
case 3:
count, ok := params[0].(int)
if !ok {
return nil, errors.New("expected integer for <count> argument")
}
hash, ok := params[1].(string)
if !ok {
return nil, errors.New("expected string for <key> argument")
}
service, ok := params[2].(string)
if !ok {
return nil, errors.New("expected string for <service> argument")
}
if query, err = dep.NewNomadServiceChooseQuery(count, hash, service); err != nil {
return nil, err
}
default:
return nil, errors.New("expected arguments <service> or <count> <key> <service>")
}

used.Add(d)

if value, ok := b.Recall(d); ok {
used.Add(query)
if value, ok := b.Recall(query); ok {
return value.([]*dep.NomadService), nil
}
missing.Add(query)

missing.Add(d)

return result, nil
return nil, nil
}
}

0 comments on commit 64c9afd

Please sign in to comment.