Skip to content

Commit

Permalink
Merge pull request #12140 from hashicorp/f-gh-258
Browse files Browse the repository at this point in the history
events: add state objects and logic for service registrations.
  • Loading branch information
jrasell authored Mar 2, 2022
2 parents f3f5a77 + 12265ee commit fdcb730
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 30 deletions.
3 changes: 3 additions & 0 deletions helper/raftutil/msgtypes.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

81 changes: 59 additions & 22 deletions nomad/state/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,31 @@ import (
)

var MsgTypeEvents = map[structs.MessageType]string{
structs.NodeRegisterRequestType: structs.TypeNodeRegistration,
structs.NodeDeregisterRequestType: structs.TypeNodeDeregistration,
structs.UpsertNodeEventsType: structs.TypeNodeEvent,
structs.EvalUpdateRequestType: structs.TypeEvalUpdated,
structs.AllocClientUpdateRequestType: structs.TypeAllocationUpdated,
structs.JobRegisterRequestType: structs.TypeJobRegistered,
structs.AllocUpdateRequestType: structs.TypeAllocationUpdated,
structs.NodeUpdateStatusRequestType: structs.TypeNodeEvent,
structs.JobDeregisterRequestType: structs.TypeJobDeregistered,
structs.JobBatchDeregisterRequestType: structs.TypeJobBatchDeregistered,
structs.AllocUpdateDesiredTransitionRequestType: structs.TypeAllocationUpdateDesiredStatus,
structs.NodeUpdateEligibilityRequestType: structs.TypeNodeDrain,
structs.NodeUpdateDrainRequestType: structs.TypeNodeDrain,
structs.BatchNodeUpdateDrainRequestType: structs.TypeNodeDrain,
structs.DeploymentStatusUpdateRequestType: structs.TypeDeploymentUpdate,
structs.DeploymentPromoteRequestType: structs.TypeDeploymentPromotion,
structs.DeploymentAllocHealthRequestType: structs.TypeDeploymentAllocHealth,
structs.ApplyPlanResultsRequestType: structs.TypePlanResult,
structs.ACLTokenDeleteRequestType: structs.TypeACLTokenDeleted,
structs.ACLTokenUpsertRequestType: structs.TypeACLTokenUpserted,
structs.ACLPolicyDeleteRequestType: structs.TypeACLPolicyDeleted,
structs.ACLPolicyUpsertRequestType: structs.TypeACLPolicyUpserted,
structs.NodeRegisterRequestType: structs.TypeNodeRegistration,
structs.NodeDeregisterRequestType: structs.TypeNodeDeregistration,
structs.UpsertNodeEventsType: structs.TypeNodeEvent,
structs.EvalUpdateRequestType: structs.TypeEvalUpdated,
structs.AllocClientUpdateRequestType: structs.TypeAllocationUpdated,
structs.JobRegisterRequestType: structs.TypeJobRegistered,
structs.AllocUpdateRequestType: structs.TypeAllocationUpdated,
structs.NodeUpdateStatusRequestType: structs.TypeNodeEvent,
structs.JobDeregisterRequestType: structs.TypeJobDeregistered,
structs.JobBatchDeregisterRequestType: structs.TypeJobBatchDeregistered,
structs.AllocUpdateDesiredTransitionRequestType: structs.TypeAllocationUpdateDesiredStatus,
structs.NodeUpdateEligibilityRequestType: structs.TypeNodeDrain,
structs.NodeUpdateDrainRequestType: structs.TypeNodeDrain,
structs.BatchNodeUpdateDrainRequestType: structs.TypeNodeDrain,
structs.DeploymentStatusUpdateRequestType: structs.TypeDeploymentUpdate,
structs.DeploymentPromoteRequestType: structs.TypeDeploymentPromotion,
structs.DeploymentAllocHealthRequestType: structs.TypeDeploymentAllocHealth,
structs.ApplyPlanResultsRequestType: structs.TypePlanResult,
structs.ACLTokenDeleteRequestType: structs.TypeACLTokenDeleted,
structs.ACLTokenUpsertRequestType: structs.TypeACLTokenUpserted,
structs.ACLPolicyDeleteRequestType: structs.TypeACLPolicyDeleted,
structs.ACLPolicyUpsertRequestType: structs.TypeACLPolicyUpserted,
structs.ServiceRegistrationUpsertRequestType: structs.TypeServiceRegistration,
structs.ServiceRegistrationDeleteByIDRequestType: structs.TypeServiceDeregistration,
structs.ServiceRegistrationDeleteByNodeIDRequestType: structs.TypeServiceDeregistration,
}

func eventsFromChanges(tx ReadTxn, changes Changes) *structs.Events {
Expand Down Expand Up @@ -88,6 +91,23 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
Node: before,
},
}, true
case TableServiceRegistrations:
before, ok := change.Before.(*structs.ServiceRegistration)
if !ok {
return structs.Event{}, false
}
return structs.Event{
Topic: structs.TopicServiceRegistration,
Key: before.ID,
FilterKeys: []string{
before.JobID,
before.ServiceName,
},
Namespace: before.Namespace,
Payload: &structs.ServiceRegistrationStreamEvent{
Service: before,
},
}, true
}
return structs.Event{}, false
}
Expand Down Expand Up @@ -198,6 +218,23 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
Deployment: after,
},
}, true
case TableServiceRegistrations:
after, ok := change.After.(*structs.ServiceRegistration)
if !ok {
return structs.Event{}, false
}
return structs.Event{
Topic: structs.TopicServiceRegistration,
Key: after.ID,
FilterKeys: []string{
after.JobID,
after.ServiceName,
},
Namespace: after.Namespace,
Payload: &structs.ServiceRegistrationStreamEvent{
Service: after,
},
}, true
}

