Skip to content

Commit

Permalink
Merge pull request #12171 from hashicorp/f-gh-259
Browse files Browse the repository at this point in the history
service discovery: add RPC endpoints and FSM logic
  • Loading branch information
jrasell authored Mar 4, 2022
2 parents fdcb730 + a674fb3 commit 621a89b
Show file tree
Hide file tree
Showing 18 changed files with 2,562 additions and 18 deletions.
10 changes: 10 additions & 0 deletions helper/ipaddr/ipaddr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package ipaddr

// IsAny checks if the given IP address is an IPv4 or IPv6 ANY address.
func IsAny(ip string) bool {
return isAnyV4(ip) || isAnyV6(ip)
}

func isAnyV4(ip string) bool { return ip == "0.0.0.0" }

func isAnyV6(ip string) bool { return ip == "::" || ip == "[::]" }
53 changes: 53 additions & 0 deletions helper/ipaddr/ipaddr_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package ipaddr

import (
"net"
"testing"

"github.com/stretchr/testify/require"
)

func Test_IsAny(t *testing.T) {
testCases := []struct {
inputIP string
expectedOutput bool
name string
}{
{
inputIP: "0.0.0.0",
expectedOutput: true,
name: "string ipv4 any IP",
},
{
inputIP: "::",
expectedOutput: true,
name: "string ipv6 any IP",
},
{
inputIP: net.IPv4zero.String(),
expectedOutput: true,
name: "net.IP ipv4 any",
},
{
inputIP: net.IPv6zero.String(),
expectedOutput: true,
name: "net.IP ipv6 any",
},
{
inputIP: "10.10.10.10",
expectedOutput: false,
name: "internal ipv4 address",
},
{
inputIP: "8.8.8.8",
expectedOutput: false,
name: "public ipv4 address",
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
require.Equal(t, tc.expectedOutput, IsAny(tc.inputIP))
})
}
}
64 changes: 64 additions & 0 deletions nomad/alloc_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,3 +380,67 @@ func (a *Alloc) UpdateDesiredTransition(args *structs.AllocUpdateDesiredTransiti
reply.Index = index
return nil
}

// GetServiceRegistrations returns a list of service registrations which belong
// to the passed allocation ID.
func (a *Alloc) GetServiceRegistrations(
args *structs.AllocServiceRegistrationsRequest,
reply *structs.AllocServiceRegistrationsResponse) error {

if done, err := a.srv.forward(structs.AllocServiceRegistrationsRPCMethod, args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "alloc", "get_service_registrations"}, time.Now())

// If ACLs are enabled, ensure the caller has the read-job namespace
// capability.
aclObj, err := a.srv.ResolveToken(args.AuthToken)
if err != nil {
return err
} else if aclObj != nil {
if !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadJob) {
return structs.ErrPermissionDenied
}
}

// Set up the blocking query.
return a.srv.blockingRPC(&blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
run: func(ws memdb.WatchSet, stateStore *state.StateStore) error {

// Read the allocation to ensure its namespace matches the request
// args.
alloc, err := stateStore.AllocByID(ws, args.AllocID)
if err != nil {
return err
}

// Guard against the alloc not-existing or that the namespace does
// not match the request arguments.
if alloc == nil || alloc.Namespace != args.RequestNamespace() {
return nil
}

// Perform the state query to get an iterator.
iter, err := stateStore.GetServiceRegistrationsByAllocID(ws, args.AllocID)
if err != nil {
return err
}

// Set up our output after we have checked the error.
services := make([]*structs.ServiceRegistration, 0)

// Iterate the iterator, appending all service registrations
// returned to the reply.
for raw := iter.Next(); raw != nil; raw = iter.Next() {
services = append(services, raw.(*structs.ServiceRegistration))
}
reply.Services = services

// Use the index table to populate the query meta as we have no way
// of tracking the max index on deletes.
return a.srv.setReplyQueryMeta(stateStore, state.TableServiceRegistrations, &reply.QueryMeta)
},
})
}
Loading

0 comments on commit 621a89b

Please sign in to comment.