Skip to content

Commit

Permalink
Merge pull request #12139 from hashicorp/f-gh-260
Browse files Browse the repository at this point in the history
service discovery: add state store functionality
  • Loading branch information
jrasell authored Mar 2, 2022
2 parents b76e04d + 1603323 commit f3f5a77
Show file tree
Hide file tree
Showing 9 changed files with 1,558 additions and 1 deletion.
17 changes: 17 additions & 0 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ const (
CSIVolumeSnapshot SnapshotType = 18
ScalingEventsSnapshot SnapshotType = 19
EventSinkSnapshot SnapshotType = 20
ServiceRegistrationSnapshot SnapshotType = 21
// Namespace appliers were moved from enterprise and therefore start at 64
NamespaceSnapshot SnapshotType = 64
)
Expand Down Expand Up @@ -1663,6 +1664,22 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {
// COMPAT(1.0): Allow 1.0-beta clusterers to gracefully handle
case EventSinkSnapshot:
return nil

case ServiceRegistrationSnapshot:

// Create a new ServiceRegistration object, so we can decode the
// message into it.
serviceRegistration := new(structs.ServiceRegistration)

if err := dec.Decode(serviceRegistration); err != nil {
return err
}

// Perform the restoration.
if err := restore.ServiceRegistrationRestore(serviceRegistration); err != nil {
return err
}

default:
// Check if this is an enterprise only object being restored
restorer, ok := n.enterpriseRestorers[snapType]
Expand Down
31 changes: 31 additions & 0 deletions nomad/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -2255,3 +2255,34 @@ func Namespace() *structs.Namespace {
ns.SetHash()
return ns
}

// ServiceRegistrations generates an array containing two unique service
// registrations.
func ServiceRegistrations() []*structs.ServiceRegistration {
return []*structs.ServiceRegistration{
{
ID: "_nomad-task-2873cf75-42e5-7c45-ca1c-415f3e18be3d-group-cache-example-cache-db",
ServiceName: "example-cache",
Namespace: "default",
NodeID: "17a6d1c0-811e-2ca9-ded0-3d5d6a54904c",
Datacenter: "dc1",
JobID: "example",
AllocID: "2873cf75-42e5-7c45-ca1c-415f3e18be3d",
Tags: []string{"foo"},
Address: "192.168.10.1",
Port: 23000,
},
{
ID: "_nomad-task-ca60e901-675a-0ab2-2e57-2f3b05fdc540-group-api-countdash-api-http",
ServiceName: "countdash-api",
Namespace: "platform",
NodeID: "ba991c17-7ce5-9c20-78b7-311e63578583",
Datacenter: "dc2",
JobID: "countdash-api",
AllocID: "ca60e901-675a-0ab2-2e57-2f3b05fdc540",
Tags: []string{"bar"},
Address: "192.168.200.200",
Port: 29000,
},
}
}
91 changes: 90 additions & 1 deletion nomad/state/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,18 @@ import (
)

const (
TableNamespaces = "namespaces"
tableIndex = "index"

TableNamespaces = "namespaces"
TableServiceRegistrations = "service_registrations"
)

const (
indexID = "id"
indexJob = "job"
indexNodeID = "node_id"
indexAllocID = "alloc_id"
indexServiceName = "service_name"
)

var (
Expand Down Expand Up @@ -58,6 +69,7 @@ func init() {
scalingPolicyTableSchema,
scalingEventTableSchema,
namespaceTableSchema,
serviceRegistrationsTableSchema,
}...)
}

Expand Down Expand Up @@ -1033,3 +1045,80 @@ func namespaceTableSchema() *memdb.TableSchema {
},
}
}

// serviceRegistrationsTableSchema returns the MemDB schema for Nomad native
// service registrations.
func serviceRegistrationsTableSchema() *memdb.TableSchema {
return &memdb.TableSchema{
Name: TableServiceRegistrations,
Indexes: map[string]*memdb.IndexSchema{
// The serviceID in combination with namespace forms a unique
// identifier for a service registration. This is used to look up
// and delete services in individual isolation.
indexID: {
Name: indexID,
AllowMissing: false,
Unique: true,
Indexer: &memdb.CompoundIndex{
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{
Field: "Namespace",
},
&memdb.StringFieldIndex{
Field: "ID",
},
},
},
},
indexServiceName: {
Name: indexServiceName,
AllowMissing: false,
Unique: false,
Indexer: &memdb.CompoundIndex{
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{
Field: "Namespace",
},
&memdb.StringFieldIndex{
Field: "ServiceName",
},
},
},
},
indexJob: {
Name: indexJob,
AllowMissing: false,
Unique: false,
Indexer: &memdb.CompoundIndex{
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{
Field: "Namespace",
},
&memdb.StringFieldIndex{
Field: "JobID",
},
},
},
},
// The nodeID index allows lookups and deletions to be performed
// for an entire node. This is primarily used when a node becomes
// lost.
indexNodeID: {
Name: indexNodeID,
AllowMissing: false,
Unique: false,
Indexer: &memdb.StringFieldIndex{
Field: "NodeID",
},
},
indexAllocID: {
Name: indexAllocID,
AllowMissing: false,
Unique: false,
Indexer: &memdb.StringFieldIndex{
Field: "AllocID",
},
},
},
}
}
9 changes: 9 additions & 0 deletions nomad/state/state_store_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,3 +189,12 @@ func (r *StateRestore) NamespaceRestore(ns *structs.Namespace) error {
}
return nil
}

// ServiceRegistrationRestore is used to restore a single service registration
// into the service_registrations table.
func (r *StateRestore) ServiceRegistrationRestore(service *structs.ServiceRegistration) error {
if err := r.txn.Insert(TableServiceRegistrations, service); err != nil {
return fmt.Errorf("service registration insert failed: %v", err)
}
return nil
}
31 changes: 31 additions & 0 deletions nomad/state/state_store_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -541,3 +541,34 @@ func TestStateStore_RestoreSchedulerConfig(t *testing.T) {

require.Equal(schedConfig, out)
}

func TestStateStore_ServiceRegistrationRestore(t *testing.T) {
t.Parallel()
testState := testStateStore(t)

// Set up our test registrations and index.
expectedIndex := uint64(13)
serviceRegs := mock.ServiceRegistrations()

restore, err := testState.Restore()
require.NoError(t, err)

// Iterate the service registrations, restore, and commit. Set the indexes
// on the objects, so we can check these.
for i := range serviceRegs {
serviceRegs[i].ModifyIndex = expectedIndex
serviceRegs[i].CreateIndex = expectedIndex
require.NoError(t, restore.ServiceRegistrationRestore(serviceRegs[i]))
}
require.NoError(t, restore.Commit())

// Check the state is now populated as we expect and that we can find the
// restored registrations.
ws := memdb.NewWatchSet()

for i := range serviceRegs {
out, err := testState.GetServiceRegistrationByID(ws, serviceRegs[i].Namespace, serviceRegs[i].ID)
require.NoError(t, err)
require.Equal(t, serviceRegs[i], out)
}
}
Loading

0 comments on commit f3f5a77

Please sign in to comment.