Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

nomad: add support for querying consistent subset of services #1579

Merged
merged 2 commits into from
May 24, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions dependency/dependency.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type QueryOptions struct {
Datacenter string
Region string
Near string
Choose string
RequireConsistent bool
VaultGrace time.Duration
WaitIndex uint64
Expand Down Expand Up @@ -92,6 +93,10 @@ func (q *QueryOptions) Merge(o *QueryOptions) *QueryOptions {
r.Near = o.Near
}

if o.Choose != "" {
r.Choose = o.Choose
}

if o.RequireConsistent != false {
r.RequireConsistent = o.RequireConsistent
}
Expand Down Expand Up @@ -119,9 +124,16 @@ func (q *QueryOptions) ToConsulOpts() *consulapi.QueryOptions {
}

func (q *QueryOptions) ToNomadOpts() *nomadapi.QueryOptions {
var params map[string]string
if q.Choose != "" {
params = map[string]string{
"choose": q.Choose,
}
}
return &nomadapi.QueryOptions{
AllowStale: q.AllowStale,
Region: q.Region,
Params: params,
WaitIndex: q.WaitIndex,
WaitTime: q.WaitTime,
}
Expand All @@ -146,6 +158,10 @@ func (q *QueryOptions) String() string {
u.Add("near", q.Near)
}

if q.Choose != "" {
u.Add("choose", q.Choose)
}

if q.RequireConsistent {
u.Add("consistent", strconv.FormatBool(q.RequireConsistent))
}
Expand Down
23 changes: 23 additions & 0 deletions dependency/nomad_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ var (

// NomadServiceQueryRe is the regex that is used to understand a service
// specific Nomad query.
//
// e.g. "<tag=value>.<name>@<region>"
NomadServiceQueryRe = regexp.MustCompile(`\A` + tagRe + serviceNameRe + regionRe + `\z`)
)

Expand Down Expand Up @@ -46,6 +48,7 @@ type NomadServiceQuery struct {
region string
name string
tag string
choose string
}

// NewNomadServiceQuery parses a string into a NomadServiceQuery which is
Expand All @@ -56,6 +59,7 @@ func NewNomadServiceQuery(s string) (*NomadServiceQuery, error) {
}

m := regexpMatch(NomadServiceQueryRe, s)

return &NomadServiceQuery{
stopCh: make(chan struct{}, 1),
region: m["region"],
Expand All @@ -64,6 +68,21 @@ func NewNomadServiceQuery(s string) (*NomadServiceQuery, error) {
}, nil
}

// NewNomadServiceChooseQuery parses s using NewNomadServiceQuery, and then also
// configures the resulting query with the choose parameter set according to the
// count and key arguments.
func NewNomadServiceChooseQuery(count int, key, s string) (*NomadServiceQuery, error) {
query, err := NewNomadServiceQuery(s)
if err != nil {
return nil, err
}

choose := fmt.Sprintf("%d|%s", count, key)
query.choose = choose

return query, nil
}

// Fetch queries the Nomad API defined by the given client and returns a slice
// of NomadService objects.
func (d *NomadServiceQuery) Fetch(client *ClientSet, opts *QueryOptions) (interface{}, *ResponseMetadata, error) {
Expand All @@ -75,6 +94,7 @@ func (d *NomadServiceQuery) Fetch(client *ClientSet, opts *QueryOptions) (interf

opts = opts.Merge(&QueryOptions{
Region: d.region,
Choose: d.choose,
})

u := &url.URL{
Expand Down Expand Up @@ -138,6 +158,9 @@ func (d *NomadServiceQuery) String() string {
if d.region != "" {
name = name + "@" + d.region
}
if d.choose != "" {
name = name + ":" + d.choose
}
return fmt.Sprintf("nomad.service(%s)", name)
}

Expand Down
96 changes: 93 additions & 3 deletions dependency/nomad_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
)

func TestNewNomadServiceQuery(t *testing.T) {

cases := []struct {
name string
i string
Expand Down Expand Up @@ -97,8 +96,20 @@ func TestNewNomadServiceQuery(t *testing.T) {
}
}

func TestNomadServiceQuery_Fetch(t *testing.T) {
func TestNomadServiceChooseQuery(t *testing.T) {
query, err := NewNomadServiceChooseQuery(4, "abc123", "tag.name@us-east-1")
require.NoError(t, err)

query.stopCh = nil
require.Equal(t, &NomadServiceQuery{
region: "us-east-1",
name: "name",
tag: "tag",
choose: "4|abc123",
}, query)
}

func TestNomadServiceQuery_Fetch(t *testing.T) {
cases := []struct {
name string
i string
Expand Down Expand Up @@ -185,8 +196,59 @@ func TestNomadServiceQuery_Fetch(t *testing.T) {
}
}

func TestNomadServiceQuery_String(t *testing.T) {
func TestNomadServicesQuery_Fetch_3arg(t *testing.T) {
cases := []struct {
name string
service string
count int
key string
exp []*NomadService
}{
{
name: "choose one",
service: "example-cache",
count: 1,
key: "abc123",
exp: []*NomadService{
&NomadService{
Name: "example-cache",
Address: "127.0.0.1",
Datacenter: "dc1",
Tags: ServiceTags([]string{"tag1", "tag2"}),
JobID: "example",
},
},
},
}

for i, tc := range cases {
t.Run(fmt.Sprintf("%d_%s", i, tc.name), func(t *testing.T) {
d, err := NewNomadServiceChooseQuery(tc.count, tc.key, tc.service)
if err != nil {
t.Fatal(err)
}

actI, _, err := d.Fetch(testClients, nil)
if err != nil {
t.Fatal(err)
}

act := actI.([]*NomadService)

for _, s := range act {
// clear random fields
s.ID = ""
s.Node = ""
s.Port = 0
s.AllocID = ""
}

require.Equal(t, tc.exp, act)
})
}
}

func TestNomadServiceQuery_String(t *testing.T) {
cases := []struct {
name string
i string
Expand Down Expand Up @@ -224,3 +286,31 @@ func TestNomadServiceQuery_String(t *testing.T) {
})
}
}

func TestNomadServiceQuery_String_3arg(t *testing.T) {
cases := []struct {
name string
i string
count int
key string
exp string
}{
{
"choose",
"redis",
3,
"abc123",
"nomad.service(redis:3|abc123)",
},
}

for i, tc := range cases {
t.Run(fmt.Sprintf("%d_%s", i, tc.name), func(t *testing.T) {
d, err := NewNomadServiceChooseQuery(3, "abc123", tc.i)
if err != nil {
t.Fatal(err)
}
require.Equal(t, tc.exp, d.String())
})
}
}
18 changes: 9 additions & 9 deletions dependency/nomad_services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,16 @@ func TestNewNomadServicesQueryQuery(t *testing.T) {
}
}

func TestNomadServicesQuery_Fetch(t *testing.T) {

func TestNomadServicesQuery_Fetch_1arg(t *testing.T) {
cases := []struct {
name string
i string
exp []*NomadServicesSnippet
name string
service string
exp []*NomadServicesSnippet
}{
{
"all",
"",
[]*NomadServicesSnippet{
name: "all",
service: "",
exp: []*NomadServicesSnippet{
&NomadServicesSnippet{
Name: "example-cache",
Tags: ServiceTags([]string{"tag1", "tag2"}),
Expand All @@ -74,7 +73,7 @@ func TestNomadServicesQuery_Fetch(t *testing.T) {

for i, tc := range cases {
t.Run(fmt.Sprintf("%d_%s", i, tc.name), func(t *testing.T) {
d, err := NewNomadServicesQuery(tc.i)
d, err := NewNomadServicesQuery(tc.service)
if err != nil {
t.Fatal(err)
}
Expand All @@ -88,6 +87,7 @@ func TestNomadServicesQuery_Fetch(t *testing.T) {
})
}
}

func TestNomadServicesQuery_String(t *testing.T) {

cases := []struct {
Expand Down
50 changes: 37 additions & 13 deletions template/nomad_funcs.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package template

import (
"errors"
"strings"

dep "github.com/hashicorp/consul-template/dependency"
Expand All @@ -10,7 +11,7 @@ import (
// stubs from Nomad.
func nomadServicesFunc(b *Brain, used, missing *dep.Set) func(...string) ([]*dep.NomadServicesSnippet, error) {
return func(s ...string) ([]*dep.NomadServicesSnippet, error) {
result := []*dep.NomadServicesSnippet{}
var result []*dep.NomadServicesSnippet

d, err := dep.NewNomadServicesQuery(strings.Join(s, ""))
if err != nil {
Expand All @@ -31,23 +32,46 @@ func nomadServicesFunc(b *Brain, used, missing *dep.Set) func(...string) ([]*dep

// nomadServiceFunc returns or accumulates a list of service registrations from
// Nomad matching an individual name.
func nomadServiceFunc(b *Brain, used, missing *dep.Set) func(...string) ([]*dep.NomadService, error) {
return func(s ...string) ([]*dep.NomadService, error) {
result := []*dep.NomadService{}
func nomadServiceFunc(b *Brain, used, missing *dep.Set) func(...interface{}) ([]*dep.NomadService, error) {
return func(params ...interface{}) ([]*dep.NomadService, error) {
var query *dep.NomadServiceQuery
var err error

d, err := dep.NewNomadServiceQuery(strings.Join(s, ""))
if err != nil {
return nil, err
switch len(params) {
case 1:
service, ok := params[0].(string)
if !ok {
return nil, errors.New("expected string for <service> argument")
}
if query, err = dep.NewNomadServiceQuery(service); err != nil {
return nil, err
}
case 3:
count, ok := params[0].(int)
if !ok {
return nil, errors.New("expected integer for <count> argument")
}
hash, ok := params[1].(string)
if !ok {
return nil, errors.New("expected string for <key> argument")
}
service, ok := params[2].(string)
if !ok {
return nil, errors.New("expected string for <service> argument")
}
if query, err = dep.NewNomadServiceChooseQuery(count, hash, service); err != nil {
return nil, err
}
default:
return nil, errors.New("expected arguments <service> or <count> <key> <service>")
}

used.Add(d)

if value, ok := b.Recall(d); ok {
used.Add(query)
if value, ok := b.Recall(query); ok {
return value.([]*dep.NomadService), nil
}
missing.Add(query)

missing.Add(d)

return result, nil
return nil, nil
}
}