diff --git a/helper/raftutil/msgtypes.go b/helper/raftutil/msgtypes.go index 6af7164e524..9371964d3dd 100644 --- a/helper/raftutil/msgtypes.go +++ b/helper/raftutil/msgtypes.go @@ -51,6 +51,9 @@ var msgTypeNames = map[structs.MessageType]string{ structs.OneTimeTokenUpsertRequestType: "OneTimeTokenUpsertRequestType", structs.OneTimeTokenDeleteRequestType: "OneTimeTokenDeleteRequestType", structs.OneTimeTokenExpireRequestType: "OneTimeTokenExpireRequestType", + structs.ServiceRegistrationUpsertRequestType: "ServiceRegistrationUpsertRequestType", + structs.ServiceRegistrationDeleteByIDRequestType: "ServiceRegistrationDeleteByIDRequestType", + structs.ServiceRegistrationDeleteByNodeIDRequestType: "ServiceRegistrationDeleteByNodeIDRequestType", structs.NamespaceUpsertRequestType: "NamespaceUpsertRequestType", structs.NamespaceDeleteRequestType: "NamespaceDeleteRequestType", } diff --git a/nomad/state/events.go b/nomad/state/events.go index 82bcdbc9c95..863bf0c6c40 100644 --- a/nomad/state/events.go +++ b/nomad/state/events.go @@ -6,28 +6,31 @@ import ( ) var MsgTypeEvents = map[structs.MessageType]string{ - structs.NodeRegisterRequestType: structs.TypeNodeRegistration, - structs.NodeDeregisterRequestType: structs.TypeNodeDeregistration, - structs.UpsertNodeEventsType: structs.TypeNodeEvent, - structs.EvalUpdateRequestType: structs.TypeEvalUpdated, - structs.AllocClientUpdateRequestType: structs.TypeAllocationUpdated, - structs.JobRegisterRequestType: structs.TypeJobRegistered, - structs.AllocUpdateRequestType: structs.TypeAllocationUpdated, - structs.NodeUpdateStatusRequestType: structs.TypeNodeEvent, - structs.JobDeregisterRequestType: structs.TypeJobDeregistered, - structs.JobBatchDeregisterRequestType: structs.TypeJobBatchDeregistered, - structs.AllocUpdateDesiredTransitionRequestType: structs.TypeAllocationUpdateDesiredStatus, - structs.NodeUpdateEligibilityRequestType: structs.TypeNodeDrain, - structs.NodeUpdateDrainRequestType: structs.TypeNodeDrain, - structs.BatchNodeUpdateDrainRequestType: structs.TypeNodeDrain, - structs.DeploymentStatusUpdateRequestType: structs.TypeDeploymentUpdate, - structs.DeploymentPromoteRequestType: structs.TypeDeploymentPromotion, - structs.DeploymentAllocHealthRequestType: structs.TypeDeploymentAllocHealth, - structs.ApplyPlanResultsRequestType: structs.TypePlanResult, - structs.ACLTokenDeleteRequestType: structs.TypeACLTokenDeleted, - structs.ACLTokenUpsertRequestType: structs.TypeACLTokenUpserted, - structs.ACLPolicyDeleteRequestType: structs.TypeACLPolicyDeleted, - structs.ACLPolicyUpsertRequestType: structs.TypeACLPolicyUpserted, + structs.NodeRegisterRequestType: structs.TypeNodeRegistration, + structs.NodeDeregisterRequestType: structs.TypeNodeDeregistration, + structs.UpsertNodeEventsType: structs.TypeNodeEvent, + structs.EvalUpdateRequestType: structs.TypeEvalUpdated, + structs.AllocClientUpdateRequestType: structs.TypeAllocationUpdated, + structs.JobRegisterRequestType: structs.TypeJobRegistered, + structs.AllocUpdateRequestType: structs.TypeAllocationUpdated, + structs.NodeUpdateStatusRequestType: structs.TypeNodeEvent, + structs.JobDeregisterRequestType: structs.TypeJobDeregistered, + structs.JobBatchDeregisterRequestType: structs.TypeJobBatchDeregistered, + structs.AllocUpdateDesiredTransitionRequestType: structs.TypeAllocationUpdateDesiredStatus, + structs.NodeUpdateEligibilityRequestType: structs.TypeNodeDrain, + structs.NodeUpdateDrainRequestType: structs.TypeNodeDrain, + structs.BatchNodeUpdateDrainRequestType: structs.TypeNodeDrain, + structs.DeploymentStatusUpdateRequestType: structs.TypeDeploymentUpdate, + structs.DeploymentPromoteRequestType: structs.TypeDeploymentPromotion, + structs.DeploymentAllocHealthRequestType: structs.TypeDeploymentAllocHealth, + structs.ApplyPlanResultsRequestType: structs.TypePlanResult, + structs.ACLTokenDeleteRequestType: structs.TypeACLTokenDeleted, + structs.ACLTokenUpsertRequestType: structs.TypeACLTokenUpserted, + structs.ACLPolicyDeleteRequestType: structs.TypeACLPolicyDeleted, + structs.ACLPolicyUpsertRequestType: structs.TypeACLPolicyUpserted, + structs.ServiceRegistrationUpsertRequestType: structs.TypeServiceRegistration, + structs.ServiceRegistrationDeleteByIDRequestType: structs.TypeServiceDeregistration, + structs.ServiceRegistrationDeleteByNodeIDRequestType: structs.TypeServiceDeregistration, } func eventsFromChanges(tx ReadTxn, changes Changes) *structs.Events { @@ -88,6 +91,23 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) { Node: before, }, }, true + case TableServiceRegistrations: + before, ok := change.Before.(*structs.ServiceRegistration) + if !ok { + return structs.Event{}, false + } + return structs.Event{ + Topic: structs.TopicServiceRegistration, + Key: before.ID, + FilterKeys: []string{ + before.JobID, + before.ServiceName, + }, + Namespace: before.Namespace, + Payload: &structs.ServiceRegistrationStreamEvent{ + Service: before, + }, + }, true } return structs.Event{}, false } @@ -198,6 +218,23 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) { Deployment: after, }, }, true + case TableServiceRegistrations: + after, ok := change.After.(*structs.ServiceRegistration) + if !ok { + return structs.Event{}, false + } + return structs.Event{ + Topic: structs.TopicServiceRegistration, + Key: after.ID, + FilterKeys: []string{ + after.JobID, + after.ServiceName, + }, + Namespace: after.Namespace, + Payload: &structs.ServiceRegistrationStreamEvent{ + Service: after, + }, + }, true } return structs.Event{}, false diff --git a/nomad/state/events_test.go b/nomad/state/events_test.go index 078ba43ced4..1c896bc546c 100644 --- a/nomad/state/events_test.go +++ b/nomad/state/events_test.go @@ -952,6 +952,53 @@ func TestNodeDrainEventFromChanges(t *testing.T) { require.Equal(t, strat, nodeEvent.Node.DrainStrategy) } +func Test_eventsFromChanges_ServiceRegistration(t *testing.T) { + t.Parallel() + testState := TestStateStoreCfg(t, TestStateStorePublisher(t)) + defer testState.StopEventBroker() + + // Generate test service registration. + service := mock.ServiceRegistrations()[0] + + // Upsert a service registration. + writeTxn := testState.db.WriteTxn(10) + updated, err := testState.upsertServiceRegistrationTxn(10, writeTxn, service) + require.True(t, updated) + require.NoError(t, err) + writeTxn.Txn.Commit() + + // Pull the events from the stream. + registerChange := Changes{Changes: writeTxn.Changes(), Index: 10, MsgType: structs.ServiceRegistrationUpsertRequestType} + receivedChange := eventsFromChanges(writeTxn, registerChange) + + // Check the event, and it's payload are what we are expecting. + require.Len(t, receivedChange.Events, 1) + require.Equal(t, structs.TopicServiceRegistration, receivedChange.Events[0].Topic) + require.Equal(t, structs.TypeServiceRegistration, receivedChange.Events[0].Type) + require.Equal(t, uint64(10), receivedChange.Events[0].Index) + + eventPayload := receivedChange.Events[0].Payload.(*structs.ServiceRegistrationStreamEvent) + require.Equal(t, service, eventPayload.Service) + + // Delete the previously upserted service registration. + deleteTxn := testState.db.WriteTxn(20) + require.NoError(t, testState.deleteServiceRegistrationByIDTxn(uint64(20), deleteTxn, service.Namespace, service.ID)) + writeTxn.Txn.Commit() + + // Pull the events from the stream. + deregisterChange := Changes{Changes: deleteTxn.Changes(), Index: 20, MsgType: structs.ServiceRegistrationDeleteByIDRequestType} + receivedDeleteChange := eventsFromChanges(deleteTxn, deregisterChange) + + // Check the event, and it's payload are what we are expecting. + require.Len(t, receivedDeleteChange.Events, 1) + require.Equal(t, structs.TopicServiceRegistration, receivedDeleteChange.Events[0].Topic) + require.Equal(t, structs.TypeServiceDeregistration, receivedDeleteChange.Events[0].Type) + require.Equal(t, uint64(20), receivedDeleteChange.Events[0].Index) + + eventPayload = receivedChange.Events[0].Payload.(*structs.ServiceRegistrationStreamEvent) + require.Equal(t, service, eventPayload.Service) +} + func requireNodeRegistrationEventEqual(t *testing.T, want, got structs.Event) { t.Helper() diff --git a/nomad/structs/event.go b/nomad/structs/event.go index c244488eb06..542f3b47cb1 100644 --- a/nomad/structs/event.go +++ b/nomad/structs/event.go @@ -16,14 +16,15 @@ type EventStreamWrapper struct { type Topic string const ( - TopicDeployment Topic = "Deployment" - TopicEvaluation Topic = "Evaluation" - TopicAllocation Topic = "Allocation" - TopicJob Topic = "Job" - TopicNode Topic = "Node" - TopicACLPolicy Topic = "ACLPolicy" - TopicACLToken Topic = "ACLToken" - TopicAll Topic = "*" + TopicDeployment Topic = "Deployment" + TopicEvaluation Topic = "Evaluation" + TopicAllocation Topic = "Allocation" + TopicJob Topic = "Job" + TopicNode Topic = "Node" + TopicACLPolicy Topic = "ACLPolicy" + TopicACLToken Topic = "ACLToken" + TopicServiceRegistration Topic = "ServiceRegistration" + TopicAll Topic = "*" TypeNodeRegistration = "NodeRegistration" TypeNodeDeregistration = "NodeDeregistration" @@ -45,6 +46,8 @@ const ( TypeACLTokenUpserted = "ACLTokenUpserted" TypeACLPolicyDeleted = "ACLPolicyDeleted" TypeACLPolicyUpserted = "ACLPolicyUpserted" + TypeServiceRegistration = "ServiceRegistration" + TypeServiceDeregistration = "ServiceDeregistration" ) // Event represents a change in Nomads state. @@ -123,6 +126,12 @@ type ACLTokenEvent struct { secretID string } +// ServiceRegistrationStreamEvent holds a newly updated or deleted service +// registration. +type ServiceRegistrationStreamEvent struct { + Service *ServiceRegistration +} + // NewACLTokenEvent takes a token and creates a new ACLTokenEvent. It creates // a copy of the passed in ACLToken and empties out the copied tokens SecretID func NewACLTokenEvent(token *ACLToken) *ACLTokenEvent { diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 9a30381cf53..9188c820494 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -105,6 +105,9 @@ const ( OneTimeTokenUpsertRequestType MessageType = 44 OneTimeTokenDeleteRequestType MessageType = 45 OneTimeTokenExpireRequestType MessageType = 46 + ServiceRegistrationUpsertRequestType MessageType = 47 + ServiceRegistrationDeleteByIDRequestType MessageType = 48 + ServiceRegistrationDeleteByNodeIDRequestType MessageType = 49 // Namespace types were moved from enterprise and therefore start at 64 NamespaceUpsertRequestType MessageType = 64