Skip to content

Commit

Permalink
Merge pull request #12172 from hashicorp/f-gh-262
Browse files Browse the repository at this point in the history
service discovery: add HTTP endpoints and sdk wrapper
  • Loading branch information
jrasell authored Mar 4, 2022
2 parents 621a89b + 16cb776 commit b10547d
Show file tree
Hide file tree
Showing 13 changed files with 900 additions and 0 deletions.
8 changes: 8 additions & 0 deletions api/allocations.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,14 @@ func (a *Allocations) Signal(alloc *Allocation, q *QueryOptions, task, signal st
return err
}

// Services is used to return a list of service registrations associated to the
// specified allocID.
func (a *Allocations) Services(allocID string, q *QueryOptions) ([]*ServiceRegistration, *QueryMeta, error) {
var resp []*ServiceRegistration
qm, err := a.client.query("/v1/allocation/"+allocID+"/services", &resp, q)
return resp, qm, err
}

// Allocation is used for serialization of allocations.
type Allocation struct {
ID string
Expand Down
4 changes: 4 additions & 0 deletions api/allocations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,3 +396,7 @@ func TestAllocations_ShouldMigrate(t *testing.T) {
require.False(t, DesiredTransition{}.ShouldMigrate())
require.False(t, DesiredTransition{Migrate: boolToPtr(false)}.ShouldMigrate())
}

func TestAllocations_Services(t *testing.T) {
// TODO(jrasell) add tests once registration process is in place.
}
8 changes: 8 additions & 0 deletions api/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,14 @@ func (j *Jobs) Stable(jobID string, version uint64, stable bool,
return &resp, wm, nil
}

// Services is used to return a list of service registrations associated to the
// specified jobID.
func (j *Jobs) Services(jobID string, q *QueryOptions) ([]*ServiceRegistration, *QueryMeta, error) {
var resp []*ServiceRegistration
qm, err := j.client.query("/v1/job/"+jobID+"/services", &resp, q)
return resp, qm, err
}

// periodicForceResponse is used to deserialize a force response
type periodicForceResponse struct {
EvalID string
Expand Down
4 changes: 4 additions & 0 deletions api/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2356,3 +2356,7 @@ func TestJobs_ScaleStatus(t *testing.T) {
// Check that the result is what we expect
require.Equal(groupCount, result.TaskGroups[groupName].Desired)
}

func TestJobs_Services(t *testing.T) {
// TODO(jrasell) add tests once registration process is in place.
}
129 changes: 129 additions & 0 deletions api/service_registrations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package api

import (
"fmt"
"net/url"
)

// ServiceRegistrations is used to query the service endpoints.
type ServiceRegistrations struct {
client *Client
}

// ServiceRegistration is an instance of a single allocation advertising itself
// as a named service with a specific address. Each registration is constructed
// from the job specification Service block. Whether the service is registered
// within Nomad, and therefore generates a ServiceRegistration is controlled by
// the Service.Provider parameter.
type ServiceRegistration struct {

// ID is the unique identifier for this registration. It currently follows
// the Consul service registration format to provide consistency between
// the two solutions.
ID string

// ServiceName is the human friendly identifier for this service
// registration.
ServiceName string

// Namespace represents the namespace within which this service is
// registered.
Namespace string

// NodeID is Node.ID on which this service registration is currently
// running.
NodeID string

// Datacenter is the DC identifier of the node as identified by
// Node.Datacenter.
Datacenter string

// JobID is Job.ID and represents the job which contained the service block
// which resulted in this service registration.
JobID string

// AllocID is Allocation.ID and represents the allocation within which this
// service is running.
AllocID string

// Tags are determined from either Service.Tags or Service.CanaryTags and
// help identify this service. Tags can also be used to perform lookups of
// services depending on their state and role.
Tags []string

// Address is the IP address of this service registration. This information
// comes from the client and is not guaranteed to be routable; this depends
// on cluster network topology.
Address string

// Port is the port number on which this service registration is bound. It
// is determined by a combination of factors on the client.
Port int

CreateIndex uint64
ModifyIndex uint64
}

// ServiceRegistrationListStub represents all service registrations held within a
// single namespace.
type ServiceRegistrationListStub struct {

// Namespace details the namespace in which these services have been
// registered.
Namespace string

// Services is a list of services found within the namespace.
Services []*ServiceRegistrationStub
}

// ServiceRegistrationStub is the stub object describing an individual
// namespaced service. The object is built in a manner which would allow us to
// add additional fields in the future, if we wanted.
type ServiceRegistrationStub struct {

// ServiceName is the human friendly name for this service as specified
// within Service.Name.
ServiceName string

// Tags is a list of unique tags found for this service. The list is
// de-duplicated automatically by Nomad.
Tags []string
}

// ServiceRegistrations returns a new handle on the services endpoints.
func (c *Client) ServiceRegistrations() *ServiceRegistrations {
return &ServiceRegistrations{client: c}
}

// List can be used to list all service registrations currently stored within
// the target namespace. It returns a stub response object.
func (s *ServiceRegistrations) List(q *QueryOptions) ([]*ServiceRegistrationListStub, *QueryMeta, error) {
var resp []*ServiceRegistrationListStub
qm, err := s.client.query("/v1/services", &resp, q)
if err != nil {
return nil, qm, err
}
return resp, qm, nil
}

// Get is used to return a list of service registrations whose name matches the
// specified parameter.
func (s *ServiceRegistrations) Get(serviceName string, q *QueryOptions) ([]*ServiceRegistration, *QueryMeta, error) {
var resp []*ServiceRegistration
qm, err := s.client.query("/v1/service/"+url.PathEscape(serviceName), &resp, q)
if err != nil {
return nil, qm, err
}
return resp, qm, nil
}

// Delete can be used to delete an individual service registration as defined
// by its service name and service ID.
func (s *ServiceRegistrations) Delete(serviceName, serviceID string, q *WriteOptions) (*WriteMeta, error) {
path := fmt.Sprintf("/v1/service/%s/%s", url.PathEscape(serviceName), url.PathEscape(serviceID))
wm, err := s.client.delete(path, nil, q)
if err != nil {
return nil, err
}
return wm, nil
}
17 changes: 17 additions & 0 deletions api/service_registrations_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package api

import (
"testing"
)

func TestServiceRegistrations_List(t *testing.T) {
// TODO(jrasell) add tests once registration process is in place.
}

func TestServiceRegistrations_Get(t *testing.T) {
// TODO(jrasell) add tests once registration process is in place.
}

func TestServiceRegistrations_Delete(t *testing.T) {
// TODO(jrasell) add tests once registration process is in place.
}
35 changes: 35 additions & 0 deletions command/agent/alloc_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ func (s *HTTPServer) AllocSpecificRequest(resp http.ResponseWriter, req *http.Re
switch tokens[1] {
case "stop":
return s.allocStop(allocID, resp, req)
case "services":
return s.allocServiceRegistrations(resp, req, allocID)
}

return nil, CodedError(404, resourceNotFoundErr)
Expand Down Expand Up @@ -167,6 +169,39 @@ func (s *HTTPServer) allocStop(allocID string, resp http.ResponseWriter, req *ht
return &out, nil
}

// allocServiceRegistrations returns a list of all service registrations
// assigned to the job identifier. It is callable via the
// /v1/allocation/:alloc_id/services HTTP API and uses the
// structs.AllocServiceRegistrationsRPCMethod RPC method.
func (s *HTTPServer) allocServiceRegistrations(
resp http.ResponseWriter, req *http.Request, allocID string) (interface{}, error) {

// The endpoint only supports GET requests.
if req.Method != http.MethodGet {
return nil, CodedError(http.StatusMethodNotAllowed, ErrInvalidMethod)
}

// Set up the request args and parse this to ensure the query options are
// set.
args := structs.AllocServiceRegistrationsRequest{AllocID: allocID}
if s.parse(resp, req, &args.Region, &args.QueryOptions) {
return nil, nil
}

// Perform the RPC request.
var reply structs.AllocServiceRegistrationsResponse
if err := s.agent.RPC(structs.AllocServiceRegistrationsRPCMethod, &args, &reply); err != nil {
return nil, err
}

setMeta(resp, &reply.QueryMeta)

if reply.Services == nil {
return nil, CodedError(http.StatusNotFound, allocNotFoundErr)
}
return reply.Services, nil
}

func (s *HTTPServer) ClientAllocRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
reqSuffix := strings.TrimPrefix(req.URL.Path, "/v1/client/allocation/")

Expand Down
113 changes: 113 additions & 0 deletions command/agent/alloc_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,119 @@ func TestHTTP_AllocStop(t *testing.T) {
})
}

func TestHTTP_allocServiceRegistrations(t *testing.T) {
t.Parallel()

testCases := []struct {
testFn func(srv *TestAgent)
name string
}{
{
testFn: func(s *TestAgent) {

// Grab the state, so we can manipulate it and test against it.
testState := s.Agent.server.State()

// Generate an alloc and upsert this.
alloc := mock.Alloc()
require.NoError(t, testState.UpsertAllocs(
structs.MsgTypeTestSetup, 10, []*structs.Allocation{alloc}))

// Generate a service registration, assigned the allocID to the
// mocked allocation ID, and upsert this.
serviceReg := mock.ServiceRegistrations()[0]
serviceReg.AllocID = alloc.ID
require.NoError(t, testState.UpsertServiceRegistrations(
structs.MsgTypeTestSetup, 20, []*structs.ServiceRegistration{serviceReg}))

// Build the HTTP request.
path := fmt.Sprintf("/v1/allocation/%s/services", alloc.ID)
req, err := http.NewRequest(http.MethodGet, path, nil)
require.NoError(t, err)
respW := httptest.NewRecorder()

// Send the HTTP request.
obj, err := s.Server.AllocSpecificRequest(respW, req)
require.NoError(t, err)

// Check the response.
require.Equal(t, "20", respW.Header().Get("X-Nomad-Index"))
require.ElementsMatch(t, []*structs.ServiceRegistration{serviceReg},
obj.([]*structs.ServiceRegistration))
},
name: "alloc has registrations",
},
{
testFn: func(s *TestAgent) {

// Grab the state, so we can manipulate it and test against it.
testState := s.Agent.server.State()

// Generate an alloc and upsert this.
alloc := mock.Alloc()
require.NoError(t, testState.UpsertAllocs(
structs.MsgTypeTestSetup, 10, []*structs.Allocation{alloc}))

// Build the HTTP request.
path := fmt.Sprintf("/v1/allocation/%s/services", alloc.ID)
req, err := http.NewRequest(http.MethodGet, path, nil)
require.NoError(t, err)
respW := httptest.NewRecorder()

// Send the HTTP request.
obj, err := s.Server.AllocSpecificRequest(respW, req)
require.NoError(t, err)

// Check the response.
require.Equal(t, "1", respW.Header().Get("X-Nomad-Index"))
require.ElementsMatch(t, []*structs.ServiceRegistration{},
obj.([]*structs.ServiceRegistration))
},
name: "alloc without registrations",
},
{
testFn: func(s *TestAgent) {

// Build the HTTP request.
path := fmt.Sprintf("/v1/allocation/%s/services", uuid.Generate())
req, err := http.NewRequest(http.MethodGet, path, nil)
require.NoError(t, err)
respW := httptest.NewRecorder()

// Send the HTTP request.
obj, err := s.Server.AllocSpecificRequest(respW, req)
require.Error(t, err)
require.Contains(t, err.Error(), "allocation not found")
require.Nil(t, obj)
},
name: "alloc not found",
},
{
testFn: func(s *TestAgent) {

// Build the HTTP request.
path := fmt.Sprintf("/v1/allocation/%s/services", uuid.Generate())
req, err := http.NewRequest(http.MethodHead, path, nil)
require.NoError(t, err)
respW := httptest.NewRecorder()

// Send the HTTP request.
obj, err := s.Server.AllocSpecificRequest(respW, req)
require.Error(t, err)
require.Contains(t, err.Error(), "Invalid method")
require.Nil(t, obj)
},
name: "alloc not found",
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
httpTest(t, nil, tc.testFn)
})
}
}

func TestHTTP_AllocStats(t *testing.T) {
t.Parallel()
require := require.New(t)
Expand Down
4 changes: 4 additions & 0 deletions command/agent/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,10 @@ func (s HTTPServer) registerHandlers(enableDebug bool) {
s.mux.HandleFunc("/v1/agent/health", s.wrap(s.HealthRequest))
s.mux.HandleFunc("/v1/agent/host", s.wrap(s.AgentHostRequest))

// Register our service registration handlers.
s.mux.HandleFunc("/v1/services", s.wrap(s.ServiceRegistrationListRequest))
s.mux.HandleFunc("/v1/service/", s.wrap(s.ServiceRegistrationRequest))

// Monitor is *not* an untrusted endpoint despite the log contents
// potentially containing unsanitized user input. Monitor, like
// "/v1/client/fs/logs", explicitly sets a "text/plain" or
Expand Down
Loading

0 comments on commit b10547d

Please sign in to comment.