diff --git a/command/agent/service_registration_endpoint.go b/command/agent/service_registration_endpoint.go index 9107f800697..3d6151bea83 100644 --- a/command/agent/service_registration_endpoint.go +++ b/command/agent/service_registration_endpoint.go @@ -90,7 +90,10 @@ func (s *HTTPServer) ServiceRegistrationRequest(resp http.ResponseWriter, req *h func (s *HTTPServer) serviceGetRequest( resp http.ResponseWriter, req *http.Request, serviceName string) (interface{}, error) { - args := structs.ServiceRegistrationByNameRequest{ServiceName: serviceName} + args := structs.ServiceRegistrationByNameRequest{ + ServiceName: serviceName, + Choose: req.URL.Query().Get("choose"), + } if s.parse(resp, req, &args.Region, &args.QueryOptions) { return nil, nil } diff --git a/nomad/service_registration_endpoint.go b/nomad/service_registration_endpoint.go index 3684fc46074..f417a8c8518 100644 --- a/nomad/service_registration_endpoint.go +++ b/nomad/service_registration_endpoint.go @@ -2,6 +2,9 @@ package nomad import ( "net/http" + "sort" + "strconv" + "strings" "time" "github.com/armon/go-metrics" @@ -423,6 +426,16 @@ func (s *ServiceRegistration) GetService( http.StatusBadRequest, "failed to read result page: %v", err) } + // Select which subset and the order of services to return if using ?choose + if args.Choose != "" { + chosen, chooseErr := s.choose(services, args.Choose) + if chooseErr != nil { + return structs.NewErrRPCCodedf( + http.StatusBadRequest, "failed to choose services: %v", chooseErr) + } + services = chosen + } + // Populate the reply. reply.Services = services reply.NextToken = nextToken @@ -434,6 +447,70 @@ func (s *ServiceRegistration) GetService( }) } +// choose uses rendezvous hashing to make a stable selection of a subset of services +// to return. 
+// +// parameter must in the form "|", where number is the number of services +// to select, and key is incorporated in the hashing function with each service - +// creating a unique yet consistent priority distribution pertaining to the requester. +// In practice (i.e. via consul-template), the key is the AllocID generating a request +// for upstream services. +// +// https://en.wikipedia.org/wiki/Rendezvous_hashing +// w := priority (i.e. hash value) +// h := hash function +// O := object - (i.e. requesting service - using key (allocID) as a proxy) +// S := site (i.e. destination service) +func (*ServiceRegistration) choose(services []*structs.ServiceRegistration, parameter string) ([]*structs.ServiceRegistration, error) { + // extract the number of services + tokens := strings.SplitN(parameter, "|", 2) + if len(tokens) != 2 { + return nil, structs.ErrMalformedChooseParameter + } + n, err := strconv.Atoi(tokens[0]) + if err != nil { + return nil, structs.ErrMalformedChooseParameter + } + + // extract the hash key + key := tokens[1] + if key == "" { + return nil, structs.ErrMalformedChooseParameter + } + + // if there are fewer services than requested, go with the number of services + if l := len(services); l < n { + n = l + } + + type pair struct { + hash string + service *structs.ServiceRegistration + } + + // associate hash for each service + priorities := make([]*pair, len(services)) + for i, service := range services { + priorities[i] = &pair{ + hash: service.HashWith(key), + service: service, + } + } + + // sort by the hash; creating random distribution of priority + sort.SliceStable(priorities, func(i, j int) bool { + return priorities[i].hash < priorities[j].hash + }) + + // choose top n services + chosen := make([]*structs.ServiceRegistration, n) + for i := 0; i < n; i++ { + chosen[i] = priorities[i].service + } + + return chosen, nil +} + // handleMixedAuthEndpoint is a helper to handle auth on RPC endpoints that can // either be called by Nomad nodes, 
or by external clients. func (s *ServiceRegistration) handleMixedAuthEndpoint(args structs.QueryOptions, cap string) error { @@ -451,7 +528,7 @@ func (s *ServiceRegistration) handleMixedAuthEndpoint(args structs.QueryOptions, } } default: - // In the event we got any error other than notfound, consider this + // In the event we got any error other than ErrTokenNotFound, consider this // terminal. if err != structs.ErrTokenNotFound { return err diff --git a/nomad/service_registration_endpoint_test.go b/nomad/service_registration_endpoint_test.go index 164f4d8a16f..a3174821e17 100644 --- a/nomad/service_registration_endpoint_test.go +++ b/nomad/service_registration_endpoint_test.go @@ -1174,6 +1174,84 @@ func TestServiceRegistration_GetService(t *testing.T) { }, name: "filtering and pagination", }, + { + name: "choose 2", + serverFn: func(t *testing.T) (*Server, *structs.ACLToken, func()) { + server, cleanup := TestServer(t, nil) + return server, nil, cleanup + }, + testFn: func(t *testing.T, s *Server, _ *structs.ACLToken) { + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + // insert 3 instances of service s1 + nodeID, jobID, allocID := "node_id", "job_id", "alloc_id" + services := []*structs.ServiceRegistration{ + { + ID: "id_1", + Namespace: "default", + ServiceName: "s1", + NodeID: nodeID, + Datacenter: "dc1", + JobID: jobID, + AllocID: allocID, + Tags: []string{"tag1"}, + Address: "10.0.0.1", + Port: 9001, + CreateIndex: 101, + ModifyIndex: 201, + }, + { + ID: "id_2", + Namespace: "default", + ServiceName: "s1", + NodeID: nodeID, + Datacenter: "dc1", + JobID: jobID, + AllocID: allocID, + Tags: []string{"tag2"}, + Address: "10.0.0.2", + Port: 9002, + CreateIndex: 102, + ModifyIndex: 202, + }, + { + ID: "id_3", + Namespace: "default", + ServiceName: "s1", + NodeID: nodeID, + Datacenter: "dc1", + JobID: jobID, + AllocID: allocID, + Tags: []string{"tag3"}, + Address: "10.0.0.3", + Port: 9003, + CreateIndex: 103, + ModifyIndex: 103, + }, + } + 
require.NoError(t, s.fsm.State().UpsertServiceRegistrations(structs.MsgTypeTestSetup, 10, services)) + + serviceRegReq := &structs.ServiceRegistrationByNameRequest{ + ServiceName: "s1", + Choose: "2|abc123", // select 2 in consistent order + QueryOptions: structs.QueryOptions{ + Namespace: structs.DefaultNamespace, + Region: DefaultRegion, + }, + } + var serviceRegResp structs.ServiceRegistrationByNameResponse + err := msgpackrpc.CallWithCodec( + codec, structs.ServiceRegistrationGetServiceRPCMethod, serviceRegReq, &serviceRegResp) + require.NoError(t, err) + + result := serviceRegResp.Services + + require.Len(t, result, 2) + require.Equal(t, "10.0.0.3", result[0].Address) + require.Equal(t, "10.0.0.2", result[1].Address) + }, + }, } for _, tc := range testCases { @@ -1184,3 +1262,79 @@ func TestServiceRegistration_GetService(t *testing.T) { }) } } + +func TestServiceRegistration_chooseErr(t *testing.T) { + ci.Parallel(t) + + sr := (*ServiceRegistration)(nil) + try := func(input []*structs.ServiceRegistration, parameter string) { + result, err := sr.choose(input, parameter) + require.Empty(t, result) + require.ErrorIs(t, err, structs.ErrMalformedChooseParameter) + } + + regs := []*structs.ServiceRegistration{ + {ID: "abc001", ServiceName: "s1"}, + {ID: "abc002", ServiceName: "s2"}, + {ID: "abc003", ServiceName: "s3"}, + } + + try(regs, "") + try(regs, "1|") + try(regs, "|abc") + try(regs, "a|abc") +} + +func TestServiceRegistration_choose(t *testing.T) { + ci.Parallel(t) + + sr := (*ServiceRegistration)(nil) + try := func(input, exp []*structs.ServiceRegistration, parameter string) { + result, err := sr.choose(input, parameter) + require.NoError(t, err) + require.Equal(t, exp, result) + } + + // zero services + try(nil, []*structs.ServiceRegistration{}, "1|aaa") + try(nil, []*structs.ServiceRegistration{}, "2|aaa") + + // some unique services + regs := []*structs.ServiceRegistration{ + {ID: "abc001", ServiceName: "s1"}, + {ID: "abc002", ServiceName: "s1"}, + {ID: 
"abc003", ServiceName: "s1"}, + } + + // same key, increasing n -> maintains order (n=1) + try(regs, []*structs.ServiceRegistration{ + {ID: "abc002", ServiceName: "s1"}, + }, "1|aaa") + + // same key, increasing n -> maintains order (n=2) + try(regs, []*structs.ServiceRegistration{ + {ID: "abc002", ServiceName: "s1"}, + {ID: "abc003", ServiceName: "s1"}, + }, "2|aaa") + + // same key, increasing n -> maintains order (n=3) + try(regs, []*structs.ServiceRegistration{ + {ID: "abc002", ServiceName: "s1"}, + {ID: "abc003", ServiceName: "s1"}, + {ID: "abc001", ServiceName: "s1"}, + }, "3|aaa") + + // unique key -> different orders + try(regs, []*structs.ServiceRegistration{ + {ID: "abc001", ServiceName: "s1"}, + {ID: "abc002", ServiceName: "s1"}, + {ID: "abc003", ServiceName: "s1"}, + }, "3|bbb") + + // another key -> another order + try(regs, []*structs.ServiceRegistration{ + {ID: "abc002", ServiceName: "s1"}, + {ID: "abc003", ServiceName: "s1"}, + {ID: "abc001", ServiceName: "s1"}, + }, "3|ccc") +} diff --git a/nomad/structs/errors.go b/nomad/structs/errors.go index 69747a40e2b..b98d814761f 100644 --- a/nomad/structs/errors.go +++ b/nomad/structs/errors.go @@ -20,6 +20,7 @@ const ( errNodeLacksRpc = "Node does not support RPC; requires 0.8 or later" errMissingAllocID = "Missing allocation ID" errIncompatibleFiltering = "Filter expression cannot be used with other filter parameters" + errMalformedChooseParameter = "Parameter for choose must be in form '|'" // Prefix based errors that are used to check if the error is of a given // type. These errors should be created with the associated constructor. 
@@ -55,6 +56,7 @@ var ( ErrNodeLacksRpc = errors.New(errNodeLacksRpc) ErrMissingAllocID = errors.New(errMissingAllocID) ErrIncompatibleFiltering = errors.New(errIncompatibleFiltering) + ErrMalformedChooseParameter = errors.New(errMalformedChooseParameter) ErrUnknownNode = errors.New(ErrUnknownNodePrefix) diff --git a/nomad/structs/service_registration.go b/nomad/structs/service_registration.go index 31554fe5d8f..9929e83fc7c 100644 --- a/nomad/structs/service_registration.go +++ b/nomad/structs/service_registration.go @@ -1,6 +1,8 @@ package structs import ( + "crypto/md5" + "encoding/binary" "fmt" "github.com/hashicorp/nomad/helper" @@ -171,6 +173,25 @@ func (s *ServiceRegistration) GetNamespace() string { return s.Namespace } +// HashWith generates a unique value representative of s based on the contents of s. +func (s *ServiceRegistration) HashWith(key string) string { + buf := make([]byte, 8) + binary.BigEndian.PutUint64(buf, uint64(s.Port)) + + sum := md5.New() + sum.Write(buf) + sum.Write([]byte(s.AllocID)) + sum.Write([]byte(s.ID)) + sum.Write([]byte(s.Namespace)) + sum.Write([]byte(s.Address)) + sum.Write([]byte(s.ServiceName)) + for _, tag := range s.Tags { + sum.Write([]byte(tag)) + } + sum.Write([]byte(key)) + return fmt.Sprintf("%x", sum.Sum(nil)) +} + // ServiceRegistrationUpsertRequest is the request object used to upsert one or // more service registrations. type ServiceRegistrationUpsertRequest struct { @@ -245,6 +266,7 @@ type ServiceRegistrationStub struct { // of services matching a specific name. 
type ServiceRegistrationByNameRequest struct { ServiceName string + Choose string // stable selection of n services QueryOptions } diff --git a/nomad/structs/service_registration_test.go b/nomad/structs/service_registration_test.go index da9c0773bc9..691ab4223fd 100644 --- a/nomad/structs/service_registration_test.go +++ b/nomad/structs/service_registration_test.go @@ -447,3 +447,27 @@ func TestServiceRegistrationByNameRequest_StaleReadSupport(t *testing.T) { req := &ServiceRegistrationByNameRequest{} require.True(t, req.IsRead()) } + +func TestServiceRegistration_HashWith(t *testing.T) { + a := ServiceRegistration{ + Address: "10.0.0.1", + Port: 9999, + } + + // same service, same key -> same hash + require.Equal(t, a.HashWith("aaa"), a.HashWith("aaa")) + + // same service, different key -> different hash + require.NotEqual(t, a.HashWith("aaa"), a.HashWith("bbb")) + + b := ServiceRegistration{ + Address: "10.0.0.2", + Port: 9998, + } + + // different service, same key -> different hash + require.NotEqual(t, a.HashWith("aaa"), b.HashWith("aaa")) + + // different service, different key -> different hash + require.NotEqual(t, a.HashWith("aaa"), b.HashWith("bbb")) +} diff --git a/website/content/api-docs/services.mdx b/website/content/api-docs/services.mdx index 465a2ce9aee..7905d3cb99d 100644 --- a/website/content/api-docs/services.mdx +++ b/website/content/api-docs/services.mdx @@ -94,6 +94,11 @@ The table below shows this endpoint's support for used to filter the results. Consider using pagination or a query parameter to reduce resource used to serve the request. +- `choose` `(string: "")` - Specifies the number of services to return and a hash + key. Must be in the form `<number>|<key>`. Nomad uses [rendezvous hashing][hash] to deliver + consistent results for a given key, and stable results when the number of services + changes. 
+ ### Sample Request ```shell-session @@ -173,3 +178,5 @@ $ curl \ --request DELETE \ https://localhost:4646/v1/service/example-cache-redis/_nomad-task-ba731da0-6df9-9858-ef23-806e9758a899-redis-example-cache-redis-db ``` + +[hash]: https://en.wikipedia.org/wiki/Rendezvous_hashing \ No newline at end of file