return structs.Event{}, false
Expand Down
47 changes: 47 additions & 0 deletions nomad/state/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -952,6 +952,53 @@ func TestNodeDrainEventFromChanges(t *testing.T) {
require.Equal(t, strat, nodeEvent.Node.DrainStrategy)
}

func Test_eventsFromChanges_ServiceRegistration(t *testing.T) {
t.Parallel()
testState := TestStateStoreCfg(t, TestStateStorePublisher(t))
defer testState.StopEventBroker()

// Generate test service registration.
service := mock.ServiceRegistrations()[0]

// Upsert a service registration.
writeTxn := testState.db.WriteTxn(10)
updated, err := testState.upsertServiceRegistrationTxn(10, writeTxn, service)
require.True(t, updated)
require.NoError(t, err)
writeTxn.Txn.Commit()

// Pull the events from the stream.
registerChange := Changes{Changes: writeTxn.Changes(), Index: 10, MsgType: structs.ServiceRegistrationUpsertRequestType}
receivedChange := eventsFromChanges(writeTxn, registerChange)

// Check the event, and it's payload are what we are expecting.
require.Len(t, receivedChange.Events, 1)
require.Equal(t, structs.TopicServiceRegistration, receivedChange.Events[0].Topic)
require.Equal(t, structs.TypeServiceRegistration, receivedChange.Events[0].Type)
require.Equal(t, uint64(10), receivedChange.Events[0].Index)

eventPayload := receivedChange.Events[0].Payload.(*structs.ServiceRegistrationStreamEvent)
require.Equal(t, service, eventPayload.Service)

// Delete the previously upserted service registration.
deleteTxn := testState.db.WriteTxn(20)
require.NoError(t, testState.deleteServiceRegistrationByIDTxn(uint64(20), deleteTxn, service.Namespace, service.ID))
writeTxn.Txn.Commit()

// Pull the events from the stream.
deregisterChange := Changes{Changes: deleteTxn.Changes(), Index: 20, MsgType: structs.ServiceRegistrationDeleteByIDRequestType}
receivedDeleteChange := eventsFromChanges(deleteTxn, deregisterChange)

// Check the event, and it's payload are what we are expecting.
require.Len(t, receivedDeleteChange.Events, 1)
require.Equal(t, structs.TopicServiceRegistration, receivedDeleteChange.Events[0].Topic)
require.Equal(t, structs.TypeServiceDeregistration, receivedDeleteChange.Events[0].Type)
require.Equal(t, uint64(20), receivedDeleteChange.Events[0].Index)

eventPayload = receivedChange.Events[0].Payload.(*structs.ServiceRegistrationStreamEvent)
require.Equal(t, service, eventPayload.Service)
}

func requireNodeRegistrationEventEqual(t *testing.T, want, got structs.Event) {
t.Helper()

Expand Down
25 changes: 17 additions & 8 deletions nomad/structs/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@ type EventStreamWrapper struct {
type Topic string

const (
TopicDeployment Topic = "Deployment"
TopicEvaluation Topic = "Evaluation"
TopicAllocation Topic = "Allocation"
TopicJob Topic = "Job"
TopicNode Topic = "Node"
TopicACLPolicy Topic = "ACLPolicy"
TopicACLToken Topic = "ACLToken"
TopicAll Topic = "*"
TopicDeployment Topic = "Deployment"
TopicEvaluation Topic = "Evaluation"
TopicAllocation Topic = "Allocation"
TopicJob Topic = "Job"
TopicNode Topic = "Node"
TopicACLPolicy Topic = "ACLPolicy"
TopicACLToken Topic = "ACLToken"
TopicServiceRegistration Topic = "ServiceRegistration"
TopicAll Topic = "*"

TypeNodeRegistration = "NodeRegistration"
TypeNodeDeregistration = "NodeDeregistration"
Expand All @@ -45,6 +46,8 @@ const (
TypeACLTokenUpserted = "ACLTokenUpserted"
TypeACLPolicyDeleted = "ACLPolicyDeleted"
TypeACLPolicyUpserted = "ACLPolicyUpserted"
TypeServiceRegistration = "ServiceRegistration"
TypeServiceDeregistration = "ServiceDeregistration"
)

// Event represents a change in Nomads state.
Expand Down Expand Up @@ -123,6 +126,12 @@ type ACLTokenEvent struct {
secretID string
}

// ServiceRegistrationStreamEvent holds a newly updated or deleted service
// registration.
type ServiceRegistrationStreamEvent struct {
Service *ServiceRegistration
}

// NewACLTokenEvent takes a token and creates a new ACLTokenEvent. It creates
// a copy of the passed in ACLToken and empties out the copied tokens SecretID
func NewACLTokenEvent(token *ACLToken) *ACLTokenEvent {
Expand Down
3 changes: 3 additions & 0 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ const (
OneTimeTokenUpsertRequestType MessageType = 44
OneTimeTokenDeleteRequestType MessageType = 45
OneTimeTokenExpireRequestType MessageType = 46
ServiceRegistrationUpsertRequestType MessageType = 47
ServiceRegistrationDeleteByIDRequestType MessageType = 48
ServiceRegistrationDeleteByNodeIDRequestType MessageType = 49

// Namespace types were moved from enterprise and therefore start at 64
NamespaceUpsertRequestType MessageType = 64
Expand Down

0 comments on commit fdcb730

Please sign in to comment.