diff --git a/.changelog/14923.txt b/.changelog/14923.txt new file mode 100644 index 00000000000..e63a41619c2 --- /dev/null +++ b/.changelog/14923.txt @@ -0,0 +1,11 @@ +```release-note:bug +event stream: Resolve ACL roles within ACL tokens +``` + +```release-note:bug +event stream: Check ACL token expiry when resolving tokens +``` + +```release-note:improvement +event stream: Added ACL role topic with create and delete types +``` diff --git a/nomad/acl_endpoint.go b/nomad/acl_endpoint.go index 21c487ec533..c95629faa1f 100644 --- a/nomad/acl_endpoint.go +++ b/nomad/acl_endpoint.go @@ -1366,8 +1366,8 @@ func (a *ACL) ListRoles( } // GetRolesByID is used to get a set of ACL Roles as defined by their ID. This -// endpoint is used by the replication process and uses a specific response in -// order to make that process easier. +// endpoint is used by the replication process and Nomad agent client token +// resolution. func (a *ACL) GetRolesByID(args *structs.ACLRolesByIDRequest, reply *structs.ACLRolesByIDResponse) error { // This endpoint is only used by the replication process which is only @@ -1382,11 +1382,17 @@ func (a *ACL) GetRolesByID(args *structs.ACLRolesByIDRequest, reply *structs.ACL } defer metrics.MeasureSince([]string{"nomad", "acl", "get_roles_id"}, time.Now()) - // Check that the caller has a management token and that ACLs are enabled - // properly. - if acl, err := a.srv.ResolveToken(args.AuthToken); err != nil { + // For client typed tokens, allow them to query any roles associated with + // that token. This is used by Nomad agents in client mode which are + // resolving the roles to enforce. + token, err := a.requestACLToken(args.AuthToken) + if err != nil { return err - } else if acl == nil || !acl.IsManagement() { + } + if token == nil { + return structs.ErrTokenNotFound + } + if token.Type != structs.ACLManagementToken && !token.HasRoles(args.ACLRoleIDs) { return structs.ErrPermissionDenied } diff --git a/nomad/acl_endpoint_test.go b/nomad/acl_endpoint_test.go index 67d2c3f2922..42bddfc0d35 100644 --- a/nomad/acl_endpoint_test.go +++ b/nomad/acl_endpoint_test.go @@ -2228,6 +2228,7 @@ func TestACL_GetRolesByID(t *testing.T) { // Try reading a role without setting a correct auth token. aclRoleReq1 := &structs.ACLRolesByIDRequest{ + ACLRoleIDs: []string{"nope"}, QueryOptions: structs.QueryOptions{ Region: DefaultRegion, }, @@ -2278,6 +2279,48 @@ func TestACL_GetRolesByID(t *testing.T) { require.Contains(t, aclRoleResp3.ACLRoles, aclRoles[0].ID) require.Contains(t, aclRoleResp3.ACLRoles, aclRoles[1].ID) + // Create a client token which allows us to test client tokens looking up + // their own role assignments. + clientToken1 := &structs.ACLToken{ + AccessorID: uuid.Generate(), + SecretID: uuid.Generate(), + Name: "acl-endpoint-test-role", + Type: structs.ACLClientToken, + Roles: []*structs.ACLTokenRoleLink{{ID: aclRoles[0].ID}}, + } + clientToken1.SetHash() + + require.NoError(t, testServer.fsm.State().UpsertACLTokens( + structs.MsgTypeTestSetup, 10, []*structs.ACLToken{clientToken1})) + + // Use the client token in an attempt to look up an ACL role which is + // assigned to the token, and therefore should work. + aclRoleReq4 := &structs.ACLRolesByIDRequest{ + ACLRoleIDs: []string{aclRoles[0].ID}, + QueryOptions: structs.QueryOptions{ + Region: DefaultRegion, + AuthToken: clientToken1.SecretID, + }, + } + var aclRoleResp4 structs.ACLRolesByIDResponse + err = msgpackrpc.CallWithCodec(codec, structs.ACLGetRolesByIDRPCMethod, aclRoleReq4, &aclRoleResp4) + require.NoError(t, err) + require.Len(t, aclRoleResp4.ACLRoles, 1) + require.Contains(t, aclRoleResp4.ACLRoles, aclRoles[0].ID) + + // Use the client token in an attempt to look up an ACL role which is NOT + // assigned to the token which should fail. + aclRoleReq5 := &structs.ACLRolesByIDRequest{ + ACLRoleIDs: []string{aclRoles[1].ID}, + QueryOptions: structs.QueryOptions{ + Region: DefaultRegion, + AuthToken: clientToken1.SecretID, + }, + } + var aclRoleResp5 structs.ACLRolesByIDResponse + err = msgpackrpc.CallWithCodec(codec, structs.ACLGetRolesByIDRPCMethod, aclRoleReq5, &aclRoleResp5) + require.ErrorContains(t, err, "Permission denied") + // Now test a blocking query, where we wait for an update to the set which // is triggered by a deletion. type res struct { @@ -2287,7 +2330,7 @@ func TestACL_GetRolesByID(t *testing.T) { resultCh := make(chan *res) go func(resultCh chan *res) { - aclRoleReq5 := &structs.ACLRolesByIDRequest{ + aclRoleReq6 := &structs.ACLRolesByIDRequest{ ACLRoleIDs: []string{aclRoles[0].ID, aclRoles[1].ID}, QueryOptions: structs.QueryOptions{ Region: DefaultRegion, @@ -2296,9 +2339,9 @@ func TestACL_GetRolesByID(t *testing.T) { MaxQueryTime: 10 * time.Second, }, } - var aclRoleResp5 structs.ACLRolesByIDResponse - err = msgpackrpc.CallWithCodec(codec, structs.ACLGetRolesByIDRPCMethod, aclRoleReq5, &aclRoleResp5) - resultCh <- &res{err: err, reply: &aclRoleResp5} + var aclRoleResp6 structs.ACLRolesByIDResponse + err = msgpackrpc.CallWithCodec(codec, structs.ACLGetRolesByIDRPCMethod, aclRoleReq6, &aclRoleResp6) + resultCh <- &res{err: err, reply: &aclRoleResp6} }(resultCh) // Delete an ACL role from state which should return the blocking query. diff --git a/nomad/state/events.go b/nomad/state/events.go index fc448babde4..675b02a5b20 100644 --- a/nomad/state/events.go +++ b/nomad/state/events.go @@ -28,6 +28,8 @@ var MsgTypeEvents = map[structs.MessageType]string{ structs.ACLTokenUpsertRequestType: structs.TypeACLTokenUpserted, structs.ACLPolicyDeleteRequestType: structs.TypeACLPolicyDeleted, structs.ACLPolicyUpsertRequestType: structs.TypeACLPolicyUpserted, + structs.ACLRolesDeleteByIDRequestType: structs.TypeACLRoleDeleted, + structs.ACLRolesUpsertRequestType: structs.TypeACLRoleUpserted, structs.ServiceRegistrationUpsertRequestType: structs.TypeServiceRegistration, structs.ServiceRegistrationDeleteByIDRequestType: structs.TypeServiceDeregistration, structs.ServiceRegistrationDeleteByNodeIDRequestType: structs.TypeServiceDeregistration, @@ -77,6 +79,19 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) { ACLPolicy: before, }, }, true + case TableACLRoles: + before, ok := change.Before.(*structs.ACLRole) + if !ok { + return structs.Event{}, false + } + return structs.Event{ + Topic: structs.TopicACLRole, + Key: before.ID, + FilterKeys: []string{before.Name}, + Payload: &structs.ACLRoleStreamEvent{ + ACLRole: before, + }, + }, true case "nodes": before, ok := change.Before.(*structs.Node) if !ok { @@ -136,6 +151,19 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) { ACLPolicy: after, }, }, true + case TableACLRoles: + after, ok := change.After.(*structs.ACLRole) + if !ok { + return structs.Event{}, false + } + return structs.Event{ + Topic: structs.TopicACLRole, + Key: after.ID, + FilterKeys: []string{after.Name}, + Payload: &structs.ACLRoleStreamEvent{ + ACLRole: after, + }, + }, true case "evals": after, ok := change.After.(*structs.Evaluation) if !ok { diff --git a/nomad/state/events_test.go b/nomad/state/events_test.go index 44a9c326ef1..a1f7902a297 100644 --- a/nomad/state/events_test.go +++ b/nomad/state/events_test.go @@ -1002,6 +1002,59 @@ func Test_eventsFromChanges_ServiceRegistration(t *testing.T) { require.Equal(t, service, eventPayload.Service) } +func Test_eventsFromChanges_ACLRole(t *testing.T) { + ci.Parallel(t) + testState := TestStateStoreCfg(t, TestStateStorePublisher(t)) + defer testState.StopEventBroker() + + // Generate a test ACL role. + aclRole := mock.ACLRole() + + // Upsert the role into state, skipping the checks perform to ensure the + // linked policies exist. + writeTxn := testState.db.WriteTxn(10) + updated, err := testState.upsertACLRoleTxn(10, writeTxn, aclRole, true) + require.True(t, updated) + require.NoError(t, err) + writeTxn.Txn.Commit() + + // Pull the events from the stream. + upsertChange := Changes{Changes: writeTxn.Changes(), Index: 10, MsgType: structs.ACLRolesUpsertRequestType} + receivedChange := eventsFromChanges(writeTxn, upsertChange) + + // Check the event, and it's payload are what we are expecting. + require.Len(t, receivedChange.Events, 1) + require.Equal(t, structs.TopicACLRole, receivedChange.Events[0].Topic) + require.Equal(t, aclRole.ID, receivedChange.Events[0].Key) + require.Equal(t, aclRole.Name, receivedChange.Events[0].FilterKeys[0]) + require.Equal(t, structs.TypeACLRoleUpserted, receivedChange.Events[0].Type) + require.Equal(t, uint64(10), receivedChange.Events[0].Index) + + eventPayload := receivedChange.Events[0].Payload.(*structs.ACLRoleStreamEvent) + require.Equal(t, aclRole, eventPayload.ACLRole) + + // Delete the previously upserted ACL role. + deleteTxn := testState.db.WriteTxn(20) + require.NoError(t, testState.deleteACLRoleByIDTxn(deleteTxn, aclRole.ID)) + require.NoError(t, deleteTxn.Insert(tableIndex, &IndexEntry{TableACLRoles, 20})) + deleteTxn.Txn.Commit() + + // Pull the events from the stream. + deleteChange := Changes{Changes: deleteTxn.Changes(), Index: 20, MsgType: structs.ACLRolesDeleteByIDRequestType} + receivedDeleteChange := eventsFromChanges(deleteTxn, deleteChange) + + // Check the event, and it's payload are what we are expecting. + require.Len(t, receivedDeleteChange.Events, 1) + require.Equal(t, structs.TopicACLRole, receivedDeleteChange.Events[0].Topic) + require.Equal(t, aclRole.ID, receivedDeleteChange.Events[0].Key) + require.Equal(t, aclRole.Name, receivedDeleteChange.Events[0].FilterKeys[0]) + require.Equal(t, structs.TypeACLRoleDeleted, receivedDeleteChange.Events[0].Type) + require.Equal(t, uint64(20), receivedDeleteChange.Events[0].Index) + + eventPayload = receivedChange.Events[0].Payload.(*structs.ACLRoleStreamEvent) + require.Equal(t, aclRole, eventPayload.ACLRole) +} + func requireNodeRegistrationEventEqual(t *testing.T, want, got structs.Event) { t.Helper() diff --git a/nomad/state/state_store_acl.go b/nomad/state/state_store_acl.go index 55891e25485..790bc292b22 100644 --- a/nomad/state/state_store_acl.go +++ b/nomad/state/state_store_acl.go @@ -183,18 +183,8 @@ func (s *StateStore) DeleteACLRolesByID( defer txn.Abort() for _, roleID := range roleIDs { - - existing, err := txn.First(TableACLRoles, indexID, roleID) - if err != nil { - return fmt.Errorf("ACL role lookup failed: %v", err) - } - if existing == nil { - return errors.New("ACL role not found") - } - - // Delete the existing entry from the table. - if err := txn.Delete(TableACLRoles, existing); err != nil { - return fmt.Errorf("ACL role deletion failed: %v", err) + if err := s.deleteACLRoleByIDTxn(txn, roleID); err != nil { + return err } } @@ -206,6 +196,26 @@ func (s *StateStore) DeleteACLRolesByID( return txn.Commit() } +// deleteACLRoleByIDTxn deletes a single ACL role from the state store using the +// provided write transaction. It is the responsibility of the caller to update +// the index table. +func (s *StateStore) deleteACLRoleByIDTxn(txn *txn, roleID string) error { + + existing, err := txn.First(TableACLRoles, indexID, roleID) + if err != nil { + return fmt.Errorf("ACL role lookup failed: %v", err) + } + if existing == nil { + return errors.New("ACL role not found") + } + + // Delete the existing entry from the table. + if err := txn.Delete(TableACLRoles, existing); err != nil { + return fmt.Errorf("ACL role deletion failed: %v", err) + } + return nil +} + // GetACLRoles returns an iterator that contains all ACL roles stored within // state. func (s *StateStore) GetACLRoles(ws memdb.WatchSet) (memdb.ResultIterator, error) { diff --git a/nomad/stream/event_broker.go b/nomad/stream/event_broker.go index da5f8f0abae..179ffbce629 100644 --- a/nomad/stream/event_broker.go +++ b/nomad/stream/event_broker.go @@ -6,6 +6,7 @@ import ( "fmt" "sync" "sync/atomic" + "time" "github.com/armon/go-metrics" "github.com/hashicorp/go-memdb" @@ -213,22 +214,22 @@ func (e *EventBroker) handleACLUpdates(ctx context.Context) { return !aclAllowsSubscription(aclObj, sub.req) }) - case *structs.ACLPolicyEvent: - // Re-evaluate each subscriptions permissions since a policy - // change may or may not affect the subscription - e.checkSubscriptionsAgainstPolicyChange() + case *structs.ACLPolicyEvent, *structs.ACLRoleStreamEvent: + // Re-evaluate each subscription permission since a policy or + // role change may alter the permissions of the token being + // used for the subscription. + e.checkSubscriptionsAgainstACLChange() } } } } -// checkSubscriptionsAgainstPolicyChange iterates over the brokers -// subscriptions and evaluates whether the token used for the subscription is -// still valid. If it is not valid it closes the subscriptions belonging to the -// token. -// -// A lock must be held to iterate over the map of subscriptions. -func (e *EventBroker) checkSubscriptionsAgainstPolicyChange() { +// checkSubscriptionsAgainstACLChange iterates over the brokers subscriptions +// and evaluates whether the token used for the subscription is still valid. A +// token may become invalid is the assigned policies or roles have been updated +// which removed the required permission. If the token is no long valid, the +// subscription is closed. +func (e *EventBroker) checkSubscriptionsAgainstACLChange() { e.mu.Lock() defer e.mu.Unlock() @@ -257,14 +258,19 @@ func (e *EventBroker) checkSubscriptionsAgainstPolicyChange() { } } -func aclObjFromSnapshotForTokenSecretID(aclSnapshot ACLTokenProvider, aclCache *lru.TwoQueueCache, tokenSecretID string) (*acl.ACL, error) { +func aclObjFromSnapshotForTokenSecretID( + aclSnapshot ACLTokenProvider, aclCache *lru.TwoQueueCache, tokenSecretID string) (*acl.ACL, error) { + aclToken, err := aclSnapshot.ACLTokenBySecretID(nil, tokenSecretID) if err != nil { return nil, err } if aclToken == nil { - return nil, errors.New("no token for secret ID") + return nil, structs.ErrTokenNotFound + } + if aclToken.IsExpired(time.Now().UTC()) { + return nil, structs.ErrTokenExpired } // Check if this is a management token @@ -272,7 +278,8 @@ func aclObjFromSnapshotForTokenSecretID(aclSnapshot ACLTokenProvider, aclCache * return acl.ManagementACL, nil } - aclPolicies := make([]*structs.ACLPolicy, 0, len(aclToken.Policies)) + aclPolicies := make([]*structs.ACLPolicy, 0, len(aclToken.Policies)+len(aclToken.Roles)) + for _, policyName := range aclToken.Policies { policy, err := aclSnapshot.ACLPolicyByName(nil, policyName) if err != nil || policy == nil { @@ -281,12 +288,34 @@ func aclObjFromSnapshotForTokenSecretID(aclSnapshot ACLTokenProvider, aclCache * aclPolicies = append(aclPolicies, policy) } + // Iterate all the token role links, so we can unpack these and identify + // the ACL policies. + for _, roleLink := range aclToken.Roles { + + role, err := aclSnapshot.GetACLRoleByID(nil, roleLink.ID) + if err != nil { + return nil, err + } + if role == nil { + continue + } + + for _, policyLink := range role.Policies { + policy, err := aclSnapshot.ACLPolicyByName(nil, policyLink.Name) + if err != nil || policy == nil { + return nil, errors.New("error finding acl policy") + } + aclPolicies = append(aclPolicies, policy) + } + } + return structs.CompileACLObject(aclCache, aclPolicies) } type ACLTokenProvider interface { ACLTokenBySecretID(ws memdb.WatchSet, secretID string) (*structs.ACLToken, error) ACLPolicyByName(ws memdb.WatchSet, policyName string) (*structs.ACLPolicy, error) + GetACLRoleByID(ws memdb.WatchSet, roleID string) (*structs.ACLRole, error) } type ACLDelegate interface { diff --git a/nomad/stream/event_broker_test.go b/nomad/stream/event_broker_test.go index ea7457aa137..5ccf24cea0e 100644 --- a/nomad/stream/event_broker_test.go +++ b/nomad/stream/event_broker_test.go @@ -9,6 +9,8 @@ import ( "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/ci" + "github.com/hashicorp/nomad/helper/pointer" + "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" @@ -174,17 +176,23 @@ type fakeACLTokenProvider struct { policyErr error token *structs.ACLToken tokenErr error + role *structs.ACLRole + roleErr error } -func (p *fakeACLTokenProvider) ACLTokenBySecretID(ws memdb.WatchSet, secretID string) (*structs.ACLToken, error) { +func (p *fakeACLTokenProvider) ACLTokenBySecretID(_ memdb.WatchSet, _ string) (*structs.ACLToken, error) { return p.token, p.tokenErr } -func (p *fakeACLTokenProvider) ACLPolicyByName(ws memdb.WatchSet, policyName string) (*structs.ACLPolicy, error) { +func (p *fakeACLTokenProvider) ACLPolicyByName(_ memdb.WatchSet, _ string) (*structs.ACLPolicy, error) { return p.policy, p.policyErr } -func TestEventBroker_handleACLUpdates_policyupdated(t *testing.T) { +func (p *fakeACLTokenProvider) GetACLRoleByID(_ memdb.WatchSet, _ string) (*structs.ACLRole, error) { + return p.role, p.roleErr +} + +func TestEventBroker_handleACLUpdates_policyUpdated(t *testing.T) { ci.Parallel(t) ctx, cancel := context.WithCancel(context.Background()) @@ -578,6 +586,403 @@ func TestEventBroker_handleACLUpdates_policyupdated(t *testing.T) { } } +func TestEventBroker_handleACLUpdates_roleUpdated(t *testing.T) { + ci.Parallel(t) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + // Generate a UUID to use in all tests for the token secret ID and the role + // ID. + tokenSecretID := uuid.Generate() + roleID := uuid.Generate() + + cases := []struct { + name string + aclPolicy *structs.ACLPolicy + roleBeforePolicyLinks []*structs.ACLRolePolicyLink + roleAfterPolicyLinks []*structs.ACLRolePolicyLink + topics map[structs.Topic][]string + event structs.Event + policyEvent structs.Event + shouldUnsubscribe bool + initialSubErr bool + }{ + { + name: "deployments access policy link removed", + aclPolicy: &structs.ACLPolicy{ + Name: "test-event-broker-acl-policy", + Rules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{ + acl.NamespaceCapabilityReadJob}, + ), + }, + roleBeforePolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}}, + roleAfterPolicyLinks: []*structs.ACLRolePolicyLink{}, + shouldUnsubscribe: true, + event: structs.Event{ + Topic: structs.TopicDeployment, + Type: structs.TypeDeploymentUpdate, + Payload: structs.DeploymentEvent{Deployment: &structs.Deployment{}}, + }, + policyEvent: structs.Event{ + Topic: structs.TopicACLToken, + Type: structs.TypeACLTokenUpserted, + Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: tokenSecretID}), + }, + }, + { + name: "evaluations access policy link removed", + aclPolicy: &structs.ACLPolicy{ + Name: "test-event-broker-acl-policy", + Rules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{ + acl.NamespaceCapabilityReadJob}, + ), + }, + roleBeforePolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}}, + roleAfterPolicyLinks: []*structs.ACLRolePolicyLink{}, + shouldUnsubscribe: true, + event: structs.Event{ + Topic: structs.TopicEvaluation, + Type: structs.TypeEvalUpdated, + Payload: structs.EvaluationEvent{Evaluation: &structs.Evaluation{}}, + }, + policyEvent: structs.Event{ + Topic: structs.TopicACLToken, + Type: structs.TypeACLTokenUpserted, + Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: tokenSecretID}), + }, + }, + { + name: "allocations access policy link removed", + aclPolicy: &structs.ACLPolicy{ + Name: "test-event-broker-acl-policy", + Rules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{ + acl.NamespaceCapabilityReadJob}, + ), + }, + roleBeforePolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}}, + roleAfterPolicyLinks: []*structs.ACLRolePolicyLink{}, + shouldUnsubscribe: true, + event: structs.Event{ + Topic: structs.TopicAllocation, + Type: structs.TypeAllocationUpdated, + Payload: structs.AllocationEvent{Allocation: &structs.Allocation{}}, + }, + policyEvent: structs.Event{ + Topic: structs.TopicACLToken, + Type: structs.TypeACLTokenUpserted, + Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: tokenSecretID}), + }, + }, + { + name: "nodes access policy link removed", + aclPolicy: &structs.ACLPolicy{ + Name: "test-event-broker-acl-policy", + Rules: mock.NodePolicy(acl.PolicyRead), + }, + roleBeforePolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}}, + roleAfterPolicyLinks: []*structs.ACLRolePolicyLink{}, + shouldUnsubscribe: true, + event: structs.Event{ + Topic: structs.TopicNode, + Type: structs.TypeNodeRegistration, + Payload: structs.NodeStreamEvent{Node: &structs.Node{}}, + }, + policyEvent: structs.Event{ + Topic: structs.TopicACLToken, + Type: structs.TypeACLTokenUpserted, + Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: tokenSecretID}), + }, + }, + { + name: "deployment access no change", + aclPolicy: &structs.ACLPolicy{ + Name: "test-event-broker-acl-policy", + Rules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{ + acl.NamespaceCapabilityReadJob}, + ), + }, + roleBeforePolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}}, + roleAfterPolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}}, + shouldUnsubscribe: false, + event: structs.Event{ + Topic: structs.TopicDeployment, + Type: structs.TypeDeploymentUpdate, + Payload: structs.DeploymentEvent{Deployment: &structs.Deployment{}}, + }, + policyEvent: structs.Event{ + Topic: structs.TopicACLToken, + Type: structs.TypeACLTokenUpserted, + Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: tokenSecretID}), + }, + }, + { + name: "evaluations access no change", + aclPolicy: &structs.ACLPolicy{ + Name: "test-event-broker-acl-policy", + Rules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{ + acl.NamespaceCapabilityReadJob}, + ), + }, + roleBeforePolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}}, + roleAfterPolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}}, + shouldUnsubscribe: false, + event: structs.Event{ + Topic: structs.TopicEvaluation, + Type: structs.TypeEvalUpdated, + Payload: structs.EvaluationEvent{Evaluation: &structs.Evaluation{}}, + }, + policyEvent: structs.Event{ + Topic: structs.TopicACLToken, + Type: structs.TypeACLTokenUpserted, + Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: tokenSecretID}), + }, + }, + { + name: "allocations access no change", + aclPolicy: &structs.ACLPolicy{ + Name: "test-event-broker-acl-policy", + Rules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{ + acl.NamespaceCapabilityReadJob}, + ), + }, + roleBeforePolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}}, + roleAfterPolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}}, + shouldUnsubscribe: false, + event: structs.Event{ + Topic: structs.TopicAllocation, + Type: structs.TypeAllocationUpdated, + Payload: structs.AllocationEvent{Allocation: &structs.Allocation{}}, + }, + policyEvent: structs.Event{ + Topic: structs.TopicACLToken, + Type: structs.TypeACLTokenUpserted, + Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: tokenSecretID}), + }, + }, + { + name: "nodes access no change", + aclPolicy: &structs.ACLPolicy{ + Name: "test-event-broker-acl-policy", + Rules: mock.NodePolicy(acl.PolicyRead), + }, + roleBeforePolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}}, + roleAfterPolicyLinks: []*structs.ACLRolePolicyLink{{Name: "test-event-broker-acl-policy"}}, + shouldUnsubscribe: false, + event: structs.Event{ + Topic: structs.TopicNode, + Type: structs.TypeNodeRegistration, + Payload: structs.NodeStreamEvent{Node: &structs.Node{}}, + }, + policyEvent: structs.Event{ + Topic: structs.TopicACLToken, + Type: structs.TypeACLTokenUpserted, + Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: tokenSecretID}), + }, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + + // Build our fake token provider containing the relevant state + // objects and add this to our new delegate. Keeping the token + // provider setup separate means we can easily update its state. + tokenProvider := &fakeACLTokenProvider{ + policy: tc.aclPolicy, + token: &structs.ACLToken{ + SecretID: tokenSecretID, + Roles: []*structs.ACLTokenRoleLink{{ID: roleID}}, + }, + role: &structs.ACLRole{ + ID: uuid.Short(), + Policies: []*structs.ACLRolePolicyLink{ + {Name: tc.aclPolicy.Name}, + }, + }, + } + aclDelegate := &fakeACLDelegate{tokenProvider: tokenProvider} + + publisher, err := NewEventBroker(ctx, aclDelegate, EventBrokerCfg{}) + require.NoError(t, err) + + ns := structs.DefaultNamespace + if tc.event.Namespace != "" { + ns = tc.event.Namespace + } + + sub, err := publisher.SubscribeWithACLCheck(&SubscribeRequest{ + Topics: map[structs.Topic][]string{tc.event.Topic: {"*"}}, + Namespace: ns, + Token: tokenSecretID, + }) + + if tc.initialSubErr { + require.Error(t, err) + require.Nil(t, sub) + return + } + + require.NoError(t, err) + publisher.Publish(&structs.Events{Index: 100, Events: []structs.Event{tc.event}}) + + ctx, cancel := context.WithDeadline(ctx, time.Now().Add(100*time.Millisecond)) + defer cancel() + _, err = sub.Next(ctx) + require.NoError(t, err) + + // Overwrite the ACL role policy links with the updated version + // which is expected to cause a change in the subscription. + tokenProvider.role.Policies = tc.roleAfterPolicyLinks + + // Publish ACL event triggering subscription re-evaluation + publisher.Publish(&structs.Events{Index: 101, Events: []structs.Event{tc.policyEvent}}) + publisher.Publish(&structs.Events{Index: 102, Events: []structs.Event{tc.event}}) + + // If we are expecting to unsubscribe consume the subscription + // until the expected error occurs. + ctx, cancel = context.WithDeadline(ctx, time.Now().Add(100*time.Millisecond)) + defer cancel() + if tc.shouldUnsubscribe { + for { + _, err = sub.Next(ctx) + if err != nil { + if err == context.DeadlineExceeded { + require.Fail(t, err.Error()) + } + if err == ErrSubscriptionClosed { + break + } + } + } + } else { + _, err = sub.Next(ctx) + require.NoError(t, err) + } + + publisher.Publish(&structs.Events{Index: 103, Events: []structs.Event{tc.event}}) + + ctx, cancel = context.WithDeadline(ctx, time.Now().Add(100*time.Millisecond)) + defer cancel() + _, err = sub.Next(ctx) + if tc.shouldUnsubscribe { + require.Equal(t, ErrSubscriptionClosed, err) + } else { + require.NoError(t, err) + } + }) + } +} + +func TestEventBroker_handleACLUpdates_tokenExpiry(t *testing.T) { + ci.Parallel(t) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + cases := []struct { + name string + inputToken *structs.ACLToken + shouldExpire bool + }{ + { + name: "token does not expire", + inputToken: &structs.ACLToken{ + AccessorID: uuid.Generate(), + SecretID: uuid.Generate(), + ExpirationTime: pointer.Of(time.Now().Add(100000 * time.Hour).UTC()), + Type: structs.ACLManagementToken, + }, + shouldExpire: false, + }, + { + name: "token does expire", + inputToken: &structs.ACLToken{ + AccessorID: uuid.Generate(), + SecretID: uuid.Generate(), + ExpirationTime: pointer.Of(time.Now().Add(100000 * time.Hour).UTC()), + Type: structs.ACLManagementToken, + }, + shouldExpire: true, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + + // Build our fake token provider containing the relevant state + // objects and add this to our new delegate. Keeping the token + // provider setup separate means we can easily update its state. + tokenProvider := &fakeACLTokenProvider{token: tc.inputToken} + aclDelegate := &fakeACLDelegate{tokenProvider: tokenProvider} + + publisher, err := NewEventBroker(ctx, aclDelegate, EventBrokerCfg{}) + require.NoError(t, err) + + fakeNodeEvent := structs.Event{ + Topic: structs.TopicNode, + Type: structs.TypeNodeRegistration, + Payload: structs.NodeStreamEvent{Node: &structs.Node{}}, + } + + fakeTokenEvent := structs.Event{ + Topic: structs.TopicACLToken, + Type: structs.TypeACLTokenUpserted, + Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: tc.inputToken.SecretID}), + } + + sub, err := publisher.SubscribeWithACLCheck(&SubscribeRequest{ + Topics: map[structs.Topic][]string{structs.TopicAll: {"*"}}, + Token: tc.inputToken.SecretID, + }) + require.NoError(t, err) + require.NotNil(t, sub) + + // Publish an event and check that there is a new item in the + // subscription queue. + publisher.Publish(&structs.Events{Index: 100, Events: []structs.Event{fakeNodeEvent}}) + + ctx, cancel := context.WithDeadline(ctx, time.Now().Add(100*time.Millisecond)) + defer cancel() + _, err = sub.Next(ctx) + require.NoError(t, err) + + // If the test states the token should expire, set the expiration + // time to a previous time. + if tc.shouldExpire { + tokenProvider.token.ExpirationTime = pointer.Of( + time.Date(1987, time.April, 13, 8, 3, 0, 0, time.UTC), + ) + } + + // Publish some events to trigger re-evaluation of the subscription. + publisher.Publish(&structs.Events{Index: 101, Events: []structs.Event{fakeTokenEvent}}) + publisher.Publish(&structs.Events{Index: 102, Events: []structs.Event{fakeNodeEvent}}) + + // If we are expecting to unsubscribe consume the subscription + // until the expected error occurs. + ctx, cancel = context.WithDeadline(ctx, time.Now().Add(100*time.Millisecond)) + defer cancel() + + if tc.shouldExpire { + for { + if _, err = sub.Next(ctx); err != nil { + if err == context.DeadlineExceeded { + require.Fail(t, err.Error()) + } + if err == ErrSubscriptionClosed { + break + } + } + } + } else { + _, err = sub.Next(ctx) + require.NoError(t, err) + } + }) + } +} + func consumeSubscription(ctx context.Context, sub *Subscription) <-chan subNextResult { eventCh := make(chan subNextResult, 1) go func() { diff --git a/nomad/structs/acl.go b/nomad/structs/acl.go index 3c52e7014a7..f33bd708e83 100644 --- a/nomad/structs/acl.go +++ b/nomad/structs/acl.go @@ -8,6 +8,7 @@ import ( "time" "github.com/hashicorp/go-multierror" + "github.com/hashicorp/go-set" "github.com/hashicorp/nomad/helper/pointer" "github.com/hashicorp/nomad/helper/uuid" "golang.org/x/crypto/blake2b" @@ -232,6 +233,24 @@ func (a *ACLToken) IsExpired(t time.Time) bool { return a.ExpirationTime.Before(t) || t.IsZero() } +// HasRoles checks if a given set of role IDs are assigned to the ACL token. It +// does not account for management tokens, therefore it is the responsibility +// of the caller to perform this check, if required. +func (a *ACLToken) HasRoles(roleIDs []string) bool { + + // Generate a set of role IDs that the token is assigned. + roleSet := set.FromFunc(a.Roles, func(roleLink *ACLTokenRoleLink) string { return roleLink.ID }) + + // Iterate the role IDs within the request and check whether these are + // present within the token assignment. + for _, roleID := range roleIDs { + if !roleSet.Contains(roleID) { + return false + } + } + return true +} + // ACLRole is an abstraction for the ACL system which allows the grouping of // ACL policies into a single object. ACL tokens can be created and linked to // a role; the token then inherits all the permissions granted by the policies. diff --git a/nomad/structs/acl_test.go b/nomad/structs/acl_test.go index d39ff2ae797..05a497efd93 100644 --- a/nomad/structs/acl_test.go +++ b/nomad/structs/acl_test.go @@ -297,6 +297,75 @@ func TestACLToken_IsExpired(t *testing.T) { } } +func TestACLToken_HasRoles(t *testing.T) { + testCases := []struct { + name string + inputToken *ACLToken + inputRoleIDs []string + expectedOutput bool + }{ + { + name: "client token request all subset", + inputToken: &ACLToken{ + Type: ACLClientToken, + Roles: []*ACLTokenRoleLink{ + {ID: "foo"}, + {ID: "bar"}, + {ID: "baz"}, + }, + }, + inputRoleIDs: []string{"foo", "bar", "baz"}, + expectedOutput: true, + }, + { + name: "client token request partial subset", + inputToken: &ACLToken{ + Type: ACLClientToken, + Roles: []*ACLTokenRoleLink{ + {ID: "foo"}, + {ID: "bar"}, + {ID: "baz"}, + }, + }, + inputRoleIDs: []string{"foo", "baz"}, + expectedOutput: true, + }, + { + name: "client token request one subset", + inputToken: &ACLToken{ + Type: ACLClientToken, + Roles: []*ACLTokenRoleLink{ + {ID: "foo"}, + {ID: "bar"}, + {ID: "baz"}, + }, + }, + inputRoleIDs: []string{"baz"}, + expectedOutput: true, + }, + { + name: "client token request no subset", + inputToken: &ACLToken{ + Type: ACLClientToken, + Roles: []*ACLTokenRoleLink{ + {ID: "foo"}, + {ID: "bar"}, + {ID: "baz"}, + }, + }, + inputRoleIDs: []string{"new"}, + expectedOutput: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + actualOutput := tc.inputToken.HasRoles(tc.inputRoleIDs) + require.Equal(t, tc.expectedOutput, actualOutput) + }) + } +} + func TestACLRole_SetHash(t *testing.T) { testCases := []struct { name string diff --git a/nomad/structs/event.go b/nomad/structs/event.go index 5b55a4612bd..34c6ad981e5 100644 --- a/nomad/structs/event.go +++ b/nomad/structs/event.go @@ -23,6 +23,7 @@ const ( TopicNode Topic = "Node" TopicACLPolicy Topic = "ACLPolicy" TopicACLToken Topic = "ACLToken" + TopicACLRole Topic = "ACLRole" TopicService Topic = "Service" TopicAll Topic = "*" @@ -46,6 +47,8 @@ const ( TypeACLTokenUpserted = "ACLTokenUpserted" TypeACLPolicyDeleted = "ACLPolicyDeleted" TypeACLPolicyUpserted = "ACLPolicyUpserted" + TypeACLRoleDeleted = "ACLRoleDeleted" + TypeACLRoleUpserted = "ACLRoleUpserted" TypeServiceRegistration = "ServiceRegistration" TypeServiceDeregistration = "ServiceDeregistration" ) @@ -151,3 +154,9 @@ func (a *ACLTokenEvent) SecretID() string { type ACLPolicyEvent struct { ACLPolicy *ACLPolicy } + +// ACLRoleStreamEvent holds a newly updated or delete ACL role to be used as an +// event within the event stream. +type ACLRoleStreamEvent struct { + ACLRole *ACLRole +} diff --git a/website/content/api-docs/events.mdx b/website/content/api-docs/events.mdx index 7a9b5bef05c..de6f1cae716 100644 --- a/website/content/api-docs/events.mdx +++ b/website/content/api-docs/events.mdx @@ -33,6 +33,7 @@ by default, requiring a management token. | `*` | `management` | | `ACLToken` | `management` | | `ACLPolicy` | `management` | +| `ACLRole` | `management` | | `Job` | `namespace:read-job` | | `Allocation` | `namespace:read-job` | | `Deployment` | `namespace:read-job` | @@ -67,6 +68,7 @@ by default, requiring a management token. | ---------- | ------------------------------- | | ACLToken | ACLToken | | ACLPolicy | ACLPolicy | +| ACLRoles | ACLRole | | Allocation | Allocation (no job information) | | Job | Job | | Evaluation | Evaluation | @@ -83,6 +85,8 @@ by default, requiring a management token. | ACLTokenDeleted | | ACLPolicyUpserted | | ACLPolicyDeleted | +| ACLRoleUpserted | +| ACLRoleDeleted | | AllocationCreated | | AllocationUpdated | | AllocationUpdateDesiredStatus |